server.go 2.0 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798
  1. package remote_manager
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "sync"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  9. "github.com/panjf2000/gnet/v2"
  10. gnet_errors "github.com/panjf2000/gnet/v2/pkg/errors"
  11. )
  12. type RemotePluginServer struct {
  13. server *DifyServer
  14. }
  15. // continue accepting new connections
  16. func (r *RemotePluginServer) Read() (*RemotePluginRuntime, error) {
  17. if r.server.response == nil {
  18. return nil, errors.New("plugin server not started")
  19. }
  20. return r.server.response.Read()
  21. }
  22. // Next returns true if there are more connections to be read
  23. func (r *RemotePluginServer) Next() bool {
  24. if r.server.response == nil {
  25. return false
  26. }
  27. return r.server.response.Next()
  28. }
  29. // Stop stops the server
  30. func (r *RemotePluginServer) Stop() error {
  31. if r.server.response == nil {
  32. return errors.New("plugin server not started")
  33. }
  34. r.server.response.Close()
  35. err := r.server.engine.Stop(context.Background())
  36. if err == gnet_errors.ErrEmptyEngine || err == gnet_errors.ErrEngineInShutdown {
  37. return nil
  38. }
  39. return err
  40. }
  41. // Launch starts the server
  42. func (r *RemotePluginServer) Launch() error {
  43. err := gnet.Run(
  44. r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
  45. gnet.WithNumEventLoop(r.server.num_loops),
  46. )
  47. if err != nil {
  48. r.Stop()
  49. }
  50. return err
  51. }
  52. // NewRemotePluginServer creates a new RemotePluginServer
  53. func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
  54. addr := fmt.Sprintf(
  55. "tcp://%s:%d",
  56. config.PluginRemoteInstallingHost,
  57. config.PluginRemoteInstallingPort,
  58. )
  59. response := stream.NewStreamResponse[*RemotePluginRuntime](
  60. config.PluginRemoteInstallingMaxConn,
  61. )
  62. multicore := true
  63. s := &DifyServer{
  64. addr: addr,
  65. multicore: multicore,
  66. num_loops: config.PluginRemoteInstallServerEventLoopNums,
  67. response: response,
  68. plugins: make(map[int]*RemotePluginRuntime),
  69. plugins_lock: &sync.RWMutex{},
  70. shutdown_chan: make(chan bool),
  71. }
  72. manager := &RemotePluginServer{
  73. server: s,
  74. }
  75. return manager
  76. }