123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package cluster
import (
	"errors"
	"fmt"
	"net"
	"net/http"
	"net/url"
	"sort"
	"time"

	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
)
- func (c *Cluster) voteAddresses() error {
- c.notifyVoting()
- defer c.notifyVotingCompleted()
- var totalErrors error
- addError := func(err error) {
- if err != nil {
- if totalErrors == nil {
- totalErrors = err
- } else {
- totalErrors = errors.Join(totalErrors, err)
- }
- }
- }
- // get all nodes status
- nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)
- if err == cache.ErrNotFound {
- return nil
- }
- for node_id, nodeStatus := range nodes {
- if node_id == c.id {
- continue
- }
- // vote for ips
- ipsVoting := make(map[string]bool)
- for _, addr := range nodeStatus.Addresses {
- // skip ips which have already been voted by current node in the last 5 minutes
- for _, vote := range addr.Votes {
- if vote.NodeID == c.id {
- if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*5 && !vote.Failed {
- continue
- } else if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*30 && vote.Failed {
- continue
- }
- }
- }
- ipsVoting[addr.fullAddress()] = c.voteAddress(addr) == nil
- }
- // lock the node status
- if err := c.LockNodeStatus(node_id); err != nil {
- addError(err)
- c.UnlockNodeStatus(node_id)
- continue
- }
- // get the node status again
- nodeStatus, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)
- if err != nil {
- addError(err)
- c.UnlockNodeStatus(node_id)
- continue
- }
- // update the node status
- for i, ip := range nodeStatus.Addresses {
- // update voting time
- if success, ok := ipsVoting[ip.fullAddress()]; ok {
- // check if the ip has already voted
- alreadyVoted := false
- for j, vote := range ip.Votes {
- if vote.NodeID == c.id {
- nodeStatus.Addresses[i].Votes[j].VotedAt = time.Now().Unix()
- nodeStatus.Addresses[i].Votes[j].Failed = !success
- alreadyVoted = true
- break
- }
- }
- // add a new vote
- if !alreadyVoted {
- nodeStatus.Addresses[i].Votes = append(nodeStatus.Addresses[i].Votes, vote{
- NodeID: c.id,
- VotedAt: time.Now().Unix(),
- Failed: !success,
- })
- }
- }
- }
- // sync the node status
- if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, node_id, nodeStatus); err != nil {
- addError(err)
- }
- // unlock the node status
- if err := c.UnlockNodeStatus(node_id); err != nil {
- addError(err)
- }
- }
- return totalErrors
- }
- func (c *Cluster) voteAddress(addr address) error {
- type healthcheck struct {
- Status string `json:"status"`
- }
- healthcheckEndpoint, err := url.JoinPath(fmt.Sprintf("http://%s:%d", addr.Ip, addr.Port), "health/check")
- if err != nil {
- return err
- }
- resp, err := http_requests.GetAndParse[healthcheck](
- http.DefaultClient,
- healthcheckEndpoint,
- http_requests.HttpWriteTimeout(500),
- http_requests.HttpReadTimeout(500),
- )
- if err != nil {
- return err
- }
- if resp.Status != "ok" {
- return errors.New("health check failed")
- }
- return nil
- }
- func (c *Cluster) SortIps(nodeStatus node) []address {
- // sort by votes
- sort.Slice(nodeStatus.Addresses, func(i, j int) bool {
- return len(nodeStatus.Addresses[i].Votes) > len(nodeStatus.Addresses[j].Votes)
- })
- return nodeStatus.Addresses
- }
|