run.go 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135
  1. package local_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "os"
  6. "os/exec"
  7. "sync"
  8. "github.com/langgenius/dify-plugin-daemon/internal/process"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  12. )
  13. func (r *LocalPluginRuntime) gc() {
  14. if r.io_identity != "" {
  15. RemoveStdio(r.io_identity)
  16. }
  17. if r.wait_chan != nil {
  18. close(r.wait_chan)
  19. r.wait_chan = nil
  20. }
  21. }
  22. func (r *LocalPluginRuntime) init() {
  23. r.wait_chan = make(chan bool)
  24. r.SetLaunching()
  25. }
  26. func (r *LocalPluginRuntime) Type() plugin_entities.PluginRuntimeType {
  27. return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
  28. }
  29. func (r *LocalPluginRuntime) StartPlugin() error {
  30. defer log.Info("plugin %s stopped", r.Config.Identity())
  31. r.init()
  32. // start plugin
  33. // TODO: use exec.Command("bash") instead of exec.Command("bash", r.Config.Execution.Launch)
  34. e := exec.Command("bash")
  35. e.Dir = r.State.AbsolutePath
  36. // add env INSTALL_METHOD=local
  37. e.Env = append(e.Env, "INSTALL_METHOD=local", "PATH="+os.Getenv("PATH"))
  38. // NOTE: subprocess will be taken care of by subprocess manager
  39. // ensure all subprocess are killed when parent process exits, especially on Golang debugger
  40. process.WrapProcess(e)
  41. // get writer
  42. stdin, err := e.StdinPipe()
  43. if err != nil {
  44. r.SetRestarting()
  45. return fmt.Errorf("get stdin pipe failed: %s", err.Error())
  46. }
  47. defer stdin.Close()
  48. stdout, err := e.StdoutPipe()
  49. if err != nil {
  50. r.SetRestarting()
  51. return fmt.Errorf("get stdout pipe failed: %s", err.Error())
  52. }
  53. defer stdout.Close()
  54. stderr, err := e.StderrPipe()
  55. if err != nil {
  56. r.SetRestarting()
  57. return fmt.Errorf("get stderr pipe failed: %s", err.Error())
  58. }
  59. defer stderr.Close()
  60. if err := e.Start(); err != nil {
  61. r.SetRestarting()
  62. return err
  63. }
  64. // add to subprocess manager
  65. process.NewProcess(e)
  66. defer process.RemoveProcess(e)
  67. defer func() {
  68. // wait for plugin to exit
  69. err = e.Wait()
  70. if err != nil {
  71. r.SetRestarting()
  72. log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
  73. }
  74. r.gc()
  75. }()
  76. defer e.Process.Kill()
  77. log.Info("plugin %s started", r.Config.Identity())
  78. // setup stdio
  79. stdio := PutStdioIo(r.Config.Identity(), stdin, stdout, stderr)
  80. r.io_identity = stdio.GetID()
  81. defer stdio.Stop()
  82. wg := sync.WaitGroup{}
  83. wg.Add(2)
  84. // listen to plugin stdout
  85. routine.Submit(func() {
  86. defer wg.Done()
  87. stdio.StartStdout()
  88. })
  89. // listen to plugin stderr
  90. routine.Submit(func() {
  91. defer wg.Done()
  92. stdio.StartStderr()
  93. })
  94. // wait for plugin to exit
  95. err = stdio.Wait()
  96. if err != nil {
  97. return err
  98. }
  99. wg.Wait()
  100. // plugin has exited
  101. r.SetPending()
  102. return nil
  103. }
  104. func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
  105. if r.wait_chan == nil {
  106. return nil, errors.New("plugin not started")
  107. }
  108. return r.wait_chan, nil
  109. }