浏览代码

feat: install

Yeuoly 10 月之前
父节点
当前提交
cda65741b1

+ 1 - 1
internal/core/plugin_daemon/generic.go

@@ -18,7 +18,7 @@ func genericInvokePlugin[Req any, Rsp any](
 	request *Req,
 	response_buffer_size int,
 ) (*stream.Stream[Rsp], error) {
-	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginUniqueIdentifier)
+	runtime := plugin_manager.Manager().Get(session.PluginUniqueIdentifier)
 	if runtime == nil {
 		return nil, errors.New("plugin not found")
 	}

+ 11 - 0
internal/core/plugin_manager/install.go

@@ -0,0 +1,11 @@
+package plugin_manager
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
+
+func (p *PluginManager) Install(decoder decoder.PluginDecoder) (*stream.Stream[installer.PluginInstallResponse], error) {
+	return p.installer(decoder)
+}

+ 1 - 1
internal/core/plugin_manager/installer/aws.go

@@ -5,6 +5,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-func AwsInstaller(decoder decoder.PluginDecoder) (*stream.Stream[string], error) {
+func AwsInstaller(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error) {
 	return nil, nil
 }

+ 6 - 1
internal/core/plugin_manager/installer/installer.go

@@ -5,4 +5,9 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-type Installer func(decoder decoder.PluginDecoder) (*stream.Stream[string], error)
+type PluginInstallResponse struct {
+	Event string `json:"event"`
+	Data  string `json:"data"`
+}
+
+type Installer func(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error)

+ 1 - 1
internal/core/plugin_manager/installer/local.go

@@ -5,6 +5,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-func LocalInstaller(decoder decoder.PluginDecoder) (*stream.Stream[string], error) {
+func LocalInstaller(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error) {
 	return nil, nil
 }

+ 4 - 2
internal/core/plugin_manager/manager.go

@@ -7,6 +7,7 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
@@ -39,7 +40,7 @@ var (
 	manager *PluginManager
 )
 
-func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config) {
+func InitManager(cluster *cluster.Cluster, configuration *app.Config) {
 	manager = &PluginManager{
 		cluster:              cluster,
 		maxPluginPackageSize: configuration.MaxPluginPackageSize,
@@ -53,6 +54,7 @@ func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config
 
 	if configuration.Platform == app.PLATFORM_AWS_LAMBDA {
 		manager.installer = installer.AwsInstaller
+		serverless.Init(configuration)
 	} else if configuration.Platform == app.PLATFORM_LOCAL {
 		manager.installer = installer.LocalInstaller
 	}
@@ -60,7 +62,7 @@ func InitGlobalPluginManager(cluster *cluster.Cluster, configuration *app.Config
 	manager.Init(configuration)
 }
 
-func GetGlobalPluginManager() *PluginManager {
+func Manager() *PluginManager {
 	return manager
 }
 

+ 2 - 0
internal/core/plugin_manager/serverless/client.go

@@ -38,4 +38,6 @@ func Init(config *app.Config) {
 	if err := Ping(); err != nil {
 		log.Panic("Failed to ping serverless connector", err)
 	}
+
+	log.Info("Serverless connector initialized")
 }

+ 7 - 5
internal/core/plugin_manager/serverless/packager.go

@@ -14,18 +14,15 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager/dockerfile"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/tmpfile"
 )
 
 type Packager struct {
-	runtime plugin_entities.PluginLifetime
 	decoder decoder.PluginDecoder
 }
 
-func NewPackager(runtime plugin_entities.PluginLifetime, decoder decoder.PluginDecoder) *Packager {
+func NewPackager(decoder decoder.PluginDecoder) *Packager {
 	return &Packager{
-		runtime: runtime,
 		decoder: decoder,
 	}
 }
@@ -63,6 +60,11 @@ func (d *dockerFileInfo) Sys() any {
 // Pack takes a plugin and packs it into a tar file with dockerfile inside
 // returns a *os.File with the tar file
 func (p *Packager) Pack() (*os.File, error) {
+	declaration, err := p.decoder.Manifest()
+	if err != nil {
+		return nil, err
+	}
+
 	// walk through the plugin directory and add it to a tar file
 	// create a tmpfile
 	tmpfile, cleanup, err := tmpfile.CreateTempFile("plugin-aws-tar-*")
@@ -132,7 +134,7 @@ func (p *Packager) Pack() (*os.File, error) {
 	}
 
 	// add dockerfile
-	dockerfile, err := dockerfile.GenerateDockerfile(p.runtime.Configuration())
+	dockerfile, err := dockerfile.GenerateDockerfile(&declaration)
 	if err != nil {
 		return nil, err
 	}

+ 1 - 58
internal/core/plugin_manager/serverless/packager_test.go

@@ -11,49 +11,9 @@ import (
 	"path/filepath"
 	"testing"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/constants"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
-type TPluginRuntime struct {
-	plugin_entities.PluginRuntime
-	positive_manager.PositivePluginRuntime
-}
-
-func (r *TPluginRuntime) InitEnvironment() error {
-	return nil
-}
-
-func (r *TPluginRuntime) Checksum() string {
-	return "test_checksum"
-}
-
-func (r *TPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
-	return plugin_entities.PluginUniqueIdentifier("test_identity"), nil
-}
-
-func (r *TPluginRuntime) StartPlugin() error {
-	return nil
-}
-
-func (r *TPluginRuntime) Type() plugin_entities.PluginRuntimeType {
-	return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
-}
-
-func (r *TPluginRuntime) Wait() (<-chan bool, error) {
-	return nil, nil
-}
-
-func (r *TPluginRuntime) Listen(string) *entities.Broadcast[plugin_entities.SessionMessage] {
-	return nil
-}
-
-func (r *TPluginRuntime) Write(string, []byte) {
-}
-
 //go:embed packager_test_plugin/*
 var test_plugin embed.FS
 
@@ -102,24 +62,7 @@ func TestPackager_Pack(t *testing.T) {
 		t.Fatal(err)
 	}
 
-	packager := NewPackager(&TPluginRuntime{
-		PluginRuntime: plugin_entities.PluginRuntime{
-			Config: plugin_entities.PluginDeclaration{
-				PluginDeclarationWithoutAdvancedFields: plugin_entities.PluginDeclarationWithoutAdvancedFields{
-					Meta: plugin_entities.PluginMeta{
-						Runner: plugin_entities.PluginRunner{
-							Language:   constants.Python,
-							Version:    "3.12",
-							Entrypoint: "main",
-						},
-						Arch: []constants.Arch{
-							constants.AMD64,
-						},
-					},
-				},
-			},
-		},
-	}, decoder)
+	packager := NewPackager(decoder)
 
 	f, err := packager.Pack()
 	if err != nil {

+ 34 - 42
internal/core/plugin_manager/serverless/upload.go

@@ -1,12 +1,13 @@
 package serverless
 
 import (
-	"fmt"
 	"os"
 	"time"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
 var (
@@ -14,66 +15,57 @@ var (
 )
 
 // UploadPlugin uploads the plugin to the AWS Lambda
-func UploadPlugin(r *aws_manager.AWSPluginRuntime) error {
-	r.Log("Starting to initialize environment")
+// return the lambda url and name
+func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
+	checksum, err := checksum.CalculateChecksum(decoder)
+	if err != nil {
+		return nil, err
+	}
+
 	// check if the plugin has already been initialized, at most 300s
-	if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+r.Checksum(), 300*time.Second, 300*time.Second); err != nil {
-		return err
+	if err := cache.Lock(AWS_LAUNCH_LOCK_PREFIX+checksum, 300*time.Second, 300*time.Second); err != nil {
+		return nil, err
 	}
-	defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + r.Checksum())
-	r.Log("Started to initialize environment")
+	defer cache.Unlock(AWS_LAUNCH_LOCK_PREFIX + checksum)
 
-	identity, err := r.Identity()
+	manifest, err := decoder.Manifest()
 	if err != nil {
-		return err
+		return nil, err
 	}
-	function, err := FetchLambda(identity.String(), r.Checksum())
+
+	identity := manifest.Identity()
+	function, err := FetchLambda(identity, checksum)
 	if err != nil {
 		if err != ErrNoLambdaFunction {
-			return err
+			return nil, err
 		}
 	} else {
 		// found, return directly
-		r.LambdaURL = function.FunctionURL
-		r.LambdaName = function.FunctionName
-		r.Log(fmt.Sprintf("Found existing lambda function: %s", r.LambdaName))
-		return nil
+		response := stream.NewStreamResponse[LaunchAWSLambdaFunctionResponse](2)
+		response.Write(LaunchAWSLambdaFunctionResponse{
+			Event:   LambdaUrl,
+			Message: function.FunctionURL,
+		})
+		response.Write(LaunchAWSLambdaFunctionResponse{
+			Event:   Lambda,
+			Message: function.FunctionName,
+		})
+		return response, nil
 	}
 
-	// create it if not found
-	r.Log("Creating new lambda function")
-
 	// create lambda function
-	packager := NewPackager(r, r.Decoder)
+	packager := NewPackager(decoder)
 	context, err := packager.Pack()
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer os.Remove(context.Name())
 	defer context.Close()
 
-	response, err := LaunchLambda(identity.String(), r.Checksum(), context)
+	response, err := LaunchLambda(identity, checksum, context)
 	if err != nil {
-		return err
-	}
-
-	for response.Next() {
-		response, err := response.Read()
-		if err != nil {
-			return err
-		}
-
-		switch response.Event {
-		case Error:
-			return fmt.Errorf("error: %s", response.Message)
-		case LambdaUrl:
-			r.LambdaURL = response.Message
-		case Lambda:
-			r.LambdaName = response.Message
-		case Info:
-			r.Log(fmt.Sprintf("installing: %s", response.Message))
-		}
+		return nil, err
 	}
 
-	return nil
+	return response, nil
 }

+ 25 - 2
internal/server/controllers/plugins.go

@@ -5,10 +5,12 @@ import (
 
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/service"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 )
 
 func GetAsset(c *gin.Context) {
-	plugin_manager := plugin_manager.GetGlobalPluginManager()
+	plugin_manager := plugin_manager.Manager()
 	asset, err := plugin_manager.GetAsset(c.Param("id"))
 
 	if err != nil {
@@ -19,7 +21,28 @@ func GetAsset(c *gin.Context) {
 	c.Data(http.StatusOK, "application/octet-stream", asset)
 }
 
-func InstallPlugin(c *gin.Context) {
+func InstallPlugin(app *app.Config) gin.HandlerFunc {
+	return func(c *gin.Context) {
+		dify_pkg_file_header, err := c.FormFile("dify_pkg")
+		if err != nil {
+			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+			return
+		}
+
+		if dify_pkg_file_header.Size > app.MaxPluginPackageSize {
+			c.JSON(http.StatusRequestEntityTooLarge, gin.H{"error": "File size exceeds the maximum limit"})
+			return
+		}
+
+		dify_pkg_file, err := dify_pkg_file_header.Open()
+		if err != nil {
+			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+			return
+		}
+		defer dify_pkg_file.Close()
+
+		service.InstallPlugin(c, dify_pkg_file)
+	}
 }
 
 func UninstallPlugin(c *gin.Context) {

+ 1 - 1
internal/server/http_server.go

@@ -101,7 +101,7 @@ func (app *App) pluginGroup(group *gin.RouterGroup, config *app.Config) {
 	group.Use(CheckingKey(config.PluginInnerApiKey))
 
 	group.GET("/asset/:id", controllers.GetAsset)
-	group.POST("/install", controllers.InstallPlugin)
+	group.POST("/install", controllers.InstallPlugin(config))
 	group.POST("/uninstall", controllers.UninstallPlugin)
 	group.GET("/list", controllers.ListPlugins)
 }

+ 1 - 1
internal/server/server.go

@@ -23,7 +23,7 @@ func (app *App) Run(config *app.Config) {
 	process.Init(config)
 
 	// init plugin daemon
-	plugin_manager.InitGlobalPluginManager(app.cluster, config)
+	plugin_manager.InitManager(app.cluster, config)
 
 	// init persistence
 	persistence.InitPersistence(config)

+ 1 - 1
internal/service/endpoint.go

@@ -39,7 +39,7 @@ func Endpoint(
 	}
 
 	// fetch plugin
-	manager := plugin_manager.GetGlobalPluginManager()
+	manager := plugin_manager.Manager()
 	runtime := manager.Get(
 		plugin_entities.PluginUniqueIdentifier(plugin_installation.PluginUniqueIdentifier),
 	)

+ 36 - 0
internal/service/install.go

@@ -1 +1,37 @@
 package service
+
+import (
+	"io"
+	"mime/multipart"
+
+	"github.com/gin-gonic/gin"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
+
+func InstallPlugin(c *gin.Context, dify_pkg_file multipart.File) {
+	plugin_manager := plugin_manager.Manager()
+
+	plugin_file, err := io.ReadAll(dify_pkg_file)
+	if err != nil {
+		c.JSON(200, entities.NewErrorResponse(-500, err.Error()))
+		return
+	}
+
+	decoder, err := decoder.NewZipPluginDecoder(plugin_file)
+	if err != nil {
+		c.JSON(200, entities.NewErrorResponse(-500, err.Error()))
+		return
+	}
+
+	baseSSEService(
+		func() (*stream.Stream[installer.PluginInstallResponse], error) {
+			return plugin_manager.Install(decoder)
+		},
+		c,
+		3600,
+	)
+}

+ 1 - 1
internal/service/invoke_tool.go

@@ -18,7 +18,7 @@ func createSession[T any](
 	access_action access_types.PluginAccessAction,
 	cluster_id string,
 ) (*session_manager.Session, error) {
-	runtime := plugin_manager.GetGlobalPluginManager().Get(r.PluginUniqueIdentifier)
+	runtime := plugin_manager.Manager().Get(r.PluginUniqueIdentifier)
 
 	session := session_manager.NewSession(
 		r.TenantId,