Browse Source

enhancement: remove force reloading process of each time process launched

Yeuoly 8 months ago
parent
commit
69dfe0d8f8

+ 0 - 7
cmd/commandline/plugin.go

@@ -11,7 +11,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/packager"
-	"github.com/langgenius/dify-plugin-daemon/internal/process"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
@@ -222,12 +221,6 @@ endpoint				- allow plugin to register endpoint`,
 			// init plugin manager
 			plugin_manager := plugin_manager.InitGlobalManager(config)
 
-			// start to schedule plugin subprocesses
-			process.Init(config)
-
-			// terminate all subprocesses when test finished
-			defer process.TerminateAll()
-
 			response, err := plugin_manager.TestPlugin(package_path_str, inputs, invoke_type, invoke_action, timeout)
 			if err != nil {
 				log.Error("failed to test plugin, package_path: %s, error: %v", package_path_str, err)

+ 1 - 1
internal/core/plugin_manager/lifetime.go

@@ -11,7 +11,7 @@ func (p *PluginManager) AddPluginRegisterHandler(handler func(r plugin_entities.
 	p.pluginRegisters = append(p.pluginRegisters, handler)
 }
 
-func (p *PluginManager) fullDuplexLifetime(r plugin_entities.PluginFullDuplexLifetime, launched_chan chan error) {
+func (p *PluginManager) fullDuplexLifecycle(r plugin_entities.PluginFullDuplexLifetime, launched_chan chan error) {
 	configuration := r.Configuration()
 
 	log.Info("new plugin logged in: %s", configuration.Identity())

+ 0 - 9
internal/core/plugin_manager/local_manager/environment.go

@@ -2,8 +2,6 @@ package local_manager
 
 import (
 	"fmt"
-	"os"
-	"path"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
@@ -21,13 +19,6 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 		return err
 	}
 
-	// create .installed file
-	f, err := os.Create(path.Join(r.State.WorkingPath, ".installed"))
-	if err != nil {
-		return err
-	}
-	defer f.Close()
-
 	return nil
 }
 

+ 6 - 0
internal/core/plugin_manager/local_manager/environment_python.go

@@ -19,6 +19,12 @@ import (
 func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 	// check if virtual environment exists
 	if _, err := os.Stat(path.Join(p.State.WorkingPath, ".venv")); err == nil {
+		// setup python interpreter path
+		python_path, err := filepath.Abs(path.Join(p.State.WorkingPath, ".venv/bin/python"))
+		if err != nil {
+			return fmt.Errorf("failed to find python: %s", err)
+		}
+		p.python_interpreter_path = python_path
 		return nil
 	}
 

+ 0 - 9
internal/core/plugin_manager/local_manager/run.go

@@ -8,7 +8,6 @@ import (
 	"os/exec"
 	"sync"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/process"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
@@ -71,10 +70,6 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// add env INSTALL_METHOD=local
 	e.Env = append(e.Env, "INSTALL_METHOD=local", "PATH="+os.Getenv("PATH"))
 
-	// NOTE: subprocess will be taken care of by subprocess manager
-	// ensure all subprocess are killed when parent process exits, especially on Golang debugger
-	process.WrapProcess(e)
-
 	// get writer
 	stdin, err := e.StdinPipe()
 	if err != nil {
@@ -104,10 +99,6 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 		return fmt.Errorf("start plugin failed: %s", err.Error())
 	}
 
-	// add to subprocess manager
-	process.NewProcess(e)
-	defer process.RemoveProcess(e)
-
 	defer func() {
 		// wait for plugin to exit
 		err = e.Wait()

+ 4 - 1
internal/core/plugin_manager/local_manager/type.go

@@ -15,7 +15,10 @@ type LocalPluginRuntime struct {
 	io_identity string
 
 	// python interpreter path, currently only support python
-	python_interpreter_path         string
+	python_interpreter_path string
+
+	// to create a new python virtual environment, we need a default python interpreter
+	// by using its venv module
 	default_python_interpreter_path string
 
 	wait_chan_lock    sync.Mutex

+ 1 - 1
internal/core/plugin_manager/tester.go

@@ -66,7 +66,7 @@ func (p *PluginManager) TestPlugin(
 			}
 		}()
 		// delete the plugin from the storage when the plugin is stopped
-		p.fullDuplexLifetime(local_plugin_runtime, nil)
+		p.fullDuplexLifecycle(local_plugin_runtime, nil)
 	})
 
 	// wait for the plugin to start

+ 7 - 7
internal/core/plugin_manager/watcher.go

@@ -24,9 +24,6 @@ import (
 
 func (p *PluginManager) startLocalWatcher() {
 	go func() {
-		// delete all plugins in working directory
-		os.RemoveAll(p.workingDirectory)
-
 		log.Info("start to handle new plugins in path: %s", p.pluginStoragePath)
 		p.handleNewLocalPlugins()
 		for range time.NewTicker(time.Second * 30).C {
@@ -67,7 +64,7 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) {
 						}
 						p.m.Delete(identity.String())
 					}()
-					p.fullDuplexLifetime(rpr, nil)
+					p.fullDuplexLifecycle(rpr, nil)
 				})
 			})
 		}()
@@ -132,8 +129,11 @@ func (p *PluginManager) launchLocal(plugin_package_path string) (
 		return nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
 	}
 
-	if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
-		return nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
+	// check if the working directory exists, if not, create it, otherwise, launch it directly
+	if _, err := os.Stat(plugin.runtime.State.WorkingPath); err != nil {
+		if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
+			return nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
+		}
 	}
 
 	success := false
@@ -190,7 +190,7 @@ func (p *PluginManager) launchLocal(plugin_package_path string) (
 			<-p.maxLaunchingLock
 		})
 
-		p.fullDuplexLifetime(local_plugin_runtime, launched_chan)
+		p.fullDuplexLifecycle(local_plugin_runtime, launched_chan)
 	})
 
 	return local_plugin_runtime, launched_chan, nil

+ 0 - 141
internal/process/manager.go

@@ -1,141 +0,0 @@
-package process
-
-import (
-	"bytes"
-	"os"
-	"os/exec"
-	"os/signal"
-	"path"
-	"strconv"
-	"strings"
-	"sync"
-	"syscall"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-)
-
-var (
-	l               *sync.Mutex
-	subprocess_path string
-)
-
-func subprocesses() []int {
-	if _, err := os.Stat(subprocess_path); err != nil {
-		if err == os.ErrNotExist {
-			os.MkdirAll(path.Dir(subprocess_path), 0755)
-			os.WriteFile(subprocess_path, []byte{}, 0644)
-		} else {
-			log.Error("Error checking subprocesses file")
-			return []int{}
-		}
-	}
-
-	data, err := os.ReadFile(subprocess_path)
-	if err != nil {
-		if !strings.Contains(err.Error(), "no such file or directory") {
-			log.Error("Error reading subprocesses file")
-		}
-		return []int{}
-	}
-	nums := bytes.Split(data, []byte("\n"))
-	procs := make([]int, 0)
-	for _, num := range nums {
-		if len(num) == 0 {
-			continue
-		}
-		proc, err := strconv.Atoi(string(num))
-		if err != nil {
-			log.Error("Error parsing subprocesses file")
-			return []int{}
-		}
-		procs = append(procs, proc)
-	}
-
-	return procs
-}
-
-func addSubprocess(pid int) {
-	l.Lock()
-	defer l.Unlock()
-
-	procs := subprocesses()
-	procs = append(procs, pid)
-	data := []byte{}
-	for _, proc := range procs {
-		data = append(data, []byte(strconv.Itoa(proc)+"\n")...)
-	}
-	os.WriteFile(subprocess_path, data, 0644)
-}
-
-func removeSubprocess(pid int) {
-	l.Lock()
-	defer l.Unlock()
-
-	procs := subprocesses()
-	new_procs := []int{}
-	for _, proc := range procs {
-		if proc == pid {
-			continue
-		}
-		new_procs = append(new_procs, proc)
-	}
-	data := []byte{}
-	for _, proc := range new_procs {
-		data = append(data, []byte(strconv.Itoa(proc)+"\n")...)
-	}
-	os.WriteFile(subprocess_path, data, 0644)
-}
-
-func clearSubprocesses() {
-	os.WriteFile(subprocess_path, []byte{}, 0644)
-}
-
-func Init(config *app.Config) {
-	l = &sync.Mutex{}
-	subprocess_path = config.ProcessCachingPath
-
-	sig_exit := make(chan os.Signal, 1)
-	signal.Notify(sig_exit, os.Interrupt, syscall.SIGTERM)
-	sig_reload := make(chan os.Signal, 1)
-	signal.Notify(sig_reload, syscall.SIGUSR2)
-
-	// kill all subprocesses
-	TerminateAll()
-
-	go func() {
-		for {
-			select {
-			case <-sig_reload:
-				TerminateAll()
-			case <-sig_exit:
-				TerminateAll()
-				os.Exit(0)
-			}
-		}
-	}()
-}
-
-func WrapProcess(cmd *exec.Cmd) {
-	cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
-}
-
-func NewProcess(cmd *exec.Cmd) {
-	addSubprocess(cmd.Process.Pid)
-}
-
-func RemoveProcess(cmd *exec.Cmd) {
-	removeSubprocess(cmd.Process.Pid)
-}
-
-func TerminateAll() {
-	l.Lock()
-	defer l.Unlock()
-
-	for _, pid := range subprocesses() {
-		log.Info("Killing uncleaned subprocess %d", pid)
-		syscall.Kill(-pid, syscall.SIGKILL)
-	}
-
-	clearSubprocesses()
-}

+ 0 - 4
internal/server/server.go

@@ -5,7 +5,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/persistence"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/db"
-	"github.com/langgenius/dify-plugin-daemon/internal/process"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
@@ -17,9 +16,6 @@ func (app *App) Run(config *app.Config) {
 	// init db
 	db.Init(config)
 
-	// init process lifetime
-	process.Init(config)
-
 	// create manager
 	manager := plugin_manager.InitGlobalManager(config)