Pārlūkot izejas kodu

feat: add plugin access action to session write method (#64)

Modify session write method to include a new plugin access action parameter across multiple runtime implementations. This change allows for more granular tracking and handling of plugin interactions by introducing an optional action context to the write method.
Yeuoly 4 mēneši atpakaļ
vecāks
revīzija
1944a829d2

+ 2 - 1
internal/cluster/plugin_test.go

@@ -5,6 +5,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/google/uuid"
 	"github.com/google/uuid"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities/manifest_entities"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities/manifest_entities"
@@ -44,7 +45,7 @@ func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionM
 	return nil
 	return nil
 }
 }
 
 
-func (r *fakePlugin) Write(string, []byte) {
+func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) {
 }
 }
 
 
 func getRandomPluginRuntime() fakePlugin {
 func getRandomPluginRuntime() fakePlugin {

+ 1 - 1
internal/core/plugin_daemon/backwards_invocation/transaction/full_duplex_event_writer.go

@@ -20,7 +20,7 @@ func NewFullDuplexEventWriter(session *session_manager.Session) *FullDuplexTrans
 }
 }
 
 
 func (w *FullDuplexTransactionWriter) Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any) error {
 func (w *FullDuplexTransactionWriter) Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any) error {
-	return w.session.Write(event, data)
+	return w.session.Write(event, "", data)
 }
 }
 
 
 func (w *FullDuplexTransactionWriter) Done() {
 func (w *FullDuplexTransactionWriter) Done() {

+ 1 - 0
internal/core/plugin_daemon/generic.go

@@ -88,6 +88,7 @@ func GenericInvokePlugin[Req any, Rsp any](
 
 
 	session.Write(
 	session.Write(
 		session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
 		session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
+		session.Action,
 		getInvokePluginMap(
 		getInvokePluginMap(
 			session,
 			session,
 			request,
 			request,

+ 2 - 1
internal/core/plugin_manager/debugging_runtime/io.go

@@ -3,6 +3,7 @@ package debugging_runtime
 import (
 import (
 	"encoding/json"
 	"encoding/json"
 
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/exception"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/exception"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
@@ -51,7 +52,7 @@ func (r *RemotePluginRuntime) Listen(session_id string) *entities.Broadcast[plug
 	return listener
 	return listener
 }
 }
 
 
-func (r *RemotePluginRuntime) Write(session_id string, data []byte) {
+func (r *RemotePluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) {
 	r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error {
 	r.conn.AsyncWrite(append(data, '\n'), func(c gnet.Conn, err error) error {
 		return nil
 		return nil
 	})
 	})

+ 2 - 1
internal/core/plugin_manager/local_runtime/io.go

@@ -1,6 +1,7 @@
 package local_runtime
 package local_runtime
 
 
 import (
 import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities"
@@ -25,6 +26,6 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.Broadcast[plugi
 	return listener
 	return listener
 }
 }
 
 
-func (r *LocalPluginRuntime) Write(session_id string, data []byte) {
+func (r *LocalPluginRuntime) Write(session_id string, action access_types.PluginAccessAction, data []byte) {
 	writeToStdioHandler(r.ioIdentity, append(data, '\n'))
 	writeToStdioHandler(r.ioIdentity, append(data, '\n'))
 }
 }

+ 4 - 1
internal/core/plugin_manager/serverless_runtime/io.go

@@ -9,6 +9,7 @@ import (
 	"net/url"
 	"net/url"
 	"time"
 	"time"
 
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
@@ -24,7 +25,7 @@ func (r *AWSPluginRuntime) Listen(sessionId string) *entities.Broadcast[plugin_e
 }
 }
 
 
 // For AWS Lambda, write is equivalent to http request, it's not a normal stream like stdio and tcp
 // For AWS Lambda, write is equivalent to http request, it's not a normal stream like stdio and tcp
-func (r *AWSPluginRuntime) Write(sessionId string, data []byte) {
+func (r *AWSPluginRuntime) Write(sessionId string, action access_types.PluginAccessAction, data []byte) {
 	l, ok := r.listeners.Load(sessionId)
 	l, ok := r.listeners.Load(sessionId)
 	if !ok {
 	if !ok {
 		log.Error("session %s not found", sessionId)
 		log.Error("session %s not found", sessionId)
@@ -45,6 +46,8 @@ func (r *AWSPluginRuntime) Write(sessionId string, data []byte) {
 		return
 		return
 	}
 	}
 
 
+	url += "?action=" + string(action)
+
 	connectTime := 240 * time.Second
 	connectTime := 240 * time.Second
 
 
 	// create a new http request
 	// create a new http request

+ 2 - 1
internal/core/plugin_manager/watcher_test.go

@@ -5,6 +5,7 @@ import (
 	"time"
 	"time"
 
 
 	"github.com/google/uuid"
 	"github.com/google/uuid"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/basic_runtime"
 	"github.com/langgenius/dify-plugin-daemon/internal/oss/local"
 	"github.com/langgenius/dify-plugin-daemon/internal/oss/local"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
@@ -47,7 +48,7 @@ func (r *fakePlugin) Listen(string) *entities.Broadcast[plugin_entities.SessionM
 	return nil
 	return nil
 }
 }
 
 
-func (r *fakePlugin) Write(string, []byte) {
+func (r *fakePlugin) Write(string, access_types.PluginAccessAction, []byte) {
 }
 }
 
 
 func (r *fakePlugin) WaitStarted() <-chan bool {
 func (r *fakePlugin) WaitStarted() <-chan bool {

+ 2 - 2
internal/core/session_manager/session.go

@@ -177,10 +177,10 @@ func (s *Session) Message(event PLUGIN_IN_STREAM_EVENT, data any) []byte {
 	})
 	})
 }
 }
 
 
-func (s *Session) Write(event PLUGIN_IN_STREAM_EVENT, data any) error {
+func (s *Session) Write(event PLUGIN_IN_STREAM_EVENT, action access_types.PluginAccessAction, data any) error {
 	if s.runtime == nil {
 	if s.runtime == nil {
 		return errors.New("runtime not bound")
 		return errors.New("runtime not bound")
 	}
 	}
-	s.runtime.Write(s.ID, s.Message(event, data))
+	s.runtime.Write(s.ID, action, s.Message(event, data))
 	return nil
 	return nil
 }
 }

+ 2 - 1
pkg/entities/plugin_entities/runtime.go

@@ -9,6 +9,7 @@ import (
 	"hash/fnv"
 	"hash/fnv"
 	"time"
 	"time"
 
 
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities"
 	"github.com/langgenius/dify-plugin-daemon/pkg/entities"
 )
 )
 
 
@@ -71,7 +72,7 @@ type (
 		// Listen listens for messages from the plugin
 		// Listen listens for messages from the plugin
 		Listen(session_id string) *entities.Broadcast[SessionMessage]
 		Listen(session_id string) *entities.Broadcast[SessionMessage]
 		// Write writes a message to the plugin
 		// Write writes a message to the plugin
-		Write(session_id string, data []byte)
+		Write(session_id string, action access_types.PluginAccessAction, data []byte)
 		// Log adds a log to the plugin runtime state
 		// Log adds a log to the plugin runtime state
 		Log(string)
 		Log(string)
 		// Warn adds a warning to the plugin runtime state
 		// Warn adds a warning to the plugin runtime state