node.go 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231
  1. package cluster
  2. import (
  3. "errors"
  4. "net"
  5. "strings"
  6. "sync/atomic"
  7. "time"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/network"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. )
  14. // update the status of the node
  15. func (c *Cluster) updateNodeStatus() error {
  16. c.notifyNodeUpdate()
  17. defer c.notifyNodeUpdateCompleted()
  18. if err := c.LockNodeStatus(c.id); err != nil {
  19. return err
  20. }
  21. defer c.UnlockNodeStatus(c.id)
  22. // update the status of the node
  23. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, c.id)
  24. if err != nil {
  25. if err == cache.ErrNotFound {
  26. // try to get ips configs
  27. ips, err := network.FetchCurrentIps()
  28. if err != nil {
  29. return err
  30. }
  31. node_status = &node{
  32. Ips: parser.Map(func(from net.IP) ip {
  33. return ip{
  34. Address: from.String(),
  35. Votes: []vote{},
  36. }
  37. }, ips),
  38. }
  39. } else {
  40. return err
  41. }
  42. } else {
  43. ips, err := network.FetchCurrentIps()
  44. if err != nil {
  45. return err
  46. }
  47. // add new ip if not exist
  48. for _, _ip := range ips {
  49. found := false
  50. for _, node_ip := range node_status.Ips {
  51. if node_ip.Address == _ip.String() {
  52. found = true
  53. break
  54. }
  55. }
  56. if !found {
  57. node_status.Ips = append(node_status.Ips, ip{
  58. Address: _ip.String(),
  59. Votes: []vote{},
  60. })
  61. }
  62. }
  63. }
  64. // refresh the last ping time
  65. node_status.LastPingAt = time.Now().Unix()
  66. // update the status of the node
  67. if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, c.id, node_status); err != nil {
  68. return err
  69. }
  70. // get all the nodes
  71. nodes, err := c.GetNodes()
  72. if err != nil {
  73. return err
  74. }
  75. // update self nodes map
  76. c.nodes.Clear()
  77. for node_id, node := range nodes {
  78. c.nodes.Store(node_id, node)
  79. }
  80. return nil
  81. }
  82. func (c *Cluster) GetNodes() (map[string]node, error) {
  83. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  84. if err != nil {
  85. return nil, err
  86. }
  87. for node_id, node := range nodes {
  88. // filter out the disconnected nodes
  89. if !node.available() {
  90. delete(nodes, node_id)
  91. }
  92. }
  93. return nodes, nil
  94. }
  95. // FetchPluginAvailableNodesByHashedId fetches the available nodes of the given plugin
  96. func (c *Cluster) FetchPluginAvailableNodesByHashedId(hashed_plugin_id string) ([]string, error) {
  97. states, err := cache.ScanMap[entities.PluginRuntimeState](
  98. PLUGIN_STATE_MAP_KEY, c.getScanPluginsByIdKey(hashed_plugin_id),
  99. )
  100. if err != nil {
  101. return nil, err
  102. }
  103. nodes := make([]string, 0)
  104. for key := range states {
  105. node_id, _, err := c.splitNodePluginJoin(key)
  106. if err != nil {
  107. continue
  108. }
  109. if c.nodes.Exits(node_id) {
  110. nodes = append(nodes, node_id)
  111. }
  112. }
  113. return nodes, nil
  114. }
  115. func (c *Cluster) FetchPluginAvailableNodesById(plugin_id string) ([]string, error) {
  116. hashed_plugin_id := entities.HashedIdentity(plugin_id)
  117. return c.FetchPluginAvailableNodesByHashedId(hashed_plugin_id)
  118. }
  119. func (c *Cluster) IsMaster() bool {
  120. return c.i_am_master
  121. }
  122. func (c *Cluster) IsNodeAlive(node_id string) bool {
  123. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  124. if err != nil {
  125. return false
  126. }
  127. return node_status.available()
  128. }
  129. // gc the nodes has already deactivated
  130. func (c *Cluster) autoGCNodes() error {
  131. if atomic.LoadInt32(&c.is_in_auto_gc_nodes) == 1 {
  132. return nil
  133. }
  134. defer atomic.StoreInt32(&c.is_in_auto_gc_nodes, 0)
  135. var total_errors error
  136. add_error := func(err error) {
  137. if err != nil {
  138. if total_errors == nil {
  139. total_errors = err
  140. } else {
  141. total_errors = errors.Join(total_errors, err)
  142. }
  143. }
  144. }
  145. // get all nodes status
  146. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  147. if err == cache.ErrNotFound {
  148. return nil
  149. }
  150. for node_id, node_status := range nodes {
  151. // delete the node if it is disconnected
  152. if !node_status.available() {
  153. // gc the node
  154. if err := c.gcNode(node_id); err != nil {
  155. add_error(err)
  156. continue
  157. }
  158. }
  159. }
  160. return total_errors
  161. }
  162. // remove the resource associated with the node
  163. func (c *Cluster) gcNode(node_id string) error {
  164. // remove all plugins associated with the node
  165. if err := c.forceGCNodePlugins(node_id); err != nil {
  166. return err
  167. }
  168. // remove the node from the cluster
  169. c.nodes.Delete(node_id)
  170. if err := c.LockNodeStatus(node_id); err != nil {
  171. return err
  172. }
  173. defer c.UnlockNodeStatus(node_id)
  174. err := cache.DelMapField(CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  175. if err != nil {
  176. return err
  177. } else {
  178. log.Info("node %s has been removed from the cluster due to being disconnected", node_id)
  179. }
  180. return nil
  181. }
  182. // remove self node from the cluster
  183. func (c *Cluster) removeSelfNode() error {
  184. return c.gcNode(c.id)
  185. }
  // CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX is the prefix of the cache lock key
  // guarding a node's status record; LockNodeStatus and UnlockNodeStatus join
  // it with the node id using ":".
  const CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX = "cluster-update-node-status-lock"
  189. func (c *Cluster) LockNodeStatus(node_id string) error {
  190. key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
  191. return cache.Lock(key, time.Second*5, time.Second)
  192. }
  193. func (c *Cluster) UnlockNodeStatus(node_id string) error {
  194. key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
  195. return cache.Unlock(key)
  196. }