tests: cluster lifetime

Yeuoly, 1 year ago
parent
commit
4073e2fa5e
6 changed files with 258 additions and 23 deletions
  1. +6 -4    internal/cluster/init.go
  2. +0 -11   internal/cluster/cluster_id/id.go
  3. +228 -0  internal/cluster/clutser_test.go
  4. +18 -7   internal/cluster/lifetime.go
  5. +5 -0    internal/cluster/node.go
  6. +1 -1    internal/server/server.go

+ 6 - 4
internal/cluster/init.go

@@ -4,7 +4,7 @@ import (
 	"sync"
 	"sync/atomic"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/cluster/cluster_id"
+	"github.com/google/uuid"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
 )
@@ -37,19 +37,21 @@ type Cluster struct {
 
 func NewCluster(config *app.Config) *Cluster {
 	return &Cluster{
-		id:        cluster_id.GetInstanceID(),
+		id:        uuid.New().String(),
 		port:      uint16(config.ServerPort),
 		stop_chan: make(chan bool),
 		stopped:   new(int32),
 	}
 }
 
-func (c *Cluster) Launch(config *app.Config) {
+func (c *Cluster) Launch() {
 	go c.clusterLifetime()
 }
 
-func (c *Cluster) Close() {
+func (c *Cluster) Close() error {
 	if atomic.CompareAndSwapInt32(c.stopped, 0, 1) {
 		close(c.stop_chan)
 	}
+
+	return nil
 }
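
Taken together, the init.go changes give each Cluster a fresh per-process uuid (replacing the removed cluster_id singleton), a Launch that no longer needs the config it already received in NewCluster, and a Close that now matches the usual io.Closer-style signature. A minimal caller-side sketch, mirroring the server.go change at the bottom of this diff (the config fields beyond ServerPort and the surrounding error handling are assumptions, not part of this commit):

package main

import (
	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
)

func main() {
	// one node-local handle; its id is now a fresh uuid per process
	c := cluster.NewCluster(&app.Config{ServerPort: 12121})

	// spawns clusterLifetime() in a goroutine: master election, status updates, gc
	c.Launch()

	// ... serve traffic ...

	// idempotent thanks to the CompareAndSwap guard; the error is currently always nil
	if err := c.Close(); err != nil {
		panic(err) // placeholder shutdown handling
	}
}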

+ 0 - 11
internal/cluster/cluster_id/id.go

@@ -1,11 +0,0 @@
-package cluster_id
-
-import "github.com/google/uuid"
-
-var (
-	instanceId = uuid.New().String()
-)
-
-func GetInstanceID() string {
-	return instanceId
-}

+ 228 - 0
internal/cluster/clutser_test.go

@@ -0,0 +1,228 @@
+package cluster
+
+import (
+	"testing"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+)
+
+func createSimulationCluster(nums int) ([]*Cluster, error) {
+	err := cache.InitRedisClient("0.0.0.0:6379", "difyai123456")
+	if err != nil {
+		return nil, err
+	}
+
+	result := make([]*Cluster, 0)
+	for i := 0; i < nums; i++ {
+		result = append(result, NewCluster(&app.Config{
+			ServerPort: 12121,
+		}))
+	}
+
+	log.SetShowLog(false)
+
+	routine.InitPool(1024)
+
+	return result, nil
+}
+
+func TestSingleClusterLifetime(t *testing.T) {
+	clusters, err := createSimulationCluster(1)
+	if err != nil {
+		t.Errorf("create simulation cluster failed: %v", err)
+		return
+	}
+	clusters[0].Launch()
+	defer func() {
+		// check if the cluster is closed
+		_, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, clusters[0].id)
+		if err == nil {
+			t.Errorf("cluster is not closed")
+			return
+		}
+	}()
+	defer clusters[0].Close()
+
+	time.Sleep(time.Second * 1)
+
+	_, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, clusters[0].id)
+	if err != nil {
+		t.Errorf("get cluster status failed: %v", err)
+		return
+	}
+}
+
+func TestMultipleClusterLifetime(t *testing.T) {
+	clusters, err := createSimulationCluster(3)
+	if err != nil {
+		t.Errorf("create simulation cluster failed: %v", err)
+		return
+	}
+
+	for _, cluster := range clusters {
+		cluster.Launch()
+		defer func(cluster *Cluster) {
+			cluster.Close()
+			// wait for the cluster to close
+			time.Sleep(time.Second * 1)
+			// check if the cluster is closed
+			_, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
+			if err == nil {
+				t.Errorf("cluster is not closed")
+				return
+			}
+		}(cluster)
+	}
+
+	time.Sleep(time.Second * 1)
+
+	has_master := false
+
+	for _, cluster := range clusters {
+		_, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
+		if err != nil {
+			t.Errorf("get cluster status failed: %v", err)
+			return
+		}
+
+		if cluster.IsMaster() {
+			if has_master {
+				t.Errorf("multiple master")
+				return
+			} else {
+				has_master = true
+			}
+		}
+	}
+
+	if !has_master {
+		t.Errorf("no master")
+	}
+}
+
+func TestClusterSubstituteMaster(t *testing.T) {
+	clusters, err := createSimulationCluster(3)
+	if err != nil {
+		t.Errorf("create simulation cluster failed: %v", err)
+		return
+	}
+
+	for _, cluster := range clusters {
+		cluster.Launch()
+		defer func(cluster *Cluster) {
+			cluster.Close()
+			// wait for the cluster to close
+			time.Sleep(time.Second * 1)
+			// check if the cluster is closed
+			_, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
+			if err == nil {
+				t.Errorf("cluster is not closed")
+				return
+			}
+		}(cluster)
+	}
+
+	time.Sleep(time.Second * 1)
+
+	// close the master
+	original_master_id := ""
+	for _, cluster := range clusters {
+		if cluster.IsMaster() {
+			cluster.Close()
+			original_master_id = cluster.id
+			break
+		}
+	}
+	if original_master_id == "" {
+		t.Errorf("no master")
+		return
+	}
+
+	time.Sleep(MASTER_LOCK_EXPIRED_TIME + time.Second)
+
+	has_master := false
+
+	for _, cluster := range clusters {
+		if cluster.id == original_master_id {
+			continue
+		}
+		_, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
+		if err != nil {
+			t.Errorf("get cluster status failed: %v", err)
+			return
+		}
+
+		if cluster.IsMaster() {
+			if has_master {
+				t.Errorf("multiple substitute master")
+				return
+			} else {
+				has_master = true
+			}
+		}
+	}
+
+	if !has_master {
+		t.Errorf("no substitute master")
+	}
+}
+
+func TestClusterAutoGCNoLongerActiveNode(t *testing.T) {
+	clusters, err := createSimulationCluster(3)
+	if err != nil {
+		t.Errorf("create simulation cluster failed: %v", err)
+		return
+	}
+
+	for _, cluster := range clusters {
+		cluster.Launch()
+		defer func(cluster *Cluster) {
+			cluster.Close()
+			// wait for the cluster to close
+			time.Sleep(time.Second * 1)
+			// check if the cluster is closed
+			_, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
+			if err == nil {
+				t.Errorf("cluster is not closed")
+				return
+			}
+		}(cluster)
+	}
+
+	time.Sleep(time.Second * 1)
+
+	// randomly pick a slave node and close it
+	slave_node_id := ""
+	for _, cluster := range clusters {
+		if !cluster.IsMaster() {
+			slave_node_id = cluster.id
+			cluster.Close()
+			// wait for normal gc
+			time.Sleep(time.Second * 1)
+			// recover the node status
+			if err := cluster.updateNodeStatus(); err != nil {
+				t.Errorf("failed to recover the node status: %v", err)
+				return
+			}
+			break
+		}
+	}
+
+	if slave_node_id == "" {
+		t.Errorf("no slave node")
+		return
+	}
+
+	// wait for master gc task
+	time.Sleep(MASTER_GC_INTERVAL * 2)
+
+	_, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, slave_node_id)
+	if err == nil {
+		t.Errorf("slave node is not collected by master gc automatically")
+		return
+	}
+}
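
Note that createSimulationCluster expects a Redis instance reachable at 0.0.0.0:6379 with password difyai123456, so these tests only pass with that backing store running. The lifetime assertions also lean on fixed one-second sleeps around cache.GetMapField; a small polling helper in the same test package could make them less timing-sensitive (a sketch only — the helper name, timeout, and poll interval are assumptions):

// waitForNodePresence polls the cluster status hash until the node either
// appears (present == true) or disappears (present == false), or the timeout
// elapses; it reuses the cache.GetMapField call already used by the tests above.
func waitForNodePresence(nodeID string, present bool, timeout time.Duration) bool {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		_, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, nodeID)
		if (err == nil) == present {
			return true
		}
		time.Sleep(time.Millisecond * 100)
	}
	return false
}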

+ 18 - 7
internal/cluster/lifetime.go

@@ -4,6 +4,7 @@ import (
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
 const (
@@ -15,7 +16,7 @@ const (
 	// once a node is selected as the master, it is locked in that role.
 	// If the master node becomes inactive, the master slot is released, allowing other nodes to attempt to take over the role.
 	MASTER_LOCKING_INTERVAL  = time.Millisecond * 500 // interval to try to lock the slot to be the master
-	MASTER_LOCK_EXPIRED_TIME = time.Second * 5        // expired time of master key
+	MASTER_LOCK_EXPIRED_TIME = time.Second * 2        // expired time of master key
 	MASTER_GC_INTERVAL       = time.Second * 10       // interval to garbage-collect nodes that have already been deactivated
 
 	// node
@@ -39,6 +40,12 @@ const (
 
 // lifetime of the cluster
 func (c *Cluster) clusterLifetime() {
+	defer func() {
+		if err := c.removeSelfNode(); err != nil {
+			log.Error("failed to remove the self node from the cluster: %s", err.Error())
+		}
+	}()
+
 	ticker_lock_master := time.NewTicker(MASTER_LOCKING_INTERVAL)
 	defer ticker_lock_master.Stop()
 
@@ -55,14 +62,18 @@ func (c *Cluster) clusterLifetime() {
 	defer plugin_scheduler_ticker.Stop()
 
 	// vote for all ips and find the best one, prepare for later traffic scheduling
-	if err := c.voteIps(); err != nil {
-		log.Error("failed to vote the ips of the nodes: %s", err.Error())
-	}
+	routine.Submit(func() {
+		if err := c.voteIps(); err != nil {
+			log.Error("failed to vote the ips of the nodes: %s", err.Error())
+		}
+	})
 
 	// fetch all possible nodes
-	if err := c.updateNodeStatus(); err != nil {
-		log.Error("failed to update the status of the node: %s", err.Error())
-	}
+	routine.Submit(func() {
+		if err := c.updateNodeStatus(); err != nil {
+			log.Error("failed to update the status of the node: %s", err.Error())
+		}
+	})
 
 	for {
 		select {
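
The lifetime.go changes shorten the master lease from 5s to 2s (which is what MASTER_LOCK_EXPIRED_TIME + time.Second in TestClusterSubstituteMaster waits out), move the initial voteIps and updateNodeStatus calls into the routine pool so clusterLifetime reaches its ticker loop immediately, and register a deferred removeSelfNode so a closed node disappears from the status hash. The election itself is not shown in this diff; a lease-style pattern consistent with these constants might look like the following go-redis sketch (key name, refresh policy, and the direct go-redis client are assumptions, not the daemon's actual implementation):

// tryAcquireMaster attempts to take (or keep) a short-lived master lease.
// Intended to run every MASTER_LOCKING_INTERVAL; the lease lapses after
// MASTER_LOCK_EXPIRED_TIME if its holder stops renewing it.
// Assumes "github.com/redis/go-redis/v9" plus the constants from this file.
func tryAcquireMaster(ctx context.Context, rdb *redis.Client, nodeID string) (bool, error) {
	// SET key value NX PX <ttl>: only one node can hold the lease at a time
	ok, err := rdb.SetNX(ctx, "cluster-master-lock", nodeID, MASTER_LOCK_EXPIRED_TIME).Result()
	if err != nil {
		return false, err
	}
	if ok {
		return true, nil
	}
	holder, err := rdb.Get(ctx, "cluster-master-lock").Result()
	if err == redis.Nil {
		return false, nil // lease expired between the two calls; retry next tick
	}
	if err != nil {
		return false, err
	}
	if holder == nodeID {
		// still the master: renew the lease so it does not lapse mid-term
		return true, rdb.PExpire(ctx, "cluster-master-lock", MASTER_LOCK_EXPIRED_TIME).Err()
	}
	return false, nil
}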

+ 5 - 0
internal/cluster/node.go

@@ -207,6 +207,11 @@ func (c *Cluster) gcNode(node_id string) error {
 	return nil
 }
 
+// remove self node from the cluster
+func (c *Cluster) removeSelfNode() error {
+	return c.gcNode(c.id)
+}
+
 const (
 	CLUSTER_UPDATE_NODE_STATUS_LOCK_PREFIX = "cluster-update-node-status-lock"
 )
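
removeSelfNode simply routes the node's own id through the existing gcNode path, which is what lets the deferred cleanup in clusterLifetime drop the entry the tests above poll for. In raw Redis terms the observable effect is roughly one field removed from the status hash (an illustration only — the real gcNode goes through the internal cache package and may clean up more state, such as voted IPs):

// illustration of the visible effect of gc-ing one node: its field vanishes
// from the cluster status hash (key name is a placeholder, not the real one)
func dropNodeEntry(ctx context.Context, rdb *redis.Client, nodeID string) error {
	return rdb.HDel(ctx, "cluster-status", nodeID).Err()
}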

+ 1 - 1
internal/server/server.go

@@ -27,7 +27,7 @@ func (a *App) Run(config *app.Config) {
 	a.plugin_manager.Init(config)
 
 	// launch cluster
-	a.cluster.Launch(config)
+	a.cluster.Launch()
 
 	// start http server
 	server(config)