lifetime.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159
  1. package cluster
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  7. )
  const (
  	// master
  	//
  	// The cluster master is responsible for garbage collection of both nodes and plugins.
  	// Normally each node garbage-collects its own plugins; if a node becomes inactive,
  	// the master takes over that task.
  	// Every node has an equal chance of becoming the master. Once a node is selected as
  	// the master it keeps that role. If the master becomes inactive, the master slot is
  	// released and the other nodes may attempt to take it over.
  	MASTER_LOCKING_INTERVAL  = 500 * time.Millisecond // how often a node tries to lock the master slot
  	MASTER_LOCK_EXPIRED_TIME = 2 * time.Second        // expiry time of the master key
  	MASTER_GC_INTERVAL       = 10 * time.Second       // how often deactivated nodes are garbage-collected

  	// node
  	//
  	// To determine which IPs of a node are reachable, each node votes for the IPs of the
  	// other nodes. Voting happens every NODE_VOTE_INTERVAL.
  	// In parallel, every node synchronizes the latest status into memory every
  	// UPDATE_NODE_STATUS_INTERVAL and refreshes its own status to stay active.
  	// A node that becomes inactive is removed from the cluster.
  	NODE_VOTE_INTERVAL          = 30 * time.Second // how often the IPs of the nodes are voted on
  	UPDATE_NODE_STATUS_INTERVAL = 5 * time.Second  // how often the node status is updated
  	NODE_DISCONNECTED_TIMEOUT   = 10 * time.Second // an inactive node is removed from the cluster after this long

  	// plugin scheduler
  	//
  	// The scheduling process is triggered every PLUGIN_SCHEDULER_TICKER_INTERVAL, but not
  	// every plugin is scheduled on every run: only plugins that have not been scheduled
  	// within the last PLUGIN_SCHEDULER_INTERVAL are scheduled again.
  	// Plugins that are no longer active are removed from the cluster.
  	PLUGIN_SCHEDULER_TICKER_INTERVAL = 3 * time.Second  // how often the scheduling process is triggered
  	PLUGIN_SCHEDULER_INTERVAL        = 10 * time.Second // how long a scheduled plugin waits before it is scheduled again
  	PLUGIN_DEACTIVATED_TIMEOUT       = 30 * time.Second // an inactive plugin is removed from the cluster after this long
  )
  // CLUSTER_NEW_NODE_CHANNEL is the name of the cache pub/sub channel on which a
  // node publishes a newNodeEvent when it starts its cluster lifetime, and to
  // which it subscribes in order to re-vote addresses when such an event arrives.
  const CLUSTER_NEW_NODE_CHANNEL = "cluster-new-node-channel"
  39. // lifetime of the cluster
  40. func (c *Cluster) clusterLifetime() {
  41. defer func() {
  42. if err := c.removeSelfNode(); err != nil {
  43. log.Error("failed to remove the self node from the cluster: %s", err.Error())
  44. }
  45. c.notifyClusterStopped()
  46. close(c.notify_cluster_stopped_chan)
  47. close(c.notify_become_master_chan)
  48. close(c.notify_master_gc_chan)
  49. close(c.notify_master_gc_completed_chan)
  50. close(c.notify_voting_chan)
  51. close(c.notify_voting_completed_chan)
  52. close(c.notify_plugin_schedule_chan)
  53. close(c.notify_plugin_schedule_completed_chan)
  54. close(c.notify_node_update_chan)
  55. close(c.notify_node_update_completed_chan)
  56. }()
  57. ticker_lock_master := time.NewTicker(MASTER_LOCKING_INTERVAL)
  58. defer ticker_lock_master.Stop()
  59. ticker_update_node_status := time.NewTicker(UPDATE_NODE_STATUS_INTERVAL)
  60. defer ticker_update_node_status.Stop()
  61. master_gc_ticker := time.NewTicker(MASTER_GC_INTERVAL)
  62. defer master_gc_ticker.Stop()
  63. node_vote_ticker := time.NewTicker(NODE_VOTE_INTERVAL)
  64. defer node_vote_ticker.Stop()
  65. plugin_scheduler_ticker := time.NewTicker(PLUGIN_SCHEDULER_INTERVAL)
  66. defer plugin_scheduler_ticker.Stop()
  67. // vote for all ips and find the best one, prepare for later traffic scheduling
  68. routine.Submit(func() {
  69. if err := c.updateNodeStatus(); err != nil {
  70. log.Error("failed to update the status of the node: %s", err.Error())
  71. }
  72. if err := cache.Publish(CLUSTER_NEW_NODE_CHANNEL, newNodeEvent{
  73. NodeID: c.id,
  74. }); err != nil {
  75. log.Error("failed to publish the new node event: %s", err.Error())
  76. }
  77. if err := c.voteAddresses(); err != nil {
  78. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  79. }
  80. })
  81. new_node_chan, cancel := cache.Subscribe[newNodeEvent](CLUSTER_NEW_NODE_CHANNEL)
  82. defer cancel()
  83. for {
  84. select {
  85. case <-ticker_lock_master.C:
  86. if !c.i_am_master {
  87. // try lock the slot
  88. if success, err := c.lockMaster(); err != nil {
  89. log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())
  90. } else if success {
  91. c.i_am_master = true
  92. log.Info("current node has become the master of the cluster")
  93. c.notifyBecomeMaster()
  94. } else {
  95. if c.i_am_master {
  96. c.i_am_master = false
  97. log.Info("current node has released the master slot")
  98. }
  99. }
  100. } else {
  101. // update the master
  102. if err := c.updateMaster(); err != nil {
  103. log.Error("failed to update the master: %s", err.Error())
  104. }
  105. }
  106. case <-ticker_update_node_status.C:
  107. if err := c.updateNodeStatus(); err != nil {
  108. log.Error("failed to update the status of the node: %s", err.Error())
  109. }
  110. case <-master_gc_ticker.C:
  111. if c.i_am_master {
  112. c.notifyMasterGC()
  113. if err := c.autoGCNodes(); err != nil {
  114. log.Error("failed to gc the nodes have already deactivated: %s", err.Error())
  115. }
  116. if err := c.autoGCPlugins(); err != nil {
  117. log.Error("failed to gc the plugins have already stopped: %s", err.Error())
  118. }
  119. c.notifyMasterGCCompleted()
  120. }
  121. case <-node_vote_ticker.C:
  122. if err := c.voteAddresses(); err != nil {
  123. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  124. }
  125. case _, ok := <-new_node_chan:
  126. if ok {
  127. // vote for the new node
  128. if err := c.voteAddresses(); err != nil {
  129. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  130. }
  131. }
  132. case <-plugin_scheduler_ticker.C:
  133. if err := c.schedulePlugins(); err != nil {
  134. log.Error("failed to schedule the plugins: %s", err.Error())
  135. }
  136. case <-c.stop_chan:
  137. return
  138. }
  139. }
  140. }