plugin_test.go 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. package cluster
  2. import (
  3. "testing"
  4. "time"
  5. "github.com/google/uuid"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
  8. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  9. "github.com/langgenius/dify-plugin-daemon/pkg/entities/manifest_entities"
  10. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  11. )
  12. type fakePlugin struct {
  13. plugin_entities.PluginRuntime
  14. basic_runtime.BasicChecksum
  15. }
  16. func (r *fakePlugin) InitEnvironment() error {
  17. return nil
  18. }
  19. func (r *fakePlugin) Checksum() (string, error) {
  20. return "", nil
  21. }
  22. func (r *fakePlugin) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
  23. return plugin_entities.PluginUniqueIdentifier(""), nil
  24. }
  25. func (r *fakePlugin) StartPlugin() error {
  26. return nil
  27. }
  28. func (r *fakePlugin) Type() plugin_entities.PluginRuntimeType {
  29. return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  30. }
  31. func (r *fakePlugin) Wait() (<-chan bool, error) {
  32. return nil, nil
  33. }
  34. func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionMessage] {
  35. return nil
  36. }
  37. func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) {
  38. }
  39. func getRandomPluginRuntime() fakePlugin {
  40. return fakePlugin{
  41. PluginRuntime: plugin_entities.PluginRuntime{
  42. Config: plugin_entities.PluginDeclaration{
  43. PluginDeclarationWithoutAdvancedFields: plugin_entities.PluginDeclarationWithoutAdvancedFields{
  44. Name: uuid.New().String(),
  45. Label: plugin_entities.I18nObject{
  46. EnUS: "label",
  47. },
  48. Version: "0.0.1",
  49. Type: manifest_entities.PluginType,
  50. Author: "Yeuoly",
  51. CreatedAt: time.Now(),
  52. Plugins: plugin_entities.PluginExtensions{
  53. Tools: []string{"test"},
  54. },
  55. },
  56. },
  57. },
  58. }
  59. }
  60. func TestPluginScheduleLifetime(t *testing.T) {
  61. plugin := getRandomPluginRuntime()
  62. cluster, err := createSimulationCluster(1)
  63. if err != nil {
  64. t.Errorf("create simulation cluster failed: %v", err)
  65. return
  66. }
  67. launchSimulationCluster(cluster)
  68. defer closeSimulationCluster(cluster, t)
  69. time.Sleep(time.Second * 1)
  70. // add plugin to the cluster
  71. err = cluster[0].RegisterPlugin(&plugin)
  72. if err != nil {
  73. t.Errorf("register plugin failed: %v", err)
  74. return
  75. }
  76. identity, err := plugin.Identity()
  77. if err != nil {
  78. t.Errorf("get plugin identity failed: %v", err)
  79. return
  80. }
  81. hashedIdentity := plugin_entities.HashedIdentity(identity.String())
  82. nodes, err := cluster[0].FetchPluginAvailableNodesByHashedId(hashedIdentity)
  83. if err != nil {
  84. t.Errorf("fetch plugin available nodes failed: %v", err)
  85. return
  86. }
  87. if len(nodes) != 1 {
  88. t.Errorf("plugin not scheduled")
  89. return
  90. }
  91. if nodes[0] != cluster[0].id {
  92. t.Errorf("plugin scheduled to wrong node")
  93. return
  94. }
  95. // trigger plugin stop
  96. plugin.TriggerStop()
  97. // wait for the plugin to stop
  98. time.Sleep(time.Second * 1)
  99. // check if the plugin is stopped
  100. nodes, err = cluster[0].FetchPluginAvailableNodesByHashedId(hashedIdentity)
  101. if err != nil {
  102. t.Errorf("fetch plugin available nodes failed: %v", err)
  103. return
  104. }
  105. if len(nodes) != 0 {
  106. t.Errorf("plugin not stopped")
  107. return
  108. }
  109. }
  110. // TODO: I need to implement this test, now it's randomly working
  111. // func TestPluginScheduleWhenMasterClusterShutdown(t *testing.T) {
  112. // plugins := []fakePlugin{
  113. // getRandomPluginRuntime(),
  114. // getRandomPluginRuntime(),
  115. // }
  116. // cluster, err := createSimulationCluster(2)
  117. // if err != nil {
  118. // t.Errorf("create simulation cluster failed: %v", err)
  119. // return
  120. // }
  121. // // set master gc interval to 1 second
  122. // for _, node := range cluster {
  123. // node.nodeDisconnectedTimeout = time.Second * 2
  124. // node.masterGcInterval = time.Second * 1
  125. // node.pluginSchedulerInterval = time.Second * 1
  126. // node.pluginSchedulerTickerInterval = time.Second * 1
  127. // node.updateNodeStatusInterval = time.Second * 1
  128. // node.pluginDeactivatedTimeout = time.Second * 2
  129. // node.showLog = true
  130. // }
  131. // launchSimulationCluster(cluster)
  132. // defer closeSimulationCluster(cluster, t)
  133. // // add plugin to the cluster
  134. // for i, plugin := range plugins {
  135. // err = cluster[i].RegisterPlugin(&plugin)
  136. // if err != nil {
  137. // t.Errorf("register plugin failed: %v", err)
  138. // return
  139. // }
  140. // }
  141. // // wait for the plugin to be scheduled
  142. // time.Sleep(time.Second * 1)
  143. // // close master node and wait for new master to be elected
  144. // masterIdx := -1
  145. // for i, node := range cluster {
  146. // if node.IsMaster() {
  147. // masterIdx = i
  148. // // close the master node
  149. // node.Close()
  150. // break
  151. // }
  152. // }
  153. // if masterIdx == -1 {
  154. // t.Errorf("master node not found")
  155. // return
  156. // }
  157. // // wait for the new master to be elected
  158. // i := 0
  159. // for ; i < 10; i++ {
  160. // time.Sleep(time.Second * 1)
  161. // found := false
  162. // for i, node := range cluster {
  163. // if node.IsMaster() && i != masterIdx {
  164. // found = true
  165. // break
  166. // }
  167. // }
  168. // if found {
  169. // break
  170. // }
  171. // }
  172. // if i == 10 {
  173. // t.Errorf("master node is not elected")
  174. // return
  175. // }
  176. // // check if plugins[master_idx] is removed
  177. // identity, err := plugins[masterIdx].Identity()
  178. // if err != nil {
  179. // t.Errorf("get plugin identity failed: %v", err)
  180. // return
  181. // }
  182. // hashedIdentity := plugin_entities.HashedIdentity(identity.String())
  183. // ticker := time.NewTicker(time.Second)
  184. // timeout := time.NewTimer(time.Second * 20)
  185. // done := false
  186. // for !done {
  187. // select {
  188. // case <-ticker.C:
  189. // nodes, err := cluster[masterIdx].FetchPluginAvailableNodesByHashedId(hashedIdentity)
  190. // if err != nil {
  191. // t.Errorf("fetch plugin available nodes failed: %v", err)
  192. // return
  193. // }
  194. // if len(nodes) == 0 {
  195. // done = true
  196. // }
  197. // case <-timeout.C:
  198. // t.Errorf("plugin not removed")
  199. // return
  200. // }
  201. // }
  202. // // check if plugins[1-master_idx] is still scheduled
  203. // identity, err = plugins[1-masterIdx].Identity()
  204. // if err != nil {
  205. // t.Errorf("get plugin identity failed: %v", err)
  206. // return
  207. // }
  208. // hashedIdentity = plugin_entities.HashedIdentity(identity.String())
  209. // nodes, err := cluster[1-masterIdx].FetchPluginAvailableNodesByHashedId(hashedIdentity)
  210. // if err != nil {
  211. // t.Errorf("fetch plugin available nodes failed: %v", err)
  212. // return
  213. // }
  214. // if len(nodes) != 1 {
  215. // t.Errorf("plugin not scheduled")
  216. // return
  217. // }
  218. // }