server.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. package remote_manager
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os/exec"
  7. "sync"
  8. "time"
  9. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
  10. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  12. "github.com/panjf2000/gnet/v2"
  13. gnet_errors "github.com/panjf2000/gnet/v2/pkg/errors"
  14. )
  15. type RemotePluginServer struct {
  16. server *DifyServer
  17. }
  18. // continue accepting new connections
  19. func (r *RemotePluginServer) Read() (*RemotePluginRuntime, error) {
  20. if r.server.response == nil {
  21. return nil, errors.New("plugin server not started")
  22. }
  23. return r.server.response.Read()
  24. }
  25. // Next returns true if there are more connections to be read
  26. func (r *RemotePluginServer) Next() bool {
  27. if r.server.response == nil {
  28. return false
  29. }
  30. return r.server.response.Next()
  31. }
  32. // Wrap wraps the wrap method of stream response
  33. func (r *RemotePluginServer) Wrap(f func(*RemotePluginRuntime)) {
  34. r.server.response.Async(f)
  35. }
  36. // Stop stops the server
  37. func (r *RemotePluginServer) Stop() error {
  38. if r.server.response == nil {
  39. return errors.New("plugin server not started")
  40. }
  41. r.server.response.Close()
  42. err := r.server.engine.Stop(context.Background())
  43. if err == gnet_errors.ErrEmptyEngine || err == gnet_errors.ErrEngineInShutdown {
  44. return nil
  45. }
  46. return err
  47. }
  48. // Launch starts the server
  49. func (r *RemotePluginServer) Launch() error {
  50. // kill the process if port is already in use
  51. exec.Command("fuser", "-k", "tcp", fmt.Sprintf("%d", r.server.port)).Run()
  52. time.Sleep(time.Millisecond * 100)
  53. err := gnet.Run(
  54. r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
  55. gnet.WithNumEventLoop(r.server.num_loops),
  56. )
  57. if err != nil {
  58. r.Stop()
  59. }
  60. return err
  61. }
  62. // NewRemotePluginServer creates a new RemotePluginServer
  63. func NewRemotePluginServer(config *app.Config, media_manager *media_manager.MediaManager) *RemotePluginServer {
  64. addr := fmt.Sprintf(
  65. "tcp://%s:%d",
  66. config.PluginRemoteInstallingHost,
  67. config.PluginRemoteInstallingPort,
  68. )
  69. response := stream.NewStream[*RemotePluginRuntime](
  70. config.PluginRemoteInstallingMaxConn,
  71. )
  72. multicore := true
  73. s := &DifyServer{
  74. mediaManager: media_manager,
  75. addr: addr,
  76. port: config.PluginRemoteInstallingPort,
  77. multicore: multicore,
  78. num_loops: config.PluginRemoteInstallServerEventLoopNums,
  79. response: response,
  80. plugins: make(map[int]*RemotePluginRuntime),
  81. plugins_lock: &sync.RWMutex{},
  82. shutdown_chan: make(chan bool),
  83. max_conn: int32(config.PluginRemoteInstallingMaxConn),
  84. }
  85. manager := &RemotePluginServer{
  86. server: s,
  87. }
  88. return manager
  89. }