node.go 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. package cluster
  2. import (
  3. "errors"
  4. "net"
  5. "strings"
  6. "sync/atomic"
  7. "time"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/network"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  13. )
  14. // update the status of the node
  15. func (c *Cluster) updateNodeStatus() error {
  16. c.notifyNodeUpdate()
  17. defer c.notifyNodeUpdateCompleted()
  18. if err := c.LockNodeStatus(c.id); err != nil {
  19. return err
  20. }
  21. defer c.UnlockNodeStatus(c.id)
  22. // update the status of the node
  23. nodeStatus, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, c.id)
  24. if err != nil {
  25. if err == cache.ErrNotFound {
  26. // try to get ips configs
  27. ips, err := network.FetchCurrentIps()
  28. if err != nil {
  29. return err
  30. }
  31. nodeStatus = &node{
  32. Addresses: parser.Map(func(from net.IP) address {
  33. return address{
  34. Ip: from.String(),
  35. Port: c.port,
  36. Votes: []vote{},
  37. }
  38. }, ips),
  39. }
  40. } else {
  41. return err
  42. }
  43. } else {
  44. ips, err := network.FetchCurrentIps()
  45. if err != nil {
  46. return err
  47. }
  48. // add new ip if not exist
  49. for _, _ip := range ips {
  50. found := false
  51. for _, node_ip := range nodeStatus.Addresses {
  52. if node_ip.Ip == _ip.String() {
  53. found = true
  54. break
  55. }
  56. }
  57. if !found {
  58. nodeStatus.Addresses = append(nodeStatus.Addresses, address{
  59. Ip: _ip.String(),
  60. Port: c.port,
  61. Votes: []vote{},
  62. })
  63. }
  64. }
  65. }
  66. // refresh the last ping time
  67. nodeStatus.LastPingAt = time.Now().Unix()
  68. // update the status of the node
  69. if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, c.id, nodeStatus); err != nil {
  70. return err
  71. }
  72. // get all the nodes
  73. nodes, err := c.GetNodes()
  74. if err != nil {
  75. return err
  76. }
  77. // update self nodes map
  78. c.nodes.Clear()
  79. for nodeId, node := range nodes {
  80. c.nodes.Store(nodeId, node)
  81. }
  82. return nil
  83. }
  84. func (c *Cluster) isNodeAvailable(node *node) bool {
  85. return time.Since(time.Unix(node.LastPingAt, 0)) < c.nodeDisconnectedTimeout
  86. }
  87. func (c *Cluster) GetNodes() (map[string]node, error) {
  88. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  89. if err != nil {
  90. return nil, err
  91. }
  92. for nodeId, node := range nodes {
  93. // filter out the disconnected nodes
  94. if !c.isNodeAvailable(&node) {
  95. delete(nodes, nodeId)
  96. }
  97. }
  98. return nodes, nil
  99. }
  100. // FetchPluginAvailableNodesByHashedId fetches the available nodes of the given plugin
  101. func (c *Cluster) FetchPluginAvailableNodesByHashedId(hashedPluginId string) ([]string, error) {
  102. states, err := cache.ScanMap[plugin_entities.PluginRuntimeState](
  103. PLUGIN_STATE_MAP_KEY, c.getScanPluginsByIdKey(hashedPluginId),
  104. )
  105. if err != nil {
  106. return nil, err
  107. }
  108. nodes := make([]string, 0)
  109. for key := range states {
  110. nodeId, _, err := c.splitNodePluginJoin(key)
  111. if err != nil {
  112. continue
  113. }
  114. if c.nodes.Exists(nodeId) {
  115. nodes = append(nodes, nodeId)
  116. }
  117. }
  118. return nodes, nil
  119. }
  120. func (c *Cluster) FetchPluginAvailableNodesById(plugin_id string) ([]string, error) {
  121. hashedPluginId := plugin_entities.HashedIdentity(plugin_id)
  122. return c.FetchPluginAvailableNodesByHashedId(hashedPluginId)
  123. }
  124. func (c *Cluster) IsMaster() bool {
  125. return c.iAmMaster
  126. }
  127. func (c *Cluster) IsNodeAlive(nodeId string) bool {
  128. nodeStatus, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, nodeId)
  129. if err != nil {
  130. return false
  131. }
  132. return c.isNodeAvailable(nodeStatus)
  133. }
  134. // gc the nodes has already deactivated
  135. func (c *Cluster) autoGCNodes() error {
  136. if atomic.LoadInt32(&c.isInAutoGcNodes) == 1 {
  137. return nil
  138. }
  139. defer atomic.StoreInt32(&c.isInAutoGcNodes, 0)
  140. var totalErrors error
  141. addError := func(err error) {
  142. if err != nil {
  143. if totalErrors == nil {
  144. totalErrors = err
  145. } else {
  146. totalErrors = errors.Join(totalErrors, err)
  147. }
  148. }
  149. }
  150. // get all nodes status
  151. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  152. if err == cache.ErrNotFound {
  153. return nil
  154. }
  155. for nodeId, nodeStatus := range nodes {
  156. // delete the node if it is disconnected
  157. if !c.isNodeAvailable(&nodeStatus) {
  158. // gc the node
  159. if err := c.gcNode(nodeId); err != nil {
  160. addError(err)
  161. continue
  162. }
  163. }
  164. }
  165. return totalErrors
  166. }
  167. // remove the resource associated with the node
  168. func (c *Cluster) gcNode(nodeId string) error {
  169. // remove all plugins associated with the node
  170. if err := c.forceGCNodePlugins(nodeId); err != nil {
  171. return err
  172. }
  173. // remove the node from the cluster
  174. c.nodes.Delete(nodeId)
  175. if err := c.LockNodeStatus(nodeId); err != nil {
  176. return err
  177. }
  178. defer c.UnlockNodeStatus(nodeId)
  179. err := cache.DelMapField(CLUSTER_STATUS_HASH_MAP_KEY, nodeId)
  180. if err != nil {
  181. return err
  182. } else {
  183. log.Info("node %s has been removed from the cluster due to being disconnected", nodeId)
  184. }
  185. return nil
  186. }
  187. // remove self node from the cluster
  188. func (c *Cluster) removeSelfNode() error {
  189. return c.gcNode(c.id)
  190. }
  191. const (
  192. CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX = "cluster-update-node-status-lock"
  193. )
  194. func (c *Cluster) LockNodeStatus(nodeId string) error {
  195. key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, nodeId}, ":")
  196. return cache.Lock(key, time.Second*5, time.Second)
  197. }
  198. func (c *Cluster) UnlockNodeStatus(nodeId string) error {
  199. key := strings.Join([]string{CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX, nodeId}, ":")
  200. return cache.Unlock(key)
  201. }