| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159 | package clusterimport (	"time"	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine")const (	// master	// the cluster master is responsible for managing garbage collection for both nodes and plugins.	// typically, each node handles the garbage collection for its own plugins	// However, if a node becomes inactive, the master takes over this task.	// every node has an equal chance of becoming the master.	// once a node is selected as the master, it is locked in that role.	// If the master node becomes inactive, the master slot is released, allowing other nodes to attempt to take over the role.	MASTER_LOCKING_INTERVAL  = time.Millisecond * 500 // interval to try to lock the slot to be the master	MASTER_LOCK_EXPIRED_TIME = time.Second * 2        // expired time of master key	MASTER_GC_INTERVAL       = time.Second * 10       // interval to do garbage collection of nodes has already deactivated	// node	// To determine the available IPs of the nodes, each node will vote for the IPs of other nodes.	// this voting process will occur every $NODE_VOTE_INTERVAL.	// simultaneously, all nodes will synchronize to the latest status in memory every $UPDATE_NODE_STATUS_INTERVAL.	// each node will also update its own status to remain active. If a node becomes inactive, it will be removed from the cluster.	NODE_VOTE_INTERVAL          = time.Second * 30 // interval to vote the ips of the nodes	UPDATE_NODE_STATUS_INTERVAL = time.Second * 5  // interval to update the status of the node	NODE_DISCONNECTED_TIMEOUT   = time.Second * 10 // once a node is no longer active, it will be removed from the cluster	// plugin scheduler	// each node will schedule its plugins every $PLUGIN_SCHEDULER_INTERVAL time	// and schedule process will be triggered every $PLUGIN_SCHEDULER_TICKER_INTERVAL time	// not all the plugins will be scheduled every time, only the plugins that are not scheduled in $PLUGIN_SCHEDULER_INTERVAL time will be scheduled	// and the plugins that are not active will be removed from the cluster	PLUGIN_SCHEDULER_TICKER_INTERVAL = time.Second * 3  // interval to schedule the plugins	PLUGIN_SCHEDULER_INTERVAL        = time.Second * 10 // interval to schedule the plugins	PLUGIN_DEACTIVATED_TIMEOUT       = time.Second * 30 // once a plugin is no longer active, it will be removed from the cluster)const (	CLUSTER_NEW_NODE_CHANNEL = "cluster-new-node-channel")// lifetime of the clusterfunc (c *Cluster) clusterLifetime() {	defer func() {		if err := c.removeSelfNode(); err != nil {			log.Error("failed to remove the self node from the cluster: %s", err.Error())		}		c.notifyClusterStopped()		close(c.notify_cluster_stopped_chan)		close(c.notify_become_master_chan)		close(c.notify_master_gc_chan)		close(c.notify_master_gc_completed_chan)		close(c.notify_voting_chan)		close(c.notify_voting_completed_chan)		close(c.notify_plugin_schedule_chan)		close(c.notify_plugin_schedule_completed_chan)		close(c.notify_node_update_chan)		close(c.notify_node_update_completed_chan)	}()	ticker_lock_master := time.NewTicker(MASTER_LOCKING_INTERVAL)	defer ticker_lock_master.Stop()	ticker_update_node_status := time.NewTicker(UPDATE_NODE_STATUS_INTERVAL)	defer ticker_update_node_status.Stop()	master_gc_ticker := time.NewTicker(MASTER_GC_INTERVAL)	defer master_gc_ticker.Stop()	node_vote_ticker := time.NewTicker(NODE_VOTE_INTERVAL)	defer node_vote_ticker.Stop()	plugin_scheduler_ticker := time.NewTicker(PLUGIN_SCHEDULER_INTERVAL)	defer plugin_scheduler_ticker.Stop()	// vote for all ips and find the best one, prepare for later traffic scheduling	routine.Submit(func() {		if err := c.updateNodeStatus(); err != nil {			log.Error("failed to update the status of the node: %s", err.Error())		}		if err := cache.Publish(CLUSTER_NEW_NODE_CHANNEL, newNodeEvent{			NodeID: c.id,		}); err != nil {			log.Error("failed to publish the new node event: %s", err.Error())		}		if err := c.voteIps(); err != nil {			log.Error("failed to vote the ips of the nodes: %s", err.Error())		}	})	new_node_chan, cancel := cache.Subscribe[newNodeEvent](CLUSTER_NEW_NODE_CHANNEL)	defer cancel()	for {		select {		case <-ticker_lock_master.C:			if !c.i_am_master {				// try lock the slot				if success, err := c.lockMaster(); err != nil {					log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())				} else if success {					c.i_am_master = true					log.Info("current node has become the master of the cluster")					c.notifyBecomeMaster()				} else {					if c.i_am_master {						c.i_am_master = false						log.Info("current node has released the master slot")					}				}			} else {				// update the master				if err := c.updateMaster(); err != nil {					log.Error("failed to update the master: %s", err.Error())				}			}		case <-ticker_update_node_status.C:			if err := c.updateNodeStatus(); err != nil {				log.Error("failed to update the status of the node: %s", err.Error())			}		case <-master_gc_ticker.C:			if c.i_am_master {				c.notifyMasterGC()				if err := c.autoGCNodes(); err != nil {					log.Error("failed to gc the nodes have already deactivated: %s", err.Error())				}				if err := c.autoGCPlugins(); err != nil {					log.Error("failed to gc the plugins have already stopped: %s", err.Error())				}				c.notifyMasterGCCompleted()			}		case <-node_vote_ticker.C:			if err := c.voteIps(); err != nil {				log.Error("failed to vote the ips of the nodes: %s", err.Error())			}		case _, ok := <-new_node_chan:			if ok {				// vote for the new node				if err := c.voteIps(); err != nil {					log.Error("failed to vote the ips of the nodes: %s", err.Error())				}			}		case <-plugin_scheduler_ticker.C:			if err := c.schedulePlugins(); err != nil {				log.Error("failed to schedule the plugins: %s", err.Error())			}		case <-c.stop_chan:			return		}	}}
 |