Selaa lähdekoodia

refactor: install plugin from local

Yeuoly 9 kuukautta sitten
vanhempi
commit
a55f6f507f

+ 1 - 1
.env.example

@@ -1,7 +1,7 @@
 SERVER_PORT=5002
 SERVER_KEY=lYkiYYT6owG+71oLerGzA7GXCgOT++6ovaezWAjpCjf+Sjc3ZtU+qUEi
 GIN_MODE=release
-PLATFORM=aws_lambda
+PLATFORM=local
 
 DIFY_INNER_API_KEY=QaHbTe77CtuXmsfyhR7+vRjI/+XbV1AaFy691iy+kGDv2Jvy0/eAh8Y1
 DIFY_INNER_API_URL=http://127.0.0.1:5001

+ 1 - 1
internal/core/dify_invocation/real/encrypt_test.go

@@ -101,7 +101,7 @@ func TestInvokeEncrypt(t *testing.T) {
 
 	time.Sleep(1 * time.Second)
 
-	i, err := InitDifyInvocationDaemon(fmt.Sprintf("http://localhost:%d", port), "test")
+	i, err := NewDifyInvocationDaemon(fmt.Sprintf("http://localhost:%d", port), "test")
 	if err != nil {
 		t.Errorf("InitDifyInvocationDaemon failed: %v", err)
 		return

+ 1 - 1
internal/core/dify_invocation/real/http_client.go

@@ -9,7 +9,7 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 )
 
-func InitDifyInvocationDaemon(base string, calling_key string) (dify_invocation.BackwardsInvocation, error) {
+func NewDifyInvocationDaemon(base string, calling_key string) (dify_invocation.BackwardsInvocation, error) {
 	var err error
 	invocation := &RealBackwardsInvocation{}
 	baseurl, err := url.Parse(base)

+ 14 - 0
internal/core/plugin_manager/install_entities.go

@@ -0,0 +1,14 @@
+package plugin_manager
+
+type PluginInstallEvent string
+
+const (
+	PluginInstallEventInfo  PluginInstallEvent = "info"
+	PluginInstallEventDone  PluginInstallEvent = "done"
+	PluginInstallEventError PluginInstallEvent = "error"
+)
+
+type PluginInstallResponse struct {
+	Event PluginInstallEvent `json:"event"`
+	Data  string             `json:"data"`
+}

+ 91 - 0
internal/core/plugin_manager/install_to_local.go

@@ -0,0 +1,91 @@
+package plugin_manager
+
+import (
+	"io"
+	"os"
+	"path/filepath"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+)
+
+// InstallToLocal installs a plugin to local
+func (p *PluginManager) InstallToLocal(
+	tenant_id string,
+	plugin_path string,
+	plugin_unique_identifier plugin_entities.PluginUniqueIdentifier,
+	source string,
+	meta map[string]any,
+) (
+	*stream.Stream[PluginInstallResponse], error,
+) {
+	plugin_file, err := os.Open(plugin_path)
+	if err != nil {
+		return nil, err
+	}
+	defer plugin_file.Close()
+	installed_file_path := filepath.Join(p.pluginStoragePath, plugin_unique_identifier.String())
+	installed_file, err := os.Create(installed_file_path)
+	if err != nil {
+		return nil, err
+	}
+	defer installed_file.Close()
+
+	if _, err := io.Copy(installed_file, plugin_file); err != nil {
+		return nil, err
+	}
+
+	runtime, err := p.launchLocal(installed_file_path)
+	if err != nil {
+		return nil, err
+	}
+
+	response := stream.NewStream[PluginInstallResponse](128)
+	routine.Submit(func() {
+		defer response.Close()
+
+		ticker := time.NewTicker(time.Second * 5) // check heartbeat every 5 seconds
+		defer ticker.Stop()
+		timer := time.NewTimer(time.Second * 240) // timeout after 240 seconds
+		defer timer.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				// heartbeat
+				response.Write(PluginInstallResponse{
+					Event: PluginInstallEventInfo,
+					Data:  "Installing",
+				})
+			case <-timer.C:
+				// timeout
+				response.Write(PluginInstallResponse{
+					Event: PluginInstallEventInfo,
+					Data:  "Timeout",
+				})
+				runtime.Stop()
+				return
+			case err := <-runtime.WaitLaunched():
+				// launched
+				if err != nil {
+					response.Write(PluginInstallResponse{
+						Event: PluginInstallEventError,
+						Data:  err.Error(),
+					})
+					runtime.Stop()
+					return
+				}
+				response.Write(PluginInstallResponse{
+					Event: PluginInstallEventDone,
+					Data:  "Installed",
+				})
+				return
+			}
+		}
+
+	})
+
+	return response, nil
+}

+ 2 - 25
internal/core/plugin_manager/install.go

@@ -13,22 +13,10 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-type PluginInstallEvent string
-
-const (
-	PluginInstallEventInfo  PluginInstallEvent = "info"
-	PluginInstallEventDone  PluginInstallEvent = "done"
-	PluginInstallEventError PluginInstallEvent = "error"
-)
-
-type PluginInstallResponse struct {
-	Event PluginInstallEvent `json:"event"`
-	Data  string             `json:"data"`
-}
-
 // InstallToAWSFromPkg installs a plugin to AWS Lambda
 func (p *PluginManager) InstallToAWSFromPkg(
-	tenant_id string, decoder decoder.PluginDecoder,
+	tenant_id string,
+	decoder decoder.PluginDecoder,
 	source string,
 	meta map[string]any,
 ) (
@@ -143,14 +131,3 @@ func (p *PluginManager) InstallToAWSFromPkg(
 
 	return new_response, nil
 }
-
-// InstallToLocal installs a plugin to local
-func (p *PluginManager) InstallToLocal(
-	tenant_id string, decoder decoder.PluginDecoder,
-	source string,
-	meta map[string]any,
-) (
-	*stream.Stream[PluginInstallResponse], error,
-) {
-	return nil, nil
-}

+ 10 - 24
internal/core/plugin_manager/lifetime.go

@@ -32,54 +32,40 @@ func (p *PluginManager) fullDuplexLifetime(r plugin_entities.PluginFullDuplexLif
 		}
 	}
 
-	// add plugin to manager
-	err := p.Add(r)
-	if err != nil {
-		log.Error("add plugin to manager failed: %s", err.Error())
-		return
-	}
-
 	start_failed_times := 0
 
 	// remove lifetime state after plugin if it has been stopped
 	defer r.TriggerStop()
 
-	for !r.Stopped() {
+	// try at most 3 times to init environment
+	for i := 0; i < 3; i++ {
 		if err := r.InitEnvironment(); err != nil {
 			log.Error("init environment failed: %s, retry in 30s", err.Error())
-			time.Sleep(30 * time.Second)
 			if start_failed_times == 3 {
 				log.Error(
 					"init environment failed 3 times, plugin %s has been stopped",
 					configuration.Identity(),
 				)
-				break
+				return
 			}
+			time.Sleep(30 * time.Second)
 			start_failed_times++
 			continue
 		}
+	}
 
-		start_failed_times = 0
+	// init environment successfully
+	// once succeed, we consider the plugin is installed successfully
+	for !r.Stopped() {
 		// start plugin
 		if err := r.StartPlugin(); err != nil {
 			if r.Stopped() {
+				// plugin has been stopped, exit
 				break
 			}
-			log.Error("start plugin failed: %s, retry in 30s", err.Error())
-			time.Sleep(30 * time.Second)
-			if start_failed_times == 3 {
-				log.Error(
-					"start plugin failed 3 times, plugin %s has been stopped",
-					configuration.Identity(),
-				)
-				break
-			}
-
-			start_failed_times++
-			continue
 		}
 
-		// wait for plugin to stop
+		// wait for plugin to stop normally
 		c, err := r.Wait()
 		if err == nil {
 			<-c

+ 61 - 5
internal/core/plugin_manager/local_manager/run.go

@@ -1,3 +1,4 @@
+// Package local_manager handles the local plugin runtime management
 package local_manager
 
 import (
@@ -14,6 +15,7 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
+// gc performs garbage collection for the LocalPluginRuntime
 func (r *LocalPluginRuntime) gc() {
 	if r.io_identity != "" {
 		RemoveStdio(r.io_identity)
@@ -25,15 +27,23 @@ func (r *LocalPluginRuntime) gc() {
 	}
 }
 
+// init initializes the LocalPluginRuntime
 func (r *LocalPluginRuntime) init() {
+	// reset wait chan
 	r.wait_chan = make(chan bool)
+	// reset wait launched chan
+	r.wait_launched_chan_once = sync.Once{}
+	r.wait_launched_chan = make(chan error)
+
 	r.SetLaunching()
 }
 
+// Type returns the runtime type of the plugin
 func (r *LocalPluginRuntime) Type() plugin_entities.PluginRuntimeType {
 	return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
 }
 
+// getCmd prepares the exec.Cmd for the plugin based on its language
 func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) {
 	if r.Config.Meta.Runner.Language == constants.Python {
 		cmd := exec.Command(r.python_interpreter_path, "-m", r.Config.Meta.Runner.Entrypoint)
@@ -44,6 +54,7 @@ func (r *LocalPluginRuntime) getCmd() (*exec.Cmd, error) {
 	return nil, fmt.Errorf("unsupported language: %s", r.Config.Meta.Runner.Language)
 }
 
+// StartPlugin starts the plugin and manages its lifecycle
 func (r *LocalPluginRuntime) StartPlugin() error {
 	defer log.Info("plugin %s stopped", r.Config.Identity())
 	defer func() {
@@ -62,6 +73,13 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// start plugin
 	e, err := r.getCmd()
 	if err != nil {
+		r.wait_launched_chan_once.Do(func() {
+			select {
+			case r.wait_launched_chan <- err:
+			default:
+			}
+			close(r.wait_launched_chan)
+		})
 		return err
 	}
 
@@ -73,30 +91,51 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// ensure all subprocess are killed when parent process exits, especially on Golang debugger
 	process.WrapProcess(e)
 
+	// notify launched, notify error if any
+	notify_launched := func(err error) {
+		r.wait_launched_chan_once.Do(func() {
+			select {
+			case r.wait_launched_chan <- err:
+			default:
+			}
+			close(r.wait_launched_chan)
+		})
+	}
+
 	// get writer
 	stdin, err := e.StdinPipe()
 	if err != nil {
 		r.SetRestarting()
-		return fmt.Errorf("get stdin pipe failed: %s", err.Error())
+		err = fmt.Errorf("get stdin pipe failed: %s", err.Error())
+		notify_launched(err)
+		return err
 	}
 	defer stdin.Close()
 
+	// get stdout
 	stdout, err := e.StdoutPipe()
 	if err != nil {
 		r.SetRestarting()
-		return fmt.Errorf("get stdout pipe failed: %s", err.Error())
+		err = fmt.Errorf("get stdout pipe failed: %s", err.Error())
+		notify_launched(err)
+		return err
 	}
 	defer stdout.Close()
 
+	// get stderr
 	stderr, err := e.StderrPipe()
 	if err != nil {
 		r.SetRestarting()
-		return fmt.Errorf("get stderr pipe failed: %s", err.Error())
+		err = fmt.Errorf("get stderr pipe failed: %s", err.Error())
+		notify_launched(err)
+		return err
 	}
 	defer stderr.Close()
 
 	if err := e.Start(); err != nil {
 		r.SetRestarting()
+		err = fmt.Errorf("start plugin failed: %s", err.Error())
+		notify_launched(err)
 		return err
 	}
 
@@ -112,6 +151,11 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 			log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
 		}
 
+		// close wait launched chan
+		r.wait_launched_chan_once.Do(func() {
+			close(r.wait_launched_chan)
+		})
+
 		r.gc()
 	}()
 	defer e.Process.Kill()
@@ -129,7 +173,12 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	// listen to plugin stdout
 	routine.Submit(func() {
 		defer wg.Done()
-		stdio.StartStdout()
+		stdio.StartStdout(func() {
+			// get heartbeat, notify launched
+			r.wait_launched_chan_once.Do(func() {
+				close(r.wait_launched_chan)
+			})
+		})
 	})
 
 	// listen to plugin stderr
@@ -153,7 +202,6 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	if err != nil {
 		return err
 	}
-
 	wg.Wait()
 
 	// plugin has exited
@@ -161,6 +209,7 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 	return nil
 }
 
+// Wait returns a channel that will be closed when the plugin stops
 func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
 	if r.wait_chan == nil {
 		return nil, errors.New("plugin not started")
@@ -168,6 +217,7 @@ func (r *LocalPluginRuntime) Wait() (<-chan bool, error) {
 	return r.wait_chan, nil
 }
 
+// WaitStarted returns a channel that will receive true when the plugin starts
 func (r *LocalPluginRuntime) WaitStarted() <-chan bool {
 	c := make(chan bool)
 	r.wait_chan_lock.Lock()
@@ -176,6 +226,7 @@ func (r *LocalPluginRuntime) WaitStarted() <-chan bool {
 	return c
 }
 
+// WaitStopped returns a channel that will receive true when the plugin stops
 func (r *LocalPluginRuntime) WaitStopped() <-chan bool {
 	c := make(chan bool)
 	r.wait_chan_lock.Lock()
@@ -183,3 +234,8 @@ func (r *LocalPluginRuntime) WaitStopped() <-chan bool {
 	r.wait_chan_lock.Unlock()
 	return c
 }
+
+// WaitLaunched returns a channel that will receive an error if the plugin fails to launch
+func (r *LocalPluginRuntime) WaitLaunched() <-chan error {
+	return r.wait_launched_chan
+}

+ 3 - 1
internal/core/plugin_manager/local_manager/stdio_handle.go

@@ -64,7 +64,7 @@ func (s *stdioHolder) Stop() {
 	stdio_holder.Delete(s.id)
 }
 
-func (s *stdioHolder) StartStdout() {
+func (s *stdioHolder) StartStdout(notify_heartbeat func()) {
 	s.started = true
 	s.last_active_at = time.Now()
 	defer s.Stop()
@@ -90,6 +90,8 @@ func (s *stdioHolder) StartStdout() {
 			},
 			func() {
 				s.last_active_at = time.Now()
+				// notify launched
+				notify_heartbeat()
 			},
 			func(err string) {
 				log.Error("plugin %s: %s", s.plugin_unique_identifier, err)

+ 8 - 4
internal/core/plugin_manager/local_manager/type.go

@@ -17,11 +17,15 @@ type LocalPluginRuntime struct {
 	// python interpreter path, currently only support python
 	python_interpreter_path string
 
-	wait_chan_lock    sync.Mutex
-	wait_started_chan []chan bool
-	wait_stopped_chan []chan bool
+	wait_chan_lock          sync.Mutex
+	wait_started_chan       []chan bool
+	wait_stopped_chan       []chan bool
+	wait_launched_chan      chan error
+	wait_launched_chan_once sync.Once
 }
 
 func NewLocalPluginRuntime() *LocalPluginRuntime {
-	return &LocalPluginRuntime{}
+	return &LocalPluginRuntime{
+		wait_launched_chan: make(chan error),
+	}
 }

+ 21 - 28
internal/core/plugin_manager/manager.go

@@ -9,23 +9,28 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation/real"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/media_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/serverless"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/lock"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
 type PluginManager struct {
 	m mapping.Map[string, plugin_entities.PluginLifetime]
 
+	// max size of a plugin package
 	maxPluginPackageSize int64
-	workingDirectory     string
-	packageCachePath     string
+
+	// where the plugin finally running
+	workingDirectory string
+
+	// where the plugin uploaded but not installed
+	packageCachePath string
+
+	// where the plugin finally installed but not running
+	pluginStoragePath string
 
 	// mediaManager is used to manage media files like plugin icons, images, etc.
 	mediaManager *media_manager.MediaManager
@@ -33,18 +38,8 @@ type PluginManager struct {
 	// register plugin
 	pluginRegisters []func(lifetime plugin_entities.PluginLifetime) error
 
-	// running plugin in storage contains relations between plugin packages and their running instances
-	runningPluginInStorage mapping.Map[string, string]
-	// start process lock
-	startProcessLock *lock.HighGranularityLock
-	// serverless runtime
-
-	// Install is a function that installs a plugin to the platform
-	Install func(
-		tenant_id string, decoder decoder.PluginDecoder,
-		source string,
-		meta map[string]any,
-	) (*stream.Stream[PluginInstallResponse], error)
+	// localPluginLaunchingLock is a lock to launch local plugins
+	localPluginLaunchingLock *lock.GranularityLock
 
 	// backwardsInvocation is a handle to invoke dify
 	backwardsInvocation dify_invocation.BackwardsInvocation
@@ -58,19 +53,13 @@ func NewManager(configuration *app.Config) *PluginManager {
 	manager = &PluginManager{
 		maxPluginPackageSize: configuration.MaxPluginPackageSize,
 		packageCachePath:     configuration.PluginPackageCachePath,
+		pluginStoragePath:    configuration.PluginStoragePath,
 		workingDirectory:     configuration.PluginWorkingPath,
 		mediaManager: media_manager.NewMediaManager(
 			configuration.PluginMediaCachePath,
 			configuration.PluginMediaCacheSize,
 		),
-		startProcessLock: lock.NewHighGranularityLock(),
-	}
-
-	if configuration.Platform == app.PLATFORM_AWS_LAMBDA {
-		manager.Install = manager.InstallToAWSFromPkg
-		serverless.Init(configuration)
-	} else if configuration.Platform == app.PLATFORM_LOCAL {
-		manager.Install = manager.InstallToLocal
+		localPluginLaunchingLock: lock.NewGranularityLock(),
 	}
 
 	// mkdir
@@ -119,7 +108,7 @@ func (p *PluginManager) GetAsset(id string) ([]byte, error) {
 	return p.mediaManager.Get(id)
 }
 
-func (p *PluginManager) Init(configuration *app.Config) {
+func (p *PluginManager) Launch(configuration *app.Config) {
 	log.Info("start plugin manager daemon...")
 
 	// init redis client
@@ -130,7 +119,7 @@ func (p *PluginManager) Init(configuration *app.Config) {
 		log.Panic("init redis client failed: %s", err.Error())
 	}
 
-	invocation, err := real.InitDifyInvocationDaemon(
+	invocation, err := real.NewDifyInvocationDaemon(
 		configuration.DifyInnerApiURL, configuration.DifyInnerApiKey,
 	)
 	if err != nil {
@@ -140,7 +129,7 @@ func (p *PluginManager) Init(configuration *app.Config) {
 
 	// start local watcher
 	if configuration.Platform == app.PLATFORM_LOCAL {
-		p.startLocalWatcher(configuration)
+		p.startLocalWatcher()
 	}
 
 	// start remote watcher
@@ -178,3 +167,7 @@ func (p *PluginManager) GetPackage(plugin_unique_identifier plugin_entities.Plug
 
 	return file, nil
 }
+
+func (p *PluginManager) GetPackagePath(plugin_unique_identifier plugin_entities.PluginUniqueIdentifier) (string, error) {
+	return filepath.Join(p.packageCachePath, plugin_unique_identifier.String()), nil
+}

+ 24 - 13
internal/core/plugin_manager/remote_manager/hooks.go

@@ -68,7 +68,8 @@ func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
 		callbacks:      make(map[string][]func([]byte)),
 		callbacks_lock: &sync.RWMutex{},
 
-		shutdown_chan: make(chan bool),
+		shutdown_chan:      make(chan bool),
+		wait_launched_chan: make(chan error),
 
 		alive: true,
 	}
@@ -131,6 +132,11 @@ func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
 	}
 	plugin.wait_chan_lock.Unlock()
 
+	// recycle launched chan, avoid memory leak
+	plugin.wait_launched_chan_once.Do(func() {
+		close(plugin.wait_launched_chan)
+	})
+
 	return gnet.None
 }
 
@@ -172,7 +178,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		return
 	}
 
-	close := func(message []byte) {
+	close_conn := func(message []byte) {
 		if atomic.CompareAndSwapInt32(&runtime.closed, 0, 1) {
 			runtime.conn.Write(message)
 			runtime.conn.Close()
@@ -185,12 +191,12 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		info, err := GetConnectionInfo(key)
 		if err == cache.ErrNotFound {
 			// close connection if handshake failed
-			close([]byte("handshake failed, invalid key\n"))
+			close_conn([]byte("handshake failed, invalid key\n"))
 			runtime.handshake_failed = true
 			return
 		} else if err != nil {
 			// close connection if handshake failed
-			close([]byte("internal error\n"))
+			close_conn([]byte("internal error\n"))
 			return
 		}
 
@@ -203,7 +209,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		declaration, err := parser.UnmarshalJsonBytes[plugin_entities.PluginDeclaration](message)
 		if err != nil {
 			// close connection if handshake failed
-			close([]byte("handshake failed, invalid plugin declaration\n"))
+			close_conn([]byte("handshake failed, invalid plugin declaration\n"))
 			return
 		}
 
@@ -215,7 +221,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		tools, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.ToolProviderDeclaration](message)
 		if err != nil {
 			log.Error("tools register failed, error: %v", err)
-			close([]byte("tools register failed, invalid tools declaration\n"))
+			close_conn([]byte("tools register failed, invalid tools declaration\n"))
 			return
 		}
 
@@ -230,7 +236,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		models, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.ModelProviderDeclaration](message)
 		if err != nil {
 			log.Error("models register failed, error: %v", err)
-			close([]byte("models register failed, invalid models declaration\n"))
+			close_conn([]byte("models register failed, invalid models declaration\n"))
 			return
 		}
 
@@ -245,7 +251,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		endpoints, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.EndpointProviderDeclaration](message)
 		if err != nil {
 			log.Error("endpoints register failed, error: %v", err)
-			close([]byte("endpoints register failed, invalid endpoints declaration\n"))
+			close_conn([]byte("endpoints register failed, invalid endpoints declaration\n"))
 			return
 		}
 
@@ -260,7 +266,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		assets, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.RemoteAssetPayload](message)
 		if err != nil {
 			log.Error("assets register failed, error: %v", err)
-			close([]byte("assets register failed, invalid assets declaration\n"))
+			close_conn([]byte("assets register failed, invalid assets declaration\n"))
 			return
 		}
 
@@ -269,7 +275,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 			files[asset.Filename], err = hex.DecodeString(asset.Data)
 			if err != nil {
 				log.Error("assets decode failed, error: %v", err)
-				close([]byte("assets decode failed, invalid assets data, cannot decode file\n"))
+				close_conn([]byte("assets decode failed, invalid assets data, cannot decode file\n"))
 				return
 			}
 		}
@@ -277,13 +283,13 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		// remap assets
 		if err := runtime.RemapAssets(&runtime.Config, files); err != nil {
 			log.Error("assets remap failed, error: %v", err)
-			close([]byte("assets remap failed, invalid assets data, cannot remap\n"))
+			close_conn([]byte("assets remap failed, invalid assets data, cannot remap\n"))
 			return
 		}
 
 		atomic.AddInt32(&s.current_conn, 1)
 		if atomic.LoadInt32(&s.current_conn) > int32(s.max_conn) {
-			close([]byte("server is busy now, please try again later\n"))
+			close_conn([]byte("server is busy now, please try again later\n"))
 			return
 		}
 
@@ -300,7 +306,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		// trigger registration event
 		if err := runtime.Register(); err != nil {
 			log.Error("register failed, error: %v", err)
-			close([]byte("register failed, cannot register\n"))
+			close_conn([]byte("register failed, cannot register\n"))
 			return
 		}
 
@@ -314,6 +320,11 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		}
 		runtime.wait_chan_lock.Unlock()
 
+		// notify launched
+		runtime.wait_launched_chan_once.Do(func() {
+			close(runtime.wait_launched_chan)
+		})
+
 		// publish runtime to watcher
 		s.response.Write(runtime)
 	} else {

+ 4 - 0
internal/core/plugin_manager/remote_manager/run.go

@@ -88,3 +88,7 @@ func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
 func (r *RemotePluginRuntime) Checksum() (string, error) {
 	return r.checksum, nil
 }
+
+func (r *RemotePluginRuntime) WaitLaunched() <-chan error {
+	return r.wait_launched_chan
+}

+ 5 - 3
internal/core/plugin_manager/remote_manager/type.go

@@ -59,9 +59,11 @@ type RemotePluginRuntime struct {
 	installation_id string
 
 	// wait for started event
-	wait_chan_lock    sync.Mutex
-	wait_started_chan []chan bool
-	wait_stopped_chan []chan bool
+	wait_chan_lock          sync.Mutex
+	wait_started_chan       []chan bool
+	wait_stopped_chan       []chan bool
+	wait_launched_chan      chan error
+	wait_launched_chan_once sync.Once
 }
 
 // Listen creates a new listener for the given session_id

+ 7 - 8
internal/core/plugin_manager/tester.go

@@ -24,24 +24,24 @@ func (p *PluginManager) TestPlugin(
 	timeout string,
 ) (*stream.Stream[any], error) {
 	// launch plugin runtime
-	plugin, err := p.loadPlugin(path)
+	plugin, err := p.getLocalPluginRuntime(path)
 	if err != nil {
 		return nil, errors.Join(err, errors.New("failed to load plugin"))
 	}
 
 	// get assets
-	assets, err := plugin.Decoder.Assets()
+	assets, err := plugin.decoder.Assets()
 	if err != nil {
 		return nil, errors.Join(err, errors.New("failed to get assets"))
 	}
 
 	local_plugin_runtime := local_manager.NewLocalPluginRuntime()
-	local_plugin_runtime.PluginRuntime = plugin.Runtime
+	local_plugin_runtime.PluginRuntime = plugin.runtime
 	local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
 		BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
-		LocalPackagePath:   plugin.Runtime.State.AbsolutePath,
-		WorkingPath:        plugin.Runtime.State.WorkingPath,
-		Decoder:            plugin.Decoder,
+		LocalPackagePath:   plugin.runtime.State.AbsolutePath,
+		WorkingPath:        plugin.runtime.State.WorkingPath,
+		Decoder:            plugin.decoder,
 	}
 	if err := local_plugin_runtime.RemapAssets(
 		&local_plugin_runtime.Config,
@@ -66,7 +66,6 @@ func (p *PluginManager) TestPlugin(
 			}
 		}()
 		// delete the plugin from the storage when the plugin is stopped
-		defer p.runningPluginInStorage.Delete(plugin.Runtime.State.AbsolutePath)
 		p.fullDuplexLifetime(local_plugin_runtime)
 	})
 
@@ -94,7 +93,7 @@ func (p *PluginManager) TestPlugin(
 			ClusterID:              "test-cluster",
 			InvokeFrom:             access_type,
 			Action:                 access_action,
-			Declaration:            plugin.Runtime.Configuration(),
+			Declaration:            plugin.runtime.Configuration(),
 			BackwardsInvocation:    manager.BackwardsInvocation(),
 			IgnoreCache:            true,
 		},

+ 95 - 80
internal/core/plugin_manager/watcher.go

@@ -20,12 +20,12 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
-func (p *PluginManager) startLocalWatcher(config *app.Config) {
+func (p *PluginManager) startLocalWatcher() {
 	go func() {
-		log.Info("start to handle new plugins in path: %s", config.PluginStoragePath)
-		p.handleNewLocalPlugins(config)
+		log.Info("start to handle new plugins in path: %s", p.pluginStoragePath)
+		p.handleNewLocalPlugins()
 		for range time.NewTicker(time.Second * 30).C {
-			p.handleNewLocalPlugins(config)
+			p.handleNewLocalPlugins()
 		}
 	}()
 }
@@ -55,96 +55,116 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) {
 	}
 }
 
-func (p *PluginManager) handleNewLocalPlugins(config *app.Config) {
+func (p *PluginManager) handleNewLocalPlugins() {
 	// load local plugins firstly
-	for plugin := range p.loadNewLocalPlugins(config.PluginStoragePath) {
-		// get assets
-		assets, err := plugin.Decoder.Assets()
-		if err != nil {
-			log.Error("get plugin assets error: %v", err)
-			continue
-		}
+	plugins, err := os.ReadDir(p.pluginStoragePath)
+	if err != nil {
+		log.Error("no plugin found in path: %s", p.pluginStoragePath)
+	}
 
-		local_plugin_runtime := local_manager.NewLocalPluginRuntime()
-		local_plugin_runtime.PluginRuntime = plugin.Runtime
-		local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
-			BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
-			LocalPackagePath:   plugin.Runtime.State.AbsolutePath,
-			WorkingPath:        plugin.Runtime.State.WorkingPath,
-			Decoder:            plugin.Decoder,
+	for _, plugin := range plugins {
+		if !plugin.IsDir() {
+			abs_path := path.Join(p.pluginStoragePath, plugin.Name())
+			_, err := p.launchLocal(abs_path)
+			if err != nil {
+				log.Error("launch local plugin failed: %s", err.Error())
+			}
 		}
+	}
+}
+
+func (p *PluginManager) launchLocal(plugin_package_path string) (plugin_entities.PluginFullDuplexLifetime, error) {
+	plugin, err := p.getLocalPluginRuntime(plugin_package_path)
+	if err != nil {
+		return nil, err
+	}
 
-		if err := local_plugin_runtime.RemapAssets(
-			&local_plugin_runtime.Config,
-			assets,
-		); err != nil {
-			log.Error("remap plugin assets error: %v", err)
-			continue
+	identity, err := plugin.decoder.UniqueIdentity()
+	if err != nil {
+		return nil, err
+	}
+
+	// lock launch process
+	p.localPluginLaunchingLock.Lock(identity.String())
+	defer p.localPluginLaunchingLock.Unlock(identity.String())
+
+	// check if the plugin is already running
+	if _, ok := p.m.Load(identity.String()); ok {
+		lifetime, ok := p.Get(identity).(plugin_entities.PluginFullDuplexLifetime)
+		if !ok {
+			return nil, fmt.Errorf("plugin runtime not found")
 		}
+		return lifetime, nil
+	}
+
+	// extract plugin
+	decoder, ok := plugin.decoder.(*decoder.ZipPluginDecoder)
+	if !ok {
+		return nil, fmt.Errorf("plugin decoder is not a zip decoder")
+	}
+
+	if err := decoder.ExtractTo(plugin.runtime.State.WorkingPath); err != nil {
+		return nil, errors.Join(err, fmt.Errorf("extract plugin to working directory error"))
+	}
 
-		identity, err := local_plugin_runtime.Identity()
-		if err != nil {
-			log.Error("get plugin identity error: %v", err)
-			continue
+	success := false
+	failed := func(message string) error {
+		if !success {
+			os.RemoveAll(plugin.runtime.State.WorkingPath)
 		}
+		return errors.New(message)
+	}
 
-		// store the plugin in the storage, avoid duplicate loading
-		p.runningPluginInStorage.Store(plugin.Runtime.State.AbsolutePath, identity.String())
-
-		// local plugin
-		routine.Submit(func() {
-			defer func() {
-				if r := recover(); r != nil {
-					log.Error("plugin runtime error: %v", r)
-				}
-			}()
-			// delete the plugin from the storage when the plugin is stopped
-			defer p.runningPluginInStorage.Delete(plugin.Runtime.State.AbsolutePath)
-			p.fullDuplexLifetime(local_plugin_runtime)
-		})
+	// get assets
+	assets, err := plugin.decoder.Assets()
+	if err != nil {
+		return nil, failed(err.Error())
 	}
-}
 
-type pluginRuntimeWithDecoder struct {
-	Runtime plugin_entities.PluginRuntime
-	Decoder decoder.PluginDecoder
-}
+	local_plugin_runtime := local_manager.NewLocalPluginRuntime()
+	local_plugin_runtime.PluginRuntime = plugin.runtime
+	local_plugin_runtime.PositivePluginRuntime = positive_manager.PositivePluginRuntime{
+		BasicPluginRuntime: basic_manager.NewBasicPluginRuntime(p.mediaManager),
+		LocalPackagePath:   plugin.runtime.State.AbsolutePath,
+		WorkingPath:        plugin.runtime.State.WorkingPath,
+		Decoder:            plugin.decoder,
+	}
 
-// chan should be closed after using that
-func (p *PluginManager) loadNewLocalPlugins(root_path string) <-chan *pluginRuntimeWithDecoder {
-	ch := make(chan *pluginRuntimeWithDecoder)
+	if err := local_plugin_runtime.RemapAssets(
+		&local_plugin_runtime.Config,
+		assets,
+	); err != nil {
+		return nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
+	}
 
-	plugins, err := os.ReadDir(root_path)
+	// add plugin to manager
+	err = p.Add(local_plugin_runtime)
 	if err != nil {
-		log.Error("no plugin found in path: %s", root_path)
-		close(ch)
-		return ch
+		return nil, failed(errors.Join(err, fmt.Errorf("add plugin to manager failed")).Error())
 	}
 
+	success = true
+
+	// local plugin
 	routine.Submit(func() {
-		for _, plugin := range plugins {
-			if !plugin.IsDir() {
-				abs_path := path.Join(root_path, plugin.Name())
-				if _, ok := p.runningPluginInStorage.Load(abs_path); ok {
-					// if the plugin is already running, skip it
-					continue
-				}
-
-				plugin, err := p.loadPlugin(abs_path)
-				if err != nil {
-					log.Error("load plugin error: %v", err)
-					continue
-				}
-				ch <- plugin
+		defer func() {
+			if r := recover(); r != nil {
+				log.Error("plugin runtime panic: %v", r)
 			}
-		}
-		close(ch)
+		}()
+		p.fullDuplexLifetime(local_plugin_runtime)
 	})
 
-	return ch
+	return local_plugin_runtime, nil
 }
 
-func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecoder, error) {
+type pluginRuntimeWithDecoder struct {
+	runtime plugin_entities.PluginRuntime
+	decoder decoder.PluginDecoder
+}
+
+// extract plugin from package to working directory
+func (p *PluginManager) getLocalPluginRuntime(plugin_path string) (*pluginRuntimeWithDecoder, error) {
 	pack, err := os.Open(plugin_path)
 	if err != nil {
 		return nil, errors.Join(err, fmt.Errorf("open plugin package error"))
@@ -195,13 +215,8 @@ func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecode
 		return nil, errors.Join(fmt.Errorf("plugin working directory already exists: %s", plugin_working_path), err)
 	}
 
-	// extract to working directory
-	if err := decoder.ExtractTo(plugin_working_path); err != nil {
-		return nil, errors.Join(fmt.Errorf("extract plugin to working directory error: %v", err), err)
-	}
-
 	return &pluginRuntimeWithDecoder{
-		Runtime: plugin_entities.PluginRuntime{
+		runtime: plugin_entities.PluginRuntime{
 			Config: manifest,
 			State: plugin_entities.PluginRuntimeState{
 				Status:       plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
@@ -212,6 +227,6 @@ func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecode
 				Verified:     manifest.Verified,
 			},
 		},
-		Decoder: decoder,
+		decoder: decoder,
 	}, nil
 }

+ 1 - 1
internal/server/controllers/plugins.go

@@ -67,7 +67,7 @@ func InstallPluginFromIdentifiers(app *app.Config) gin.HandlerFunc {
 				request.Meta = map[string]any{}
 			}
 			c.JSON(http.StatusOK, service.InstallPluginFromIdentifiers(
-				request.TenantID, request.PluginUniqueIdentifiers, request.Source, request.Meta,
+				app, request.TenantID, request.PluginUniqueIdentifiers, request.Source, request.Meta,
 			))
 		})
 	}

+ 1 - 1
internal/server/server.go

@@ -30,7 +30,7 @@ func (app *App) Run(config *app.Config) {
 	manager.AddPluginRegisterHandler(app.cluster.RegisterPlugin)
 
 	// init manager
-	manager.Init(config)
+	manager.Launch(config)
 
 	// init persistence
 	persistence.InitPersistence(config)

+ 39 - 11
internal/service/install_plugin.go

@@ -3,20 +3,24 @@ package service
 import (
 	"errors"
 	"fmt"
+	"os"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/db"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"gorm.io/gorm"
 )
 
 func InstallPluginFromIdentifiers(
+	config *app.Config,
 	tenant_id string,
 	plugin_unique_identifiers []plugin_entities.PluginUniqueIdentifier,
 	source string,
@@ -90,7 +94,6 @@ func InstallPluginFromIdentifiers(
 	}
 
 	response.TaskID = task.ID
-
 	manager := plugin_manager.Manager()
 
 	tasks := []func(){}
@@ -127,7 +130,7 @@ func InstallPluginFromIdentifiers(
 				}
 			}
 
-			pkg, err := manager.GetPackage(plugin_unique_identifier)
+			pkg_path, err := manager.GetPackagePath(plugin_unique_identifier)
 			if err != nil {
 				updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
 					task.Status = models.InstallTaskStatusFailed
@@ -137,22 +140,47 @@ func InstallPluginFromIdentifiers(
 				return
 			}
 
-			decoder, err := decoder.NewZipPluginDecoder(pkg)
-			if err != nil {
+			updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
+				plugin.Status = models.InstallTaskStatusRunning
+				plugin.Message = "Installing"
+			})
+
+			var stream *stream.Stream[plugin_manager.PluginInstallResponse]
+			if config.Platform == app.PLATFORM_AWS_LAMBDA {
+				var zip_decoder *decoder.ZipPluginDecoder
+				var pkg_file []byte
+
+				pkg_file, err = os.ReadFile(pkg_path)
+				if err != nil {
+					updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
+						task.Status = models.InstallTaskStatusFailed
+						plugin.Status = models.InstallTaskStatusFailed
+						plugin.Message = "Failed to read plugin package"
+					})
+					return
+				}
+
+				zip_decoder, err = decoder.NewZipPluginDecoder(pkg_file)
+				if err != nil {
+					updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
+						task.Status = models.InstallTaskStatusFailed
+						plugin.Status = models.InstallTaskStatusFailed
+						plugin.Message = err.Error()
+					})
+					return
+				}
+				stream, err = manager.InstallToAWSFromPkg(tenant_id, zip_decoder, source, meta)
+			} else if config.Platform == app.PLATFORM_LOCAL {
+				stream, err = manager.InstallToLocal(tenant_id, pkg_path, plugin_unique_identifier, source, meta)
+			} else {
 				updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
 					task.Status = models.InstallTaskStatusFailed
 					plugin.Status = models.InstallTaskStatusFailed
-					plugin.Message = err.Error()
+					plugin.Message = "Unsupported platform"
 				})
 				return
 			}
 
-			updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
-				plugin.Status = models.InstallTaskStatusRunning
-				plugin.Message = "Installing"
-			})
-
-			stream, err := manager.Install(tenant_id, decoder, source, meta)
 			if err != nil {
 				updateTaskStatus(func(task *models.InstallTask, plugin *models.InstallTaskPluginStatus) {
 					task.Status = models.InstallTaskStatusFailed

+ 2 - 0
internal/types/entities/plugin_entities/runtime.go

@@ -54,6 +54,8 @@ type (
 		WaitStarted() <-chan bool
 		// Stopped
 		WaitStopped() <-chan bool
+		// Launched
+		WaitLaunched() <-chan error
 	}
 
 	PluginServerlessLifetime interface {

+ 17 - 5
internal/utils/lock/lock.go

@@ -10,18 +10,18 @@ type mutex struct {
 	count int32
 }
 
-type HighGranularityLock struct {
+type GranularityLock struct {
 	m map[string]*mutex
 	l sync.Mutex
 }
 
-func NewHighGranularityLock() *HighGranularityLock {
-	return &HighGranularityLock{
+func NewGranularityLock() *GranularityLock {
+	return &GranularityLock{
 		m: make(map[string]*mutex),
 	}
 }
 
-func (l *HighGranularityLock) Lock(key string) {
+func (l *GranularityLock) Lock(key string) {
 	l.l.Lock()
 	var m *mutex
 	var ok bool
@@ -36,7 +36,19 @@ func (l *HighGranularityLock) Lock(key string) {
 	m.Lock()
 }
 
-func (l *HighGranularityLock) Unlock(key string) {
+func (l *GranularityLock) TryLock(key string) bool {
+	l.l.Lock()
+	m, ok := l.m[key]
+	if !ok {
+		return false
+	}
+
+	locked := m.TryLock()
+	l.l.Unlock()
+	return locked
+}
+
+func (l *GranularityLock) Unlock(key string) {
 	l.l.Lock()
 	m, ok := l.m[key]
 	if !ok {

+ 1 - 1
internal/utils/lock/lock_test.go

@@ -7,7 +7,7 @@ import (
 )
 
 func TestHighGranularityLock(t *testing.T) {
-	l := NewHighGranularityLock()
+	l := NewGranularityLock()
 
 	data := []int{}
 	add := func(key int) {