| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 | package clusterimport (	"sync"	"sync/atomic"	"time"	"github.com/google/uuid"	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"	"github.com/langgenius/dify-plugin-daemon/internal/types/app"	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping")type Cluster struct {	// id is the unique id of the cluster	id string	// i_am_master is the flag to indicate whether the current node is the master node	iAmMaster bool	// main http port of the current node	port uint16	// plugins stores all the plugin life time of the current node	plugins    mapping.Map[string, *pluginLifeTime]	pluginLock sync.RWMutex	manager *plugin_manager.PluginManager	// nodes stores all the nodes of the cluster	nodes mapping.Map[string, node]	// signals for waiting for the cluster to stop	stopChan chan bool	stopped  int32	isInAutoGcNodes   int32	isInAutoGcPlugins int32	// channels to notify cluster event	notifyBecomeMasterChan            chan bool	notifyMasterGcChan                chan bool	notifyMasterGcCompletedChan       chan bool	notifyVotingChan                  chan bool	notifyVotingCompletedChan         chan bool	notifyPluginScheduleChan          chan bool	notifyPluginScheduleCompletedChan chan bool	notifyNodeUpdateChan              chan bool	notifyNodeUpdateCompletedChan     chan bool	notifyClusterStoppedChan          chan bool	showLog bool	masterGcInterval              time.Duration	masterLockingInterval         time.Duration	masterLockExpiredTime         time.Duration	nodeVoteInterval              time.Duration	nodeDisconnectedTimeout       time.Duration	updateNodeStatusInterval      time.Duration	pluginSchedulerInterval       time.Duration	pluginSchedulerTickerInterval time.Duration	pluginDeactivatedTimeout      time.Duration}func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {	return &Cluster{		id:                            uuid.New().String(),		port:                          uint16(config.ServerPort),		stopChan:                      make(chan bool),		showLog:                       config.DisplayClusterLog,		masterGcInterval:              MASTER_GC_INTERVAL,		masterLockingInterval:         MASTER_LOCKING_INTERVAL,		masterLockExpiredTime:         MASTER_LOCK_EXPIRED_TIME,		nodeVoteInterval:              NODE_VOTE_INTERVAL,		nodeDisconnectedTimeout:       NODE_DISCONNECTED_TIMEOUT,		updateNodeStatusInterval:      UPDATE_NODE_STATUS_INTERVAL,		pluginSchedulerInterval:       PLUGIN_SCHEDULER_INTERVAL,		pluginSchedulerTickerInterval: PLUGIN_SCHEDULER_TICKER_INTERVAL,		pluginDeactivatedTimeout:      PLUGIN_DEACTIVATED_TIMEOUT,		manager: plugin_manager,		notifyBecomeMasterChan:            make(chan bool),		notifyMasterGcChan:                make(chan bool),		notifyMasterGcCompletedChan:       make(chan bool),		notifyVotingChan:                  make(chan bool),		notifyVotingCompletedChan:         make(chan bool),		notifyPluginScheduleChan:          make(chan bool),		notifyPluginScheduleCompletedChan: make(chan bool),		notifyNodeUpdateChan:              make(chan bool),		notifyNodeUpdateCompletedChan:     make(chan bool),		notifyClusterStoppedChan:          make(chan bool),	}}func (c *Cluster) Launch() {	go c.clusterLifetime()}func (c *Cluster) Close() error {	if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {		close(c.stopChan)	}	return nil}func (c *Cluster) ID() string {	return c.id}// trigger for master eventfunc (c *Cluster) notifyBecomeMaster() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyBecomeMasterChan <- true:	default:	}}// receive the master eventfunc (c *Cluster) NotifyBecomeMaster() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyBecomeMasterChan}// trigger for master gc eventfunc (c *Cluster) notifyMasterGC() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyMasterGcChan <- true:	default:	}}// trigger for master gc completed eventfunc (c *Cluster) notifyMasterGCCompleted() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyMasterGcCompletedChan <- true:	default:	}}// trigger for voting eventfunc (c *Cluster) notifyVoting() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyVotingChan <- true:	default:	}}// trigger for voting completed eventfunc (c *Cluster) notifyVotingCompleted() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyVotingCompletedChan <- true:	default:	}}// trigger for plugin schedule eventfunc (c *Cluster) notifyPluginSchedule() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyPluginScheduleChan <- true:	default:	}}// trigger for plugin schedule completed eventfunc (c *Cluster) notifyPluginScheduleCompleted() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyPluginScheduleCompletedChan <- true:	default:	}}// trigger for node update eventfunc (c *Cluster) notifyNodeUpdate() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyNodeUpdateChan <- true:	default:	}}// trigger for node update completed eventfunc (c *Cluster) notifyNodeUpdateCompleted() {	if atomic.LoadInt32(&c.stopped) == 1 {		return	}	select {	case c.notifyNodeUpdateCompletedChan <- true:	default:	}}// trigger for cluster stopped eventfunc (c *Cluster) notifyClusterStopped() {	select {	case c.notifyClusterStoppedChan <- true:	default:	}}// receive the master gc eventfunc (c *Cluster) NotifyMasterGC() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyMasterGcChan}// receive the master gc completed eventfunc (c *Cluster) NotifyMasterGCCompleted() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyMasterGcCompletedChan}// receive the voting eventfunc (c *Cluster) NotifyVoting() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyVotingChan}// receive the voting completed eventfunc (c *Cluster) NotifyVotingCompleted() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyVotingCompletedChan}// receive the plugin schedule eventfunc (c *Cluster) NotifyPluginSchedule() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyPluginScheduleChan}// receive the plugin schedule completed eventfunc (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyPluginScheduleCompletedChan}// receive the node update eventfunc (c *Cluster) NotifyNodeUpdate() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyNodeUpdateChan}// receive the node update completed eventfunc (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {	if atomic.LoadInt32(&c.stopped) == 1 {		return nil	}	return c.notifyNodeUpdateCompletedChan}// receive the cluster stopped eventfunc (c *Cluster) NotifyClusterStopped() <-chan bool {	return c.notifyClusterStoppedChan}
 |