Просмотр исходного кода

fix: correctly handle tool invoke message, add metadata

Yeuoly месяцев назад: 9
Родитель
Сommit
f55a3b2bf8

+ 9 - 0
internal/core/plugin_manager/lifetime.go

@@ -12,6 +12,15 @@ func (p *PluginManager) AddPluginRegisterHandler(handler func(r plugin_entities.
 }
 
 func (p *PluginManager) fullDuplexLifetime(r plugin_entities.PluginFullDuplexLifetime) {
+	identifier, err := r.Identity()
+	if err != nil {
+		log.Error("get plugin identity failed: %s", err.Error())
+		return
+	}
+
+	p.m.Store(identifier.String(), r)
+	defer p.m.Delete(identifier.String())
+
 	configuration := r.Configuration()
 
 	log.Info("new plugin logged in: %s", configuration.Identity())

+ 0 - 12
internal/core/plugin_manager/manager.go

@@ -76,18 +76,6 @@ func Manager() *PluginManager {
 	return manager
 }
 
-func (p *PluginManager) Add(
-	plugin plugin_entities.PluginLifetime,
-) error {
-	identity, err := plugin.Identity()
-	if err != nil {
-		return err
-	}
-
-	p.m.Store(identity.String(), plugin)
-	return nil
-}
-
 func (p *PluginManager) Get(
 	identity plugin_entities.PluginUniqueIdentifier,
 ) plugin_entities.PluginLifetime {

+ 4 - 3
internal/core/plugin_manager/remote_manager/hooks.go

@@ -2,6 +2,7 @@ package remote_manager
 
 import (
 	"encoding/hex"
+	"fmt"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -266,7 +267,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		assets, err := parser.UnmarshalJsonBytes2Slice[plugin_entities.RemoteAssetPayload](message)
 		if err != nil {
 			log.Error("assets register failed, error: %v", err)
-			close_conn([]byte("assets register failed, invalid assets declaration\n"))
+			close_conn([]byte(fmt.Sprintf("assets register failed, invalid assets declaration: %v\n", err)))
 			return
 		}
 
@@ -275,7 +276,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 			files[asset.Filename], err = hex.DecodeString(asset.Data)
 			if err != nil {
 				log.Error("assets decode failed, error: %v", err)
-				close_conn([]byte("assets decode failed, invalid assets data, cannot decode file\n"))
+				close_conn([]byte(fmt.Sprintf("assets decode failed, invalid assets data, cannot decode file: %v\n", err)))
 				return
 			}
 		}
@@ -283,7 +284,7 @@ func (s *DifyServer) onMessage(runtime *RemotePluginRuntime, message []byte) {
 		// remap assets
 		if err := runtime.RemapAssets(&runtime.Config, files); err != nil {
 			log.Error("assets remap failed, error: %v", err)
-			close_conn([]byte("assets remap failed, invalid assets data, cannot remap\n"))
+			close_conn([]byte(fmt.Sprintf("assets remap failed, invalid assets data, cannot remap: %v\n", err)))
 			return
 		}
 

+ 0 - 6
internal/core/plugin_manager/watcher.go

@@ -137,12 +137,6 @@ func (p *PluginManager) launchLocal(plugin_package_path string) (plugin_entities
 		return nil, failed(errors.Join(err, fmt.Errorf("remap plugin assets error")).Error())
 	}
 
-	// add plugin to manager
-	err = p.Add(local_plugin_runtime)
-	if err != nil {
-		return nil, failed(errors.Join(err, fmt.Errorf("add plugin to manager failed")).Error())
-	}
-
 	success = true
 
 	// local plugin

+ 6 - 5
internal/types/app/config.go

@@ -16,11 +16,12 @@ type Config struct {
 	DifyInnerApiKey string `envconfig:"DIFY_INNER_API_KEY" validate:"required"`
 
 	// plugin remote installing
-	PluginRemoteInstallingHost             string `envconfig:"PLUGIN_REMOTE_INSTALLING_HOST"`
-	PluginRemoteInstallingPort             uint16 `envconfig:"PLUGIN_REMOTE_INSTALLING_PORT"`
-	PluginRemoteInstallingEnabled          bool   `envconfig:"PLUGIN_REMOTE_INSTALLING_ENABLED"`
-	PluginRemoteInstallingMaxConn          int    `envconfig:"PLUGIN_REMOTE_INSTALLING_MAX_CONN"`
-	PluginRemoteInstallServerEventLoopNums int    `envconfig:"PLUGIN_REMOTE_INSTALL_SERVER_EVENT_LOOP_NUMS"`
+	PluginRemoteInstallingHost                string `envconfig:"PLUGIN_REMOTE_INSTALLING_HOST"`
+	PluginRemoteInstallingPort                uint16 `envconfig:"PLUGIN_REMOTE_INSTALLING_PORT"`
+	PluginRemoteInstallingEnabled             bool   `envconfig:"PLUGIN_REMOTE_INSTALLING_ENABLED"`
+	PluginRemoteInstallingMaxConn             int    `envconfig:"PLUGIN_REMOTE_INSTALLING_MAX_CONN"`
+	PluginRemoteInstallingMaxSingleTenantConn int    `envconfig:"PLUGIN_REMOTE_INSTALLING_MAX_SINGLE_TENANT_CONN"`
+	PluginRemoteInstallServerEventLoopNums    int    `envconfig:"PLUGIN_REMOTE_INSTALL_SERVER_EVENT_LOOP_NUMS"`
 
 	PluginEndpointEnabled bool `envconfig:"PLUGIN_ENDPOINT_ENABLED"`
 

+ 2 - 1
internal/types/app/default.go

@@ -10,11 +10,12 @@ func (config *Config) SetDefault() {
 	setDefaultInt(&config.LifetimeStateGCInterval, 300)
 	setDefaultInt(&config.DifyInvocationConnectionIdleTimeout, 120)
 	setDefaultInt(&config.PluginRemoteInstallServerEventLoopNums, 8)
-	setDefaultInt(&config.PluginRemoteInstallingMaxConn, 128)
+	setDefaultInt(&config.PluginRemoteInstallingMaxConn, 256)
 	setDefaultInt(&config.MaxPluginPackageSize, 52428800)
 	setDefaultInt(&config.MaxAWSLambdaTransactionTimeout, 150)
 	setDefaultInt(&config.PluginMaxExecutionTimeout, 240)
 	setDefaultInt(&config.PluginMediaCacheSize, 1024)
+	setDefaultInt(&config.PluginRemoteInstallingMaxSingleTenantConn, 5)
 	setDefaultBool(&config.PluginRemoteInstallingEnabled, true)
 	setDefaultBool(&config.PluginEndpointEnabled, true)
 	setDefaultString(&config.DBSslMode, "disable")

+ 1 - 0
internal/types/entities/tool_entities/tool.go

@@ -46,6 +46,7 @@ func init() {
 type ToolResponseChunk struct {
 	Type    ToolResponseChunkType `json:"type" validate:"required,is_valid_tool_response_chunk_type"`
 	Message map[string]any        `json:"message"`
+	Meta    map[string]any        `json:"meta"`
 }
 
 type GetToolRuntimeParametersResponse struct {