generic.go 2.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. package plugin_daemon
  2. import (
  3. "errors"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation/transaction"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  11. )
  12. func genericInvokePlugin[Req any, Rsp any](
  13. session *session_manager.Session,
  14. request *Req,
  15. response_buffer_size int,
  16. ) (*stream.Stream[Rsp], error) {
  17. runtime := session.Runtime()
  18. if runtime == nil {
  19. return nil, errors.New("plugin not found")
  20. }
  21. response := stream.NewStream[Rsp](response_buffer_size)
  22. listener := runtime.Listen(session.ID)
  23. listener.Listen(func(chunk plugin_entities.SessionMessage) {
  24. switch chunk.Type {
  25. case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
  26. chunk, err := parser.UnmarshalJsonBytes[Rsp](chunk.Data)
  27. if err != nil {
  28. log.Error("unmarshal json failed: %s", err.Error())
  29. response.WriteError(err)
  30. } else {
  31. response.Write(chunk)
  32. }
  33. case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
  34. // check if the request contains a aws_event_id
  35. if runtime.Type() == plugin_entities.PLUGIN_RUNTIME_TYPE_AWS {
  36. response.WriteError(errors.New("aws event is not supported by full duplex"))
  37. response.Close()
  38. return
  39. }
  40. if err := backwards_invocation.InvokeDify(
  41. runtime.Configuration(),
  42. session.InvokeFrom,
  43. session,
  44. transaction.NewFullDuplexEventWriter(session),
  45. chunk.Data,
  46. ); err != nil {
  47. log.Error("invoke dify failed: %s", err.Error())
  48. return
  49. }
  50. case plugin_entities.SESSION_MESSAGE_TYPE_END:
  51. response.Close()
  52. case plugin_entities.SESSION_MESSAGE_TYPE_ERROR:
  53. e, err := parser.UnmarshalJsonBytes[plugin_entities.ErrorResponse](chunk.Data)
  54. if err != nil {
  55. break
  56. }
  57. response.WriteError(errors.New(e.Error))
  58. response.Close()
  59. default:
  60. response.WriteError(errors.New("unknown stream message type: " + string(chunk.Type)))
  61. response.Close()
  62. }
  63. })
  64. response.OnClose(func() {
  65. listener.Close()
  66. })
  67. session.Write(
  68. session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
  69. getInvokePluginMap(
  70. session,
  71. request,
  72. ),
  73. )
  74. return response, nil
  75. }
  76. func getInvokePluginMap(
  77. session *session_manager.Session,
  78. request any,
  79. ) map[string]any {
  80. req := getBasicPluginAccessMap(session.UserID, session.InvokeFrom, session.Action)
  81. for k, v := range parser.StructToMap(request) {
  82. req[k] = v
  83. }
  84. return req
  85. }