vote.go 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146
  1. package cluster
import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"sort"
	"time"

	"github.com/langgenius/dify-plugin-daemon/internal/cluster/cluster_id"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
)
  13. func (c *Cluster) voteIps() error {
  14. var total_errors error
  15. add_error := func(err error) {
  16. if err != nil {
  17. if total_errors == nil {
  18. total_errors = err
  19. } else {
  20. total_errors = errors.Join(total_errors, err)
  21. }
  22. }
  23. }
  24. // get all nodes status
  25. nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
  26. if err == cache.ErrNotFound {
  27. return nil
  28. }
  29. for node_id, node_status := range nodes {
  30. if node_id == cluster_id.GetInstanceID() {
  31. continue
  32. }
  33. // vote for ips
  34. ips_voting := make(map[string]bool)
  35. for _, ip := range node_status.Ips {
  36. // skip ips which have already been voted by current node in the last 5 minutes
  37. for _, vote := range ip.Votes {
  38. if vote.NodeID == cluster_id.GetInstanceID() {
  39. if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*5 && !vote.Failed {
  40. continue
  41. } else if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*30 && vote.Failed {
  42. continue
  43. }
  44. }
  45. }
  46. ips_voting[ip.Address] = c.voteIp(ip) == nil
  47. }
  48. // lock the node status
  49. if err := c.LockNodeStatus(node_id); err != nil {
  50. add_error(err)
  51. c.UnlockNodeStatus(node_id)
  52. continue
  53. }
  54. // get the node status again
  55. node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
  56. if err != nil {
  57. add_error(err)
  58. c.UnlockNodeStatus(node_id)
  59. continue
  60. }
  61. // update the node status
  62. for i, ip := range node_status.Ips {
  63. // update voting time
  64. if success, ok := ips_voting[ip.Address]; ok {
  65. // check if the ip has already voted
  66. already_voted := false
  67. for j, vote := range ip.Votes {
  68. if vote.NodeID == cluster_id.GetInstanceID() {
  69. node_status.Ips[i].Votes[j].VotedAt = time.Now().Unix()
  70. node_status.Ips[i].Votes[j].Failed = !success
  71. already_voted = true
  72. break
  73. }
  74. }
  75. // add a new vote
  76. if !already_voted {
  77. node_status.Ips[i].Votes = append(node_status.Ips[i].Votes, vote{
  78. NodeID: cluster_id.GetInstanceID(),
  79. VotedAt: time.Now().Unix(),
  80. Failed: !success,
  81. })
  82. }
  83. }
  84. }
  85. // sync the node status
  86. if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, node_id, node_status); err != nil {
  87. add_error(err)
  88. }
  89. // unlock the node status
  90. if err := c.UnlockNodeStatus(node_id); err != nil {
  91. add_error(err)
  92. }
  93. }
  94. return total_errors
  95. }
  96. func (c *Cluster) voteIp(ip ip) error {
  97. type healthcheck struct {
  98. Status string `json:"status"`
  99. }
  100. healthcheck_endpoint, err := url.JoinPath(fmt.Sprintf("http://%s:%d", ip.Address, c.port), "health/check")
  101. if err != nil {
  102. return err
  103. }
  104. resp, err := http_requests.GetAndParse[healthcheck](
  105. http.DefaultClient,
  106. healthcheck_endpoint,
  107. http_requests.HttpWriteTimeout(500),
  108. http_requests.HttpReadTimeout(500),
  109. )
  110. if err != nil {
  111. return err
  112. }
  113. if resp.Status != "ok" {
  114. return errors.New("health check failed")
  115. }
  116. return nil
  117. }
  118. func (c *Cluster) SortIps(node_status node) []ip {
  119. // sort by votes
  120. sort.Slice(node_status.Ips, func(i, j int) bool {
  121. return len(node_status.Ips[i].Votes) > len(node_status.Ips[j].Votes)
  122. })
  123. return node_status.Ips
  124. }