clutser_test.go 5.0 KB


  1. package cluster
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. )
  10. func createSimulationCluster(nums int) ([]*Cluster, error) {
  11. err := cache.InitRedisClient("0.0.0.0:6379", "difyai123456")
  12. if err != nil {
  13. return nil, err
  14. }
  15. result := make([]*Cluster, 0)
  16. for i := 0; i < nums; i++ {
  17. result = append(result, NewCluster(&app.Config{
  18. ServerPort: 12121,
  19. }))
  20. }
  21. log.SetShowLog(false)
  22. routine.InitPool(1024)
  23. // delete master key
  24. if err := cache.Del(PREEMPTION_LOCK_KEY); err != nil {
  25. return nil, err
  26. }
  27. return result, nil
  28. }
  29. func TestSingleClusterLifetime(t *testing.T) {
  30. clusters, err := createSimulationCluster(1)
  31. if err != nil {
  32. t.Errorf("create simulation cluster failed: %v", err)
  33. return
  34. }
  35. clusters[0].Launch()
  36. defer func() {
  37. clusters[0].Close()
  38. // wait for the cluster to close
  39. time.Sleep(time.Second * 1)
  40. // check if the cluster is closed
  41. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, clusters[0].id)
  42. if err == nil {
  43. t.Errorf("cluster is not closed")
  44. return
  45. }
  46. }()
  47. time.Sleep(time.Second * 1)
  48. _, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, clusters[0].id)
  49. if err != nil {
  50. t.Errorf("get cluster status failed: %v", err)
  51. return
  52. }
  53. }
  54. func TestMultipleClusterLifetime(t *testing.T) {
  55. clusters, err := createSimulationCluster(3)
  56. if err != nil {
  57. t.Errorf("create simulation cluster failed: %v", err)
  58. return
  59. }
  60. for _, cluster := range clusters {
  61. cluster.Launch()
  62. defer func(cluster *Cluster) {
  63. cluster.Close()
  64. // wait for the cluster to close
  65. time.Sleep(time.Second * 1)
  66. // check if the cluster is closed
  67. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  68. if err == nil {
  69. t.Errorf("cluster is not closed")
  70. return
  71. }
  72. }(cluster)
  73. }
  74. time.Sleep(time.Second * 1)
  75. has_master := false
  76. for _, cluster := range clusters {
  77. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  78. if err != nil {
  79. t.Errorf("get cluster status failed: %v", err)
  80. return
  81. }
  82. if cluster.IsMaster() {
  83. if has_master {
  84. t.Errorf("multiple master")
  85. return
  86. } else {
  87. has_master = true
  88. }
  89. }
  90. }
  91. if !has_master {
  92. t.Errorf("no master")
  93. }
  94. }
  95. func TestClusterSubstituteMaster(t *testing.T) {
  96. clusters, err := createSimulationCluster(3)
  97. if err != nil {
  98. t.Errorf("create simulation cluster failed: %v", err)
  99. return
  100. }
  101. for _, cluster := range clusters {
  102. cluster.Launch()
  103. defer func(cluster *Cluster) {
  104. cluster.Close()
  105. // wait for the cluster to close
  106. time.Sleep(time.Second * 1)
  107. // check if the cluster is closed
  108. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  109. if err == nil {
  110. t.Errorf("cluster is not closed")
  111. return
  112. }
  113. }(cluster)
  114. }
  115. time.Sleep(time.Second * 1)
  116. // close the master
  117. original_master_id := ""
  118. for _, cluster := range clusters {
  119. if cluster.IsMaster() {
  120. cluster.Close()
  121. original_master_id = cluster.id
  122. break
  123. }
  124. }
  125. if original_master_id == "" {
  126. t.Errorf("no master")
  127. return
  128. }
  129. time.Sleep(MASTER_LOCK_EXPIRED_TIME + time.Second)
  130. has_master := false
  131. for _, cluster := range clusters {
  132. if cluster.id == original_master_id {
  133. continue
  134. }
  135. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  136. if err != nil {
  137. t.Errorf("get cluster status failed: %v", err)
  138. return
  139. }
  140. if cluster.IsMaster() {
  141. if has_master {
  142. t.Errorf("multiple substitute master")
  143. return
  144. } else {
  145. has_master = true
  146. }
  147. }
  148. }
  149. if !has_master {
  150. t.Errorf("no substitute master")
  151. }
  152. }
  153. func TestClusterAutoGCNoLongerActiveNode(t *testing.T) {
  154. clusters, err := createSimulationCluster(3)
  155. if err != nil {
  156. t.Errorf("create simulation cluster failed: %v", err)
  157. return
  158. }
  159. for _, cluster := range clusters {
  160. cluster.Launch()
  161. defer func(cluster *Cluster) {
  162. cluster.Close()
  163. // wait for the cluster to close
  164. time.Sleep(time.Second * 1)
  165. // check if the cluster is closed
  166. _, err := cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, cluster.id)
  167. if err == nil {
  168. t.Errorf("cluster is not closed")
  169. return
  170. }
  171. }(cluster)
  172. }
  173. time.Sleep(time.Second * 1)
  174. // randomly close a slave node to close
  175. slave_node_id := ""
  176. for _, cluster := range clusters {
  177. if !cluster.IsMaster() {
  178. slave_node_id = cluster.id
  179. cluster.Close()
  180. // wait for normal gc
  181. time.Sleep(time.Second * 1)
  182. // recover the node status
  183. if err := cluster.updateNodeStatus(); err != nil {
  184. t.Errorf("failed to recover the node status: %v", err)
  185. return
  186. }
  187. break
  188. }
  189. }
  190. if slave_node_id == "" {
  191. t.Errorf("no slave node")
  192. return
  193. }
  194. // wait for master gc task
  195. time.Sleep(MASTER_GC_INTERVAL * 2)
  196. _, err = cache.GetMapField[node](CLUSTER_STATUS_HASH_MAP_KEY, slave_node_id)
  197. if err == nil {
  198. t.Errorf("slave node is not collected by master gc automatically")
  199. return
  200. }
  201. }