environment.go 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package local_manager
  2. import (
  3. "fmt"
  4. "os"
  5. "os/exec"
  6. "path"
  7. "strings"
  8. "sync"
  9. "time"
  10. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  11. )
  12. func (r *LocalPluginRuntime) InitEnvironment() error {
  13. if _, err := os.Stat(path.Join(r.State.RelativePath, ".installed")); err == nil {
  14. return nil
  15. }
  16. // execute init command
  17. handle := exec.Command("bash", "install.sh")
  18. handle.Dir = r.State.RelativePath
  19. // get stdout and stderr
  20. stdout, err := handle.StdoutPipe()
  21. if err != nil {
  22. return err
  23. }
  24. defer stdout.Close()
  25. stderr, err := handle.StderrPipe()
  26. if err != nil {
  27. return err
  28. }
  29. defer stderr.Close()
  30. // start command
  31. if err := handle.Start(); err != nil {
  32. return err
  33. }
  34. defer func() {
  35. if handle.Process != nil {
  36. handle.Process.Kill()
  37. }
  38. }()
  39. var err_msg strings.Builder
  40. var wg sync.WaitGroup
  41. wg.Add(2)
  42. last_active_at := time.Now()
  43. go func() {
  44. defer wg.Done()
  45. // read stdout
  46. buf := make([]byte, 1024)
  47. for {
  48. n, err := stdout.Read(buf)
  49. if err != nil {
  50. break
  51. }
  52. log.Info("installing %s:%s - %s", r.Config.Name, r.Config.Name, string(buf[:n]))
  53. last_active_at = time.Now()
  54. }
  55. }()
  56. go func() {
  57. defer wg.Done()
  58. // read stderr
  59. buf := make([]byte, 1024)
  60. for {
  61. n, err := stderr.Read(buf)
  62. if err != nil && err != os.ErrClosed {
  63. last_active_at = time.Now()
  64. err_msg.WriteString(string(buf[:n]))
  65. break
  66. } else if err == os.ErrClosed {
  67. break
  68. }
  69. if n > 0 {
  70. err_msg.WriteString(string(buf[:n]))
  71. last_active_at = time.Now()
  72. }
  73. }
  74. }()
  75. go func() {
  76. ticker := time.NewTicker(5 * time.Second)
  77. defer ticker.Stop()
  78. for range ticker.C {
  79. if handle.ProcessState != nil && handle.ProcessState.Exited() {
  80. break
  81. }
  82. if time.Since(last_active_at) > 60*time.Second {
  83. handle.Process.Kill()
  84. err_msg.WriteString("init process exited due to long time no activity")
  85. break
  86. }
  87. }
  88. }()
  89. wg.Wait()
  90. if err_msg.Len() > 0 {
  91. return fmt.Errorf("install failed: %s", err_msg.String())
  92. }
  93. if err := handle.Wait(); err != nil {
  94. return err
  95. }
  96. // create .installed file
  97. f, err := os.Create(path.Join(r.State.RelativePath, ".installed"))
  98. if err != nil {
  99. return err
  100. }
  101. defer f.Close()
  102. return nil
  103. }