server.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. package remote_manager
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "os/exec"
  7. "sync"
  8. "time"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  11. "github.com/panjf2000/gnet/v2"
  12. gnet_errors "github.com/panjf2000/gnet/v2/pkg/errors"
  13. )
  14. type RemotePluginServer struct {
  15. server *DifyServer
  16. }
  17. // continue accepting new connections
  18. func (r *RemotePluginServer) Read() (*RemotePluginRuntime, error) {
  19. if r.server.response == nil {
  20. return nil, errors.New("plugin server not started")
  21. }
  22. return r.server.response.Read()
  23. }
  24. // Next returns true if there are more connections to be read
  25. func (r *RemotePluginServer) Next() bool {
  26. if r.server.response == nil {
  27. return false
  28. }
  29. return r.server.response.Next()
  30. }
  31. // Wrap wraps the wrap method of stream response
  32. func (r *RemotePluginServer) Wrap(f func(*RemotePluginRuntime)) {
  33. r.server.response.Wrap(f)
  34. }
  35. // Stop stops the server
  36. func (r *RemotePluginServer) Stop() error {
  37. if r.server.response == nil {
  38. return errors.New("plugin server not started")
  39. }
  40. r.server.response.Close()
  41. err := r.server.engine.Stop(context.Background())
  42. if err == gnet_errors.ErrEmptyEngine || err == gnet_errors.ErrEngineInShutdown {
  43. return nil
  44. }
  45. return err
  46. }
  47. // Launch starts the server
  48. func (r *RemotePluginServer) Launch() error {
  49. // kill the process if port is already in use
  50. // TODO: switch to optional
  51. exec.Command("fuser", "-k", "tcp", fmt.Sprintf("%d", r.server.port)).Run()
  52. time.Sleep(time.Millisecond * 100)
  53. err := gnet.Run(
  54. r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
  55. gnet.WithNumEventLoop(r.server.num_loops),
  56. )
  57. if err != nil {
  58. r.Stop()
  59. }
  60. return err
  61. }
  62. // NewRemotePluginServer creates a new RemotePluginServer
  63. func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
  64. addr := fmt.Sprintf(
  65. "tcp://%s:%d",
  66. config.PluginRemoteInstallingHost,
  67. config.PluginRemoteInstallingPort,
  68. )
  69. response := stream.NewStreamResponse[*RemotePluginRuntime](
  70. config.PluginRemoteInstallingMaxConn,
  71. )
  72. multicore := true
  73. s := &DifyServer{
  74. addr: addr,
  75. port: config.PluginRemoteInstallingPort,
  76. multicore: multicore,
  77. num_loops: config.PluginRemoteInstallServerEventLoopNums,
  78. response: response,
  79. plugins: make(map[int]*RemotePluginRuntime),
  80. plugins_lock: &sync.RWMutex{},
  81. shutdown_chan: make(chan bool),
  82. }
  83. manager := &RemotePluginServer{
  84. server: s,
  85. }
  86. return manager
  87. }