123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219 |
- package cluster
- import (
- "testing"
- "time"
- "github.com/langgenius/dify-plugin-daemon/internal/types/app"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
- )
- func createSimulationCluster(nums int) ([]*Cluster, error) {
- err := cache.InitRedisClient("0.0.0.0:6379", "difyai123456")
- if err != nil {
- return nil, err
- }
- result := make([]*Cluster, 0)
- for i := 0; i < nums; i++ {
- result = append(result, NewCluster(&app.Config{
- ServerPort: 12121,
- }))
- }
- log.SetShowLog(false)
- routine.InitPool(1024)
- // delete master key
- if err := cache.Del(PREEMPTION_LOCK_KEY); err != nil {
- return nil, err
- }
- return result, nil
- }
- func launchSimulationCluster(clusters []*Cluster) {
- for _, cluster := range clusters {
- cluster.Launch()
- }
- }
- func closeSimulationCluster(clusters []*Cluster, t *testing.T) {
- for _, cluster := range clusters {
- cluster.Close()
- // wait for the cluster to close
- <-cluster.NotifyClusterStopped()
- // check if the cluster is closed
- _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
- if err == nil {
- t.Errorf("cluster is not closed")
- return
- }
- }
- }
- func TestSingleClusterLifetime(t *testing.T) {
- clusters, err := createSimulationCluster(1)
- if err != nil {
- t.Errorf("create simulation cluster failed: %v", err)
- return
- }
- launchSimulationCluster(clusters)
- defer closeSimulationCluster(clusters, t)
- <-clusters[0].NotifyBecomeMaster()
- _, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, clusters[0].id)
- if err != nil {
- t.Errorf("get cluster status failed: %v", err)
- return
- }
- }
- func TestMultipleClusterLifetime(t *testing.T) {
- clusters, err := createSimulationCluster(3)
- if err != nil {
- t.Errorf("create simulation cluster failed: %v", err)
- return
- }
- launchSimulationCluster(clusters)
- defer closeSimulationCluster(clusters, t)
- select {
- case <-clusters[0].NotifyBecomeMaster():
- case <-clusters[1].NotifyBecomeMaster():
- case <-clusters[2].NotifyBecomeMaster():
- }
- has_master := false
- for _, cluster := range clusters {
- _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
- if err != nil {
- t.Errorf("get cluster status failed: %v", err)
- return
- }
- if cluster.IsMaster() {
- if has_master {
- t.Errorf("multiple master")
- return
- } else {
- has_master = true
- }
- }
- }
- if !has_master {
- t.Errorf("no master")
- }
- }
- func TestClusterSubstituteMaster(t *testing.T) {
- clusters, err := createSimulationCluster(3)
- if err != nil {
- t.Errorf("create simulation cluster failed: %v", err)
- return
- }
- launchSimulationCluster(clusters)
- defer closeSimulationCluster(clusters, t)
- select {
- case <-clusters[0].NotifyBecomeMaster():
- case <-clusters[1].NotifyBecomeMaster():
- case <-clusters[2].NotifyBecomeMaster():
- }
- // close the master
- original_master_id := ""
- for _, cluster := range clusters {
- if cluster.IsMaster() {
- cluster.Close()
- original_master_id = cluster.id
- break
- }
- }
- if original_master_id == "" {
- t.Errorf("no master")
- return
- }
- time.Sleep(MASTER_LOCK_EXPIRED_TIME + time.Second)
- has_master := false
- for _, cluster := range clusters {
- if cluster.id == original_master_id {
- continue
- }
- _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
- if err != nil {
- t.Errorf("get cluster status failed: %v", err)
- return
- }
- if cluster.IsMaster() {
- if has_master {
- t.Errorf("multiple substitute master")
- return
- } else {
- has_master = true
- }
- }
- }
- if !has_master {
- t.Errorf("no substitute master")
- }
- }
- func TestClusterAutoGCNoLongerActiveNode(t *testing.T) {
- clusters, err := createSimulationCluster(3)
- if err != nil {
- t.Errorf("create simulation cluster failed: %v", err)
- return
- }
- launchSimulationCluster(clusters)
- defer closeSimulationCluster(clusters, t)
- select {
- case <-clusters[0].NotifyBecomeMaster():
- case <-clusters[1].NotifyBecomeMaster():
- case <-clusters[2].NotifyBecomeMaster():
- }
- // randomly close a slave node to close
- slave_node_id := ""
- for _, cluster := range clusters {
- if !cluster.IsMaster() {
- slave_node_id = cluster.id
- cluster.Close()
- // wait for the cluster to close
- <-cluster.NotifyClusterStopped()
- // recover the node status
- if err := cluster.updateNodeStatus(); err != nil {
- t.Errorf("failed to recover the node status: %v", err)
- return
- }
- break
- }
- }
- if slave_node_id == "" {
- t.Errorf("no slave node")
- return
- }
- // wait for master gc task
- time.Sleep(NODE_DISCONNECTED_TIMEOUT*2 + time.Second)
- _, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, slave_node_id)
- if err == nil {
- t.Errorf("slave node is not collected by master gc automatically")
- return
- }
- }
|