| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139 | package processimport (	"bytes"	"os"	"os/exec"	"os/signal"	"path"	"strconv"	"sync"	"syscall"	"github.com/langgenius/dify-plugin-daemon/internal/types/app"	"github.com/langgenius/dify-plugin-daemon/internal/utils/log")var (	l               *sync.Mutex	subprocess_path string)func subprocesses() []int {	if _, err := os.Stat(subprocess_path); err != nil {		if err == os.ErrNotExist {			os.MkdirAll(path.Dir(subprocess_path), 0755)			os.WriteFile(subprocess_path, []byte{}, 0644)		} else {			log.Error("Error checking subprocesses file")			return []int{}		}	}	data, err := os.ReadFile(subprocess_path)	if err != nil {		log.Error("Error reading subprocesses file")		return []int{}	}	nums := bytes.Split(data, []byte("\n"))	procs := make([]int, 0)	for _, num := range nums {		if len(num) == 0 {			continue		}		proc, err := strconv.Atoi(string(num))		if err != nil {			log.Error("Error parsing subprocesses file")			return []int{}		}		procs = append(procs, proc)	}	return procs}func addSubprocess(pid int) {	l.Lock()	defer l.Unlock()	procs := subprocesses()	procs = append(procs, pid)	data := []byte{}	for _, proc := range procs {		data = append(data, []byte(strconv.Itoa(proc)+"\n")...)	}	os.WriteFile(subprocess_path, data, 0644)}func removeSubprocess(pid int) {	l.Lock()	defer l.Unlock()	procs := subprocesses()	new_procs := []int{}	for _, proc := range procs {		if proc == pid {			continue		}		new_procs = append(new_procs, proc)	}	data := []byte{}	for _, proc := range new_procs {		data = append(data, []byte(strconv.Itoa(proc)+"\n")...)	}	os.WriteFile(subprocess_path, data, 0644)}func clearSubprocesses() {	os.WriteFile(subprocess_path, []byte{}, 0644)}func Init(config *app.Config) {	l = &sync.Mutex{}	subprocess_path = config.ProcessCachingPath	sig_exit := make(chan os.Signal, 1)	signal.Notify(sig_exit, os.Interrupt, syscall.SIGTERM)	sig_reload := make(chan os.Signal, 1)	signal.Notify(sig_reload, syscall.SIGUSR2)	// kill all subprocesses	TerminateAll()	go func() {		for {			select {			case <-sig_reload:				TerminateAll()			case <-sig_exit:				TerminateAll()				os.Exit(0)			}		}	}()}func WrapProcess(cmd *exec.Cmd) {	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}}func NewProcess(cmd *exec.Cmd) {	addSubprocess(cmd.Process.Pid)}func RemoveProcess(cmd *exec.Cmd) {	removeSubprocess(cmd.Process.Pid)}func TerminateAll() {	l.Lock()	defer l.Unlock()	for _, pid := range subprocesses() {		log.Info("Killing uncleaned subprocess %d", pid)		syscall.Kill(-pid, syscall.SIGKILL)	}	clearSubprocesses()}
 |