Browse Source

feat: Add WORKFLOW_MAX_EXECUTION_TIME env var (#4632)

majian 11 months ago
parent
commit
8c2ca60c8b
2 changed files with 22 additions and 13 deletions
  1. 12 7
      api/config.py
  2. 10 6
      api/core/workflow/workflow_engine_manager.py

+ 12 - 7
api/config.py

@@ -80,6 +80,8 @@ DEFAULTS = {
     'INNER_API': 'False',
     'ENTERPRISE_ENABLED': 'False',
     'INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH': 1000,
+    'WORKFLOW_MAX_EXECUTION_STEPS': 50,
+    'WORKFLOW_MAX_EXECUTION_TIME': 600,
 }
 
 
@@ -216,12 +218,12 @@ class Config:
         self.AZURE_BLOB_ACCOUNT_KEY = get_env('AZURE_BLOB_ACCOUNT_KEY')
         self.AZURE_BLOB_CONTAINER_NAME = get_env('AZURE_BLOB_CONTAINER_NAME')
         self.AZURE_BLOB_ACCOUNT_URL = get_env('AZURE_BLOB_ACCOUNT_URL')
-        self.ALIYUN_OSS_BUCKET_NAME=get_env('ALIYUN_OSS_BUCKET_NAME')
-        self.ALIYUN_OSS_ACCESS_KEY=get_env('ALIYUN_OSS_ACCESS_KEY')
-        self.ALIYUN_OSS_SECRET_KEY=get_env('ALIYUN_OSS_SECRET_KEY')
-        self.ALIYUN_OSS_ENDPOINT=get_env('ALIYUN_OSS_ENDPOINT')
-        self.ALIYUN_OSS_REGION=get_env('ALIYUN_OSS_REGION')
-        self.ALIYUN_OSS_AUTH_VERSION=get_env('ALIYUN_OSS_AUTH_VERSION')
+        self.ALIYUN_OSS_BUCKET_NAME = get_env('ALIYUN_OSS_BUCKET_NAME')
+        self.ALIYUN_OSS_ACCESS_KEY = get_env('ALIYUN_OSS_ACCESS_KEY')
+        self.ALIYUN_OSS_SECRET_KEY = get_env('ALIYUN_OSS_SECRET_KEY')
+        self.ALIYUN_OSS_ENDPOINT = get_env('ALIYUN_OSS_ENDPOINT')
+        self.ALIYUN_OSS_REGION = get_env('ALIYUN_OSS_REGION')
+        self.ALIYUN_OSS_AUTH_VERSION = get_env('ALIYUN_OSS_AUTH_VERSION')
         self.GOOGLE_STORAGE_BUCKET_NAME = get_env('GOOGLE_STORAGE_BUCKET_NAME')
         self.GOOGLE_STORAGE_SERVICE_ACCOUNT_JSON_BASE64 = get_env('GOOGLE_STORAGE_SERVICE_ACCOUNT_JSON_BASE64')
 
@@ -286,7 +288,7 @@ class Config:
         self.SMTP_USERNAME = get_env('SMTP_USERNAME')
         self.SMTP_PASSWORD = get_env('SMTP_PASSWORD')
         self.SMTP_USE_TLS = get_bool_env('SMTP_USE_TLS')
-        
+
         # ------------------------
         # Workspace Configurations.
         # ------------------------
@@ -385,3 +387,6 @@ class Config:
         # Indexing Configurations.
         # ------------------------
         self.INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH = get_env('INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH')
+
+        self.WORKFLOW_MAX_EXECUTION_STEPS = get_env('WORKFLOW_MAX_EWORKFLOW_MAX_EXECUTION_STEPSXECUTION_STEPS')
+        self.WORKFLOW_MAX_EXECUTION_TIME = get_env('WORKFLOW_MAX_EXECUTION_TIME')

+ 10 - 6
api/core/workflow/workflow_engine_manager.py

@@ -2,6 +2,8 @@ import logging
 import time
 from typing import Optional, cast
 
+from flask import current_app
+
 from core.app.app_config.entities import FileExtraConfig
 from core.app.apps.base_app_queue_manager import GenerateTaskStoppedException
 from core.file.file_obj import FileTransferMethod, FileType, FileVar
@@ -128,6 +130,8 @@ class WorkflowEngineManager:
         try:
             predecessor_node = None
             has_entry_node = False
+            max_execution_steps = current_app.config.get("WORKFLOW_MAX_EXECUTION_STEPS")
+            max_execution_time = current_app.config.get("WORKFLOW_MAX_EXECUTION_TIME")
             while True:
                 # get next node, multiple target nodes in the future
                 next_node = self._get_next_node(
@@ -148,13 +152,13 @@ class WorkflowEngineManager:
 
                 has_entry_node = True
 
-                # max steps 50 reached
-                if len(workflow_run_state.workflow_nodes_and_results) > 50:
-                    raise ValueError('Max steps 50 reached.')
+                # max steps reached
+                if len(workflow_run_state.workflow_nodes_and_results) > max_execution_steps:
+                    raise ValueError('Max steps {} reached.'.format(max_execution_steps))
 
-                # or max execution time 10min reached
-                if self._is_timed_out(start_at=workflow_run_state.start_at, max_execution_time=600):
-                    raise ValueError('Max execution time 10min reached.')
+                # or max execution time reached
+                if self._is_timed_out(start_at=workflow_run_state.start_at, max_execution_time=max_execution_time):
+                    raise ValueError('Max execution time {}s reached.'.format(max_execution_time))
 
                 # run workflow, run multiple target nodes in the future
                 self._run_workflow_node(