| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 | package clusterimport (	"errors"	"fmt"	"net/http"	"net/url"	"sort"	"time"	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"	"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests")func (c *Cluster) voteAddresses() error {	c.notifyVoting()	defer c.notifyVotingCompleted()	var total_errors error	add_error := func(err error) {		if err != nil {			if total_errors == nil {				total_errors = err			} else {				total_errors = errors.Join(total_errors, err)			}		}	}	// get all nodes status	nodes, err := cache.GetMap[node](CLUSTER_STATUS_HASH_MAP_KEY)	if err == cache.ErrNotFound {		return nil	}	for node_id, node_status := range nodes {		if node_id == c.id {			continue		}		// vote for ips		ips_voting := make(map[string]bool)		for _, addr := range node_status.Addresses {			// skip ips which have already been voted by current node in the last 5 minutes			for _, vote := range addr.Votes {				if vote.NodeID == c.id {					if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*5 && !vote.Failed {						continue					} else if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*30 && vote.Failed {						continue					}				}			}			ips_voting[addr.fullAddress()] = c.voteAddress(addr) == nil		}		// lock the node status		if err := c.LockNodeStatus(node_id); err != nil {			add_error(err)			c.UnlockNodeStatus(node_id)			continue		}		// get the node status again		node_status, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, node_id)		if err != nil {			add_error(err)			c.UnlockNodeStatus(node_id)			continue		}		// update the node status		for i, ip := range node_status.Addresses {			// update voting time			if success, ok := ips_voting[ip.fullAddress()]; ok {				// check if the ip has already voted				already_voted := false				for j, vote := range ip.Votes {					if vote.NodeID == c.id {						node_status.Addresses[i].Votes[j].VotedAt = time.Now().Unix()						node_status.Addresses[i].Votes[j].Failed = !success						already_voted = true						break					}				}				// add a new vote				if !already_voted {					node_status.Addresses[i].Votes = append(node_status.Addresses[i].Votes, vote{						NodeID:  c.id,						VotedAt: time.Now().Unix(),						Failed:  !success,					})				}			}		}		// sync the node status		if err := cache.SetMapOneField(CLUSTER_STATUS_HASH_MAP_KEY, node_id, node_status); err != nil {			add_error(err)		}		// unlock the node status		if err := c.UnlockNodeStatus(node_id); err != nil {			add_error(err)		}	}	return total_errors}func (c *Cluster) voteAddress(addr address) error {	type healthcheck struct {		Status string `json:"status"`	}	healthcheck_endpoint, err := url.JoinPath(fmt.Sprintf("http://%s:%d", addr.Ip, addr.Port), "health/check")	if err != nil {		return err	}	resp, err := http_requests.GetAndParse[healthcheck](		http.DefaultClient,		healthcheck_endpoint,		http_requests.HttpWriteTimeout(500),		http_requests.HttpReadTimeout(500),	)	if err != nil {		return err	}	if resp.Status != "ok" {		return errors.New("health check failed")	}	return nil}func (c *Cluster) SortIps(node_status node) []address {	// sort by votes	sort.Slice(node_status.Addresses, func(i, j int) bool {		return len(node_status.Addresses[i].Votes) > len(node_status.Addresses[j].Votes)	})	return node_status.Addresses}
 |