Pārlūkot izejas kodu

fix: switch persistence layer to aws s3

Yeuoly 8 mēneši atpakaļ
vecāks
revīzija
89714978e8

+ 3 - 3
.env.example

@@ -21,14 +21,14 @@ PLUGIN_STORAGE_OSS_BUCKET=
 PLUGIN_STORAGE_LOCAL_ROOT=./storage
 
 # where the plugin finally installed
-PLUGIN_INSTALLED_PATH=./plugin
+PLUGIN_INSTALLED_PATH=plugin
 
 # where the plugin finally running and working
-PLUGIN_WORKING_PATH=./cwd
+PLUGIN_WORKING_PATH=cwd
 
 # persistence storage
 PERSISTENCE_STORAGE_TYPE=local
-PERSISTENCE_STORAGE_PATH=./persistence
+PERSISTENCE_STORAGE_PATH=persistence
 PERSISTENCE_STORAGE_MAX_SIZE=104857600
 
 # plugin webhook

+ 4 - 3
internal/core/plugin_manager/install_to_local.go

@@ -26,7 +26,7 @@ func (p *PluginManager) InstallToLocal(
 		return nil, err
 	}
 
-	runtime, launchedChan, err := p.launchLocal(plugin_unique_identifier)
+	runtime, launchedChan, errChan, err := p.launchLocal(plugin_unique_identifier)
 	if err != nil {
 		return nil, err
 	}
@@ -56,9 +56,9 @@ func (p *PluginManager) InstallToLocal(
 				})
 				runtime.Stop()
 				return
