manager.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. package process
  2. import (
  3. "bytes"
  4. "os"
  5. "os/exec"
  6. "os/signal"
  7. "path"
  8. "strconv"
  9. "sync"
  10. "syscall"
  11. "github.com/langgenius/dify-plugin-daemon/internal/types/app"
  12. "github.com/langgenius/dify-plugin-daemon/internal/utils/log"
  13. )
  // Package-level state shared by the subprocess bookkeeping helpers.
  var (
  	// l serializes access to the subprocess file. It is allocated at
  	// declaration so that helpers invoked before Init do not dereference a
  	// nil mutex.
  	l = &sync.Mutex{}
  	// subprocess_path is the file that stores one tracked PID per line. It is
  	// empty until Init assigns it from the application config.
  	subprocess_path string
  )
  18. func subprocesses() []int {
  19. if _, err := os.Stat(subprocess_path); err != nil {
  20. if err == os.ErrNotExist {
  21. os.MkdirAll(path.Dir(subprocess_path), 0755)
  22. os.WriteFile(subprocess_path, []byte{}, 0644)
  23. } else {
  24. log.Error("Error checking subprocesses file")
  25. return []int{}
  26. }
  27. }
  28. data, err := os.ReadFile(subprocess_path)
  29. if err != nil {
  30. log.Error("Error reading subprocesses file")
  31. return []int{}
  32. }
  33. nums := bytes.Split(data, []byte("\n"))
  34. procs := make([]int, 0)
  35. for _, num := range nums {
  36. if len(num) == 0 {
  37. continue
  38. }
  39. proc, err := strconv.Atoi(string(num))
  40. if err != nil {
  41. log.Error("Error parsing subprocesses file")
  42. return []int{}
  43. }
  44. procs = append(procs, proc)
  45. }
  46. return procs
  47. }
  48. func addSubprocess(pid int) {
  49. l.Lock()
  50. defer l.Unlock()
  51. procs := subprocesses()
  52. procs = append(procs, pid)
  53. data := []byte{}
  54. for _, proc := range procs {
  55. data = append(data, []byte(strconv.Itoa(proc)+"\n")...)
  56. }
  57. os.WriteFile(subprocess_path, data, 0644)
  58. }
  59. func removeSubprocess(pid int) {
  60. l.Lock()
  61. defer l.Unlock()
  62. procs := subprocesses()
  63. new_procs := []int{}
  64. for _, proc := range procs {
  65. if proc == pid {
  66. continue
  67. }
  68. new_procs = append(new_procs, proc)
  69. }
  70. data := []byte{}
  71. for _, proc := range new_procs {
  72. data = append(data, []byte(strconv.Itoa(proc)+"\n")...)
  73. }
  74. os.WriteFile(subprocess_path, data, 0644)
  75. }
  76. func clearSubprocesses() {
  77. os.WriteFile(subprocess_path, []byte{}, 0644)
  78. }
  79. func Init(config *app.Config) {
  80. l = &sync.Mutex{}
  81. subprocess_path = config.ProcessCachingPath
  82. sig_exit := make(chan os.Signal, 1)
  83. signal.Notify(sig_exit, os.Interrupt, syscall.SIGTERM)
  84. sig_reload := make(chan os.Signal, 1)
  85. signal.Notify(sig_reload, syscall.SIGUSR2)
  86. // kill all subprocesses
  87. TerminateAll()
  88. go func() {
  89. for {
  90. select {
  91. case <-sig_reload:
  92. TerminateAll()
  93. case <-sig_exit:
  94. TerminateAll()
  95. os.Exit(0)
  96. }
  97. }
  98. }()
  99. }
  // WrapProcess marks cmd to start with Setpgid enabled, which TerminateAll
  // relies on when it signals the negative PID (the whole process group).
  //
  // Any SysProcAttr fields the caller configured beforehand are preserved;
  // only Setpgid is forced to true.
  func WrapProcess(cmd *exec.Cmd) {
  	if cmd.SysProcAttr == nil {
  		cmd.SysProcAttr = &syscall.SysProcAttr{}
  	}
  	cmd.SysProcAttr.Setpgid = true
  }
  103. func NewProcess(cmd *exec.Cmd) {
  104. addSubprocess(cmd.Process.Pid)
  105. }
  106. func RemoveProcess(cmd *exec.Cmd) {
  107. removeSubprocess(cmd.Process.Pid)
  108. }
  109. func TerminateAll() {
  110. l.Lock()
  111. defer l.Unlock()
  112. for _, pid := range subprocesses() {
  113. log.Info("Killing uncleaned subprocess %d", pid)
  114. syscall.Kill(-pid, syscall.SIGKILL)
  115. }
  116. clearSubprocesses()
  117. }