浏览代码

Merge remote-tracking branch 'myfork/p3' into p2

非法操作 6 月之前
父节点
当前提交
d44eb84e5a

+ 3 - 0
.env.example

@@ -59,6 +59,9 @@ DIFY_PLUGIN_SERVERLESS_CONNECTOR_API_KEY=HeRFb6yrzAy5vUSlJWK2lUl36mpkaRycv4witbQ
 # otherwise, it should be /usr/bin/python3
 PYTHON_INTERPRETER_PATH=/Users/yeuoly/miniconda3/envs/dify-plugin-sdk/bin/python
 
+# python environment init timeout, if the python environment init process is not finished within this time, it will be killed
+PYTHON_ENV_INIT_TIMEOUT=120
+
 # pprof enabled, for debugging
 PPROF_ENABLED=false
 

+ 1 - 1
internal/core/plugin_manager/launcher.go

@@ -129,7 +129,7 @@ func (p *PluginManager) launchLocal(pluginUniqueIdentifier plugin_entities.Plugi
 		return nil, nil, nil, failed(err.Error())
 	}
 
-	localPluginRuntime := local_runtime.NewLocalPluginRuntime(p.pythonInterpreterPath, p.proxyHttp, p.proxyHttps)
+	localPluginRuntime := local_runtime.NewLocalPluginRuntime(p.pythonInterpreterPath, p.pythonEnvInitTimeout, p.proxyHttp, p.proxyHttps)
 	localPluginRuntime.PluginRuntime = plugin.runtime
 	localPluginRuntime.BasicChecksum = basic_runtime.BasicChecksum{
 		MediaTransport: basic_runtime.NewMediaTransport(p.mediaBucket),

+ 6 - 6
internal/core/plugin_manager/local_runtime/environment_python.go

@@ -103,7 +103,7 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 		}
 	}()
 
-	var err_msg strings.Builder
+	var errMsg strings.Builder
 	var wg sync.WaitGroup
 	wg.Add(2)
 
@@ -137,14 +137,14 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 			n, err := stderr.Read(buf)
 			if err != nil && err != os.ErrClosed {
 				lastActiveAt = time.Now()
-				err_msg.WriteString(string(buf[:n]))
+				errMsg.WriteString(string(buf[:n]))
 				break
 			} else if err == os.ErrClosed {
 				break
 			}
 
 			if n > 0 {
-				err_msg.WriteString(string(buf[:n]))
+				errMsg.WriteString(string(buf[:n]))
 				lastActiveAt = time.Now()
 			}
 		}
@@ -161,9 +161,9 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 				break
 			}
 
-			if time.Since(lastActiveAt) > 60*time.Second {
+			if time.Since(lastActiveAt) > time.Duration(p.pythonEnvInitTimeout)*time.Second {
 				cmd.Process.Kill()
-				err_msg.WriteString("init process exited due to long time no activity")
+				errMsg.WriteString(fmt.Sprintf("init process exited due to no activity for %d seconds", p.pythonEnvInitTimeout))
 				break
 			}
 		}
