| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162 | package clusterimport (	"time"	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine")const (	// master	// the cluster master is responsible for managing garbage collection for both nodes and plugins.	// typically, each node handles the garbage collection for its own plugins	// However, if a node becomes inactive, the master takes over this task.	// every node has an equal chance of becoming the master.	// once a node is selected as the master, it is locked in that role.	// If the master node becomes inactive, the master slot is released, allowing other nodes to attempt to take over the role.	MASTER_LOCKING_INTERVAL  = time.Millisecond * 500 // interval to try to lock the slot to be the master	MASTER_LOCK_EXPIRED_TIME = time.Second * 2        // expired time of master key	MASTER_GC_INTERVAL       = time.Second * 10       // interval to do garbage collection of nodes has already deactivated	// node	// To determine the available IPs of the nodes, each node will vote for the IPs of other nodes.	// this voting process will occur every $NODE_VOTE_INTERVAL.	// simultaneously, all nodes will synchronize to the latest status in memory every $UPDATE_NODE_STATUS_INTERVAL.	// each node will also update its own status to remain active. If a node becomes inactive, it will be removed from the cluster.	NODE_VOTE_INTERVAL          = time.Second * 30 // interval to vote the ips of the nodes	UPDATE_NODE_STATUS_INTERVAL = time.Second * 5  // interval to update the status of the node	NODE_DISCONNECTED_TIMEOUT   = time.Second * 10 // once a node is no longer active, it will be removed from the cluster	// plugin scheduler	// each node will schedule its plugins every $PLUGIN_SCHEDULER_INTERVAL time	// and schedule process will be triggered every $PLUGIN_SCHEDULER_TICKER_INTERVAL time	// not all the plugins will be scheduled every time, only the plugins that are not scheduled in $PLUGIN_SCHEDULER_INTERVAL time will be scheduled	// and the plugins that are not active will be removed from the cluster	PLUGIN_SCHEDULER_TICKER_INTERVAL = time.Second * 3  // interval to schedule the plugins	PLUGIN_SCHEDULER_INTERVAL        = time.Second * 10 // interval to schedule the plugins	PLUGIN_DEACTIVATED_TIMEOUT       = time.Second * 30 // once a plugin is no longer active, it will be removed from the cluster)const (	CLUSTER_NEW_NODE_CHANNEL = "cluster-new-node-channel")// lifetime of the clusterfunc (c *Cluster) clusterLifetime() {	defer func() {		if err := c.removeSelfNode(); err != nil {			log.Error("failed to remove the self node from the cluster: %s", err.Error())		}		c.notifyClusterStopped()		close(c.notifyClusterStoppedChan)		close(c.notifyBecomeMasterChan)		close(c.notifyMasterGcChan)		close(c.notifyMasterGcCompletedChan)		close(c.notifyVotingChan)		close(c.notifyVotingCompletedChan)		close(c.notifyPluginScheduleChan)		close(c.notifyPluginScheduleCompletedChan)		close(c.notifyNodeUpdateChan)		close(c.notifyNodeUpdateCompletedChan)	}()	tickerLockMaster := time.NewTicker(c.masterLockingInterval)	defer tickerLockMaster.Stop()	tickerUpdateNodeStatus := time.NewTicker(c.updateNodeStatusInterval)	defer tickerUpdateNodeStatus.Stop()	masterGcTicker := time.NewTicker(c.masterGcInterval)	defer masterGcTicker.Stop()	nodeVoteTicker := time.NewTicker(c.nodeVoteInterval)	defer nodeVoteTicker.Stop()	pluginSchedulerTicker := time.NewTicker(c.pluginSchedulerTickerInterval)	defer pluginSchedulerTicker.Stop()	// vote for all ips and find the best one, prepare for later traffic scheduling	routine.Submit(map[string]string{		"module":   "cluster",		"function": "voteAddressesWhenInit",	}, func() {		if err := c.updateNodeStatus(); err != nil {			log.Error("failed to update the status of the node: %s", err.Error())		}		if err := cache.Publish(CLUSTER_NEW_NODE_CHANNEL, newNodeEvent{			NodeID: c.id,		}); err != nil {			log.Error("failed to publish the new node event: %s", err.Error())		}		if err := c.voteAddresses(); err != nil {			log.Error("failed to vote the ips of the nodes: %s", err.Error())		}	})	newNodeChan, cancel := cache.Subscribe[newNodeEvent](CLUSTER_NEW_NODE_CHANNEL)	defer cancel()	for {		select {		case <-tickerLockMaster.C:			if !c.iAmMaster {				// try lock the slot				if success, err := c.lockMaster(); err != nil {					log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())				} else if success {					c.iAmMaster = true					log.Info("current node has become the master of the cluster")					c.notifyBecomeMaster()				} else {					if c.iAmMaster {						c.iAmMaster = false						log.Info("current node has released the master slot")					}				}			} else {				// update the master				if err := c.updateMaster(); err != nil {					log.Error("failed to update the master: %s", err.Error())				}			}		case <-tickerUpdateNodeStatus.C:			if err := c.updateNodeStatus(); err != nil {				log.Error("failed to update the status of the node: %s", err.Error())			}		case <-masterGcTicker.C:			if c.iAmMaster {				c.notifyMasterGC()				if err := c.autoGCNodes(); err != nil {					log.Error("failed to gc the nodes have already deactivated: %s", err.Error())				}				if err := c.autoGCPlugins(); err != nil {					log.Error("failed to gc the plugins have already stopped: %s", err.Error())				}				c.notifyMasterGCCompleted()			}		case <-nodeVoteTicker.C:			if err := c.voteAddresses(); err != nil {				log.Error("failed to vote the ips of the nodes: %s", err.Error())			}		case _, ok := <-newNodeChan:			if ok {				// vote for the new node				if err := c.voteAddresses(); err != nil {					log.Error("failed to vote the ips of the nodes: %s", err.Error())				}			}		case <-pluginSchedulerTicker.C:			if err := c.schedulePlugins(); err != nil {				log.Error("failed to schedule the plugins: %s", err.Error())			}		case <-c.stopChan:			return		}	}}
 |