preemptive.go 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217
  1. package cluster
  2. import (
  3. "errors"
  4. "net"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/cluster/cluster_id"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/network"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  11. )
// The plugin daemon preemptively tries to lock a slot to become the master of
// the cluster, and keeps updating the current status of the whole cluster.
// Once the master is no longer active, one of the slaves tries to lock the
// slot again and becomes the new master.
//
// Once a node becomes master, it takes responsibility for garbage-collecting
// nodes that have already been deactivated, while every node maintains its
// own status.
//
// State:
//   - hashmap[cluster-status]
//     - node-id:
//       - list[ip]:
//         - address: string
//         - votes: list[vote]
//       - last_ping_at: int64
//   - preemption-lock: node-id
//   - node-status-upgrade-status
//
// A node is removed from the cluster once it is no longer active.
  31. var (
  32. i_am_master = false
  33. )
  34. const (
  35. CLUSTER_STATUS_HASH_MAP_KEY = "cluster-status-hash-map"
  36. PREEMPTION_LOCK_KEY = "cluster-master-preemption-lock"
  37. )
  38. const (
  39. MASTER_LOCKING_INTERVAL = time.Millisecond * 500 // interval to try to lock the slot to be the master
  40. MASTER_LOCK_EXPIRED_TIME = time.Second * 5 // expired time of master key
  41. MASTER_GC_INTERVAL = time.Second * 10 // interval to do garbage collection of nodes has already deactivated
  42. NODE_VOTE_INTERVAL = time.Second * 30 // interval to vote the ips of the nodes
  43. UPDATE_NODE_STATUS_INTERVAL = time.Second * 5 // interval to update the status of the node
  44. NODE_DISCONNECTED_TIMEOUT = time.Second * 10 // once a node is no longer active, it will be removed from the cluster
  45. )
  46. // lifetime of the cluster
  47. func (c *Cluster) clusterLifetime() {
  48. ticker_lock_master := time.NewTicker(MASTER_LOCKING_INTERVAL)
  49. defer ticker_lock_master.Stop()
  50. ticker_update_node_status := time.NewTicker(UPDATE_NODE_STATUS_INTERVAL)
  51. defer ticker_update_node_status.Stop()
  52. master_gc_ticker := time.NewTicker(MASTER_GC_INTERVAL)
  53. defer master_gc_ticker.Stop()
  54. node_vote_ticker := time.NewTicker(NODE_VOTE_INTERVAL)
  55. defer node_vote_ticker.Stop()
  56. if err := c.voteIps(); err != nil {
  57. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  58. }
  59. for {
  60. select {
  61. case <-ticker_lock_master.C:
  62. if !i_am_master {
  63. // try lock the slot
  64. if success, err := c.lockMaster(); err != nil {
  65. log.Error("failed to lock the slot to be the master of the cluster: %s", err.Error())
  66. } else if success {
  67. i_am_master = true
  68. log.Info("current node has become the master of the cluster")
  69. } else {
  70. i_am_master = false
  71. log.Info("current node lost the master slot")
  72. }
  73. } else {
  74. // update the master
  75. if err := c.updateMaster(); err != nil {
  76. log.Error("failed to update the master: %s", err.Error())
  77. }
  78. }
  79. case <-ticker_update_node_status.C:
  80. if err := c.updateNodeStatus(); err != nil {
  81. log.Error("failed to update the status of the node: %s", err.Error())
  82. }
  83. case <-master_gc_ticker.C:
  84. if i_am_master {
  85. if err := c.gcNodes(); err != nil {
  86. log.Error("failed to gc the nodes has already deactivated: %s", err.Error())
  87. }
  88. }
  89. case <-node_vote_ticker.C:
  90. if err := c.voteIps(); err != nil {
  91. log.Error("failed to vote the ips of the nodes: %s", err.Error())
  92. }
  93. }
  94. }
  95. }
  96. // try lock the slot to be the master of the cluster
  97. // returns:
  98. // - bool: true if the slot is locked by the node
  99. // - error: error if any
  100. func (c *Cluster) lockMaster() (bool, error) {
  101. var final_error error
  102. for i := 0; i < 3; i++ {
  103. if success, err := cache.SetNX(PREEMPTION_LOCK_KEY, cluster_id.GetInstanceID(), MASTER_LOCK_EXPIRED_TIME); err != nil {
  104. // try again
  105. if final_error == nil {
  106. final_error = err
  107. } else {
  108. final_error = errors.Join(final_error, err)
  109. }
  110. } else if !success {
  111. return false, nil
  112. } else {
  113. return true, nil
  114. }
  115. }
  116. return false, final_error
  117. }
  118. // update master
  119. func (c *Cluster) updateMaster() error {
  120. // update expired time of master key
  121. if _, err := cache.Expire(PREEMPTION_LOCK_KEY, MASTER_LOCK_EXPIRED_TIME); err != nil {
  122. return err
  123. }
  124. return nil
  125. }
  126. // update the status of the node
  127. func (c *Cluster) updateNodeStatus() error {
  128. if err := c.LockNodeStatus(cluster_id.GetInstanceID()); err != nil {
  129. return err
  130. }
  131. defer c.UnlockNodeStatus(cluster_id.GetInstanceID())
  132. // update the status of the node
  133. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster_id.GetInstanceID())
  134. if err != nil {
  135. if err == cache.ErrNotFound {
  136. // try to get ips configs
  137. ips, err := network.FetchCurrentIps()
  138. if err != nil {
  139. return err
  140. }
  141. node_status = &node{
  142. Ips: parser.Map(func(from net.IP) ip {
  143. return ip{
  144. Address: from.String(),
  145. Votes: []vote{},
  146. }
  147. }, ips),
  148. }
  149. } else {
  150. return err
  151. }
  152. } else {
  153. ips, err := network.FetchCurrentIps()
  154. if err != nil {
  155. return err
  156. }
  157. // add new ip if not exist
  158. for _, _ip := range ips {
  159. found := false
  160. for _, node_ip := range node_status.Ips {
  161. if node_ip.Address == _ip.String() {
  162. found = true
  163. break
  164. }
  165. }
  166. if !found {
  167. node_status.Ips = append(node_status.Ips, ip{
  168. Address: _ip.String(),
  169. Votes: []vote{},
  170. })
  171. }
  172. }
  173. }
  174. // refresh the last ping time
  175. node_status.LastPingAt = time.Now().Unix()
  176. // update the status of the node
  177. if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, cluster_id.GetInstanceID(), node_status); err != nil {
  178. return err
  179. }
  180. return nil
  181. }
  182. func (c *Cluster) IsMaster() bool {
  183. return i_am_master
  184. }
  185. func (c *Cluster) IsNodeAlive(node_id string) bool {
  186. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  187. if err != nil {
  188. return false
  189. }
  190. return time.Since(time.Unix(node_status.LastPingAt, 0)) < NODE_DISCONNECTED_TIMEOUT
  191. }