server.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121
  1. package remote_manager
  2. import (
  3. "bufio"
  4. "context"
  5. "errors"
  6. "fmt"
  7. "net"
  8. "os"
  9. "os/exec"
  10. "strings"
  11. "sync"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
  15. "github.com/panjf2000/gnet/v2"
  16. gnet_errors "github.com/panjf2000/gnet/v2/pkg/errors"
  17. )
  18. type RemotePluginServer struct {
  19. server *DifyServer
  20. }
  21. // continue accepting new connections
  22. func (r *RemotePluginServer) Read() (*RemotePluginRuntime, error) {
  23. if r.server.response == nil {
  24. return nil, errors.New("plugin server not started")
  25. }
  26. return r.server.response.Read()
  27. }
  28. // Next returns true if there are more connections to be read
  29. func (r *RemotePluginServer) Next() bool {
  30. if r.server.response == nil {
  31. return false
  32. }
  33. return r.server.response.Next()
  34. }
  35. // Stop stops the server
  36. func (r *RemotePluginServer) Stop() error {
  37. if r.server.response == nil {
  38. return errors.New("plugin server not started")
  39. }
  40. r.server.response.Close()
  41. err := r.server.engine.Stop(context.Background())
  42. if err == gnet_errors.ErrEmptyEngine || err == gnet_errors.ErrEngineInShutdown {
  43. return nil
  44. }
  45. return err
  46. }
  47. // Launch starts the server
  48. func (r *RemotePluginServer) Launch() error {
  49. // kill the process if port is already in use
  50. listener, err := net.Listen("tcp", fmt.Sprintf(":%d", r.server.port))
  51. if err != nil && strings.Contains(err.Error(), "address already in use") {
  52. scanner := bufio.NewScanner(os.Stdin)
  53. log.Info("Port is already in use, do you want to kill the process using the port? (y/n)")
  54. for scanner.Scan() {
  55. if scanner.Text() == "y" {
  56. exec.Command("fuser", "-k", "tcp", fmt.Sprintf("%d", r.server.port)).Run()
  57. } else if scanner.Text() == "n" {
  58. return errors.New("port is already in use")
  59. }
  60. }
  61. } else {
  62. listener.Close()
  63. }
  64. err = gnet.Run(
  65. r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
  66. gnet.WithNumEventLoop(r.server.num_loops),
  67. )
  68. if err != nil {
  69. r.Stop()
  70. }
  71. return err
  72. }
  73. // NewRemotePluginServer creates a new RemotePluginServer
  74. func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
  75. addr := fmt.Sprintf(
  76. "tcp://%s:%d",
  77. config.PluginRemoteInstallingHost,
  78. config.PluginRemoteInstallingPort,
  79. )
  80. response := stream.NewStreamResponse[*RemotePluginRuntime](
  81. config.PluginRemoteInstallingMaxConn,
  82. )
  83. multicore := true
  84. s := &DifyServer{
  85. addr: addr,
  86. port: config.PluginRemoteInstallingPort,
  87. multicore: multicore,
  88. num_loops: config.PluginRemoteInstallServerEventLoopNums,
  89. response: response,
  90. plugins: make(map[int]*RemotePluginRuntime),
  91. plugins_lock: &sync.RWMutex{},
  92. shutdown_chan: make(chan bool),
  93. }
  94. manager := &RemotePluginServer{
  95. server: s,
  96. }
  97. return manager
  98. }