-			case <-launchedChan:
-				// launched
+			case err := <-errChan:
 				if err != nil {
+					// if error occurs, stop the plugin
 					response.Write(PluginInstallResponse{
 						Event: PluginInstallEventError,
 						Data:  err.Error(),
@@ -66,6 +66,7 @@ func (p *PluginManager) InstallToLocal(
 					runtime.Stop()
 					return
 				}
+			case <-launchedChan:
 				response.Write(PluginInstallResponse{
 					Event: PluginInstallEventDone,
 					Data:  "Installed",

+ 179 - 0
internal/core/plugin_manager/launcher.go

@@ -0,0 +1,179 @@
+package plugin_manager
+
+import (
+	"errors"
+	"fmt"
+	"os"
+	"path"
+	"strings"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+)
+
+type pluginRuntimeWithDecoder struct {
+	runtime plugin_entities.PluginRuntime
+	decoder decoder.PluginDecoder
+}
+
+// extract plugin from package to working directory
+func (p *PluginManager) getLocalPluginRuntime(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
+	*pluginRuntimeWithDecoder,
+	error,
+) {
+	pluginZip, err := p.installedBucket.Get(pluginUniqueIdentifier)
+	if err != nil {
+		return nil, errors.Join(err, fmt.Errorf("get plugin package error"))
+	}
+
+	decoder, err := decoder.NewZipPluginDecoder(pluginZip)
+	if err != nil {
+		return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
+	}
+
+	// get manifest
+	manifest, err := decoder.Manifest()
+	if err != nil {
+		return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
+	}
+
+	checksum, err := decoder.Checksum()
+	if err != nil {
+		return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
+	}
+
+	identity := manifest.Identity()
+	identity = strings.ReplaceAll(identity, ":", "-")
+	pluginWorkingPath := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
+	return &pluginRuntimeWithDecoder{
+		runtime: plugin_entities.PluginRuntime{
+			Config: manifest,
+			State: plugin_entities.PluginRuntimeState{
+				Status:      plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
+				Restarts:    0,
+				ActiveAt:    nil,
+				Verified:    manifest.Verified,
+				WorkingPath: pluginWorkingPath,
+			},
+		},
+		decoder: decoder,
+	}, nil
+}
+
+// launch a local plugin
+// returns a full duplex lifetime, a launched channel, an error channel, and an error
+// caller should always handle both the channels to avoid deadlock
+// 1. for launched channel, launch process will close the channel to notify the caller, just wait for it
+// 2. for error channel, it will be closed also, but no more error will be sent, caller should consume all errors
+func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
+	plugin_entities.PluginFullDuplexLifetime, <-chan bool, <-chan error, error,
+) {
+	plugin, err := p.getLocalPluginRuntime(pluginUniqueIdentifier)
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	identity, err := plugin.decoder.UniqueIdentity()
+	if err != nil {
+		return nil, nil, nil, err
+	}
+
+	// lock launch process
+	p.localPluginLaunchingLock.Lock(identity.String())
+	defer p.localPluginLaunchingLock.Unlock(identity.String())
+
+	// check if the plugin is already running
+	if lifetime, ok := p.m.Load(identity.String()); ok {
+		lifetime, ok := lifetime.(plugin_entities.PluginFullDuplexLifetime)
+		if !ok {
+			return nil, nil, nil, fmt.Errorf("plugin runtime not found")
+		}
+
+		// returns a closed channel to indicate the plugin is already running, no more waiting is needed
+		c := make(chan bool)
+		close(c)
+		errChan := make(chan error)
+		close(errChan)
+
+		return lifetime, c, errChan, nil
+	}
+
+	// extract plugin
+	decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
+	if !ok {
+		return nil, nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
+	}
+
+	// check if the working directory exists, if not, create it, otherwise, launch it directly
+	if _, err := os.Stat(plugin.runtime.State.WorkingPath); err != nil {
+		if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
+			return nil, nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
+		}
+	}
+
+	success := false
+	failed := func(message string) error {
+		if !success {
+			os.RemoveAll(plugin.runtime.State.WorkingPath)
+		}
+		return errors.New(message)
+	}
+
+	// get assets
+	assets, err := plugin.decoder.Assets()
+	if err != nil {
+		return nil, nil, nil, failed(err.Error())
+	}
+
+	localPluginRuntime := local_manager.NewLocalPluginRuntime(p.pythonInterpreterPath)
+	localPluginRuntime.PluginRuntime = plugin.runtime
+	localPluginRuntime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
+		BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaBucket),
+		WorkingPath:        plugin.runtime.State.WorkingPath,
+		Decoder:            plugin.decoder,
+	}
+
+	if err := localPluginRuntime.RemapAssets(
+		&localPluginRuntime.Config,
+		assets,
+	); err != nil {
+		return nil, nil, nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
+	}
+
+	success = true
+
+	p.m.Store(identity.String(), localPluginRuntime)
+
+	// NOTE: you should always keep the size of the channel to 0
+	// we use this to synchronize the plugin launch process
+	launchedChan := make(chan bool)
+	errChan := make(chan error)
+
+	// local plugin
+	routine.Submit(func() {
+		defer func() {
+			if r := recover(); r != nil {
+				log.Error("plugin runtime panic: %v", r)
+			}
+			p.m.Delete(identity.String())
+		}()
+
+		// add max launching lock to prevent too many plugins launching at the same time
+		p.maxLaunchingLock <- true
+		routine.Submit(func() {
+			// wait for plugin launched
+			<-launchedChan
+			// release max launching lock
+			<-p.maxLaunchingLock
+		})
+
+		p.fullDuplexLifecycle(localPluginRuntime, launchedChan, errChan)
+	})
+
+	return localPluginRuntime, launchedChan, errChan, nil
+}

+ 45 - 5
internal/core/plugin_manager/lifetime.go

@@ -1,6 +1,8 @@
 package plugin_manager
 
 import (
+	"fmt"
+	"sync"
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
@@ -11,7 +13,11 @@ func (p *PluginManager) AddPluginRegisterHandler(handler func(r plugin_entities.
 	p.pluginRegisters = append(p.pluginRegisters, handler)
 }
 
-func (p *PluginManager) fullDuplexLifecycle(r plugin_entities.PluginFullDuplexLifetime, launched_chan chan error) {
+func (p *PluginManager) fullDuplexLifecycle(
+	r plugin_entities.PluginFullDuplexLifetime,
+	launchedChan chan bool,
+	errChan chan error,
+) {
 	configuration := r.Configuration()
 
 	log.Info("new plugin logged in: %s", configuration.Identity())
@@ -36,20 +42,54 @@ func (p *PluginManager) fullDuplexLifecycle(r plugin_entities.PluginFullDuplexLi
 	defer r.TriggerStop()
 
 	// try to init environment until succeed
-	for {
+	failedTimes := 0
+
+	// only notify launched once
+	once := sync.Once{}
+
+	for !r.Stopped() {
+		// notify launched if failed too many times
+		if failedTimes > 3 {
+			once.Do(func() {
+				if errChan != nil {
+					errChan <- fmt.Errorf(
+						"init environment for plugin %s failed too many times, "+
+							"you should consider the package is corrupted or your network is unstable",
+						configuration.Identity(),
+					)
+					close(errChan)
+				}
+
+				if launchedChan != nil {
+					close(launchedChan)
+				}
+			})
+		}
+
 		log.Info("init environment for plugin %s", configuration.Identity())
 		if err := r.InitEnvironment(); err != nil {
+			if r.Stopped() {
+				// plugin has been stopped, exit
+				break
+			}
 			log.Error("init environment failed: %s, retry in 30s", err.Error())
 			time.Sleep(30 * time.Second)
+			failedTimes++
 			continue
 		}
 		break
 	}
 
 	// notify launched
-	if launched_chan != nil {
-		close(launched_chan)
-	}
+	once.Do(func() {
+		if launchedChan != nil {
+			close(launchedChan)
+		}
+
+		if errChan != nil {
+			close(errChan)
+		}
+	})
 
 	// init environment successfully
 	// once succeed, we consider the plugin is installed successfully

+ 10 - 162
internal/core/plugin_manager/watcher.go

@@ -1,18 +1,10 @@
 package plugin_manager
 
 import (
-	"errors"
-	"fmt"
-	"os"
-	"path"
-	"strings"
 	"time"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
@@ -62,7 +54,7 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) {
 						}
 						p.m.Delete(identity.String())
 					}()
-					p.fullDuplexLifecycle(rpr, nil)
+					p.fullDuplexLifecycle(rpr, nil, nil)
 				})
 			})
 		}()
@@ -78,10 +70,18 @@ func (p *PluginManager) handleNewLocalPlugins() {
 	}
 
 	for _, plugin := range plugins {
-		_, _, err := p.launchLocal(plugin)
+		_, launchedChan, errChan, err := p.launchLocal(plugin)
 		if err != nil {
 			log.Error("launch local plugin failed: %s", err.Error())
 		}
+
+		// consume error, avoid deadlock
+		for err := range errChan {
+			log.Error("plugin launch error: %s", err.Error())
+		}
+
+		// wait for plugin launched
+		<-launchedChan
 	}
 }
 
