run.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package local_manager
  2. import (
  3. "errors"
  4. "fmt"
  5. "os/exec"
  6. "sync"
  7. "github.com/langgenius/dify-plugin-daemon/internal/process"
  8. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  11. )
  12. func (r *LocalPluginRuntime) gc() {
  13. if r.io_identity != "" {
  14. RemoveStdio(r.io_identity)
  15. }
  16. if r.w != nil {
  17. close(r.w)
  18. r.w = nil
  19. }
  20. }
  21. func (r *LocalPluginRuntime) init() {
  22. r.w = make(chan bool)
  23. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_LAUNCHING
  24. }
  25. func (r *LocalPluginRuntime) StartPlugin() error {
  26. defer log.Info("plugin %s stopped", r.Config.Identity())
  27. r.init()
  28. // start plugin
  29. e := exec.Command("bash", r.Config.Execution.Launch)
  30. e.Dir = r.State.RelativePath
  31. process.WrapProcess(e)
  32. // get writer
  33. stdin, err := e.StdinPipe()
  34. if err != nil {
  35. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  36. return fmt.Errorf("get stdin pipe failed: %s", err.Error())
  37. }
  38. defer stdin.Close()
  39. stdout, err := e.StdoutPipe()
  40. if err != nil {
  41. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  42. return fmt.Errorf("get stdout pipe failed: %s", err.Error())
  43. }
  44. defer stdout.Close()
  45. stderr, err := e.StderrPipe()
  46. if err != nil {
  47. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  48. return fmt.Errorf("get stderr pipe failed: %s", err.Error())
  49. }
  50. defer stderr.Close()
  51. if err := e.Start(); err != nil {
  52. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  53. return err
  54. }
  55. // add to subprocess manager
  56. process.NewProcess(e)
  57. defer process.RemoveProcess(e)
  58. defer func() {
  59. // wait for plugin to exit
  60. err = e.Wait()
  61. if err != nil {
  62. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  63. log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
  64. }
  65. r.gc()
  66. }()
  67. defer e.Process.Kill()
  68. log.Info("plugin %s started", r.Config.Identity())
  69. // setup stdio
  70. stdio := PutStdioIo(r.Config.Identity(), stdin, stdout, stderr)
  71. r.io_identity = stdio.GetID()
  72. defer stdio.Stop()
  73. wg := sync.WaitGroup{}
  74. wg.Add(2)
  75. // listen to plugin stdout
  76. routine.Submit(func() {
  77. defer wg.Done()
  78. stdio.StartStdout()
  79. })
  80. // listen to plugin stderr
  81. routine.Submit(func() {
  82. defer wg.Done()
  83. stdio.StartStderr()
  84. })
  85. // wait for plugin to exit
  86. err = stdio.Wait()
  87. if err != nil {
  88. return err
  89. }
  90. wg.Wait()
  91. // plugin has exited
  92. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_PENDING
  93. return nil
  94. }
  95. func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
  96. if r.w == nil {
  97. return nil, errors.New("plugin not started")
  98. }
  99. return r.w, nil
  100. }