浏览代码

refactor: backwards invocation

Yeuoly 1 年之前
父节点
当前提交
11989403be
共有 30 个文件被更改,包括 207 次插入1207 次删除
  1. 3 3
      internal/cluster/node.go
  2. 5 5
      internal/cluster/plugin.go
  3. 4 5
      internal/cluster/plugin_test.go
  4. 15 4
      internal/core/plugin_daemon/backwards_invocation/request.go
  5. 25 16
      internal/core/plugin_daemon/backwards_invocation/task.go
  6. 21 22
      internal/core/plugin_daemon/backwards_invocation/task_test.go
  7. 4 0
      internal/core/plugin_daemon/backwards_invocation/transaction/aws_event_handler.go
  8. 29 0
      internal/core/plugin_daemon/backwards_invocation/transaction/aws_event_writer.go
  9. 27 0
      internal/core/plugin_daemon/backwards_invocation/transaction/full_duplex_event_writer.go
  10. 9 1
      internal/core/plugin_daemon/generic.go
  11. 0 576
      internal/core/plugin_manager/aws_manager/full_duplex_simulator.go
  12. 0 512
      internal/core/plugin_manager/aws_manager/full_duplex_simulator_test.go
  13. 3 3
      internal/core/plugin_manager/aws_manager/packager.go
  14. 4 4
      internal/core/plugin_manager/aws_manager/packager_test.go
  15. 3 3
      internal/core/plugin_manager/aws_manager/run.go
  16. 2 2
      internal/core/plugin_manager/aws_manager/type.go
  17. 3 3
      internal/core/plugin_manager/init.go
  18. 2 2
      internal/core/plugin_manager/lifetime.go
  19. 3 3
      internal/core/plugin_manager/local_manager/run.go
  20. 2 2
      internal/core/plugin_manager/local_manager/type.go
  21. 7 7
      internal/core/plugin_manager/manager.go
  22. 2 3
      internal/core/plugin_manager/remote_manager/run.go
  23. 2 2
      internal/core/plugin_manager/remote_manager/type.go
  24. 5 6
      internal/core/plugin_manager/watcher.go
  25. 4 3
      internal/core/session_manager/session.go
  26. 3 3
      internal/service/install_service/state.go
  27. 6 2
      internal/types/entities/plugin_entities/event.go
  28. 8 8
      internal/types/entities/runtime.go
  29. 1 1
      internal/types/entities/runtime_test.go
  30. 5 6
      internal/types/models/plugin.go

+ 3 - 3
internal/cluster/node.go

