run.go 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. package local_manager
  2. import (
  3. "fmt"
  4. "os/exec"
  5. "sync"
  6. "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/stdio_holder"
  7. "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
  8. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  9. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  10. )
  11. func (r *LocalPluginRuntime) StartPlugin() error {
  12. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_LAUNCHING
  13. defer func() {
  14. r.io_identity = ""
  15. }()
  16. defer log.Info("plugin %s stopped", r.Config.Identity())
  17. // start plugin
  18. e := exec.Command("bash", "launch.sh")
  19. e.Dir = r.State.RelativePath
  20. // get writer
  21. stdin, err := e.StdinPipe()
  22. if err != nil {
  23. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  24. e.Process.Kill()
  25. return fmt.Errorf("get stdin pipe failed: %s", err.Error())
  26. }
  27. stdout, err := e.StdoutPipe()
  28. if err != nil {
  29. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  30. e.Process.Kill()
  31. return fmt.Errorf("get stdout pipe failed: %s", err.Error())
  32. }
  33. stderr, err := e.StderrPipe()
  34. if err != nil {
  35. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  36. e.Process.Kill()
  37. return fmt.Errorf("get stderr pipe failed: %s", err.Error())
  38. }
  39. if err := e.Start(); err != nil {
  40. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  41. return err
  42. }
  43. defer func() {
  44. // wait for plugin to exit
  45. err = e.Wait()
  46. if err != nil {
  47. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  48. log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
  49. }
  50. }()
  51. log.Info("plugin %s started", r.Config.Identity())
  52. stdio := stdio_holder.Put(r.Config.Identity(), stdin, stdout, stderr)
  53. // set io identity
  54. r.io_identity = stdio.GetID()
  55. wg := sync.WaitGroup{}
  56. wg.Add(1)
  57. // listen to plugin stdout
  58. routine.Submit(func() {
  59. defer wg.Done()
  60. stdio.StartStdout()
  61. })
  62. err = stdio.StartStderr()
  63. if err != nil {
  64. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_RESTARTING
  65. e.Process.Kill()
  66. return err
  67. }
  68. wg.Wait()
  69. // plugin has exited
  70. r.State.Status = entities.PLUGIN_RUNTIME_STATUS_PENDING
  71. return nil
  72. }