123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147 |
- package plugin_manager
- import (
- "fmt"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
- "github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
- "github.com/langgenius/dify-plugin-daemon/internal/db"
- "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
- "github.com/langgenius/dify-plugin-daemon/internal/types/models"
- "github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
- "github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
- )
- type PluginInstallEvent string
- const (
- PluginInstallEventInfo PluginInstallEvent = "info"
- PluginInstallEventDone PluginInstallEvent = "done"
- PluginInstallEventError PluginInstallEvent = "error"
- )
- type PluginInstallResponse struct {
- Event PluginInstallEvent `json:"event"`
- Data string `json:"data"`
- }
- // InstallToAWSFromPkg installs a plugin to AWS Lambda
- func (p *PluginManager) InstallToAWSFromPkg(tenant_id string, decoder decoder.PluginDecoder) (
- *stream.Stream[PluginInstallResponse], error,
- ) {
- checksum, err := decoder.Checksum()
- if err != nil {
- return nil, err
- }
- declaration, err := decoder.Manifest()
- if err != nil {
- return nil, err
- }
- unique_identity, err := decoder.UniqueIdentity()
- if err != nil {
- return nil, err
- }
- response, err := serverless.UploadPlugin(decoder)
- if err != nil {
- return nil, err
- }
- new_response := stream.NewStream[PluginInstallResponse](128)
- routine.Submit(func() {
- defer func() {
- new_response.Close()
- }()
- lambda_url := ""
- lambda_function_name := ""
- response.Async(func(r serverless.LaunchAWSLambdaFunctionResponse) {
- if r.Event == serverless.Info {
- new_response.Write(PluginInstallResponse{
- Event: PluginInstallEventInfo,
- Data: "Installing...",
- })
- } else if r.Event == serverless.Done {
- if lambda_url == "" || lambda_function_name == "" {
- new_response.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Internal server error, failed to get lambda url or function name",
- })
- return
- }
- // check if the plugin is already installed
- _, err := db.GetOne[models.ServerlessRuntime](
- db.Equal("checksum", checksum),
- db.Equal("type", string(models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA)),
- )
- if err == db.ErrDatabaseNotFound {
- // create a new serverless runtime
- serverless_model := &models.ServerlessRuntime{
- Checksum: checksum,
- Type: models.SERVERLESS_RUNTIME_TYPE_AWS_LAMBDA,
- FunctionURL: lambda_url,
- FunctionName: lambda_function_name,
- PluginUniqueIdentifier: unique_identity.String(),
- Declaration: declaration,
- }
- err = db.Create(serverless_model)
- if err != nil {
- new_response.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Failed to create serverless runtime",
- })
- return
- }
- } else if err != nil {
- new_response.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Failed to check if the plugin is already installed",
- })
- return
- }
- _, _, err = curd.CreatePlugin(
- tenant_id,
- unique_identity,
- plugin_entities.PLUGIN_RUNTIME_TYPE_AWS,
- &declaration,
- )
- if err != nil {
- new_response.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Failed to create plugin",
- })
- return
- }
- new_response.Write(PluginInstallResponse{
- Event: PluginInstallEventDone,
- Data: "Installed",
- })
- } else if r.Event == serverless.Error {
- new_response.Write(PluginInstallResponse{
- Event: PluginInstallEventError,
- Data: "Internal server error",
- })
- } else if r.Event == serverless.LambdaUrl {
- lambda_url = r.Message
- } else if r.Event == serverless.Lambda {
- lambda_function_name = r.Message
- } else {
- new_response.WriteError(fmt.Errorf("unknown event: %s, with message: %s", r.Event, r.Message))
- }
- })
- })
- return new_response, nil
- }
- // InstallToLocal installs a plugin to local
- func (p *PluginManager) InstallToLocal(tenant_id string, decoder decoder.PluginDecoder) (
- *stream.Stream[PluginInstallResponse], error,
- ) {
- return nil, nil
- }
|