hooks.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162
  1. package remote_manager
  2. import (
  3. "sync"
  4. "time"
  5. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  6. "github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
  7. "github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  9. "github.com/panjf2000/gnet/v2"
  10. )
  11. type DifyServer struct {
  12. gnet.BuiltinEventEngine
  13. engine gnet.Engine
  14. // listening address
  15. addr string
  16. // enabled multicore
  17. multicore bool
  18. // event loop count
  19. num_loops int
  20. // read new connections
  21. response *stream.StreamResponse[*RemotePluginRuntime]
  22. plugins map[int]*RemotePluginRuntime
  23. plugins_lock *sync.RWMutex
  24. shutdown_chan chan bool
  25. }
  26. func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
  27. s.engine = c
  28. return gnet.None
  29. }
  30. func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
  31. // new plugin connected
  32. c.SetContext(&codec{})
  33. runtime := &RemotePluginRuntime{
  34. conn: c,
  35. response: stream.NewStreamResponse[[]byte](512),
  36. callbacks: make(map[string][]func([]byte)),
  37. callbacks_lock: &sync.RWMutex{},
  38. shutdown_chan: make(chan bool),
  39. alive: true,
  40. }
  41. // store plugin runtime
  42. s.plugins_lock.Lock()
  43. s.plugins[c.Fd()] = runtime
  44. s.plugins_lock.Unlock()
  45. // start a timer to check if handshake is completed in 10 seconds
  46. time.AfterFunc(time.Second*10, func() {
  47. if !runtime.handshake {
  48. // close connection
  49. c.Close()
  50. }
  51. })
  52. // verified
  53. verified := true
  54. if verified {
  55. return nil, gnet.None
  56. }
  57. return nil, gnet.Close
  58. }
  59. func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
  60. // plugin disconnected
  61. s.plugins_lock.Lock()
  62. plugin := s.plugins[c.Fd()]
  63. delete(s.plugins, c.Fd())
  64. s.plugins_lock.Unlock()
  65. // close plugin
  66. plugin.close()
  67. return gnet.None
  68. }
  69. func (s *DifyServer) OnShutdown(c gnet.Engine) {
  70. close(s.shutdown_chan)
  71. }
  72. func (s *DifyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
  73. codec := c.Context().(*codec)
  74. messages, err := codec.Decode(c)
  75. if err != nil {
  76. return gnet.Close
  77. }
  78. // get plugin runtime
  79. s.plugins_lock.RLock()
  80. runtime, ok := s.plugins[c.Fd()]
  81. s.plugins_lock.RUnlock()
  82. if !ok {
  83. return gnet.Close
  84. }
  85. // handle messages
  86. for _, message := range messages {
  87. s.onMessage(runtime, message)
  88. }
  89. return gnet.None
  90. }
  91. func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
  92. // handle message
  93. if !runtime.handshake {
  94. key := string(message)
  95. info, err := GetConnectionInfo(key)
  96. if err == cache.ErrNotFound {
  97. // close connection if handshake failed
  98. runtime.conn.Write([]byte("handshake failed\n"))
  99. runtime.conn.Close()
  100. return
  101. } else if err != nil {
  102. // close connection if handshake failed
  103. runtime.conn.Write([]byte("internal error\n"))
  104. runtime.conn.Close()
  105. return
  106. }
  107. runtime.State.TenantID = info.TenantId
  108. // handshake completed
  109. runtime.handshake = true
  110. } else if !runtime.registration_transferred {
  111. // process handle shake if not completed
  112. declaration, err := parser.UnmarshalJsonBytes[plugin_entities.PluginDeclaration](message)
  113. if err != nil {
  114. // close connection if handshake failed
  115. runtime.conn.Write([]byte("handshake failed\n"))
  116. runtime.conn.Close()
  117. return
  118. }
  119. runtime.Config = declaration
  120. // registration transferred
  121. runtime.registration_transferred = true
  122. // publish runtime to watcher
  123. s.response.Write(runtime)
  124. } else {
  125. // continue handle messages if handshake completed
  126. runtime.response.Write(message)
  127. }
  128. }