workflow_service.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363
  1. import json
  2. import time
  3. from collections.abc import Sequence
  4. from datetime import UTC, datetime
  5. from typing import Optional, cast
  6. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  7. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  8. from core.model_runtime.utils.encoders import jsonable_encoder
  9. from core.variables import Variable
  10. from core.workflow.entities.node_entities import NodeRunResult
  11. from core.workflow.errors import WorkflowNodeRunFailedError
  12. from core.workflow.nodes import NodeType
  13. from core.workflow.nodes.base.entities import BaseNodeData
  14. from core.workflow.nodes.base.node import BaseNode
  15. from core.workflow.nodes.enums import ErrorStrategy
  16. from core.workflow.nodes.event import RunCompletedEvent
  17. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  18. from core.workflow.workflow_entry import WorkflowEntry
  19. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  20. from extensions.ext_database import db
  21. from models.account import Account
  22. from models.enums import CreatedByRole
  23. from models.model import App, AppMode
  24. from models.workflow import (
  25. Workflow,
  26. WorkflowNodeExecution,
  27. WorkflowNodeExecutionStatus,
  28. WorkflowNodeExecutionTriggeredFrom,
  29. WorkflowType,
  30. )
  31. from services.errors.app import WorkflowHashNotEqualError
  32. from services.workflow.workflow_converter import WorkflowConverter
  33. class WorkflowService:
  34. """
  35. Workflow Service
  36. """
  37. def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
  38. """
  39. Get draft workflow
  40. """
  41. # fetch draft workflow by app_model
  42. workflow = (
  43. db.session.query(Workflow)
  44. .filter(
  45. Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, Workflow.version == "draft"
  46. )
  47. .first()
  48. )
  49. # return draft workflow
  50. return workflow
  51. def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
  52. """
  53. Get published workflow
  54. """
  55. if not app_model.workflow_id:
  56. return None
  57. # fetch published workflow by workflow_id
  58. workflow = (
  59. db.session.query(Workflow)
  60. .filter(
  61. Workflow.tenant_id == app_model.tenant_id,
  62. Workflow.app_id == app_model.id,
  63. Workflow.id == app_model.workflow_id,
  64. )
  65. .first()
  66. )
  67. return workflow
  68. def sync_draft_workflow(
  69. self,
  70. *,
  71. app_model: App,
  72. graph: dict,
  73. features: dict,
  74. unique_hash: Optional[str],
  75. account: Account,
  76. environment_variables: Sequence[Variable],
  77. conversation_variables: Sequence[Variable],
  78. ) -> Workflow:
  79. """
  80. Sync draft workflow
  81. :raises WorkflowHashNotEqualError
  82. """
  83. # fetch draft workflow by app_model
  84. workflow = self.get_draft_workflow(app_model=app_model)
  85. if workflow and workflow.unique_hash != unique_hash:
  86. raise WorkflowHashNotEqualError()
  87. # validate features structure
  88. self.validate_features_structure(app_model=app_model, features=features)
  89. # create draft workflow if not found
  90. if not workflow:
  91. workflow = Workflow(
  92. tenant_id=app_model.tenant_id,
  93. app_id=app_model.id,
  94. type=WorkflowType.from_app_mode(app_model.mode).value,
  95. version="draft",
  96. graph=json.dumps(graph),
  97. features=json.dumps(features),
  98. created_by=account.id,
  99. environment_variables=environment_variables,
  100. conversation_variables=conversation_variables,
  101. )
  102. db.session.add(workflow)
  103. # update draft workflow if found
  104. else:
  105. workflow.graph = json.dumps(graph)
  106. workflow.features = json.dumps(features)
  107. workflow.updated_by = account.id
  108. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  109. workflow.environment_variables = environment_variables
  110. workflow.conversation_variables = conversation_variables
  111. # commit db session changes
  112. db.session.commit()
  113. # trigger app workflow events
  114. app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)
  115. # return draft workflow
  116. return workflow
  117. def publish_workflow(self, app_model: App, account: Account, draft_workflow: Optional[Workflow] = None) -> Workflow:
  118. """
  119. Publish workflow from draft
  120. :param app_model: App instance
  121. :param account: Account instance
  122. :param draft_workflow: Workflow instance
  123. """
  124. if not draft_workflow:
  125. # fetch draft workflow by app_model
  126. draft_workflow = self.get_draft_workflow(app_model=app_model)
  127. if not draft_workflow:
  128. raise ValueError("No valid workflow found.")
  129. # create new workflow
  130. workflow = Workflow(
  131. tenant_id=app_model.tenant_id,
  132. app_id=app_model.id,
  133. type=draft_workflow.type,
  134. version=str(datetime.now(UTC).replace(tzinfo=None)),
  135. graph=draft_workflow.graph,
  136. features=draft_workflow.features,
  137. created_by=account.id,
  138. environment_variables=draft_workflow.environment_variables,
  139. conversation_variables=draft_workflow.conversation_variables,
  140. )
  141. # commit db session changes
  142. db.session.add(workflow)
  143. db.session.flush()
  144. db.session.commit()
  145. app_model.workflow_id = workflow.id
  146. db.session.commit()
  147. # trigger app workflow events
  148. app_published_workflow_was_updated.send(app_model, published_workflow=workflow)
  149. # return new workflow
  150. return workflow
  151. def get_default_block_configs(self) -> list[dict]:
  152. """
  153. Get default block configs
  154. """
  155. # return default block config
  156. default_block_configs = []
  157. for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
  158. node_class = node_class_mapping[LATEST_VERSION]
  159. default_config = node_class.get_default_config()
  160. if default_config:
  161. default_block_configs.append(default_config)
  162. return default_block_configs
  163. def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
  164. """
  165. Get default config of node.
  166. :param node_type: node type
  167. :param filters: filter by node config parameters.
  168. :return:
  169. """
  170. node_type_enum = NodeType(node_type)
  171. # return default block config
  172. if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
  173. return None
  174. node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
  175. default_config = node_class.get_default_config(filters=filters)
  176. if not default_config:
  177. return None
  178. return default_config
  179. def run_draft_workflow_node(
  180. self, app_model: App, node_id: str, user_inputs: dict, account: Account
  181. ) -> WorkflowNodeExecution:
  182. """
  183. Run draft workflow node
  184. """
  185. # fetch draft workflow by app_model
  186. draft_workflow = self.get_draft_workflow(app_model=app_model)
  187. if not draft_workflow:
  188. raise ValueError("Workflow not initialized")
  189. # run draft workflow node
  190. start_at = time.perf_counter()
  191. try:
  192. node_instance, generator = WorkflowEntry.single_step_run(
  193. workflow=draft_workflow,
  194. node_id=node_id,
  195. user_inputs=user_inputs,
  196. user_id=account.id,
  197. )
  198. node_instance = cast(BaseNode[BaseNodeData], node_instance)
  199. node_run_result: NodeRunResult | None = None
  200. for event in generator:
  201. if isinstance(event, RunCompletedEvent):
  202. node_run_result = event.run_result
  203. # sign output files
  204. node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
  205. break
  206. if not node_run_result:
  207. raise ValueError("Node run failed with no run result")
  208. # single step debug mode error handling return
  209. if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
  210. node_error_args = {
  211. "status": WorkflowNodeExecutionStatus.EXCEPTION,
  212. "error": node_run_result.error,
  213. "inputs": node_run_result.inputs,
  214. "metadata": {"error_strategy": node_instance.node_data.error_strategy},
  215. }
  216. if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
  217. node_run_result = NodeRunResult(
  218. **node_error_args,
  219. outputs={
  220. **node_instance.node_data.default_value_dict,
  221. "error_message": node_run_result.error,
  222. "error_type": node_run_result.error_type,
  223. },
  224. )
  225. else:
  226. node_run_result = NodeRunResult(
  227. **node_error_args,
  228. outputs={
  229. "error_message": node_run_result.error,
  230. "error_type": node_run_result.error_type,
  231. },
  232. )
  233. run_succeeded = node_run_result.status in (
  234. WorkflowNodeExecutionStatus.SUCCEEDED,
  235. WorkflowNodeExecutionStatus.EXCEPTION,
  236. )
  237. error = node_run_result.error if not run_succeeded else None
  238. except WorkflowNodeRunFailedError as e:
  239. node_instance = e.node_instance
  240. run_succeeded = False
  241. node_run_result = None
  242. error = e.error
  243. workflow_node_execution = WorkflowNodeExecution()
  244. workflow_node_execution.tenant_id = app_model.tenant_id
  245. workflow_node_execution.app_id = app_model.id
  246. workflow_node_execution.workflow_id = draft_workflow.id
  247. workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
  248. workflow_node_execution.index = 1
  249. workflow_node_execution.node_id = node_id
  250. workflow_node_execution.node_type = node_instance.node_type
  251. workflow_node_execution.title = node_instance.node_data.title
  252. workflow_node_execution.elapsed_time = time.perf_counter() - start_at
  253. workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
  254. workflow_node_execution.created_by = account.id
  255. workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
  256. workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
  257. if run_succeeded and node_run_result:
  258. # create workflow node execution
  259. inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
  260. process_data = (
  261. WorkflowEntry.handle_special_values(node_run_result.process_data)
  262. if node_run_result.process_data
  263. else None
  264. )
  265. outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
  266. workflow_node_execution.inputs = json.dumps(inputs)
  267. workflow_node_execution.process_data = json.dumps(process_data)
  268. workflow_node_execution.outputs = json.dumps(outputs)
  269. workflow_node_execution.execution_metadata = (
  270. json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
  271. )
  272. if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
  273. workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
  274. elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
  275. workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
  276. workflow_node_execution.error = node_run_result.error
  277. else:
  278. # create workflow node execution
  279. workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
  280. workflow_node_execution.error = error
  281. db.session.add(workflow_node_execution)
  282. db.session.commit()
  283. return workflow_node_execution
  284. def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
  285. """
  286. Basic mode of chatbot app(expert mode) to workflow
  287. Completion App to Workflow App
  288. :param app_model: App instance
  289. :param account: Account instance
  290. :param args: dict
  291. :return:
  292. """
  293. # chatbot convert to workflow mode
  294. workflow_converter = WorkflowConverter()
  295. if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
  296. raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")
  297. # convert to workflow
  298. new_app = workflow_converter.convert_to_workflow(
  299. app_model=app_model,
  300. account=account,
  301. name=args.get("name", "Default Name"),
  302. icon_type=args.get("icon_type", "emoji"),
  303. icon=args.get("icon", "🤖"),
  304. icon_background=args.get("icon_background", "#FFEAD5"),
  305. )
  306. return new_app
  307. def validate_features_structure(self, app_model: App, features: dict) -> dict:
  308. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  309. return AdvancedChatAppConfigManager.config_validate(
  310. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  311. )
  312. elif app_model.mode == AppMode.WORKFLOW.value:
  313. return WorkflowAppConfigManager.config_validate(
  314. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  315. )
  316. else:
  317. raise ValueError(f"Invalid app mode: {app_model.mode}")