Explorar o código

feat: refactor aws

Yeuoly hai 1 ano
pai
achega
73cdc945ee

+ 1 - 0
cmd/server/main.go

@@ -42,6 +42,7 @@ func setDefault(config *app.Config) {
 	setDefaultInt(&config.PluginRemoteInstallingMaxConn, 128)
 	setDefaultInt(&config.MaxPluginPackageSize, 52428800)
 	setDefaultInt(&config.MaxAWSLambdaTransactionTimeout, 150)
+	setDefaultInt(&config.PluginMaxExecutionTimeout, 240)
 	setDefaultBool(&config.PluginRemoteInstallingEnabled, true)
 	setDefaultBool(&config.PluginWebhookEnabled, true)
 	setDefaultString(&config.DBSslMode, "disable")

+ 2 - 2
internal/core/plugin_daemon/backwards_invocation/request.go

@@ -86,12 +86,12 @@ func (bi *BackwardsInvocation) TenantID() (string, error) {
 	if bi.session == nil {
 		return "", fmt.Errorf("session is nil")
 	}
-	return bi.session.TenantID(), nil
+	return bi.session.TenantID, nil
 }
 
 func (bi *BackwardsInvocation) UserID() (string, error) {
 	if bi.session == nil {
 		return "", fmt.Errorf("session is nil")
 	}
-	return bi.session.UserID(), nil
+	return bi.session.UserID, nil
 }

+ 20 - 20
internal/core/plugin_daemon/backwards_invocation/task.go

@@ -14,7 +14,7 @@ import (
 )
 
 func InvokeDify(
-	runtime plugin_entities.PluginRuntimeInterface,
+	declaration *plugin_entities.PluginDeclaration,
 	invoke_from access_types.PluginAccessType,
 	session *session_manager.Session,
 	writer BackwardsInvocationWriter,
@@ -43,7 +43,7 @@ func InvokeDify(
 	}
 
 	// check permission
-	if err := checkPermission(runtime, request_handle); err != nil {
+	if err := checkPermission(declaration, request_handle); err != nil {
 		request_handle.WriteError(err)
 		request_handle.EndResponse()
 		return nil
@@ -61,63 +61,63 @@ func InvokeDify(
 var (
 	permissionMapping = map[dify_invocation.InvokeType]map[string]any{
 		dify_invocation.INVOKE_TYPE_TOOL: {
-			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
-				return runtime.Configuration().Resource.Permission.AllowInvokeTool()
+			"func": func(declaration *plugin_entities.PluginDeclaration) bool {
+				return declaration.Resource.Permission.AllowInvokeTool()
 			},
 			"error": "permission denied, you need to enable tool access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_LLM: {
-			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
-				return runtime.Configuration().Resource.Permission.AllowInvokeLLM()
+			"func": func(declaration *plugin_entities.PluginDeclaration) bool {
+				return declaration.Resource.Permission.AllowInvokeLLM()
 			},
 			"error": "permission denied, you need to enable llm access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_TEXT_EMBEDDING: {
-			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
-				return runtime.Configuration().Resource.Permission.AllowInvokeTextEmbedding()
+			"func": func(declaration *plugin_entities.PluginDeclaration) bool {
+				return declaration.Resource.Permission.AllowInvokeTextEmbedding()
 			},
 			"error": "permission denied, you need to enable text-embedding access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_RERANK: {
-			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
-				return runtime.Configuration().Resource.Permission.AllowInvokeRerank()
+			"func": func(declaration *plugin_entities.PluginDeclaration) bool {
+				return declaration.Resource.Permission.AllowInvokeRerank()
 			},
 			"error": "permission denied, you need to enable rerank access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_TTS: {
-			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
-				return runtime.Configuration().Resource.Permission.AllowInvokeTTS()
+			"func": func(declaration *plugin_entities.PluginDeclaration) bool {
+				return declaration.Resource.Permission.AllowInvokeTTS()
 			},
 			"error": "permission denied, you need to enable tts access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_SPEECH2TEXT: {
-			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
-				return runtime.Configuration().Resource.Permission.AllowInvokeSpeech2Text()
+			"func": func(declaration *plugin_entities.PluginDeclaration) bool {
+				return declaration.Resource.Permission.AllowInvokeSpeech2Text()
 			},
 			"error": "permission denied, you need to enable speech2text access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_MODERATION: {
-			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
-				return runtime.Configuration().Resource.Permission.AllowInvokeModeration()
+			"func": func(declaration *plugin_entities.PluginDeclaration) bool {
+				return declaration.Resource.Permission.AllowInvokeModeration()
 			},
 			"error": "permission denied, you need to enable moderation access in plugin manifest",
 		},
 		dify_invocation.INVOKE_TYPE_NODE: {
-			"func": func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool {
-				return runtime.Configuration().Resource.Permission.AllowInvokeNode()
+			"func": func(declaration *plugin_entities.PluginDeclaration) bool {
+				return declaration.Resource.Permission.AllowInvokeNode()
 			},
 			"error": "permission denied, you need to enable node access in plugin manifest",
 		},
 	}
 )
 
-func checkPermission(runtime plugin_entities.PluginRuntimeTimeLifeInterface, request_handle *BackwardsInvocation) error {
+func checkPermission(runtime *plugin_entities.PluginDeclaration, request_handle *BackwardsInvocation) error {
 	permission, ok := permissionMapping[request_handle.Type()]
 	if !ok {
 		return fmt.Errorf("unsupported invoke type: %s", request_handle.Type())
 	}
 
-	permission_func, ok := permission["func"].(func(runtime plugin_entities.PluginRuntimeTimeLifeInterface) bool)
+	permission_func, ok := permission["func"].(func(runtime *plugin_entities.PluginDeclaration) bool)
 	if !ok {
 		return fmt.Errorf("permission function not found: %s", request_handle.Type())
 	}

+ 19 - 27
internal/core/plugin_daemon/backwards_invocation/task_test.go

@@ -38,27 +38,23 @@ func (r *TPluginRuntime) Wait() (<-chan bool, error) {
 }
 
 func TestBackwardsInvocationAllPermittedPermission(t *testing.T) {
-	all_permitted_runtime := TPluginRuntime{
-		PluginRuntime: plugin_entities.PluginRuntime{
-			Config: plugin_entities.PluginDeclaration{
-				Resource: plugin_entities.PluginResourceRequirement{
-					Permission: &plugin_entities.PluginPermissionRequirement{
-						Tool: &plugin_entities.PluginPermissionToolRequirement{
-							Enabled: true,
-						},
-						Model: &plugin_entities.PluginPermissionModelRequirement{
-							Enabled:       true,
-							LLM:           true,
-							TextEmbedding: true,
-							Rerank:        true,
-							Moderation:    true,
-							TTS:           true,
-							Speech2text:   true,
-						},
-						Node: &plugin_entities.PluginPermissionNodeRequirement{
-							Enabled: true,
-						},
-					},
+	all_permitted_runtime := plugin_entities.PluginDeclaration{
+		Resource: plugin_entities.PluginResourceRequirement{
+			Permission: &plugin_entities.PluginPermissionRequirement{
+				Tool: &plugin_entities.PluginPermissionToolRequirement{
+					Enabled: true,
+				},
+				Model: &plugin_entities.PluginPermissionModelRequirement{
+					Enabled:       true,
+					LLM:           true,
+					TextEmbedding: true,
+					Rerank:        true,
+					Moderation:    true,
+					TTS:           true,
+					Speech2text:   true,
+				},
+				Node: &plugin_entities.PluginPermissionNodeRequirement{
+					Enabled: true,
 				},
 			},
 		},
@@ -106,12 +102,8 @@ func TestBackwardsInvocationAllPermittedPermission(t *testing.T) {
 }
 
 func TestBackwardsInvocationAllDeniedPermission(t *testing.T) {
-	all_denied_runtime := TPluginRuntime{
-		PluginRuntime: plugin_entities.PluginRuntime{
-			Config: plugin_entities.PluginDeclaration{
-				Resource: plugin_entities.PluginResourceRequirement{},
-			},
-		},
+	all_denied_runtime := plugin_entities.PluginDeclaration{
+		Resource: plugin_entities.PluginResourceRequirement{},
 	}
 
 	invoke_llm_request := NewBackwardsInvocation(dify_invocation.INVOKE_TYPE_LLM, "", nil, nil, nil)

+ 17 - 8
internal/core/plugin_daemon/backwards_invocation/transaction/aws_event_handler.go

@@ -7,7 +7,8 @@ import (
 	"time"
 
 	"github.com/gin-gonic/gin"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
@@ -39,7 +40,6 @@ func (w *awsTransactionWriteCloser) Close() error {
 func (h *AWSTransactionHandler) Handle(
 	ctx *gin.Context,
 	session_id string,
-	runtime *aws_manager.AWSPluginRuntime,
 ) {
 	writer := &awsTransactionWriteCloser{
 		ResponseWriter: ctx.Writer,
@@ -67,17 +67,26 @@ func (h *AWSTransactionHandler) Handle(
 		return
 	}
 
-	data.RuntimeType = plugin_entities.PLUGIN_RUNTIME_TYPE_AWS
-	data.SessionWriter = writer
-
-	// send the data to the plugin runtime
-	if err := runtime.PushRequest(session_id, data); err != nil {
-		log.Error("push request failed: %s", err.Error())
+	session := session_manager.GetSession(session_id)
+	if err != nil {
+		log.Error("get session failed: %s", err.Error())
 		writer.WriteHeader(http.StatusInternalServerError)
 		writer.Write([]byte(err.Error()))
 		return
 	}
 
+	aws_response_writer := NewAWSTransactionWriter(session, writer)
+
+	if err := backwards_invocation.InvokeDify(
+		session.Declaration,
+		session.InvokeFrom,
+		session,
+		aws_response_writer,
+		data.Data,
+	); err != nil {
+		log.Error("invoke dify failed: %s", err.Error())
+	}
+
 	select {
 	case <-writer.done:
 		return

+ 14 - 16
internal/core/plugin_daemon/generic.go

@@ -3,7 +3,6 @@ package plugin_daemon
 import (
 	"errors"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation/transaction"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
@@ -18,17 +17,15 @@ func genericInvokePlugin[Req any, Rsp any](
 	session *session_manager.Session,
 	request *Req,
 	response_buffer_size int,
-	typ access_types.PluginAccessType,
-	action access_types.PluginAccessAction,
 ) (*stream.StreamResponse[Rsp], error) {
-	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
+	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity)
 	if runtime == nil {
 		return nil, errors.New("plugin not found")
 	}
 
 	response := stream.NewStreamResponse[Rsp](response_buffer_size)
 
-	listener := runtime.Listen(session.ID())
+	listener := runtime.Listen(session.ID)
 	listener.Listen(func(chunk plugin_entities.SessionMessage) {
 		switch chunk.Type {
 		case plugin_entities.SESSION_MESSAGE_TYPE_STREAM:
@@ -41,13 +38,18 @@ func genericInvokePlugin[Req any, Rsp any](
 			}
 		case plugin_entities.SESSION_MESSAGE_TYPE_INVOKE:
 			// check if the request contains a aws_event_id
-			var writer backwards_invocation.BackwardsInvocationWriter
-			if chunk.RuntimeType == plugin_entities.PLUGIN_RUNTIME_TYPE_AWS {
-				writer = transaction.NewAWSTransactionWriter(session, chunk.SessionWriter)
-			} else {
-				writer = transaction.NewFullDuplexEventWriter(session)
+			if runtime.Type() == plugin_entities.PLUGIN_RUNTIME_TYPE_AWS {
+				response.WriteError(errors.New("aws event is not supported by full duplex"))
+				response.Close()
+				return
 			}
-			if err := backwards_invocation.InvokeDify(runtime, typ, session, writer, chunk.Data); err != nil {
+			if err := backwards_invocation.InvokeDify(
+				runtime.Configuration(),
+				session.InvokeFrom,
+				session,
+				transaction.NewFullDuplexEventWriter(session),
+				chunk.Data,
+			); err != nil {
 				log.Error("invoke dify failed: %s", err.Error())
 				return
 			}
@@ -74,8 +76,6 @@ func genericInvokePlugin[Req any, Rsp any](
 		session_manager.PLUGIN_IN_STREAM_EVENT_REQUEST,
 		getInvokePluginMap(
 			session,
-			typ,
-			action,
 			request,
 		),
 	)
@@ -85,11 +85,9 @@ func genericInvokePlugin[Req any, Rsp any](
 
 func getInvokePluginMap(
 	session *session_manager.Session,
-	typ access_types.PluginAccessType,
-	action access_types.PluginAccessAction,
 	request any,
 ) map[string]any {
-	req := getBasicPluginAccessMap(session.UserID(), typ, action)
+	req := getBasicPluginAccessMap(session.UserID, session.InvokeFrom, session.Action)
 	for k, v := range parser.StructToMap(request) {
 		req[k] = v
 	}

+ 0 - 17
internal/core/plugin_daemon/model_service.go

@@ -1,7 +1,6 @@
 package plugin_daemon
 
 import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
@@ -18,8 +17,6 @@ func InvokeLLM(
 		session,
 		request,
 		512,
-		access_types.PLUGIN_ACCESS_TYPE_MODEL,
-		access_types.PLUGIN_ACCESS_ACTION_INVOKE_LLM,
 	)
 }
 
@@ -33,8 +30,6 @@ func InvokeTextEmbedding(
 		session,
 		request,
 		1,
-		access_types.PLUGIN_ACCESS_TYPE_MODEL,
-		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TEXT_EMBEDDING,
 	)
 }
 
@@ -48,8 +43,6 @@ func InvokeRerank(
 		session,
 		request,
 		1,
-		access_types.PLUGIN_ACCESS_TYPE_MODEL,
-		access_types.PLUGIN_ACCESS_ACTION_INVOKE_RERANK,
 	)
 }
 
@@ -63,8 +56,6 @@ func InvokeTTS(
 		session,
 		request,
 		1,
-		access_types.PLUGIN_ACCESS_TYPE_MODEL,
-		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TTS,
 	)
 }
 
@@ -78,8 +69,6 @@ func InvokeSpeech2Text(
 		session,
 		request,
 		1,
-		access_types.PLUGIN_ACCESS_TYPE_MODEL,
-		access_types.PLUGIN_ACCESS_ACTION_INVOKE_SPEECH2TEXT,
 	)
 }
 
@@ -93,8 +82,6 @@ func InvokeModeration(
 		session,
 		request,
 		1,
-		access_types.PLUGIN_ACCESS_TYPE_MODEL,
-		access_types.PLUGIN_ACCESS_ACTION_INVOKE_MODERATION,
 	)
 }
 
@@ -108,8 +95,6 @@ func ValidateProviderCredentials(
 		session,
 		request,
 		1,
-		access_types.PLUGIN_ACCESS_TYPE_MODEL,
-		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_PROVIDER_CREDENTIALS,
 	)
 }
 
@@ -123,7 +108,5 @@ func ValidateModelCredentials(
 		session,
 		request,
 		1,
-		access_types.PLUGIN_ACCESS_TYPE_MODEL,
-		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_MODEL_CREDENTIALS,
 	)
 }

+ 0 - 5
internal/core/plugin_daemon/tool_service.go

@@ -1,7 +1,6 @@
 package plugin_daemon
 
 import (
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
@@ -18,8 +17,6 @@ func InvokeTool(
 		session,
 		request,
 		128,
-		access_types.PLUGIN_ACCESS_TYPE_TOOL,
-		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TOOL,
 	)
 }
 
@@ -33,7 +30,5 @@ func ValidateToolCredentials(
 		session,
 		request,
 		1,
-		access_types.PLUGIN_ACCESS_TYPE_TOOL,
-		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_TOOL_CREDENTIALS,
 	)
 }

+ 0 - 3
internal/core/plugin_daemon/webhook_service.go

@@ -4,7 +4,6 @@ import (
 	"encoding/hex"
 	"net/http"
 
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/webhook_entities"
@@ -22,8 +21,6 @@ func InvokeWebhook(
 		session,
 		request,
 		128,
-		access_types.PLUGIN_ACCESS_TYPE_WEBHOOK,
-		access_types.PLUGIN_ACCESS_ACTION_WEBHOOK,
 	)
 
 	if err != nil {

+ 0 - 15
internal/core/plugin_manager/aws_manager/io.go

@@ -73,24 +73,9 @@ func (r *AWSPluginRuntime) Write(session_id string, data []byte) {
 				continue
 			}
 
-			data.RuntimeType = r.Type()
 			l.Send(data)
 		}
 
 		l.Close()
 	})
 }
-
-func (r *AWSPluginRuntime) PushRequest(session_id string, data plugin_entities.SessionMessage) error {
-	if r.Type() != data.RuntimeType {
-		return fmt.Errorf("runtime type mismatch")
-	}
-
-	broadcast, ok := r.listeners.Load(session_id)
-	if !ok {
-		return fmt.Errorf("session %s not found", session_id)
-	}
-
-	broadcast.Send(data)
-	return nil
-}

+ 0 - 2
internal/core/plugin_manager/local_manager/io.go

@@ -19,8 +19,6 @@ func (r *LocalPluginRuntime) Listen(session_id string) *entities.Broadcast[plugi
 			log.Error("unmarshal json failed: %s, failed to parse session message", err.Error())
 			return
 		}
-		// set the runtime type
-		data.RuntimeType = r.Type()
 
 		listener.Send(data)
 	})

+ 0 - 2
internal/core/plugin_manager/remote_manager/io.go

@@ -21,8 +21,6 @@ func (r *RemotePluginRuntime) Listen(session_id string) *entities.Broadcast[plug
 			log.Error("unmarshal json failed: %s, failed to parse session message", err.Error())
 			return
 		}
-		// set the runtime type
-		chunk.RuntimeType = r.Type()
 
 		listener.Send(chunk)
 	})

+ 37 - 56
internal/core/session_manager/session.go

@@ -2,9 +2,12 @@ package session_manager
 
 import (
 	"errors"
+	"fmt"
 	"sync"
+	"time"
 
 	"github.com/google/uuid"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/cache"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/log"
@@ -18,47 +21,47 @@ var (
 
 // session need to implement the backwards_invocation.BackwardsInvocationWriter interface
 type Session struct {
-	id      string
-	runtime plugin_entities.PluginRuntimeInterface
-
-	tenant_id       string
-	user_id         string
-	plugin_identity string
-	cluster_id      string
+	ID      string                                 `json:"id"`
+	runtime plugin_entities.PluginRuntimeInterface `json:"-"`
+
+	TenantID       string                             `json:"tenant_id"`
+	UserID         string                             `json:"user_id"`
+	PluginIdentity string                             `json:"plugin_identity"`
+	ClusterID      string                             `json:"cluster_id"`
+	InvokeFrom     access_types.PluginAccessType      `json:"invoke_from"`
+	Action         access_types.PluginAccessAction    `json:"action"`
+	Declaration    *plugin_entities.PluginDeclaration `json:"declaration"`
 }
 
-type SessionInfo struct {
-	TenantID       string `json:"tenant_id"`
-	UserID         string `json:"user_id"`
-	PluginIdentity string `json:"plugin_identity"`
-	ClusterID      string `json:"cluster_id"`
+func sessionKey(id string) string {
+	return fmt.Sprintf("session_info:%s", id)
 }
 
-const (
-	SESSION_INFO_MAP_KEY = "session_info"
-)
-
-func NewSession(tenant_id string, user_id string, plugin_identity string, cluster_id string) *Session {
+func NewSession(
+	tenant_id string,
+	user_id string,
+	plugin_identity string,
+	cluster_id string,
+	invoke_from access_types.PluginAccessType,
+	action access_types.PluginAccessAction,
+	declaration *plugin_entities.PluginDeclaration,
+) *Session {
 	s := &Session{
-		id:              uuid.New().String(),
-		tenant_id:       tenant_id,
-		user_id:         user_id,
-		plugin_identity: plugin_identity,
-		cluster_id:      cluster_id,
-	}
-
-	session_lock.Lock()
-	sessions[s.id] = s
-	session_lock.Unlock()
-
-	session_info := &SessionInfo{
+		ID:             uuid.New().String(),
 		TenantID:       tenant_id,
 		UserID:         user_id,
 		PluginIdentity: plugin_identity,
 		ClusterID:      cluster_id,
+		InvokeFrom:     invoke_from,
+		Action:         action,
+		Declaration:    declaration,
 	}
 
-	if err := cache.SetMapOneField(SESSION_INFO_MAP_KEY, s.id, session_info); err != nil {
+	session_lock.Lock()
+	sessions[s.ID] = s
+	session_lock.Unlock()
+
+	if err := cache.Store(sessionKey(s.ID), s, time.Minute*30); err != nil {
 		log.Error("set session info to cache failed, %s", err)
 	}
 
@@ -77,35 +80,13 @@ func DeleteSession(id string) {
 	delete(sessions, id)
 	session_lock.Unlock()
 
-	if err := cache.DelMapField(SESSION_INFO_MAP_KEY, id); err != nil {
+	if err := cache.Del(sessionKey(id)); err != nil {
 		log.Error("delete session info from cache failed, %s", err)
 	}
 }
 
 func (s *Session) Close() {
-	session_lock.Lock()
-	delete(sessions, s.id)
-	session_lock.Unlock()
-
-	if err := cache.DelMapField(SESSION_INFO_MAP_KEY, s.id); err != nil {
-		log.Error("delete session info from cache failed, %s", err)
-	}
-}
-
-func (s *Session) ID() string {
-	return s.id
-}
-
-func (s *Session) TenantID() string {
-	return s.tenant_id
-}
-
-func (s *Session) UserID() string {
-	return s.user_id
-}
-
-func (s *Session) PluginIdentity() string {
-	return s.plugin_identity
+	DeleteSession(s.ID)
 }
 
 func (s *Session) BindRuntime(runtime plugin_entities.PluginRuntimeInterface) {
@@ -125,7 +106,7 @@ const (
 
 func (s *Session) Message(event PLUGIN_IN_STREAM_EVENT, data any) []byte {
 	return parser.MarshalJsonBytes(map[string]any{
-		"session_id": s.id,
+		"session_id": s.ID,
 		"event":      event,
 		"data":       data,
 	})
@@ -135,6 +116,6 @@ func (s *Session) Write(event PLUGIN_IN_STREAM_EVENT, data any) error {
 	if s.runtime == nil {
 		return errors.New("runtime not bound")
 	}
-	s.runtime.Write(s.id, s.Message(event, data))
+	s.runtime.Write(s.ID, s.Message(event, data))
 	return nil
 }

+ 73 - 56
internal/server/controllers/model.go

@@ -3,94 +3,111 @@ package controllers
 import (
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/service"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 )
 
-func InvokeLLM(c *gin.Context) {
+func InvokeLLM(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestInvokeLLM]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.InvokeLLM(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.InvokeLLM(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }
 
-func InvokeTextEmbedding(c *gin.Context) {
+func InvokeTextEmbedding(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestInvokeTextEmbedding]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.InvokeTextEmbedding(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.InvokeTextEmbedding(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }
 
-func InvokeRerank(c *gin.Context) {
+func InvokeRerank(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestInvokeRerank]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.InvokeRerank(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.InvokeRerank(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }
 
-func InvokeTTS(c *gin.Context) {
+func InvokeTTS(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestInvokeTTS]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.InvokeTTS(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.InvokeTTS(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }
 
-func InvokeSpeech2Text(c *gin.Context) {
+func InvokeSpeech2Text(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestInvokeSpeech2Text]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.InvokeSpeech2Text(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.InvokeSpeech2Text(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }
 
-func InvokeModeration(c *gin.Context) {
+func InvokeModeration(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestInvokeModeration]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.InvokeModeration(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.InvokeModeration(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }
 
-func ValidateProviderCredentials(c *gin.Context) {
+func ValidateProviderCredentials(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestValidateProviderCredentials]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.ValidateProviderCredentials(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.ValidateProviderCredentials(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }
 
-func ValidateModelCredentials(c *gin.Context) {
+func ValidateModelCredentials(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestValidateModelCredentials]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.ValidateModelCredentials(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.ValidateModelCredentials(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }

+ 19 - 14
internal/server/controllers/tool.go

@@ -3,28 +3,33 @@ package controllers
 import (
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/service"
+	"github.com/langgenius/dify-plugin-daemon/internal/types/app"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 )
 
-func InvokeTool(c *gin.Context) {
+func InvokeTool(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestInvokeTool]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.InvokeTool(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.InvokeTool(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }
 
-func ValidateToolCredentials(c *gin.Context) {
+func ValidateToolCredentials(config *app.Config) gin.HandlerFunc {
 	type request = plugin_entities.InvokePluginRequest[requests.RequestValidateToolCredentials]
 
-	BindRequest[request](
-		c,
-		func(itr request) {
-			service.ValidateToolCredentials(&itr, c)
-		},
-	)
+	return func(c *gin.Context) {
+		BindRequest[request](
+			c,
+			func(itr request) {
+				service.ValidateToolCredentials(&itr, c, config.PluginMaxExecutionTimeout)
+			},
+		)
+	}
 }

+ 10 - 10
internal/server/http_server.go

@@ -46,16 +46,16 @@ func (app *App) pluginInvokeGroup(group *gin.RouterGroup, config *app.Config) {
 	group.Use(app.RedirectPluginInvoke())
 	group.Use(app.InitClusterID())
 
-	group.POST("/tool/invoke", controllers.InvokeTool)
-	group.POST("/tool/validate_credentials", controllers.ValidateToolCredentials)
-	group.POST("/llm/invoke", controllers.InvokeLLM)
-	group.POST("/text_embedding/invoke", controllers.InvokeTextEmbedding)
-	group.POST("/rerank/invoke", controllers.InvokeRerank)
-	group.POST("/tts/invoke", controllers.InvokeTTS)
-	group.POST("/speech2text/invoke", controllers.InvokeSpeech2Text)
-	group.POST("/moderation/invoke", controllers.InvokeModeration)
-	group.POST("/model/validate_provider_credentials", controllers.ValidateProviderCredentials)
-	group.POST("/model/validate_model_credentials", controllers.ValidateModelCredentials)
+	group.POST("/tool/invoke", controllers.InvokeTool(config))
+	group.POST("/tool/validate_credentials", controllers.ValidateToolCredentials(config))
+	group.POST("/llm/invoke", controllers.InvokeLLM(config))
+	group.POST("/text_embedding/invoke", controllers.InvokeTextEmbedding(config))
+	group.POST("/rerank/invoke", controllers.InvokeRerank(config))
+	group.POST("/tts/invoke", controllers.InvokeTTS(config))
+	group.POST("/speech2text/invoke", controllers.InvokeSpeech2Text(config))
+	group.POST("/moderation/invoke", controllers.InvokeModeration(config))
+	group.POST("/model/validate_provider_credentials", controllers.ValidateProviderCredentials(config))
+	group.POST("/model/validate_model_credentials", controllers.ValidateModelCredentials(config))
 }
 
 func (app *App) remoteDebuggingGroup(group *gin.RouterGroup, config *app.Config) {

+ 1 - 15
internal/service/aws_transaction.go

@@ -5,7 +5,6 @@ import (
 
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/backwards_invocation/transaction"
-	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager/aws_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 )
 
@@ -19,19 +18,6 @@ func HandleAWSPluginTransaction(handler *transaction.AWSTransactionHandler) gin.
 			return
 		}
 
-		// get runtime from the session
-		runtime := session.Runtime()
-		if runtime == nil {
-			c.JSON(http.StatusBadRequest, gin.H{"error": "runtime not found"})
-			return
-		}
-
-		aws_runtime, ok := runtime.(*aws_manager.AWSPluginRuntime)
-		if !ok {
-			c.JSON(http.StatusBadRequest, gin.H{"error": "runtime is not aws plugin runtime"})
-			return
-		}
-
-		handler.Handle(c, session_id, aws_runtime)
+		handler.Handle(c, session_id)
 	}
 }

+ 37 - 10
internal/service/invoke_model.go

@@ -3,28 +3,55 @@ package service
 import (
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/tool_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-func InvokeTool(r *plugin_entities.InvokePluginRequest[requests.RequestInvokeTool], ctx *gin.Context) {
+func InvokeTool(
+	r *plugin_entities.InvokePluginRequest[requests.RequestInvokeTool],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_TOOL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TOOL,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[tool_entities.ToolResponseChunk], error) {
-		return plugin_daemon.InvokeTool(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[tool_entities.ToolResponseChunk], error) {
+			return plugin_daemon.InvokeTool(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }
 
-func ValidateToolCredentials(r *plugin_entities.InvokePluginRequest[requests.RequestValidateToolCredentials], ctx *gin.Context) {
+func ValidateToolCredentials(
+	r *plugin_entities.InvokePluginRequest[requests.RequestValidateToolCredentials],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_TOOL,
+		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_TOOL_CREDENTIALS,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[tool_entities.ValidateCredentialsResult], error) {
-		return plugin_daemon.ValidateToolCredentials(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[tool_entities.ValidateCredentialsResult], error) {
+			return plugin_daemon.ValidateToolCredentials(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }

+ 163 - 43
internal/service/invoke_tool.go

@@ -3,6 +3,7 @@ package service
 import (
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/model_entities"
@@ -12,89 +13,208 @@ import (
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-func createSession[T any](r *plugin_entities.InvokePluginRequest[T], cluster_id string) *session_manager.Session {
-	session := session_manager.NewSession(r.TenantId, r.UserId, parser.MarshalPluginIdentity(r.PluginName, r.PluginVersion), cluster_id)
-	runtime := plugin_manager.GetGlobalPluginManager().Get(session.PluginIdentity())
+func createSession[T any](
+	r *plugin_entities.InvokePluginRequest[T],
+	access_type access_types.PluginAccessType,
+	access_action access_types.PluginAccessAction,
+	cluster_id string,
+) *session_manager.Session {
+	plugin_identity := parser.MarshalPluginIdentity(r.PluginName, r.PluginVersion)
+	runtime := plugin_manager.GetGlobalPluginManager().Get(plugin_identity)
+
+	session := session_manager.NewSession(
+		r.TenantId,
+		r.UserId,
+		parser.MarshalPluginIdentity(r.PluginName, r.PluginVersion),
+		cluster_id,
+		access_type,
+		access_action,
+		runtime.Configuration(),
+	)
+
 	session.BindRuntime(runtime)
 	return session
 }
 
-func InvokeLLM(r *plugin_entities.InvokePluginRequest[requests.RequestInvokeLLM], ctx *gin.Context) {
+func InvokeLLM(
+	r *plugin_entities.InvokePluginRequest[requests.RequestInvokeLLM],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_LLM,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[model_entities.LLMResultChunk], error) {
-		return plugin_daemon.InvokeLLM(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[model_entities.LLMResultChunk], error) {
+			return plugin_daemon.InvokeLLM(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }
 
-func InvokeTextEmbedding(r *plugin_entities.InvokePluginRequest[requests.RequestInvokeTextEmbedding], ctx *gin.Context) {
+func InvokeTextEmbedding(
+	r *plugin_entities.InvokePluginRequest[requests.RequestInvokeTextEmbedding],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TEXT_EMBEDDING,
+		ctx.GetString("cluster_id"))
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[model_entities.TextEmbeddingResult], error) {
-		return plugin_daemon.InvokeTextEmbedding(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[model_entities.TextEmbeddingResult], error) {
+			return plugin_daemon.InvokeTextEmbedding(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }
 
-func InvokeRerank(r *plugin_entities.InvokePluginRequest[requests.RequestInvokeRerank], ctx *gin.Context) {
+func InvokeRerank(
+	r *plugin_entities.InvokePluginRequest[requests.RequestInvokeRerank],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_RERANK,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[model_entities.RerankResult], error) {
-		return plugin_daemon.InvokeRerank(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[model_entities.RerankResult], error) {
+			return plugin_daemon.InvokeRerank(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }
 
-func InvokeTTS(r *plugin_entities.InvokePluginRequest[requests.RequestInvokeTTS], ctx *gin.Context) {
+func InvokeTTS(
+	r *plugin_entities.InvokePluginRequest[requests.RequestInvokeTTS],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_TTS,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[model_entities.TTSResult], error) {
-		return plugin_daemon.InvokeTTS(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[model_entities.TTSResult], error) {
+			return plugin_daemon.InvokeTTS(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }
 
-func InvokeSpeech2Text(r *plugin_entities.InvokePluginRequest[requests.RequestInvokeSpeech2Text], ctx *gin.Context) {
+func InvokeSpeech2Text(
+	r *plugin_entities.InvokePluginRequest[requests.RequestInvokeSpeech2Text],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_SPEECH2TEXT,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[model_entities.Speech2TextResult], error) {
-		return plugin_daemon.InvokeSpeech2Text(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[model_entities.Speech2TextResult], error) {
+			return plugin_daemon.InvokeSpeech2Text(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }
 
-func InvokeModeration(r *plugin_entities.InvokePluginRequest[requests.RequestInvokeModeration], ctx *gin.Context) {
+func InvokeModeration(
+	r *plugin_entities.InvokePluginRequest[requests.RequestInvokeModeration],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_INVOKE_MODERATION,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[model_entities.ModerationResult], error) {
-		return plugin_daemon.InvokeModeration(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[model_entities.ModerationResult], error) {
+			return plugin_daemon.InvokeModeration(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }
 
-func ValidateProviderCredentials(r *plugin_entities.InvokePluginRequest[requests.RequestValidateProviderCredentials], ctx *gin.Context) {
+func ValidateProviderCredentials(
+	r *plugin_entities.InvokePluginRequest[requests.RequestValidateProviderCredentials],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_PROVIDER_CREDENTIALS,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[model_entities.ValidateCredentialsResult], error) {
-		return plugin_daemon.ValidateProviderCredentials(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[model_entities.ValidateCredentialsResult], error) {
+			return plugin_daemon.ValidateProviderCredentials(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }
 
-func ValidateModelCredentials(r *plugin_entities.InvokePluginRequest[requests.RequestValidateModelCredentials], ctx *gin.Context) {
+func ValidateModelCredentials(
+	r *plugin_entities.InvokePluginRequest[requests.RequestValidateModelCredentials],
+	ctx *gin.Context,
+	max_timeout_seconds int,
+) {
 	// create session
-	session := createSession(r, ctx.GetString("cluster_id"))
+	session := createSession(
+		r,
+		access_types.PLUGIN_ACCESS_TYPE_MODEL,
+		access_types.PLUGIN_ACCESS_ACTION_VALIDATE_MODEL_CREDENTIALS,
+		ctx.GetString("cluster_id"),
+	)
 	defer session.Close()
 
-	baseSSEService(r, func() (*stream.StreamResponse[model_entities.ValidateCredentialsResult], error) {
-		return plugin_daemon.ValidateModelCredentials(session, &r.Data)
-	}, ctx)
+	baseSSEService(
+		func() (*stream.StreamResponse[model_entities.ValidateCredentialsResult], error) {
+			return plugin_daemon.ValidateModelCredentials(session, &r.Data)
+		},
+		ctx,
+		max_timeout_seconds,
+	)
 }

+ 17 - 23
internal/service/runner.go

@@ -6,16 +6,17 @@ import (
 
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities"
-	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/plugin_entities"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/parser"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/routine"
 	"github.com/langgenius/dify-plugin-daemon/internal/utils/stream"
 )
 
-func baseSSEService[T any, R any](
-	r *plugin_entities.InvokePluginRequest[T],
+// baseSSEService is a helper function to handle SSE service
+// it accepts a generator function that returns a stream response to gin context
+func baseSSEService[R any](
 	generator func() (*stream.StreamResponse[R], error),
 	ctx *gin.Context,
+	max_timeout_seconds int,
 ) {
 	writer := ctx.Writer
 	writer.WriteHeader(200)
@@ -36,7 +37,6 @@ func baseSSEService[T any, R any](
 	}
 
 	plugin_daemon_response, err := generator()
-	last_response_at := time.Now()
 
 	if err != nil {
 		write_data(entities.NewErrorResponse(-500, err.Error()))
@@ -46,7 +46,6 @@ func baseSSEService[T any, R any](
 
 	routine.Submit(func() {
 		for plugin_daemon_response.Next() {
-			last_response_at = time.Now()
 			chunk, err := plugin_daemon_response.Read()
 			if err != nil {
 				write_data(entities.NewErrorResponse(-500, err.Error()))
@@ -60,29 +59,24 @@ func baseSSEService[T any, R any](
 		}
 	})
 
-	ticker := time.NewTicker(15 * time.Second)
-	defer ticker.Stop()
+	timer := time.NewTimer(time.Duration(max_timeout_seconds) * time.Second)
+	defer timer.Stop()
 
 	defer func() {
 		atomic.StoreInt32(closed, 1)
 	}()
 
-	for {
-		select {
-		case <-writer.CloseNotify():
-			plugin_daemon_response.Close()
-			return
-		case <-done:
-			return
-		case <-ticker.C:
-			if time.Since(last_response_at) > 30*time.Second {
-				write_data(entities.NewErrorResponse(-500, "killed by timeout"))
-				if atomic.CompareAndSwapInt32(done_closed, 0, 1) {
-					close(done)
-				}
-				return
-			}
+	select {
+	case <-writer.CloseNotify():
+		plugin_daemon_response.Close()
+		return
+	case <-done:
+		return
+	case <-timer.C:
+		write_data(entities.NewErrorResponse(-500, "killed by timeout"))
+		if atomic.CompareAndSwapInt32(done_closed, 0, 1) {
+			close(done)
 		}
+		return
 	}
-
 }

+ 10 - 1
internal/service/webhook.go

@@ -9,6 +9,7 @@ import (
 
 	"github.com/gin-gonic/gin"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon"
+	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_daemon/access_types"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/plugin_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/core/session_manager"
 	"github.com/langgenius/dify-plugin-daemon/internal/types/entities/requests"
@@ -35,7 +36,15 @@ func Webhook(ctx *gin.Context, webhook *models.Webhook, path string) {
 		return
 	}
 
-	session := session_manager.NewSession(webhook.TenantID, "", webhook.PluginID, ctx.GetString("cluster_id"))
+	session := session_manager.NewSession(
+		webhook.TenantID,
+		"",
+		webhook.PluginID,
+		ctx.GetString("cluster_id"),
+		access_types.PLUGIN_ACCESS_TYPE_WEBHOOK,
+		access_types.PLUGIN_ACCESS_ACTION_WEBHOOK,
+		runtime.Configuration(),
+	)
 	defer session.Close()
 
 	session.BindRuntime(runtime)

+ 2 - 0
internal/types/app/config.go

@@ -24,6 +24,8 @@ type Config struct {
 	PluginWorkingPath  string `envconfig:"PLUGIN_WORKING_PATH"`
 	ProcessCachingPath string `envconfig:"PROCESS_CACHING_PATH"`
 
+	PluginMaxExecutionTimeout int `envconfig:"PLUGIN_MAX_EXECUTION_TIMEOUT" validate:"required"`
+
 	Platform PlatformType `envconfig:"PLATFORM" validate:"required"`
 
 	RoutinePoolSize int `envconfig:"ROUTINE_POOL_SIZE" validate:"required"`

+ 2 - 7
internal/types/entities/plugin_entities/event.go

@@ -2,7 +2,6 @@ package plugin_entities
 
 import (
 	"encoding/json"
-	"io"
 )
 
 type PluginUniversalEvent struct {
@@ -27,12 +26,8 @@ type PluginLogEvent struct {
 }
 
 type SessionMessage struct {
-	Type        SESSION_MESSAGE_TYPE `json:"type"`
-	Data        json.RawMessage      `json:"data"`
-	RuntimeType PluginRuntimeType    `json:"runtime_type"`
-
-	// only used for aws event bridge, not used for stdio and tcp
-	SessionWriter io.WriteCloser `json:"-"`
+	Type SESSION_MESSAGE_TYPE `json:"type"`
+	Data json.RawMessage      `json:"data"`
 }
 
 type SESSION_MESSAGE_TYPE string