generic.go 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. package plugin_daemon
  2. import (
  3. "errors"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  11. )
  12. func genericInvokePlugin[Req any, Rsp any](
  13. session *session_manager.Session,
  14. request *Req,
  15. response_buffer_size int,
  16. typ backwards_invocation.PluginAccessType,
  17. action backwards_invocation.PluginAccessAction,
  18. ) (*stream.StreamResponse[Rsp], error) {
  19. runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
  20. if runtime == nil {
  21. return nil, errors.New("plugin not found")
  22. }
  23. response := stream.NewStreamResponse[Rsp](response_buffer_size)
  24. listener := runtime.Listen(session.ID())
  25. listener.AddListener(func(message []byte) {
  26. chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
  27. if err != nil {
  28. log.Error("unmarshal json failed: %s", err.Error())
  29. return
  30. }
  31. switch chunk.Type {
  32. case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
  33. chunk, err := parser.UnmarshalJsonBytes[Rsp](chunk.Data)
  34. if err != nil {
  35. log.Error("unmarshal json failed: %s", err.Error())
  36. return
  37. }
  38. response.Write(chunk)
  39. case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
  40. if err := backwards_invocation.InvokeDify(runtime, typ, session, chunk.Data); err != nil {
  41. log.Error("invoke dify failed: %s", err.Error())
  42. return
  43. }
  44. case plugin_entities.SESSION_MESSAGE_TYPE_END:
  45. response.Close()
  46. case plugin_entities.SESSION_MESSAGE_TYPE_ERROR:
  47. e, err := parser.UnmarshalJsonBytes[plugin_entities.ErrorResponse](chunk.Data)
  48. if err != nil {
  49. break
  50. }
  51. response.WriteError(errors.New(e.Error))
  52. response.Close()
  53. default:
  54. response.WriteError(errors.New("unknown stream message type: " + string(chunk.Type)))
  55. response.Close()
  56. }
  57. })
  58. response.OnClose(func() {
  59. listener.Close()
  60. })
  61. session.Write(
  62. session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
  63. getInvokePluginMap(
  64. session,
  65. typ,
  66. action,
  67. request,
  68. ),
  69. )
  70. return response, nil
  71. }
  72. func getInvokePluginMap(
  73. session *session_manager.Session,
  74. typ backwards_invocation.PluginAccessType,
  75. action backwards_invocation.PluginAccessAction,
  76. request any,
  77. ) map[string]any {
  78. req := getBasicPluginAccessMap(session.UserID(), typ, action)
  79. for k, v := range parser.StructToMap(request) {
  80. req[k] = v
  81. }
  82. return req
  83. }