Yeuoly 1 год назад
Родитель
Сommit
d35cf3cf4e
4 измененных файлов с 113 добавлено и 22 удалено
  1. 8 0
      cmd/server/main.go
  2. 102 20
      internal/process/manager.go
  3. 1 1
      internal/server/server.go
  4. 2 1
      internal/types/app/config.go

+ 8 - 0
cmd/server/main.go

@@ -34,6 +34,8 @@ func setDefault(config *app.Config) {
 	setDefaultInt(&config.LifetimeCollectionHeartbeatInterval, 5)
 	setDefaultInt(&config.LifetimeStateGCInterval, 300)
 	setDefaultInt(&config.DifyInvocationConnectionIdleTimeout, 120)
+
+	setDebugString(&config.ProcessCachingPath, "/tmp/dify-plugin-daemon-subprocesses")
 }
 
 func setDefaultInt[T constraints.Integer](value *T, defaultValue T) {
@@ -41,3 +43,9 @@ func setDefaultInt[T constraints.Integer](value *T, defaultValue T) {
 		*value = defaultValue
 	}
 }
+
+func setDebugString(value *string, defaultValue string) {
+	if *value == "" {
+		*value = defaultValue
+	}
+}

+ 102 - 20
internal/process/manager.go

@@ -1,29 +1,115 @@
 package process
 
 import (
+	"bytes"
 	"os"
 	"os/exec"
 	"os/signal"
+	"path"
+	"strconv"
 	"sync"
 	"syscall"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
 var (
-	subprocesses map[int]*exec.Cmd
-	l            *sync.Mutex
+	l               *sync.Mutex
+	subprocess_path string
 )
 
-func Init() {
-	subprocesses = make(map[int]*exec.Cmd)
+func subprocesses() []int {
+	if _, err := os.Stat(subprocess_path); err != nil {
+		if err == os.ErrNotExist {
+			os.MkdirAll(path.Dir(subprocess_path), 0755)
+			os.WriteFile(subprocess_path, []byte{}, 0644)
+		} else {
+			log.Error("Error checking subprocesses file")
+			return []int{}
+		}
+	}
+
+	data, err := os.ReadFile(subprocess_path)
+	if err != nil {
+		log.Error("Error reading subprocesses file")
+		return []int{}
+	}
+	nums := bytes.Split(data, []byte("\n"))
+	procs := make([]int, 0)
+	for _, num := range nums {
+		if len(num) == 0 {
+			continue
+		}
+		proc, err := strconv.Atoi(string(num))
+		if err != nil {
+			log.Error("Error parsing subprocesses file")
+			return []int{}
+		}
+		procs = append(procs, proc)
+	}
+
+	return procs
+}
+
+func addSubprocess(pid int) {
+	l.Lock()
+	defer l.Unlock()
+
+	procs := subprocesses()
+	procs = append(procs, pid)
+	data := []byte{}
+	for _, proc := range procs {
+		data = append(data, []byte(strconv.Itoa(proc)+"\n")...)
+	}
+	os.WriteFile(subprocess_path, data, 0644)
+}
+
+func removeSubprocess(pid int) {
+	l.Lock()
+	defer l.Unlock()
+
+	procs := subprocesses()
+	new_procs := []int{}
+	for _, proc := range procs {
+		if proc == pid {
+			continue
+		}
+		new_procs = append(new_procs, proc)
+	}
+	data := []byte{}
+	for _, proc := range new_procs {
+		data = append(data, []byte(strconv.Itoa(proc)+"\n")...)
+	}
+	os.WriteFile(subprocess_path, data, 0644)
+}
+
+func clearSubprocesses() {
+	os.WriteFile(subprocess_path, []byte{}, 0644)
+}
+
+func Init(config *app.Config) {
 	l = &sync.Mutex{}
+	subprocess_path = config.ProcessCachingPath
 
-	sig := make(chan os.Signal, 1)
-	signal.Notify(sig, os.Interrupt, syscall.SIGTERM)
+	sig_exit := make(chan os.Signal, 1)
+	signal.Notify(sig_exit, os.Interrupt, syscall.SIGTERM)
+	sig_reload := make(chan os.Signal, 1)
+	signal.Notify(sig_reload, syscall.SIGUSR2)
+
+	// kill all subprocesses
+	TerminateAll()
 
 	go func() {
-		<-sig
-		TerminateAll()
-		os.Exit(0)
+		for {
+			select {
+			case <-sig_reload:
+				TerminateAll()
+			case <-sig_exit:
+				TerminateAll()
+				os.Exit(0)
+			}
+		}
 	}()
 }
 
@@ -32,25 +118,21 @@ func WrapProcess(cmd *exec.Cmd) {
 }
 
 func NewProcess(cmd *exec.Cmd) {
-	l.Lock()
-	defer l.Unlock()
-	subprocesses[cmd.Process.Pid] = cmd
+	addSubprocess(cmd.Process.Pid)
 }
 
 func RemoveProcess(cmd *exec.Cmd) {
-	l.Lock()
-	defer l.Unlock()
-
-	delete(subprocesses, cmd.Process.Pid)
+	removeSubprocess(cmd.Process.Pid)
 }
 
 func TerminateAll() {
 	l.Lock()
 	defer l.Unlock()
 
-	for _, cmd := range subprocesses {
-		if cmd.Process != nil {
-			syscall.Kill(-cmd.Process.Pid, syscall.SIGKILL)
-		}
+	for _, pid := range subprocesses() {
+		log.Info("Killing uncleaned subprocess %d", pid)
+		syscall.Kill(-pid, syscall.SIGKILL)
 	}
+
+	clearSubprocesses()
 }

+ 1 - 1
internal/server/server.go

@@ -12,7 +12,7 @@ func Run(config *app.Config) {
 	routine.InitPool(config.RoutinePoolSize)
 
 	// init process lifetime
-	process.Init()
+	process.Init(config)
 
 	// init plugin daemon
 	plugin_manager.Init(config)

+ 2 - 1
internal/types/app/config.go

@@ -9,7 +9,8 @@ type Config struct {
 	PluginRemoteInstallingHost string `envconfig:"PLUGIN_REMOTE_INSTALLING_HOST"`
 	PluginRemoteInstallingPort int16  `envconfig:"PLUGIN_REMOTE_INSTALLING_PORT"`
 
-	StoragePath string `envconfig:"STORAGE_PATH"`
+	StoragePath        string `envconfig:"STORAGE_PATH"`
+	ProcessCachingPath string `envconfig:"PROCESS_CACHING_PATH"`
 
 	Platform PlatformType `envconfig:"PLATFORM"`