workflow_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413
  1. import json
  2. import time
  3. from collections.abc import Callable, Generator, Sequence
  4. from datetime import UTC, datetime
  5. from typing import Any, Optional
  6. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  7. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  8. from core.model_runtime.utils.encoders import jsonable_encoder
  9. from core.variables import Variable
  10. from core.workflow.entities.node_entities import NodeRunResult
  11. from core.workflow.errors import WorkflowNodeRunFailedError
  12. from core.workflow.graph_engine.entities.event import InNodeEvent
  13. from core.workflow.nodes import NodeType
  14. from core.workflow.nodes.base.node import BaseNode
  15. from core.workflow.nodes.enums import ErrorStrategy
  16. from core.workflow.nodes.event import RunCompletedEvent
  17. from core.workflow.nodes.event.types import NodeEvent
  18. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  19. from core.workflow.workflow_entry import WorkflowEntry
  20. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  21. from extensions.ext_database import db
  22. from models.account import Account
  23. from models.enums import CreatedByRole
  24. from models.model import App, AppMode
  25. from models.workflow import (
  26. Workflow,
  27. WorkflowNodeExecution,
  28. WorkflowNodeExecutionStatus,
  29. WorkflowNodeExecutionTriggeredFrom,
  30. WorkflowType,
  31. )
  32. from services.errors.app import WorkflowHashNotEqualError
  33. from services.workflow.workflow_converter import WorkflowConverter
  34. class WorkflowService:
  35. """
  36. Workflow Service
  37. """
  38. def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
  39. """
  40. Get draft workflow
  41. """
  42. # fetch draft workflow by app_model
  43. workflow = (
  44. db.session.query(Workflow)
  45. .filter(
  46. Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, Workflow.version == "draft"
  47. )
  48. .first()
  49. )
  50. # return draft workflow
  51. return workflow
  52. def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
  53. """
  54. Get published workflow
  55. """
  56. if not app_model.workflow_id:
  57. return None
  58. # fetch published workflow by workflow_id
  59. workflow = (
  60. db.session.query(Workflow)
  61. .filter(
  62. Workflow.tenant_id == app_model.tenant_id,
  63. Workflow.app_id == app_model.id,
  64. Workflow.id == app_model.workflow_id,
  65. )
  66. .first()
  67. )
  68. return workflow
  69. def sync_draft_workflow(
  70. self,
  71. *,
  72. app_model: App,
  73. graph: dict,
  74. features: dict,
  75. unique_hash: Optional[str],
  76. account: Account,
  77. environment_variables: Sequence[Variable],
  78. conversation_variables: Sequence[Variable],
  79. ) -> Workflow:
  80. """
  81. Sync draft workflow
  82. :raises WorkflowHashNotEqualError
  83. """
  84. # fetch draft workflow by app_model
  85. workflow = self.get_draft_workflow(app_model=app_model)
  86. if workflow and workflow.unique_hash != unique_hash:
  87. raise WorkflowHashNotEqualError()
  88. # validate features structure
  89. self.validate_features_structure(app_model=app_model, features=features)
  90. # create draft workflow if not found
  91. if not workflow:
  92. workflow = Workflow(
  93. tenant_id=app_model.tenant_id,
  94. app_id=app_model.id,
  95. type=WorkflowType.from_app_mode(app_model.mode).value,
  96. version="draft",
  97. graph=json.dumps(graph),
  98. features=json.dumps(features),
  99. created_by=account.id,
  100. environment_variables=environment_variables,
  101. conversation_variables=conversation_variables,
  102. )
  103. db.session.add(workflow)
  104. # update draft workflow if found
  105. else:
  106. workflow.graph = json.dumps(graph)
  107. workflow.features = json.dumps(features)
  108. workflow.updated_by = account.id
  109. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  110. workflow.environment_variables = environment_variables
  111. workflow.conversation_variables = conversation_variables
  112. # commit db session changes
  113. db.session.commit()
  114. # trigger app workflow events
  115. app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)
  116. # return draft workflow
  117. return workflow
  118. def publish_workflow(self, app_model: App, account: Account, draft_workflow: Optional[Workflow] = None) -> Workflow:
  119. """
  120. Publish workflow from draft
  121. :param app_model: App instance
  122. :param account: Account instance
  123. :param draft_workflow: Workflow instance
  124. """
  125. if not draft_workflow:
  126. # fetch draft workflow by app_model
  127. draft_workflow = self.get_draft_workflow(app_model=app_model)
  128. if not draft_workflow:
  129. raise ValueError("No valid workflow found.")
  130. # create new workflow
  131. workflow = Workflow(
  132. tenant_id=app_model.tenant_id,
  133. app_id=app_model.id,
  134. type=draft_workflow.type,
  135. version=str(datetime.now(UTC).replace(tzinfo=None)),
  136. graph=draft_workflow.graph,
  137. features=draft_workflow.features,
  138. created_by=account.id,
  139. environment_variables=draft_workflow.environment_variables,
  140. conversation_variables=draft_workflow.conversation_variables,
  141. )
  142. # commit db session changes
  143. db.session.add(workflow)
  144. db.session.flush()
  145. db.session.commit()
  146. app_model.workflow_id = workflow.id
  147. db.session.commit()
  148. # trigger app workflow events
  149. app_published_workflow_was_updated.send(app_model, published_workflow=workflow)
  150. # return new workflow
  151. return workflow
  152. def get_default_block_configs(self) -> list[dict]:
  153. """
  154. Get default block configs
  155. """
  156. # return default block config
  157. default_block_configs = []
  158. for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
  159. node_class = node_class_mapping[LATEST_VERSION]
  160. default_config = node_class.get_default_config()
  161. if default_config:
  162. default_block_configs.append(default_config)
  163. return default_block_configs
  164. def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
  165. """
  166. Get default config of node.
  167. :param node_type: node type
  168. :param filters: filter by node config parameters.
  169. :return:
  170. """
  171. node_type_enum = NodeType(node_type)
  172. # return default block config
  173. if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
  174. return None
  175. node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
  176. default_config = node_class.get_default_config(filters=filters)
  177. if not default_config:
  178. return None
  179. return default_config
  180. def run_draft_workflow_node(
  181. self, app_model: App, node_id: str, user_inputs: dict, account: Account
  182. ) -> WorkflowNodeExecution:
  183. """
  184. Run draft workflow node
  185. """
  186. # fetch draft workflow by app_model
  187. draft_workflow = self.get_draft_workflow(app_model=app_model)
  188. if not draft_workflow:
  189. raise ValueError("Workflow not initialized")
  190. # run draft workflow node
  191. start_at = time.perf_counter()
  192. workflow_node_execution = self._handle_node_run_result(
  193. getter=lambda: WorkflowEntry.single_step_run(
  194. workflow=draft_workflow,
  195. node_id=node_id,
  196. user_inputs=user_inputs,
  197. user_id=account.id,
  198. ),
  199. start_at=start_at,
  200. tenant_id=app_model.tenant_id,
  201. node_id=node_id,
  202. )
  203. workflow_node_execution.app_id = app_model.id
  204. workflow_node_execution.created_by = account.id
  205. workflow_node_execution.workflow_id = draft_workflow.id
  206. db.session.add(workflow_node_execution)
  207. db.session.commit()
  208. return workflow_node_execution
  209. def run_free_workflow_node(
  210. self, node_data: dict, tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
  211. ) -> WorkflowNodeExecution:
  212. """
  213. Run draft workflow node
  214. """
  215. # run draft workflow node
  216. start_at = time.perf_counter()
  217. workflow_node_execution = self._handle_node_run_result(
  218. getter=lambda: WorkflowEntry.run_free_node(
  219. node_id=node_id,
  220. node_data=node_data,
  221. tenant_id=tenant_id,
  222. user_id=user_id,
  223. user_inputs=user_inputs,
  224. ),
  225. start_at=start_at,
  226. tenant_id=tenant_id,
  227. node_id=node_id,
  228. )
  229. return workflow_node_execution
  230. def _handle_node_run_result(
  231. self,
  232. getter: Callable[[], tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]],
  233. start_at: float,
  234. tenant_id: str,
  235. node_id: str,
  236. ):
  237. """
  238. Handle node run result
  239. :param getter: Callable[[], tuple[BaseNode, Generator[RunEvent | InNodeEvent, None, None]]]
  240. :param start_at: float
  241. :param tenant_id: str
  242. :param node_id: str
  243. """
  244. try:
  245. node_instance, generator = getter()
  246. node_run_result: NodeRunResult | None = None
  247. for event in generator:
  248. if isinstance(event, RunCompletedEvent):
  249. node_run_result = event.run_result
  250. # sign output files
  251. node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
  252. break
  253. if not node_run_result:
  254. raise ValueError("Node run failed with no run result")
  255. # single step debug mode error handling return
  256. if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
  257. node_error_args = {
  258. "status": WorkflowNodeExecutionStatus.EXCEPTION,
  259. "error": node_run_result.error,
  260. "inputs": node_run_result.inputs,
  261. "metadata": {"error_strategy": node_instance.node_data.error_strategy},
  262. }
  263. if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
  264. node_run_result = NodeRunResult(
  265. **node_error_args,
  266. outputs={
  267. **node_instance.node_data.default_value_dict,
  268. "error_message": node_run_result.error,
  269. "error_type": node_run_result.error_type,
  270. },
  271. )
  272. else:
  273. node_run_result = NodeRunResult(
  274. **node_error_args,
  275. outputs={
  276. "error_message": node_run_result.error,
  277. "error_type": node_run_result.error_type,
  278. },
  279. )
  280. run_succeeded = node_run_result.status in (
  281. WorkflowNodeExecutionStatus.SUCCEEDED,
  282. WorkflowNodeExecutionStatus.EXCEPTION,
  283. )
  284. error = node_run_result.error if not run_succeeded else None
  285. except WorkflowNodeRunFailedError as e:
  286. node_instance = e.node_instance
  287. run_succeeded = False
  288. node_run_result = None
  289. error = e.error
  290. workflow_node_execution = WorkflowNodeExecution()
  291. workflow_node_execution.tenant_id = tenant_id
  292. workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
  293. workflow_node_execution.index = 1
  294. workflow_node_execution.node_id = node_id
  295. workflow_node_execution.node_type = node_instance.node_type
  296. workflow_node_execution.title = node_instance.node_data.title
  297. workflow_node_execution.elapsed_time = time.perf_counter() - start_at
  298. workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
  299. workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
  300. workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
  301. if run_succeeded and node_run_result:
  302. # create workflow node execution
  303. inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
  304. process_data = (
  305. WorkflowEntry.handle_special_values(node_run_result.process_data)
  306. if node_run_result.process_data
  307. else None
  308. )
  309. outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
  310. workflow_node_execution.inputs = json.dumps(inputs)
  311. workflow_node_execution.process_data = json.dumps(process_data)
  312. workflow_node_execution.outputs = json.dumps(outputs)
  313. workflow_node_execution.execution_metadata = (
  314. json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
  315. )
  316. if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
  317. workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
  318. elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
  319. workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
  320. workflow_node_execution.error = node_run_result.error
  321. else:
  322. # create workflow node execution
  323. workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
  324. workflow_node_execution.error = error
  325. return workflow_node_execution
  326. def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
  327. """
  328. Basic mode of chatbot app(expert mode) to workflow
  329. Completion App to Workflow App
  330. :param app_model: App instance
  331. :param account: Account instance
  332. :param args: dict
  333. :return:
  334. """
  335. # chatbot convert to workflow mode
  336. workflow_converter = WorkflowConverter()
  337. if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
  338. raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")
  339. # convert to workflow
  340. new_app = workflow_converter.convert_to_workflow(
  341. app_model=app_model,
  342. account=account,
  343. name=args.get("name", "Default Name"),
  344. icon_type=args.get("icon_type", "emoji"),
  345. icon=args.get("icon", "🤖"),
  346. icon_background=args.get("icon_background", "#FFEAD5"),
  347. )
  348. return new_app
  349. def validate_features_structure(self, app_model: App, features: dict) -> dict:
  350. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  351. return AdvancedChatAppConfigManager.config_validate(
  352. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  353. )
  354. elif app_model.mode == AppMode.WORKFLOW.value:
  355. return WorkflowAppConfigManager.config_validate(
  356. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  357. )
  358. else:
  359. raise ValueError(f"Invalid app mode: {app_model.mode}")