@@ -115,155 +115,3 @@ func (p *PluginManager) removeUninstalledLocalPlugins() {
 		return true
 	})
 }
-
-func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
-	plugin_entities.PluginFullDuplexLifetime, <-chan error, error,
-) {
-	plugin, err := p.getLocalPluginRuntime(pluginUniqueIdentifier)
-	if err != nil {
-		return nil, nil, err
-	}
-
-	identity, err := plugin.decoder.UniqueIdentity()
-	if err != nil {
-		return nil, nil, err
-	}
-
-	// lock launch process
-	p.localPluginLaunchingLock.Lock(identity.String())
-	defer p.localPluginLaunchingLock.Unlock(identity.String())
-
-	// check if the plugin is already running
-	if lifetime, ok := p.m.Load(identity.String()); ok {
-		lifetime, ok := lifetime.(plugin_entities.PluginFullDuplexLifetime)
-		if !ok {
-			return nil, nil, fmt.Errorf("plugin runtime not found")
-		}
-
-		// returns a closed channel to indicate the plugin is already running, no more waiting is needed
-		c := make(chan error)
-		close(c)
-
-		return lifetime, c, nil
-	}
-
-	// extract plugin
-	decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
-	if !ok {
-		return nil, nil, fmt.Errorf("plugin decoder is not a zip decoder")
-	}
-
-	// check if the working directory exists, if not, create it, otherwise, launch it directly
-	if _, err := os.Stat(plugin.runtime.State.WorkingPath); err != nil {
-		if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
-			return nil, nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
-		}
-	}
-
-	success := false
-	failed := func(message string) error {
-		if !success {
-			os.RemoveAll(plugin.runtime.State.WorkingPath)
-		}
-		return errors.New(message)
-	}
-
-	// get assets
-	assets, err := plugin.decoder.Assets()
-	if err != nil {
-		return nil, nil, failed(err.Error())
-	}
-
-	localPluginRuntime := local_manager.NewLocalPluginRuntime(p.pythonInterpreterPath)
-	localPluginRuntime.PluginRuntime = plugin.runtime
-	localPluginRuntime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
-		BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaBucket),
-		WorkingPath:        plugin.runtime.State.WorkingPath,
-		Decoder:            plugin.decoder,
-	}
-
-	if err := localPluginRuntime.RemapAssets(
-		&localPluginRuntime.Config,
-		assets,
-	); err != nil {
-		return nil, nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
-	}
-
-	success = true
-
-	p.m.Store(identity.String(), localPluginRuntime)
-
-	launchedChan := make(chan error)
-
-	// local plugin
-	routine.Submit(func() {
-		defer func() {
-			if r := recover(); r != nil {
-				log.Error("plugin runtime panic: %v", r)
-			}
-			p.m.Delete(identity.String())
-		}()
-
-		// add max launching lock to prevent too many plugins launching at the same time
-		p.maxLaunchingLock <- true
-		routine.Submit(func() {
-			// wait for plugin launched
-			<-launchedChan
-			// release max launching lock
-			<-p.maxLaunchingLock
-		})
-
-		p.fullDuplexLifecycle(localPluginRuntime, launchedChan)
-	})
-
-	return localPluginRuntime, launchedChan, nil
-}
-
-type pluginRuntimeWithDecoder struct {
-	runtime plugin_entities.PluginRuntime
-	decoder decoder.PluginDecoder
-}
-
-// extract plugin from package to working directory
-func (p *PluginManager) getLocalPluginRuntime(pluginUniqueIdentifier plugin_entities.PluginUniqueIdentifier) (
-	*pluginRuntimeWithDecoder,
-	error,
-) {
-	pluginZip, err := p.installedBucket.Get(pluginUniqueIdentifier)
-	if err != nil {
-		return nil, errors.Join(err, fmt.Errorf("get plugin package error"))
-	}
-
-	decoder, err := decoder.NewZipPluginDecoder(pluginZip)
-	if err != nil {
-		return nil, errors.Join(err, fmt.Errorf("create plugin decoder error"))
-	}
-
-	// get manifest
-	manifest, err := decoder.Manifest()
-	if err != nil {
-		return nil, errors.Join(err, fmt.Errorf("get plugin manifest error"))
-	}
-
-	checksum, err := decoder.Checksum()
-	if err != nil {
-		return nil, errors.Join(err, fmt.Errorf("calculate checksum error"))
-	}
-
-	identity := manifest.Identity()
-	identity = strings.ReplaceAll(identity, ":", "-")
-	pluginWorkingPath := path.Join(p.workingDirectory, fmt.Sprintf("%s@%s", identity, checksum))
-	return &pluginRuntimeWithDecoder{
-		runtime: plugin_entities.PluginRuntime{
-			Config: manifest,
-			State: plugin_entities.PluginRuntimeState{
-				Status:      plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
-				Restarts:    0,
-				ActiveAt:    nil,
-				Verified:    manifest.Verified,
-				WorkingPath: pluginWorkingPath,
-			},
-		},
-		decoder: decoder,
-	}, nil
-}