@@ -7,7 +7,7 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/network"
@@ -110,7 +110,7 @@ func (c *Cluster) GetNodes() (map[string]node, error) {
 
 // FetchPluginAvailableNodesByHashedId fetches the available nodes of the given plugin
 func (c *Cluster) FetchPluginAvailableNodesByHashedId(hashed_plugin_id string) ([]string, error) {
-	states, err := cache.ScanMap[entities.PluginRuntimeState](
+	states, err := cache.ScanMap[plugin_entities.PluginRuntimeState](
 		PLUGIN_STATE_MAP_KEY, c.getScanPluginsByIdKey(hashed_plugin_id),
 	)
 	if err != nil {
@@ -132,7 +132,7 @@ func (c *Cluster) FetchPluginAvailableNodesByHashedId(hashed_plugin_id string) (
 }
 
 func (c *Cluster) FetchPluginAvailableNodesById(plugin_id string) ([]string, error) {
-	hashed_plugin_id := entities.HashedIdentity(plugin_id)
+	hashed_plugin_id := plugin_entities.HashedIdentity(plugin_id)
 	return c.FetchPluginAvailableNodesByHashedId(hashed_plugin_id)
 }
 

+ 5 - 5
internal/cluster/plugin.go

@@ -6,23 +6,23 @@ import (
 	"sync/atomic"
 	"time"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
 type pluginLifeTime struct {
-	lifetime          entities.PluginRuntimeTimeLifeInterface
+	lifetime          plugin_entities.PluginRuntimeTimeLifeInterface
 	last_scheduled_at time.Time
 }
 
 type pluginState struct {
-	entities.PluginRuntimeState
+	plugin_entities.PluginRuntimeState
 	Identity string `json:"identity"`
 }
 
 // RegisterPlugin registers a plugin to the cluster, and start to be scheduled
-func (c *Cluster) RegisterPlugin(lifetime entities.PluginRuntimeTimeLifeInterface) error {
+func (c *Cluster) RegisterPlugin(lifetime plugin_entities.PluginRuntimeTimeLifeInterface) error {
 	identity, err := lifetime.Identity()
 	if err != nil {
 		return err
@@ -179,7 +179,7 @@ func (c *Cluster) forceGCNodePlugin(node_id string, plugin_id string) error {
 		c.plugin_lock.Unlock()
 	}
 
-	if err := c.removePluginState(node_id, entities.HashedIdentity(plugin_id)); err != nil {
+	if err := c.removePluginState(node_id, plugin_entities.HashedIdentity(plugin_id)); err != nil {
 		return err
 	}
 

+ 4 - 5
internal/cluster/plugin_test.go

@@ -6,12 +6,11 @@ import (
 
 	"github.com/google/uuid"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
 type fakePlugin struct {
-	entities.PluginRuntime
+	plugin_entities.PluginRuntime
 	positive_manager.PositivePluginRuntime
 }
 
@@ -31,8 +30,8 @@ func (r *fakePlugin) StartPlugin() error {
 	return nil
 }
 
-func (r *fakePlugin) Type() entities.PluginRuntimeType {
-	return entities.PLUGIN_RUNTIME_TYPE_LOCAL
+func (r *fakePlugin) Type() plugin_entities.PluginRuntimeType {
+	return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
 }
 
 func (r *fakePlugin) Wait() (<-chan bool, error) {
@@ -41,7 +40,7 @@ func (r *fakePlugin) Wait() (<-chan bool, error) {
 
 func getRandomPluginRuntime() fakePlugin {
 	return fakePlugin{
-		PluginRuntime: entities.PluginRuntime{
+		PluginRuntime: plugin_entities.PluginRuntime{
 			Config: plugin_entities.PluginDeclaration{
 				Name:      uuid.New().String(),
 				Version:   "0.0.1",

+ 15 - 4
internal/core/plugin_daemon/backwards_invocation/request.go

@@ -9,22 +9,32 @@ import (
 
 type BackwardsInvocationType = dify_invocation.InvokeType
 
+type BackwardsInvocationWriter interface {
+	Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any)
+	Done()
+}
+
 type BackwardsInvocation struct {
 	typ              BackwardsInvocationType
 	id               string
 	detailed_request map[string]any
 	session          *session_manager.Session
+	writer           BackwardsInvocationWriter
 }
 
 func NewBackwardsInvocation(
 	typ BackwardsInvocationType,
-	id string, session *session_manager.Session, detailed_request map[string]any,
+	id string,
+	session *session_manager.Session,
+	writer BackwardsInvocationWriter,
+	detailed_request map[string]any,
 ) *BackwardsInvocation {
 	return &BackwardsInvocation{
 		typ:              typ,
 		id:               id,
 		detailed_request: detailed_request,
 		session:          session,
+		writer:           writer,
 	}
 }
 
@@ -33,24 +43,25 @@ func (bi *BackwardsInvocation) GetID() string {
 }
 
 func (bi *BackwardsInvocation) WriteError(err error) {
-	bi.session.Write(
+	bi.writer.Write(
 		session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
 		NewErrorEvent(bi.id, err.Error()),
 	)
 }
 
 func (bi *BackwardsInvocation) WriteResponse(message string, data any) {
-	bi.session.Write(
+	bi.writer.Write(
 		session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
 		NewResponseEvent(bi.id, message, data),
 	)
 }
 
 func (bi *BackwardsInvocation) EndResponse() {
-	bi.session.Write(
+	bi.writer.Write(
 		session_manager.PLUGIN_IN_STREAM_EVENT_RESPONSE,
 		NewEndEvent(bi.id),
 	)
+	bi.writer.Done()
 }
 
 func (bi *BackwardsInvocation) Type() BackwardsInvocationType {

+ 25 - 16
internal/core/plugin_daemon/backwards_invocation/task.go

@@ -6,17 +6,19 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
 
 func InvokeDify(
-	runtime entities.PluginRuntimeInterface,
+	runtime plugin_entities.PluginRuntimeInterface,
 	invoke_from access_types.PluginAccessType,
-	session *session_manager.Session, data []byte,
+	session *session_manager.Session,
+	writer BackwardsInvocationWriter,
+	data []byte,
 ) error {
 	// unmarshal invoke data
 	request, err := parser.UnmarshalJsonBytes2Map(data)
@@ -29,7 +31,7 @@ func InvokeDify(
 	}
 
 	// prepare invocation arguments
-	request_handle, err := prepareDifyInvocationArguments(session, request)
+	request_handle, err := prepareDifyInvocationArguments(session, writer, request)
 	if err != nil {
 		return err
 	}
@@ -59,49 +61,49 @@ func InvokeDify(
 var (
 	permissionMapping = map[dify_invocation.InvokeType]map[string]any{
 		dify_invocation.INVOKE_TYPE_TOOL: {
-			"func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
+			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
 				return runtime.Configuration().Resource.Permission.AllowInvokeTool()
 			},
 			"error": "permission denied, you need to enable tool access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_LLM: {
-			"func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
+			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
 				return runtime.Configuration().Resource.Permission.AllowInvokeLLM()
 			},
 			"error": "permission denied, you need to enable llm access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
-			"func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
+			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
 				return runtime.Configuration().Resource.Permission.AllowInvokeTextEmbedding()
 			},
 			"error": "permission denied, you need to enable text-embedding access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_RERANK: {
-			"func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
+			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
 				return runtime.Configuration().Resource.Permission.AllowInvokeRerank()
 			},
 			"error": "permission denied, you need to enable rerank access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_TTS: {
-			"func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
+			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
 				return runtime.Configuration().Resource.Permission.AllowInvokeTTS()
 			},
 			"error": "permission denied, you need to enable tts access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
-			"func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
+			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
 				return runtime.Configuration().Resource.Permission.AllowInvokeSpeech2Text()
 			},
 			"error": "permission denied, you need to enable speech2text access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_MODERATION: {
-			"func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
+			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
 				return runtime.Configuration().Resource.Permission.AllowInvokeModeration()
 			},
 			"error": "permission denied, you need to enable moderation access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_NODE: {
-			"func": func(runtime entities.PluginRuntimeTimeLifeInterface) bool {
+			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
 				return runtime.Configuration().Resource.Permission.AllowInvokeNode()
 			},
 			"error": "permission denied, you need to enable node access in plugin manifest",
@@ -109,13 +111,13 @@ var (
 	}
 )
 
-func checkPermission(runtime entities.PluginRuntimeTimeLifeInterface, request_handle *BackwardsInvocation) error {
+func checkPermission(runtime plugin_entities.PluginRuntimeTimeLifeInterface, request_handle *BackwardsInvocation) error {
 	permission, ok := permissionMapping[request_handle.Type()]
 	if !ok {
 		return fmt.Errorf("unsupported invoke type: %s", request_handle.Type())
 	}
 
-	permission_func, ok := permission["func"].(func(runtime entities.PluginRuntimeTimeLifeInterface) bool)
+	permission_func, ok := permission["func"].(func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool)
 	if !ok {
 		return fmt.Errorf("permission function not found: %s", request_handle.Type())
 	}
@@ -127,7 +129,11 @@ func checkPermission(runtime entities.PluginRuntimeTimeLifeInterface, request_ha
 	return nil
 }
 
-func prepareDifyInvocationArguments(session *session_manager.Session, request map[string]any) (*BackwardsInvocation, error) {
+func prepareDifyInvocationArguments(
+	session *session_manager.Session,
+	writer BackwardsInvocationWriter,
+	request map[string]any,
+) (*BackwardsInvocation, error) {
 	typ, ok := request["type"].(string)
 	if !ok {
 		return nil, fmt.Errorf("invoke request missing type: %s", request)
@@ -147,7 +153,10 @@ func prepareDifyInvocationArguments(session *session_manager.Session, request ma
 
 	return NewBackwardsInvocation(
 		BackwardsInvocationType(typ),
-		backwards_request_id, session, detailed_request,
+		backwards_request_id,
+		session,
+		writer,
+		detailed_request,
 	), nil
 }
 

+ 21 - 22
internal/core/plugin_daemon/backwards_invocation/task_test.go

@@ -5,12 +5,11 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
 type TPluginRuntime struct {
-	entities.PluginRuntime
+	plugin_entities.PluginRuntime
 	positive_manager.PositivePluginRuntime
 }
 
@@ -30,8 +29,8 @@ func (r *TPluginRuntime) StartPlugin() error {
 	return nil
 }
 
-func (r *TPluginRuntime) Type() entities.PluginRuntimeType {
-	return entities.PLUGIN_RUNTIME_TYPE_LOCAL
+func (r *TPluginRuntime) Type() plugin_entities.PluginRuntimeType {
+	return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
 }
 
 func (r *TPluginRuntime) Wait() (<-chan bool, error) {
@@ -40,7 +39,7 @@ func (r *TPluginRuntime) Wait() (<-chan bool, error) {
 
 func TestBackwardsInvocationAllPermittedPermission(t *testing.T) {
 	all_permitted_runtime := TPluginRuntime{
-		PluginRuntime: entities.PluginRuntime{
+		PluginRuntime: plugin_entities.PluginRuntime{
 			Config: plugin_entities.PluginDeclaration{
 				Resource: plugin_entities.PluginResourceRequirement{
 					Permission: &plugin_entities.PluginPermissionRequirement{
@@ -65,42 +64,42 @@ func TestBackwardsInvocationAllPermittedPermission(t *testing.T) {
 		},
 	}
 
-	invoke_llm_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_LLM, "", nil, nil)
+	invoke_llm_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_LLM, "", nil, nil, nil)
 	if err := checkPermission(&all_permitted_runtime, invoke_llm_request); err != nil {
 		t.Errorf("checkPermission failed: %s", err.Error())
 	}
 
-	invoke_text_embedding_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING, "", nil, nil)
+	invoke_text_embedding_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING, "", nil, nil, nil)
 	if err := checkPermission(&all_permitted_runtime, invoke_text_embedding_request); err != nil {
 		t.Errorf("checkPermission failed: %s", err.Error())
 	}
 
-	invoke_rerank_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_RERANK, "", nil, nil)
+	invoke_rerank_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_RERANK, "", nil, nil, nil)
 	if err := checkPermission(&all_permitted_runtime, invoke_rerank_request); err != nil {
 		t.Errorf("checkPermission failed: %s", err.Error())
 	}
 
-	invoke_tts_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TTS, "", nil, nil)
+	invoke_tts_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TTS, "", nil, nil, nil)
 	if err := checkPermission(&all_permitted_runtime, invoke_tts_request); err != nil {
 		t.Errorf("checkPermission failed: %s", err.Error())
 	}
 
-	invoke_speech2text_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_SPEECH2TEXT, "", nil, nil)
+	invoke_speech2text_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_SPEECH2TEXT, "", nil, nil, nil)
 	if err := checkPermission(&all_permitted_runtime, invoke_speech2text_request); err != nil {
 		t.Errorf("checkPermission failed: %s", err.Error())
 	}
 
-	invoke_moderation_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_MODERATION, "", nil, nil)
+	invoke_moderation_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_MODERATION, "", nil, nil, nil)
 	if err := checkPermission(&all_permitted_runtime, invoke_moderation_request); err != nil {
 		t.Errorf("checkPermission failed: %s", err.Error())
 	}
 
-	invoke_tool_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TOOL, "", nil, nil)
+	invoke_tool_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TOOL, "", nil, nil, nil)
 	if err := checkPermission(&all_permitted_runtime, invoke_tool_request); err != nil {
 		t.Errorf("checkPermission failed: %s", err.Error())
 	}
 
-	invoke_node_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_NODE, "", nil, nil)
+	invoke_node_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_NODE, "", nil, nil, nil)
 	if err := checkPermission(&all_permitted_runtime, invoke_node_request); err != nil {
 		t.Errorf("checkPermission failed: %s", err.Error())
 	}
@@ -108,49 +107,49 @@ func TestBackwardsInvocationAllPermittedPermission(t *testing.T) {
 
 func TestBackwardsInvocationAllDeniedPermission(t *testing.T) {
 	all_denied_runtime := TPluginRuntime{
-		PluginRuntime: entities.PluginRuntime{
+		PluginRuntime: plugin_entities.PluginRuntime{
 			Config: plugin_entities.PluginDeclaration{
 				Resource: plugin_entities.PluginResourceRequirement{},
 			},
 		},
 	}
 
-	invoke_llm_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_LLM, "", nil, nil)
+	invoke_llm_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_LLM, "", nil, nil, nil)
 	if err := checkPermission(&all_denied_runtime, invoke_llm_request); err == nil {
 		t.Errorf("checkPermission failed: expected error, got nil")
 	}
 
-	invoke_text_embedding_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING, "", nil, nil)
+	invoke_text_embedding_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING, "", nil, nil, nil)
 	if err := checkPermission(&all_denied_runtime, invoke_text_embedding_request); err == nil {
 		t.Errorf("checkPermission failed: expected error, got nil")
 	}
 
-	invoke_rerank_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_RERANK, "", nil, nil)
+	invoke_rerank_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_RERANK, "", nil, nil, nil)
 	if err := checkPermission(&all_denied_runtime, invoke_rerank_request); err == nil {
 		t.Errorf("checkPermission failed: expected error, got nil")
 	}
 
-	invoke_tts_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TTS, "", nil, nil)
+	invoke_tts_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TTS, "", nil, nil, nil)
 	if err := checkPermission(&all_denied_runtime, invoke_tts_request); err == nil {
 		t.Errorf("checkPermission failed: expected error, got nil")
 	}
 
-	invoke_speech2text_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_SPEECH2TEXT, "", nil, nil)
+	invoke_speech2text_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_SPEECH2TEXT, "", nil, nil, nil)
 	if err := checkPermission(&all_denied_runtime, invoke_speech2text_request); err == nil {
 		t.Errorf("checkPermission failed: expected error, got nil")
 	}
 
-	invoke_moderation_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_MODERATION, "", nil, nil)
+	invoke_moderation_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_MODERATION, "", nil, nil, nil)
 	if err := checkPermission(&all_denied_runtime, invoke_moderation_request); err == nil {
 		t.Errorf("checkPermission failed: expected error, got nil")
 	}
 
-	invoke_tool_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TOOL, "", nil, nil)
+	invoke_tool_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_TOOL, "", nil, nil, nil)
 	if err := checkPermission(&all_denied_runtime, invoke_tool_request); err == nil {
 		t.Errorf("checkPermission failed: expected error, got nil")
 	}
 
-	invoke_node_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_NODE, "", nil, nil)
+	invoke_node_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_NODE, "", nil, nil, nil)
 	if err := checkPermission(&all_denied_runtime, invoke_node_request); err == nil {
 		t.Errorf("checkPermission failed: expected error, got nil")
 	}

+ 4 - 0
internal/core/plugin_daemon/backwards_invocation/transaction/aws_event_handler.go

@@ -0,0 +1,4 @@
+package transaction
+
+type AWSEventHandler struct {
+}

+ 29 - 0
internal/core/plugin_daemon/backwards_invocation/transaction/aws_event_writer.go

@@ -0,0 +1,29 @@
+package transaction
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
+)
+
+// AWSTransactionWriter is a writer that implements the backwards_invocation.BackwardsInvocationWriter interface
+// it is used to write data to the plugin runtime
+type AWSTransactionWriter struct {
+	event_id string
+
+	backwards_invocation.BackwardsInvocationWriter
+}
+
+// NewAWSTransactionWriter creates a new transaction writer
+func NewAWSTransactionWriter(event_id string) *AWSTransactionWriter {
+	return &AWSTransactionWriter{
+		event_id: event_id,
+	}
+}
+
+func (w *AWSTransactionWriter) Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any) {
+
+}
+
+func (w *AWSTransactionWriter) Done() {
+
+}

+ 27 - 0
internal/core/plugin_daemon/backwards_invocation/transaction/full_duplex_event_writer.go

@@ -0,0 +1,27 @@
+package transaction
+
+import (
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
+)
+
+// FullDuplexTransactionWriter is a writer that implements the backwards_invocation.BackwardsInvocationWriter interface
+// write data into session
+type FullDuplexTransactionWriter struct {
+	session *session_manager.Session
+
+	backwards_invocation.BackwardsInvocationWriter
+}
+
+func NewFullDuplexEventWriter(session *session_manager.Session) *FullDuplexTransactionWriter {
+	return &FullDuplexTransactionWriter{
+		session: session,
+	}
+}
+
+func (w *FullDuplexTransactionWriter) Write(event session_manager.PLUGIN_IN_STREAM_EVENT, data any) {
+	w.session.Write(event, data)
+}
+
+func (w *FullDuplexTransactionWriter) Done() {
+}

+ 9 - 1
internal/core/plugin_daemon/generic.go

@@ -5,6 +5,7 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation/transaction"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
@@ -45,7 +46,14 @@ func genericInvokePlugin[Req any, Rsp any](
 				response.Write(chunk)
 			}
 		case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
-			if err := backwards_invocation.InvokeDify(runtime, typ, session, chunk.Data); err != nil {
+			// check if the request contains a aws_event_id
+			var writer backwards_invocation.BackwardsInvocationWriter
+			if chunk.RuntimeType == plugin_entities.PLUGIN_RUNTIME_TYPE_AWS {
+				writer = transaction.NewAWSTransactionWriter(chunk.ServerlessEventId)
+			} else {
+				writer = transaction.NewFullDuplexEventWriter(session)
+			}
+			if err := backwards_invocation.InvokeDify(runtime, typ, session, writer, chunk.Data); err != nil {
 				log.Error("invoke dify failed: %s", err.Error())
 				return
 			}

+ 0 - 576
internal/core/plugin_manager/aws_manager/full_duplex_simulator.go

@@ -1,576 +0,0 @@
-package aws_manager
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"io"
-	"net"
-	"net/http"
-	"net/url"
-	"sync"
-	"sync/atomic"
-	"time"
-
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/strings"
-)
-
-// Full duplex simulator, using http protocol to simulate the full duplex communication
-// 1. during free time, no connection will be established
-// 2. when there is a virtual connection need to be established, 2 http transactions will be sent to the server
-// 3. one is used to send data chunk by chunk to simulate the data stream and the other is used to receive data using event stream
-// 4. after all data is sent, the connection will be closed to reduce the network traffic
-//
-// When http connection is closed, the simulator will restart it immediately until it has reached max_retries
-type FullDuplexSimulator struct {
-	baseurl *url.URL
-
-	// single connection max alive time
-	sending_connection_max_alive_time          time.Duration
-	receiving_connection_max_alive_time        time.Duration
-	target_sending_connection_max_alive_time   time.Duration
-	target_receiving_connection_max_alive_time time.Duration
-
-	// how many transactions are alive
-	alive_transactions int32
-
-	// total transactions
-	total_transactions int32
-
-	// connection restarts
-	connection_restarts int32
-
-	// sent bytes
-	sent_bytes                 int64
-	current_request_sent_bytes int32
-	// received bytes
-	received_bytes int64
-
-	// sending_connection_timeline_lock
-	sending_connection_timeline_lock sync.Mutex
-	// sending pipeline
-	sending_pipeline *io.PipeWriter
-	// sending pipe lock
-	sending_pipe_lock sync.RWMutex
-
-	// receiving_connection_timeline_lock
-	receiving_connection_timeline_lock sync.Mutex
-	// receiving context
-	receiving_cancel context.CancelFunc
-	// receiving context lock
-	receiving_cancel_lock sync.Mutex
-
-	// max retries
-	max_retries int
-	// max sending single request sending bytes
-	max_sending_bytes int32
-	// max receiving single request receiving bytes
-	max_receiving_bytes int32
-
-	// request id
-	request_id string
-
-	// latest routine id
-	latest_routine_id string
-
-	// is sending connection alive
-	sending_connection_alive         int32
-	sending_routine_lock             sync.Mutex
-	sending_lock                     sync.Mutex
-	virtual_sending_connection_alive int32
-
-	// receiving routine lock
-	receiving_routine_lock sync.Mutex
-	// is receiving connection alive
-	virtual_receiving_connection_alive int32
-
-	// listener for data
-	listeners []func(data []byte)
-
-	// mutex for listeners
-	listeners_lock sync.RWMutex
-
-	// http client
-	client *http.Client
-}
-
-type FullDuplexSimulatorOption struct {
-	// MaxRetries, default 10
-	MaxRetries int
-	// SendingConnectionMaxAliveTime, default 60s
-	SendingConnectionMaxAliveTime time.Duration
-	// TargetSendingConnectionMaxAliveTime, default 80s
-	TargetSendingConnectionMaxAliveTime time.Duration
-	// ReceivingConnectionMaxAliveTime, default 80s
-	ReceivingConnectionMaxAliveTime time.Duration
-	// TargetReceivingConnectionMaxAliveTime, default 60s
-	TargetReceivingConnectionMaxAliveTime time.Duration
-	// MaxSingleRequestSendingBytes, default 5 * 1024 * 1024
-	MaxSingleRequestSendingBytes int32
-	// MaxSingleRequestReceivingBytes, default 5 * 1024 * 1024
-	MaxSingleRequestReceivingBytes int32
-}
-
-func (opt *FullDuplexSimulatorOption) defaultOption() error {
-	if opt.MaxRetries == 0 {
-		opt.MaxRetries = 10
-	}
-
-	if opt.SendingConnectionMaxAliveTime == 0 {
-		opt.SendingConnectionMaxAliveTime = 60 * time.Second
-	}
-
-	if opt.ReceivingConnectionMaxAliveTime == 0 {
-		opt.ReceivingConnectionMaxAliveTime = 80 * time.Second
-	}
-
-	if opt.TargetSendingConnectionMaxAliveTime == 0 {
-		opt.TargetSendingConnectionMaxAliveTime = 80 * time.Second
-	}
-
-	if opt.TargetReceivingConnectionMaxAliveTime == 0 {
-		opt.TargetReceivingConnectionMaxAliveTime = 60 * time.Second
-	}
-
-	if opt.MaxSingleRequestSendingBytes == 0 {
-		opt.MaxSingleRequestSendingBytes = 5 * 1024 * 1024
-	}
-
-	if opt.MaxSingleRequestReceivingBytes == 0 {
-		opt.MaxSingleRequestReceivingBytes = 5 * 1024 * 1024
-	}
-
-	// target receiving connection max alive time should be larger than receiving connection max alive time
-	if opt.TargetReceivingConnectionMaxAliveTime < opt.ReceivingConnectionMaxAliveTime {
-		return errors.New("target receiving connection max alive time should be larger than receiving connection max alive time")
-	}
-
-	// sending connection max alive time should be larger than target sending connection max alive time
-	if opt.SendingConnectionMaxAliveTime < opt.TargetSendingConnectionMaxAliveTime {
-		return errors.New("sending connection max alive time should be larger than target sending connection max alive time")
-	}
-
-	return nil
-}
-
-func NewFullDuplexSimulator(
-	baseurl string,
-	opt *FullDuplexSimulatorOption,
-) (*FullDuplexSimulator, error) {
-	u, err := url.Parse(baseurl)
-	if err != nil {
-		return nil, err
-	}
-
-	if opt == nil {
-		opt = &FullDuplexSimulatorOption{}
-	}
-
-	if err := opt.defaultOption(); err != nil {
-		return nil, err
-	}
-
-	return &FullDuplexSimulator{
-		baseurl:                                    u,
-		sending_connection_max_alive_time:          opt.SendingConnectionMaxAliveTime,
-		target_sending_connection_max_alive_time:   opt.TargetSendingConnectionMaxAliveTime,
-		receiving_connection_max_alive_time:        opt.ReceivingConnectionMaxAliveTime,
-		target_receiving_connection_max_alive_time: opt.TargetReceivingConnectionMaxAliveTime,
-		max_sending_bytes:                          opt.MaxSingleRequestSendingBytes,
-		max_receiving_bytes:                        opt.MaxSingleRequestReceivingBytes,
-		max_retries:                                opt.MaxRetries,
-		request_id:                                 strings.RandomString(32),
-
-		// using keep alive to reduce the connection reset
-		client: &http.Client{
-			Transport: &http.Transport{
-				Dial: (&net.Dialer{
-					Timeout:   5 * time.Second,
-					KeepAlive: 15 * time.Second,
-				}).Dial,
-				IdleConnTimeout: 120 * time.Second,
-			},
-		},
-	}, nil
-}
-
-// send data to server, it's thread-safe
-func (s *FullDuplexSimulator) Send(data []byte, timeout ...time.Duration) error {
-	s.sending_lock.Lock()
-	defer s.sending_lock.Unlock()
-
-	// split data into max 10*1024 bytes
-	for len(data) > 0 {
-		chunk := data
-		if len(chunk) > 10*1024 {
-			chunk = chunk[:10*1024]
-		}
-
-		data = data[len(chunk):]
-		if err := s.send(chunk, timeout...); err != nil {
-			return err
-		}
-	}
-
-	return nil
-}
-
-func (s *FullDuplexSimulator) send(data []byte, timeout ...time.Duration) error {
-	started := time.Now()
-
-	timeout_duration := time.Second * 10
-	if len(timeout) > 0 {
-		timeout_duration = timeout[0]
-	}
-
-	for time.Since(started) < timeout_duration {
-		if atomic.LoadInt32(&s.sending_connection_alive) != 1 {
-			time.Sleep(time.Millisecond * 50)
-			continue
-		}
-
-		if atomic.AddInt32(&s.current_request_sent_bytes, int32(len(data))) > s.max_sending_bytes {
-			// reached max sending bytes, close current connection, and start a new one
-			s.sending_pipe_lock.Lock()
-			if s.sending_pipeline != nil {
-				s.sending_pipeline.Close()
-			}
-			s.sending_pipe_lock.Unlock()
-			atomic.StoreInt32(&s.current_request_sent_bytes, 0)
-			continue
-		}
-
-		s.sending_pipe_lock.Lock()
-		writer := s.sending_pipeline
-		if writer == nil {
-			time.Sleep(time.Millisecond * 50)
-			s.sending_pipe_lock.Unlock()
-			continue
-		}
-
-		if n, err := writer.Write(data); err != nil {
-			time.Sleep(time.Millisecond * 50)
-			s.sending_pipe_lock.Unlock()
-			continue
-		} else {
-			atomic.AddInt64(&s.sent_bytes, int64(n))
-			atomic.AddInt32(&s.current_request_sent_bytes, int32(n))
-		}
-
-		s.sending_pipe_lock.Unlock()
-		break
-	}
-
-	if time.Since(started) > timeout_duration {
-		return errors.New("send data timeout")
-	}
-
-	return nil
-}
-
-func (s *FullDuplexSimulator) On(f func(data []byte)) {
-	s.listeners_lock.Lock()
-	defer s.listeners_lock.Unlock()
-	s.listeners = append(s.listeners, f)
-}
-
-// start a transaction
-// returns a function to stop the transaction
-func (s *FullDuplexSimulator) StartTransaction() (func(), error) {
-	// start a transaction
-	if atomic.AddInt32(&s.alive_transactions, 1) == 1 {
-		// increase connection restarts
-		atomic.AddInt32(&s.connection_restarts, 1)
-
-		// reset request id
-		routine_id := strings.RandomString(32)
-
-		// update latest request id
-		s.latest_routine_id = routine_id
-
-		// start sending connection
-		if err := s.startSendingConnection(routine_id); err != nil {
-			return nil, err
-		}
-
-		// start receiving connection
-		if err := s.startReceivingConnection(routine_id); err != nil {
-			s.stopSendingConnection()
-			return nil, err
-		}
-	}
-
-	atomic.AddInt32(&s.total_transactions, 1)
-
-	return s.stopTransaction, nil
-}
-
-func (s *FullDuplexSimulator) stopTransaction() {
-	// close if no transaction is alive
-	if atomic.AddInt32(&s.alive_transactions, -1) == 0 {
-		s.stopSendingConnection()
-		s.stopReceivingConnection()
-	}
-}
-
-func (s *FullDuplexSimulator) startSendingConnection(routine_id string) error {
-	// if virtual sending connection is already alive, do nothing
-	if !atomic.CompareAndSwapInt32(&s.virtual_sending_connection_alive, 0, 1) {
-		return nil
-	}
-
-	// lock the sending connection
-	s.sending_connection_timeline_lock.Lock()
-	defer s.sending_connection_timeline_lock.Unlock()
-
-	// start a new sending connection
-	u, err := url.JoinPath(s.baseurl.String(), "/invoke")
-	if err != nil {
-		return err
-	}
-
-	req, err := http.NewRequest("POST", u, nil)
-	if err != nil {
-		return err
-	}
-
-	req.Header.Set("Content-Type", "octet-stream")
-	req.Header.Set("Connection", "keep-alive")
-	req.Header.Set("x-dify-plugin-request-id", s.request_id)
-	req.Header.Set("x-dify-plugin-max-alive-time", fmt.Sprintf("%d", s.target_receiving_connection_max_alive_time.Milliseconds()))
-
-	routine.Submit(func() {
-		s.sendingConnectionRoutine(req, routine_id)
-	})
-
-	return nil
-}
-
-func (s *FullDuplexSimulator) sendingConnectionRoutine(origin_req *http.Request, routine_id string) {
-	// lock the sending routine, to avoid there are multiple routines trying to establish the sending connection
-	s.sending_routine_lock.Lock()
-
-	// cancel the sending routine
-	defer s.sending_routine_lock.Unlock()
-
-	failed_times := 0
-	for atomic.LoadInt32(&s.virtual_sending_connection_alive) == 1 {
-		// check if the request id is the latest one, avoid this routine being used by a old request
-		if routine_id != s.latest_routine_id {
-			return
-		}
-
-		ctx, cancel := context.WithCancel(context.Background())
-		time.AfterFunc(s.sending_connection_max_alive_time, func() {
-			// reached max alive time, remove pipe writer
-			s.sending_pipe_lock.Lock()
-			if s.sending_pipeline != nil {
-				s.sending_pipeline.Close()
-				s.sending_pipeline = nil
-			}
-			s.sending_pipe_lock.Unlock()
-			time.AfterFunc(time.Second, cancel)
-		})
-
-		req := origin_req.Clone(ctx)
-		pr, pw := io.Pipe()
-		s.sending_pipe_lock.Lock()
-		req.Body = pr
-		s.sending_pipeline = pw
-		s.sending_pipe_lock.Unlock()
-		req = req.WithContext(ctx)
-
-		// mark sending connection as alive
-		atomic.StoreInt32(&s.sending_connection_alive, 1)
-
-		resp, err := s.client.Do(req)
-		if err != nil {
-			atomic.StoreInt32(&s.sending_connection_alive, 0)
-
-			// if virtual sending connection is not alive, clear the sending pipeline and return
-			if atomic.LoadInt32(&s.virtual_sending_connection_alive) == 0 {
-				// clear the sending pipeline
-				s.sending_pipe_lock.Lock()
-				if s.sending_pipeline != nil {
-					s.sending_pipeline.Close()
-					s.sending_pipeline = nil
-				}
-				s.sending_pipe_lock.Unlock()
-				return
-			}
-
-			failed_times++
-			if failed_times > s.max_retries {
-				log.Error("failed to establish sending connection: %v", err)
-				s.stopSendingConnection()
-				return
-			}
-
-			log.Error("failed to establish sending connection: %v", err)
-		} else {
-			defer resp.Body.Close()
-		}
-
-		// mark sending connection as dead
-		atomic.StoreInt32(&s.sending_connection_alive, 0)
-
-		s.sending_pipe_lock.Lock()
-		// close the sending pipeline
-		if s.sending_pipeline != nil {
-			s.sending_pipeline.Close()
-			s.sending_pipeline = nil
-		}
-		s.sending_pipe_lock.Unlock()
-	}
-}
-
-func (s *FullDuplexSimulator) stopSendingConnection() error {
-	if !atomic.CompareAndSwapInt32(&s.virtual_sending_connection_alive, 1, 0) {
-		return nil
-	}
-
-	s.sending_connection_timeline_lock.Lock()
-	defer s.sending_connection_timeline_lock.Unlock()
-
-	s.sending_pipe_lock.Lock()
-	defer s.sending_pipe_lock.Unlock()
-
-	// close the sending pipeline
-	if s.sending_pipeline != nil {
-		s.sending_pipeline.Close()
-		s.sending_pipeline = nil
-	}
-
-	// mark sending connection as dead
-	atomic.StoreInt32(&s.virtual_sending_connection_alive, 0)
-
-	return nil
-}
-
-func (s *FullDuplexSimulator) startReceivingConnection(request_id string) error {
-	// if virtual receiving connection is already alive, do nothing
-	if !atomic.CompareAndSwapInt32(&s.virtual_receiving_connection_alive, 0, 1) {
-		return nil
-	}
-
-	// lock the receiving connection
-	s.receiving_connection_timeline_lock.Lock()
-	defer s.receiving_connection_timeline_lock.Unlock()
-
-	routine.Submit(func() {
-		s.receivingConnectionRoutine(request_id)
-	})
-
-	return nil
-}
-
-func (s *FullDuplexSimulator) receivingConnectionRoutine(routine_id string) {
-	// lock the receiving routine, to avoid there are multiple routines trying to establish the receiving connection
-	s.receiving_routine_lock.Lock()
-	// cancel the receiving routine
-	defer s.receiving_routine_lock.Unlock()
-
-	for atomic.LoadInt32(&s.virtual_receiving_connection_alive) == 1 {
-		// check if the request id is the latest one, avoid this routine being used by a old request
-		if routine_id != s.latest_routine_id {
-			return
-		}
-
-		recved_pong := false
-		buf := make([]byte, 0)
-		buf_len := 0
-
-		// start a new receiving connection
-		u, err := url.JoinPath(s.baseurl.String(), "/response")
-		if err != nil {
-			continue
-		}
-
-		req, err := http.NewRequest("GET", u, nil)
-		if err != nil {
-			continue
-		}
-		req.Header.Set("Content-Type", "octet-stream")
-		req.Header.Set("Connection", "keep-alive")
-		req.Header.Set("x-dify-plugin-request-id", s.request_id)
-		req.Header.Set("x-dify-plugin-max-alive-time", fmt.Sprintf("%d", s.target_sending_connection_max_alive_time.Milliseconds()))
-		req.Header.Set("x-dify-plugin-max-sending-bytes", fmt.Sprintf("%d", s.max_receiving_bytes))
-
-		ctx, cancel := context.WithCancel(context.Background())
-		req = req.Clone(ctx)
-		resp, err := s.client.Do(req)
-		if err != nil {
-			continue
-		}
-
-		s.receiving_cancel_lock.Lock()
-		s.receiving_cancel = cancel
-		s.receiving_cancel_lock.Unlock()
-
-		time.AfterFunc(s.receiving_connection_max_alive_time, func() {
-			cancel()
-			resp.Body.Close()
-		})
-
-		reader := resp.Body
-		for {
-			data := make([]byte, 1024)
-			n, err := reader.Read(data)
-			if n != 0 {
-				// check if pong\n is at the beginning of the data
-				if !recved_pong {
-					data = append(buf[:buf_len], data[:n]...)
-					buf = make([]byte, 0)
-					buf_len = 0
-
-					if n >= 5 {
-						if string(data[:5]) == "pong\n" {
-							recved_pong = true
-							// remove pong\n from the beginning of the data
-							data = data[5:]
-							n -= 5
-						} else {
-							// not pong\n, break
-							break
-						}
-					} else if n < 5 {
-						// save the data to the buffer
-						buf = append(buf, data[:n]...)
-						buf_len += n
-						continue
-					}
-				}
-			}
-
-			for _, listener := range s.listeners[:] {
-				listener(data[:n])
-			}
-
-			atomic.AddInt64(&s.received_bytes, int64(n))
-
-			if err != nil {
-				break
-			}
-		}
-	}
-}
-
-func (s *FullDuplexSimulator) stopReceivingConnection() {
-	if !atomic.CompareAndSwapInt32(&s.virtual_receiving_connection_alive, 1, 0) {
-		return
-	}
-
-	// cancel the receiving context
-	s.receiving_cancel_lock.Lock()
-	if s.receiving_cancel != nil {
-		s.receiving_cancel()
-	}
-	s.receiving_cancel_lock.Unlock()
-}
-
-// GetStats, returns the sent and received bytes
-func (s *FullDuplexSimulator) GetStats() (sent_bytes, received_bytes int64, connection_restarts int32) {
-	return atomic.LoadInt64(&s.sent_bytes), atomic.LoadInt64(&s.received_bytes), atomic.LoadInt32(&s.connection_restarts)
-}

+ 0 - 512
internal/core/plugin_manager/aws_manager/full_duplex_simulator_test.go

@@ -1,512 +0,0 @@
-package aws_manager
-
-import (
-	"bytes"
-	"fmt"
-	"net/http"
-	"strconv"
-	"strings"
-	"sync"
-	"sync/atomic"
-	"testing"
-	"time"
-
-	"github.com/gin-gonic/gin"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/debugging"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/network"
-	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
-)
-
-func init() {
-	routine.InitPool(1024)
-}
-
-type S struct {
-	srv  *http.Server
-	url  string
-	port int
-
-	send_count          int32
-	recv_buffered_count int32
-	recv_count          int32
-
-	send_request int32
-	recv_request int32
-
-	data_mu sync.Mutex
-	data    map[string]chan []byte
-
-	current_recv_request_id string
-	current_send_request_id string
-}
-
-func (s *S) Stop() {
-	s.srv.Close()
-}
-
-func server() (*S, error) {
-	port, err := network.GetRandomPort()
-	if err != nil {
-		return nil, err
-	}
-
-	eng := gin.New()
-	srv := &http.Server{
-		Addr:    fmt.Sprintf(":%d", port),
-		Handler: eng,
-	}
-
-	s := &S{
-		srv:  srv,
-		url:  fmt.Sprintf("http://localhost:%d", port),
-		data: make(map[string]chan []byte),
-
-		send_count: 0,
-		recv_count: 0,
-	}
-
-	// avoid log
-	gin.SetMode(gin.ReleaseMode)
-
-	eng.POST("/invoke", func(c *gin.Context) {
-		atomic.AddInt32(&s.send_request, 1)
-		defer atomic.AddInt32(&s.send_request, -1)
-
-		id := c.Request.Header.Get("x-dify-plugin-request-id")
-		max_alive_time := c.Request.Header.Get("x-dify-plugin-max-alive-time")
-		s.current_send_request_id = id
-
-		var ch chan []byte
-
-		s.data_mu.Lock()
-		if _, ok := s.data[id]; !ok {
-			ch = make(chan []byte)
-			s.data[id] = ch
-		} else {
-			ch = s.data[id]
-		}
-		s.data_mu.Unlock()
-
-		timeout, err := strconv.ParseInt(max_alive_time, 10, 64)
-		if err != nil {
-			timeout = 60
-		}
-
-		time.AfterFunc(time.Millisecond*time.Duration(timeout), func() {
-			c.Request.Body.Close()
-		})
-
-		// read data asynchronously
-		for {
-			buf := make([]byte, 1024)
-			n, err := c.Request.Body.Read(buf)
-			if n != 0 {
-				atomic.AddInt32(&s.recv_buffered_count, int32(n))
-				ch <- buf[:n]
-				atomic.AddInt32(&s.recv_count, int32(n))
-			}
-			if err != nil {
-				break
-			}
-		}
-
-		// output closed
-		c.Writer.WriteHeader(http.StatusOK)
-		c.Writer.Write([]byte("closed\n"))
-		c.Writer.Flush()
-	})
-
-	eng.GET("/response", func(ctx *gin.Context) {
-		atomic.AddInt32(&s.recv_request, 1)
-		defer atomic.AddInt32(&s.recv_request, -1)
-
-		// fmt.Println("new recv request")
-		id := ctx.Request.Header.Get("x-dify-plugin-request-id")
-		max_alive_time := ctx.Request.Header.Get("x-dify-plugin-max-alive-time")
-		max_sending_bytes := ctx.Request.Header.Get("x-dify-plugin-max-sending-bytes")
-		max_sending_bytes_int, err := strconv.ParseInt(max_sending_bytes, 10, 64)
-		if err != nil {
-			max_sending_bytes_int = 1024 * 1024
-		}
-
-		sent_bytes := int32(0)
-
-		s.current_recv_request_id = id
-
-		var ch chan []byte
-		s.data_mu.Lock()
-		if _, ok := s.data[id]; ok {
-			ch = s.data[id]
-		} else {
-			ch = make(chan []byte)
-			s.data[id] = ch
-		}
-		s.data_mu.Unlock()
-
-		ctx.Writer.WriteHeader(http.StatusOK)
-		ctx.Writer.Header().Set("Content-Type", "application/octet-stream")
-		ctx.Writer.Header().Set("Transfer-Encoding", "chunked")
-		ctx.Writer.Header().Set("Connection", "keep-alive")
-		ctx.Writer.Write([]byte("pong\n"))
-		ctx.Writer.Flush()
-
-		timeout, err := strconv.ParseInt(max_alive_time, 10, 64)
-		if err != nil {
-			timeout = 60
-		}
-
-		timer := time.NewTimer(time.Millisecond * time.Duration(timeout))
-
-		for {
-			select {
-			case data := <-ch:
-				if sent_bytes+int32(len(data)) > int32(max_sending_bytes_int) {
-					ctx.Writer.Write(data)
-					ctx.Writer.Flush()
-					ctx.Status(http.StatusOK)
-					return
-				}
-
-				ctx.Writer.Write(data)
-				ctx.Writer.Flush()
-				atomic.AddInt32(&s.send_count, int32(len(data)))
-			case <-ctx.Done():
-				return
-			case <-ctx.Writer.CloseNotify():
-				return
-			case <-timer.C:
-				ctx.Status(http.StatusOK)
-				return
-			}
-		}
-	})
-
-	go func() {
-		srv.ListenAndServe()
-	}()
-
-	return s, nil
-}
-
-func TestFullDuplexSimulator_SingleSendAndReceive(t *testing.T) {
-	log.SetShowLog(false)
-	defer log.SetShowLog(true)
-
-	srv, err := server()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer srv.Stop()
-
-	time.Sleep(time.Second)
-
-	simulator, err := NewFullDuplexSimulator(
-		srv.url, &FullDuplexSimulatorOption{
-			SendingConnectionMaxAliveTime:         time.Second * 100,
-			ReceivingConnectionMaxAliveTime:       time.Second * 100,
-			TargetSendingConnectionMaxAliveTime:   time.Second * 99,
-			TargetReceivingConnectionMaxAliveTime: time.Second * 101,
-			MaxSingleRequestSendingBytes:          1024 * 1024,
-			MaxSingleRequestReceivingBytes:        1024 * 1024,
-		},
-	)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	recved := make([]byte, 0)
-
-	simulator.On(func(data []byte) {
-		if len(bytes.TrimSpace(data)) == 0 {
-			return
-		}
-
-		recved = append(recved, data...)
-	})
-
-	if done, err := simulator.StartTransaction(); err != nil {
-		t.Fatal(err)
-	} else {
-		defer done()
-	}
-
-	if err := simulator.Send([]byte("hello\n")); err != nil {
-		t.Fatal(err)
-	}
-	if err := simulator.Send([]byte("world\n")); err != nil {
-		t.Fatal(err)
-	}
-
-	time.Sleep(time.Millisecond * 500)
-
-	if string(recved) != "hello\nworld\n" {
-		t.Fatal(fmt.Sprintf("recved: %s", string(recved)))
-	}
-}
-
-func TestFullDuplexSimulator_AutoReconnect(t *testing.T) {
-	log.SetShowLog(false)
-	defer log.SetShowLog(true)
-
-	// hmmm, to ensure the server is stable, we need to run the test 100 times
-	// don't ask me why, just trust me, I have spent 1 days to correctly handle this race condition
-	wg := sync.WaitGroup{}
-	wg.Add(100)
-	for i := 0; i < 100; i++ {
-		go func() {
-			defer wg.Done()
-
-			srv, err := server()
-			if err != nil {
-				t.Fatal(err)
-			}
-			defer srv.Stop()
-
-			time.Sleep(time.Second)
-
-			simulator, err := NewFullDuplexSimulator(
-				srv.url, &FullDuplexSimulatorOption{
-					SendingConnectionMaxAliveTime:         time.Millisecond * 700,
-					TargetSendingConnectionMaxAliveTime:   time.Millisecond * 700,
-					ReceivingConnectionMaxAliveTime:       time.Millisecond * 10000,
-					TargetReceivingConnectionMaxAliveTime: time.Millisecond * 10000,
-					MaxSingleRequestSendingBytes:          1024 * 1024,
-					MaxSingleRequestReceivingBytes:        1024 * 1024,
-				},
-			)
-			if err != nil {
-				t.Fatal(err)
-			}
-
-			l := 0
-			recved := strings.Builder{}
-			simulator.On(func(data []byte) {
-				l += len(data)
-				recved.Write(data)
-			})
-
-			done, err := simulator.StartTransaction()
-			if err != nil {
-				t.Fatal(err)
-			}
-			defer done()
-
-			ticker := time.NewTicker(time.Millisecond * 1)
-			counter := 0
-
-			for range ticker.C {
-				if err := simulator.Send([]byte(fmt.Sprintf("%05d", counter))); err != nil {
-					t.Fatal(err)
-				}
-				counter++
-				if counter == 3000 {
-					break
-				}
-			}
-
-			time.Sleep(time.Millisecond * 500)
-
-			if l != 3000*5 {
-				sent, received, restarts := simulator.GetStats()
-				t.Errorf(fmt.Sprintf("expected: %d, actual: %d, sent: %d, received: %d, restarts: %d", 3000*5, l, sent, received, restarts))
-				server_recv_count := srv.recv_count
-				server_send_count := srv.send_count
-				t.Errorf(fmt.Sprintf("server recv count: %d, server send count: %d", server_recv_count, server_send_count))
-				// to find which one is missing
-				// for i := 0; i < 3000; i++ {
-				// 	if !strings.Contains(recved.String(), fmt.Sprintf("%05d", i)) {
-				// 		t.Errorf(fmt.Sprintf("missing: %d", i))
-				// 	}
-				// }
-			}
-		}()
-	}
-
-	wg.Wait()
-}
-
-func TestFullDuplexSimulator_MultipleTransactions(t *testing.T) {
-	log.SetShowLog(false)
-	defer log.SetShowLog(true)
-
-	// avoid too many test cases, it will cause too many goroutines
-	// finally, os will run into busy, and requests can not be handled correctly in time
-	const NUM_CASES = 30
-
-	w := sync.WaitGroup{}
-	w.Add(NUM_CASES)
-
-	for j := 0; j < NUM_CASES; j++ {
-		// j := j
-		go func() {
-			defer w.Done()
-
-			srv, err := server()
-			if err != nil {
-				t.Fatal(err)
-			}
-			defer srv.Stop()
-
-			time.Sleep(time.Second)
-
-			simulator, err := NewFullDuplexSimulator(
-				srv.url, &FullDuplexSimulatorOption{
-					SendingConnectionMaxAliveTime:         time.Millisecond * 700,
-					TargetSendingConnectionMaxAliveTime:   time.Millisecond * 700,
-					ReceivingConnectionMaxAliveTime:       time.Millisecond * 1000,
-					TargetReceivingConnectionMaxAliveTime: time.Millisecond * 1000,
-					MaxSingleRequestSendingBytes:          1024 * 1024,
-					MaxSingleRequestReceivingBytes:        1024 * 1024,
-				},
-			)
-			if err != nil {
-				t.Fatal(err)
-			}
-
-			l := int32(0)
-
-			dones := make(map[int]func())
-			dones_lock := sync.Mutex{}
-
-			buf := bytes.Buffer{}
-			simulator.On(func(data []byte) {
-				debugging.PossibleBlocking(
-					func() any {
-						atomic.AddInt32(&l, int32(len(data)))
-
-						buf.Write(data)
-
-						bytes := buf.Bytes()
-						buf.Reset()
-
-						i := 0
-						for i < len(bytes) {
-							num, err := strconv.Atoi(string(bytes[i : i+5]))
-							if err != nil {
-								t.Fatalf("invalid data: %s", string(bytes))
-							}
-
-							dones_lock.Lock()
-
-							if done, ok := dones[num]; ok {
-								done()
-							} else {
-								t.Fatalf("done not found: %d", num)
-							}
-
-							dones_lock.Unlock()
-
-							i += 5
-						}
-
-						if buf.Len() != i {
-							// write the rest of the data
-							b := make([]byte, len(bytes)-i)
-							copy(b, bytes[i:])
-							buf.Write(b)
-						}
-
-						return nil
-					},
-					time.Second*1,
-					func() {
-						t.Fatal("possible blocking triggered")
-					},
-				)
-			})
-
-			wg := sync.WaitGroup{}
-			wg.Add(100)
-
-			for i := 0; i < 100; i++ {
-				i := i
-				time.Sleep(time.Millisecond * 20)
-				go func() {
-					done, err := simulator.StartTransaction()
-					if err != nil {
-						t.Fatal(err)
-					}
-
-					dones_lock.Lock()
-					dones[i] = func() {
-						done()
-						wg.Done()
-					}
-					dones_lock.Unlock()
-
-					if err := simulator.Send([]byte(fmt.Sprintf("%05d", i))); err != nil {
-						t.Fatal(err)
-					}
-				}()
-			}
-
-			// time.AfterFunc(time.Second*5, func() {
-			// 	// fmt.Println("server recv count: ", srv.recv_count, "server send count: ", srv.send_count, "j: ", j,
-			// 	// 	"server recv request: ", srv.recv_request, "server send request: ", srv.send_request)
-			// 	// fmt.Println("current_recv_request_id: ", srv.current_recv_request_id, "current_send_request_id: ", srv.current_send_request_id)
-			// })
-
-			wg.Wait()
-
-			if l != 100*5 {
-				sent, received, restarts := simulator.GetStats()
-				t.Errorf(fmt.Sprintf("expected: %d, actual: %d, sent: %d, received: %d, restarts: %d", 100*5, l, sent, received, restarts))
-			}
-		}()
-	}
-
-	w.Wait()
-}
-
-func TestFullDuplexSimulator_SendLargeData(t *testing.T) {
-	log.SetShowLog(false)
-	defer log.SetShowLog(true)
-
-	srv, err := server()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer srv.Stop()
-
-	time.Sleep(time.Second)
-
-	l := 0
-
-	simulator, err := NewFullDuplexSimulator(
-		srv.url, &FullDuplexSimulatorOption{
-			SendingConnectionMaxAliveTime:         time.Millisecond * 70000,
-			TargetSendingConnectionMaxAliveTime:   time.Millisecond * 70000,
-			ReceivingConnectionMaxAliveTime:       time.Millisecond * 100000,
-			TargetReceivingConnectionMaxAliveTime: time.Millisecond * 100000,
-			MaxSingleRequestSendingBytes:          5 * 1024 * 1024,
-			MaxSingleRequestReceivingBytes:        5 * 1024 * 1024,
-		},
-	)
-
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	simulator.On(func(data []byte) {
-		l += len(data)
-	})
-
-	done, err := simulator.StartTransaction()
-	if err != nil {
-		t.Fatal(err)
-	}
-	defer done()
-
-	for i := 0; i < 300; i++ { // 300MB, this process should be done in 20 seconds
-		if err := simulator.Send([]byte(strings.Repeat("a", 1024*1024))); err != nil {
-			t.Fatal(err)
-		}
-	}
-
-	time.Sleep(time.Second * 5)
-
-	if l != 300*1024*1024 { // 300MB
-		t.Fatal(fmt.Sprintf("expected: %d, actual: %d", 300*1024*1024, l))
-	}
-}

+ 3 - 3
internal/core/plugin_manager/aws_manager/packager.go

@@ -14,16 +14,16 @@ import (
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager/dockerfile"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/tmpfile"
 )
 
 type Packager struct {
-	runtime entities.PluginRuntimeInterface
+	runtime plugin_entities.PluginRuntimeInterface
 	decoder decoder.PluginDecoder
 }
 
-func NewPackager(runtime entities.PluginRuntimeInterface, decoder decoder.PluginDecoder) *Packager {
+func NewPackager(runtime plugin_entities.PluginRuntimeInterface, decoder decoder.PluginDecoder) *Packager {
 	return &Packager{
 		runtime: runtime,
 		decoder: decoder,

+ 4 - 4
internal/core/plugin_manager/aws_manager/packager_test.go

@@ -19,7 +19,7 @@ import (
 )
 
 type TPluginRuntime struct {
-	entities.PluginRuntime
+	plugin_entities.PluginRuntime
 	positive_manager.PositivePluginRuntime
 }
 
@@ -39,8 +39,8 @@ func (r *TPluginRuntime) StartPlugin() error {
 	return nil
 }
 
-func (r *TPluginRuntime) Type() entities.PluginRuntimeType {
-	return entities.PLUGIN_RUNTIME_TYPE_LOCAL
+func (r *TPluginRuntime) Type() plugin_entities.PluginRuntimeType {
+	return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
 }
 
 func (r *TPluginRuntime) Wait() (<-chan bool, error) {
@@ -97,7 +97,7 @@ func TestPackager_Pack(t *testing.T) {
 	}
 
 	packager := NewPackager(&TPluginRuntime{
-		PluginRuntime: entities.PluginRuntime{
+		PluginRuntime: plugin_entities.PluginRuntime{
 			Config: plugin_entities.PluginDeclaration{
 				Meta: plugin_entities.PluginMeta{
 					Runner: plugin_entities.PluginRunner{

+ 3 - 3
internal/core/plugin_manager/aws_manager/run.go

@@ -1,6 +1,6 @@
 package aws_manager
 
-import "github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+import "github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 
 func (r *AWSPluginRuntime) StartPlugin() error {
 	return nil
@@ -10,6 +10,6 @@ func (r *AWSPluginRuntime) Wait() (<-chan bool, error) {
 	return nil, nil
 }
 
-func (r *AWSPluginRuntime) Type() entities.PluginRuntimeType {
-	return entities.PLUGIN_RUNTIME_TYPE_AWS
+func (r *AWSPluginRuntime) Type() plugin_entities.PluginRuntimeType {
+	return plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
 }

+ 2 - 2
internal/core/plugin_manager/aws_manager/type.go

@@ -5,13 +5,13 @@ import (
 	"net/http"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/mapping"
 )
 
 type AWSPluginRuntime struct {
 	positive_manager.PositivePluginRuntime
-	entities.PluginRuntime
+	plugin_entities.PluginRuntime
 
 	// access url for the lambda function
 	lambda_url  string

+ 3 - 3
internal/core/plugin_manager/init.go

@@ -1,12 +1,12 @@
 package plugin_manager
 
 import (
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
-func (m *PluginManager) checkPluginExist(identity string) (entities.PluginRuntimeInterface, bool) {
+func (m *PluginManager) checkPluginExist(identity string) (plugin_entities.PluginRuntimeInterface, bool) {
 	if v, ok := m.m.Load(identity); ok {
-		return v.(entities.PluginRuntimeInterface), true
+		return v.(plugin_entities.PluginRuntimeInterface), true
 	}
 
 	return nil, false

+ 2 - 2
internal/core/plugin_manager/lifetime.go

@@ -4,11 +4,11 @@ import (
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
 
-func (p *PluginManager) lifetime(config *app.Config, r entities.PluginRuntimeInterface) {
+func (p *PluginManager) lifetime(config *app.Config, r plugin_entities.PluginRuntimeInterface) {
 	configuration := r.Configuration()
 
 	log.Info("new plugin logged in: %s", configuration.Identity())

+ 3 - 3
internal/core/plugin_manager/local_manager/run.go

@@ -8,7 +8,7 @@ import (
 	"sync"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/process"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 )
@@ -29,8 +29,8 @@ func (r *LocalPluginRuntime) init() {
 	r.SetLaunching()
 }
 
-func (r *LocalPluginRuntime) Type() entities.PluginRuntimeType {
-	return entities.PLUGIN_RUNTIME_TYPE_LOCAL
+func (r *LocalPluginRuntime) Type() plugin_entities.PluginRuntimeType {
+	return plugin_entities.PLUGIN_RUNTIME_TYPE_LOCAL
 }
 
 func (r *LocalPluginRuntime) StartPlugin() error {

+ 2 - 2
internal/core/plugin_manager/local_manager/type.go

@@ -2,12 +2,12 @@ package local_manager
 
 import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/positive_manager"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
 type LocalPluginRuntime struct {
 	positive_manager.PositivePluginRuntime
-	entities.PluginRuntime
+	plugin_entities.PluginRuntime
 
 	wait_chan   chan bool
 	io_identity string

+ 7 - 7
internal/core/plugin_manager/manager.go

@@ -7,7 +7,7 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/cluster"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/dify_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 )
@@ -38,7 +38,7 @@ func GetGlobalPluginManager() *PluginManager {
 	return manager
 }
 
-func (p *PluginManager) Add(plugin entities.PluginRuntimeInterface) error {
+func (p *PluginManager) Add(plugin plugin_entities.PluginRuntimeInterface) error {
 	identity, err := plugin.Identity()
 	if err != nil {
 		return err
@@ -47,10 +47,10 @@ func (p *PluginManager) Add(plugin entities.PluginRuntimeInterface) error {
 	return nil
 }
 
-func (p *PluginManager) List() []entities.PluginRuntimeInterface {
-	var runtimes []entities.PluginRuntimeInterface
+func (p *PluginManager) List() []plugin_entities.PluginRuntimeInterface {
+	var runtimes []plugin_entities.PluginRuntimeInterface
 	p.m.Range(func(key, value interface{}) bool {
-		if v, ok := value.(entities.PluginRuntimeInterface); ok {
+		if v, ok := value.(plugin_entities.PluginRuntimeInterface); ok {
 			runtimes = append(runtimes, v)
 		}
 		return true
@@ -58,9 +58,9 @@ func (p *PluginManager) List() []entities.PluginRuntimeInterface {
 	return runtimes
 }
 
-func (p *PluginManager) Get(identity string) entities.PluginRuntimeInterface {
+func (p *PluginManager) Get(identity string) plugin_entities.PluginRuntimeInterface {
 	if v, ok := p.m.Load(identity); ok {
-		if r, ok := v.(entities.PluginRuntimeInterface); ok {
+		if r, ok := v.(plugin_entities.PluginRuntimeInterface); ok {
 			return r
 		}
 	}

+ 2 - 3
internal/core/plugin_manager/remote_manager/run.go

@@ -4,7 +4,6 @@ import (
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/plugin_errors"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
@@ -27,8 +26,8 @@ func (r *RemotePluginRuntime) Stop() {
 	r.conn.Close()
 }
 
-func (r *RemotePluginRuntime) Type() entities.PluginRuntimeType {
-	return entities.PLUGIN_RUNTIME_TYPE_REMOTE
+func (r *RemotePluginRuntime) Type() plugin_entities.PluginRuntimeType {
+	return plugin_entities.PLUGIN_RUNTIME_TYPE_REMOTE
 }
 
 func (r *RemotePluginRuntime) StartPlugin() error {

+ 2 - 2
internal/core/plugin_manager/remote_manager/type.go

@@ -4,13 +4,13 @@ import (
 	"sync"
 	"time"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 	"github.com/panjf2000/gnet/v2"
 )
 
 type RemotePluginRuntime struct {
-	entities.PluginRuntime
+	plugin_entities.PluginRuntime
 
 	// connection
 	conn gnet.Conn

+ 5 - 6
internal/core/plugin_manager/watcher.go

@@ -14,7 +14,6 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/decoder"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_packager/verifier"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
@@ -51,7 +50,7 @@ func (p *PluginManager) startRemoteWatcher(config *app.Config) {
 func (p *PluginManager) handleNewPlugins(config *app.Config) {
 	// load local plugins firstly
 	for plugin := range p.loadNewPlugins(config.PluginStoragePath) {
-		var plugin_interface entities.PluginRuntimeInterface
+		var plugin_interface plugin_entities.PluginRuntimeInterface
 
 		if config.Platform == app.PLATFORM_AWS_LAMBDA {
 			plugin_interface = &aws_manager.AWSPluginRuntime{
@@ -83,7 +82,7 @@ func (p *PluginManager) handleNewPlugins(config *app.Config) {
 }
 
 type pluginRuntimeWithDecoder struct {
-	Runtime entities.PluginRuntime
+	Runtime plugin_entities.PluginRuntime
 	Decoder decoder.PluginDecoder
 }
 
@@ -193,10 +192,10 @@ func (p *PluginManager) loadPlugin(plugin_path string) (*pluginRuntimeWithDecode
 	}
 
 	return &pluginRuntimeWithDecoder{
-		Runtime: entities.PluginRuntime{
+		Runtime: plugin_entities.PluginRuntime{
 			Config: manifest,
-			State: entities.PluginRuntimeState{
-				Status:       entities.PLUGIN_RUNTIME_STATUS_PENDING,
+			State: plugin_entities.PluginRuntimeState{
+				Status:       plugin_entities.PLUGIN_RUNTIME_STATUS_PENDING,
 				Restarts:     0,
 				AbsolutePath: plugin_path,
 				WorkingPath:  plugin_working_path,

+ 4 - 3
internal/core/session_manager/session.go

@@ -5,7 +5,7 @@ import (
 	"sync"
 
 	"github.com/google/uuid"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 )
 
@@ -14,9 +14,10 @@ var (
 	session_lock sync.RWMutex
 )
 
+// session need to implement the backwards_invocation.BackwardsInvocationWriter interface
 type Session struct {
 	id      string
-	runtime entities.PluginRuntimeSessionIOInterface
+	runtime plugin_entities.PluginRuntimeSessionIOInterface
 
 	tenant_id       string
 	user_id         string
@@ -76,7 +77,7 @@ func (s *Session) PluginIdentity() string {
 	return s.plugin_identity
 }
 
-func (s *Session) BindRuntime(runtime entities.PluginRuntimeSessionIOInterface) {
+func (s *Session) BindRuntime(runtime plugin_entities.PluginRuntimeSessionIOInterface) {
 	s.runtime = runtime
 }
 

+ 3 - 3
internal/service/install_service/state.go

@@ -4,7 +4,7 @@ import (
 	"time"
 
 	"github.com/langgenius/dify-plugin-daemon/internal/db"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/models"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/models/curd"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/strings"
@@ -13,7 +13,7 @@ import (
 func InstallPlugin(
 	tenant_id string,
 	user_id string,
-	runtime entities.PluginRuntimeInterface,
+	runtime plugin_entities.PluginRuntimeInterface,
 	configuration map[string]any,
 ) (string, error) {
 	identity, err := runtime.Identity()
@@ -45,7 +45,7 @@ func InstallPlugin(
 	return installation.ID, nil
 }
 
-func UninstallPlugin(tenant_id string, installation_id string, runtime entities.PluginRuntimeInterface) error {
+func UninstallPlugin(tenant_id string, installation_id string, runtime plugin_entities.PluginRuntimeInterface) error {
 	identity, err := runtime.Identity()
 	if err != nil {
 		return err

+ 6 - 2
internal/types/entities/plugin_entities/event.go

@@ -26,8 +26,12 @@ type PluginLogEvent struct {
 }
 
 type SessionMessage struct {
-	Type SESSION_MESSAGE_TYPE `json:"type"`
-	Data json.RawMessage      `json:"data"`
+	Type        SESSION_MESSAGE_TYPE `json:"type"`
+	Data        json.RawMessage      `json:"data"`
+	RuntimeType PluginRuntimeType    `json:"runtime_type"`
+
+	// only used for aws event bridge, not used for stdio and tcp
+	ServerlessEventId string `json:"serverless_event_id"`
 }
 
 type SESSION_MESSAGE_TYPE string

+ 8 - 8
internal/types/entities/runtime.go

@@ -1,4 +1,4 @@
-package entities
+package plugin_entities
 
 import (
 	"bytes"
@@ -9,14 +9,14 @@ import (
 	"hash/fnv"
 	"time"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 )
 
 type (
 	PluginRuntime struct {
-		State     PluginRuntimeState                `json:"state"`
-		Config    plugin_entities.PluginDeclaration `json:"config"`
-		onStopped []func()                          `json:"-"`
+		State     PluginRuntimeState `json:"state"`
+		Config    PluginDeclaration  `json:"config"`
+		onStopped []func()           `json:"-"`
 	}
 
 	PluginRuntimeInterface interface {
@@ -28,7 +28,7 @@ type (
 
 	PluginRuntimeTimeLifeInterface interface {
 		// returns the plugin configuration
-		Configuration() *plugin_entities.PluginDeclaration
+		Configuration() *PluginDeclaration
 		// unique identity of the plugin
 		Identity() (string, error)
 		// hashed identity of the plugin
@@ -84,7 +84,7 @@ type (
 	}
 
 	PluginRuntimeSessionIOInterface interface {
-		Listen(session_id string) *BytesIOListener
+		Listen(session_id string) *entities.BytesIOListener
 		Write(session_id string, data []byte)
 	}
 
@@ -101,7 +101,7 @@ func (r *PluginRuntime) Stop() {
 	r.State.Status = PLUGIN_RUNTIME_STATUS_STOPPED
 }
 
-func (r *PluginRuntime) Configuration() *plugin_entities.PluginDeclaration {
+func (r *PluginRuntime) Configuration() *PluginDeclaration {
 	return &r.Config
 }
 

+ 1 - 1
internal/types/entities/runtime_test.go

@@ -1,4 +1,4 @@
-package entities
+package plugin_entities
 
 import (
 	"testing"

+ 5 - 6
internal/types/models/plugin.go

@@ -1,15 +1,14 @@
 package models
 
 import (
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 )
 
 type Plugin struct {
 	Model
-	PluginID     string                           `json:"id" orm:"index;size:127"`
-	Refers       int                              `json:"refers" orm:"default:0"`
-	Checksum     string                           `json:"checksum" orm:"size:127"`
-	InstallType  entities.PluginRuntimeType       `json:"install_type" orm:"size:127;index"`
-	ManifestType plugin_entities.DifyManifestType `json:"manifest_type" orm:"size:127"`
+	PluginID     string                            `json:"id" orm:"index;size:127"`
+	Refers       int                               `json:"refers" orm:"default:0"`
+	Checksum     string                            `json:"checksum" orm:"size:127"`
+	InstallType  plugin_entities.PluginRuntimeType `json:"install_type" orm:"size:127;index"`
+	ManifestType plugin_entities.DifyManifestType  `json:"manifest_type" orm:"size:127"`
 }