Browse Source

refactor: serverless

Yeuoly 10 months ago
parent
commit
e6e4f108c7

+ 0 - 71
internal/core/plugin_manager/aws_manager/environment.go

@@ -4,15 +4,9 @@ import (
 	"fmt"
 	"net"
 	"net/http"
-	"os"
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
-)
-
-var (
-	AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
 )
 
 func (r *AWSPluginRuntime) InitEnvironment() error {
@@ -33,68 +27,3 @@ func (r *AWSPluginRuntime) InitEnvironment() error {
 func (r *AWSPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
 	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil
 }
-
-// UploadPlugin uploads the plugin to the AWS Lambda
-func (r *AWSPluginRuntime) UploadPlugin() error {
-	r.Log("Starting to initialize environment")
-	// check if the plugin has already been initialized, at most 300s
-	if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {
-		return err
-	}
-	defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + r.Checksum())
-	r.Log("Started to initialize environment")
-
-	identity, err := r.Identity()
-	if err != nil {
-		return err
-	}
-	function, err := fetchLambda(identity.String(), r.Checksum())
-	if err != nil {
-		if err != ErrNoLambdaFunction {
-			return err
-		}
-	} else {
-		// found, return directly
-		r.LambdaURL = function.FunctionURL
-		r.LambdaName = function.FunctionName
-		r.Log(fmt.Sprintf("Found existing lambda function: %s", r.LambdaName))
-		return nil
-	}
-
-	// create it if not found
-	r.Log("Creating new lambda function")
-
-	// create lambda function
-	packager := NewPackager(r, r.Decoder)
-	context, err := packager.Pack()
-	if err != nil {
-		return err
-	}
-	defer os.Remove(context.Name())
-	defer context.Close()
-
-	response, err := launchLambda(identity.String(), r.Checksum(), context)
-	if err != nil {
-		return err
-	}
-
-	for response.Next() {
-		response, err := response.Read()
-		if err != nil {
-			return err
-		}
-
-		switch response.Event {
-		case Error:
-			return fmt.Errorf("error: %s", response.Message)
-		case LambdaUrl:
-			r.LambdaURL = response.Message
-		case Lambda:
-			r.LambdaName = response.Message
-		case Info:
-			r.Log(fmt.Sprintf("installing: %s", response.Message))
-		}
-	}
-
-	return nil
-}

+ 7 - 0
internal/core/plugin_manager/installer.go

@@ -0,0 +1,7 @@
+package plugin_manager
+
+import "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+
+type Installer interface {
+	Install(decoder decoder.PluginDecoder) error
+}

+ 0 - 4
internal/core/plugin_manager/lifetime.go

@@ -86,7 +86,3 @@ func (p *PluginManager) fullDuplexLifetime(r plugin_entities.PluginFullDuplexLif
 		r.AddRestarts()
 	}
 }
-
-func (p *PluginManager) serverlessLifetime(r plugin_entities.PluginServerlessLifetime, onStop func()) {
-	//
-}

+ 2 - 2
internal/core/plugin_manager/aws_manager/serverless_connector_client.go