+ 5 - 0
internal/oss/s3/s3_storage.go

@@ -90,6 +90,11 @@ func (s *AWSS3Storage) Delete(key string) error {
 }
 
 func (s *AWSS3Storage) List(prefix string) ([]oss.OSSPath, error) {
+	// append a slash to the prefix if it doesn't end with one
+	if !strings.HasSuffix(prefix, "/") {
+		prefix = prefix + "/"
+	}
+
 	var keys []oss.OSSPath
 	input := &s3.ListObjectsV2Input{
 		Bucket: aws.String(s.bucket),

+ 9 - 1
internal/service/endpoint.go

@@ -216,7 +216,15 @@ func ListEndpoints(tenant_id string, page int, page_size int) *entities.Response
 			db.Equal("tenant_id", tenant_id),
 		)
 		if err != nil {
-			return entities.NewErrorResponse(-404, fmt.Sprintf("failed to find plugin installation: %v", err))
+			// use empty settings and declaration for uninstalled plugins
+			endpoint.Settings = map[string]any{}
+			endpoint.Declaration = &plugin_entities.EndpointProviderDeclaration{
+				Settings:      []plugin_entities.ProviderConfig{},
+				Endpoints:     []plugin_entities.EndpointDeclaration{},
+				EndpointFiles: []string{},
+			}
+			endpoints[i] = endpoint
+			continue
 		}
 
 		pluginUniqueIdentifier, err := plugin_entities.NewPluginUniqueIdentifier(

+ 5 - 5
internal/types/app/default.go

@@ -20,12 +20,12 @@ func (config *Config) SetDefault() {
 	setDefaultBool(&config.PluginRemoteInstallingEnabled, true)
 	setDefaultBool(&config.PluginEndpointEnabled, true)
 	setDefaultString(&config.DBSslMode, "disable")
-	setDefaultString(&config.PluginStorageLocalRoot, "./storage")
-	setDefaultString(&config.PluginInstalledPath, "./plugin")
-	setDefaultString(&config.PluginMediaCachePath, "./assets")
-	setDefaultString(&config.PersistenceStoragePath, "./persistence")
+	setDefaultString(&config.PluginStorageLocalRoot, "storage")
+	setDefaultString(&config.PluginInstalledPath, "plugin")
+	setDefaultString(&config.PluginMediaCachePath, "assets")
+	setDefaultString(&config.PersistenceStoragePath, "persistence")
 	setDefaultInt(&config.PersistenceStorageMaxSize, 100*1024*1024)
-	setDefaultString(&config.PluginPackageCachePath, "./plugin_packages")
+	setDefaultString(&config.PluginPackageCachePath, "plugin_packages")
 	setDefaultString(&config.PythonInterpreterPath, "/usr/bin/python3")
 }