tester.go 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "runtime"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  15. )
  16. func (p *PluginManager) TestPlugin(
  17. path string,
  18. request map[string]any,
  19. access_type access_types.PluginAccessType,
  20. access_action access_types.PluginAccessAction,
  21. timeout string,
  22. ) (*stream.Stream[any], error) {
  23. // launch plugin runtime
  24. plugin, err := p.loadPlugin(path)
  25. if err != nil {
  26. return nil, errors.Join(err, errors.New("failed to load plugin"))
  27. }
  28. // get assets
  29. assets, err := plugin.Decoder.Assets()
  30. if err != nil {
  31. return nil, errors.Join(err, errors.New("failed to get assets"))
  32. }
  33. local_plugin_runtime := local_manager.NewLocalPluginRuntime()
  34. local_plugin_runtime.PluginRuntime = plugin.Runtime
  35. local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
  36. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  37. LocalPackagePath: plugin.Runtime.State.AbsolutePath,
  38. WorkingPath: plugin.Runtime.State.WorkingPath,
  39. Decoder: plugin.Decoder,
  40. }
  41. if err := local_plugin_runtime.RemapAssets(
  42. &local_plugin_runtime.Config,
  43. assets,
  44. ); err != nil {
  45. return nil, errors.Join(err, errors.New("failed to remap assets"))
  46. }
  47. identity, err := local_plugin_runtime.Identity()
  48. if err != nil {
  49. return nil, errors.Join(err, errors.New("failed to get identity"))
  50. }
  51. // local plugin
  52. routine.Submit(func() {
  53. defer func() {
  54. if r := recover(); r != nil {
  55. // print stack trace
  56. buf := make([]byte, 1<<16)
  57. runtime.Stack(buf, true)
  58. log.Error("plugin runtime error: %v, stack trace: %s", r, string(buf))
  59. }
  60. }()
  61. // delete the plugin from the storage when the plugin is stopped
  62. defer p.runningPluginInStorage.Delete(plugin.Runtime.State.AbsolutePath)
  63. p.fullDuplexLifetime(local_plugin_runtime)
  64. })
  65. // wait for the plugin to start
  66. var timeout_duration time.Duration
  67. if timeout == "" {
  68. timeout_duration = 120 * time.Second
  69. } else {
  70. timeout_duration, err = time.ParseDuration(timeout)
  71. if err != nil {
  72. return nil, errors.Join(err, errors.New("failed to parse timeout"))
  73. }
  74. }
  75. select {
  76. case <-local_plugin_runtime.WaitStarted():
  77. case <-time.After(timeout_duration):
  78. return nil, errors.New("failed to start plugin after " + timeout_duration.String())
  79. }
  80. session := session_manager.NewSession(
  81. session_manager.NewSessionPayload{
  82. TenantID: "test-tenant",
  83. UserID: "test-user",
  84. PluginUniqueIdentifier: identity,
  85. ClusterID: "test-cluster",
  86. InvokeFrom: access_type,
  87. Action: access_action,
  88. Declaration: plugin.Runtime.Configuration(),
  89. BackwardsInvocation: manager.BackwardsInvocation(),
  90. IgnoreCache: true,
  91. },
  92. )
  93. session.BindRuntime(local_plugin_runtime)
  94. defer session.Close(session_manager.CloseSessionPayload{
  95. IgnoreCache: true,
  96. })
  97. // try send request
  98. plugin_response, err := plugin_daemon.GenericInvokePlugin[map[string]any, any](session, &request, 1024)
  99. if err != nil {
  100. return nil, errors.Join(err, errors.New("failed to invoke plugin"))
  101. }
  102. plugin_response.OnClose(func() {
  103. local_plugin_runtime.Stop()
  104. })
  105. return plugin_response, nil
  106. }