Yeuoly 1 рік тому
батько
коміт
c714d99250

+ 2 - 0
.env.example

@@ -30,5 +30,7 @@ AWS_S3_BUCKET=dify-plugins
 DIFY_INVOCATION_CONNECTION_IDLE_TIMEOUT=120
 
 STORAGE_PLUGIN_PATH=examples
+PLUGIN_WORKING_PATH=./cwd
+MAX_PLUGIN_PACKAGE_SIZE=52428800
 
 PLATFORM=local

+ 1 - 0
cmd/server/main.go

@@ -40,6 +40,7 @@ func setDefault(config *app.Config) {
 	setDefaultInt(&config.DifyInvocationConnectionIdleTimeout, 120)
 	setDefaultInt(&config.PluginRemoteInstallServerEventLoopNums, 8)
 	setDefaultInt(&config.PluginRemoteInstallingMaxConn, 128)
+	setDefaultInt(&config.MaxPluginPackageSize, 52428800)
 	setDefaultBool(&config.PluginRemoteInstallingEnabled, true)
 	setDefaultBool(&config.PluginWebhookEnabled, true)
 	setDefaultString(&config.DBSslMode, "disable")

+ 2 - 0
internal/cluster/plugin_test.go

@@ -5,12 +5,14 @@ import (
 	"time"
 
 	"github.com/google/uuid"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
 type fakePlugin struct {
 	entities.PluginRuntime
+	positive_manager.PositivePluginRuntime
 }
 
 func (r *fakePlugin) InitEnvironment() error {

+ 2 - 0
internal/core/plugin_daemon/backwards_invocation/task_test.go

@@ -4,12 +4,14 @@ import (
 	"testing"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
 type TPluginRuntime struct {
 	entities.PluginRuntime
+	positive_manager.PositivePluginRuntime
 }
 
 func (r *TPluginRuntime) InitEnvironment() error {

+ 1 - 1
internal/core/plugin_manager/aws_manager/environment.go

@@ -42,7 +42,7 @@ func (r *AWSPluginRuntime) InitEnvironment() error {
 	r.Log("Creating new lambda function")
 
 	// create lambda function
-	packager := NewPackager(r, r.decoder)
+	packager := NewPackager(r, r.Decoder)
 	context, err := packager.Pack()
 	if err != nil {
 		return err

+ 2 - 0
internal/core/plugin_manager/aws_manager/packager_test.go

@@ -11,6 +11,7 @@ import (
 	"path/filepath"
 	"testing"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
@@ -19,6 +20,7 @@ import (
 
 type TPluginRuntime struct {
 	entities.PluginRuntime
+	positive_manager.PositivePluginRuntime
 }
 
 func (r *TPluginRuntime) InitEnvironment() error {

+ 0 - 4
internal/core/plugin_manager/aws_manager/type.go

@@ -2,7 +2,6 @@ package aws_manager
 
 import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 )
 
@@ -13,7 +12,4 @@ type AWSPluginRuntime struct {
 	// access url for the lambda function
 	lambda_url  string
 	lambda_name string
-
-	// plugin decoder used to manage the plugin
-	decoder decoder.PluginDecoder
 }

+ 3 - 0
internal/core/plugin_manager/lifetime.go

@@ -14,6 +14,9 @@ func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInt
 	log.Info("new plugin logged in: %s", configuration.Identity())
 	defer log.Info("plugin %s has exited", configuration.Identity())
 
+	// cleanup plugin runtime state and working directory
+	defer r.Cleanup()
+
 	// stop plugin when the plugin reaches the end of its lifetime
 	defer r.Stop()
 

+ 0 - 24
internal/core/plugin_manager/local_manager/environment.go

@@ -10,8 +10,6 @@ import (
 	"syscall"
 	"time"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
@@ -125,25 +123,3 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 
 	return nil
 }
-
-func (r *LocalPluginRuntime) calculateChecksum() string {
-	plugin_decoder, err := decoder.NewFSPluginDecoder(r.LocalPath)
-	if err != nil {
-		return ""
-	}
-
-	checksum, err := checksum.CalculateChecksum(plugin_decoder)
-	if err != nil {
-		return ""
-	}
-
-	return checksum
-}
-
-func (r *LocalPluginRuntime) Checksum() string {
-	if r.checksum == "" {
-		r.checksum = r.calculateChecksum()
-	}
-
-	return r.checksum
-}

+ 0 - 1
internal/core/plugin_manager/local_manager/type.go

@@ -11,5 +11,4 @@ type LocalPluginRuntime struct {
 
 	wait_chan   chan bool
 	io_identity string
-	checksum    string
 }

+ 6 - 1
internal/core/plugin_manager/manager.go

@@ -16,6 +16,9 @@ type PluginManager struct {
 	m sync.Map
 
 	cluster *cluster.Cluster
+
+	maxPluginPackageSize int64
+	workingDirectory     string
 }
 
 var (
@@ -24,7 +27,9 @@ var (
 
 func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config) {
 	manager = &PluginManager{
-		cluster: cluster,
+		cluster:              cluster,
+		maxPluginPackageSize: configuration.MaxPluginPackageSize,
+		workingDirectory:     configuration.PluginWorkingPath,
 	}
 	manager.Init(configuration)
 }

+ 33 - 0
internal/core/plugin_manager/positive_manager/environment.go

@@ -1 +1,34 @@
 package positive_manager
+
+import (
+	"os"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+)
+
+func (r *PositivePluginRuntime) calculateChecksum() string {
+	plugin_decoder, err := decoder.NewFSPluginDecoder(r.LocalPackagePath)
+	if err != nil {
+		return ""
+	}
+
+	checksum, err := checksum.CalculateChecksum(plugin_decoder)
+	if err != nil {
+		return ""
+	}
+
+	return checksum
+}
+
+func (r *PositivePluginRuntime) Checksum() string {
+	if r.checksum == "" {
+		r.checksum = r.calculateChecksum()
+	}
+
+	return r.checksum
+}
+
+func (r *PositivePluginRuntime) Cleanup() {
+	os.RemoveAll(r.WorkingPath)
+}

+ 8 - 1
internal/core/plugin_manager/positive_manager/types.go

@@ -1,5 +1,12 @@
 package positive_manager
 
+import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+
 type PositivePluginRuntime struct {
-	LocalPath string
+	LocalPackagePath string
+	WorkingPath      string
+	// plugin decoder used to manage the plugin
+	Decoder decoder.PluginDecoder
+
+	checksum string
 }

+ 4 - 0
internal/core/plugin_manager/remote_manager/environment.go

@@ -7,3 +7,7 @@ import (
 func (r *RemotePluginRuntime) Identity() (string, error) {
 	return strings.Join([]string{r.Configuration().Identity(), r.tenant_id}, ":"), nil
 }
+
+func (r *RemotePluginRuntime) Cleanup() {
+	// no cleanup needed
+}

+ 113 - 45
internal/core/plugin_manager/watcher.go

@@ -1,6 +1,8 @@
 package plugin_manager
 
 import (
+	"fmt"
+	"io"
 	"os"
 	"path"
 	"time"
@@ -9,6 +11,8 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
@@ -51,20 +55,24 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
 
 		if config.Platform == app.PLATFORM_AWS_LAMBDA {
 			plugin_interface = &aws_manager.AWSPluginRuntime{
-				PluginRuntime: plugin,
+				PluginRuntime: plugin.Runtime,
 				PositivePluginRuntime: positive_manager.PositivePluginRuntime{
-					LocalPath: plugin.State.AbsolutePath,
+					LocalPackagePath: plugin.Runtime.State.AbsolutePath,
+					WorkingPath:      plugin.Runtime.State.WorkingPath,
+					Decoder:          plugin.Decoder,
 				},
 			}
 		} else if config.Platform == app.PLATFORM_LOCAL {
 			plugin_interface = &local_manager.LocalPluginRuntime{
-				PluginRuntime: plugin,
+				PluginRuntime: plugin.Runtime,
 				PositivePluginRuntime: positive_manager.PositivePluginRuntime{
-					LocalPath: plugin.State.AbsolutePath,
+					LocalPackagePath: plugin.Runtime.State.AbsolutePath,
+					WorkingPath:      plugin.Runtime.State.WorkingPath,
+					Decoder:          plugin.Decoder,
 				},
 			}
 		} else {
-			log.Error("unsupported platform: %s for plugin: %s", config.Platform, plugin.Config.Name)
+			log.Error("unsupported platform: %s for plugin: %s", config.Platform, plugin.Runtime.Config.Name)
 			continue
 		}
 
@@ -74,9 +82,14 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
 	}
 }
 
+type pluginRuntimeWithDecoder struct {
+	Runtime entities.PluginRuntime
+	Decoder decoder.PluginDecoder
+}
+
 // chan should be closed after using that
-func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
-	ch := make(chan entities.PluginRuntime)
+func (p *PluginManager) loadNewPlugins(root_path string) <-chan *pluginRuntimeWithDecoder {
+	ch := make(chan *pluginRuntimeWithDecoder)
 
 	plugins, err := os.ReadDir(root_path)
 	if err != nil {
@@ -87,33 +100,14 @@ func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginR
 
 	routine.Submit(func() {
 		for _, plugin := range plugins {
-			if plugin.IsDir() {
-				configuration_path := path.Join(root_path, plugin.Name(), "manifest.yaml")
-				configuration, err := parsePluginConfig(configuration_path)
+			if !plugin.IsDir() {
+				plugin, err := p.loadPlugin(path.Join(root_path, plugin.Name()))
 				if err != nil {
-					log.Error("parse plugin config error: %v", err)
-					continue
-				}
-
-				status := p.verifyPluginStatus(configuration)
-				if status.exist {
+					log.Error("load plugin error: %v", err)
 					continue
 				}
 
-				// check if .verified file exists
-				verified_path := path.Join(root_path, plugin.Name(), ".verified")
-				_, err = os.Stat(verified_path)
-
-				ch <- entities.PluginRuntime{
-					Config: *configuration,
-					State: entities.PluginRuntimeState{
-						Status:       entities.PLUGIN_RUNTIME_STATUS_PENDING,
-						Restarts:     0,
-						AbsolutePath: path.Join(root_path, plugin.Name()),
-						ActiveAt:     nil,
-						Verified:     err == nil,
-					},
-				}
+				ch <- plugin
 			}
 		}
 
@@ -123,33 +117,107 @@ func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginR
 	return ch
 }
 
-func parsePluginConfig(configuration_path string) (*plugin_entities.PluginDeclaration, error) {
-	text, err := os.ReadFile(configuration_path)
+func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecoder, error) {
+	pack, err := os.Open(plugin_path)
 	if err != nil {
+		log.Error("open plugin package error: %v", err)
 		return nil, err
 	}
+	defer pack.Close()
 
-	result, err := plugin_entities.UnmarshalPluginDeclarationFromYaml(text)
+	if info, err := pack.Stat(); err != nil {
+		log.Error("get plugin package info error: %v", err)
+		return nil, err
+	} else if info.Size() > p.maxPluginPackageSize {
+		log.Error("plugin package size is too large: %d", info.Size())
+		return nil, err
+	}
+
+	plugin_zip, err := io.ReadAll(pack)
 	if err != nil {
+		log.Error("read plugin package error: %v", err)
 		return nil, err
 	}
 
-	return result, nil
-}
+	decoder, err := decoder.NewZipPluginDecoder(plugin_zip)
+	if err != nil {
+		log.Error("create plugin decoder error: %v", err)
+		return nil, err
+	}
 
-type pluginStatusResult struct {
-	exist bool
-}
+	// get manifest
+	manifest, err := decoder.Manifest()
+	if err != nil {
+		log.Error("get plugin manifest error: %v", err)
+		return nil, err
+	}
+
+	// check if already exists
+	if _, exist := p.m.Load(manifest.Identity()); exist {
+		log.Warn("plugin already exists: %s", manifest.Identity())
+		return nil, fmt.Errorf("plugin already exists: %s", manifest.Identity())
+	}
 
-func (p *PluginManager) verifyPluginStatus(config *plugin_entities.PluginDeclaration) pluginStatusResult {
-	_, exist := p.checkPluginExist(config.Identity())
-	if exist {
-		return pluginStatusResult{
-			exist: true,
+	plugin_working_path := path.Join(p.workingDirectory, manifest.Identity())
+
+	// check if working directory exists
+	if _, err := os.Stat(plugin_working_path); err == nil {
+		log.Warn("plugin working directory already exists: %s", plugin_working_path)
+		return nil, fmt.Errorf("plugin working directory already exists: %s", plugin_working_path)
+	}
+
+	// copy to working directory
+	if err := decoder.Walk(func(filename, dir string) error {
+		working_path := path.Join(plugin_working_path, dir)
+		// check if directory exists
+		if err := os.MkdirAll(working_path, 0755); err != nil {
+			return err
+		}
+
+		bytes, err := decoder.ReadFile(filename)
+		if err != nil {
+			return err
+		}
+
+		filename = path.Join(working_path, filename)
+
+		// copy file
+		if err := os.WriteFile(filename, bytes, 0644); err != nil {
+			return err
 		}
+
+		return nil
+	}); err != nil {
+		log.Error("copy plugin to working directory error: %v", err)
+		return nil, err
 	}
 
-	return pluginStatusResult{
-		exist: false,
+	return &pluginRuntimeWithDecoder{
+		Runtime: entities.PluginRuntime{
+			Config: manifest,
+			State: entities.PluginRuntimeState{
+				Status:       entities.PLUGIN_RUNTIME_STATUS_PENDING,
+				Restarts:     0,
+				AbsolutePath: plugin_path,
+				WorkingPath:  plugin_working_path,
+				ActiveAt:     nil,
+				Verified:     verifier.VerifyPlugin(decoder) == nil,
+			},
+		},
+		Decoder: decoder,
+	}, nil
+}
+
+func parsePluginConfig(configuration_path string) (*plugin_entities.PluginDeclaration, error) {
+	text, err := os.ReadFile(configuration_path)
+	if err != nil {
+		return nil, err
 	}
+
+	result, err := plugin_entities.UnmarshalPluginDeclarationFromYaml(text)
+	if err != nil {
+		return nil, err
+	}
+
+	return result, nil
 }

+ 5 - 0
internal/core/plugin_packager/decoder/decoder.go

@@ -3,6 +3,8 @@ package decoder
 import (
 	"io"
 	"io/fs"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
 // PluginDecoder is an interface for decoding and interacting with plugin files
@@ -32,4 +34,7 @@ type PluginDecoder interface {
 
 	// CreateTime returns the creation time of the plugin as a Unix timestamp
 	CreateTime() (int64, error)
+
+	// Manifest returns the manifest of the plugin
+	Manifest() (plugin_entities.PluginDeclaration, error)
 }

+ 30 - 0
internal/core/plugin_packager/decoder/fs.go

@@ -7,6 +7,9 @@ import (
 	"os"
 	"path/filepath"
 	"strings"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
 var (
@@ -20,6 +23,8 @@ type FSPluginDecoder struct {
 	root string
 
 	fs fs.FS
+
+	pluginDeclaration *plugin_entities.PluginDeclaration
 }
 
 func NewFSPluginDecoder(root string) (*FSPluginDecoder, error) {
@@ -32,6 +37,11 @@ func NewFSPluginDecoder(root string) (*FSPluginDecoder, error) {
 		return nil, err
 	}
 
+	// read the manifest file
+	if _, err := decoder.Manifest(); err != nil {
+		return nil, err
+	}
+
 	return decoder, nil
 }
 
@@ -93,3 +103,23 @@ func (d *FSPluginDecoder) Signature() (string, error) {
 func (d *FSPluginDecoder) CreateTime() (int64, error) {
 	return 0, nil
 }
+
+func (d *FSPluginDecoder) Manifest() (plugin_entities.PluginDeclaration, error) {
+	if d.pluginDeclaration != nil {
+		return *d.pluginDeclaration, nil
+	}
+
+	// read the manifest file
+	manifest, err := d.ReadFile("manifest.json")
+	if err != nil {
+		return plugin_entities.PluginDeclaration{}, err
+	}
+
+	dec, err := parser.UnmarshalJsonBytes[plugin_entities.PluginDeclaration](manifest)
+	if err != nil {
+		return plugin_entities.PluginDeclaration{}, err
+	}
+
+	d.pluginDeclaration = &dec
+	return dec, nil
+}

+ 27 - 0
internal/core/plugin_packager/decoder/zip.go

@@ -7,6 +7,7 @@ import (
 	"io/fs"
 	"path"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
@@ -18,6 +19,8 @@ type ZipPluginDecoder struct {
 
 	sig         string
 	create_time int64
+
+	pluginDeclaration *plugin_entities.PluginDeclaration
 }
 
 func NewZipPluginDecoder(binary []byte) (*ZipPluginDecoder, error) {
@@ -33,6 +36,10 @@ func NewZipPluginDecoder(binary []byte) (*ZipPluginDecoder, error) {
 		return nil, err
 	}
 
+	if _, err := decoder.Manifest(); err != nil {
+		return nil, err
+	}
+
 	return decoder, nil
 }
 
@@ -155,3 +162,23 @@ func (z *ZipPluginDecoder) CreateTime() (int64, error) {
 
 	return z.create_time, nil
 }
+
+func (z *ZipPluginDecoder) Manifest() (plugin_entities.PluginDeclaration, error) {
+	if z.pluginDeclaration != nil {
+		return *z.pluginDeclaration, nil
+	}
+
+	// read the manifest file
+	manifest, err := z.ReadFile("manifest.json")
+	if err != nil {
+		return plugin_entities.PluginDeclaration{}, err
+	}
+
+	dec, err := parser.UnmarshalJsonBytes[plugin_entities.PluginDeclaration](manifest)
+	if err != nil {
+		return plugin_entities.PluginDeclaration{}, err
+	}
+
+	z.pluginDeclaration = &dec
+	return dec, nil
+}

+ 2 - 0
internal/types/app/config.go

@@ -47,6 +47,8 @@ type Config struct {
 
 	DifyPluginServerlessConnectorURL    *string `envconfig:"DIFY_PLUGIN_SERVERLESS_CONNECTOR_URL"`
 	DifyPluginServerlessConnectorAPIKey *string `envconfig:"DIFY_PLUGIN_SERVERLESS_CONNECTOR_API_KEY"`
+
+	MaxPluginPackageSize int64 `envconfig:"MAX_PLUGIN_PACKAGE_SIZE" validate:"required"`
 }
 
 func (c *Config) Validate() error {

+ 3 - 0
internal/types/entities/runtime.go

@@ -55,6 +55,8 @@ type (
 		Wait() (<-chan bool, error)
 		// returns the runtime type of the plugin
 		Type() PluginRuntimeType
+		// Cleanup the plugin runtime
+		Cleanup()
 
 		// set the plugin to active
 		SetActive()
@@ -199,6 +201,7 @@ type PluginRuntimeState struct {
 	Restarts     int        `json:"restarts"`
 	Status       string     `json:"status"`
 	AbsolutePath string     `json:"absolute_path"`
+	WorkingPath  string     `json:"working_path"`
 	ActiveAt     *time.Time `json:"active_at"`
 	StoppedAt    *time.Time `json:"stopped_at"`
 	Verified     bool       `json:"verified"`