vote.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. package cluster
  import (
  	"errors"
  	"fmt"
  	"net"
  	"net/http"
  	"net/url"
  	"sort"
  	"time"

  	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  	"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
  )
  12. func (c *Cluster) voteAddresses() error {
  13. c.notifyVoting()
  14. defer c.notifyVotingCompleted()
  15. var totalErrors error
  16. addError := func(err error) {
  17. if err != nil {
  18. if totalErrors == nil {
  19. totalErrors = err
  20. } else {
  21. totalErrors = errors.Join(totalErrors, err)
  22. }
  23. }
  24. }
  25. // get all nodes status
  26. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  27. if err == cache.ErrNotFound {
  28. return nil
  29. }
  30. for node_id, nodeStatus := range nodes {
  31. if node_id == c.id {
  32. continue
  33. }
  34. // vote for ips
  35. ipsVoting := make(map[string]bool)
  36. for _, addr := range nodeStatus.Addresses {
  37. // skip ips which have already been voted by current node in the last 5 minutes
  38. for _, vote := range addr.Votes {
  39. if vote.NodeID == c.id {
  40. if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*5 && !vote.Failed {
  41. continue
  42. } else if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*30 && vote.Failed {
  43. continue
  44. }
  45. }
  46. }
  47. ipsVoting[addr.fullAddress()] = c.voteAddress(addr) == nil
  48. }
  49. // lock the node status
  50. if err := c.LockNodeStatus(node_id); err != nil {
  51. addError(err)
  52. c.UnlockNodeStatus(node_id)
  53. continue
  54. }
  55. // get the node status again
  56. nodeStatus, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  57. if err != nil {
  58. addError(err)
  59. c.UnlockNodeStatus(node_id)
  60. continue
  61. }
  62. // update the node status
  63. for i, ip := range nodeStatus.Addresses {
  64. // update voting time
  65. if success, ok := ipsVoting[ip.fullAddress()]; ok {
  66. // check if the ip has already voted
  67. alreadyVoted := false
  68. for j, vote := range ip.Votes {
  69. if vote.NodeID == c.id {
  70. nodeStatus.Addresses[i].Votes[j].VotedAt = time.Now().Unix()
  71. nodeStatus.Addresses[i].Votes[j].Failed = !success
  72. alreadyVoted = true
  73. break
  74. }
  75. }
  76. // add a new vote
  77. if !alreadyVoted {
  78. nodeStatus.Addresses[i].Votes = append(nodeStatus.Addresses[i].Votes, vote{
  79. NodeID: c.id,
  80. VotedAt: time.Now().Unix(),
  81. Failed: !success,
  82. })
  83. }
  84. }
  85. }
  86. // sync the node status
  87. if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, node_id, nodeStatus); err != nil {
  88. addError(err)
  89. }
  90. // unlock the node status
  91. if err := c.UnlockNodeStatus(node_id); err != nil {
  92. addError(err)
  93. }
  94. }
  95. return totalErrors
  96. }
  97. func (c *Cluster) voteAddress(addr address) error {
  98. type healthcheck struct {
  99. Status string `json:"status"`
  100. }
  101. healthcheckEndpoint, err := url.JoinPath(fmt.Sprintf("http://%s:%d", addr.Ip, addr.Port), "health/check")
  102. if err != nil {
  103. return err
  104. }
  105. resp, err := http_requests.GetAndParse[healthcheck](
  106. http.DefaultClient,
  107. healthcheckEndpoint,
  108. http_requests.HttpWriteTimeout(500),
  109. http_requests.HttpReadTimeout(500),
  110. )
  111. if err != nil {
  112. return err
  113. }
  114. if resp.Status != "ok" {
  115. return errors.New("health check failed")
  116. }
  117. return nil
  118. }
  119. func (c *Cluster) SortIps(nodeStatus node) []address {
  120. // sort by votes
  121. sort.Slice(nodeStatus.Addresses, func(i, j int) bool {
  122. return len(nodeStatus.Addresses[i].Votes) > len(nodeStatus.Addresses[j].Votes)
  123. })
  124. return nodeStatus.Addresses
  125. }