daemon.go 1.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465
  1. package plugin_daemon
  2. import (
  3. "errors"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  10. )
  11. type ToolResponseChunk = plugin_entities.InvokeToolResponseChunk
  12. func InvokeTool(session *session_manager.Session, provider_name string, tool_name string, tool_parameters map[string]any) (
  13. *entities.InvocationResponse[ToolResponseChunk], error,
  14. ) {
  15. runtime := plugin_manager.Get(session.PluginIdentity())
  16. if runtime == nil {
  17. return nil, errors.New("plugin not found")
  18. }
  19. response := entities.NewInvocationResponse[ToolResponseChunk](512)
  20. listener := runtime.Listen(session.ID())
  21. listener.AddListener(func(message []byte) {
  22. chunk, err := parser.UnmarshalJsonBytes[plugin_entities.StreamMessage](message)
  23. if err != nil {
  24. log.Error("unmarshal json failed: %s", err.Error())
  25. return
  26. }
  27. switch chunk.Type {
  28. case plugin_entities.STREAM_MESSAGE_TYPE_STREAM:
  29. chunk, err := parser.UnmarshalJsonBytes[ToolResponseChunk](chunk.Data)
  30. if err != nil {
  31. log.Error("unmarshal json failed: %s", err.Error())
  32. return
  33. }
  34. response.Write(chunk)
  35. case plugin_entities.STREAM_MESSAGE_TYPE_END:
  36. response.Close()
  37. default:
  38. log.Error("unknown stream message type: %s", chunk.Type)
  39. response.Close()
  40. }
  41. })
  42. response.OnClose(func() {
  43. listener.Close()
  44. })
  45. runtime.Write(session.ID(), []byte(parser.MarshalJson(
  46. map[string]any{
  47. "provider": provider_name,
  48. "tool": tool_name,
  49. "parameters": tool_parameters,
  50. "session_id": session.ID,
  51. },
  52. )))
  53. return response, nil
  54. }