lifetime.go 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package cluster
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  6. )
  7. const (
  8. // master
  9. // the cluster master is responsible for managing garbage collection for both nodes and plugins.
  10. // typically, each node handles the garbage collection for its own plugins
  11. // However, if a node becomes inactive, the master takes over this task.
  12. // every node has an equal chance of becoming the master.
  13. // once a node is selected as the master, it is locked in that role.
  14. // If the master node becomes inactive, the master slot is released, allowing other nodes to attempt to take over the role.
  15. MASTER_LOCKING_INTERVAL = time.Millisecond * 500 // interval to try to lock the slot to be the master
  16. MASTER_LOCK_EXPIRED_TIME = time.Second * 2 // expired time of master key
  17. MASTER_GC_INTERVAL = time.Second * 10 // interval to do garbage collection of nodes has already deactivated
  18. // node
  19. // To determine the available IPs of the nodes, each node will vote for the IPs of other nodes.
  20. // this voting process will occur every $NODE_VOTE_INTERVAL.
  21. // simultaneously, all nodes will synchronize to the latest status in memory every $UPDATE_NODE_STATUS_INTERVAL.
  22. // each node will also update its own status to remain active. If a node becomes inactive, it will be removed from the cluster.
  23. NODE_VOTE_INTERVAL = time.Second * 30 // interval to vote the ips of the nodes
  24. UPDATE_NODE_STATUS_INTERVAL = time.Second * 5 // interval to update the status of the node
  25. NODE_DISCONNECTED_TIMEOUT = time.Second * 10 // once a node is no longer active, it will be removed from the cluster
  26. // plugin scheduler
  27. // each node will schedule its plugins every $PLUGIN_SCHEDULER_INTERVAL time
  28. // and schedule process will be triggered every $PLUGIN_SCHEDULER_TICKER_INTERVAL time
  29. // not all the plugins will be scheduled every time, only the plugins that are not scheduled in $PLUGIN_SCHEDULER_INTERVAL time will be scheduled
  30. // and the plugins that are not active will be removed from the cluster
  31. PLUGIN_SCHEDULER_TICKER_INTERVAL = time.Second * 3 // interval to schedule the plugins
  32. PLUGIN_SCHEDULER_INTERVAL = time.Second * 10 // interval to schedule the plugins
  33. PLUGIN_DEACTIVATED_TIMEOUT = time.Second * 30 // once a plugin is no longer active, it will be removed from the cluster
  34. )
  35. // lifetime of the cluster
  36. func (c *Cluster) clusterLifetime() {
  37. defer func() {
  38. if err := c.removeSelfNode(); err != nil {
  39. log.Error("failed to remove the self node from the cluster: %s", err.Error())
  40. }
  41. }()
  42. ticker_lock_master := time.NewTicker(MASTER_LOCKING_INTERVAL)
  43. defer ticker_lock_master.Stop()
  44. ticker_update_node_status := time.NewTicker(UPDATE_NODE_STATUS_INTERVAL)
  45. defer ticker_update_node_status.Stop()
  46. master_gc_ticker := time.NewTicker(MASTER_GC_INTERVAL)
  47. defer master_gc_ticker.Stop()
  48. node_vote_ticker := time.NewTicker(NODE_VOTE_INTERVAL)
  49. defer node_vote_ticker.Stop()
  50. plugin_scheduler_ticker := time.NewTicker(PLUGIN_SCHEDULER_INTERVAL)
  51. defer plugin_scheduler_ticker.Stop()
  52. // vote for all ips and find the best one, prepare for later traffic scheduling
  53. routine.Submit(func() {
  54. if err := c.voteIps(); err != nil {
  55. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  56. }
  57. })
  58. // fetch all possible nodes
  59. routine.Submit(func() {
  60. if err := c.updateNodeStatus(); err != nil {
  61. log.Error("failed to update the status of the node: %s", err.Error())
  62. }
  63. })
  64. for {
  65. select {
  66. case <-ticker_lock_master.C:
  67. if !c.i_am_master {
  68. // try lock the slot
  69. if success, err := c.lockMaster(); err != nil {
  70. log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())
  71. } else if success {
  72. c.i_am_master = true
  73. log.Info("current node has become the master of the cluster")
  74. } else {
  75. if c.i_am_master {
  76. c.i_am_master = false
  77. log.Info("current node has released the master slot")
  78. }
  79. }
  80. } else {
  81. // update the master
  82. if err := c.updateMaster(); err != nil {
  83. log.Error("failed to update the master: %s", err.Error())
  84. }
  85. }
  86. case <-ticker_update_node_status.C:
  87. if err := c.updateNodeStatus(); err != nil {
  88. log.Error("failed to update the status of the node: %s", err.Error())
  89. }
  90. case <-master_gc_ticker.C:
  91. if c.i_am_master {
  92. if err := c.autoGCNodes(); err != nil {
  93. log.Error("failed to gc the nodes have already deactivated: %s", err.Error())
  94. }
  95. if err := c.autoGCPlugins(); err != nil {
  96. log.Error("failed to gc the plugins have already stopped: %s", err.Error())
  97. }
  98. }
  99. case <-node_vote_ticker.C:
  100. if err := c.voteIps(); err != nil {
  101. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  102. }
  103. case <-plugin_scheduler_ticker.C:
  104. if err := c.schedulePlugins(); err != nil {
  105. log.Error("failed to schedule the plugins: %s", err.Error())
  106. }
  107. case <-c.stop_chan:
  108. return
  109. }
  110. }
  111. }