node.go 5.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. package cluster
  2. import (
  3. "errors"
  4. "net"
  5. "strings"
  6. "sync/atomic"
  7. "time"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/network"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. )
  14. // update the status of the node
  15. func (c *Cluster) updateNodeStatus() error {
  16. c.notifyNodeUpdate()
  17. defer c.notifyNodeUpdateCompleted()
  18. if err := c.LockNodeStatus(c.id); err != nil {
  19. return err
  20. }
  21. defer c.UnlockNodeStatus(c.id)
  22. // update the status of the node
  23. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, c.id)
  24. if err != nil {
  25. if err == cache.ErrNotFound {
  26. // try to get ips configs
  27. ips, err := network.FetchCurrentIps()
  28. if err != nil {
  29. return err
  30. }
  31. node_status = &node{
  32. Addresses: parser.Map(func(from net.IP) address {
  33. return address{
  34. Ip: from.String(),
  35. Port: c.port,
  36. Votes: []vote{},
  37. }
  38. }, ips),
  39. }
  40. } else {
  41. return err
  42. }
  43. } else {
  44. ips, err := network.FetchCurrentIps()
  45. if err != nil {
  46. return err
  47. }
  48. // add new ip if not exist
  49. for _, _ip := range ips {
  50. found := false
  51. for _, node_ip := range node_status.Addresses {
  52. if node_ip.Ip == _ip.String() {
  53. found = true
  54. break
  55. }
  56. }
  57. if !found {
  58. node_status.Addresses = append(node_status.Addresses, address{
  59. Ip: _ip.String(),
  60. Port: c.port,
  61. Votes: []vote{},
  62. })
  63. }
  64. }
  65. }
  66. // refresh the last ping time
  67. node_status.LastPingAt = time.Now().Unix()
  68. // update the status of the node
  69. if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, c.id, node_status); err != nil {
  70. return err
  71. }
  72. // get all the nodes
  73. nodes, err := c.GetNodes()
  74. if err != nil {
  75. return err
  76. }
  77. // update self nodes map
  78. c.nodes.Clear()
  79. for node_id, node := range nodes {
  80. c.nodes.Store(node_id, node)
  81. }
  82. return nil
  83. }
  84. func (c *Cluster) GetNodes() (map[string]node, error) {
  85. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  86. if err != nil {
  87. return nil, err
  88. }
  89. for node_id, node := range nodes {
  90. // filter out the disconnected nodes
  91. if !node.available() {
  92. delete(nodes, node_id)
  93. }
  94. }
  95. return nodes, nil
  96. }
  97. // FetchPluginAvailableNodesByHashedId fetches the available nodes of the given plugin
  98. func (c *Cluster) FetchPluginAvailableNodesByHashedId(hashed_plugin_id string) ([]string, error) {
  99. states, err := cache.ScanMap[plugin_entities.PluginRuntimeState](
  100. PLUGIN_STATE_MAP_KEY, c.getScanPluginsByIdKey(hashed_plugin_id),
  101. )
  102. if err != nil {
  103. return nil, err
  104. }
  105. nodes := make([]string, 0)
  106. for key := range states {
  107. node_id, _, err := c.splitNodePluginJoin(key)
  108. if err != nil {
  109. continue
  110. }
  111. if c.nodes.Exists(node_id) {
  112. nodes = append(nodes, node_id)
  113. }
  114. }
  115. return nodes, nil
  116. }
  117. func (c *Cluster) FetchPluginAvailableNodesById(plugin_id string) ([]string, error) {
  118. hashed_plugin_id := plugin_entities.HashedIdentity(plugin_id)
  119. return c.FetchPluginAvailableNodesByHashedId(hashed_plugin_id)
  120. }
  121. func (c *Cluster) IsMaster() bool {
  122. return c.i_am_master
  123. }
  124. func (c *Cluster) IsNodeAlive(node_id string) bool {
  125. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  126. if err != nil {
  127. return false
  128. }
  129. return node_status.available()
  130. }
  131. // gc the nodes has already deactivated
  132. func (c *Cluster) autoGCNodes() error {
  133. if atomic.LoadInt32(&c.is_in_auto_gc_nodes) == 1 {
  134. return nil
  135. }
  136. defer atomic.StoreInt32(&c.is_in_auto_gc_nodes, 0)
  137. var total_errors error
  138. add_error := func(err error) {
  139. if err != nil {
  140. if total_errors == nil {
  141. total_errors = err
  142. } else {
  143. total_errors = errors.Join(total_errors, err)
  144. }
  145. }
  146. }
  147. // get all nodes status
  148. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  149. if err == cache.ErrNotFound {
  150. return nil
  151. }
  152. for node_id, node_status := range nodes {
  153. // delete the node if it is disconnected
  154. if !node_status.available() {
  155. // gc the node
  156. if err := c.gcNode(node_id); err != nil {
  157. add_error(err)
  158. continue
  159. }
  160. }
  161. }
  162. return total_errors
  163. }
  164. // remove the resource associated with the node
  165. func (c *Cluster) gcNode(node_id string) error {
  166. // remove all plugins associated with the node
  167. if err := c.forceGCNodePlugins(node_id); err != nil {
  168. return err
  169. }
  170. // remove the node from the cluster
  171. c.nodes.Delete(node_id)
  172. if err := c.LockNodeStatus(node_id); err != nil {
  173. return err
  174. }
  175. defer c.UnlockNodeStatus(node_id)
  176. err := cache.DelMapField(CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  177. if err != nil {
  178. return err
  179. } else {
  180. log.Info("node %s has been removed from the cluster due to being disconnected", node_id)
  181. }
  182. return nil
  183. }
  184. // remove self node from the cluster
  185. func (c *Cluster) removeSelfNode() error {
  186. return c.gcNode(c.id)
  187. }
  188. const (
  189. CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX = "cluster-update-node-status-lock"
  190. )
  191. func (c *Cluster) LockNodeStatus(node_id string) error {
  192. key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
  193. return cache.Lock(key, time.Second*5, time.Second)
  194. }
  195. func (c *Cluster) UnlockNodeStatus(node_id string) error {
  196. key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, node_id}, ":")
  197. return cache.Unlock(key)
  198. }