123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121 |
- package plugin_manager
import (
	"errors"
	"fmt"

	serverless "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless_connector"
	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
	"github.com/langgenius/dify-plugin-daemon/internal/db"
	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
)
- // InstallToAWSFromPkg installs a plugin to AWS Lambda
- func (p *PluginManager) InstallToAWSFromPkg(
- decoder decoder.PluginDecoder,
- source string,
- meta map[string]any,
- ) (
- *stream.Stream[PluginInstallResponse], error,
- ) {
- checksum, err := decoder.Checksum()
- if err != nil {
- return nil, err
- }
- declaration, err := decoder.Manifest()
- if err != nil {
- return nil, err
- }
- uniqueIdentity, err := decoder.UniqueIdentity()
- if err != nil {
- return nil, err
- }
- response, err := serverless.UploadPlugin(decoder)
- if err != nil {
- return nil, err
- }
- newResponse := stream.NewStream[PluginInstallResponse](128)
- routine.Submit(map[string]string{
- "module": "plugin_manager",
- "function": "InstallToAWSFromPkg",
- "checksum": checksum,
- "unique_identity": uniqueIdentity.String(),
- "source": source,
- }, func() {
- defer func() {
- newResponse.Close()
- }()
- lambdaUrl := ""
- lambdaFunctionName := ""
- response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
- if r.Event == serverless.Info {
- newResponse.Write(PluginInstallResponse{
- Event: PluginInstallEventInfo,
- Data: "Installing...",
- })
- } else if r.Event == serverless.Done {
- if lambdaUrl == "" || lambdaFunctionName == "" {
- newResponse.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Internal server error, failed to get lambda url or function name",
- })
- return
- }
- // check if the plugin is already installed
- _, err := db.GetOne[models.ServerlessRuntime](
- db.Equal("checksum", checksum),
- db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA)),
- )
- if err == db.ErrDatabaseNotFound {
- // create a new serverless runtime
- serverlessModel := &models.ServerlessRuntime{
- Checksum: checksum,
- Type: models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
- FunctionURL: lambdaUrl,
- FunctionName: lambdaFunctionName,
- PluginUniqueIdentifier: uniqueIdentity.String(),
- Declaration: declaration,
- }
- err = db.Create(serverlessModel)
- if err != nil {
- newResponse.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Failed to create serverless runtime",
- })
- return
- }
- } else if err != nil {
- newResponse.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Failed to check if the plugin is already installed",
- })
- return
- }
- newResponse.Write(PluginInstallResponse{
- Event: PluginInstallEventDone,
- Data: "Installed",
- })
- } else if r.Event == serverless.Error {
- newResponse.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Internal server error",
- })
- } else if r.Event == serverless.LambdaUrl {
- lambdaUrl = r.Message
- } else if r.Event == serverless.Lambda {
- lambdaFunctionName = r.Message
- } else {
- newResponse.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
- }
- })
- })
- return newResponse, nil
- }
|