generic.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. package plugin_daemon
  2. import (
  3. "errors"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  12. )
  13. func genericInvokePlugin[Req any, Rsp any](
  14. session *session_manager.Session,
  15. request *Req,
  16. response_buffer_size int,
  17. typ access_types.PluginAccessType,
  18. action access_types.PluginAccessAction,
  19. ) (*stream.StreamResponse[Rsp], error) {
  20. runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
  21. if runtime == nil {
  22. return nil, errors.New("plugin not found")
  23. }
  24. response := stream.NewStreamResponse[Rsp](response_buffer_size)
  25. listener := runtime.Listen(session.ID())
  26. listener.AddListener(func(message []byte) {
  27. chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](message)
  28. if err != nil {
  29. log.Error("unmarshal json failed: %s", err.Error())
  30. return
  31. }
  32. switch chunk.Type {
  33. case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
  34. chunk, err := parser.UnmarshalJsonBytes[Rsp](chunk.Data)
  35. if err != nil {
  36. log.Error("unmarshal json failed: %s", err.Error())
  37. response.WriteError(err)
  38. } else {
  39. response.Write(chunk)
  40. }
  41. case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
  42. if err := backwards_invocation.InvokeDify(runtime, typ, session, chunk.Data); err != nil {
  43. log.Error("invoke dify failed: %s", err.Error())
  44. return
  45. }
  46. case plugin_entities.SESSION_MESSAGE_TYPE_END:
  47. response.Close()
  48. case plugin_entities.SESSION_MESSAGE_TYPE_ERROR:
  49. e, err := parser.UnmarshalJsonBytes[plugin_entities.ErrorResponse](chunk.Data)
  50. if err != nil {
  51. break
  52. }
  53. response.WriteError(errors.New(e.Error))
  54. response.Close()
  55. default:
  56. response.WriteError(errors.New("unknown stream message type: " + string(chunk.Type)))
  57. response.Close()
  58. }
  59. })
  60. response.OnClose(func() {
  61. listener.Close()
  62. })
  63. session.Write(
  64. session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
  65. getInvokePluginMap(
  66. session,
  67. typ,
  68. action,
  69. request,
  70. ),
  71. )
  72. return response, nil
  73. }
  74. func getInvokePluginMap(
  75. session *session_manager.Session,
  76. typ access_types.PluginAccessType,
  77. action access_types.PluginAccessAction,
  78. request any,
  79. ) map[string]any {
  80. req := getBasicPluginAccessMap(session.UserID(), typ, action)
  81. for k, v := range parser.StructToMap(request) {
  82. req[k] = v
  83. }
  84. return req
  85. }