소스 검색

feat: local process

Yeuoly 1 년 전
부모
커밋
6ce6fa058f

+ 5 - 1
.env.example

@@ -1,3 +1,7 @@
 DIFY_PLUGIN_HOST=127.0.0.1
 DIFY_PLUGIN_PORT=5002
-DIFY_PLUGIN_KEY=your_plugin_key
+DIFY_PLUGIN_KEY=your_plugin_key
+
+STORAGE_PATH=/var/dify/plugins
+
+PLATFORM=local

+ 53 - 28
cmd/test/main.go

@@ -1,33 +1,58 @@
 package main
 
-import (
-	"fmt"
-	"time"
-)
+import "fmt"
+
+// Speaker 接口
+type Speaker interface {
+	Speak() string
+}
+
+// Namer 接口
+type Namer interface {
+	Name() string
+}
+
+// Greeter 接口,组合了 Speaker 和 Namer 接口
+type Greeter interface {
+	Speaker
+	Namer
+}
+
+// Dog 结构体
+type Dog struct {
+	name string
+}
+
+// Dog 的 Speak 方法
+func (d *Dog) Speak() string {
+	return fmt.Sprintf("%s says woof!", d.name)
+}
+
+// Dog 的 Name 方法
+func (d *Dog) Name() string {
+	return d.name
+}
+
+// GermanShepherd 结构体,组合了 Dog
+type GermanShepherd struct {
+	Dog
+}
+
+// Labrador 结构体,组合了 Dog
+type Labrador struct {
+	Dog
+}
+
+// 使用 Greeter 接口的函数
+func GreetAndSpeak(g Greeter) {
+	fmt.Println("Hello", g.Name())
+	fmt.Println(g.Speak())
+}
 
 func main() {
-	ch := c()
-	for i := range ch {
-		if i == 20 {
-			break
-		}
-		fmt.Println(i)
-	}
-
-	for {
-		time.Sleep(1 * time.Second)
-	}
-}
-
-func c() <-chan int {
-	c := make(chan int)
-	go func() {
-		for i := 0; i < 10000; i++ {
-			fmt.Println("send", i)
-			c <- i
-		}
-
-		close(c)
-	}()
-	return c
+	gs := GermanShepherd{Dog{name: "Rex"}}
+	lb := Labrador{Dog{name: "Buddy"}}
+
+	GreetAndSpeak(&gs)
+	GreetAndSpeak(&lb)
 }

+ 1 - 1
examples/baisc_math/manifest.json

@@ -3,7 +3,7 @@
     "author": "Yeuoly",
     "name": "basic_math",
     "datetime": 1719812022,
