Explorar o código

refactor: storage

Yeuoly hai 1 ano
pai
achega
6798809680

+ 1 - 0
internal/core/aws/docker.go

@@ -0,0 +1 @@
+package aws

+ 302 - 0
internal/core/aws/lambda.go

@@ -0,0 +1,302 @@
+package aws
+
+// This file contains functions for interacting with AWS Lambda
+// it take a docker image and push it to ECR, create a lambda function and deploy it
+// also, it will create a function url for the lambda function with auth enabled
+
+import (
+	"context"
+	"encoding/base64"
+	"fmt"
+	"strings"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/ecr"
+	"github.com/aws/aws-sdk-go-v2/service/lambda"
+	lambdatypes "github.com/aws/aws-sdk-go-v2/service/lambda/types"
+	"github.com/aws/aws-sdk-go-v2/service/sts"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+)
+
+var (
+	aws_lambda_config *aws.Config
+	lambda_client     *lambda.Client
+	lambda_account_id string
+)
+
+// InitLambda initializes the AWS configuration and validates the credentials
+// It takes a pointer to the app.Config struct as an argument
+func InitLambda(app *app.Config) {
+	// Check if required AWS Lambda configuration is provided
+	if app.AWSLambdaRegion == nil || app.AWSLambdaAccessKey == nil || app.AWSLambdaSecretKey == nil {
+		log.Panic("AWSLambdaRegion, AWSLambdaAccessKey, and AWSLambdaSecretKey must be set")
+	}
+
+	// Load AWS configuration with provided credentials
+	c, err := config.LoadDefaultConfig(
+		context.TODO(),
+		config.WithRegion(*app.AWSLambdaRegion),
+		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
+			*app.AWSLambdaAccessKey,
+			*app.AWSLambdaSecretKey,
+			"",
+		)),
+	)
+
+	// Handle error if AWS config loading fails
+	if err != nil {
+		log.Panic("Failed to load AWS Lambda config: %v", err)
+	}
+
+	log.Info("AWS Lambda config loaded")
+
+	// Create STS client to validate AWS credentials
+	stsClient := sts.NewFromConfig(c)
+	identity, err := stsClient.GetCallerIdentity(context.TODO(), &sts.GetCallerIdentityInput{})
+	if err != nil {
+		log.Panic("Failed to validate AWS Lambda credentials: %v", err)
+	}
+
+	// Get the account ID
+	lambda_account_id = *identity.Account
+
+	// Create the Lambda client
+	lambda_client = lambda.NewFromConfig(c)
+
+	log.Info("AWS Lambda credentials validated successfully")
+
+	// Store the AWS configuration globally
+	aws_lambda_config = &c
+}
+
+type LambdaFunction struct {
+	FunctionName string
+	FunctionARN  string
+	FunctionURL  string
+}
+
+// PushImageToECR pushes a Docker image to ECR
+func PushImageToECR(ctx context.Context, plugin_runtime entities.PluginRuntimeInterface) (string, error) {
+	ecr_client := ecr.NewFromConfig(*aws_lambda_config)
+
+	// Create ECR repository if it doesn't exist
+	identity, err := plugin_runtime.Identity()
+	if err != nil {
+		return "", fmt.Errorf("failed to get plugin identity: %v", err)
+	}
+	image_name := fmt.Sprintf("dify-plugin-%s-%s", identity, plugin_runtime.Checksum())
+	repo_name := fmt.Sprintf("dify-plugin-%s", image_name)
+	_, err = ecr_client.CreateRepository(ctx, &ecr.CreateRepositoryInput{
+		RepositoryName: aws.String(repo_name),
+	})
+	if err != nil && !strings.Contains(err.Error(), "RepositoryAlreadyExistsException") {
+		return "", fmt.Errorf("failed to create ECR repository: %v", err)
+	}
+
+	// Get ECR authorization token
+	auth_output, err := ecr_client.GetAuthorizationToken(ctx, &ecr.GetAuthorizationTokenInput{})
+	if err != nil {
+		return "", fmt.Errorf("failed to get ECR authorization token: %v", err)
+	}
+
+	if len(auth_output.AuthorizationData) == 0 || auth_output.AuthorizationData[0].AuthorizationToken == nil {
+		return "", fmt.Errorf("invalid ECR authorization data")
+	}
+
+	auth_token, err := base64.StdEncoding.DecodeString(*auth_output.AuthorizationData[0].AuthorizationToken)
+	if err != nil {
+		return "", fmt.Errorf("failed to decode ECR authorization token: %v", err)
+	}
+
+	// Extract username and password from auth token
+	credentials := strings.SplitN(string(auth_token), ":", 2)
+	if len(credentials) != 2 {
+		return "", fmt.Errorf("invalid ECR credentials format")
+	}
+
+	// TODO: Use the extracted credentials to push the Docker image to ECR
+	// This step typically involves using a Docker client library or executing Docker CLI commands
+
+	if auth_output.AuthorizationData[0].ProxyEndpoint == nil {
+		return "", fmt.Errorf("invalid ECR proxy endpoint")
+	}
+
+	return fmt.Sprintf("%s/%s:latest", *auth_output.AuthorizationData[0].ProxyEndpoint, repo_name), nil
+}
+
+// CreateLambdaFunction creates a Lambda function from an ECR image
+func CreateLambdaFunction(ctx context.Context, plugin_runtime entities.PluginRuntimeInterface, image_uri string) (*LambdaFunction, error) {
+	function_name := fmt.Sprintf("dify-plugin-%s", plugin_runtime.Checksum())
+
+	// Get or create the lambda execution role
+	role_arn, err := getOrCreateLambdaExecutionRole(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get or create Lambda execution role: %v", err)
+	}
+
+	create_output, err := lambda_client.CreateFunction(ctx, &lambda.CreateFunctionInput{
+		FunctionName: aws.String(function_name),
+		Role:         aws.String(role_arn),
+		PackageType:  lambdatypes.PackageTypeImage,
+		Code: &lambdatypes.FunctionCode{
+			ImageUri: aws.String(image_uri),
+		},
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create Lambda function: %v", err)
+	}
+
+	if create_output.FunctionArn == nil {
+		return nil, fmt.Errorf("invalid Lambda function creation output")
+	}
+
+	// Create function URL
+	url_output, err := lambda_client.CreateFunctionUrlConfig(ctx, &lambda.CreateFunctionUrlConfigInput{
+		FunctionName: aws.String(function_name),
+		AuthType:     lambdatypes.FunctionUrlAuthTypeAwsIam,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to create function URL: %v", err)
+	}
+
+	if url_output.FunctionUrl == nil {
+		return nil, fmt.Errorf("invalid function URL creation output")
+	}
+
+	return &LambdaFunction{
+		FunctionName: function_name,
+		FunctionARN:  *create_output.FunctionArn,
+		FunctionURL:  *url_output.FunctionUrl,
+	}, nil
+}
+
+// ListLambdaFunctions lists all Lambda functions with the "dify-plugin-" prefix
+func ListLambdaFunctions(ctx context.Context) ([]*LambdaFunction, error) {
+	var functions []*LambdaFunction
+	var marker *string
+
+	for {
+		output, err := lambda_client.ListFunctions(ctx, &lambda.ListFunctionsInput{
+			Marker: marker,
+		})
+		if err != nil {
+			return nil, fmt.Errorf("failed to list Lambda functions: %v", err)
+		}
+
+		for _, f := range output.Functions {
+			if f.FunctionName == nil || f.FunctionArn == nil {
+				continue
+			}
+			if strings.HasPrefix(*f.FunctionName, "dify-plugin-") {
+				url_output, err := lambda_client.GetFunctionUrlConfig(ctx, &lambda.GetFunctionUrlConfigInput{
+					FunctionName: f.FunctionName,
+				})
+				if err != nil {
+					return nil, fmt.Errorf("failed to get function URL for %s: %v", *f.FunctionName, err)
+				}
+
+				if url_output.FunctionUrl == nil {
+					return nil, fmt.Errorf("invalid function URL output for %s", *f.FunctionName)
+				}
+
+				functions = append(functions, &LambdaFunction{
+					FunctionName: *f.FunctionName,
+					FunctionARN:  *f.FunctionArn,
+					FunctionURL:  *url_output.FunctionUrl,
+				})
+			}
+		}
+
+		if output.NextMarker == nil {
+			break
+		}
+		marker = output.NextMarker
+	}
+
+	return functions, nil
+}
+
+// GetLambdaFunction retrieves a specific Lambda function by its checksum
+func GetLambdaFunction(ctx context.Context, identity string, checksum string) (*LambdaFunction, error) {
+	function_name := fmt.Sprintf("dify-plugin-%s-%s", identity, checksum)
+
+	output, err := lambda_client.GetFunction(ctx, &lambda.GetFunctionInput{
+		FunctionName: aws.String(function_name),
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to get Lambda function: %v", err)
+	}
+
+	if output.Configuration == nil || output.Configuration.FunctionName == nil || output.Configuration.FunctionArn == nil {
+		return nil, fmt.Errorf("invalid GetFunction output")
+	}
+
+	url_output, err := lambda_client.GetFunctionUrlConfig(ctx, &lambda.GetFunctionUrlConfigInput{
+		FunctionName: aws.String(function_name),
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed to get function URL: %v", err)
+	}
+
+	if url_output.FunctionUrl == nil {
+		return nil, fmt.Errorf("invalid function URL output")
+	}
+
+	return &LambdaFunction{
+		FunctionName: *output.Configuration.FunctionName,
+		FunctionARN:  *output.Configuration.FunctionArn,
+		FunctionURL:  *url_output.FunctionUrl,
+	}, nil
+}
+
+// UpdateLambdaFunction updates an existing Lambda function with a new image
+func UpdateLambdaFunction(ctx context.Context, plugin_runtime entities.PluginRuntimeInterface, image_uri string) error {
+	// Get the function name
+	identity, err := plugin_runtime.Identity()
+	if err != nil {
+		return fmt.Errorf("failed to get plugin identity: %v", err)
+	}
+	function_name := fmt.Sprintf("dify-plugin-%s-%s", identity, plugin_runtime.Checksum())
+
+	_, err = lambda_client.UpdateFunctionCode(ctx, &lambda.UpdateFunctionCodeInput{
+		FunctionName: aws.String(function_name),
+		ImageUri:     aws.String(image_uri),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to update Lambda function: %v", err)
+	}
+
+	return nil
+}
+
+// DeleteLambdaFunction deletes a Lambda function and its associated function URL
+func DeleteLambdaFunction(ctx context.Context, plugin_runtime entities.PluginRuntimeInterface) error {
+	// Get the function name
+	identity, err := plugin_runtime.Identity()
+	if err != nil {
+		return fmt.Errorf("failed to get plugin identity: %v", err)
+	}
+	function_name := fmt.Sprintf("dify-plugin-%s-%s", identity, plugin_runtime.Checksum())
+
+	// Delete function URL
+	_, err = lambda_client.DeleteFunctionUrlConfig(ctx, &lambda.DeleteFunctionUrlConfigInput{
+		FunctionName: aws.String(function_name),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to delete function URL: %v", err)
+	}
+
+	// Delete Lambda function
+	_, err = lambda_client.DeleteFunction(ctx, &lambda.DeleteFunctionInput{
+		FunctionName: aws.String(function_name),
+	})
+	if err != nil {
+		return fmt.Errorf("failed to delete Lambda function: %v", err)
+	}
+
+	return nil
+}

+ 71 - 0
internal/core/aws/role.go

@@ -0,0 +1,71 @@
+package aws
+
+import (
+	"context"
+	"errors"
+	"fmt"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/service/iam"
+)
+
+const (
+	DIFY_PLUGIN_LAMBDA_EXECUTION_ROLE = "dify-plugin-lambda-execution-role"
+)
+
+// getOrCreateLambdaExecutionRole creates a new lambda execution role if it doesn't exist
+// or returns the existing role's ARN
+func getOrCreateLambdaExecutionRole(ctx context.Context) (string, error) {
+	iam_client := iam.NewFromConfig(*aws_lambda_config)
+
+	// Check if the role already exists
+	_, err := iam_client.GetRole(ctx, &iam.GetRoleInput{
+		RoleName: aws.String(DIFY_PLUGIN_LAMBDA_EXECUTION_ROLE),
+	})
+
+	if err == nil {
+		// Role already exists, return its ARN
+		return fmt.Sprintf("arn:aws:iam::%s:role/%s", lambda_account_id, DIFY_PLUGIN_LAMBDA_EXECUTION_ROLE), nil
+	}
+
+	// Create the role if it doesn't exist
+	assume_role_policy_document := `{
+		"Version": "2012-10-17",
+		"Statement": [
+			{
+				"Effect": "Allow",
+				"Principal": {
+					"Service": "lambda.amazonaws.com"
+				},
+				"Action": "sts:AssumeRole"
+			}
+		]
+	}`
+
+	create_role_output, err := iam_client.CreateRole(ctx, &iam.CreateRoleInput{
+		RoleName:                 aws.String(DIFY_PLUGIN_LAMBDA_EXECUTION_ROLE),
+		AssumeRolePolicyDocument: aws.String(assume_role_policy_document),
+	})
+	if err != nil {
+		return "", err
+	}
+
+	// Attach the AWSLambdaBasicExecutionRole policy
+	_, err = iam_client.AttachRolePolicy(ctx, &iam.AttachRolePolicyInput{
+		RoleName:  aws.String(DIFY_PLUGIN_LAMBDA_EXECUTION_ROLE),
+		PolicyArn: aws.String("arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"),
+	})
+	if err != nil {
+		// Delete the role if the policy attachment fails
+		_, err1 := iam_client.DeleteRole(ctx, &iam.DeleteRoleInput{
+			RoleName: aws.String(DIFY_PLUGIN_LAMBDA_EXECUTION_ROLE),
+		})
+		if err1 != nil {
+			return "", errors.Join(err, err1)
+		}
+
+		return "", err
+	}
+
+	return *create_role_output.Role.Arn, nil
+}

+ 109 - 0
internal/core/aws/s3.go

@@ -0,0 +1,109 @@
+package aws
+
+import (
+	"context"
+	"io"
+
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/credentials"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/aws/aws-sdk-go-v2/service/s3/types"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
+)
+
+var (
+	s3Client *s3.Client
+	s3Bucket *string
+)
+
+func InitS3(app *app.Config) {
+	// Check if required AWS S3 configuration is provided
+	if app.AWSS3Region == nil || app.AWSS3AccessKey == nil || app.AWSS3SecretKey == nil || app.AWSS3Bucket == nil {
+		log.Panic("AWSS3Region, AWSS3AccessKey, AWSS3SecretKey, and AWSS3Bucket must be set")
+	}
+
+	// Load AWS configuration with provided credentials
+	cfg, err := config.LoadDefaultConfig(
+		context.TODO(),
+		config.WithRegion(*app.AWSS3Region),
+		config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(
+			*app.AWSS3AccessKey,
+			*app.AWSS3SecretKey,
+			"",
+		)),
+	)
+
+	// Handle error if AWS config loading fails
+	if err != nil {
+		log.Panic("Failed to load AWS S3 config: %v", err)
+	}
+
+	log.Info("AWS S3 config loaded")
+
+	// Create S3 client
+	s3Client = s3.NewFromConfig(cfg)
+
+	// Store S3 bucket name
+	s3Bucket = app.AWSS3Bucket
+
+	log.Info("AWS S3 client initialized successfully")
+}
+
+func StreamUploadToS3(ctx context.Context, key string, reader io.Reader) error {
+	_, err := s3Client.PutObject(ctx, &s3.PutObjectInput{
+		Bucket: s3Bucket,
+		Key:    &key,
+		Body:   reader,
+	})
+
+	return err
+}
+
+func StreamDownloadFromS3(ctx context.Context, key string) (io.ReadCloser, error) {
+	resp, err := s3Client.GetObject(ctx, &s3.GetObjectInput{
+		Bucket: s3Bucket,
+		Key:    &key,
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return resp.Body, nil
+}
+
+func DeleteFromS3(ctx context.Context, key string) error {
+	_, err := s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
+		Bucket: s3Bucket,
+		Key:    &key,
+	})
+
+	return err
+}
+
+func ListFromS3(ctx context.Context, prefix string) ([]string, error) {
+	resp, err := s3Client.ListObjects(ctx, &s3.ListObjectsInput{
+		Bucket: s3Bucket,
+		Prefix: &prefix,
+	})
+
+	if err != nil {
+		return nil, err
+	}
+
+	return mapping.MapArray(resp.Contents, func(obj types.Object) string {
+		if obj.Key != nil {
+			return *obj.Key
+		}
+		return ""
+	}), nil
+}
+
+func HeadObject(ctx context.Context, key string) (*s3.HeadObjectOutput, error) {
+	return s3Client.HeadObject(ctx, &s3.HeadObjectInput{
+		Bucket: s3Bucket,
+		Key:    &key,
+	})
+}

+ 5 - 1
internal/core/plugin_manager/aws_manager/type.go

@@ -1,7 +1,11 @@
 package aws_manager
 
-import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+)
 
 type AWSPluginRuntime struct {
+	positive_manager.PositivePluginRuntime
 	entities.PluginRuntime
 }

+ 1 - 1
internal/core/plugin_manager/local_manager/environment.go

@@ -127,7 +127,7 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 }
 
 func (r *LocalPluginRuntime) calculateChecksum() string {
-	plugin_decoder, err := decoder.NewFSPluginDecoder(r.CWD)
+	plugin_decoder, err := decoder.NewFSPluginDecoder(r.LocalPath)
 	if err != nil {
 		return ""
 	}

+ 6 - 6
internal/core/plugin_manager/local_manager/run.go

@@ -17,14 +17,14 @@ func (r *LocalPluginRuntime) gc() {
 		RemoveStdio(r.io_identity)
 	}
 
-	if r.w != nil {
-		close(r.w)
-		r.w = nil
+	if r.wait_chan != nil {
+		close(r.wait_chan)
+		r.wait_chan = nil
 	}
 }
 
 func (r *LocalPluginRuntime) init() {
-	r.w = make(chan bool)
+	r.wait_chan = make(chan bool)
 	r.SetLaunching()
 }
 
@@ -120,8 +120,8 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 }
 
 func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
-	if r.w == nil {
+	if r.wait_chan == nil {
 		return nil, errors.New("plugin not started")
 	}
-	return r.w, nil
+	return r.wait_chan, nil
 }

+ 7 - 5
internal/core/plugin_manager/local_manager/type.go

@@ -1,13 +1,15 @@
 package local_manager
 
-import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+)
 
 type LocalPluginRuntime struct {
+	positive_manager.PositivePluginRuntime
 	entities.PluginRuntime
-	CWD string
 
+	wait_chan   chan bool
 	io_identity string
-	w           chan bool
-
-	checksum string
+	checksum    string
 }

+ 13 - 0
internal/core/plugin_manager/positive_manager/types.go

@@ -0,0 +1,13 @@
+package positive_manager
+
+import (
+	"errors"
+)
+
+type PositivePluginRuntime struct {
+	LocalPath string
+}
+
+func (r *PositivePluginRuntime) DockerImage() (string, error) {
+	return "", errors.New("not implemented")
+}

+ 5 - 0
internal/core/plugin_manager/remote_manager/type.go

@@ -1,6 +1,7 @@
 package remote_manager
 
 import (
+	"errors"
 	"strings"
 	"sync"
 	"time"
@@ -52,6 +53,10 @@ func (r *RemotePluginRuntime) Identity() (string, error) {
 	return strings.Join([]string{r.Configuration().Identity(), r.tenant_id}, ":"), nil
 }
 
+func (r *RemotePluginRuntime) DockerImage() (string, error) {
+	return "", errors.New("not implemented")
+}
+
 // Listen creates a new listener for the given session_id
 // session id is an unique identifier for a request
 func (r *RemotePluginRuntime) addCallback(session_id string, fn func([]byte)) {

+ 16 - 10
internal/core/plugin_manager/watcher.go

@@ -1,13 +1,14 @@
 package plugin_manager
 
 import (
-	"os"
 	"path"
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/storage"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
@@ -51,11 +52,16 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
 		if config.Platform == app.PLATFORM_AWS_LAMBDA {
 			plugin_interface = &aws_manager.AWSPluginRuntime{
 				PluginRuntime: plugin,
+				PositivePluginRuntime: positive_manager.PositivePluginRuntime{
+					LocalPath: plugin.State.AbsolutePath,
+				},
 			}
 		} else if config.Platform == app.PLATFORM_LOCAL {
 			plugin_interface = &local_manager.LocalPluginRuntime{
 				PluginRuntime: plugin,
-				CWD:           plugin.State.AbsolutePath,
+				PositivePluginRuntime: positive_manager.PositivePluginRuntime{
+					LocalPath: plugin.State.AbsolutePath,
+				},
 			}
 		} else {
 			log.Error("unsupported platform: %s for plugin: %s", config.Platform, plugin.Config.Name)
@@ -72,7 +78,7 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
 func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginRuntime {
 	ch := make(chan entities.PluginRuntime)
 
-	plugins, err := os.ReadDir(root_path)
+	plugin_paths, err := storage.List(root_path)
 	if err != nil {
 		log.Error("no plugin found in path: %s", root_path)
 		close(ch)
@@ -80,9 +86,9 @@ func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginR
 	}
 
 	routine.Submit(func() {
-		for _, plugin := range plugins {
-			if plugin.IsDir() {
-				configuration_path := path.Join(root_path, plugin.Name(), "manifest.yaml")
+		for _, plugin_path := range plugin_paths {
+			if plugin_path.IsDir() {
+				configuration_path := path.Join(root_path, plugin_path.Name(), "manifest.yaml")
 				configuration, err := parsePluginConfig(configuration_path)
 				if err != nil {
 					log.Error("parse plugin config error: %v", err)
@@ -95,15 +101,15 @@ func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginR
 				}
 
 				// check if .verified file exists
-				verified_path := path.Join(root_path, plugin.Name(), ".verified")
-				_, err = os.Stat(verified_path)
+				verified_path := path.Join(root_path, plugin_path.Name(), ".verified")
+				_, err = storage.Exists(verified_path)
 
 				ch <- entities.PluginRuntime{
 					Config: *configuration,
 					State: entities.PluginRuntimeState{
 						Status:       entities.PLUGIN_RUNTIME_STATUS_PENDING,
 						Restarts:     0,
-						AbsolutePath: path.Join(root_path, plugin.Name()),
+						AbsolutePath: path.Join(root_path, plugin_path.Name()),
 						ActiveAt:     nil,
 						Verified:     err == nil,
 					},
@@ -118,7 +124,7 @@ func (p *PluginManager) loadNewPlugins(root_path string) <-chan entities.PluginR
 }
 
 func parsePluginConfig(configuration_path string) (*plugin_entities.PluginDeclaration, error) {
-	text, err := os.ReadFile(configuration_path)
+	text, err := storage.Read(configuration_path)
 	if err != nil {
 		return nil, err
 	}

+ 6 - 0
internal/server/server.go

@@ -2,6 +2,7 @@ package server
 
 import (
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/aws"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/db"
 	"github.com/langgenius/dify-plugin-daemon/internal/process"
@@ -15,6 +16,11 @@ func (a *App) Run(config *app.Config) {
 	// init routine pool
 	routine.InitPool(config.RoutinePoolSize)
 
+	// init aws
+	if config.Platform == app.PLATFORM_AWS_LAMBDA {
+		aws.InitLambda(config)
+	}
+
 	// init db
 	db.Init(config)
 

+ 54 - 0
internal/storage/global.go

@@ -0,0 +1,54 @@
+package storage
+
+import (
+	"io"
+	"os"
+)
+
+var (
+	globalStorage FSOperator = &Local{}
+)
+
+func Read(path string) ([]byte, error) {
+	return globalStorage.Read(path)
+}
+
+func ReadStream(path string) (io.ReadCloser, error) {
+	return globalStorage.ReadStream(path)
+}
+
+func Write(path string, data []byte) error {
+	return globalStorage.Write(path, data)
+}
+
+func WriteStream(path string, data io.Reader) error {
+	return globalStorage.WriteStream(path, data)
+}
+
+func List(path string) ([]FileInfo, error) {
+	return globalStorage.List(path)
+}
+
+func Stat(path string) (FileInfo, error) {
+	return globalStorage.Stat(path)
+}
+
+func Delete(path string) error {
+	return globalStorage.Delete(path)
+}
+
+func Mkdir(path string, perm os.FileMode) error {
+	return globalStorage.Mkdir(path, perm)
+}
+
+func Rename(oldpath, newpath string) error {
+	return globalStorage.Rename(oldpath, newpath)
+}
+
+func Exists(path string) (bool, error) {
+	return globalStorage.Exists(path)
+}
+
+func SetGlobalStorage(storage FSOperator) {
+	globalStorage = storage
+}

+ 73 - 0
internal/storage/local.go

@@ -0,0 +1,73 @@
+package storage
+
+import (
+	"io"
+	"os"
+)
+
+type Local struct{}
+
+func (l *Local) Read(path string) ([]byte, error) {
+	return os.ReadFile(path)
+}
+
+func (l *Local) ReadStream(path string) (io.ReadCloser, error) {
+	return os.Open(path)
+}
+
+func (l *Local) Write(path string, data []byte) error {
+	return os.WriteFile(path, data, 0644)
+}
+
+func (l *Local) WriteStream(path string, data io.Reader) error {
+	file, err := os.Create(path)
+	if err != nil {
+		return err
+	}
+	defer file.Close()
+	_, err = io.Copy(file, data)
+	return err
+}
+
+func (l *Local) List(path string) ([]FileInfo, error) {
+	entries, err := os.ReadDir(path)
+	if err != nil {
+		return nil, err
+	}
+	file_infos := make([]FileInfo, len(entries))
+	for i, entry := range entries {
+		info, err := entry.Info()
+		if err != nil {
+			return nil, err
+		}
+		file_infos[i] = info
+	}
+	return file_infos, nil
+}
+
+func (l *Local) Stat(path string) (FileInfo, error) {
+	return os.Stat(path)
+}
+
+func (l *Local) Delete(path string) error {
+	return os.Remove(path)
+}
+
+func (l *Local) Mkdir(path string, perm os.FileMode) error {
+	return os.MkdirAll(path, perm)
+}
+
+func (l *Local) Rename(oldpath, newpath string) error {
+	return os.Rename(oldpath, newpath)
+}
+
+func (l *Local) Exists(path string) (bool, error) {
+	_, err := os.Stat(path)
+	if err == nil {
+		return true, nil
+	}
+	if os.IsNotExist(err) {
+		return false, nil
+	}
+	return false, err
+}

+ 118 - 0
internal/storage/s3.go

@@ -0,0 +1,118 @@
+package storage
+
+import (
+	"bytes"
+	"context"
+	"io"
+	"os"
+	"strings"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/aws"
+)
+
+type S3 struct{}
+
+func (s *S3) Read(path string) ([]byte, error) {
+	reader, err := s.ReadStream(path)
+	if err != nil {
+		return nil, err
+	}
+	defer reader.Close()
+	return io.ReadAll(reader)
+}
+
+func (s *S3) ReadStream(path string) (io.ReadCloser, error) {
+	return aws.StreamDownloadFromS3(context.Background(), path)
+}
+
+func (s *S3) Write(path string, data []byte) error {
+	return aws.StreamUploadToS3(context.Background(), path, io.NopCloser(bytes.NewReader(data)))
+}
+
+func (s *S3) WriteStream(path string, data io.Reader) error {
+	return aws.StreamUploadToS3(context.Background(), path, data)
+}
+
+func (s *S3) List(path string) ([]FileInfo, error) {
+	keys, err := aws.ListFromS3(context.Background(), path)
+	if err != nil {
+		return nil, err
+	}
+
+	file_infos := make([]FileInfo, len(keys))
+	for i, key := range keys {
+		head, err := aws.HeadObject(context.Background(), key)
+		if err != nil {
+			return nil, err
+		}
+		is_dir := strings.HasSuffix(key, "/")
+		file_infos[i] = &s3FileInfo{
+			name:    strings.TrimSuffix(key, "/"),
+			size:    *head.ContentLength,
+			modTime: *head.LastModified,
+			isDir:   is_dir,
+		}
+	}
+	return file_infos, nil
+}
+
+func (s *S3) Stat(path string) (FileInfo, error) {
+	head, err := aws.HeadObject(context.Background(), path)
+	if err != nil {
+		return nil, err
+	}
+	return &s3FileInfo{
+		name:    path,
+		size:    *head.ContentLength,
+		modTime: *head.LastModified,
+	}, nil
+}
+
+func (s *S3) Delete(path string) error {
+	return aws.DeleteFromS3(context.Background(), path)
+}
+
+func (s *S3) Mkdir(path string, perm os.FileMode) error {
+	// S3 doesn't have directories, so this is a no-op
+	return nil
+}
+
+func (s *S3) Rename(oldpath, newpath string) error {
+	// S3 doesn't support rename directly, so we need to copy and delete
+	reader, err := s.ReadStream(oldpath)
+	if err != nil {
+		return err
+	}
+	defer reader.Close()
+
+	err = aws.StreamUploadToS3(context.Background(), newpath, reader)
+	if err != nil {
+		return err
+	}
+
+	return s.Delete(oldpath)
+}
+
+func (s *S3) Exists(path string) (bool, error) {
+	_, err := aws.HeadObject(context.Background(), path)
+	if err != nil {
+		// TODO: Check if error is specifically "not found" error
+		return false, nil
+	}
+	return true, nil
+}
+
+type s3FileInfo struct {
+	name    string
+	size    int64
+	modTime time.Time
+	isDir   bool
+}
+
+func (fi *s3FileInfo) Name() string       { return fi.name }
+func (fi *s3FileInfo) Size() int64        { return fi.size }
+func (fi *s3FileInfo) Mode() os.FileMode  { return 0 }
+func (fi *s3FileInfo) ModTime() time.Time { return fi.modTime }
+func (fi *s3FileInfo) IsDir() bool        { return fi.isDir }
+func (fi *s3FileInfo) Sys() interface{}   { return nil }

+ 71 - 0
internal/storage/storage.go

@@ -0,0 +1,71 @@
+package storage
+
+import (
+	"io"
+	"os"
+	"time"
+)
+
+// FileInfo represents information about a file
+type FileInfo interface {
+	Name() string
+	Size() int64
+	Mode() os.FileMode
+	ModTime() time.Time
+	IsDir() bool
+}
+
+// FSOperator defines the interface for basic file system operations
+type FSOperator interface {
+	// Read operations
+	Read(path string) ([]byte, error)
+	ReadStream(path string) (io.ReadCloser, error)
+
+	// Write operations
+	Write(path string, data []byte) error
+	WriteStream(path string, data io.Reader) error
+
+	// List operation
+	List(path string) ([]FileInfo, error)
+
+	// Get file info
+	Stat(path string) (FileInfo, error)
+
+	// Delete operation
+	Delete(path string) error
+
+	// Create directory
+	Mkdir(path string, perm os.FileMode) error
+
+	// Rename operation
+	Rename(oldpath, newpath string) error
+
+	// Check if file/directory exists
+	Exists(path string) (bool, error)
+}
+
+// FullFSOperator extends FSOperator with additional operations
+type FullFSOperator interface {
+	FSOperator
+
+	// Copy operation
+	Copy(src, dst string) error
+
+	// Move operation
+	Move(src, dst string) error
+
+	// Recursive delete
+	DeleteAll(path string) error
+
+	// Create file
+	Create(path string) (io.WriteCloser, error)
+
+	// Open file with specific flag and permission
+	OpenFile(path string, flag int, perm os.FileMode) (io.ReadWriteCloser, error)
+
+	// Get file checksum
+	Checksum(path string) (string, error)
+
+	// Watch for file changes
+	Watch(path string) (<-chan FileInfo, error)
+}

+ 10 - 0
internal/types/app/config.go

@@ -38,6 +38,16 @@ type Config struct {
 	DBDatabase string `envconfig:"DB_DATABASE" validate:"required"`
 	DBSslMode  string `envconfig:"DB_SSL_MODE" validate:"required,oneof=disable require"`
 
+	AWSLambdaRegion    *string `envconfig:"AWS_LAMBDA_REGION" validate:"omitempty"`
+	AWSLambdaAccessKey *string `envconfig:"AWS_LAMBDA_ACCESS_KEY" validate:"omitempty"`
+	AWSLambdaSecretKey *string `envconfig:"AWS_LAMBDA_SECRET_KEY" validate:"omitempty"`
+
+	AWSS3Enabled   bool    `envconfig:"AWS_S3_ENABLED" validate:"omitempty"`
+	AWSS3AccessKey *string `envconfig:"AWS_S3_ACCESS_KEY" validate:"omitempty"`
+	AWSS3SecretKey *string `envconfig:"AWS_S3_SECRET_KEY" validate:"omitempty"`
+	AWSS3Region    *string `envconfig:"AWS_S3_REGION" validate:"omitempty"`
+	AWSS3Bucket    *string `envconfig:"AWS_S3_BUCKET" validate:"omitempty"`
+
 	LifetimeCollectionHeartbeatInterval int `envconfig:"LIFETIME_COLLECTION_HEARTBEAT_INTERVAL"  validate:"required"`
 	LifetimeCollectionGCInterval        int `envconfig:"LIFETIME_COLLECTION_GC_INTERVAL" validate:"required"`
 	LifetimeStateGCInterval             int `envconfig:"LIFETIME_STATE_GC_INTERVAL" validate:"required"`

+ 5 - 0
internal/types/entities/runtime.go

@@ -21,6 +21,7 @@ type (
 	PluginRuntimeInterface interface {
 		PluginRuntimeTimeLifeInterface
 		PluginRuntimeSessionIOInterface
+		PluginRuntimeDockerInterface
 	}
 
 	PluginRuntimeTimeLifeInterface interface {
@@ -73,6 +74,10 @@ type (
 		Listen(session_id string) *BytesIOListener
 		Write(session_id string, data []byte)
 	}
+
+	PluginRuntimeDockerInterface interface {
+		DockerImage() (string, error)
+	}
 )
 
 func (r *PluginRuntime) Stopped() bool {

+ 9 - 0
internal/utils/mapping/map.go

@@ -0,0 +1,9 @@
+package mapping
+
+func MapArray[T any, R any](arr []T, mapFunc func(T) R) []R {
+	result := make([]R, len(arr))
+	for i, v := range arr {
+		result[i] = mapFunc(v)
+	}
+	return result
+}