ソースを参照

feat: plugin debugging server

Yeuoly 1 年間 前
コミット
52f51e13ad

+ 1 - 0
cmd/server/main.go

@@ -34,6 +34,7 @@ func setDefault(config *app.Config) {
 	setDefaultInt(&config.LifetimeCollectionHeartbeatInterval, 5)
 	setDefaultInt(&config.LifetimeStateGCInterval, 300)
 	setDefaultInt(&config.DifyInvocationConnectionIdleTimeout, 120)
+	setDefaultInt(&config.PluginRemoteInstallServerEventLoopNums, 8)
 
 	setDebugString(&config.ProcessCachingPath, "/tmp/dify-plugin-daemon-subprocesses")
 }

+ 0 - 1
internal/core/plugin_manager/aws_connector.go

@@ -1 +0,0 @@
-package plugin_manager

+ 0 - 4
internal/core/plugin_manager/aws_manager/io.go

@@ -10,7 +10,3 @@ func (r *AWSPluginRuntime) Listen(session_id string) *entities.BytesIOListener {
 func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
 
 }
-
-func (r *AWSPluginRuntime) Request(session_id string, data []byte) ([]byte, error) {
-	return nil, nil
-}

+ 8 - 8
internal/core/plugin_manager/lifetime.go

@@ -12,15 +12,15 @@ func lifetime(config *app.Config, r entities.PluginRuntimeInterface) {
 	start_failed_times := 0
 	configuration := r.Configuration()
 
+	// store plugin runtime
+	m.Store(configuration.Identity(), r)
+	defer m.Delete(configuration.Identity())
+
+	// update lifetime state for this pod
 	addLifetimeState(r)
-	defer func() {
-		// remove lifetime state after plugin if it has been stopped for $LIFETIME_STATE_GC_INTERVAL and not started again
-		time.AfterFunc(time.Duration(config.LifetimeStateGCInterval)*time.Second, func() {
-			if r.Stopped() {
-				deleteLifetimeState(r)
-			}
-		})
-	}()
+
+	// remove lifetime state after plugin if it has been stopped
+	defer deleteLifetimeState(r)
 
 	for !r.Stopped() {
 		if err := r.InitEnvironment(); err != nil {

+ 0 - 1
internal/core/plugin_manager/local_connector.go

@@ -1 +0,0 @@
-package plugin_manager

+ 1 - 5
internal/core/plugin_manager/local_manager/io.go

@@ -1,7 +1,7 @@
 package local_manager
 
 import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/stdio_holder"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager/stdio_holder"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 )
 
@@ -20,7 +20,3 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.BytesIOListener
 func (r *LocalPluginRuntime) Write(session_id string, data []byte) {
 	stdio_holder.Write(r.io_identity, append(data, '\n'))
 }
-
-func (r *LocalPluginRuntime) Request(session_id string, data []byte) ([]byte, error) {
-	return nil, nil
-}

+ 1 - 1
internal/core/plugin_manager/local_manager/run.go

@@ -6,7 +6,7 @@ import (
 	"os/exec"
 	"sync"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/stdio_holder"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager/stdio_holder"
 	"github.com/langgenius/dify-plugin-daemon/internal/process"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"

+ 2 - 1
internal/core/plugin_manager/stdio_holder/io.go

@@ -8,6 +8,7 @@ import (
 	"sync"
 	"time"
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
@@ -168,7 +169,7 @@ func (s *stdioHolder) Wait() error {
 		case <-ticker.C:
 			// check heartbeat
 			if time.Since(s.last_active_at) > 20*time.Second {
-				return errors.New("plugin is not active, does not respond to heartbeat in 20 seconds")
+				return plugin_errors.ErrPluginNotActive
 			}
 		case <-s.health_chan:
 			// closed

internal/core/plugin_manager/stdio_holder/store.go → internal/core/plugin_manager/local_manager/stdio_holder/store.go


+ 7 - 0
internal/core/plugin_manager/plugin_errors/errors.go

@@ -0,0 +1,7 @@
+package plugin_errors
+
+import "errors"
+
+var (
+	ErrPluginNotActive = errors.New("plugin is not active, does not respond to heartbeat in 20 seconds")
+)

+ 47 - 0
internal/core/plugin_manager/remote_manager/codec.go

@@ -0,0 +1,47 @@
+package remote_manager
+
+import (
+	"bytes"
+	"errors"
+
+	"github.com/panjf2000/gnet/v2"
+)
+
+type codec struct {
+	buf bytes.Buffer
+}
+
+func (w *codec) Decode(c gnet.Conn) ([][]byte, error) {
+	size := c.InboundBuffered()
+	buf := make([]byte, size)
+	read, err := c.Read(buf)
+
+	if err != nil {
+		return nil, err
+	}
+
+	if read < size {
+		return nil, errors.New("read less than size")
+	}
+
+	return w.getLines(buf), nil
+}
+
+func (w *codec) getLines(data []byte) [][]byte {
+	// write to buffer
+	w.buf.Write(data)
+
+	// read line by line, split by \n, remaining data will be kept in buffer
+	lines := bytes.Split(w.buf.Bytes(), []byte("\n"))
+	w.buf.Reset()
+
+	// if last line is not complete, keep it in buffer
+	if len(lines[len(lines)-1]) != 0 {
+		w.buf.Write(lines[len(lines)-1])
+		lines = lines[:len(lines)-1]
+	} else if len(lines) > 0 {
+		lines = lines[:len(lines)-1]
+	}
+
+	return lines
+}

+ 21 - 0
internal/core/plugin_manager/remote_manager/codec_test.go

@@ -0,0 +1,21 @@
+package remote_manager
+
+import "testing"
+
+func TestCodec(t *testing.T) {
+	codec := &codec{}
+	liens := codec.getLines([]byte("test\n"))
+	if len(liens) != 1 {
+		t.Error("getLines failed")
+	}
+
+	liens = codec.getLines([]byte("test\ntest"))
+	if len(liens) == 2 {
+		t.Error("getLines failed")
+	}
+
+	liens = codec.getLines([]byte("\n"))
+	if len(liens) != 1 {
+		t.Error("getLines failed")
+	}
+}

+ 106 - 0
internal/core/plugin_manager/remote_manager/hooks.go

@@ -0,0 +1,106 @@
+package remote_manager
+
+import (
+	"sync"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+	"github.com/panjf2000/gnet/v2"
+)
+
+type DifyServer struct {
+	gnet.BuiltinEventEngine
+
+	engine gnet.Engine
+
+	// listening address
+	addr string
+
+	// enabled multicore
+	multicore bool
+
+	// event loop count
+	num_loops int
+
+	// read new connections
+	response *stream.StreamResponse[*RemotePluginRuntime]
+
+	plugins      map[int]*RemotePluginRuntime
+	plugins_lock *sync.RWMutex
+
+	shutdown_chan chan bool
+}
+
+func (s *DifyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
+	s.engine = c
+	return gnet.None
+}
+
+func (s *DifyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
+	// new plugin connected
+	c.SetContext(&codec{})
+	runtime := &RemotePluginRuntime{
+		conn:           c,
+		response:       stream.NewStreamResponse[[]byte](512),
+		callbacks:      make(map[string][]func([]byte)),
+		callbacks_lock: &sync.RWMutex{},
+
+		shutdown_chan: make(chan bool),
+
+		alive: true,
+	}
+
+	// store plugin runtime
+	s.plugins_lock.Lock()
+	s.plugins[c.Fd()] = runtime
+	s.plugins_lock.Unlock()
+
+	s.response.Write(runtime)
+
+	// verified
+	verified := true
+	if verified {
+		return nil, gnet.None
+	}
+
+	return nil, gnet.Close
+}
+
+func (s *DifyServer) OnClose(c gnet.Conn, err error) (action gnet.Action) {
+	// plugin disconnected
+	s.plugins_lock.Lock()
+	plugin := s.plugins[c.Fd()]
+	delete(s.plugins, c.Fd())
+	s.plugins_lock.Unlock()
+
+	// close plugin
+	plugin.close()
+
+	return gnet.None
+}
+
+func (s *DifyServer) OnShutdown(c gnet.Engine) {
+	close(s.shutdown_chan)
+}
+
+func (s *DifyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
+	codec := c.Context().(*codec)
+	messages, err := codec.Decode(c)
+	if err != nil {
+		return gnet.Close
+	}
+
+	// get plugin runtime
+	s.plugins_lock.RLock()
+	runtime, ok := s.plugins[c.Fd()]
+	s.plugins_lock.RUnlock()
+	if !ok {
+		return gnet.Close
+	}
+
+	// handle messages
+	for _, message := range messages {
+		runtime.response.Write(message)
+	}
+
+	return gnet.None
+}

+ 25 - 0
internal/core/plugin_manager/remote_manager/io.go

@@ -0,0 +1,25 @@
+package remote_manager
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/panjf2000/gnet/v2"
+)
+
+func (r *RemotePluginRuntime) Listen(session_id string) *entities.BytesIOListener {
+	listener := entities.NewIOListener[[]byte]()
+	listener.OnClose(func() {
+		r.removeCallback(session_id)
+	})
+
+	r.addCallback(session_id, func(data []byte) {
+		listener.Emit(data)
+	})
+
+	return listener
+}
+
+func (r *RemotePluginRuntime) Write(session_id string, data []byte) {
+	r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error {
+		return nil
+	})
+}

+ 101 - 0
internal/core/plugin_manager/remote_manager/run.go

@@ -0,0 +1,101 @@
+package remote_manager
+
+import (
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
+)
+
+func (r *RemotePluginRuntime) InitEnvironment() error {
+	return nil
+}
+
+func (r *RemotePluginRuntime) Stopped() bool {
+	return !r.alive
+}
+
+func (r *RemotePluginRuntime) Stop() {
+	r.alive = false
+	if r.conn == nil {
+		return
+	}
+	r.conn.Close()
+}
+
+func (r *RemotePluginRuntime) StartPlugin() error {
+	var exit_error error
+
+	// handle heartbeat
+	routine.Submit(func() {
+		r.last_active_at = time.Now()
+		ticker := time.NewTicker(5 * time.Second)
+		defer ticker.Stop()
+
+		for {
+			select {
+			case <-ticker.C:
+				if time.Since(r.last_active_at) > 20*time.Second {
+					// kill this connection
+					r.conn.Close()
+					exit_error = plugin_errors.ErrPluginNotActive
+					return
+				}
+			case <-r.shutdown_chan:
+				return
+			}
+		}
+	})
+
+	for r.response.Next() {
+		data, err := r.response.Read()
+		if err != nil {
+			return err
+		}
+
+		// handle event
+		event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginUniversalEvent](data)
+		if err != nil {
+			continue
+		}
+
+		session_id := event.SessionId
+
+		switch event.Event {
+		case plugin_entities.PLUGIN_EVENT_LOG:
+			if event.Event == plugin_entities.PLUGIN_EVENT_LOG {
+				log_event, err := parser.UnmarshalJsonBytes[plugin_entities.PluginLogEvent](
+					event.Data,
+				)
+				if err != nil {
+					log.Error("unmarshal json failed: %s", err.Error())
+					continue
+				}
+
+				log.Info("plugin %s: %s", r.Configuration().Identity(), log_event.Message)
+			}
+		case plugin_entities.PLUGIN_EVENT_SESSION:
+			r.callbacks_lock.RLock()
+			listeners := r.callbacks[session_id][:]
+			r.callbacks_lock.RUnlock()
+
+			// handle session event
+			for _, listener := range listeners {
+				listener(event.Data)
+			}
+		case plugin_entities.PLUGIN_EVENT_ERROR:
+			log.Error("plugin %s: %s", r.Configuration().Identity(), event.Data)
+		case plugin_entities.PLUGIN_EVENT_HEARTBEAT:
+			r.last_active_at = time.Now()
+		}
+	}
+
+	return exit_error
+}
+
+func (r *RemotePluginRuntime) Wait() (<-chan bool, error) {
+	return r.shutdown_chan, nil
+}

+ 97 - 0
internal/core/plugin_manager/remote_manager/server.go

@@ -0,0 +1,97 @@
+package remote_manager
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"sync"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+	"github.com/panjf2000/gnet/v2"
+
+	gnet_errors "github.com/panjf2000/gnet/v2/pkg/errors"
+)
+
+type RemotePluginServer struct {
+	server *DifyServer
+}
+
+// continue accepting new connections
+func (r *RemotePluginServer) Read() (*RemotePluginRuntime, error) {
+	if r.server.response == nil {
+		return nil, errors.New("plugin server not started")
+	}
+
+	return r.server.response.Read()
+}
+
+// Next returns true if there are more connections to be read
+func (r *RemotePluginServer) Next() bool {
+	if r.server.response == nil {
+		return false
+	}
+
+	return r.server.response.Next()
+}
+
+// Stop stops the server
+func (r *RemotePluginServer) Stop() error {
+	if r.server.response == nil {
+		return errors.New("plugin server not started")
+	}
+	r.server.response.Close()
+	err := r.server.engine.Stop(context.Background())
+
+	if err == gnet_errors.ErrEmptyEngine || err == gnet_errors.ErrEngineInShutdown {
+		return nil
+	}
+
+	return err
+}
+
+// Launch starts the server
+func (r *RemotePluginServer) Launch() error {
+	err := gnet.Run(
+		r.server, r.server.addr, gnet.WithMulticore(r.server.multicore),
+		gnet.WithNumEventLoop(r.server.num_loops),
+	)
+
+	if err != nil {
+		r.Stop()
+	}
+
+	return err
+}
+
+// NewRemotePluginServer creates a new RemotePluginServer
+func NewRemotePluginServer(config *app.Config) *RemotePluginServer {
+	addr := fmt.Sprintf(
+		"tcp://%s:%d",
+		config.PluginRemoteInstallingHost,
+		config.PluginRemoteInstallingPort,
+	)
+
+	response := stream.NewStreamResponse[*RemotePluginRuntime](
+		config.PluginRemoteInstallingMaxConn,
+	)
+
+	multicore := true
+	s := &DifyServer{
+		addr:      addr,
+		multicore: multicore,
+		num_loops: config.PluginRemoteInstallServerEventLoopNums,
+		response:  response,
+
+		plugins:      make(map[int]*RemotePluginRuntime),
+		plugins_lock: &sync.RWMutex{},
+
+		shutdown_chan: make(chan bool),
+	}
+
+	manager := &RemotePluginServer{
+		server: s,
+	}
+
+	return manager
+}

+ 128 - 0
internal/core/plugin_manager/remote_manager/server_test.go

@@ -0,0 +1,128 @@
+package remote_manager
+
+import (
+	"fmt"
+	"net"
+	"testing"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
+)
+
+func preparePluginServer(t *testing.T) (*RemotePluginServer, uint16) {
+	// generate a random port
+	listener, err := net.Listen("tcp", ":0")
+	if err != nil {
+		t.Errorf("failed to get a random port: %s", err.Error())
+		return nil, 0
+	}
+	listener.Close()
+
+	port := listener.Addr().(*net.TCPAddr).Port
+
+	// start plugin server
+	return NewRemotePluginServer(&app.Config{
+		PluginRemoteInstallingHost:             "0.0.0.0",
+		PluginRemoteInstallingPort:             uint16(port),
+		PluginRemoteInstallingMaxConn:          1,
+		PluginRemoteInstallServerEventLoopNums: 8,
+	}), uint16(port)
+}
+
+// TestLaunchAndClosePluginServer tests the launch and close of the plugin server
+func TestLaunchAndClosePluginServer(t *testing.T) {
+	// start plugin server
+	server, _ := preparePluginServer(t)
+	if server == nil {
+		return
+	}
+
+	done_chan := make(chan error)
+
+	go func() {
+		err := server.Launch()
+		if err != nil {
+			done_chan <- err
+		}
+	}()
+
+	timer := time.NewTimer(time.Second * 5)
+
+	select {
+	case err := <-done_chan:
+		t.Errorf("failed to launch plugin server: %s", err.Error())
+		return
+	case <-timer.C:
+		err := server.Stop()
+		if err != nil {
+			t.Errorf("failed to stop plugin server: %s", err.Error())
+			return
+		}
+	}
+}
+
+// TestAcceptConnection tests the acceptance of the connection
+func TestAcceptConnection(t *testing.T) {
+	server, port := preparePluginServer(t)
+	if server == nil {
+		return
+	}
+	defer server.Stop()
+	go func() {
+		server.Launch()
+	}()
+
+	got_connection := false
+
+	go func() {
+		for server.Next() {
+			runtime, err := server.Read()
+			if err != nil {
+				t.Errorf("failed to read plugin runtime: %s", err.Error())
+				return
+			}
+
+			got_connection = true
+
+			time.Sleep(time.Second * 2)
+			runtime.Stop()
+		}
+	}()
+
+	// wait for the server to start
+	time.Sleep(time.Second * 2)
+
+	conn, err := net.Dial("tcp", fmt.Sprintf("0.0.0.0:%d", port))
+	if err != nil {
+		t.Errorf("failed to connect to plugin server: %s", err.Error())
+		return
+	}
+
+	closed_chan := make(chan bool)
+
+	go func() {
+		// block here to accept messages until the connection is closed
+		buffer := make([]byte, 1024)
+		for {
+			_, err := conn.Read(buffer)
+			if err != nil {
+				break
+			}
+		}
+		close(closed_chan)
+	}()
+
+	select {
+	case <-time.After(time.Second * 10):
+		// connection not closed
+		t.Errorf("connection not closed normally")
+		return
+	case <-closed_chan:
+		// success
+		if !got_connection {
+			t.Errorf("failed to accept connection")
+			return
+		}
+		return
+	}
+}

+ 58 - 0
internal/core/plugin_manager/remote_manager/type.go

@@ -0,0 +1,58 @@
+package remote_manager
+
+import (
+	"sync"
+	"time"
+
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
+	"github.com/panjf2000/gnet/v2"
+)
+
+type RemotePluginRuntime struct {
+	entities.PluginRuntime
+
+	// connection
+	conn gnet.Conn
+
+	// response entity to accept new events
+	response *stream.StreamResponse[[]byte]
+
+	// callbacks for each session
+	callbacks      map[string][]func([]byte)
+	callbacks_lock *sync.RWMutex
+
+	// channel to notify all waiting routines
+	shutdown_chan chan bool
+
+	// heartbeat
+	last_active_at time.Time
+
+	alive bool
+}
+
+// Listen creates a new listener for the given session_id
+// session id is an unique identifier for a request
+func (r *RemotePluginRuntime) addCallback(session_id string, fn func([]byte)) {
+	r.callbacks_lock.Lock()
+	if _, ok := r.callbacks[session_id]; !ok {
+		r.callbacks[session_id] = make([]func([]byte), 0)
+	}
+	r.callbacks[session_id] = append(r.callbacks[session_id], fn)
+	r.callbacks_lock.Unlock()
+}
+
+// removeCallback removes the listener for the given session_id
+func (r *RemotePluginRuntime) removeCallback(session_id string) {
+	r.callbacks_lock.Lock()
+	delete(r.callbacks, session_id)
+	r.callbacks_lock.Unlock()
+}
+
+func (r *RemotePluginRuntime) close() {
+	// close shutdown channel to notify all waiting routines
+	close(r.shutdown_chan)
+
+	// close response to stop current plugin
+	r.response.Close()
+}

+ 0 - 46
internal/core/plugin_manager/tcp_connector.go

@@ -1,46 +0,0 @@
-package plugin_manager
-
-import (
-	"fmt"
-
-	"github.com/panjf2000/gnet/v2"
-)
-
-type difyServer struct {
-	gnet.BuiltinEventEngine
-
-	eng       gnet.Engine
-	addr      string
-	multicore bool
-}
-
-func (s *difyServer) OnOpen(c gnet.Conn) (out []byte, action gnet.Action) {
-	c.SetContext(&codec{})
-	return nil, gnet.None
-}
-
-func (s *difyServer) OnBoot(c gnet.Engine) (action gnet.Action) {
-	return gnet.None
-}
-
-func (s *difyServer) OnTraffic(c gnet.Conn) (action gnet.Action) {
-	codec := c.Context().(*codec)
-	messages, err := codec.Decode(c)
-	if err != nil {
-		return gnet.Close
-	}
-
-	for _, message := range messages {
-		fmt.Println(message)
-	}
-
-	return gnet.None
-}
-
-func traffic() {
-	addr := "tcp://:9000"
-	multicore := true
-	s := &difyServer{addr: addr, multicore: multicore}
-
-	gnet.Run(s, addr, gnet.WithMulticore(multicore), gnet.WithNumEventLoop(8))
-}

+ 0 - 55
internal/core/plugin_manager/tcp_protocol.go

@@ -1,55 +0,0 @@
-package plugin_manager
-
-import (
-	"bytes"
-	"errors"
-
-	"github.com/panjf2000/gnet/v2"
-)
-
-type codec struct {
-	buf bytes.Buffer
-}
-
-func (w *codec) Decode(c gnet.Conn) ([][]byte, error) {
-	size := c.InboundBuffered()
-	buf := make([]byte, size)
-	read, err := c.Read(buf)
-
-	if err != nil {
-		return nil, err
-	}
-
-	if read < size {
-		return nil, errors.New("read less than size")
-	}
-
-	// use \ as escape character, as for \ itself, it should be escaped as well
-	var start int
-	var result [][]byte = make([][]byte, 0)
-	var current_line []byte = make([]byte, 0)
-	for i := 0; i < size; i++ {
-		if buf[i] == '\\' {
-			// write to current line
-			current_line = append(current_line, buf[start:i]...)
-			start = i + 1
-			i++
-			continue
-		}
-
-		if buf[i] == '\n' {
-			// write to current line
-			current_line = append(current_line, buf[start:i]...)
-			result = append(result, current_line)
-			current_line = make([]byte, 0)
-			start = i + 1
-		}
-	}
-
-	// for the last line, write it to buffer
-	if start < size {
-		w.buf.Write(buf[start:size])
-	}
-
-	return result, nil
-}

+ 26 - 3
internal/core/plugin_manager/watcher.go

@@ -7,6 +7,7 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/local_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/remote_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
@@ -22,6 +23,31 @@ func startWatcher(config *app.Config) {
 			handleNewPlugins(config)
 		}
 	}()
