clutser_test.go 5.0 KB


  1. package cluster
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. )
  10. func createSimulationCluster(nums int) ([]*Cluster, error) {
  11. err := cache.InitRedisClient("0.0.0.0:6379", "difyai123456")
  12. if err != nil {
  13. return nil, err
  14. }
  15. result := make([]*Cluster, 0)
  16. for i := 0; i < nums; i++ {
  17. result = append(result, NewCluster(&app.Config{
  18. ServerPort: 12121,
  19. }))
  20. }
  21. log.SetShowLog(false)
  22. routine.InitPool(1024)
  23. // delete master key
  24. if err := cache.Del(PREEMPTION_LOCK_KEY); err != nil {
  25. return nil, err
  26. }
  27. return result, nil
  28. }
  29. func TestSingleClusterLifetime(t *testing.T) {
  30. clusters, err := createSimulationCluster(1)
  31. if err != nil {
  32. t.Errorf("create simulation cluster failed: %v", err)
  33. return
  34. }
  35. clusters[0].Launch()
  36. defer func() {
  37. // check if the cluster is closed
  38. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, clusters[0].id)
  39. if err == nil {
  40. t.Errorf("cluster is not closed")
  41. return
  42. }
  43. }()
  44. defer clusters[0].Close()
  45. time.Sleep(time.Second * 1)
  46. _, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, clusters[0].id)
  47. if err != nil {
  48. t.Errorf("get cluster status failed: %v", err)
  49. return
  50. }
  51. }
  52. func TestMultipleClusterLifetime(t *testing.T) {
  53. clusters, err := createSimulationCluster(3)
  54. if err != nil {
  55. t.Errorf("create simulation cluster failed: %v", err)
  56. return
  57. }
  58. for _, cluster := range clusters {
  59. cluster.Launch()
  60. defer func(cluster *Cluster) {
  61. cluster.Close()
  62. // wait for the cluster to close
  63. time.Sleep(time.Second * 1)
  64. // check if the cluster is closed
  65. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  66. if err == nil {
  67. t.Errorf("cluster is not closed")
  68. return
  69. }
  70. }(cluster)
  71. }
  72. time.Sleep(time.Second * 1)
  73. has_master := false
  74. for _, cluster := range clusters {
  75. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  76. if err != nil {
  77. t.Errorf("get cluster status failed: %v", err)
  78. return
  79. }
  80. if cluster.IsMaster() {
  81. if has_master {
  82. t.Errorf("multiple master")
  83. return
  84. } else {
  85. has_master = true
  86. }
  87. }
  88. }
  89. if !has_master {
  90. t.Errorf("no master")
  91. }
  92. }
  93. func TestClusterSubstituteMaster(t *testing.T) {
  94. clusters, err := createSimulationCluster(3)
  95. if err != nil {
  96. t.Errorf("create simulation cluster failed: %v", err)
  97. return
  98. }
  99. for _, cluster := range clusters {
  100. cluster.Launch()
  101. defer func(cluster *Cluster) {
  102. cluster.Close()
  103. // wait for the cluster to close
  104. time.Sleep(time.Second * 1)
  105. // check if the cluster is closed
  106. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  107. if err == nil {
  108. t.Errorf("cluster is not closed")
  109. return
  110. }
  111. }(cluster)
  112. }
  113. time.Sleep(time.Second * 1)
  114. // close the master
  115. original_master_id := ""
  116. for _, cluster := range clusters {
  117. if cluster.IsMaster() {
  118. cluster.Close()
  119. original_master_id = cluster.id
  120. break
  121. }
  122. }
  123. if original_master_id == "" {
  124. t.Errorf("no master")
  125. return
  126. }
  127. time.Sleep(MASTER_LOCK_EXPIRED_TIME + time.Second)
  128. has_master := false
  129. for _, cluster := range clusters {
  130. if cluster.id == original_master_id {
  131. continue
  132. }
  133. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  134. if err != nil {
  135. t.Errorf("get cluster status failed: %v", err)
  136. return
  137. }
  138. if cluster.IsMaster() {
  139. if has_master {
  140. t.Errorf("multiple substitute master")
  141. return
  142. } else {
  143. has_master = true
  144. }
  145. }
  146. }
  147. if !has_master {
  148. t.Errorf("no substitute master")
  149. }
  150. }
  151. func TestClusterAutoGCNoLongerActiveNode(t *testing.T) {
  152. clusters, err := createSimulationCluster(3)
  153. if err != nil {
  154. t.Errorf("create simulation cluster failed: %v", err)
  155. return
  156. }
  157. for _, cluster := range clusters {
  158. cluster.Launch()
  159. defer func(cluster *Cluster) {
  160. cluster.Close()
  161. // wait for the cluster to close
  162. time.Sleep(time.Second * 1)
  163. // check if the cluster is closed
  164. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  165. if err == nil {
  166. t.Errorf("cluster is not closed")
  167. return
  168. }
  169. }(cluster)
  170. }
  171. time.Sleep(time.Second * 1)
  172. // randomly close a slave node to close
  173. slave_node_id := ""
  174. for _, cluster := range clusters {
  175. if !cluster.IsMaster() {
  176. slave_node_id = cluster.id
  177. cluster.Close()
  178. // wait for normal gc
  179. time.Sleep(time.Second * 1)
  180. // recover the node status
  181. if err := cluster.updateNodeStatus(); err != nil {
  182. t.Errorf("failed to recover the node status: %v", err)
  183. return
  184. }
  185. break
  186. }
  187. }
  188. if slave_node_id == "" {
  189. t.Errorf("no slave node")
  190. return
  191. }
  192. // wait for master gc task
  193. time.Sleep(MASTER_GC_INTERVAL * 2)
  194. _, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, slave_node_id)
  195. if err == nil {
  196. t.Errorf("slave node is not collected by master gc automatically")
  197. return
  198. }
  199. }