preemptive.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package cluster
  2. import (
  3. "errors"
  4. "net"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/cluster/cluster_id"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/network"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  10. )
// The plugin daemon preemptively tries to lock a slot to become the master of the
// cluster, and keeps updating the current status of the whole cluster.
// Once the master is no longer active, one of the slaves will try to lock the slot
// again and become the new master.
//
// Once a node becomes master, it takes responsibility for garbage-collecting nodes
// that have already been deactivated, and every node is responsible for maintaining
// its own status.
//
// State:
//   - hashmap[cluster-status]
//   - node-id:
//   - list[ip]:
//   - address: string
//   - votes: list[vote]
//   - last_ping_at: int64
//   - preemption-lock: node-id
//   - node-status-upgrade-status
//
// (Nesting: each node-id entry of the cluster-status hash map holds a list of ips,
// each with an address and its votes, plus the node's last_ping_at.)
//
// A node will be removed from the cluster once it is no longer active.
  30. const (
  31. CLUSTER_STATUS_HASH_MAP_KEY = "cluster-status-hash-map"
  32. PREEMPTION_LOCK_KEY = "cluster-master-preemption-lock"
  33. )
  34. // try lock the slot to be the master of the cluster
  35. // returns:
  36. // - bool: true if the slot is locked by the node
  37. // - error: error if any
  38. func (c *Cluster) lockMaster() (bool, error) {
  39. var final_error error
  40. for i := 0; i < 3; i++ {
  41. if success, err := cache.SetNX(PREEMPTION_LOCK_KEY, cluster_id.GetInstanceID(), MASTER_LOCK_EXPIRED_TIME); err != nil {
  42. // try again
  43. if final_error == nil {
  44. final_error = err
  45. } else {
  46. final_error = errors.Join(final_error, err)
  47. }
  48. } else if !success {
  49. return false, nil
  50. } else {
  51. return true, nil
  52. }
  53. }
  54. return false, final_error
  55. }
  56. // update master
  57. func (c *Cluster) updateMaster() error {
  58. // update expired time of master key
  59. if _, err := cache.Expire(PREEMPTION_LOCK_KEY, MASTER_LOCK_EXPIRED_TIME); err != nil {
  60. return err
  61. }
  62. return nil
  63. }
  64. // update the status of the node
  65. func (c *Cluster) updateNodeStatus() error {
  66. if err := c.LockNodeStatus(cluster_id.GetInstanceID()); err != nil {
  67. return err
  68. }
  69. defer c.UnlockNodeStatus(cluster_id.GetInstanceID())
  70. // update the status of the node
  71. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster_id.GetInstanceID())
  72. if err != nil {
  73. if err == cache.ErrNotFound {
  74. // try to get ips configs
  75. ips, err := network.FetchCurrentIps()
  76. if err != nil {
  77. return err
  78. }
  79. node_status = &node{
  80. Ips: parser.Map(func(from net.IP) ip {
  81. return ip{
  82. Address: from.String(),
  83. Votes: []vote{},
  84. }
  85. }, ips),
  86. }
  87. } else {
  88. return err
  89. }
  90. } else {
  91. ips, err := network.FetchCurrentIps()
  92. if err != nil {
  93. return err
  94. }
  95. // add new ip if not exist
  96. for _, _ip := range ips {
  97. found := false
  98. for _, node_ip := range node_status.Ips {
  99. if node_ip.Address == _ip.String() {
  100. found = true
  101. break
  102. }
  103. }
  104. if !found {
  105. node_status.Ips = append(node_status.Ips, ip{
  106. Address: _ip.String(),
  107. Votes: []vote{},
  108. })
  109. }
  110. }
  111. }
  112. // refresh the last ping time
  113. node_status.LastPingAt = time.Now().Unix()
  114. // update the status of the node
  115. if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, cluster_id.GetInstanceID(), node_status); err != nil {
  116. return err
  117. }
  118. return nil
  119. }
  120. func (c *Cluster) IsMaster() bool {
  121. return c.i_am_master
  122. }
  123. func (c *Cluster) IsNodeAlive(node_id string) bool {
  124. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  125. if err != nil {
  126. return false
  127. }
  128. return time.Since(time.Unix(node_status.LastPingAt, 0)) < NODE_DISCONNECTED_TIMEOUT
  129. }