+
+	startRemoteWatcher(config)
+}
+
+func startRemoteWatcher(config *app.Config) {
+	// launch TCP debugging server if enabled
+	if config.PluginRemoteInstallingEnabled {
+		server := remote_manager.NewRemotePluginServer(config)
+		go func() {
+			err := server.Launch()
+			if err != nil {
+				log.Error("start remote plugin server failed: %s", err.Error())
+			}
+		}()
+		go func() {
+			for server.Next() {
+				plugin, err := server.Read()
+				if err != nil {
+					log.Error("encounter error: %s", err.Error())
+					continue
+				}
+				lifetime(config, plugin)
+			}
+		}()
+	}
 }
 
 func handleNewPlugins(config *app.Config) {
@@ -43,9 +69,6 @@ func handleNewPlugins(config *app.Config) {
 		}
 
 		log.Info("loaded plugin: %s", plugin.Config.Identity())
-
-		m.Store(plugin.Config.Identity(), plugin_interface)
-
 		routine.Submit(func() {
 			lifetime(config, plugin_interface)
 		})

+ 5 - 2
internal/types/app/config.go

@@ -6,8 +6,11 @@ type Config struct {
 	PluginInnerApiKey string `envconfig:"PLUGIN_INNER_API_KEY"`
 	PluginInnerApiURL string `envconfig:"PLUGIN_INNER_API_URL"`
 
-	PluginRemoteInstallingHost string `envconfig:"PLUGIN_REMOTE_INSTALLING_HOST"`
-	PluginRemoteInstallingPort int16  `envconfig:"PLUGIN_REMOTE_INSTALLING_PORT"`
+	PluginRemoteInstallingHost             string `envconfig:"PLUGIN_REMOTE_INSTALLING_HOST"`
+	PluginRemoteInstallingPort             uint16 `envconfig:"PLUGIN_REMOTE_INSTALLING_PORT"`
+	PluginRemoteInstallingEnabled          bool   `envconfig:"PLUGIN_REMOTE_INSTALLING_ENABLED"`
+	PluginRemoteInstallingMaxConn          int    `envconfig:"PLUGIN_REMOTE_INSTALLING_MAX_CONN"`
+	PluginRemoteInstallServerEventLoopNums int    `envconfig:"PLUGIN_REMOTE_INSTALL_SERVER_EVENT_LOOP_NUMS"`
 
 	StoragePath        string `envconfig:"STORAGE_PATH"`
 	ProcessCachingPath string `envconfig:"PROCESS_CACHING_PATH"`

+ 0 - 1
internal/types/entities/runtime.go

@@ -31,7 +31,6 @@ type (
 	PluginRuntimeSessionIOInterface interface {
 		Listen(session_id string) *BytesIOListener
 		Write(session_id string, data []byte)
-		Request(session_id string, data []byte) ([]byte, error)
 	}
 )
 

+ 10 - 0
internal/utils/stream/response.go

@@ -30,6 +30,10 @@ func (r *StreamResponse[T]) OnClose(f func()) {
 	r.onClose = f
 }
 
+// Next returns true if there are more data to be read
+// and waits for the next data to be available
+// returns false if the stream is closed
+// NOTE: even if the stream is closed, it will return true if there is data available
 func (r *StreamResponse[T]) Next() bool {
 	r.l.Lock()
 	if r.closed && r.q.Len() == 0 && r.err == nil {
@@ -51,6 +55,8 @@ func (r *StreamResponse[T]) Next() bool {
 	return <-r.sig
 }
 
+// Read reads buffered data from the stream and
+// it returns error only if the buffer is empty or an error is written to the stream
 func (r *StreamResponse[T]) Read() (T, error) {
 	r.l.Lock()
 	defer r.l.Unlock()
@@ -70,6 +76,8 @@ func (r *StreamResponse[T]) Read() (T, error) {
 	}
 }
 
+// Write writes data to the stream
+// returns error if the buffer is full
 func (r *StreamResponse[T]) Write(data T) error {
 	r.l.Lock()
 	if r.closed {
@@ -92,6 +100,7 @@ func (r *StreamResponse[T]) Write(data T) error {
 	return nil
 }
 
+// Close closes the stream
 func (r *StreamResponse[T]) Close() {
 	r.l.Lock()
 	if r.closed {
@@ -125,6 +134,7 @@ func (r *StreamResponse[T]) Size() int {
 	return r.q.Len()
 }
 
+// WriteError writes an error to the stream
 func (r *StreamResponse[T]) WriteError(err error) {
 	r.l.Lock()
 	defer r.l.Unlock()