environment.go 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124
  1. package local_manager
  2. import (
  3. "fmt"
  4. "os"
  5. "os/exec"
  6. "path"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
  12. )
  13. func (r *LocalPluginRuntime) InitEnvironment() error {
  14. if _, err := os.Stat(path.Join(r.State.RelativePath, ".installed")); err == nil {
  15. return nil
  16. }
  17. // execute init command
  18. handle := exec.Command("bash", r.Config.Execution.Install)
  19. handle.Dir = r.State.RelativePath
  20. // get stdout and stderr
  21. stdout, err := handle.StdoutPipe()
  22. if err != nil {
  23. return err
  24. }
  25. defer stdout.Close()
  26. stderr, err := handle.StderrPipe()
  27. if err != nil {
  28. return err
  29. }
  30. defer stderr.Close()
  31. // start command
  32. if err := handle.Start(); err != nil {
  33. return err
  34. }
  35. defer func() {
  36. if handle.Process != nil {
  37. handle.Process.Kill()
  38. }
  39. }()
  40. var err_msg strings.Builder
  41. var wg sync.WaitGroup
  42. wg.Add(2)
  43. last_active_at := time.Now()
  44. routine.Submit(func() {
  45. defer wg.Done()
  46. // read stdout
  47. buf := make([]byte, 1024)
  48. for {
  49. n, err := stdout.Read(buf)
  50. if err != nil {
  51. break
  52. }
  53. log.Info("installing %s - %s", r.Config.Identity(), string(buf[:n]))
  54. last_active_at = time.Now()
  55. }
  56. })
  57. routine.Submit(func() {
  58. defer wg.Done()
  59. // read stderr
  60. buf := make([]byte, 1024)
  61. for {
  62. n, err := stderr.Read(buf)
  63. if err != nil && err != os.ErrClosed {
  64. last_active_at = time.Now()
  65. err_msg.WriteString(string(buf[:n]))
  66. break
  67. } else if err == os.ErrClosed {
  68. break
  69. }
  70. if n > 0 {
  71. err_msg.WriteString(string(buf[:n]))
  72. last_active_at = time.Now()
  73. }
  74. }
  75. })
  76. routine.Submit(func() {
  77. ticker := time.NewTicker(5 * time.Second)
  78. defer ticker.Stop()
  79. for range ticker.C {
  80. if handle.ProcessState != nil && handle.ProcessState.Exited() {
  81. break
  82. }
  83. if time.Since(last_active_at) > 60*time.Second {
  84. handle.Process.Kill()
  85. err_msg.WriteString("init process exited due to long time no activity")
  86. break
  87. }
  88. }
  89. })
  90. wg.Wait()
  91. if err_msg.Len() > 0 {
  92. return fmt.Errorf("install failed: %s", err_msg.String())
  93. }
  94. if err := handle.Wait(); err != nil {
  95. return err
  96. }
  97. // create .installed file
  98. f, err := os.Create(path.Join(r.State.RelativePath, ".installed"))
  99. if err != nil {
  100. return err
  101. }
  102. defer f.Close()
  103. return nil
  104. }