hooks.go 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. package remote_manager
  2. import (
  3. "sync"
  4. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  5. "github.com/panjf2000/gnet/v2"
  6. )
  7. type DifyServer struct {
  8. gnet.BuiltinEventEngine
  9. engine gnet.Engine
  10. // listening address
  11. addr string
  12. // enabled multicore
  13. multicore bool
  14. // event loop count
  15. num_loops int
  16. // read new connections
  17. response *stream.StreamResponse[*RemotePluginRuntime]
  18. plugins map[int]*RemotePluginRuntime
  19. plugins_lock *sync.RWMutex
  20. shutdown_chan chan bool
  21. }
  22. func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
  23. s.engine = c
  24. return gnet.None
  25. }
  26. func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
  27. // new plugin connected
  28. c.SetContext(&codec{})
  29. runtime := &RemotePluginRuntime{
  30. conn: c,
  31. response: stream.NewStreamResponse[[]byte](512),
  32. callbacks: make(map[string][]func([]byte)),
  33. callbacks_lock: &sync.RWMutex{},
  34. shutdown_chan: make(chan bool),
  35. alive: true,
  36. }
  37. // store plugin runtime
  38. s.plugins_lock.Lock()
  39. s.plugins[c.Fd()] = runtime
  40. s.plugins_lock.Unlock()
  41. s.response.Write(runtime)
  42. // verified
  43. verified := true
  44. if verified {
  45. return nil, gnet.None
  46. }
  47. return nil, gnet.Close
  48. }
  49. func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
  50. // plugin disconnected
  51. s.plugins_lock.Lock()
  52. plugin := s.plugins[c.Fd()]
  53. delete(s.plugins, c.Fd())
  54. s.plugins_lock.Unlock()
  55. // close plugin
  56. plugin.close()
  57. return gnet.None
  58. }
  59. func (s *DifyServer) OnShutdown(c gnet.Engine) {
  60. close(s.shutdown_chan)
  61. }
  62. func (s *DifyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
  63. codec := c.Context().(*codec)
  64. messages, err := codec.Decode(c)
  65. if err != nil {
  66. return gnet.Close
  67. }
  68. // get plugin runtime
  69. s.plugins_lock.RLock()
  70. runtime, ok := s.plugins[c.Fd()]
  71. s.plugins_lock.RUnlock()
  72. if !ok {
  73. return gnet.Close
  74. }
  75. // handle messages
  76. for _, message := range messages {
  77. runtime.response.Write(message)
  78. }
  79. return gnet.None
  80. }