Yeuoly пре 11 месеци
родитељ
комит
c0596b96f2

+ 0 - 109
internal/core/aws/s3.go

@@ -1,109 +0,0 @@
-package aws
-
-import (
-	"context"
-	"io"
-
-	"github.com/aws/aws-sdk-go-v2/config"
-	"github.com/aws/aws-sdk-go-v2/credentials"
-	"github.com/aws/aws-sdk-go-v2/service/s3"
-	"github.com/aws/aws-sdk-go-v2/service/s3/types"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
-)
-
-var (
-	s3Client *s3.Client
-	s3Bucket *string
-)
-
-func InitS3(app *app.Config) {
-	// Check if required AWS S3 configuration is provided
-	if app.AWSS3Region == nil || app.AWSS3AccessKey == nil || app.AWSS3SecretKey == nil || app.AWSS3Bucket == nil {
-		log.Panic("AWSS3Region, AWSS3AccessKey, AWSS3SecretKey, and AWSS3Bucket must be set")
-	}
-
-	// Load AWS configuration with provided credentials
-	cfg, err := config.LoadDefaultConfig(
-		context.TODO(),
-		config.WithRegion(*app.AWSS3Region),
-		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
-			*app.AWSS3AccessKey,
-			*app.AWSS3SecretKey,
-			"",
-		)),
-	)
-
-	// Handle error if AWS config loading fails
-	if err != nil {
-		log.Panic("Failed to load AWS S3 config: %v", err)
-	}
-
-	log.Info("AWS S3 config loaded")
-
-	// Create S3 client
-	s3Client = s3.NewFromConfig(cfg)
-
-	// Store S3 bucket name
-	s3Bucket = app.AWSS3Bucket
-
-	log.Info("AWS S3 client initialized successfully")
-}
-
-func StreamUploadToS3(ctx context.Context, key string, reader io.Reader) error {
-	_, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
-		Bucket: s3Bucket,
-		Key:    &key,
-		Body:   reader,
-	})
-
-	return err
-}
-
-func StreamDownloadFromS3(ctx context.Context, key string) (io.ReadCloser, error) {
-	resp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
-		Bucket: s3Bucket,
-		Key:    &key,
-	})
-
-	if err != nil {
-		return nil, err
-	}
-
-	return resp.Body, nil
-}
-
-func DeleteFromS3(ctx context.Context, key string) error {
-	_, err := s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
-		Bucket: s3Bucket,
-		Key:    &key,
-	})
-
-	return err
-}
-
-func ListFromS3(ctx context.Context, prefix string) ([]string, error) {
-	resp, err := s3Client.ListObjects(ctx, &s3.ListObjectsInput{
-		Bucket: s3Bucket,
-		Prefix: &prefix,
-	})
-
-	if err != nil {
-		return nil, err
-	}
-
-	return mapping.MapArray(resp.Contents, func(obj types.Object) string {
-		if obj.Key != nil {
-			return *obj.Key
-		}
-		return ""
-	}), nil
-}
-
-func HeadObject(ctx context.Context, key string) (*s3.HeadObjectOutput, error) {
-	return s3Client.HeadObject(ctx, &s3.HeadObjectInput{
-		Bucket: s3Bucket,
-		Key:    &key,
-	})
-}

+ 4 - 0
internal/core/plugin_manager/local_manager/run.go

