hooks.go 7.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package remote_manager
  2. import (
  3. "encoding/hex"
  4. "sync"
  5. "time"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  13. "github.com/panjf2000/gnet/v2"
  14. )
  15. type DifyServer struct {
  16. gnet.BuiltinEventEngine
  17. engine gnet.Engine
  18. mediaManager *media_manager.MediaManager
  19. // listening address
  20. addr string
  21. port uint16
  22. // enabled multicore
  23. multicore bool
  24. // event loop count
  25. num_loops int
  26. // read new connections
  27. response *stream.StreamResponse[*RemotePluginRuntime]
  28. plugins map[int]*RemotePluginRuntime
  29. plugins_lock *sync.RWMutex
  30. shutdown_chan chan bool
  31. }
  32. func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
  33. s.engine = c
  34. return gnet.None
  35. }
  36. func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
  37. // new plugin connected
  38. c.SetContext(&codec{})
  39. runtime := &RemotePluginRuntime{
  40. BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(
  41. s.mediaManager,
  42. ),
  43. conn: c,
  44. response: stream.NewStreamResponse[[]byte](512),
  45. callbacks: make(map[string][]func([]byte)),
  46. callbacks_lock: &sync.RWMutex{},
  47. shutdown_chan: make(chan bool),
  48. alive: true,
  49. }
  50. // store plugin runtime
  51. s.plugins_lock.Lock()
  52. s.plugins[c.Fd()] = runtime
  53. s.plugins_lock.Unlock()
  54. // start a timer to check if handshake is completed in 10 seconds
  55. time.AfterFunc(time.Second*10, func() {
  56. if !runtime.handshake {
  57. // close connection
  58. c.Close()
  59. }
  60. })
  61. // verified
  62. verified := true
  63. if verified {
  64. return nil, gnet.None
  65. }
  66. return nil, gnet.Close
  67. }
  68. func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
  69. // plugin disconnected
  70. s.plugins_lock.Lock()
  71. plugin := s.plugins[c.Fd()]
  72. delete(s.plugins, c.Fd())
  73. s.plugins_lock.Unlock()
  74. // close plugin
  75. plugin.onDisconnected()
  76. // clear assets
  77. plugin.ClearAssets()
  78. // uninstall plugin
  79. if plugin.handshake && plugin.registration_transferred &&
  80. plugin.endpoints_registration_transferred &&
  81. plugin.models_registration_transferred &&
  82. plugin.tools_registration_transferred {
  83. if err := plugin.Unregister(); err != nil {
  84. log.Error("unregister plugin failed, error: %v", err)
  85. }
  86. }
  87. return gnet.None
  88. }
  89. func (s *DifyServer) OnShutdown(c gnet.Engine) {
  90. close(s.shutdown_chan)
  91. }
  92. func (s *DifyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
  93. codec := c.Context().(*codec)
  94. messages, err := codec.Decode(c)
  95. if err != nil {
  96. return gnet.Close
  97. }
  98. // get plugin runtime
  99. s.plugins_lock.RLock()
  100. runtime, ok := s.plugins[c.Fd()]
  101. s.plugins_lock.RUnlock()
  102. if !ok {
  103. return gnet.Close
  104. }
  105. // handle messages
  106. for _, message := range messages {
  107. if len(message) == 0 {
  108. continue
  109. }
  110. s.onMessage(runtime, message)
  111. }
  112. return gnet.None
  113. }
  114. func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
  115. // handle message
  116. if runtime.handshake_failed {
  117. // do nothing if handshake has failed
  118. return
  119. }
  120. if !runtime.handshake {
  121. key := string(message)
  122. info, err := GetConnectionInfo(key)
  123. if err == cache.ErrNotFound {
  124. // close connection if handshake failed
  125. runtime.conn.Write([]byte("handshake failed, invalid key\n"))
  126. runtime.conn.Close()
  127. runtime.handshake_failed = true
  128. return
  129. } else if err != nil {
  130. // close connection if handshake failed
  131. runtime.conn.Write([]byte("internal error\n"))
  132. runtime.conn.Close()
  133. return
  134. }
  135. runtime.tenant_id = info.TenantId
  136. // handshake completed
  137. runtime.handshake = true
  138. } else if !runtime.registration_transferred {
  139. // process handle shake if not completed
  140. declaration, err := parser.UnmarshalJsonBytes[plugin_entities.PluginDeclaration](message)
  141. if err != nil {
  142. // close connection if handshake failed
  143. runtime.conn.Write([]byte("handshake failed\n"))
  144. runtime.conn.Close()
  145. return
  146. }
  147. runtime.Config = declaration
  148. // registration transferred
  149. runtime.registration_transferred = true
  150. } else if !runtime.tools_registration_transferred {
  151. tools, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.ToolProviderDeclaration](message)
  152. if err != nil {
  153. runtime.conn.Write([]byte("tools register failed\n"))
  154. log.Error("tools register failed, error: %v", err)
  155. runtime.conn.Close()
  156. return
  157. }
  158. runtime.tools_registration_transferred = true
  159. if len(tools) > 0 {
  160. declaration := runtime.Config
  161. declaration.Tool = &tools[0]
  162. runtime.Config = declaration
  163. }
  164. } else if !runtime.models_registration_transferred {
  165. models, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.ModelProviderDeclaration](message)
  166. if err != nil {
  167. runtime.conn.Write([]byte("models register failed\n"))
  168. log.Error("models register failed, error: %v", err)
  169. runtime.conn.Close()
  170. return
  171. }
  172. runtime.models_registration_transferred = true
  173. if len(models) > 0 {
  174. declaration := runtime.Config
  175. declaration.Model = &models[0]
  176. runtime.Config = declaration
  177. }
  178. } else if !runtime.endpoints_registration_transferred {
  179. endpoints, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.EndpointProviderDeclaration](message)
  180. if err != nil {
  181. runtime.conn.Write([]byte("endpoints register failed\n"))
  182. log.Error("endpoints register failed, error: %v", err)
  183. runtime.conn.Close()
  184. return
  185. }
  186. runtime.endpoints_registration_transferred = true
  187. if len(endpoints) > 0 {
  188. declaration := runtime.Config
  189. declaration.Endpoint = &endpoints[0]
  190. runtime.Config = declaration
  191. }
  192. } else if !runtime.assets_transferred {
  193. assets, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.RemoteAssetPayload](message)
  194. if err != nil {
  195. runtime.conn.Write([]byte("assets register failed\n"))
  196. log.Error("assets register failed, error: %v", err)
  197. runtime.conn.Close()
  198. return
  199. }
  200. files := make(map[string][]byte)
  201. for _, asset := range assets {
  202. files[asset.Filename], err = hex.DecodeString(asset.Data)
  203. if err != nil {
  204. runtime.conn.Write([]byte("assets decode failed\n"))
  205. log.Error("assets decode failed, error: %v", err)
  206. runtime.conn.Close()
  207. return
  208. }
  209. }
  210. // remap assets
  211. if err := runtime.RemapAssets(&runtime.Config, files); err != nil {
  212. runtime.conn.Write([]byte("assets remap failed\n"))
  213. log.Error("assets remap failed, error: %v", err)
  214. runtime.conn.Close()
  215. return
  216. }
  217. runtime.assets_transferred = true
  218. runtime.checksum = runtime.calculateChecksum()
  219. runtime.InitState()
  220. runtime.SetActiveAt(time.Now())
  221. // trigger registration event
  222. if err := runtime.Register(); err != nil {
  223. runtime.conn.Write([]byte("register failed\n"))
  224. log.Error("register failed, error: %v", err)
  225. runtime.conn.Close()
  226. return
  227. }
  228. // publish runtime to watcher
  229. s.response.Write(runtime)
  230. } else {
  231. // continue handle messages if handshake completed
  232. runtime.response.Write(message)
  233. }
  234. }
  235. func (s *DifyServer) onAssets(runtime *RemotePluginRuntime, assets []plugin_entities.RemoteAssetPayload) {
  236. }