@@ -1,4 +1,4 @@
-package aws_manager
+package serverless
 
 import (
 	"net"
@@ -35,7 +35,7 @@ func Init(config *app.Config) {
 
 	SERVERLESS_CONNECTOR_API_KEY = *config.DifyPluginServerlessConnectorAPIKey
 
-	if err := ping(); err != nil {
+	if err := Ping(); err != nil {
 		log.Panic("Failed to ping serverless connector", err)
 	}
 }

+ 4 - 4
internal/core/plugin_manager/aws_manager/serverless_connector.go

@@ -1,4 +1,4 @@
-package aws_manager
+package serverless
 
 import (
 	"errors"
@@ -20,7 +20,7 @@ type LambdaFunction struct {
 }
 
 // Ping the serverless connector, return error if failed
-func ping() error {
+func Ping() error {
 	url, err := url.JoinPath(baseurl.String(), "/ping")
 	if err != nil {
 		return err
@@ -51,7 +51,7 @@ var (
 )
 
 // Fetch the lambda function from serverless connector, return error if failed
-func fetchLambda(identity string, checksum string) (*LambdaFunction, error) {
+func FetchLambda(identity string, checksum string) (*LambdaFunction, error) {
 	request := map[string]any{
 		"config": map[string]any{
 			"identity": identity,
@@ -104,7 +104,7 @@ type LaunchAWSLambdaFunctionResponse struct {
 // Launch the lambda function from serverless connector, it will receive the context_tar as the input
 // and build it a docker image, then run it on serverless platform like AWS Lambda
 // it returns a event stream, the caller should consider it as a async operation
-func launchLambda(identity string, checksum string, context_tar io.Reader) (*stream.StreamResponse[LaunchAWSLambdaFunctionResponse], error) {
+func LaunchLambda(identity string, checksum string, context_tar io.Reader) (*stream.StreamResponse[LaunchAWSLambdaFunctionResponse], error) {
 	url, err := url.JoinPath(baseurl.String(), "/v1/lambda/launch")
 	if err != nil {
 		return nil, err

+ 1 - 1
internal/core/plugin_manager/aws_manager/packager.go

@@ -1,4 +1,4 @@
-package aws_manager
+package serverless
 
 import (
 	"archive/tar"

+ 1 - 1
internal/core/plugin_manager/aws_manager/packager_test.go

@@ -1,4 +1,4 @@
-package aws_manager
+package serverless
 
 import (
 	"archive/tar"

internal/core/plugin_manager/aws_manager/packager_test_plugin/main.py → internal/core/plugin_manager/serverless/packager_test_plugin/main.py


internal/core/plugin_manager/aws_manager/packager_test_plugin/manifest.yaml → internal/core/plugin_manager/serverless/packager_test_plugin/manifest.yaml


internal/core/plugin_manager/aws_manager/packager_test_plugin/provider/jina.yaml → internal/core/plugin_manager/serverless/packager_test_plugin/provider/jina.yaml


internal/core/plugin_manager/aws_manager/packager_test_plugin/requirements.txt → internal/core/plugin_manager/serverless/packager_test_plugin/requirements.txt


+ 79 - 0
internal/core/plugin_manager/serverless/upload.go

@@ -0,0 +1,79 @@
+package serverless
+
+import (
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
+)
+
+var (
+	AWS_LAUNCH_LOCK_PREFIX = "aws_launch_lock_"
+)
+
+// UploadPlugin uploads the plugin to the AWS Lambda
+func UploadPlugin(r *aws_manager.AWSPluginRuntime) error {
+	r.Log("Starting to initialize environment")
+	// check if the plugin has already been initialized, at most 300s
+	if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {
+		return err
+	}
+	defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + r.Checksum())
+	r.Log("Started to initialize environment")
+
+	identity, err := r.Identity()
+	if err != nil {
+		return err
+	}
+	function, err := FetchLambda(identity.String(), r.Checksum())
+	if err != nil {
+		if err != ErrNoLambdaFunction {
+			return err
+		}
+	} else {
+		// found, return directly
+		r.LambdaURL = function.FunctionURL
+		r.LambdaName = function.FunctionName
+		r.Log(fmt.Sprintf("Found existing lambda function: %s", r.LambdaName))
+		return nil
+	}
+
+	// create it if not found
+	r.Log("Creating new lambda function")
+
+	// create lambda function
+	packager := NewPackager(r, r.Decoder)
+	context, err := packager.Pack()
+	if err != nil {
+		return err
+	}
+	defer os.Remove(context.Name())
+	defer context.Close()
+
+	response, err := LaunchLambda(identity.String(), r.Checksum(), context)
+	if err != nil {
+		return err
+	}
+
+	for response.Next() {
+		response, err := response.Read()
+		if err != nil {
+			return err
+		}
+
+		switch response.Event {
+		case Error:
+			return fmt.Errorf("error: %s", response.Message)
+		case LambdaUrl:
+			r.LambdaURL = response.Message
+		case Lambda:
+			r.LambdaName = response.Message
+		case Info:
+			r.Log(fmt.Sprintf("installing: %s", response.Message))
+		}
+	}
+
+	return nil
+}