plugin_test.go 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. package cluster
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. )
  10. type fakePlugin struct {
  11. plugin_entities.PluginRuntime
  12. positive_manager.PositivePluginRuntime
  13. }
  14. func (r *fakePlugin) InitEnvironment() error {
  15. return nil
  16. }
  17. func (r *fakePlugin) Checksum() (string, error) {
  18. return "", nil
  19. }
  20. func (r *fakePlugin) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
  21. return plugin_entities.PluginUniqueIdentifier(""), nil
  22. }
  23. func (r *fakePlugin) StartPlugin() error {
  24. return nil
  25. }
  26. func (r *fakePlugin) Type() plugin_entities.PluginRuntimeType {
  27. return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  28. }
  29. func (r *fakePlugin) Wait() (<-chan bool, error) {
  30. return nil, nil
  31. }
  32. func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionMessage] {
  33. return nil
  34. }
  35. func (r *fakePlugin) Write(string, []byte) {
  36. }
  37. func getRandomPluginRuntime() fakePlugin {
  38. return fakePlugin{
  39. PluginRuntime: plugin_entities.PluginRuntime{
  40. Config: plugin_entities.PluginDeclaration{
  41. PluginDeclarationWithoutAdvancedFields: plugin_entities.PluginDeclarationWithoutAdvancedFields{
  42. Name: uuid.New().String(),
  43. Label: plugin_entities.I18nObject{
  44. EnUS: "label",
  45. },
  46. Version: "0.0.1",
  47. Type: plugin_entities.PluginType,
  48. Author: "Yeuoly",
  49. CreatedAt: time.Now(),
  50. Plugins: []string{"test"},
  51. },
  52. },
  53. },
  54. }
  55. }
  56. func TestPluginScheduleLifetime(t *testing.T) {
  57. plugin := getRandomPluginRuntime()
  58. cluster, err := createSimulationCluster(1)
  59. if err != nil {
  60. t.Errorf("create simulation cluster failed: %v", err)
  61. return
  62. }
  63. launchSimulationCluster(cluster)
  64. defer closeSimulationCluster(cluster, t)
  65. time.Sleep(time.Second * 1)
  66. // add plugin to the cluster
  67. err = cluster[0].RegisterPlugin(&plugin)
  68. if err != nil {
  69. t.Errorf("register plugin failed: %v", err)
  70. return
  71. }
  72. hashed_identity, err := plugin.HashedIdentity()
  73. if err != nil {
  74. t.Errorf("get plugin hashed identity failed: %v", err)
  75. return
  76. }
  77. nodes, err := cluster[0].FetchPluginAvailableNodesByHashedId(hashed_identity)
  78. if err != nil {
  79. t.Errorf("fetch plugin available nodes failed: %v", err)
  80. return
  81. }
  82. if len(nodes) != 1 {
  83. t.Errorf("plugin not scheduled")
  84. return
  85. }
  86. if nodes[0] != cluster[0].id {
  87. t.Errorf("plugin scheduled to wrong node")
  88. return
  89. }
  90. // trigger plugin stop
  91. plugin.TriggerStop()
  92. // wait for the plugin to stop
  93. time.Sleep(time.Second * 1)
  94. // check if the plugin is stopped
  95. nodes, err = cluster[0].FetchPluginAvailableNodesByHashedId(hashed_identity)
  96. if err != nil {
  97. t.Errorf("fetch plugin available nodes failed: %v", err)
  98. return
  99. }
  100. if len(nodes) != 0 {
  101. t.Errorf("plugin not stopped")
  102. return
  103. }
  104. }
  105. func TestPluginScheduleWhenMasterClusterShutdown(t *testing.T) {
  106. plugins := []fakePlugin{
  107. getRandomPluginRuntime(),
  108. getRandomPluginRuntime(),
  109. }
  110. cluster, err := createSimulationCluster(2)
  111. if err != nil {
  112. t.Errorf("create simulation cluster failed: %v", err)
  113. return
  114. }
  115. launchSimulationCluster(cluster)
  116. defer closeSimulationCluster(cluster, t)
  117. // add plugin to the cluster
  118. for i, plugin := range plugins {
  119. err = cluster[i].RegisterPlugin(&plugin)
  120. if err != nil {
  121. t.Errorf("register plugin failed: %v", err)
  122. return
  123. }
  124. }
  125. // wait for the plugin to be scheduled
  126. time.Sleep(time.Second * 1)
  127. // close master node and wait for new master to be elected
  128. master_idx := -1
  129. for i, node := range cluster {
  130. if node.IsMaster() {
  131. master_idx = i
  132. // close the master node
  133. node.Close()
  134. break
  135. }
  136. }
  137. if master_idx == -1 {
  138. t.Errorf("master node not found")
  139. return
  140. }
  141. // wait for the new master to be elected
  142. i := 0
  143. for ; i < 10; i++ {
  144. time.Sleep(time.Second * 1)
  145. found := false
  146. for i, node := range cluster {
  147. if node.IsMaster() && i != master_idx {
  148. found = true
  149. break
  150. }
  151. }
  152. if found {
  153. break
  154. }
  155. }
  156. if i == 10 {
  157. t.Errorf("master node is not elected")
  158. return
  159. }
  160. // check if plugins[master_idx] is removed
  161. hashed_identity, err := plugins[master_idx].HashedIdentity()
  162. if err != nil {
  163. t.Errorf("get plugin hashed identity failed: %v", err)
  164. return
  165. }
  166. ticker := time.NewTicker(time.Second)
  167. timeout := time.NewTimer(MASTER_GC_INTERVAL * 2)
  168. done := false
  169. for !done {
  170. select {
  171. case <-ticker.C:
  172. nodes, err := cluster[master_idx].FetchPluginAvailableNodesByHashedId(hashed_identity)
  173. if err != nil {
  174. t.Errorf("fetch plugin available nodes failed: %v", err)
  175. return
  176. }
  177. if len(nodes) == 0 {
  178. done = true
  179. }
  180. case <-timeout.C:
  181. t.Errorf("plugin not removed")
  182. return
  183. }
  184. }
  185. // check if plugins[1-master_idx] is still scheduled
  186. hashed_identity, err = plugins[1-master_idx].HashedIdentity()
  187. if err != nil {
  188. t.Errorf("get plugin hashed identity failed: %v", err)
  189. return
  190. }
  191. nodes, err := cluster[1-master_idx].FetchPluginAvailableNodesByHashedId(hashed_identity)
  192. if err != nil {
  193. t.Errorf("fetch plugin available nodes failed: %v", err)
  194. return
  195. }
  196. if len(nodes) != 1 {
  197. t.Errorf("plugin not scheduled")
  198. return
  199. }
  200. }