manager.go 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142
  1. package process
  2. import (
  3. "bytes"
  4. "os"
  5. "os/exec"
  6. "os/signal"
  7. "path"
  8. "strconv"
  9. "strings"
  10. "sync"
  11. "syscall"
  12. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  13. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  14. )
  15. var (
  16. l *sync.Mutex
  17. subprocess_path string
  18. )
  19. func subprocesses() []int {
  20. if _, err := os.Stat(subprocess_path); err != nil {
  21. if err == os.ErrNotExist {
  22. os.MkdirAll(path.Dir(subprocess_path), 0755)
  23. os.WriteFile(subprocess_path, []byte{}, 0644)
  24. } else {
  25. log.Error("Error checking subprocesses file")
  26. return []int{}
  27. }
  28. }
  29. data, err := os.ReadFile(subprocess_path)
  30. if err != nil {
  31. if !strings.Contains(err.Error(), "no such file or directory") {
  32. log.Error("Error reading subprocesses file")
  33. }
  34. return []int{}
  35. }
  36. nums := bytes.Split(data, []byte("\n"))
  37. procs := make([]int, 0)
  38. for _, num := range nums {
  39. if len(num) == 0 {
  40. continue
  41. }
  42. proc, err := strconv.Atoi(string(num))
  43. if err != nil {
  44. log.Error("Error parsing subprocesses file")
  45. return []int{}
  46. }
  47. procs = append(procs, proc)
  48. }
  49. return procs
  50. }
  51. func addSubprocess(pid int) {
  52. l.Lock()
  53. defer l.Unlock()
  54. procs := subprocesses()
  55. procs = append(procs, pid)
  56. data := []byte{}
  57. for _, proc := range procs {
  58. data = append(data, []byte(strconv.Itoa(proc)+"\n")...)
  59. }
  60. os.WriteFile(subprocess_path, data, 0644)
  61. }
  62. func removeSubprocess(pid int) {
  63. l.Lock()
  64. defer l.Unlock()
  65. procs := subprocesses()
  66. new_procs := []int{}
  67. for _, proc := range procs {
  68. if proc == pid {
  69. continue
  70. }
  71. new_procs = append(new_procs, proc)
  72. }
  73. data := []byte{}
  74. for _, proc := range new_procs {
  75. data = append(data, []byte(strconv.Itoa(proc)+"\n")...)
  76. }
  77. os.WriteFile(subprocess_path, data, 0644)
  78. }
  79. func clearSubprocesses() {
  80. os.WriteFile(subprocess_path, []byte{}, 0644)
  81. }
  82. func Init(config *app.Config) {
  83. l = &sync.Mutex{}
  84. subprocess_path = config.ProcessCachingPath
  85. sig_exit := make(chan os.Signal, 1)
  86. signal.Notify(sig_exit, os.Interrupt, syscall.SIGTERM)
  87. sig_reload := make(chan os.Signal, 1)
  88. signal.Notify(sig_reload, syscall.SIGUSR2)
  89. // kill all subprocesses
  90. TerminateAll()
  91. go func() {
  92. for {
  93. select {
  94. case <-sig_reload:
  95. TerminateAll()
  96. case <-sig_exit:
  97. TerminateAll()
  98. os.Exit(0)
  99. }
  100. }
  101. }()
  102. }
  103. func WrapProcess(cmd *exec.Cmd) {
  104. cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
  105. }
  106. func NewProcess(cmd *exec.Cmd) {
  107. addSubprocess(cmd.Process.Pid)
  108. }
  109. func RemoveProcess(cmd *exec.Cmd) {
  110. removeSubprocess(cmd.Process.Pid)
  111. }
  112. func TerminateAll() {
  113. l.Lock()
  114. defer l.Unlock()
  115. for _, pid := range subprocesses() {
  116. log.Info("Killing uncleaned subprocess %d", pid)
  117. syscall.Kill(-pid, syscall.SIGKILL)
  118. }
  119. clearSubprocesses()
  120. }