io.go 1.9 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859
  1. package debugging_runtime
  2. import (
  3. "encoding/json"
  4. "github.com/langgenius/dify-plugin-daemon/internal/types/exception"
  5. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  8. "github.com/langgenius/dify-plugin-daemon/pkg/entities"
  9. "github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
  10. "github.com/panjf2000/gnet/v2"
  11. )
  12. func (r *RemotePluginRuntime) Listen(session_id string) *entities.Broadcast[plugin_entities.SessionMessage] {
  13. listener := entities.NewBroadcast[plugin_entities.SessionMessage]()
  14. listener.OnClose(func() {
  15. // execute in new goroutine to avoid deadlock
  16. routine.Submit(map[string]string{
  17. "module": "debugging_runtime",
  18. "method": "removeMessageCallbackHandler",
  19. }, func() {
  20. r.removeMessageCallbackHandler(session_id)
  21. r.removeSessionMessageCloser(session_id)
  22. })
  23. })
  24. // add session message closer to avoid unexpected connection closed
  25. r.addSessionMessageCloser(session_id, func() {
  26. listener.Send(plugin_entities.SessionMessage{
  27. Type: plugin_entities.SESSION_MESSAGE_TYPE_ERROR,
  28. Data: json.RawMessage(parser.MarshalJson(plugin_entities.ErrorResponse{
  29. ErrorType: exception.PluginConnectionClosedError,
  30. Message: "Connection closed unexpectedly",
  31. Args: map[string]any{},
  32. })),
  33. })
  34. })
  35. r.addMessageCallbackHandler(session_id, func(data []byte) {
  36. // unmarshal the session message
  37. chunk, err := parser.UnmarshalJsonBytes[plugin_entities.SessionMessage](data)
  38. if err != nil {
  39. log.Error("unmarshal json failed: %s, failed to parse session message", err.Error())
  40. return
  41. }
  42. listener.Send(chunk)
  43. })
  44. return listener
  45. }
  46. func (r *RemotePluginRuntime) Write(session_id string, data []byte) {
  47. r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error {
  48. return nil
  49. })
  50. }