clutser_test.go 4.4 KB


  1. package cluster
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. )
  10. func createSimulationCluster(nums int) ([]*Cluster, error) {
  11. err := cache.InitRedisClient("0.0.0.0:6379", "difyai123456")
  12. if err != nil {
  13. return nil, err
  14. }
  15. result := make([]*Cluster, 0)
  16. for i := 0; i < nums; i++ {
  17. result = append(result, NewCluster(&app.Config{
  18. ServerPort: 12121,
  19. }))
  20. }
  21. log.SetShowLog(false)
  22. routine.InitPool(1024)
  23. // delete master key
  24. if err := cache.Del(PREEMPTION_LOCK_KEY); err != nil {
  25. return nil, err
  26. }
  27. return result, nil
  28. }
  29. func launchSimulationCluster(clusters []*Cluster, t *testing.T) {
  30. for _, cluster := range clusters {
  31. cluster.Launch()
  32. }
  33. }
  34. func closeSimulationCluster(clusters []*Cluster, t *testing.T) {
  35. for _, cluster := range clusters {
  36. cluster.Close()
  37. // wait for the cluster to close
  38. time.Sleep(time.Second * 1)
  39. // check if the cluster is closed
  40. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  41. if err == nil {
  42. t.Errorf("cluster is not closed")
  43. return
  44. }
  45. }
  46. }
  47. func TestSingleClusterLifetime(t *testing.T) {
  48. clusters, err := createSimulationCluster(1)
  49. if err != nil {
  50. t.Errorf("create simulation cluster failed: %v", err)
  51. return
  52. }
  53. launchSimulationCluster(clusters, t)
  54. defer closeSimulationCluster(clusters, t)
  55. time.Sleep(time.Second * 1)
  56. _, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, clusters[0].id)
  57. if err != nil {
  58. t.Errorf("get cluster status failed: %v", err)
  59. return
  60. }
  61. }
  62. func TestMultipleClusterLifetime(t *testing.T) {
  63. clusters, err := createSimulationCluster(3)
  64. if err != nil {
  65. t.Errorf("create simulation cluster failed: %v", err)
  66. return
  67. }
  68. launchSimulationCluster(clusters, t)
  69. defer closeSimulationCluster(clusters, t)
  70. time.Sleep(time.Second * 1)
  71. has_master := false
  72. for _, cluster := range clusters {
  73. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  74. if err != nil {
  75. t.Errorf("get cluster status failed: %v", err)
  76. return
  77. }
  78. if cluster.IsMaster() {
  79. if has_master {
  80. t.Errorf("multiple master")
  81. return
  82. } else {
  83. has_master = true
  84. }
  85. }
  86. }
  87. if !has_master {
  88. t.Errorf("no master")
  89. }
  90. }
  91. func TestClusterSubstituteMaster(t *testing.T) {
  92. clusters, err := createSimulationCluster(3)
  93. if err != nil {
  94. t.Errorf("create simulation cluster failed: %v", err)
  95. return
  96. }
  97. launchSimulationCluster(clusters, t)
  98. defer closeSimulationCluster(clusters, t)
  99. time.Sleep(time.Second * 1)
  100. // close the master
  101. original_master_id := ""
  102. for _, cluster := range clusters {
  103. if cluster.IsMaster() {
  104. cluster.Close()
  105. original_master_id = cluster.id
  106. break
  107. }
  108. }
  109. if original_master_id == "" {
  110. t.Errorf("no master")
  111. return
  112. }
  113. time.Sleep(MASTER_LOCK_EXPIRED_TIME + time.Second)
  114. has_master := false
  115. for _, cluster := range clusters {
  116. if cluster.id == original_master_id {
  117. continue
  118. }
  119. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  120. if err != nil {
  121. t.Errorf("get cluster status failed: %v", err)
  122. return
  123. }
  124. if cluster.IsMaster() {
  125. if has_master {
  126. t.Errorf("multiple substitute master")
  127. return
  128. } else {
  129. has_master = true
  130. }
  131. }
  132. }
  133. if !has_master {
  134. t.Errorf("no substitute master")
  135. }
  136. }
  137. func TestClusterAutoGCNoLongerActiveNode(t *testing.T) {
  138. clusters, err := createSimulationCluster(3)
  139. if err != nil {
  140. t.Errorf("create simulation cluster failed: %v", err)
  141. return
  142. }
  143. launchSimulationCluster(clusters, t)
  144. defer closeSimulationCluster(clusters, t)
  145. time.Sleep(time.Second * 1)
  146. // randomly close a slave node to close
  147. slave_node_id := ""
  148. for _, cluster := range clusters {
  149. if !cluster.IsMaster() {
  150. slave_node_id = cluster.id
  151. cluster.Close()
  152. // wait for normal gc
  153. time.Sleep(time.Second * 1)
  154. // recover the node status
  155. if err := cluster.updateNodeStatus(); err != nil {
  156. t.Errorf("failed to recover the node status: %v", err)
  157. return
  158. }
  159. break
  160. }
  161. }
  162. if slave_node_id == "" {
  163. t.Errorf("no slave node")
  164. return
  165. }
  166. // wait for master gc task
  167. time.Sleep(MASTER_GC_INTERVAL * 2)
  168. _, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, slave_node_id)
  169. if err == nil {
  170. t.Errorf("slave node is not collected by master gc automatically")
  171. return
  172. }
  173. }