瀏覽代碼

Revert: refactor: serverless connector This reverts commit b413e6c96546cc50b1dbc94731d24f5c51e7a455.

Yeuoly 6 月之前
父節點
當前提交
d08d78e8b2

+ 11 - 12
internal/core/plugin_manager/install_to_serverless.go

@@ -13,7 +13,6 @@ import (
 
 // InstallToAWSFromPkg installs a plugin to AWS Lambda
 func (p *PluginManager) InstallToAWSFromPkg(
-	originalPackager []byte,
 	decoder decoder.PluginDecoder,
 	source string,
 	meta map[string]any,
@@ -33,7 +32,7 @@ func (p *PluginManager) InstallToAWSFromPkg(
 		return nil, err
 	}
 
-	response, err := serverless.LaunchPlugin(originalPackager, decoder)
+	response, err := serverless.UploadPlugin(decoder)
 	if err != nil {
 		return nil, err
 	}
@@ -50,17 +49,17 @@ func (p *PluginManager) InstallToAWSFromPkg(
 			newResponse.Close()
 		}()
 
-		functionUrl := ""
-		functionName := ""
+		lambdaUrl := ""
+		lambdaFunctionName := ""
 
-		response.Async(func(r serverless.LaunchFunctionResponse) {
+		response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
 			if r.Event == serverless.Info {
 				newResponse.Write(PluginInstallResponse{
 					Event: PluginInstallEventInfo,
 					Data:  "Installing...",
 				})
 			} else if r.Event == serverless.Done {
-				if functionUrl == "" || functionName == "" {
+				if lambdaUrl == "" || lambdaFunctionName == "" {
 					newResponse.Write(PluginInstallResponse{
 						Event: PluginInstallEventError,
 						Data:  "Internal server error, failed to get lambda url or function name",
@@ -77,8 +76,8 @@ func (p *PluginManager) InstallToAWSFromPkg(
 					serverlessModel := &models.ServerlessRuntime{
 						Checksum:               checksum,
 						Type:                   models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
-						FunctionURL:            functionUrl,
-						FunctionName:           functionName,
+						FunctionURL:            lambdaUrl,
+						FunctionName:           lambdaFunctionName,
 						PluginUniqueIdentifier: uniqueIdentity.String(),
 						Declaration:            declaration,
 					}
@@ -107,10 +106,10 @@ func (p *PluginManager) InstallToAWSFromPkg(
 					Event: PluginInstallEventError,
 					Data:  "Internal server error",
 				})
-			} else if r.Event == serverless.FunctionUrl {
-				functionUrl = r.Message
-			} else if r.Event == serverless.Function {
-				functionName = r.Message
+			} else if r.Event == serverless.LambdaUrl {
+				lambdaUrl = r.Message
+			} else if r.Event == serverless.Lambda {
+				lambdaFunctionName = r.Message
 			} else {
 				newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
 			}

+ 45 - 111
internal/core/plugin_manager/serverless_connector/connector.go

@@ -7,15 +7,15 @@ import (
 	"net/url"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/http_requests"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
-	"github.com/langgenius/dify-plugin-daemon/pkg/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/pkg/entities"
 )
 
-type ServerlessFunction struct {
+var ()
+
+type LambdaFunction struct {
 	FunctionName string `json:"function_name" validate:"required"`
-	FunctionDRN  string `json:"function_drn" validate:"required"`
+	FunctionARN  string `json:"function_arn" validate:"required"`
 	FunctionURL  string `json:"function_url" validate:"required"`
 }
 
@@ -44,78 +44,72 @@ func Ping() error {
 }
 
 var (
-	ErrFunctionNotFound = errors.New("no function found")
+	ErrNoLambdaFunction = errors.New("no lambda function found")
 )
 
-// Fetch the function from serverless connector, return error if failed
-func FetchFunction(manifest plugin_entities.PluginDeclaration, checksum string) (*ServerlessFunction, error) {
-	filename := getFunctionFilename(manifest, checksum)
+// Fetch the lambda function from serverless connector, return error if failed
+func FetchLambda(identity string, checksum string) (*LambdaFunction, error) {
+	request := map[string]any{
+		"plugin": map[string]any{
+			"config": map[string]any{
+				"identity": identity,
+				"checksum": checksum,
+			},
+		},
+	}
 
-	url, err := url.JoinPath(baseurl.String(), "/v1/runner/instances")
+	url, err := url.JoinPath(baseurl.String(), "/v1/lambda/fetch")
 	if err != nil {
 		return nil, err
 	}
 
-	response, err := http_requests.GetAndParse[RunnerInstances](
+	response, err := http_requests.PostAndParse[entities.GenericResponse[LambdaFunction]](
 		client,
 		url,
 		http_requests.HttpHeader(map[string]string{
 			"Authorization": SERVERLESS_CONNECTOR_API_KEY,
 		}),
-		http_requests.HttpParams(map[string]string{
-			"filename": filename,
-		}),
+		http_requests.HttpPayloadJson(request),
 	)
-
 	if err != nil {
 		return nil, err
 	}
 
-	if response.Error != "" {
-		return nil, fmt.Errorf("unexpected response from plugin controller: %s", response.Error)
-	}
-
-	if len(response.Items) == 0 {
-		return nil, ErrFunctionNotFound
+	if response.Code != 0 {
+		if response.Code == -404 {
+			return nil, ErrNoLambdaFunction
+		}
+		return nil, fmt.Errorf("unexpected response from serverless connector: %s", response.Message)
 	}
 
-	return &ServerlessFunction{
-		FunctionName: response.Items[0].Name,
-		FunctionDRN:  response.Items[0].ResourceName,
-		FunctionURL:  response.Items[0].Endpoint,
-	}, nil
+	return &response.Data, nil
 }
 
-type LaunchFunctionEvent string
+type LaunchAWSLambdaFunctionEvent string
 
 const (
-	Error       LaunchFunctionEvent = "error"
-	Info        LaunchFunctionEvent = "info"
-	Function    LaunchFunctionEvent = "function"
-	FunctionUrl LaunchFunctionEvent = "function_url"
-	Done        LaunchFunctionEvent = "done"
+	Error     LaunchAWSLambdaFunctionEvent = "error"
+	Info      LaunchAWSLambdaFunctionEvent = "info"
+	Lambda    LaunchAWSLambdaFunctionEvent = "lambda"
+	LambdaUrl LaunchAWSLambdaFunctionEvent = "lambda_url"
+	Done      LaunchAWSLambdaFunctionEvent = "done"
 )
 
-type LaunchFunctionResponse struct {
-	Event   LaunchFunctionEvent `json:"event"`
-	Message string              `json:"message"`
+type LaunchAWSLambdaFunctionResponse struct {
+	Event   LaunchAWSLambdaFunctionEvent `json:"event"`
+	Message string                       `json:"message"`
 }
 
-// Setup the function from serverless connector, it will receive the context as the input
+// Launch the lambda function from serverless connector, it will receive the context_tar as the input
 // and build it a docker image, then run it on serverless platform like AWS Lambda
 // it returns a event stream, the caller should consider it as a async operation
-func SetupFunction(
-	manifest plugin_entities.PluginDeclaration,
-	checksum string,
-	context io.Reader,
-) (*stream.Stream[LaunchFunctionResponse], error) {
-	url, err := url.JoinPath(baseurl.String(), "/v1/launch")
+func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
+	url, err := url.JoinPath(baseurl.String(), "/v1/lambda/launch")
 	if err != nil {
 		return nil, err
 	}
 
-	// join a filename
-	serverless_connector_response, err := http_requests.PostAndParseStream[LaunchFunctionResponseChunk](
+	response, err := http_requests.PostAndParseStream[LaunchAWSLambdaFunctionResponse](
 		client,
 		url,
 		http_requests.HttpHeader(map[string]string{
@@ -124,79 +118,19 @@ func SetupFunction(
 		http_requests.HttpReadTimeout(240000),
 		http_requests.HttpWriteTimeout(240000),
 		http_requests.HttpPayloadMultipart(
-			map[string]string{},
-			map[string]http_requests.HttpPayloadMultipartFile{
-				"context": {
-					Filename: getFunctionFilename(manifest, checksum),
-					Reader:   context,
-				},
+			map[string]string{
+				"identity": identity,
+				"checksum": checksum,
+			},
+			map[string]io.Reader{
+				"context": context_tar,
 			},
 		),
 	)
+
 	if err != nil {
 		return nil, err
 	}
 
-	response := stream.NewStream[LaunchFunctionResponse](10)
-
-	routine.Submit(map[string]string{
-		"module": "serverless_connector",
-		"func":   "SetupFunction",
-	}, func() {
-		defer response.Close()
-		if err := serverless_connector_response.Async(func(chunk LaunchFunctionResponseChunk) {
-			if chunk.State == LAUNCH_STATE_FAILED {
-				response.Write(LaunchFunctionResponse{
-					Event:   Error,
-					Message: chunk.Message,
-				})
-				return
-			}
-
-			switch chunk.Stage {
-			case LAUNCH_STAGE_START, LAUNCH_STAGE_BUILD:
-				response.Write(LaunchFunctionResponse{
-					Event:   Info,
-					Message: "Building plugin...",
-				})
-			case LAUNCH_STAGE_RUN:
-				if chunk.State == LAUNCH_STATE_SUCCESS {
-					data, err := parser.ParserCommaSeparatedValues[LaunchFunctionFinalStageMessage]([]byte(chunk.Message))
-					if err != nil {
-						response.Write(LaunchFunctionResponse{
-							Event:   Error,
-							Message: err.Error(),
-						})
-						return
-					}
-
-					response.Write(LaunchFunctionResponse{
-						Event:   Function,
-						Message: data.Name,
-					})
-					response.Write(LaunchFunctionResponse{
-						Event:   FunctionUrl,
-						Message: data.Endpoint,
-					})
-				} else {
-					response.Write(LaunchFunctionResponse{
-						Event:   Info,
-						Message: "Launching plugin...",
-					})
-				}
-			case LAUNCH_STAGE_END:
-				response.Write(LaunchFunctionResponse{
-					Event:   Done,
-					Message: "Plugin launched",
-				})
-			}
-		}); err != nil {
-			response.Write(LaunchFunctionResponse{
-				Event:   Error,
-				Message: err.Error(),
-			})
-		}
-	})
-
 	return response, nil
 }

+ 161 - 0
internal/core/plugin_manager/serverless_connector/packager.go

@@ -0,0 +1,161 @@
+package serverless
+
+import (
+	"archive/tar"
+	"compress/gzip"
+	"errors"
+	"io"
+	"io/fs"
+	"os"
+	"path"
+	"strings"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless_runtime/dockerfile"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/tmpfile"
+)
+
+type Packager struct {
+	decoder decoder.PluginDecoder
+}
+
+func NewPackager(decoder decoder.PluginDecoder) *Packager {
+	return &Packager{
+		decoder: decoder,
+	}
+}
+
+type dockerFileInfo struct {
+	fs.FileInfo
+
+	size int64
+}
+
+func (d *dockerFileInfo) Size() int64 {
+	return d.size
+}
+
+func (d *dockerFileInfo) Name() string {
+	return "Dockerfile"
+}
+
+func (d *dockerFileInfo) Mode() os.FileMode {
+	return 0644
+}
+
+func (d *dockerFileInfo) ModTime() time.Time {
+	return time.Now()
+}
+
+func (d *dockerFileInfo) IsDir() bool {
+	return false
+}
+
+func (d *dockerFileInfo) Sys() any {
+	return nil
+}
+
+// Pack takes a plugin and packs it into a tar file with dockerfile inside
+// returns a *os.File with the tar file
+func (p *Packager) Pack() (*os.File, error) {
+	declaration, err := p.decoder.Manifest()
+	if err != nil {
+		return nil, err
+	}
+
+	// walk through the plugin directory and add it to a tar file
+	// create a tmpfile
+	tmpfile, cleanup, err := tmpfile.CreateTempFile("plugin-aws-tar-*")
+	if err != nil {
+		return nil, err
+	}
+	success := false
+
+	defer func() {
+		if !success {
+			cleanup()
+		}
+	}()
+
+	gzipWriter, err := gzip.NewWriterLevel(tmpfile, gzip.BestCompression)
+	if err != nil {
+		return nil, err
+	}
+	defer gzipWriter.Close()
+
+	tarWriter := tar.NewWriter(gzipWriter)
+	defer tarWriter.Close()
+
+	if err := p.decoder.Walk(func(filename, dir string) error {
+		if strings.ToLower(filename) == "dockerfile" {
+			return errors.New("dockerfile is not allowed to be in the plugin directory")
+		}
+
+		fullFilename := path.Join(dir, filename)
+
+		state, err := p.decoder.Stat(fullFilename)
+		if err != nil {
+			return err
+		}
+
+		tarHeader, err := tar.FileInfoHeader(state, fullFilename)
+		if err != nil {
+			return err
+		}
+		tarHeader.Name = fullFilename
+
+		// write tar header
+		if err := tarWriter.WriteHeader(tarHeader); err != nil {
+			return err
+		}
+
+		// write file content
+		fileReader, err := p.decoder.FileReader(fullFilename)
+		if err != nil {
+			return err
+		}
+		if _, err := io.Copy(tarWriter, fileReader); err != nil {
+			fileReader.Close()
+			return err
+		}
+		// release resources
+		fileReader.Close()
+
+		return nil
+	}); err != nil {
+		return nil, err
+	}
+
+	// add dockerfile
+	dockerfile, err := dockerfile.GenerateDockerfile(&declaration)
+	if err != nil {
+		return nil, err
+	}
+
+	tarHeader, err := tar.FileInfoHeader(&dockerFileInfo{
+		size: int64(len(dockerfile)),
+	}, "Dockerfile")
+	if err != nil {
+		return nil, err
+	}
+
+	// create a fake dockerfile stat
+	if err := tarWriter.WriteHeader(tarHeader); err != nil {
+		return nil, err
+	}
+
+	if _, err := tarWriter.Write([]byte(dockerfile)); err != nil {
+		return nil, err
+	}
+
+	// close writers to flush data
+	tarWriter.Close()
+	gzipWriter.Close()
+
+	tmpfile.Seek(0, io.SeekStart)
+
+	success = true
+
+	return tmpfile, nil
+}

+ 124 - 0
internal/core/plugin_manager/serverless_connector/packager_test.go

@@ -0,0 +1,124 @@
+package serverless
+
+import (
+	"archive/tar"
+	"compress/gzip"
+	"embed"
+	"io"
+	"io/fs"
+	"os"
+	"path"
+	"path/filepath"
+	"testing"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+)
+
+//go:embed packager_test_plugin/*
+var test_plugin embed.FS
+
+func TestPackager_Pack(t *testing.T) {
+	// create a temp dir
+	tmpDir, err := os.MkdirTemp("", "test_plugin")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.RemoveAll(tmpDir)
+
+	// copy the test_plugin to the temp dir
+	if err := fs.WalkDir(test_plugin, ".", func(path string, d fs.DirEntry, err error) error {
+		if err != nil {
+			return err
+		}
+
+		if d.IsDir() {
+			// create the dir
+			os.MkdirAll(filepath.Join(tmpDir, path), 0755)
+		} else {
+			// copy the file
+			originFile, err := test_plugin.Open(path)
+			if err != nil {
+				return err
+			}
+			defer originFile.Close()
+
+			content, err := io.ReadAll(originFile)
+			if err != nil {
+				return err
+			}
+
+			if err := os.WriteFile(filepath.Join(tmpDir, path), content, 0644); err != nil {
+				return err
+			}
+		}
+
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	decoder, err := decoder.NewFSPluginDecoder(path.Join(tmpDir, "packager_test_plugin"))
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	packager := NewPackager(decoder)
+
+	f, err := packager.Pack()
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() {
+		f.Close()
+		os.Remove(f.Name())
+	}()
+
+	gzipReader, err := gzip.NewReader(f)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer gzipReader.Close()
+
+	// Create a new tar reader
+	tarReader := tar.NewReader(gzipReader)
+
+	dockerfileFound := false
+	requirementsFound := false
+	mainPyFound := false
+	jinaYamlFound := false
+	// Iterate through the files in the tar.gz archive
+	for {
+		header, err := tarReader.Next()
+		if err == io.EOF {
+			break // End of archive
+		}
+		if err != nil {
+			t.Fatal(err)
+		}
+
+		switch header.Name {
+		case "Dockerfile":
+			dockerfileFound = true
+		case "requirements.txt":
+			requirementsFound = true
+		case "main.py":
+			mainPyFound = true
+		case "provider/jina.yaml":
+			jinaYamlFound = true
+		}
+	}
+
+	// Check if all required files are present
+	if !dockerfileFound {
+		t.Error("Dockerfile not found in the packed archive")
+	}
+	if !requirementsFound {
+		t.Error("requirements.txt not found in the packed archive")
+	}
+	if !mainPyFound {
+		t.Error("main.py not found in the packed archive")
+	}
+	if !jinaYamlFound {
+		t.Error("jina.yaml not found in the packed archive")
+	}
+}

+ 5 - 0
internal/core/plugin_manager/serverless_connector/packager_test_plugin/main.py

@@ -0,0 +1,5 @@
+def main():
+    print("Hello, World!")
+
+if __name__ == "__main__":
+    main()

+ 30 - 0
internal/core/plugin_manager/serverless_connector/packager_test_plugin/manifest.yaml

@@ -0,0 +1,30 @@
+version: 0.0.1
+type: plugin
+author: "Yeuoly"
+name: "jina"
+label:
+  en_US: "Jina"
+created_at: "2024-07-12T08:03:44.658609186Z"
+resource:
+  memory: 1048576
+  permission:
+    tool:
+      enabled: true
+    model:
+      enabled: true
+      llm: true
+plugins:
+  models:
+    - "provider/jina.yaml"
+execution:
+  install: install.sh
+  launch: launch.sh
+meta:
+  version: 0.0.1
+  arch:
+    - "amd64"
+    - "arm64"
+  runner:
+    language: "python"
+    version: "3.12"
+    entrypoint: "main"

+ 82 - 0
internal/core/plugin_manager/serverless_connector/packager_test_plugin/provider/jina.yaml

@@ -0,0 +1,82 @@
+provider: jina
+label:
+  en_US: Jina
+description:
+  en_US: Embedding and Rerank Model Supported
+icon_small:
+  en_US: icon_s_en.svg
+icon_large:
+  en_US: icon_l_en.svg
+background: "#EFFDFD"
+help:
+  title:
+    en_US: Get your API key from Jina AI
+    zh_Hans: 从 Jina 获取 API Key
+  url:
+    en_US: https://jina.ai/
+supported_model_types:
+  - text-embedding
+  - rerank
+configurate_methods:
+  - predefined-model
+  - customizable-model
+provider_credential_schema:
+  credential_form_schemas:
+    - variable: api_key
+      label:
+        en_US: API Key
+      type: secret-input
+      required: true
+      placeholder:
+        zh_Hans: 在此输入您的 API Key
+        en_US: Enter your API Key
+model_credential_schema:
+  model:
+    label:
+      en_US: Model Name
+      zh_Hans: 模型名称
+    placeholder:
+      en_US: Enter your model name
+      zh_Hans: 输入模型名称
+  credential_form_schemas:
+    - variable: api_key
+      label:
+        en_US: API Key
+      type: secret-input
+      required: true
+      placeholder:
+        zh_Hans: 在此输入您的 API Key
+        en_US: Enter your API Key
+    - variable: base_url
+      label:
+        zh_Hans: 服务器 URL
+        en_US: Base URL
+      type: text-input
+      required: true
+      placeholder:
+        zh_Hans: Base URL, e.g. https://api.jina.ai/v1
+        en_US: Base URL, e.g. https://api.jina.ai/v1
+      default: 'https://api.jina.ai/v1'
+    - variable: context_size
+      label:
+        zh_Hans: 上下文大小
+        en_US: Context size
+      placeholder:
+        zh_Hans: 输入上下文大小
+        en_US: Enter context size
+      required: false
+      type: text-input
+      default: '8192'
+models:
+  rerank:
+    predefined:
+      - "models/**/*.yaml"
+  text-embedding:
+    predefined:
+      - "models/**/*.yaml"
+extra:
+  python:
+    provider_source: provider/jina.py
+    model_sources:
+      - "models/text_embedding/text_embedding.py"
+      - "models/rerank/rerank.py"

+ 0 - 0
internal/core/plugin_manager/serverless_connector/packager_test_plugin/requirements.txt


+ 23 - 13
internal/core/plugin_manager/serverless_connector/launch.go

@@ -1,7 +1,7 @@
 package serverless
 
 import (
-	"bytes"
+	"os"
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
@@ -13,9 +13,9 @@ var (
 	AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
 )
 
-// LaunchPlugin uploads the plugin to specific serverless connector
-// return the function url and name
-func LaunchPlugin(originPackage []byte, decoder decoder.PluginDecoder) (*stream.Stream[LaunchFunctionResponse], error) {
+// UploadPlugin uploads the plugin to the AWS Lambda
+// return the lambda url and name
+func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
 	checksum, err := decoder.Checksum()
 	if err != nil {
 		return nil, err
@@ -32,23 +32,24 @@ func LaunchPlugin(originPackage []byte, decoder decoder.PluginDecoder) (*stream.
 		return nil, err
 	}
 
-	function, err := FetchFunction(manifest, checksum)
+	identity := manifest.Identity()
+	function, err := FetchLambda(identity, checksum)
 	if err != nil {
-		if err != ErrFunctionNotFound {
+		if err != ErrNoLambdaFunction {
 			return nil, err
 		}
 	} else {
 		// found, return directly
-		response := stream.NewStream[LaunchFunctionResponse](3)
-		response.Write(LaunchFunctionResponse{
-			Event:   FunctionUrl,
+		response := stream.NewStream[LaunchAWSLambdaFunctionResponse](3)
+		response.Write(LaunchAWSLambdaFunctionResponse{
+			Event:   LambdaUrl,
 			Message: function.FunctionURL,
 		})
-		response.Write(LaunchFunctionResponse{
-			Event:   Function,
+		response.Write(LaunchAWSLambdaFunctionResponse{
+			Event:   Lambda,
 			Message: function.FunctionName,
 		})
-		response.Write(LaunchFunctionResponse{
+		response.Write(LaunchAWSLambdaFunctionResponse{
 			Event:   Done,
 			Message: "",
 		})
@@ -56,7 +57,16 @@ func LaunchPlugin(originPackage []byte, decoder decoder.PluginDecoder) (*stream.
 		return response, nil
 	}
 
-	response, err := SetupFunction(manifest, checksum, bytes.NewReader(originPackage))
+	// create lambda function
+	packager := NewPackager(decoder)
+	context, err := packager.Pack()
+	if err != nil {
+		return nil, err
+	}
+	defer os.Remove(context.Name())
+	defer context.Close()
+
+	response, err := LaunchLambda(identity, checksum, context)
 	if err != nil {
 		return nil, err
 	}

+ 1 - 1
internal/service/install_plugin.go

@@ -215,7 +215,7 @@ func InstallPluginRuntimeToTenant(
 					})
 					return
 				}
-				stream, err = manager.InstallToAWSFromPkg(pkgFile, zipDecoder, source, metas[i])
+				stream, err = manager.InstallToAWSFromPkg(zipDecoder, source, metas[i])
 			} else if config.Platform == app.PLATFORM_LOCAL {
 				stream, err = manager.InstallToLocal(pluginUniqueIdentifier, source, metas[i])
 			} else {

+ 1 - 6
internal/utils/http_requests/http_options.go

@@ -41,14 +41,9 @@ func HttpPayloadJson(payload interface{}) HttpOptions {
 	return HttpOptions{"payloadJson", payload}
 }
 
-type HttpPayloadMultipartFile struct {
-	Filename string
-	Reader   io.Reader
-}
-
 // which is used for POST method only
 // payload follows the form data format, and files is a map from filename to file
-func HttpPayloadMultipart(payload map[string]string, files map[string]HttpPayloadMultipartFile) HttpOptions {
+func HttpPayloadMultipart(payload map[string]string, files map[string]io.Reader) HttpOptions {
 	return HttpOptions{"payloadMultipart", map[string]interface{}{
 		"payload": payload,
 		"files":   files,

+ 4 - 4
internal/utils/http_requests/http_request.go

@@ -44,14 +44,14 @@ func buildHttpRequest(method string, url string, options ...HttpOptions) (*http.
 			buffer := new(bytes.Buffer)
 			writer := multipart.NewWriter(buffer)
 
-			files := option.Value.(map[string]any)["files"].(map[string]HttpPayloadMultipartFile)
-			for filename, file := range files {
-				part, err := writer.CreateFormFile(filename, file.Filename)
+			files := option.Value.(map[string]any)["files"].(map[string]io.Reader)
+			for filename, reader := range files {
+				part, err := writer.CreateFormFile(filename, filename)
 				if err != nil {
 					writer.Close()
 					return nil, err
 				}
-				_, err = io.Copy(part, file.Reader)
+				_, err = io.Copy(part, reader)
 				if err != nil {
 					writer.Close()
 					return nil, err

+ 0 - 80
internal/utils/parser/comma.go

@@ -1,80 +0,0 @@
-package parser
-
-import (
-	"bytes"
-	"fmt"
-	"reflect"
-	"strconv"
-)
-
-// ParserCommaSeparatedValues parses the comma separated values
-// and returns a map of key-value pairs
-// examples:
-// data: a=1,b=2
-//
-//	T: type struct {
-//		A int `comma:"a"`
-//		B string `comma:"b"`
-//	}
-//
-//	return:
-//	T{A: 1, B: "2"}
-func ParserCommaSeparatedValues[T any](data []byte) (T, error) {
-	var result T
-	if len(data) == 0 {
-		return result, nil
-	}
-
-	// Split by comma
-	pairs := bytes.Split(data, []byte(","))
-
-	// Create map to store key-value pairs
-	values := make(map[string]string)
-
-	// Parse each key-value pair
-	for _, pair := range pairs {
-		kv := bytes.Split(pair, []byte("="))
-		if len(kv) != 2 {
-			return result, fmt.Errorf("invalid key-value pair: %s", pair)
-		}
-		key := string(bytes.TrimSpace(kv[0]))
-		value := string(bytes.TrimSpace(kv[1]))
-		values[key] = value
-	}
-
-	// Convert map to struct using reflection
-	resultValue := reflect.ValueOf(&result).Elem()
-	resultType := resultValue.Type()
-
-	for i := 0; i < resultType.NumField(); i++ {
-		field := resultType.Field(i)
-		fieldValue := resultValue.Field(i)
-
-		// Get comma tag value
-		tag := field.Tag.Get("comma")
-		if tag == "" {
-			tag = field.Name
-		}
-
-		if value, ok := values[tag]; ok {
-			switch field.Type.Kind() {
-			case reflect.String:
-				fieldValue.SetString(value)
-			case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
-				if intVal, err := strconv.ParseInt(value, 10, 64); err == nil {
-					fieldValue.SetInt(intVal)
-				}
-			case reflect.Float32, reflect.Float64:
-				if floatVal, err := strconv.ParseFloat(value, 64); err == nil {
-					fieldValue.SetFloat(floatVal)
-				}
-			case reflect.Bool:
-				if boolVal, err := strconv.ParseBool(value); err == nil {
-					fieldValue.SetBool(boolVal)
-				}
-			}
-		}
-	}
-
-	return result, nil
-}

+ 0 - 129
internal/utils/parser/comma_test.go

@@ -1,129 +0,0 @@
-package parser
-
-import (
-	"testing"
-)
-
-func TestCommaSeparatedValues(t *testing.T) {
-	type testStruct struct {
-		A        int     `comma:"a"`
-		B        string  `comma:"b"`
-		C        bool    `comma:"c"`
-		D        float64 `comma:"d"`
-		E        string  // no tag
-		Endpoint string  `comma:"endpoint"`
-		Name     string  `comma:"name"`
-		ID       string  `comma:"id"`
-	}
-
-	tests := []struct {
-		name    string
-		input   []byte
-		want    testStruct
-		wantErr bool
-	}{
-		{
-			name:  "basic test",
-			input: []byte("a=1,b=hello,c=true,d=3.14,E=world"),
-			want: testStruct{
-				A: 1,
-				B: "hello",
-				C: true,
-				D: 3.14,
-				E: "world",
-			},
-			wantErr: false,
-		},
-		{
-			name:    "empty input",
-			input:   []byte(""),
-			want:    testStruct{},
-			wantErr: false,
-		},
-		{
-			name:  "partial fields",
-			input: []byte("a=42,c=false"),
-			want: testStruct{
-				A: 42,
-				C: false,
-			},
-			wantErr: false,
-		},
-		{
-			name:  "extra fields ignored",
-			input: []byte("a=1,b=test,extra=ignored"),
-			want: testStruct{
-				A: 1,
-				B: "test",
-			},
-			wantErr: false,
-		},
-		{
-			name:    "invalid format - missing value",
-			input:   []byte("a=1,b"),
-			wantErr: true,
-		},
-		{
-			name:    "invalid format - missing equals",
-			input:   []byte("a=1,b:2"),
-			wantErr: true,
-		},
-		{
-			name:  "whitespace handling",
-			input: []byte("a = 1, b = hello "),
-			want: testStruct{
-				A: 1,
-				B: "hello",
-			},
-			wantErr: false,
-		},
-		{
-			name:  "type conversion - invalid int",
-			input: []byte("a=notanumber,b=test"),
-			want: testStruct{
-				B: "test",
-			},
-			wantErr: false,
-		},
-		{
-			name:  "type conversion - invalid bool",
-			input: []byte("c=notabool,b=test"),
-			want: testStruct{
-				B: "test",
-			},
-			wantErr: false,
-		},
-		{
-			name:  "type conversion - invalid float",
-			input: []byte("d=notafloat,b=test"),
-			want: testStruct{
-				B: "test",
-			},
-			wantErr: false,
-		},
-		{
-			name:  "actual example",
-			input: []byte("endpoint=test-plugin.dify.uwu,name=c31a98df2ef139d6532d8da8caa2bb63,id=c31a98df2ef139d6532d8da8caa2bb63"),
-			want: testStruct{
-				Endpoint: "test-plugin.dify.uwu",
-				Name:     "c31a98df2ef139d6532d8da8caa2bb63",
-				ID:       "c31a98df2ef139d6532d8da8caa2bb63",
-			},
-			wantErr: false,
-		},
-	}
-
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			got, err := ParserCommaSeparatedValues[testStruct](tt.input)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("ParserCommaSeparatedValues() error = %v, wantErr %v", err, tt.wantErr)
-				return
-			}
-			if !tt.wantErr && got != tt.want {
-				t.Errorf("ParserCommaSeparatedValues() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-
-}