| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 | package clusterimport (	"sync"	"sync/atomic"	"github.com/google/uuid"	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"	"github.com/langgenius/dify-plugin-daemon/internal/types/app"	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping")type Cluster struct {	// id is the unique id of the cluster	id string	// i_am_master is the flag to indicate whether the current node is the master node	i_am_master bool	// main http port of the current node	port uint16	// plugins stores all the plugin life time of the current node	plugins     mapping.Map[string, *pluginLifeTime]	plugin_lock sync.RWMutex	manager *plugin_manager.PluginManager	// nodes stores all the nodes of the cluster	nodes mapping.Map[string, node]	// signals for waiting for the cluster to stop	stop_chan chan bool	stopped   int32	is_in_auto_gc_nodes   int32	is_in_auto_gc_plugins int32	// channels to notify cluster event	notify_become_master_chan             chan bool	notify_master_gc_chan                 chan bool	notify_master_gc_completed_chan       chan bool	notify_voting_chan                    chan bool	notify_voting_completed_chan          chan bool	notify_plugin_schedule_chan           chan bool	notify_plugin_schedule_completed_chan chan bool	notify_node_update_chan               chan bool	notify_node_update_completed_chan     chan bool	notify_cluster_stopped_chan           chan bool}func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {	return &Cluster{		id:        uuid.New().String(),		port:      uint16(config.ServerPort),		stop_chan: make(chan bool),		manager: plugin_manager,		notify_become_master_chan:             make(chan bool),		notify_master_gc_chan:                 make(chan bool),		notify_master_gc_completed_chan:       make(chan bool),		notify_voting_chan:                    make(chan bool),		notify_voting_completed_chan:          make(chan bool),		notify_plugin_schedule_chan:           make(chan bool),		notify_plugin_schedule_completed_chan: make(chan bool),		notify_node_update_chan:               make(chan bool),		notify_node_update_completed_chan:     make(chan bool),		notify_cluster_stopped_chan:           make(chan bool),	}}func (c *Cluster) Launch() {	go c.clusterLifetime()}func (c *Cluster) Close() error {	if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {		close(c.stop_chan)	}	return nil}func (c *Cluster) ID() string {	return c.id}// trigger for master eventfunc (c *Cluster) notifyBecomeMaster() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_become_master_chan <- true:	default:	}}// receive the master eventfunc (c *Cluster) NotifyBecomeMaster() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_become_master_chan}// trigger for master gc eventfunc (c *Cluster) notifyMasterGC() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_master_gc_chan <- true:	default:	}}// trigger for master gc completed eventfunc (c *Cluster) notifyMasterGCCompleted() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_master_gc_completed_chan <- true:	default:	}}// trigger for voting eventfunc (c *Cluster) notifyVoting() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_voting_chan <- true:	default:	}}// trigger for voting completed eventfunc (c *Cluster) notifyVotingCompleted() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_voting_completed_chan <- true:	default:	}}// trigger for plugin schedule eventfunc (c *Cluster) notifyPluginSchedule() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_plugin_schedule_chan <- true:	default:	}}// trigger for plugin schedule completed eventfunc (c *Cluster) notifyPluginScheduleCompleted() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_plugin_schedule_completed_chan <- true:	default:	}}// trigger for node update eventfunc (c *Cluster) notifyNodeUpdate() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_node_update_chan <- true:	default:	}}// trigger for node update completed eventfunc (c *Cluster) notifyNodeUpdateCompleted() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notify_node_update_completed_chan <- true:	default:	}}// trigger for cluster stopped eventfunc (c *Cluster) notifyClusterStopped() {	select {	case c.notify_cluster_stopped_chan <- true:	default:	}}// receive the master gc eventfunc (c *Cluster) NotifyMasterGC() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_master_gc_chan}// receive the master gc completed eventfunc (c *Cluster) NotifyMasterGCCompleted() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_master_gc_completed_chan}// receive the voting eventfunc (c *Cluster) NotifyVoting() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_voting_chan}// receive the voting completed eventfunc (c *Cluster) NotifyVotingCompleted() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_voting_completed_chan}// receive the plugin schedule eventfunc (c *Cluster) NotifyPluginSchedule() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_plugin_schedule_chan}// receive the plugin schedule completed eventfunc (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_plugin_schedule_completed_chan}// receive the node update eventfunc (c *Cluster) NotifyNodeUpdate() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_node_update_chan}// receive the node update completed eventfunc (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notify_node_update_completed_chan}// receive the cluster stopped eventfunc (c *Cluster) NotifyClusterStopped() <-chan bool {	return c.notify_cluster_stopped_chan}
 |