tester.go 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. package plugin_manager
  2. import (
  3. "errors"
  4. "runtime"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  8. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
  11. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  15. )
  16. func (p *PluginManager) TestPlugin(
  17. path string,
  18. request map[string]any,
  19. access_type access_types.PluginAccessType,
  20. access_action access_types.PluginAccessAction,
  21. timeout string,
  22. ) (*stream.Stream[any], error) {
  23. // launch plugin runtime
  24. plugin, err := p.getLocalPluginRuntime(path)
  25. if err != nil {
  26. return nil, errors.Join(err, errors.New("failed to load plugin"))
  27. }
  28. // get assets
  29. assets, err := plugin.decoder.Assets()
  30. if err != nil {
  31. return nil, errors.Join(err, errors.New("failed to get assets"))
  32. }
  33. local_plugin_runtime := local_manager.NewLocalPluginRuntime()
  34. local_plugin_runtime.PluginRuntime = plugin.runtime
  35. local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
  36. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
  37. LocalPackagePath: plugin.runtime.State.AbsolutePath,
  38. WorkingPath: plugin.runtime.State.WorkingPath,
  39. Decoder: plugin.decoder,
  40. }
  41. if err := local_plugin_runtime.RemapAssets(
  42. &local_plugin_runtime.Config,
  43. assets,
  44. ); err != nil {
  45. return nil, errors.Join(err, errors.New("failed to remap assets"))
  46. }
  47. identity, err := local_plugin_runtime.Identity()
  48. if err != nil {
  49. return nil, errors.Join(err, errors.New("failed to get identity"))
  50. }
  51. // local plugin
  52. routine.Submit(func() {
  53. defer func() {
  54. if r := recover(); r != nil {
  55. // print stack trace
  56. buf := make([]byte, 1<<16)
  57. runtime.Stack(buf, true)
  58. log.Error("plugin runtime error: %v, stack trace: %s", r, string(buf))
  59. }
  60. }()
  61. // delete the plugin from the storage when the plugin is stopped
  62. p.fullDuplexLifetime(local_plugin_runtime)
  63. })
  64. // wait for the plugin to start
  65. var timeout_duration time.Duration
  66. if timeout == "" {
  67. timeout_duration = 120 * time.Second
  68. } else {
  69. timeout_duration, err = time.ParseDuration(timeout)
  70. if err != nil {
  71. return nil, errors.Join(err, errors.New("failed to parse timeout"))
  72. }
  73. }
  74. select {
  75. case <-local_plugin_runtime.WaitStarted():
  76. case <-time.After(timeout_duration):
  77. return nil, errors.New("failed to start plugin after " + timeout_duration.String())
  78. }
  79. session := session_manager.NewSession(
  80. session_manager.NewSessionPayload{
  81. TenantID: "test-tenant",
  82. UserID: "test-user",
  83. PluginUniqueIdentifier: identity,
  84. ClusterID: "test-cluster",
  85. InvokeFrom: access_type,
  86. Action: access_action,
  87. Declaration: plugin.runtime.Configuration(),
  88. BackwardsInvocation: manager.BackwardsInvocation(),
  89. IgnoreCache: true,
  90. },
  91. )
  92. session.BindRuntime(local_plugin_runtime)
  93. defer session.Close(session_manager.CloseSessionPayload{
  94. IgnoreCache: true,
  95. })
  96. // try send request
  97. plugin_response, err := plugin_daemon.GenericInvokePlugin[map[string]any, any](session, &request, 1024)
  98. if err != nil {
  99. return nil, errors.Join(err, errors.New("failed to invoke plugin"))
  100. }
  101. plugin_response.OnClose(func() {
  102. local_plugin_runtime.Stop()
  103. })
  104. return plugin_response, nil
  105. }