Przeglądaj źródła

refactor: checksum

Yeuoly 10 miesięcy temu
rodzic
commit
1df861f9fc

+ 2 - 2
internal/cluster/plugin_test.go

@@ -19,8 +19,8 @@ func (r *fakePlugin) InitEnvironment() error {
 	return nil
 }
 
-func (r *fakePlugin) Checksum() string {
-	return ""
+func (r *fakePlugin) Checksum() (string, error) {
+	return "", nil
 }
 
 func (r *fakePlugin) Identity() (plugin_entities.PluginUniqueIdentifier, error) {

+ 5 - 1
internal/core/plugin_manager/aws_manager/environment.go

@@ -25,5 +25,9 @@ func (r *AWSPluginRuntime) InitEnvironment() error {
 }
 
 func (r *AWSPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
-	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil
+	checksum, err := r.Checksum()
+	if err != nil {
+		return "", err
+	}
+	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), checksum)), nil
 }

+ 53 - 3
internal/core/plugin_manager/install.go

@@ -1,11 +1,61 @@
 package plugin_manager
 
 import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-func (p *PluginManager) Install(decoder decoder.PluginDecoder) (*stream.Stream[installer.PluginInstallResponse], error) {
-	return p.installer(decoder)
+type PluginInstallEvent string
+
+const (
+	PluginInstallEventInfo  PluginInstallEvent = "info"
+	PluginInstallEventDone  PluginInstallEvent = "done"
+	PluginInstallEventError PluginInstallEvent = "error"
+)
+
+type PluginInstallResponse struct {
+	Event PluginInstallEvent `json:"event"`
+	Data  string             `json:"data"`
+}
+
+// InstallToAWSFromPkg installs a plugin to AWS Lambda
+func (p *PluginManager) InstallToAWSFromPkg(decoder decoder.PluginDecoder) (
+	*stream.Stream[PluginInstallResponse], error,
+) {
+	response, err := serverless.UploadPlugin(decoder)
+	if err != nil {
+		return nil, err
+	}
+
+	new_response := stream.NewStream[PluginInstallResponse](2)
+	routine.Submit(func() {
+		response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
+			if r.Event == serverless.Info {
+				new_response.Write(PluginInstallResponse{
+					Event: PluginInstallEventInfo,
+					Data:  "Installing...",
+				})
+			} else if r.Event == serverless.Done {
+				new_response.Write(PluginInstallResponse{
+					Event: PluginInstallEventDone,
+					Data:  "Installed",
+				})
+			} else if r.Event == serverless.Error {
+				new_response.Write(PluginInstallResponse{
+					Event: PluginInstallEventError,
+				})
+			}
+		})
+	})
+
+	return new_response, nil
+}
+
+// InstallToLocal installs a plugin to local
+func (p *PluginManager) InstallToLocal(decoder decoder.PluginDecoder) (
+	*stream.Stream[PluginInstallResponse], error,
+) {
+	return nil, nil
 }

+ 0 - 10
internal/core/plugin_manager/installer/aws.go

@@ -1,10 +0,0 @@
-package installer
-
-import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
-)
-
-func AwsInstaller(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error) {
-	return nil, nil
-}

+ 0 - 13
internal/core/plugin_manager/installer/installer.go

@@ -1,13 +0,0 @@
-package installer
-
-import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
-)
-
-type PluginInstallResponse struct {
-	Event string `json:"event"`
-	Data  string `json:"data"`
-}
-
-type Installer func(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error)

+ 0 - 10
internal/core/plugin_manager/installer/local.go

@@ -1,10 +0,0 @@
-package installer
-
-import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
-)
-
-func LocalInstaller(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error) {
-	return nil, nil
-}

+ 5 - 1
internal/core/plugin_manager/local_manager/environment.go

