daemon.go 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. package plugin_daemon
  2. import (
  3. "errors"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
  5. "github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
  6. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  10. )
  // ToolResponseChunk is an alias for plugin_entities.InvokeToolResponseChunk;
  // it is the element type of the stream returned by InvokeTool.
  11. type ToolResponseChunk = plugin_entities.InvokeToolResponseChunk
  12. func InvokeTool(session *session_manager.Session, provider_name string, tool_name string, tool_parameters map[string]any) (
  13. *stream.StreamResponse[ToolResponseChunk], error,
  14. ) {
  15. runtime := plugin_manager.Get(session.PluginIdentity())
  16. if runtime == nil {
  17. return nil, errors.New("plugin not found")
  18. }
  19. response := stream.NewStreamResponse[ToolResponseChunk](512)
  20. listener := runtime.Listen(session.ID())
  21. listener.AddListener(func(message []byte) {
  22. chunk, err := parser.UnmarshalJsonBytes[plugin_entities.StreamMessage](message)
  23. if err != nil {
  24. log.Error("unmarshal json failed: %s", err.Error())
  25. return
  26. }
  27. switch chunk.Type {
  28. case plugin_entities.STREAM_MESSAGE_TYPE_STREAM:
  29. chunk, err := parser.UnmarshalJsonBytes[ToolResponseChunk](chunk.Data)
  30. if err != nil {
  31. log.Error("unmarshal json failed: %s", err.Error())
  32. return
  33. }
  34. response.Write(chunk)
  35. case plugin_entities.STREAM_MESSAGE_TYPE_INVOKE:
  36. // TODO: invoke dify
  37. case plugin_entities.STREAM_MESSAGE_TYPE_END:
  38. response.Close()
  39. default:
  40. log.Error("unknown stream message type: %s", chunk.Type)
  41. response.Close()
  42. }
  43. })
  44. response.OnClose(func() {
  45. listener.Close()
  46. })
  47. runtime.Write(session.ID(), []byte(parser.MarshalJson(
  48. map[string]any{
  49. "provider": provider_name,
  50. "tool": tool_name,
  51. "parameters": tool_parameters,
  52. "session_id": session.ID(),
  53. },
  54. )))
  55. return response, nil
  56. }