lifetime.go 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package cluster
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  7. )
  8. const (
  9. // master
  10. // the cluster master is responsible for managing garbage collection for both nodes and plugins.
  11. // typically, each node handles the garbage collection for its own plugins
  12. // However, if a node becomes inactive, the master takes over this task.
  13. // every node has an equal chance of becoming the master.
  14. // once a node is selected as the master, it is locked in that role.
  15. // If the master node becomes inactive, the master slot is released, allowing other nodes to attempt to take over the role.
  16. MASTER_LOCKING_INTERVAL = time.Millisecond * 500 // interval to try to lock the slot to be the master
  17. MASTER_LOCK_EXPIRED_TIME = time.Second * 2 // expired time of master key
  18. MASTER_GC_INTERVAL = time.Second * 10 // interval to do garbage collection of nodes has already deactivated
  19. // node
  20. // To determine the available IPs of the nodes, each node will vote for the IPs of other nodes.
  21. // this voting process will occur every $NODE_VOTE_INTERVAL.
  22. // simultaneously, all nodes will synchronize to the latest status in memory every $UPDATE_NODE_STATUS_INTERVAL.
  23. // each node will also update its own status to remain active. If a node becomes inactive, it will be removed from the cluster.
  24. NODE_VOTE_INTERVAL = time.Second * 30 // interval to vote the ips of the nodes
  25. UPDATE_NODE_STATUS_INTERVAL = time.Second * 5 // interval to update the status of the node
  26. NODE_DISCONNECTED_TIMEOUT = time.Second * 10 // once a node is no longer active, it will be removed from the cluster
  27. // plugin scheduler
  28. // each node will schedule its plugins every $PLUGIN_SCHEDULER_INTERVAL time
  29. // and schedule process will be triggered every $PLUGIN_SCHEDULER_TICKER_INTERVAL time
  30. // not all the plugins will be scheduled every time, only the plugins that are not scheduled in $PLUGIN_SCHEDULER_INTERVAL time will be scheduled
  31. // and the plugins that are not active will be removed from the cluster
  32. PLUGIN_SCHEDULER_TICKER_INTERVAL = time.Second * 3 // interval to schedule the plugins
  33. PLUGIN_SCHEDULER_INTERVAL = time.Second * 10 // interval to schedule the plugins
  34. PLUGIN_DEACTIVATED_TIMEOUT = time.Second * 30 // once a plugin is no longer active, it will be removed from the cluster
  35. )
  36. const (
  37. CLUSTER_NEW_NODE_CHANNEL = "cluster-new-node-channel"
  38. )
  39. // lifetime of the cluster
  40. func (c *Cluster) clusterLifetime() {
  41. defer func() {
  42. if err := c.removeSelfNode(); err != nil {
  43. log.Error("failed to remove the self node from the cluster: %s", err.Error())
  44. }
  45. c.notifyClusterStopped()
  46. close(c.notifyClusterStoppedChan)
  47. close(c.notifyBecomeMasterChan)
  48. close(c.notifyMasterGcChan)
  49. close(c.notifyMasterGcCompletedChan)
  50. close(c.notifyVotingChan)
  51. close(c.notifyVotingCompletedChan)
  52. close(c.notifyPluginScheduleChan)
  53. close(c.notifyPluginScheduleCompletedChan)
  54. close(c.notifyNodeUpdateChan)
  55. close(c.notifyNodeUpdateCompletedChan)
  56. }()
  57. tickerLockMaster := time.NewTicker(c.masterLockingInterval)
  58. defer tickerLockMaster.Stop()
  59. tickerUpdateNodeStatus := time.NewTicker(c.updateNodeStatusInterval)
  60. defer tickerUpdateNodeStatus.Stop()
  61. masterGcTicker := time.NewTicker(c.masterGcInterval)
  62. defer masterGcTicker.Stop()
  63. nodeVoteTicker := time.NewTicker(c.nodeVoteInterval)
  64. defer nodeVoteTicker.Stop()
  65. pluginSchedulerTicker := time.NewTicker(c.pluginSchedulerTickerInterval)
  66. defer pluginSchedulerTicker.Stop()
  67. // vote for all ips and find the best one, prepare for later traffic scheduling
  68. routine.Submit(map[string]string{
  69. "module": "cluster",
  70. "function": "voteAddressesWhenInit",
  71. }, func() {
  72. if err := c.updateNodeStatus(); err != nil {
  73. log.Error("failed to update the status of the node: %s", err.Error())
  74. }
  75. if err := cache.Publish(CLUSTER_NEW_NODE_CHANNEL, newNodeEvent{
  76. NodeID: c.id,
  77. }); err != nil {
  78. log.Error("failed to publish the new node event: %s", err.Error())
  79. }
  80. if err := c.voteAddresses(); err != nil {
  81. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  82. }
  83. })
  84. newNodeChan, cancel := cache.Subscribe[newNodeEvent](CLUSTER_NEW_NODE_CHANNEL)
  85. defer cancel()
  86. for {
  87. select {
  88. case <-tickerLockMaster.C:
  89. if !c.iAmMaster {
  90. // try lock the slot
  91. if success, err := c.lockMaster(); err != nil {
  92. log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())
  93. } else if success {
  94. c.iAmMaster = true
  95. log.Info("current node has become the master of the cluster")
  96. c.notifyBecomeMaster()
  97. } else {
  98. if c.iAmMaster {
  99. c.iAmMaster = false
  100. log.Info("current node has released the master slot")
  101. }
  102. }
  103. } else {
  104. // update the master
  105. if err := c.updateMaster(); err != nil {
  106. log.Error("failed to update the master: %s", err.Error())
  107. }
  108. }
  109. case <-tickerUpdateNodeStatus.C:
  110. if err := c.updateNodeStatus(); err != nil {
  111. log.Error("failed to update the status of the node: %s", err.Error())
  112. }
  113. case <-masterGcTicker.C:
  114. if c.iAmMaster {
  115. c.notifyMasterGC()
  116. if err := c.autoGCNodes(); err != nil {
  117. log.Error("failed to gc the nodes have already deactivated: %s", err.Error())
  118. }
  119. if err := c.autoGCPlugins(); err != nil {
  120. log.Error("failed to gc the plugins have already stopped: %s", err.Error())
  121. }
  122. c.notifyMasterGCCompleted()
  123. }
  124. case <-nodeVoteTicker.C:
  125. if err := c.voteAddresses(); err != nil {
  126. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  127. }
  128. case _, ok := <-newNodeChan:
  129. if ok {
  130. // vote for the new node
  131. if err := c.voteAddresses(); err != nil {
  132. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  133. }
  134. }
  135. case <-pluginSchedulerTicker.C:
  136. if err := c.schedulePlugins(); err != nil {
  137. log.Error("failed to schedule the plugins: %s", err.Error())
  138. }
  139. case <-c.stopChan:
  140. return
  141. }
  142. }
  143. }