hooks.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141
  1. package remote_manager
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  8. "github.com/panjf2000/gnet/v2"
  9. )
  10. type DifyServer struct {
  11. gnet.BuiltinEventEngine
  12. engine gnet.Engine
  13. // listening address
  14. addr string
  15. // enabled multicore
  16. multicore bool
  17. // event loop count
  18. num_loops int
  19. // read new connections
  20. response *stream.StreamResponse[*RemotePluginRuntime]
  21. plugins map[int]*RemotePluginRuntime
  22. plugins_lock *sync.RWMutex
  23. shutdown_chan chan bool
  24. }
  25. func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
  26. s.engine = c
  27. return gnet.None
  28. }
  29. func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
  30. // new plugin connected
  31. c.SetContext(&codec{})
  32. runtime := &RemotePluginRuntime{
  33. conn: c,
  34. response: stream.NewStreamResponse[[]byte](512),
  35. callbacks: make(map[string][]func([]byte)),
  36. callbacks_lock: &sync.RWMutex{},
  37. shutdown_chan: make(chan bool),
  38. alive: true,
  39. }
  40. // store plugin runtime
  41. s.plugins_lock.Lock()
  42. s.plugins[c.Fd()] = runtime
  43. s.plugins_lock.Unlock()
  44. // start a timer to check if handshake is completed in 10 seconds
  45. time.AfterFunc(time.Second*10, func() {
  46. if !runtime.handshake {
  47. // close connection
  48. c.Close()
  49. }
  50. })
  51. // verified
  52. verified := true
  53. if verified {
  54. return nil, gnet.None
  55. }
  56. return nil, gnet.Close
  57. }
  58. func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
  59. // plugin disconnected
  60. s.plugins_lock.Lock()
  61. plugin := s.plugins[c.Fd()]
  62. delete(s.plugins, c.Fd())
  63. s.plugins_lock.Unlock()
  64. // close plugin
  65. plugin.close()
  66. return gnet.None
  67. }
  68. func (s *DifyServer) OnShutdown(c gnet.Engine) {
  69. close(s.shutdown_chan)
  70. }
  71. func (s *DifyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
  72. codec := c.Context().(*codec)
  73. messages, err := codec.Decode(c)
  74. if err != nil {
  75. return gnet.Close
  76. }
  77. // get plugin runtime
  78. s.plugins_lock.RLock()
  79. runtime, ok := s.plugins[c.Fd()]
  80. s.plugins_lock.RUnlock()
  81. if !ok {
  82. return gnet.Close
  83. }
  84. // handle messages
  85. for _, message := range messages {
  86. s.onMessage(runtime, message)
  87. }
  88. return gnet.None
  89. }
  90. func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
  91. // handle message
  92. if !runtime.handshake {
  93. // process handle shake if not completed
  94. declaration, err := parser.UnmarshalJsonBytes[plugin_entities.PluginDeclaration](message)
  95. if err != nil {
  96. // close connection if handshake failed
  97. runtime.conn.Write([]byte("handshake failed\n"))
  98. runtime.conn.Close()
  99. return
  100. }
  101. runtime.Config = declaration
  102. // handshake completed
  103. runtime.handshake = true
  104. // publish runtime to watcher
  105. s.response.Write(runtime)
  106. } else {
  107. // continue handle messages if handshake completed
  108. runtime.response.Write(message)
  109. }
  110. }