소스 검색

refactor: local plugin runtime

Yeuoly 11 달 전
부모
커밋
1697a0ec75

+ 6 - 97
internal/core/plugin_manager/local_manager/environment.go

@@ -3,16 +3,10 @@ package local_manager
 import (
 	"fmt"
 	"os"
-	"os/exec"
 	"path"
-	"strings"
-	"sync"
-	"syscall"
-	"time"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
 func (r *LocalPluginRuntime) InitEnvironment() error {
@@ -20,101 +14,16 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 		return nil
 	}
 
-	// execute init command, create
-	// TODO
-	handle := exec.Command("bash")
-	handle.Dir = r.State.AbsolutePath
-	handle.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
-
-	// get stdout and stderr
-	stdout, err := handle.StdoutPipe()
-	if err != nil {
-		return err
+	var err error
+	if r.Config.Meta.Runner.Language == constants.Python {
+		err = r.InitPythonEnvironment()
+	} else {
+		return fmt.Errorf("unsupported language: %s", r.Config.Meta.Runner.Language)
 	}
-	defer stdout.Close()
 
-	stderr, err := handle.StderrPipe()
 	if err != nil {
 		return err
 	}
-	defer stderr.Close()
-
-	// start command
-	if err := handle.Start(); err != nil {
-		return err
-	}
-	defer func() {
-		if handle.Process != nil {
-			handle.Process.Kill()
-		}
-	}()
-
-	var err_msg strings.Builder
-	var wg sync.WaitGroup
-	wg.Add(2)
-
-	last_active_at := time.Now()
-
-	routine.Submit(func() {
-		defer wg.Done()
-		// read stdout
-		buf := make([]byte, 1024)
-		for {
-			n, err := stdout.Read(buf)
-			if err != nil {
-				break
-			}
-			log.Info("installing %s - %s", r.Config.Identity(), string(buf[:n]))
-			last_active_at = time.Now()
-		}
-	})
-
-	routine.Submit(func() {
-		defer wg.Done()
-		// read stderr
-		buf := make([]byte, 1024)
-		for {
-			n, err := stderr.Read(buf)
-			if err != nil && err != os.ErrClosed {
-				last_active_at = time.Now()
-				err_msg.WriteString(string(buf[:n]))
-				break
-			} else if err == os.ErrClosed {
-				break
-			}
-
-			if n > 0 {
-				err_msg.WriteString(string(buf[:n]))
-				last_active_at = time.Now()
-			}
-		}
-	})
-
-	routine.Submit(func() {
-		ticker := time.NewTicker(5 * time.Second)
-		defer ticker.Stop()
-		for range ticker.C {
-			if handle.ProcessState != nil && handle.ProcessState.Exited() {
-				break
-			}
-
-			if time.Since(last_active_at) > 60*time.Second {
-				handle.Process.Kill()
-				err_msg.WriteString("init process exited due to long time no activity")
-				break
-			}
-		}
-	})
-
-	wg.Wait()
-
-	if err_msg.Len() > 0 {
-		return fmt.Errorf("install failed: %s", err_msg.String())
-	}
-
-	if err := handle.Wait(); err != nil {
-		return err
-	}
 
 	// create .installed file
 	f, err := os.Create(path.Join(r.State.AbsolutePath, ".installed"))

+ 158 - 9
internal/core/plugin_manager/local_manager/environment_python.go

@@ -1,19 +1,168 @@
 package local_manager
 
-import "os/exec"
+import (
+	"context"
+	"fmt"
+	"os"
+	"os/exec"
+	"path"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
 
-func (p *LocalPluginRuntime) InitPythonEnvironment(requirements_txt string) error {
-	// create virtual env
-	identity, err := p.Identity()
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+)
+
+func (p *LocalPluginRuntime) InitPythonEnvironment() error {
+	// execute init command, create a virtual environment
+	success := false
+
+	cmd := exec.Command("bash", "-c", "python3 -m venv .venv")
+	cmd.Dir = p.State.WorkingPath
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to create virtual environment: %s", err)
+	}
+	defer func() {
+		// if init failed, remove the .venv directory
+		if !success {
+			os.RemoveAll(path.Join(p.State.WorkingPath, ".venv"))
+		}
+	}()
+
+	// wait for the virtual environment to be created
+	if err := cmd.Wait(); err != nil {
+		return fmt.Errorf("failed to create virtual environment: %s", err)
+	}
+
+	// try find python interpreter and pip
+	pip_path, err := filepath.Abs(path.Join(p.State.WorkingPath, ".venv/bin/pip"))
 	if err != nil {
-		return err
+		return fmt.Errorf("failed to find pip: %s", err)
 	}
 
-	cmd := exec.Command("python", "-m", "venv", identity.String())
+	python_path, err := filepath.Abs(path.Join(p.State.WorkingPath, ".venv/bin/python"))
+	if err != nil {
+		return fmt.Errorf("failed to find python: %s", err)
+	}
+
+	if _, err := os.Stat(pip_path); err != nil {
+		return fmt.Errorf("failed to find pip: %s", err)
+	}
 
-	// set working directory
-	cmd.Dir = p.WorkingPath
+	if _, err := os.Stat(python_path); err != nil {
+		return fmt.Errorf("failed to find python: %s", err)
+	}
+
+	p.python_interpreter_path = python_path
+
+	// try find requirements.txt
+	requirements_path := path.Join(p.State.WorkingPath, "requirements.txt")
+	if _, err := os.Stat(requirements_path); err != nil {
+		return fmt.Errorf("failed to find requirements.txt: %s", err)
+	}
+
+	// install dependencies
+	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
+	defer cancel()
+
+	cmd = exec.CommandContext(ctx, pip_path, "install", "-r", requirements_path)
+	cmd.Dir = p.State.WorkingPath
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to install dependencies: %s", err)
+	}
+
+	// get stdout and stderr
+	stdout, err := cmd.StdoutPipe()
+	if err != nil {
+		return fmt.Errorf("failed to get stdout: %s", err)
+	}
+	defer stdout.Close()
+
+	stderr, err := cmd.StderrPipe()
+	if err != nil {
+		return fmt.Errorf("failed to get stderr: %s", err)
+	}
+	defer stderr.Close()
+
+	// start command
+	if err := cmd.Start(); err != nil {
+		return fmt.Errorf("failed to start command: %s", err)
+	}
+	defer func() {
+		if cmd.Process != nil {
+			cmd.Process.Kill()
+		}
+	}()
+
+	var err_msg strings.Builder
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	last_active_at := time.Now()
+
+	routine.Submit(func() {
+		defer wg.Done()
+		// read stdout
+		buf := make([]byte, 1024)
+		for {
+			n, err := stdout.Read(buf)
+			if err != nil {
+				break
+			}
+			log.Info("installing %s - %s", p.Config.Identity(), string(buf[:n]))
+			last_active_at = time.Now()
+		}
+	})
+
+	routine.Submit(func() {
+		defer wg.Done()
+		// read stderr
+		buf := make([]byte, 1024)
+		for {
+			n, err := stderr.Read(buf)
+			if err != nil && err != os.ErrClosed {
+				last_active_at = time.Now()
+				err_msg.WriteString(string(buf[:n]))
+				break
+			} else if err == os.ErrClosed {
+				break
+			}
+
+			if n > 0 {
+				err_msg.WriteString(string(buf[:n]))
+				last_active_at = time.Now()
+			}
+		}
+	})
+
+	routine.Submit(func() {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+		for range ticker.C {
+			if cmd.ProcessState != nil && cmd.ProcessState.Exited() {
+				break
+			}
+
+			if time.Since(last_active_at) > 60*time.Second {
+				cmd.Process.Kill()
+				err_msg.WriteString("init process exited due to long time no activity")
+				break
+			}
+		}
+	})
+
+	wg.Wait()
+
+	if err_msg.Len() > 0 {
+		return fmt.Errorf("install failed: %s", err_msg.String())
+	}
+
+	if err := cmd.Wait(); err != nil {
+		return fmt.Errorf("failed to install dependencies: %s", err)
+	}
 
-	// TODO
+	success = true
 	return nil
 }

+ 18 - 3
internal/core/plugin_manager/local_manager/run.go

@@ -8,6 +8,7 @@ import (
 	"sync"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/process"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
@@ -33,14 +34,28 @@ func (r *LocalPluginRuntime) Type() plugin_entities.PluginRuntimeType {
 	return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
 }
 
+func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) {
+	if r.Config.Meta.Runner.Language == constants.Python {
+		cmd := exec.Command(r.python_interpreter_path, "-m", r.Config.Meta.Runner.Entrypoint)
+		cmd.Dir = r.State.WorkingPath
+		return cmd, nil
+	}
+
+	return nil, fmt.Errorf("unsupported language: %s", r.Config.Meta.Runner.Language)
+}
+
 func (r *LocalPluginRuntime) StartPlugin() error {
 	defer log.Info("plugin %s stopped", r.Config.Identity())
 
 	r.init()
+
 	// start plugin
-	// TODO: use exec.Command("bash") instead of exec.Command("bash", r.Config.Execution.Launch)
-	e := exec.Command("bash")
-	e.Dir = r.State.AbsolutePath
+	e, err := r.getCmd()
+	if err != nil {
+		return err
+	}
+
+	e.Dir = r.State.WorkingPath
 	// add env INSTALL_METHOD=local
 	e.Env = append(e.Env, "INSTALL_METHOD=local", "PATH="+os.Getenv("PATH"))
 

+ 3 - 0
internal/core/plugin_manager/local_manager/type.go

@@ -11,4 +11,7 @@ type LocalPluginRuntime struct {
 
 	wait_chan   chan bool
 	io_identity string
+
+	// python interpreter path, currently only support python
+	python_interpreter_path string
 }