hooks.go 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169
  1. package remote_manager
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  9. "github.com/panjf2000/gnet/v2"
  10. )
  11. type DifyServer struct {
  12. gnet.BuiltinEventEngine
  13. engine gnet.Engine
  14. // listening address
  15. addr string
  16. port uint16
  17. // enabled multicore
  18. multicore bool
  19. // event loop count
  20. num_loops int
  21. // read new connections
  22. response *stream.StreamResponse[*RemotePluginRuntime]
  23. plugins map[int]*RemotePluginRuntime
  24. plugins_lock *sync.RWMutex
  25. shutdown_chan chan bool
  26. }
  27. func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
  28. s.engine = c
  29. return gnet.None
  30. }
  31. func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
  32. // new plugin connected
  33. c.SetContext(&codec{})
  34. runtime := &RemotePluginRuntime{
  35. conn: c,
  36. response: stream.NewStreamResponse[[]byte](512),
  37. callbacks: make(map[string][]func([]byte)),
  38. callbacks_lock: &sync.RWMutex{},
  39. shutdown_chan: make(chan bool),
  40. alive: true,
  41. }
  42. // store plugin runtime
  43. s.plugins_lock.Lock()
  44. s.plugins[c.Fd()] = runtime
  45. s.plugins_lock.Unlock()
  46. // start a timer to check if handshake is completed in 10 seconds
  47. time.AfterFunc(time.Second*10, func() {
  48. if !runtime.handshake {
  49. // close connection
  50. c.Close()
  51. }
  52. })
  53. // verified
  54. verified := true
  55. if verified {
  56. return nil, gnet.None
  57. }
  58. return nil, gnet.Close
  59. }
  60. func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
  61. // plugin disconnected
  62. s.plugins_lock.Lock()
  63. plugin := s.plugins[c.Fd()]
  64. delete(s.plugins, c.Fd())
  65. s.plugins_lock.Unlock()
  66. // close plugin
  67. plugin.onDisconnected()
  68. return gnet.None
  69. }
  70. func (s *DifyServer) OnShutdown(c gnet.Engine) {
  71. close(s.shutdown_chan)
  72. }
  73. func (s *DifyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
  74. codec := c.Context().(*codec)
  75. messages, err := codec.Decode(c)
  76. if err != nil {
  77. return gnet.Close
  78. }
  79. // get plugin runtime
  80. s.plugins_lock.RLock()
  81. runtime, ok := s.plugins[c.Fd()]
  82. s.plugins_lock.RUnlock()
  83. if !ok {
  84. return gnet.Close
  85. }
  86. // handle messages
  87. for _, message := range messages {
  88. s.onMessage(runtime, message)
  89. }
  90. return gnet.None
  91. }
  92. func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
  93. // handle message
  94. if runtime.handshake_failed {
  95. // do nothing if handshake has failed
  96. return
  97. }
  98. if !runtime.handshake {
  99. key := string(message)
  100. info, err := GetConnectionInfo(key)
  101. if err == cache.ErrNotFound {
  102. // close connection if handshake failed
  103. runtime.conn.Write([]byte("handshake failed, invalid key\n"))
  104. runtime.conn.Close()
  105. runtime.handshake_failed = true
  106. return
  107. } else if err != nil {
  108. // close connection if handshake failed
  109. runtime.conn.Write([]byte("internal error\n"))
  110. runtime.conn.Close()
  111. return
  112. }
  113. runtime.State.TenantID = info.TenantId
  114. // handshake completed
  115. runtime.handshake = true
  116. } else if !runtime.registration_transferred {
  117. // process handle shake if not completed
  118. declaration, err := parser.UnmarshalJsonBytes[plugin_entities.PluginDeclaration](message)
  119. if err != nil {
  120. // close connection if handshake failed
  121. runtime.conn.Write([]byte("handshake failed\n"))
  122. runtime.conn.Close()
  123. return
  124. }
  125. runtime.Config = declaration
  126. // registration transferred
  127. runtime.registration_transferred = true
  128. // publish runtime to watcher
  129. s.response.Write(runtime)
  130. } else {
  131. // continue handle messages if handshake completed
  132. runtime.response.Write(message)
  133. }
  134. }