io.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. package debugging_runtime
  2. import (
  3. "encoding/json"
  4. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  9. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  10. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  11. "github.com/panjf2000/gnet/v2"
  12. )
  13. func (r *RemotePluginRuntime) Listen(session_id string) *entities.Broadcast[plugin_entities.SessionMessage] {
  14. listener := entities.NewBroadcast[plugin_entities.SessionMessage]()
  15. listener.OnClose(func() {
  16. // execute in new goroutine to avoid deadlock
  17. routine.Submit(map[string]string{
  18. "module": "debugging_runtime",
  19. "method": "removeMessageCallbackHandler",
  20. }, func() {
  21. r.removeMessageCallbackHandler(session_id)
  22. r.removeSessionMessageCloser(session_id)
  23. })
  24. })
  25. // add session message closer to avoid unexpected connection closed
  26. r.addSessionMessageCloser(session_id, func() {
  27. listener.Send(plugin_entities.SessionMessage{
  28. Type: plugin_entities.SESSION_MESSAGE_TYPE_ERROR,
  29. Data: json.RawMessage(parser.MarshalJson(plugin_entities.ErrorResponse{
  30. ErrorType: exception.PluginConnectionClosedError,
  31. Message: "Connection closed unexpectedly",
  32. Args: map[string]any{},
  33. })),
  34. })
  35. })
  36. r.addMessageCallbackHandler(session_id, func(data []byte) {
  37. // unmarshal the session message
  38. chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](data)
  39. if err != nil {
  40. log.Error("unmarshal json failed: %s, failed to parse session message", err.Error())
  41. return
  42. }
  43. listener.Send(chunk)
  44. })
  45. return listener
  46. }
  47. func (r *RemotePluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) {
  48. r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error {
  49. return nil
  50. })
  51. }