-    "exec": ".venv/bin/python -m main",
+    "exec": "conda activate basic_math && python -m main",
     "resource": {
         "memory": 1048576,
         "storage": 1048576,

+ 5 - 0
internal/core/plugin_manager/aws_manager/environment.go

@@ -0,0 +1,5 @@
+package aws_manager
+
+func (r *AWSPluginRuntime) InitEnvironment() error {
+	return nil
+}

+ 6 - 0
internal/core/plugin_manager/aws_manager/run.go

@@ -0,0 +1,6 @@
+package aws_manager
+
+func (r *AWSPluginRuntime) StartPlugin() error {
+
+	return nil
+}

+ 7 - 0
internal/core/plugin_manager/aws_manager/type.go

@@ -0,0 +1,7 @@
+package aws_manager
+
+import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+
+type AWSPluginRuntime struct {
+	entities.PluginRuntime
+}

+ 42 - 2
internal/core/plugin_manager/lifetime.go

@@ -1,7 +1,47 @@
 package plugin_manager
 
-import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+import (
+	"time"
 
-func lifetime(r *entities.PluginRuntime) {
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+)
 
+func lifetime(r entities.PluginRuntimeInterface) {
+	start_failed_times := 0
+
+	for !r.Stopped() {
+		if err := r.InitEnvironment(); err != nil {
+			log.Error("init environment failed: %s, retry in 30s", err.Error())
+			time.Sleep(30 * time.Second)
+			if start_failed_times == 3 {
+				log.Error(
+					"init environment failed 3 times, plugin %s:%s has been stopped",
+					r.Configuration().Name,
+					r.Configuration().Version,
+				)
+				r.Stop()
+			}
+			start_failed_times++
+			continue
+		}
+
+		start_failed_times = 0
+		// start plugin
+		if err := r.StartPlugin(); err != nil {
+			log.Error("start plugin failed: %s, retry in 30s", err.Error())
+			time.Sleep(30 * time.Second)
+			if start_failed_times == 3 {
+				log.Error(
+					"start plugin failed 3 times, plugin %s:%s has been stopped",
+					r.Configuration().Name,
+					r.Configuration().Version,
+				)
+				r.Stop()
+			}
+
+			start_failed_times++
+			continue
+		}
+	}
 }

+ 122 - 0
internal/core/plugin_manager/local_manager/environment.go

@@ -0,0 +1,122 @@
+package local_manager
+
+import (
+	"fmt"
+	"os"
+	"os/exec"
+	"path"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+)
+
+func (r *LocalPluginRuntime) InitEnvironment() error {
+	if _, err := os.Stat(path.Join(r.State.RelativePath, ".installed")); err == nil {
+		return nil
+	}
+
+	// execute init command
+	handle := exec.Command("bash", "install.sh")
+	handle.Dir = r.State.RelativePath
+
+	// get stdout and stderr
+	stdout, err := handle.StdoutPipe()
+	if err != nil {
+		return err
+	}
+	defer stdout.Close()
+
+	stderr, err := handle.StderrPipe()
+	if err != nil {
+		return err
+	}
+	defer stderr.Close()
+
+	// start command
+	if err := handle.Start(); err != nil {
+		return err
+	}
+	defer func() {
+		if handle.Process != nil {
+			handle.Process.Kill()
+		}
+	}()
+
+	var err_msg strings.Builder
+	var wg sync.WaitGroup
+	wg.Add(2)
+
+	last_active_at := time.Now()
+
+	go func() {
+		defer wg.Done()
+		// read stdout
+		buf := make([]byte, 1024)
+		for {
+			n, err := stdout.Read(buf)
+			if err != nil {
+				break
+			}
+			log.Info("installing %s:%s - %s", r.Config.Name, r.Config.Name, string(buf[:n]))
+			last_active_at = time.Now()
+		}
+	}()
+
+	go func() {
+		defer wg.Done()
+		// read stderr
+		buf := make([]byte, 1024)
+		for {
+			n, err := stderr.Read(buf)
+			if err != nil && err != os.ErrClosed {
+				last_active_at = time.Now()
+				err_msg.WriteString(string(buf[:n]))
+				break
+			} else if err == os.ErrClosed {
+				break
+			}
+
+			if n > 0 {
+				err_msg.WriteString(string(buf[:n]))
+				last_active_at = time.Now()
+			}
+		}
+	}()
+
+	go func() {
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+		for range ticker.C {
+			if handle.ProcessState != nil && handle.ProcessState.Exited() {
+				break
+			}
+
+			if time.Since(last_active_at) > 60*time.Second {
+				handle.Process.Kill()
+				err_msg.WriteString("init process exited due to long time no activity")
+				break
+			}
+		}
+	}()
+
+	wg.Wait()
+
+	if err_msg.Len() > 0 {
+		return fmt.Errorf("install failed: %s", err_msg.String())
+	}
+
+	if err := handle.Wait(); err != nil {
+		return err
+	}
+
+	// create .installed file
+	f, err := os.Create(path.Join(r.State.RelativePath, ".installed"))
+	if err != nil {
+		return err
+	}
+	defer f.Close()
+
+	return nil
+}

+ 5 - 0
internal/core/plugin_manager/local_manager/run.go

@@ -0,0 +1,5 @@
+package local_manager
+
+func (r *LocalPluginRuntime) StartPlugin() error {
+	return nil
+}

+ 7 - 0
internal/core/plugin_manager/local_manager/type.go

@@ -0,0 +1,7 @@
+package local_manager
+
+import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+
+type LocalPluginRuntime struct {
+	entities.PluginRuntime
+}

+ 1 - 1
internal/core/plugin_manager/manager.go

@@ -29,5 +29,5 @@ func Init(configuration *app.Config) {
 	// TODO: init plugin manager
 	log.Info("start plugin manager daemon...")
 
-	startWatcher(configuration.StoragePath)
+	startWatcher(configuration.StoragePath, configuration.Platform)
 }

+ 20 - 3
internal/core/plugin_manager/watcher.go

@@ -4,19 +4,36 @@ import (
 	"os"
 	"path"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
-func startWatcher(path string) {
+func startWatcher(path string, platform string) {
 	// load local plugins firstly
 	for plugin := range loadNewPlugins(path) {
+		var plugin_interface entities.PluginRuntimeInterface
+
+		if platform == app.PLATFORM_AWS_LAMBDA {
+			plugin_interface = &aws_manager.AWSPluginRuntime{
+				PluginRuntime: plugin,
+			}
+		} else if platform == app.PLATFORM_LOCAL {
+			plugin_interface = &local_manager.LocalPluginRuntime{
+				PluginRuntime: plugin,
+			}
+		} else {
+			log.Error("unsupported platform: %s for plugin: %s", platform, plugin.Config.Name)
+			continue
+		}
 
 		log.Info("loaded plugin: %s:%s", plugin.Config.Name, plugin.Config.Version)
-		m.Store(plugin.Info.ID, &plugin)
+		m.Store(plugin.Config.Name, plugin_interface)
 
-		lifetime(&plugin)
+		lifetime(plugin_interface)
 	}
 }
 

+ 7 - 0
internal/types/app/config.go

@@ -5,4 +5,11 @@ type Config struct {
 	DifyPluginPort int16  `envconfig:"DIFY_PLUGIN_PORT"`
 	DifyPluginKey  string `envconfig:"DIFY_PLUGIN_KEY"`
 	StoragePath    string `envconfig:"STORAGE_PATH"`
+
+	Platform string `envconfig:"PLATFORM"`
 }
+
+const (
+	PLATFORM_LOCAL      = "local"
+	PLATFORM_AWS_LAMBDA = "aws_lambda"
+)

+ 49 - 0
internal/types/entities/config.go

@@ -0,0 +1,49 @@
+package entities
+
+type PluginConfiguration struct {
+	Version  string                      `json:"version"`
+	Author   string                      `json:"author"`
+	Name     string                      `json:"name"`
+	Datetime int64                       `json:"datetime"`
+	Exec     string                      `json:"exec"`
+	Resource PluginConfigurationResource `json:"resource"`
+	Meta     PluginConfigurationMeta     `json:"meta"`
+}
+
+type PluginConfigurationResource struct {
+	Memory     int64                         `json:"memory"`
+	Storage    int64                         `json:"storage"`
+	Permission PluginConfigurationPermission `json:"permission"`
+}
+
+type PluginConfigurationMeta struct {
+	Version string   `json:"version"`
+	Arch    []string `json:"arch"`
+	Runner  struct {
+		Language string `json:"language"`
+		Version  string `json:"version"`
+	} `json:"runner"`
+}
+
+type PluginExtension struct {
+	Tool  bool `json:"tool"`
+	Model bool `json:"model"`
+}
+
+type PluginConfigurationPermission struct {
+	Model PluginConfigurationPermissionModel `json:"model"`
+	Tool  PluginConfigurationPermissionTool  `json:"tool"`
+}
+
+type PluginConfigurationPermissionModel struct {
+	Enabled       bool `json:"enabled"`
+	LLM           bool `json:"llm"`
+	TextEmbedding bool `json:"text_embedding"`
+	Rerank        bool `json:"rerank"`
+	TTS           bool `json:"tts"`
+	STT           bool `json:"stt"`
+}
+
+type PluginConfigurationPermissionTool struct {
+	Enabled bool `json:"enabled"`
+}

+ 23 - 49
internal/types/entities/runtime.go

@@ -4,22 +4,32 @@ import (
 	"time"
 )
 
-const (
-	PLUGIN_RUNTIME_TYPE_LOCAL      = "local"
-	PLUGIN_RUNTIME_TYPE_AWS_LAMBDA = "aws_lambda"
+type (
+	PluginRuntime struct {
+		State     PluginRuntimeState  `json:"state"`
+		Config    PluginConfiguration `json:"config"`
+		Connector PluginConnector     `json:"-"`
+	}
+
+	PluginRuntimeInterface interface {
+		InitEnvironment() error
+		StartPlugin() error
+		Stopped() bool
+		Stop()
+		Configuration() *PluginConfiguration
+	}
 )
 
-type PluginRuntime struct {
-	Info      PluginRuntimeInfo   `json:"info"`
-	State     PluginRuntimeState  `json:"state"`
-	Config    PluginConfiguration `json:"config"`
-	Connector PluginConnector     `json:"-"`
+func (r *PluginRuntime) Stopped() bool {
+	return r.State.Stopped
 }
 
-type PluginRuntimeInfo struct {
-	Type    string `json:"type"`
-	ID      string `json:"id"`
-	Restart bool   `json:"restart"`
+func (r *PluginRuntime) Stop() {
+	r.State.Stopped = true
+}
+
+func (r *PluginRuntime) Configuration() *PluginConfiguration {
+	return &r.Config
 }
 
 type PluginRuntimeState struct {
@@ -28,46 +38,10 @@ type PluginRuntimeState struct {
 	RelativePath string     `json:"relative_path"`
 	ActiveAt     *time.Time `json:"active_at"`
 	DeadAt       *time.Time `json:"dead_at"`
+	Stopped      bool       `json:"stopped"`
 	Verified     bool       `json:"verified"`
 }
 
-type PluginConfiguration struct {
-	Version  string                      `json:"version"`
-	Author   string                      `json:"author"`
-	Name     string                      `json:"name"`
-	Datetime int64                       `json:"datetime"`
-	Resource PluginConfigurationResource `json:"resource"`
-}
-
-type PluginConfigurationResource struct {
-	Memory     int64                         `json:"memory"`
-	Storage    int64                         `json:"storage"`
-	Permission PluginConfigurationPermission `json:"permission"`
-}
-
-type PluginExtension struct {
-	Tool  bool `json:"tool"`
-	Model bool `json:"model"`
-}
-
-type PluginConfigurationPermission struct {
-	Model PluginConfigurationPermissionModel `json:"model"`
-	Tool  PluginConfigurationPermissionTool  `json:"tool"`
-}
-
-type PluginConfigurationPermissionModel struct {
-	Enabled       bool `json:"enabled"`
-	LLM           bool `json:"llm"`
-	TextEmbedding bool `json:"text_embedding"`
-	Rerank        bool `json:"rerank"`
-	TTS           bool `json:"tts"`
-	STT           bool `json:"stt"`
-}
-
-type PluginConfigurationPermissionTool struct {
-	Enabled bool `json:"enabled"`
-}
-
 type PluginConnector interface {
 	OnMessage(func([]byte))
 	Read([]byte) int