@@ -172,7 +172,7 @@ func (p *LocalPluginRuntime) InitPythonEnvironment() error {
 	wg.Wait()
 
 	if err := cmd.Wait(); err != nil {
-		return fmt.Errorf("failed to install dependencies: %s, output: %s", err, err_msg.String())
+		return fmt.Errorf("failed to install dependencies: %s, output: %s", err, errMsg.String())
 	}
 
 	success = true

+ 15 - 5
internal/core/plugin_manager/local_runtime/run.go

@@ -112,15 +112,25 @@ func (r *LocalPluginRuntime) StartPlugin() error {
 
 	defer func() {
 		// wait for plugin to exit
-		err = e.Wait()
-		if err != nil {
+		originalErr := e.Wait()
+		if originalErr != nil {
 			// get stdio
 			var err error
 			if stdio != nil {
-				err = stdio.Error()
+				stdioErr := stdio.Error()
+				if stdioErr != nil {
+					err = errors.Join(originalErr, stdioErr)
+				} else {
+					err = originalErr
+				}
+			} else {
+				err = originalErr
+			}
+			if err != nil {
+				log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
+			} else {
+				log.Error("plugin %s exited with unknown error", r.Config.Identity())
 			}
-			err = errors.Join(err, err)
-			log.Error("plugin %s exited with error: %s", r.Config.Identity(), err.Error())
 		}
 
 		r.gc()

+ 5 - 0
internal/core/plugin_manager/local_runtime/type.go

@@ -17,6 +17,9 @@ type LocalPluginRuntime struct {
 	// python interpreter path, currently only support python
 	pythonInterpreterPath string
 
+	// python env init timeout
+	pythonEnvInitTimeout int
+
 	// to create a new python virtual environment, we need a default python interpreter
 	// by using its venv module
 	defaultPythonInterpreterPath string
@@ -34,11 +37,13 @@ type LocalPluginRuntime struct {
 
 func NewLocalPluginRuntime(
 	pythonInterpreterPath string,
+	pythonEnvInitTimeout int,
 	proxyHttp string,
 	proxyHttps string,
 ) *LocalPluginRuntime {
 	return &LocalPluginRuntime{
 		defaultPythonInterpreterPath: pythonInterpreterPath,
+		pythonEnvInitTimeout:         pythonEnvInitTimeout,
 		proxyHttp:                    proxyHttp,
 		proxyHttps:                   proxyHttps,
 	}

+ 4 - 0
internal/core/plugin_manager/manager.go

@@ -56,6 +56,9 @@ type PluginManager struct {
 	// python interpreter path
 	pythonInterpreterPath string
 
+	// python env init timeout
+	pythonEnvInitTimeout int
+
 	// proxy settings
 	proxyHttp  string
 	proxyHttps string
@@ -95,6 +98,7 @@ func InitGlobalManager(oss oss.OSS, configuration *app.Config) *PluginManager {
 		localPluginLaunchingLock: lock.NewGranularityLock(),
 		maxLaunchingLock:         make(chan bool, 2), // by default, we allow 2 plugins launching at the same time
 		pythonInterpreterPath:    configuration.PythonInterpreterPath,
+		pythonEnvInitTimeout:     configuration.PythonEnvInitTimeout,
 		platform:                 configuration.Platform,
 		proxyHttp:                configuration.ProxyHttp,
 		proxyHttps:               configuration.ProxyHttps,

+ 6 - 0
internal/service/endpoint.go

@@ -39,7 +39,13 @@ func Endpoint(
 	}
 
 	req := ctx.Request.Clone(context.Background())
+	// get query params
+	queryParams := req.URL.Query()
+
+	// replace path with endpoint path
 	req.URL.Path = path
+	// set query params
+	req.URL.RawQuery = queryParams.Encode()
 
 	// read request body until complete, max 10MB
 	body, err := io.ReadAll(io.LimitReader(req.Body, 10*1024*1024))

+ 1 - 0
internal/types/app/config.go

@@ -82,6 +82,7 @@ type Config struct {
 	MaxAWSLambdaTransactionTimeout int   `envconfig:"MAX_AWS_LAMBDA_TRANSACTION_TIMEOUT"`
 
 	PythonInterpreterPath string `envconfig:"PYTHON_INTERPRETER_PATH"`
+	PythonEnvInitTimeout  int    `envconfig:"PYTHON_ENV_INIT_TIMEOUT" validate:"required"`
 
 	DisplayClusterLog bool `envconfig:"DISPLAY_CLUSTER_LOG"`
 

+ 1 - 0
internal/types/app/default.go

@@ -28,6 +28,7 @@ func (config *Config) SetDefault() {
 	setDefaultInt(&config.PersistenceStorageMaxSize, 100*1024*1024)
 	setDefaultString(&config.PluginPackageCachePath, "plugin_packages")
 	setDefaultString(&config.PythonInterpreterPath, "/usr/bin/python3")
+	setDefaultInt(&config.PythonEnvInitTimeout, 120)
 	setDefaultBoolPtr(&config.ForceVerifyingSignature, true)
 }
 

+ 6 - 0
pkg/entities/plugin_entities/model_declaration.go

@@ -256,6 +256,12 @@ var PARAMETER_RULE_TEMPLATE = map[DefaultParameterName]ModelParameterRule{
 }
 
 func (m *ModelParameterRule) TransformTemplate() error {
+	if m.Label == nil || m.Label.EnUS == "" {
+		m.Label = &I18nObject{
+			EnUS: m.Name,
+		}
+	}
+
 	// if use_template is not empty, transform to use default value
 	if m.UseTemplate != nil && *m.UseTemplate != "" {
 		// get the value of use_template