workflow_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415
  1. import json
  2. import time
  3. from collections.abc import Callable, Generator, Sequence
  4. from datetime import UTC, datetime
  5. from typing import Any, Optional
  6. from uuid import uuid4
  7. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  8. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  9. from core.model_runtime.utils.encoders import jsonable_encoder
  10. from core.variables import Variable
  11. from core.workflow.entities.node_entities import NodeRunResult
  12. from core.workflow.errors import WorkflowNodeRunFailedError
  13. from core.workflow.graph_engine.entities.event import InNodeEvent
  14. from core.workflow.nodes import NodeType
  15. from core.workflow.nodes.base.node import BaseNode
  16. from core.workflow.nodes.enums import ErrorStrategy
  17. from core.workflow.nodes.event import RunCompletedEvent
  18. from core.workflow.nodes.event.types import NodeEvent
  19. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  20. from core.workflow.workflow_entry import WorkflowEntry
  21. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  22. from extensions.ext_database import db
  23. from models.account import Account
  24. from models.enums import CreatedByRole
  25. from models.model import App, AppMode
  26. from models.workflow import (
  27. Workflow,
  28. WorkflowNodeExecution,
  29. WorkflowNodeExecutionStatus,
  30. WorkflowNodeExecutionTriggeredFrom,
  31. WorkflowType,
  32. )
  33. from services.errors.app import WorkflowHashNotEqualError
  34. from services.workflow.workflow_converter import WorkflowConverter
  35. class WorkflowService:
  36. """
  37. Workflow Service
  38. """
  39. def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
  40. """
  41. Get draft workflow
  42. """
  43. # fetch draft workflow by app_model
  44. workflow = (
  45. db.session.query(Workflow)
  46. .filter(
  47. Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, Workflow.version == "draft"
  48. )
  49. .first()
  50. )
  51. # return draft workflow
  52. return workflow
  53. def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
  54. """
  55. Get published workflow
  56. """
  57. if not app_model.workflow_id:
  58. return None
  59. # fetch published workflow by workflow_id
  60. workflow = (
  61. db.session.query(Workflow)
  62. .filter(
  63. Workflow.tenant_id == app_model.tenant_id,
  64. Workflow.app_id == app_model.id,
  65. Workflow.id == app_model.workflow_id,
  66. )
  67. .first()
  68. )
  69. return workflow
  70. def sync_draft_workflow(
  71. self,
  72. *,
  73. app_model: App,
  74. graph: dict,
  75. features: dict,
  76. unique_hash: Optional[str],
  77. account: Account,
  78. environment_variables: Sequence[Variable],
  79. conversation_variables: Sequence[Variable],
  80. ) -> Workflow:
  81. """
  82. Sync draft workflow
  83. :raises WorkflowHashNotEqualError
  84. """
  85. # fetch draft workflow by app_model
  86. workflow = self.get_draft_workflow(app_model=app_model)
  87. if workflow and workflow.unique_hash != unique_hash:
  88. raise WorkflowHashNotEqualError()
  89. # validate features structure
  90. self.validate_features_structure(app_model=app_model, features=features)
  91. # create draft workflow if not found
  92. if not workflow:
  93. workflow = Workflow(
  94. tenant_id=app_model.tenant_id,
  95. app_id=app_model.id,
  96. type=WorkflowType.from_app_mode(app_model.mode).value,
  97. version="draft",
  98. graph=json.dumps(graph),
  99. features=json.dumps(features),
  100. created_by=account.id,
  101. environment_variables=environment_variables,
  102. conversation_variables=conversation_variables,
  103. )
  104. db.session.add(workflow)
  105. # update draft workflow if found
  106. else:
  107. workflow.graph = json.dumps(graph)
  108. workflow.features = json.dumps(features)
  109. workflow.updated_by = account.id
  110. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  111. workflow.environment_variables = environment_variables
  112. workflow.conversation_variables = conversation_variables
  113. # commit db session changes
  114. db.session.commit()
  115. # trigger app workflow events
  116. app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)
  117. # return draft workflow
  118. return workflow
  119. def publish_workflow(self, app_model: App, account: Account, draft_workflow: Optional[Workflow] = None) -> Workflow:
  120. """
  121. Publish workflow from draft
  122. :param app_model: App instance
  123. :param account: Account instance
  124. :param draft_workflow: Workflow instance
  125. """
  126. if not draft_workflow:
  127. # fetch draft workflow by app_model
  128. draft_workflow = self.get_draft_workflow(app_model=app_model)
  129. if not draft_workflow:
  130. raise ValueError("No valid workflow found.")
  131. # create new workflow
  132. workflow = Workflow(
  133. tenant_id=app_model.tenant_id,
  134. app_id=app_model.id,
  135. type=draft_workflow.type,
  136. version=str(datetime.now(UTC).replace(tzinfo=None)),
  137. graph=draft_workflow.graph,
  138. features=draft_workflow.features,
  139. created_by=account.id,
  140. environment_variables=draft_workflow.environment_variables,
  141. conversation_variables=draft_workflow.conversation_variables,
  142. )
  143. # commit db session changes
  144. db.session.add(workflow)
  145. db.session.flush()
  146. db.session.commit()
  147. app_model.workflow_id = workflow.id
  148. db.session.commit()
  149. # trigger app workflow events
  150. app_published_workflow_was_updated.send(app_model, published_workflow=workflow)
  151. # return new workflow
  152. return workflow
  153. def get_default_block_configs(self) -> list[dict]:
  154. """
  155. Get default block configs
  156. """
  157. # return default block config
  158. default_block_configs = []
  159. for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
  160. node_class = node_class_mapping[LATEST_VERSION]
  161. default_config = node_class.get_default_config()
  162. if default_config:
  163. default_block_configs.append(default_config)
  164. return default_block_configs
  165. def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
  166. """
  167. Get default config of node.
  168. :param node_type: node type
  169. :param filters: filter by node config parameters.
  170. :return:
  171. """
  172. node_type_enum = NodeType(node_type)
  173. # return default block config
  174. if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
  175. return None
  176. node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
  177. default_config = node_class.get_default_config(filters=filters)
  178. if not default_config:
  179. return None
  180. return default_config
  181. def run_draft_workflow_node(
  182. self, app_model: App, node_id: str, user_inputs: dict, account: Account
  183. ) -> WorkflowNodeExecution:
  184. """
  185. Run draft workflow node
  186. """
  187. # fetch draft workflow by app_model
  188. draft_workflow = self.get_draft_workflow(app_model=app_model)
  189. if not draft_workflow:
  190. raise ValueError("Workflow not initialized")
  191. # run draft workflow node
  192. start_at = time.perf_counter()
  193. workflow_node_execution = self._handle_node_run_result(
  194. getter=lambda: WorkflowEntry.single_step_run(
  195. workflow=draft_workflow,
  196. node_id=node_id,
  197. user_inputs=user_inputs,
  198. user_id=account.id,
  199. ),
  200. start_at=start_at,
  201. tenant_id=app_model.tenant_id,
  202. node_id=node_id,
  203. )
  204. workflow_node_execution.app_id = app_model.id
  205. workflow_node_execution.created_by = account.id
  206. workflow_node_execution.workflow_id = draft_workflow.id
  207. db.session.add(workflow_node_execution)
  208. db.session.commit()
  209. return workflow_node_execution
  210. def run_free_workflow_node(
  211. self, node_data: dict, tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
  212. ) -> WorkflowNodeExecution:
  213. """
  214. Run draft workflow node
  215. """
  216. # run draft workflow node
  217. start_at = time.perf_counter()
  218. workflow_node_execution = self._handle_node_run_result(
  219. getter=lambda: WorkflowEntry.run_free_node(
  220. node_id=node_id,
  221. node_data=node_data,
  222. tenant_id=tenant_id,
  223. user_id=user_id,
  224. user_inputs=user_inputs,
  225. ),
  226. start_at=start_at,
  227. tenant_id=tenant_id,
  228. node_id=node_id,
  229. )
  230. return workflow_node_execution
  231. def _handle_node_run_result(
  232. self,
  233. getter: Callable[[], tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]],
  234. start_at: float,
  235. tenant_id: str,
  236. node_id: str,
  237. ):
  238. """
  239. Handle node run result
  240. :param getter: Callable[[], tuple[BaseNode, Generator[RunEvent | InNodeEvent, None, None]]]
  241. :param start_at: float
  242. :param tenant_id: str
  243. :param node_id: str
  244. """
  245. try:
  246. node_instance, generator = getter()
  247. node_run_result: NodeRunResult | None = None
  248. for event in generator:
  249. if isinstance(event, RunCompletedEvent):
  250. node_run_result = event.run_result
  251. # sign output files
  252. node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
  253. break
  254. if not node_run_result:
  255. raise ValueError("Node run failed with no run result")
  256. # single step debug mode error handling return
  257. if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
  258. node_error_args: dict[str, Any] = {
  259. "status": WorkflowNodeExecutionStatus.EXCEPTION,
  260. "error": node_run_result.error,
  261. "inputs": node_run_result.inputs,
  262. "metadata": {"error_strategy": node_instance.node_data.error_strategy},
  263. }
  264. if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
  265. node_run_result = NodeRunResult(
  266. **node_error_args,
  267. outputs={
  268. **node_instance.node_data.default_value_dict,
  269. "error_message": node_run_result.error,
  270. "error_type": node_run_result.error_type,
  271. },
  272. )
  273. else:
  274. node_run_result = NodeRunResult(
  275. **node_error_args,
  276. outputs={
  277. "error_message": node_run_result.error,
  278. "error_type": node_run_result.error_type,
  279. },
  280. )
  281. run_succeeded = node_run_result.status in (
  282. WorkflowNodeExecutionStatus.SUCCEEDED,
  283. WorkflowNodeExecutionStatus.EXCEPTION,
  284. )
  285. error = node_run_result.error if not run_succeeded else None
  286. except WorkflowNodeRunFailedError as e:
  287. node_instance = e.node_instance
  288. run_succeeded = False
  289. node_run_result = None
  290. error = e.error
  291. workflow_node_execution = WorkflowNodeExecution()
  292. workflow_node_execution.id = str(uuid4())
  293. workflow_node_execution.tenant_id = tenant_id
  294. workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
  295. workflow_node_execution.index = 1
  296. workflow_node_execution.node_id = node_id
  297. workflow_node_execution.node_type = node_instance.node_type
  298. workflow_node_execution.title = node_instance.node_data.title
  299. workflow_node_execution.elapsed_time = time.perf_counter() - start_at
  300. workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
  301. workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
  302. workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
  303. if run_succeeded and node_run_result:
  304. # create workflow node execution
  305. inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
  306. process_data = (
  307. WorkflowEntry.handle_special_values(node_run_result.process_data)
  308. if node_run_result.process_data
  309. else None
  310. )
  311. outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
  312. workflow_node_execution.inputs = json.dumps(inputs)
  313. workflow_node_execution.process_data = json.dumps(process_data)
  314. workflow_node_execution.outputs = json.dumps(outputs)
  315. workflow_node_execution.execution_metadata = (
  316. json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
  317. )
  318. if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
  319. workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
  320. elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
  321. workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
  322. workflow_node_execution.error = node_run_result.error
  323. else:
  324. # create workflow node execution
  325. workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
  326. workflow_node_execution.error = error
  327. return workflow_node_execution
  328. def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
  329. """
  330. Basic mode of chatbot app(expert mode) to workflow
  331. Completion App to Workflow App
  332. :param app_model: App instance
  333. :param account: Account instance
  334. :param args: dict
  335. :return:
  336. """
  337. # chatbot convert to workflow mode
  338. workflow_converter = WorkflowConverter()
  339. if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
  340. raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")
  341. # convert to workflow
  342. new_app: App = workflow_converter.convert_to_workflow(
  343. app_model=app_model,
  344. account=account,
  345. name=args.get("name", "Default Name"),
  346. icon_type=args.get("icon_type", "emoji"),
  347. icon=args.get("icon", "🤖"),
  348. icon_background=args.get("icon_background", "#FFEAD5"),
  349. )
  350. return new_app
  351. def validate_features_structure(self, app_model: App, features: dict) -> dict:
  352. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  353. return AdvancedChatAppConfigManager.config_validate(
  354. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  355. )
  356. elif app_model.mode == AppMode.WORKFLOW.value:
  357. return WorkflowAppConfigManager.config_validate(
  358. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  359. )
  360. else:
  361. raise ValueError(f"Invalid app mode: {app_model.mode}")