Browse Source

feat: support install pkg

Yeuoly 11 months ago
parent
commit
26c98695fb

+ 1 - 2
cmd/commandline/plugin.go

@@ -6,7 +6,6 @@ import (
 	"path/filepath"
 
 	init_pkg "github.com/langgenius/dify-plugin-daemon/cmd/commandline/init"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/packager"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
@@ -103,7 +102,7 @@ var (
 				return
 			}
 
-			checksum, err := checksum.CalculateChecksum(plugin_decoder)
+			checksum, err := plugin_decoder.Checksum()
 			if err != nil {
 				log.Error("failed to calculate checksum, plugin path: %s, error: %v", plugin_path, err)
 				return

+ 82 - 3
internal/core/plugin_manager/install.go

@@ -3,6 +3,10 @@ package plugin_manager
 import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/db"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
@@ -21,16 +25,34 @@ type PluginInstallResponse struct {
 }
 
 // InstallToAWSFromPkg installs a plugin to AWS Lambda
-func (p *PluginManager) InstallToAWSFromPkg(decoder decoder.PluginDecoder) (
+func (p *PluginManager) InstallToAWSFromPkg(tenant_id string, decoder decoder.PluginDecoder) (
 	*stream.Stream[PluginInstallResponse], error,
 ) {
+	checksum, err := decoder.Checksum()
+	if err != nil {
+		return nil, err
+	}
+	declaration, err := decoder.Manifest()
+	if err != nil {
+		return nil, err
+	}
+	unique_identity, err := decoder.UniqueIdentity()
+	if err != nil {
+		return nil, err
+	}
+
 	response, err := serverless.UploadPlugin(decoder)
 	if err != nil {
 		return nil, err
 	}
 
-	new_response := stream.NewStream[PluginInstallResponse](2)
+	new_response := stream.NewStream[PluginInstallResponse](128)
 	routine.Submit(func() {
+		defer new_response.Close()
+
+		lambda_url := ""
+		lambda_function_name := ""
+
 		response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
 			if r.Event == serverless.Info {
 				new_response.Write(PluginInstallResponse{
@@ -38,6 +60,58 @@ func (p *PluginManager) InstallToAWSFromPkg(decoder decoder.PluginDecoder) (
 					Data:  "Installing...",
 				})
 			} else if r.Event == serverless.Done {
+				if lambda_url == "" || lambda_function_name == "" {
+					new_response.Write(PluginInstallResponse{
+						Event: PluginInstallEventError,
+						Data:  "Internal server error, failed to get lambda url or function name",
+					})
+					return
+				}
+				// check if the plugin is already installed
+				_, err := db.GetOne[models.ServerlessRuntime](
+					db.Equal("checksum", checksum),
+					db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA)),
+				)
+				if err == db.ErrDatabaseNotFound {
+					// create a new serverless runtime
+					serverless_model := &models.ServerlessRuntime{
+						Checksum:               checksum,
+						Type:                   models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
+						FunctionURL:            lambda_url,
+						FunctionName:           lambda_function_name,
+						PluginUniqueIdentifier: unique_identity.String(),
+					}
+					serverless_model.SetDeclaration(&declaration)
+					err = db.Create(serverless_model)
+					if err != nil {
+						new_response.Write(PluginInstallResponse{
+							Event: PluginInstallEventError,
+							Data:  "Failed to create serverless runtime",
+						})
+						return
+					}
+				} else if err != nil {
+					new_response.Write(PluginInstallResponse{
+						Event: PluginInstallEventError,
+						Data:  "Failed to check if the plugin is already installed",
+					})
+					return
+				}
+
+				_, _, err = curd.CreatePlugin(
+					tenant_id,
+					unique_identity,
+					plugin_entities.PLUGIN_RUNTIME_TYPE_AWS,
+					&declaration,
+				)
+				if err != nil {
+					new_response.Write(PluginInstallResponse{
+						Event: PluginInstallEventError,
+						Data:  "Failed to create plugin",
+					})
+					return
+				}
+
 				new_response.Write(PluginInstallResponse{
 					Event: PluginInstallEventDone,
 					Data:  "Installed",
@@ -45,7 +119,12 @@ func (p *PluginManager) InstallToAWSFromPkg(decoder decoder.PluginDecoder) (
 			} else if r.Event == serverless.Error {
 				new_response.Write(PluginInstallResponse{
 					Event: PluginInstallEventError,
+					Data:  "Internal server error",
 				})
+			} else if r.Event == serverless.LambdaUrl {
+				lambda_url = r.Message
+			} else if r.Event == serverless.Lambda {
+				lambda_function_name = r.Message
 			}
 		})
 	})
@@ -54,7 +133,7 @@ func (p *PluginManager) InstallToAWSFromPkg(decoder decoder.PluginDecoder) (
 }
 
 // InstallToLocal installs a plugin to local
-func (p *PluginManager) InstallToLocal(decoder decoder.PluginDecoder) (
+func (p *PluginManager) InstallToLocal(tenant_id string, decoder decoder.PluginDecoder) (
 	*stream.Stream[PluginInstallResponse], error,
 ) {
 	return nil, nil

+ 1 - 1
internal/core/plugin_manager/manager.go

@@ -35,7 +35,7 @@ type PluginManager struct {
 	// serverless runtime
 
 	// Install is a function that installs a plugin to the platform
-	Install func(decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error)
+	Install func(tenant_id string, decoder decoder.PluginDecoder) (*stream.Stream[PluginInstallResponse], error)
 }
 
 var (

+ 1 - 3
internal/core/plugin_manager/positive_manager/environment.go

@@ -2,12 +2,10 @@ package positive_manager
 
 import (
 	"os"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
 )
 
 func (r *PositivePluginRuntime) calculateChecksum() (string, error) {
-	checksum, err := checksum.CalculateChecksum(r.Decoder)
+	checksum, err := r.Decoder.Checksum()
 	if err != nil {
 		return "", err
 	}

+ 1 - 2
internal/core/plugin_manager/serverless/upload.go

@@ -4,7 +4,6 @@ import (
 	"os"
 	"time"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
@@ -17,7 +16,7 @@ var (
 // UploadPlugin uploads the plugin to the AWS Lambda
 // return the lambda url and name
 func UploadPlugin(decoder decoder.PluginDecoder) (*stream.Stream[LaunchAWSLambdaFunctionResponse], error) {
-	checksum, err := checksum.CalculateChecksum(decoder)
+	checksum, err := decoder.Checksum()
 	if err != nil {
 		return nil, err
 	}

+ 1 - 2
internal/core/plugin_manager/watcher.go

@@ -12,7 +12,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/checksum"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
@@ -173,7 +172,7 @@ func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecode
 		return nil, errors.Join(fmt.Errorf("plugin already exists: %s", manifest.Identity()), err)
 	}
 
-	checksum, err := checksum.CalculateChecksum(decoder)
+	checksum, err := decoder.Checksum()
 	if err != nil {
 		return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
 	}

+ 2 - 4
internal/core/plugin_packager/checksum/checksum.go

@@ -1,15 +1,13 @@
-package checksum
+package decoder
 
 import (
 	"crypto/sha256"
 	"encoding/hex"
 	"path"
 	"slices"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 )
 
-func CalculateChecksum(plugin decoder.PluginDecoder) (string, error) {
+func CalculateChecksum(plugin PluginDecoder) (string, error) {
 	m := map[string][]byte{}
 
 	sha256 := func(data []byte) []byte {

+ 37 - 0
internal/core/plugin_packager/decoder/decoder.go

@@ -52,10 +52,17 @@ type PluginDecoder interface {
 
 	// Assets returns a map of assets, the key is the filename, the value is the content
 	Assets() (map[string][]byte, error)
+
+	// UniqueIdentity returns the unique identity of the plugin
+	UniqueIdentity() (plugin_entities.PluginUniqueIdentifier, error)
+
+	// Checksum returns the checksum of the plugin
+	Checksum() (string, error)
 }
 
 type PluginDecoderHelper struct {
 	pluginDeclaration *plugin_entities.PluginDeclaration
+	checksum          string
 }
 
 func (p *PluginDecoderHelper) Manifest(decoder PluginDecoder) (plugin_entities.PluginDeclaration, error) {
@@ -136,3 +143,33 @@ func (p *PluginDecoderHelper) Assets(decoder PluginDecoder) (map[string][]byte,
 
 	return assets, nil
 }
+
+func (p *PluginDecoderHelper) Checksum(decoder PluginDecoder) (string, error) {
+	if p.checksum != "" {
+		return p.checksum, nil
+	}
+
+	var err error
+
+	p.checksum, err = CalculateChecksum(decoder)
+	if err != nil {
+		return "", err
+	}
+
+	return p.checksum, nil
+}
+
+func (p *PluginDecoderHelper) UniqueIdentity(decoder PluginDecoder) (plugin_entities.PluginUniqueIdentifier, error) {
+	manifest, err := decoder.Manifest()
+	if err != nil {
+		return plugin_entities.PluginUniqueIdentifier(""), err
+	}
+
+	identity := manifest.Identity()
+	checksum, err := decoder.Checksum()
+	if err != nil {
+		return plugin_entities.PluginUniqueIdentifier(""), err
+	}
+
+	return plugin_entities.PluginUniqueIdentifier(fmt.Sprintf("%s-%s", identity, checksum)), nil
+}

+ 8 - 0
internal/core/plugin_packager/decoder/fs.go

@@ -178,3 +178,11 @@ func (d *FSPluginDecoder) Manifest() (plugin_entities.PluginDeclaration, error)
 func (d *FSPluginDecoder) Assets() (map[string][]byte, error) {
 	return d.PluginDecoderHelper.Assets(d)
 }
+
+func (d *FSPluginDecoder) Checksum() (string, error) {
+	return d.PluginDecoderHelper.Checksum(d)
+}
+
+func (d *FSPluginDecoder) UniqueIdentity() (plugin_entities.PluginUniqueIdentifier, error) {
+	return d.PluginDecoderHelper.UniqueIdentity(d)
+}

+ 8 - 0
internal/core/plugin_packager/decoder/zip.go

@@ -187,3 +187,11 @@ func (z *ZipPluginDecoder) Manifest() (plugin_entities.PluginDeclaration, error)
 func (z *ZipPluginDecoder) Assets() (map[string][]byte, error) {
 	return z.PluginDecoderHelper.Assets(z)
 }
+
+func (z *ZipPluginDecoder) Checksum() (string, error) {
+	return z.PluginDecoderHelper.Checksum(z)
+}
+
+func (z *ZipPluginDecoder) UniqueIdentity() (plugin_entities.PluginUniqueIdentifier, error) {
+	return z.PluginDecoderHelper.UniqueIdentity(z)
+}

+ 1 - 1
internal/service/install.go

@@ -28,7 +28,7 @@ func InstallPluginFromPkg(c *gin.Context, tenant_id string, dify_pkg_file multip
 
 	baseSSEService(
 		func() (*stream.Stream[plugin_manager.PluginInstallResponse], error) {
-			return manager.Install(decoder)
+			return manager.Install(tenant_id, decoder)
 		},
 		c,
 		3600,

+ 0 - 1
internal/service/install_service/state.go

@@ -24,7 +24,6 @@ func InstallPlugin(
 	configuration := runtime.Configuration()
 	plugin, installation, err := curd.CreatePlugin(
 		tenant_id,
-		user_id,
 		identity,
 		runtime.Type(),
 		configuration,

+ 0 - 1
internal/types/models/curd/atomic.go

@@ -15,7 +15,6 @@ import (
 // if the plugin has been created before, return the plugin which has been created before
 func CreatePlugin(
 	tenant_id string,
-	user_id string,
 	plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
 	install_type plugin_entities.PluginRuntimeType,
 	declaration *plugin_entities.PluginDeclaration,