run.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125
  1. package local_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "os/exec"
  6. "sync"
  7. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/stdio_holder"
  8. "github.com/langgenius/dify-plugin-daemon/internal/process"
  9. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  12. )
  13. func (r *LocalPluginRuntime) gc() {
  14. if r.io_identity != "" {
  15. stdio_holder.Remove(r.io_identity)
  16. }
  17. if r.w != nil {
  18. close(r.w)
  19. r.w = nil
  20. }
  21. }
  22. func (r *LocalPluginRuntime) init() {
  23. r.w = make(chan bool)
  24. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_LAUNCHING
  25. }
  26. func (r *LocalPluginRuntime) StartPlugin() error {
  27. defer log.Info("plugin %s stopped", r.Config.Identity())
  28. r.init()
  29. // start plugin
  30. e := exec.Command("bash", r.Config.Execution.Launch)
  31. e.Dir = r.State.RelativePath
  32. process.WrapProcess(e)
  33. // get writer
  34. stdin, err := e.StdinPipe()
  35. if err != nil {
  36. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  37. return fmt.Errorf("get stdin pipe failed: %s", err.Error())
  38. }
  39. defer stdin.Close()
  40. stdout, err := e.StdoutPipe()
  41. if err != nil {
  42. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  43. return fmt.Errorf("get stdout pipe failed: %s", err.Error())
  44. }
  45. defer stdout.Close()
  46. stderr, err := e.StderrPipe()
  47. if err != nil {
  48. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  49. return fmt.Errorf("get stderr pipe failed: %s", err.Error())
  50. }
  51. defer stderr.Close()
  52. if err := e.Start(); err != nil {
  53. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  54. return err
  55. }
  56. // add to subprocess manager
  57. process.NewProcess(e)
  58. defer process.RemoveProcess(e)
  59. defer func() {
  60. // wait for plugin to exit
  61. err = e.Wait()
  62. if err != nil {
  63. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  64. log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
  65. }
  66. r.gc()
  67. }()
  68. defer e.Process.Kill()
  69. log.Info("plugin %s started", r.Config.Identity())
  70. // setup stdio
  71. stdio := stdio_holder.Put(r.Config.Identity(), stdin, stdout, stderr)
  72. r.io_identity = stdio.GetID()
  73. defer stdio.Stop()
  74. wg := sync.WaitGroup{}
  75. wg.Add(2)
  76. // listen to plugin stdout
  77. routine.Submit(func() {
  78. defer wg.Done()
  79. stdio.StartStdout()
  80. })
  81. // listen to plugin stderr
  82. routine.Submit(func() {
  83. defer wg.Done()
  84. stdio.StartStderr()
  85. })
  86. // wait for plugin to exit
  87. err = stdio.Wait()
  88. if err != nil {
  89. return err
  90. }
  91. wg.Wait()
  92. // plugin has exited
  93. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_PENDING
  94. return nil
  95. }
  96. func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
  97. if r.w == nil {
  98. return nil, errors.New("plugin not started")
  99. }
  100. return r.w, nil
  101. }