@@ -36,5 +36,9 @@ func (r *LocalPluginRuntime) InitEnvironment() error {
 }
 
 func (r *LocalPluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
-	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), r.Checksum())), nil
+	checksum, err := r.Checksum()
+	if err != nil {
+		return "", err
+	}
+	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", r.Config.Identity(), checksum)), nil
 }

+ 6 - 4
internal/core/plugin_manager/manager.go

@@ -5,15 +5,16 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/lock"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
 type PluginManager struct {
@@ -33,7 +34,8 @@ type PluginManager struct {
 	startProcessLock *lock.HighGranularityLock
 	// serverless runtime
 
-	installer installer.Installer
+	// Install is a function that installs a plugin to the platform
+	Install func(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error)
 }
 
 var (
@@ -53,10 +55,10 @@ func InitManager(cluster *cluster.Cluster, configuration *app.Config) {
 	}
 
 	if configuration.Platform == app.PLATFORM_AWS_LAMBDA {
-		manager.installer = installer.AwsInstaller
+		manager.Install = manager.InstallToAWSFromPkg
 		serverless.Init(configuration)
 	} else if configuration.Platform == app.PLATFORM_LOCAL {
-		manager.installer = installer.LocalInstaller
+		manager.Install = manager.InstallToLocal
 	}
 
 	manager.Init(configuration)

+ 11 - 13
internal/core/plugin_manager/positive_manager/environment.go

@@ -4,29 +4,27 @@ import (
 	"os"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 )
 
-func (r *PositivePluginRuntime) calculateChecksum() string {
-	plugin_decoder, err := decoder.NewFSPluginDecoder(r.LocalPackagePath)
+func (r *PositivePluginRuntime) calculateChecksum() (string, error) {
+	checksum, err := checksum.CalculateChecksum(r.Decoder)
 	if err != nil {
-		return ""
+		return "", err
 	}
 
-	checksum, err := checksum.CalculateChecksum(plugin_decoder)
-	if err != nil {
-		return ""
-	}
-
-	return checksum
+	return checksum, nil
 }
 
-func (r *PositivePluginRuntime) Checksum() string {
+func (r *PositivePluginRuntime) Checksum() (string, error) {
 	if r.InnerChecksum == "" {
-		r.InnerChecksum = r.calculateChecksum()
+		checksum, err := r.calculateChecksum()
+		if err != nil {
+			return "", err
+		}
+		r.InnerChecksum = checksum
 	}
 
-	return r.InnerChecksum
+	return r.InnerChecksum, nil
 }
 
 func (r *PositivePluginRuntime) Cleanup() {

+ 2 - 1
internal/core/plugin_manager/remote_manager/environment.go

@@ -9,7 +9,8 @@ import (
 
 func (r *RemotePluginRuntime) Identity() (plugin_entities.PluginUniqueIdentifier, error) {
 	identity := strings.Join([]string{r.Configuration().Identity(), r.tenant_id}, ":")
-	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", identity, r.Checksum())), nil
+	checksum, _ := r.Checksum()
+	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s@%s", identity, checksum)), nil
 }
 
 func (r *RemotePluginRuntime) Cleanup() {

+ 2 - 2
internal/core/plugin_manager/remote_manager/run.go

@@ -99,6 +99,6 @@ func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
 	return r.shutdown_chan, nil
 }
 
-func (r *RemotePluginRuntime) Checksum() string {
-	return r.checksum
+func (r *RemotePluginRuntime) Checksum() (string, error) {
+	return r.checksum, nil
 }

+ 23 - 5
internal/server/controllers/plugins.go

@@ -7,6 +7,8 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/service"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
 func GetAsset(c *gin.Context) {
@@ -21,27 +23,43 @@ func GetAsset(c *gin.Context) {
 	c.Data(http.StatusOK, "application/octet-stream", asset)
 }
 
-func InstallPlugin(app *app.Config) gin.HandlerFunc {
+func InstallPluginFromPkg(app *app.Config) gin.HandlerFunc {
 	return func(c *gin.Context) {
 		dify_pkg_file_header, err := c.FormFile("dify_pkg")
 		if err != nil {
-			c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
+			c.JSON(http.StatusOK, entities.NewErrorResponse(-400, err.Error()))
+			return
+		}
+
+		tenant_id := c.PostForm("tenant_id")
+		if tenant_id == "" {
+			c.JSON(http.StatusOK, entities.NewErrorResponse(-400, "Tenant ID is required"))
 			return
 		}
 
 		if dify_pkg_file_header.Size > app.MaxPluginPackageSize {
-			c.JSON(http.StatusRequestEntityTooLarge, gin.H{"error": "File size exceeds the maximum limit"})
+			c.JSON(http.StatusOK, entities.NewErrorResponse(-413, "File size exceeds the maximum limit"))
 			return
 		}
 
 		dify_pkg_file, err := dify_pkg_file_header.Open()
 		if err != nil {
-			c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+			c.JSON(http.StatusOK, entities.NewErrorResponse(-500, err.Error()))
 			return
 		}
 		defer dify_pkg_file.Close()
 
-		service.InstallPlugin(c, dify_pkg_file)
+		service.InstallPluginFromPkg(c, tenant_id, dify_pkg_file)
+	}
+}
+
+func InstallPluginFromIdentifier(app *app.Config) gin.HandlerFunc {
+	return func(c *gin.Context) {
+		BindRequestWithPluginUniqueIdentifier(c, func(request struct {
+			TenantID string `json:"tenant_id" binding:"required"`
+		}, identifier plugin_entities.PluginUniqueIdentifier) {
+			// TODO
+		})
 	}
 }
 

+ 1 - 1
internal/server/http_server.go

@@ -101,7 +101,7 @@ func (app *App) pluginGroup(group *gin.RouterGroup, config *app.Config) {
 	group.Use(CheckingKey(config.PluginInnerApiKey))
 
 	group.GET("/asset/:id", controllers.GetAsset)
-	group.POST("/install", controllers.InstallPlugin(config))
+	group.POST("/install", controllers.InstallPluginFromPkg(config))
 	group.POST("/uninstall", controllers.UninstallPlugin)
 	group.GET("/list", controllers.ListPlugins)
 }

+ 4 - 5
internal/service/install.go

@@ -6,14 +6,13 @@ import (
 
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/installer"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-func InstallPlugin(c *gin.Context, dify_pkg_file multipart.File) {
-	plugin_manager := plugin_manager.Manager()
+func InstallPluginFromPkg(c *gin.Context, tenant_id string, dify_pkg_file multipart.File) {
+	manager := plugin_manager.Manager()
 
 	plugin_file, err := io.ReadAll(dify_pkg_file)
 	if err != nil {
@@ -28,8 +27,8 @@ func InstallPlugin(c *gin.Context, dify_pkg_file multipart.File) {
 	}
 
 	baseSSEService(
-		func() (*stream.Stream[installer.PluginInstallResponse], error) {
-			return plugin_manager.Install(decoder)
+		func() (*stream.Stream[plugin_manager.PluginInstallResponse], error) {
+			return manager.Install(decoder)
 		},
 		c,
 		3600,

+ 1 - 1
internal/types/entities/plugin_entities/runtime.go

@@ -101,7 +101,7 @@ type (
 		// hashed identity of the plugin
 		HashedIdentity() (string, error)
 		// returns the checksum of the plugin
-		Checksum() string
+		Checksum() (string, error)
 	}
 )
 

+ 1 - 1
internal/utils/stream/response.go

@@ -118,7 +118,7 @@ func (r *Stream[T]) Async(fn func(T)) error {
 	return nil
 }
 
-// Write writes data to the stream
+// Write writes data to the stream,
 // returns error if the buffer is full
 func (r *Stream[T]) Write(data T) error {
 	if atomic.LoadInt32(&r.closed) == 1 {