Pārlūkot izejas kodu

refactor: address from ip to ip:port

Yeuoly 11 mēneši atpakaļ
vecāks
revīzija
e844f96b4a

+ 1 - 1
internal/cluster/cluster.go

@@ -16,7 +16,7 @@ type Cluster struct {
 	// i_am_master is the flag to indicate whether the current node is the master node
 	i_am_master bool
 
-	// port is the health check port of the cluster
+	// main http port of the current node
 	port uint16
 
 	// plugins stores all the plugin life time of the current node

+ 14 - 6
internal/cluster/entities.go

@@ -1,10 +1,18 @@
 package cluster
 
-import "time"
+import (
+	"fmt"
+	"time"
+)
 
-type ip struct {
-	Address string `json:"address"`
-	Votes   []vote `json:"vote"`
+type address struct {
+	Ip    string `json:"ip"`
+	Port  uint16 `json:"port"`
+	Votes []vote `json:"vote"`
+}
+
+func (a *address) fullAddress() string {
+	return fmt.Sprintf("%s:%d", a.Ip, a.Port)
 }
 
 type vote struct {
@@ -14,8 +22,8 @@ type vote struct {
 }
 
 type node struct {
-	Ips        []ip  `json:"ips"`
-	LastPingAt int64 `json:"last_ping_at"`
+	Addresses  []address `json:"ips"`
+	LastPingAt int64     `json:"last_ping_at"`
 }
 
 func (c *node) available() bool {

+ 3 - 3
internal/cluster/lifetime.go

@@ -90,7 +90,7 @@ func (c *Cluster) clusterLifetime() {
 			log.Error("failed to publish the new node event: %s", err.Error())
 		}
 
-		if err := c.voteIps(); err != nil {
+		if err := c.voteAddresses(); err != nil {
 			log.Error("failed to vote the ips of the nodes: %s", err.Error())
 		}
 	})
@@ -137,13 +137,13 @@ func (c *Cluster) clusterLifetime() {
 				c.notifyMasterGCCompleted()
 			}
 		case <-node_vote_ticker.C:
-			if err := c.voteIps(); err != nil {
+			if err := c.voteAddresses(); err != nil {
 				log.Error("failed to vote the ips of the nodes: %s", err.Error())
 			}
 		case _, ok := <-new_node_chan:
 			if ok {
 				// vote for the new node
-				if err := c.voteIps(); err != nil {
+				if err := c.voteAddresses(); err != nil {
 					log.Error("failed to vote the ips of the nodes: %s", err.Error())
 				}
 			}

+ 11 - 9
internal/cluster/node.go

@@ -34,10 +34,11 @@ func (c *Cluster) updateNodeStatus() error {
 				return err
 			}
 			node_status = &node{
-				Ips: parser.Map(func(from net.IP) ip {
-					return ip{
-						Address: from.String(),
-						Votes:   []vote{},
+				Addresses: parser.Map(func(from net.IP) address {
+					return address{
+						Ip:    from.String(),
+						Port:  c.port,
+						Votes: []vote{},
 					}
 				}, ips),
 			}
@@ -52,16 +53,17 @@ func (c *Cluster) updateNodeStatus() error {
 		// add new ip if not exist
 		for _, _ip := range ips {
 			found := false
-			for _, node_ip := range node_status.Ips {
-				if node_ip.Address == _ip.String() {
+			for _, node_ip := range node_status.Addresses {
+				if node_ip.Ip == _ip.String() {
 					found = true
 					break
 				}
 			}
 			if !found {
-				node_status.Ips = append(node_status.Ips, ip{
-					Address: _ip.String(),
-					Votes:   []vote{},
+				node_status.Addresses = append(node_status.Addresses, address{
+					Ip:    _ip.String(),
+					Port:  c.port,
+					Votes: []vote{},
 				})
 			}
 		}

+ 1 - 2
internal/cluster/redirect.go

@@ -4,7 +4,6 @@ import (
 	"errors"
 	"io"
 	"net/http"
-	"strconv"
 )
 
 // RedirectRequest redirects the request to the specified node
@@ -26,7 +25,7 @@ func (c *Cluster) RedirectRequest(
 	// create a new request
 	redirected_request, err := http.NewRequest(
 		request.Method,
-		"http://"+ip.Address+":"+strconv.FormatUint(uint64(c.port), 10)+request.URL.Path,
+		"http://"+ip.fullAddress()+request.URL.Path,
 		request.Body,
 	)
 

+ 15 - 15
internal/cluster/vote.go

@@ -12,7 +12,7 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
 )
 
-func (c *Cluster) voteIps() error {
+func (c *Cluster) voteAddresses() error {
 	c.notifyVoting()
 	defer c.notifyVotingCompleted()
 	var total_errors error
@@ -39,9 +39,9 @@ func (c *Cluster) voteIps() error {
 
 		// vote for ips
 		ips_voting := make(map[string]bool)
-		for _, ip := range node_status.Ips {
+		for _, addr := range node_status.Addresses {
 			// skip ips which have already been voted by current node in the last 5 minutes
-			for _, vote := range ip.Votes {
+			for _, vote := range addr.Votes {
 				if vote.NodeID == c.id {
 					if time.Since(time.Unix(vote.VotedAt, 0)) < time.Minute*5 && !vote.Failed {
 						continue
@@ -51,7 +51,7 @@ func (c *Cluster) voteIps() error {
 				}
 			}
 
-			ips_voting[ip.Address] = c.voteIp(ip) == nil
+			ips_voting[addr.fullAddress()] = c.voteAddress(addr) == nil
 		}
 
 		// lock the node status
@@ -70,22 +70,22 @@ func (c *Cluster) voteIps() error {
 		}
 
 		// update the node status
-		for i, ip := range node_status.Ips {
+		for i, ip := range node_status.Addresses {
 			// update voting time
-			if success, ok := ips_voting[ip.Address]; ok {
+			if success, ok := ips_voting[ip.fullAddress()]; ok {
 				// check if the ip has already voted
 				already_voted := false
 				for j, vote := range ip.Votes {
 					if vote.NodeID == c.id {
-						node_status.Ips[i].Votes[j].VotedAt = time.Now().Unix()
-						node_status.Ips[i].Votes[j].Failed = !success
+						node_status.Addresses[i].Votes[j].VotedAt = time.Now().Unix()
+						node_status.Addresses[i].Votes[j].Failed = !success
 						already_voted = true
 						break
 					}
 				}
 				// add a new vote
 				if !already_voted {
-					node_status.Ips[i].Votes = append(node_status.Ips[i].Votes, vote{
+					node_status.Addresses[i].Votes = append(node_status.Addresses[i].Votes, vote{
 						NodeID:  c.id,
 						VotedAt: time.Now().Unix(),
 						Failed:  !success,
@@ -108,12 +108,12 @@ func (c *Cluster) voteIps() error {
 	return total_errors
 }
 
-func (c *Cluster) voteIp(ip ip) error {
+func (c *Cluster) voteAddress(addr address) error {
 	type healthcheck struct {
 		Status string `json:"status"`
 	}
 
-	healthcheck_endpoint, err := url.JoinPath(fmt.Sprintf("http://%s:%d", ip.Address, c.port), "health/check")
+	healthcheck_endpoint, err := url.JoinPath(fmt.Sprintf("http://%s:%d", addr.Ip, addr.Port), "health/check")
 	if err != nil {
 		return err
 	}
@@ -136,11 +136,11 @@ func (c *Cluster) voteIp(ip ip) error {
 	return nil
 }
 
-func (c *Cluster) SortIps(node_status node) []ip {
+func (c *Cluster) SortIps(node_status node) []address {
 	// sort by votes
-	sort.Slice(node_status.Ips, func(i, j int) bool {
-		return len(node_status.Ips[i].Votes) > len(node_status.Ips[j].Votes)
+	sort.Slice(node_status.Addresses, func(i, j int) bool {
+		return len(node_status.Addresses[i].Votes) > len(node_status.Addresses[j].Votes)
 	})
 
-	return node_status.Ips
+	return node_status.Addresses
 }

+ 3 - 3
internal/cluster/vote_test.go

@@ -30,7 +30,7 @@ func createSimulationHealthCheckSever() (uint16, error) {
 	return uint16(port), nil
 }
 
-func TestVoteIps(t *testing.T) {
+func TestVoteAddresses(t *testing.T) {
 	// create a health check server
 	port, err := createSimulationHealthCheckSever()
 	if err != nil {
@@ -77,9 +77,9 @@ func TestVoteIps(t *testing.T) {
 		}
 
 		for _, node := range nodes {
-			for _, ip := range node.Ips {
+			for _, ip := range node.Addresses {
 				if len(ip.Votes) == 0 {
-					t.Errorf("vote for ip %s failed", ip.Address)
+					t.Errorf("vote for ip %s failed", ip.Ip)
 					return
 				}
 			}