node.go 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228
  1. package cluster
  2. import (
  3. "errors"
  4. "net"
  5. "strings"
  6. "sync/atomic"
  7. "time"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/network"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. )
  14. // update the status of the node
  15. func (c *Cluster) updateNodeStatus() error {
  16. if err := c.LockNodeStatus(c.id); err != nil {
  17. return err
  18. }
  19. defer c.UnlockNodeStatus(c.id)
  20. // update the status of the node
  21. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, c.id)
  22. if err != nil {
  23. if err == cache.ErrNotFound {
  24. // try to get ips configs
  25. ips, err := network.FetchCurrentIps()
  26. if err != nil {
  27. return err
  28. }
  29. node_status = &node{
  30. Ips: parser.Map(func(from net.IP) ip {
  31. return ip{
  32. Address: from.String(),
  33. Votes: []vote{},
  34. }
  35. }, ips),
  36. }
  37. } else {
  38. return err
  39. }
  40. } else {
  41. ips, err := network.FetchCurrentIps()
  42. if err != nil {
  43. return err
  44. }
  45. // add new ip if not exist
  46. for _, _ip := range ips {
  47. found := false
  48. for _, node_ip := range node_status.Ips {
  49. if node_ip.Address == _ip.String() {
  50. found = true
  51. break
  52. }
  53. }
  54. if !found {
  55. node_status.Ips = append(node_status.Ips, ip{
  56. Address: _ip.String(),
  57. Votes: []vote{},
  58. })
  59. }
  60. }
  61. }
  62. // refresh the last ping time
  63. node_status.LastPingAt = time.Now().Unix()
  64. // update the status of the node
  65. if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, c.id, node_status); err != nil {
  66. return err
  67. }
  68. // get all the nodes
  69. nodes, err := c.GetNodes()
  70. if err != nil {
  71. return err
  72. }
  73. // update self nodes map
  74. c.node_lock.Lock()
  75. defer c.node_lock.Unlock()
  76. for node_id, node := range nodes {
  77. c.nodes.Clear()
  78. c.nodes.Store(node_id, node)
  79. }
  80. return nil
  81. }
  82. func (c *Cluster) GetNodes() (map[string]node, error) {
  83. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  84. if err != nil {
  85. return nil, err
  86. }
  87. for node_id, node := range nodes {
  88. // filter out the disconnected nodes
  89. if !node.available() {
  90. delete(nodes, node_id)
  91. }
  92. }
  93. return nodes, nil
  94. }
  95. // FetchPluginAvailableNodes fetches the available nodes of the given plugin
  96. func (c *Cluster) FetchPluginAvailableNodes(hashed_plugin_id string) ([]string, error) {
  97. states, err := cache.ScanMap[entities.PluginRuntimeState](
  98. PLUGIN_STATE_MAP_KEY, c.getScanPluginsByIdKey(hashed_plugin_id),
  99. )
  100. if err != nil {
  101. return nil, err
  102. }
  103. nodes := make([]string, 0)
  104. for key := range states {
  105. node_id, _, err := c.splitNodePluginJoin(key)
  106. if err != nil {
  107. continue
  108. }
  109. if c.nodes.Exits(node_id) {
  110. nodes = append(nodes, node_id)
  111. }
  112. }
  113. return nodes, nil
  114. }
  115. func (c *Cluster) IsMaster() bool {
  116. return c.i_am_master
  117. }
  118. func (c *Cluster) IsNodeAlive(node_id string) bool {
  119. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  120. if err != nil {
  121. return false
  122. }
  123. return node_status.available()
  124. }
  125. // gc the nodes has already deactivated
  126. func (c *Cluster) autoGCNodes() error {
  127. if atomic.LoadInt32(&c.is_in_auto_gc_nodes) == 1 {
  128. return nil
  129. }
  130. defer atomic.StoreInt32(&c.is_in_auto_gc_nodes, 0)
  131. var total_errors error
  132. add_error := func(err error) {
  133. if err != nil {
  134. if total_errors == nil {
  135. total_errors = err
  136. } else {
  137. total_errors = errors.Join(total_errors, err)
  138. }
  139. }
  140. }
  141. // get all nodes status
  142. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  143. if err == cache.ErrNotFound {
  144. return nil
  145. }
  146. for node_id, node_status := range nodes {
  147. // delete the node if it is disconnected
  148. if !node_status.available() {
  149. // gc the node
  150. if err := c.gcNode(node_id); err != nil {
  151. add_error(err)
  152. continue
  153. }
  154. }
  155. }
  156. return total_errors
  157. }
  158. // remove the resource associated with the node
  159. func (c *Cluster) gcNode(node_id string) error {
  160. // remove all plugins associated with the node
  161. if err := c.forceGCNodePlugins(node_id); err != nil {
  162. return err
  163. }
  164. // remove the node from the cluster
  165. c.node_lock.Lock()
  166. c.nodes.Delete(node_id)
  167. c.node_lock.Unlock()
  168. if err := c.LockNodeStatus(node_id); err != nil {
  169. return err
  170. }
  171. defer c.UnlockNodeStatus(node_id)
  172. err := cache.DelMapField(CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  173. if err != nil {
  174. return err
  175. } else {
  176. log.Info("node %s has been removed from the cluster due to being disconnected", node_id)
  177. }
  178. return nil
  179. }
  180. // remove self node from the cluster
  181. func (c *Cluster) removeSelfNode() error {
  182. return c.gcNode(c.id)
  183. }
  // CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX is the cache-key prefix of the
  // per-node status lock; LockNodeStatus/UnlockNodeStatus join it with the
  // node ID using ":".
  const CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX = "cluster-update-node-status-lock"
  187. func (c *Cluster) LockNodeStatus(node_id string) error {
  188. key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
  189. return cache.Lock(key, time.Second*5, time.Second)
  190. }
  191. func (c *Cluster) UnlockNodeStatus(node_id string) error {
  192. key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
  193. return cache.Unlock(key)
  194. }