lifetime.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283
  1. package cluster
  2. import (
  3. "time"
  4. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  5. )
  6. const (
  7. MASTER_LOCKING_INTERVAL = time.Millisecond * 500 // interval to try to lock the slot to be the master
  8. MASTER_LOCK_EXPIRED_TIME = time.Second * 5 // expired time of master key
  9. MASTER_GC_INTERVAL = time.Second * 10 // interval to do garbage collection of nodes has already deactivated
  10. NODE_VOTE_INTERVAL = time.Second * 30 // interval to vote the ips of the nodes
  11. UPDATE_NODE_STATUS_INTERVAL = time.Second * 5 // interval to update the status of the node
  12. NODE_DISCONNECTED_TIMEOUT = time.Second * 10 // once a node is no longer active, it will be removed from the cluster
  13. PLUGIN_SCHEDULER_INTERVAL = time.Second * 3 // interval to schedule the plugins
  14. )
  15. // lifetime of the cluster
  16. func (c *Cluster) clusterLifetime() {
  17. ticker_lock_master := time.NewTicker(MASTER_LOCKING_INTERVAL)
  18. defer ticker_lock_master.Stop()
  19. ticker_update_node_status := time.NewTicker(UPDATE_NODE_STATUS_INTERVAL)
  20. defer ticker_update_node_status.Stop()
  21. master_gc_ticker := time.NewTicker(MASTER_GC_INTERVAL)
  22. defer master_gc_ticker.Stop()
  23. node_vote_ticker := time.NewTicker(NODE_VOTE_INTERVAL)
  24. defer node_vote_ticker.Stop()
  25. plugin_scheduler_ticker := time.NewTicker(PLUGIN_SCHEDULER_INTERVAL)
  26. defer plugin_scheduler_ticker.Stop()
  27. if err := c.voteIps(); err != nil {
  28. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  29. }
  30. for {
  31. select {
  32. case <-ticker_lock_master.C:
  33. if !c.i_am_master {
  34. // try lock the slot
  35. if success, err := c.lockMaster(); err != nil {
  36. log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())
  37. } else if success {
  38. c.i_am_master = true
  39. log.Info("current node has become the master of the cluster")
  40. } else {
  41. c.i_am_master = false
  42. log.Info("current node lost the master slot")
  43. }
  44. } else {
  45. // update the master
  46. if err := c.updateMaster(); err != nil {
  47. log.Error("failed to update the master: %s", err.Error())
  48. }
  49. }
  50. case <-ticker_update_node_status.C:
  51. if err := c.updateNodeStatus(); err != nil {
  52. log.Error("failed to update the status of the node: %s", err.Error())
  53. }
  54. case <-master_gc_ticker.C:
  55. if c.i_am_master {
  56. if err := c.gcNodes(); err != nil {
  57. log.Error("failed to gc the nodes has already deactivated: %s", err.Error())
  58. }
  59. }
  60. case <-node_vote_ticker.C:
  61. if err := c.voteIps(); err != nil {
  62. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  63. }
  64. case <-plugin_scheduler_ticker.C:
  65. if err := c.schedulePlugins(); err != nil {
  66. log.Error("failed to schedule the plugins: %s", err.Error())
  67. }
  68. case <-c.stop_chan:
  69. return
  70. }
  71. }
  72. }