123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304 |
- package cluster
- import (
- "sync"
- "sync/atomic"
- "time"
- "github.com/google/uuid"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
- "github.com/langgenius/dify-plugin-daemon/internal/types/app"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
- )
- type Cluster struct {
- // id is the unique id of the cluster
- id string
- // i_am_master is the flag to indicate whether the current node is the master node
- iAmMaster bool
- // main http port of the current node
- port uint16
- // plugins stores all the plugin life time of the current node
- plugins mapping.Map[string, *pluginLifeTime]
- pluginLock sync.RWMutex
- manager *plugin_manager.PluginManager
- // nodes stores all the nodes of the cluster
- nodes mapping.Map[string, node]
- // signals for waiting for the cluster to stop
- stopChan chan bool
- stopped int32
- isInAutoGcNodes int32
- isInAutoGcPlugins int32
- // channels to notify cluster event
- notifyBecomeMasterChan chan bool
- notifyMasterGcChan chan bool
- notifyMasterGcCompletedChan chan bool
- notifyVotingChan chan bool
- notifyVotingCompletedChan chan bool
- notifyPluginScheduleChan chan bool
- notifyPluginScheduleCompletedChan chan bool
- notifyNodeUpdateChan chan bool
- notifyNodeUpdateCompletedChan chan bool
- notifyClusterStoppedChan chan bool
- showLog bool
- masterGcInterval time.Duration
- masterLockingInterval time.Duration
- masterLockExpiredTime time.Duration
- nodeVoteInterval time.Duration
- nodeDisconnectedTimeout time.Duration
- updateNodeStatusInterval time.Duration
- pluginSchedulerInterval time.Duration
- pluginSchedulerTickerInterval time.Duration
- pluginDeactivatedTimeout time.Duration
- }
- func NewCluster(config *app.Config, plugin_manager *plugin_manager.PluginManager) *Cluster {
- return &Cluster{
- id: uuid.New().String(),
- port: uint16(config.ServerPort),
- stopChan: make(chan bool),
- showLog: config.DisplayClusterLog,
- masterGcInterval: MASTER_GC_INTERVAL,
- masterLockingInterval: MASTER_LOCKING_INTERVAL,
- masterLockExpiredTime: MASTER_LOCK_EXPIRED_TIME,
- nodeVoteInterval: NODE_VOTE_INTERVAL,
- nodeDisconnectedTimeout: NODE_DISCONNECTED_TIMEOUT,
- updateNodeStatusInterval: UPDATE_NODE_STATUS_INTERVAL,
- pluginSchedulerInterval: PLUGIN_SCHEDULER_INTERVAL,
- pluginSchedulerTickerInterval: PLUGIN_SCHEDULER_TICKER_INTERVAL,
- pluginDeactivatedTimeout: PLUGIN_DEACTIVATED_TIMEOUT,
- manager: plugin_manager,
- notifyBecomeMasterChan: make(chan bool),
- notifyMasterGcChan: make(chan bool),
- notifyMasterGcCompletedChan: make(chan bool),
- notifyVotingChan: make(chan bool),
- notifyVotingCompletedChan: make(chan bool),
- notifyPluginScheduleChan: make(chan bool),
- notifyPluginScheduleCompletedChan: make(chan bool),
- notifyNodeUpdateChan: make(chan bool),
- notifyNodeUpdateCompletedChan: make(chan bool),
- notifyClusterStoppedChan: make(chan bool),
- }
- }
- func (c *Cluster) Launch() {
- go c.clusterLifetime()
- }
- func (c *Cluster) Close() error {
- if atomic.CompareAndSwapInt32(&c.stopped, 0, 1) {
- close(c.stopChan)
- }
- return nil
- }
- func (c *Cluster) ID() string {
- return c.id
- }
- // trigger for master event
- func (c *Cluster) notifyBecomeMaster() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyBecomeMasterChan <- true:
- default:
- }
- }
- // receive the master event
- func (c *Cluster) NotifyBecomeMaster() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyBecomeMasterChan
- }
- // trigger for master gc event
- func (c *Cluster) notifyMasterGC() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyMasterGcChan <- true:
- default:
- }
- }
- // trigger for master gc completed event
- func (c *Cluster) notifyMasterGCCompleted() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyMasterGcCompletedChan <- true:
- default:
- }
- }
- // trigger for voting event
- func (c *Cluster) notifyVoting() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyVotingChan <- true:
- default:
- }
- }
- // trigger for voting completed event
- func (c *Cluster) notifyVotingCompleted() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyVotingCompletedChan <- true:
- default:
- }
- }
- // trigger for plugin schedule event
- func (c *Cluster) notifyPluginSchedule() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyPluginScheduleChan <- true:
- default:
- }
- }
- // trigger for plugin schedule completed event
- func (c *Cluster) notifyPluginScheduleCompleted() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyPluginScheduleCompletedChan <- true:
- default:
- }
- }
- // trigger for node update event
- func (c *Cluster) notifyNodeUpdate() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyNodeUpdateChan <- true:
- default:
- }
- }
- // trigger for node update completed event
- func (c *Cluster) notifyNodeUpdateCompleted() {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return
- }
- select {
- case c.notifyNodeUpdateCompletedChan <- true:
- default:
- }
- }
- // trigger for cluster stopped event
- func (c *Cluster) notifyClusterStopped() {
- select {
- case c.notifyClusterStoppedChan <- true:
- default:
- }
- }
- // receive the master gc event
- func (c *Cluster) NotifyMasterGC() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyMasterGcChan
- }
- // receive the master gc completed event
- func (c *Cluster) NotifyMasterGCCompleted() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyMasterGcCompletedChan
- }
- // receive the voting event
- func (c *Cluster) NotifyVoting() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyVotingChan
- }
- // receive the voting completed event
- func (c *Cluster) NotifyVotingCompleted() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyVotingCompletedChan
- }
- // receive the plugin schedule event
- func (c *Cluster) NotifyPluginSchedule() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyPluginScheduleChan
- }
- // receive the plugin schedule completed event
- func (c *Cluster) NotifyPluginScheduleCompleted() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyPluginScheduleCompletedChan
- }
- // receive the node update event
- func (c *Cluster) NotifyNodeUpdate() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyNodeUpdateChan
- }
- // receive the node update completed event
- func (c *Cluster) NotifyNodeUpdateCompleted() <-chan bool {
- if atomic.LoadInt32(&c.stopped) == 1 {
- return nil
- }
- return c.notifyNodeUpdateCompletedChan
- }
- // receive the cluster stopped event
- func (c *Cluster) NotifyClusterStopped() <-chan bool {
- return c.notifyClusterStoppedChan
- }
|