@@ -3,6 +3,7 @@ package local_manager
 import (
 	"errors"
 	"fmt"
+	"os"
 	"os/exec"
 	"sync"
 
@@ -39,6 +40,9 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// start plugin
 	e := exec.Command("bash", r.Config.Execution.Launch)
 	e.Dir = r.State.AbsolutePath
+	// add env INSTALL_METHOD=local
+	e.Env = append(e.Env, "INSTALL_METHOD=local", "PATH="+os.Getenv("PATH"))
+
 	process.WrapProcess(e)
 
 	// get writer

+ 9 - 9
internal/core/plugin_manager/watcher.go

@@ -1,6 +1,7 @@
 package plugin_manager
 
 import (
+	"os"
 	"path"
 	"time"
 
@@ -8,7 +9,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/storage"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
@@ -78,7 +78,7 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
 func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
 	ch := make(chan entities.PluginRuntime)
 
-	plugin_paths, err := storage.List(root_path)
+	plugins, err := os.ReadDir(root_path)
 	if err != nil {
 		log.Error("no plugin found in path: %s", root_path)
 		close(ch)
@@ -86,9 +86,9 @@ func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginR
 	}
 
 	routine.Submit(func() {
-		for _, plugin_path := range plugin_paths {
-			if plugin_path.IsDir() {
-				configuration_path := path.Join(root_path, plugin_path.Name(), "manifest.yaml")
+		for _, plugin := range plugins {
+			if plugin.IsDir() {
+				configuration_path := path.Join(root_path, plugin.Name(), "manifest.yaml")
 				configuration, err := parsePluginConfig(configuration_path)
 				if err != nil {
 					log.Error("parse plugin config error: %v", err)
@@ -101,15 +101,15 @@ func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginR
 				}
 
 				// check if .verified file exists
-				verified_path := path.Join(root_path, plugin_path.Name(), ".verified")
-				_, err = storage.Exists(verified_path)
+				verified_path := path.Join(root_path, plugin.Name(), ".verified")
+				_, err = os.Stat(verified_path)
 
 				ch <- entities.PluginRuntime{
 					Config: *configuration,
 					State: entities.PluginRuntimeState{
 						Status:       entities.PLUGIN_RUNTIME_STATUS_PENDING,
 						Restarts:     0,
-						AbsolutePath: path.Join(root_path, plugin_path.Name()),
+						AbsolutePath: path.Join(root_path, plugin.Name()),
 						ActiveAt:     nil,
 						Verified:     err == nil,
 					},
@@ -124,7 +124,7 @@ func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginR
 }
 
 func parsePluginConfig(configuration_path string) (*plugin_entities.PluginDeclaration, error) {
-	text, err := storage.Read(configuration_path)
+	text, err := os.ReadFile(configuration_path)
 	if err != nil {
 		return nil, err
 	}

+ 4 - 8
internal/core/plugin_packager/decoder/fs.go

@@ -4,7 +4,7 @@ import (
 	"errors"
 	"io/fs"
 	"os"
-	"path"
+	"path/filepath"
 )
 
 var (
@@ -50,16 +50,12 @@ func (d *FSPluginDecoder) Open() error {
 }
 
 func (d *FSPluginDecoder) Walk(fn func(filename string, dir string) error) error {
-	return fs.WalkDir(d.fs, ".", func(root_path string, d fs.DirEntry, err error) error {
+	return filepath.Walk(d.root, func(path string, info fs.FileInfo, err error) error {
 		if err != nil {
 			return err
 		}
 
-		if d.IsDir() {
-			return nil
-		}
-
-		return fn(root_path, d.Name())
+		return fn(info.Name(), filepath.Dir(path))
 	})
 }
 
@@ -68,7 +64,7 @@ func (d *FSPluginDecoder) Close() error {
 }
 
 func (d *FSPluginDecoder) ReadFile(filename string) ([]byte, error) {
-	return os.ReadFile(path.Join(d.root, filename))
+	return os.ReadFile(filepath.Join(d.root, filename))
 }
 
 func (d *FSPluginDecoder) Signature() (string, error) {

+ 0 - 54
internal/storage/global.go

@@ -1,54 +0,0 @@
-package storage
-
-import (
-	"io"
-	"os"
-)
-
-var (
-	globalStorage FSOperator = &Local{}
-)
-
-func Read(path string) ([]byte, error) {
-	return globalStorage.Read(path)
-}
-
-func ReadStream(path string) (io.ReadCloser, error) {
-	return globalStorage.ReadStream(path)
-}
-
-func Write(path string, data []byte) error {
-	return globalStorage.Write(path, data)
-}
-
-func WriteStream(path string, data io.Reader) error {
-	return globalStorage.WriteStream(path, data)
-}
-
-func List(path string) ([]FileInfo, error) {
-	return globalStorage.List(path)
-}
-
-func Stat(path string) (FileInfo, error) {
-	return globalStorage.Stat(path)
-}
-
-func Delete(path string) error {
-	return globalStorage.Delete(path)
-}
-
-func Mkdir(path string, perm os.FileMode) error {
-	return globalStorage.Mkdir(path, perm)
-}
-
-func Rename(oldpath, newpath string) error {
-	return globalStorage.Rename(oldpath, newpath)
-}
-
-func Exists(path string) (bool, error) {
-	return globalStorage.Exists(path)
-}
-
-func SetGlobalStorage(storage FSOperator) {
-	globalStorage = storage
-}

+ 0 - 73
internal/storage/local.go

@@ -1,73 +0,0 @@
-package storage
-
-import (
-	"io"
-	"os"
-)
-
-type Local struct{}
-
-func (l *Local) Read(path string) ([]byte, error) {
-	return os.ReadFile(path)
-}
-
-func (l *Local) ReadStream(path string) (io.ReadCloser, error) {
-	return os.Open(path)
-}
-
-func (l *Local) Write(path string, data []byte) error {
-	return os.WriteFile(path, data, 0644)
-}
-
-func (l *Local) WriteStream(path string, data io.Reader) error {
-	file, err := os.Create(path)
-	if err != nil {
-		return err
-	}
-	defer file.Close()
-	_, err = io.Copy(file, data)
-	return err
-}
-
-func (l *Local) List(path string) ([]FileInfo, error) {
-	entries, err := os.ReadDir(path)
-	if err != nil {
-		return nil, err
-	}
-	file_infos := make([]FileInfo, len(entries))
-	for i, entry := range entries {
-		info, err := entry.Info()
-		if err != nil {
-			return nil, err
-		}
-		file_infos[i] = info
-	}
-	return file_infos, nil
-}
-
-func (l *Local) Stat(path string) (FileInfo, error) {
-	return os.Stat(path)
-}
-
-func (l *Local) Delete(path string) error {
-	return os.Remove(path)
-}
-
-func (l *Local) Mkdir(path string, perm os.FileMode) error {
-	return os.MkdirAll(path, perm)
-}
-
-func (l *Local) Rename(oldpath, newpath string) error {
-	return os.Rename(oldpath, newpath)
-}
-
-func (l *Local) Exists(path string) (bool, error) {
-	_, err := os.Stat(path)
-	if err == nil {
-		return true, nil
-	}
-	if os.IsNotExist(err) {
-		return false, nil
-	}
-	return false, err
-}

+ 0 - 118
internal/storage/s3.go

@@ -1,118 +0,0 @@
-package storage
-
-import (
-	"bytes"
-	"context"
-	"io"
-	"os"
-	"strings"
-	"time"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/core/aws"
-)
-
-type S3 struct{}
-
-func (s *S3) Read(path string) ([]byte, error) {
-	reader, err := s.ReadStream(path)
-	if err != nil {
-		return nil, err
-	}
-	defer reader.Close()
-	return io.ReadAll(reader)
-}
-
-func (s *S3) ReadStream(path string) (io.ReadCloser, error) {
-	return aws.StreamDownloadFromS3(context.Background(), path)
-}
-
-func (s *S3) Write(path string, data []byte) error {
-	return aws.StreamUploadToS3(context.Background(), path, io.NopCloser(bytes.NewReader(data)))
-}
-
-func (s *S3) WriteStream(path string, data io.Reader) error {
-	return aws.StreamUploadToS3(context.Background(), path, data)
-}
-
-func (s *S3) List(path string) ([]FileInfo, error) {
-	keys, err := aws.ListFromS3(context.Background(), path)
-	if err != nil {
-		return nil, err
-	}
-
-	file_infos := make([]FileInfo, len(keys))
-	for i, key := range keys {
-		head, err := aws.HeadObject(context.Background(), key)
-		if err != nil {
-			return nil, err
-		}
-		is_dir := strings.HasSuffix(key, "/")
-		file_infos[i] = &s3FileInfo{
-			name:    strings.TrimSuffix(key, "/"),
-			size:    *head.ContentLength,
-			modTime: *head.LastModified,
-			isDir:   is_dir,
-		}
-	}
-	return file_infos, nil
-}
-
-func (s *S3) Stat(path string) (FileInfo, error) {
-	head, err := aws.HeadObject(context.Background(), path)
-	if err != nil {
-		return nil, err
-	}
-	return &s3FileInfo{
-		name:    path,
-		size:    *head.ContentLength,
-		modTime: *head.LastModified,
-	}, nil
-}
-
-func (s *S3) Delete(path string) error {
-	return aws.DeleteFromS3(context.Background(), path)
-}
-
-func (s *S3) Mkdir(path string, perm os.FileMode) error {
-	// S3 doesn't have directories, so this is a no-op
-	return nil
-}
-
-func (s *S3) Rename(oldpath, newpath string) error {
-	// S3 doesn't support rename directly, so we need to copy and delete
-	reader, err := s.ReadStream(oldpath)
-	if err != nil {
-		return err
-	}
-	defer reader.Close()
-
-	err = aws.StreamUploadToS3(context.Background(), newpath, reader)
-	if err != nil {
-		return err
-	}
-
-	return s.Delete(oldpath)
-}
-
-func (s *S3) Exists(path string) (bool, error) {
-	_, err := aws.HeadObject(context.Background(), path)
-	if err != nil {
-		// TODO: Check if error is specifically "not found" error
-		return false, nil
-	}
-	return true, nil
-}
-
-type s3FileInfo struct {
-	name    string
-	size    int64
-	modTime time.Time
-	isDir   bool
-}
-
-func (fi *s3FileInfo) Name() string       { return fi.name }
-func (fi *s3FileInfo) Size() int64        { return fi.size }
-func (fi *s3FileInfo) Mode() os.FileMode  { return 0 }
-func (fi *s3FileInfo) ModTime() time.Time { return fi.modTime }
-func (fi *s3FileInfo) IsDir() bool        { return fi.isDir }
-func (fi *s3FileInfo) Sys() interface{}   { return nil }

+ 0 - 71
internal/storage/storage.go

@@ -1,71 +0,0 @@
-package storage
-
-import (
-	"io"
-	"os"
-	"time"
-)
-
-// FileInfo represents information about a file
-type FileInfo interface {
-	Name() string
-	Size() int64
-	Mode() os.FileMode
-	ModTime() time.Time
-	IsDir() bool
-}
-
-// FSOperator defines the interface for basic file system operations
-type FSOperator interface {
-	// Read operations
-	Read(path string) ([]byte, error)
-	ReadStream(path string) (io.ReadCloser, error)
-
-	// Write operations
-	Write(path string, data []byte) error
-	WriteStream(path string, data io.Reader) error
-
-	// List operation
-	List(path string) ([]FileInfo, error)
-
-	// Get file info
-	Stat(path string) (FileInfo, error)
-
-	// Delete operation
-	Delete(path string) error
-
-	// Create directory
-	Mkdir(path string, perm os.FileMode) error
-
-	// Rename operation
-	Rename(oldpath, newpath string) error
-
-	// Check if file/directory exists
-	Exists(path string) (bool, error)
-}
-
-// FullFSOperator extends FSOperator with additional operations
-type FullFSOperator interface {
-	FSOperator
-
-	// Copy operation
-	Copy(src, dst string) error
-
-	// Move operation
-	Move(src, dst string) error
-
-	// Recursive delete
-	DeleteAll(path string) error
-
-	// Create file
-	Create(path string) (io.WriteCloser, error)
-
-	// Open file with specific flag and permission
-	OpenFile(path string, flag int, perm os.FileMode) (io.ReadWriteCloser, error)
-
-	// Get file checksum
-	Checksum(path string) (string, error)
-
-	// Watch for file changes
-	Watch(path string) (<-chan FileInfo, error)
-}

+ 1 - 7
internal/types/app/config.go

@@ -20,7 +20,7 @@ type Config struct {
 
 	PluginWebhookEnabled bool `envconfig:"PLUGIN_WEBHOOK_ENABLED"`
 
-	StoragePath        string `envconfig:"STORAGE_PATH"  validate:"required"`
+	StoragePath        string `envconfig:"STORAGE_PLUGIN_PATH"  validate:"required"`
 	ProcessCachingPath string `envconfig:"PROCESS_CACHING_PATH"  validate:"required"`
 
 	Platform PlatformType `envconfig:"PLATFORM" validate:"required"`
@@ -42,12 +42,6 @@ type Config struct {
 	AWSLambdaAccessKey *string `envconfig:"AWS_LAMBDA_ACCESS_KEY" validate:"omitempty"`
 	AWSLambdaSecretKey *string `envconfig:"AWS_LAMBDA_SECRET_KEY" validate:"omitempty"`
 
-	AWSS3Enabled   bool    `envconfig:"AWS_S3_ENABLED" validate:"omitempty"`
-	AWSS3AccessKey *string `envconfig:"AWS_S3_ACCESS_KEY" validate:"omitempty"`
-	AWSS3SecretKey *string `envconfig:"AWS_S3_SECRET_KEY" validate:"omitempty"`
-	AWSS3Region    *string `envconfig:"AWS_S3_REGION" validate:"omitempty"`
-	AWSS3Bucket    *string `envconfig:"AWS_S3_BUCKET" validate:"omitempty"`
-
 	LifetimeCollectionHeartbeatInterval int `envconfig:"LIFETIME_COLLECTION_HEARTBEAT_INTERVAL"  validate:"required"`
 	LifetimeCollectionGCInterval        int `envconfig:"LIFETIME_COLLECTION_GC_INTERVAL" validate:"required"`
 	LifetimeStateGCInterval             int `envconfig:"LIFETIME_STATE_GC_INTERVAL" validate:"required"`