server.go 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126
  1. package remote_manager
  2. import (
  3. "bufio"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "os"
  9. "os/exec"
  10. "strings"
  11. "sync"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  15. "github.com/panjf2000/gnet/v2"
  16. gnet_errors "github.com/panjf2000/gnet/v2/pkg/errors"
  17. )
  18. type RemotePluginServer struct {
  19. server *DifyServer
  20. }
  21. // continue accepting new connections
  22. func (r *RemotePluginServer) Read() (*RemotePluginRuntime, error) {
  23. if r.server.response == nil {
  24. return nil, errors.New("plugin server not started")
  25. }
  26. return r.server.response.Read()
  27. }
  28. // Next returns true if there are more connections to be read
  29. func (r *RemotePluginServer) Next() bool {
  30. if r.server.response == nil {
  31. return false
  32. }
  33. return r.server.response.Next()
  34. }
  35. // Wrap wraps the wrap method of stream response
  36. func (r *RemotePluginServer) Wrap(f func(*RemotePluginRuntime)) {
  37. r.server.response.Wrap(f)
  38. }
  39. // Stop stops the server
  40. func (r *RemotePluginServer) Stop() error {
  41. if r.server.response == nil {
  42. return errors.New("plugin server not started")
  43. }
  44. r.server.response.Close()
  45. err := r.server.engine.Stop(context.Background())
  46. if err == gnet_errors.ErrEmptyEngine || err == gnet_errors.ErrEngineInShutdown {
  47. return nil
  48. }
  49. return err
  50. }
  51. // Launch starts the server
  52. func (r *RemotePluginServer) Launch() error {
  53. // kill the process if port is already in use
  54. listener, err := net.Listen("tcp", fmt.Sprintf(":%d", r.server.port))
  55. if err != nil && strings.Contains(err.Error(), "address already in use") {
  56. scanner := bufio.NewScanner(os.Stdin)
  57. log.Info("Port is already in use, do you want to kill the process using the port? (y/n)")
  58. for scanner.Scan() {
  59. if scanner.Text() == "y" {
  60. exec.Command("fuser", "-k", "tcp", fmt.Sprintf("%d", r.server.port)).Run()
  61. } else if scanner.Text() == "n" {
  62. return errors.New("port is already in use")
  63. }
  64. }
  65. } else {
  66. listener.Close()
  67. }
  68. err = gnet.Run(
  69. r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
  70. gnet.WithNumEventLoop(r.server.num_loops),
  71. )
  72. if err != nil {
  73. r.Stop()
  74. }
  75. return err
  76. }
  77. // NewRemotePluginServer creates a new RemotePluginServer
  78. func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
  79. addr := fmt.Sprintf(
  80. "tcp://%s:%d",
  81. config.PluginRemoteInstallingHost,
  82. config.PluginRemoteInstallingPort,
  83. )
  84. response := stream.NewStreamResponse[*RemotePluginRuntime](
  85. config.PluginRemoteInstallingMaxConn,
  86. )
  87. multicore := true
  88. s := &DifyServer{
  89. addr: addr,
  90. port: config.PluginRemoteInstallingPort,
  91. multicore: multicore,
  92. num_loops: config.PluginRemoteInstallServerEventLoopNums,
  93. response: response,
  94. plugins: make(map[int]*RemotePluginRuntime),
  95. plugins_lock: &sync.RWMutex{},
  96. shutdown_chan: make(chan bool),
  97. }
  98. manager := &RemotePluginServer{
  99. server: s,
  100. }
  101. return manager
  102. }