workflow_service.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439
  1. import json
  2. import time
  3. from collections.abc import Callable, Generator, Sequence
  4. from datetime import UTC, datetime
  5. from typing import Any, Optional
  6. from uuid import uuid4
  7. from sqlalchemy import desc
  8. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  9. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  10. from core.model_runtime.utils.encoders import jsonable_encoder
  11. from core.variables import Variable
  12. from core.workflow.entities.node_entities import NodeRunResult
  13. from core.workflow.errors import WorkflowNodeRunFailedError
  14. from core.workflow.graph_engine.entities.event import InNodeEvent
  15. from core.workflow.nodes import NodeType
  16. from core.workflow.nodes.base.node import BaseNode
  17. from core.workflow.nodes.enums import ErrorStrategy
  18. from core.workflow.nodes.event import RunCompletedEvent
  19. from core.workflow.nodes.event.types import NodeEvent
  20. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  21. from core.workflow.workflow_entry import WorkflowEntry
  22. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  23. from extensions.ext_database import db
  24. from models.account import Account
  25. from models.enums import CreatedByRole
  26. from models.model import App, AppMode
  27. from models.workflow import (
  28. Workflow,
  29. WorkflowNodeExecution,
  30. WorkflowNodeExecutionStatus,
  31. WorkflowNodeExecutionTriggeredFrom,
  32. WorkflowType,
  33. )
  34. from services.errors.app import WorkflowHashNotEqualError
  35. from services.workflow.workflow_converter import WorkflowConverter
  36. class WorkflowService:
  37. """
  38. Workflow Service
  39. """
  40. def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
  41. """
  42. Get draft workflow
  43. """
  44. # fetch draft workflow by app_model
  45. workflow = (
  46. db.session.query(Workflow)
  47. .filter(
  48. Workflow.tenant_id == app_model.tenant_id, Workflow.app_id == app_model.id, Workflow.version == "draft"
  49. )
  50. .first()
  51. )
  52. # return draft workflow
  53. return workflow
  54. def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
  55. """
  56. Get published workflow
  57. """
  58. if not app_model.workflow_id:
  59. return None
  60. # fetch published workflow by workflow_id
  61. workflow = (
  62. db.session.query(Workflow)
  63. .filter(
  64. Workflow.tenant_id == app_model.tenant_id,
  65. Workflow.app_id == app_model.id,
  66. Workflow.id == app_model.workflow_id,
  67. )
  68. .first()
  69. )
  70. return workflow
  71. def get_all_published_workflow(self, app_model: App, page: int, limit: int) -> tuple[list[Workflow], bool]:
  72. """
  73. Get published workflow with pagination
  74. """
  75. if not app_model.workflow_id:
  76. return [], False
  77. workflows = (
  78. db.session.query(Workflow)
  79. .filter(Workflow.app_id == app_model.id)
  80. .order_by(desc(Workflow.version))
  81. .offset((page - 1) * limit)
  82. .limit(limit + 1)
  83. .all()
  84. )
  85. has_more = len(workflows) > limit
  86. if has_more:
  87. workflows = workflows[:-1]
  88. return workflows, has_more
  89. def sync_draft_workflow(
  90. self,
  91. *,
  92. app_model: App,
  93. graph: dict,
  94. features: dict,
  95. unique_hash: Optional[str],
  96. account: Account,
  97. environment_variables: Sequence[Variable],
  98. conversation_variables: Sequence[Variable],
  99. ) -> Workflow:
  100. """
  101. Sync draft workflow
  102. :raises WorkflowHashNotEqualError
  103. """
  104. # fetch draft workflow by app_model
  105. workflow = self.get_draft_workflow(app_model=app_model)
  106. if workflow and workflow.unique_hash != unique_hash:
  107. raise WorkflowHashNotEqualError()
  108. # validate features structure
  109. self.validate_features_structure(app_model=app_model, features=features)
  110. # create draft workflow if not found
  111. if not workflow:
  112. workflow = Workflow(
  113. tenant_id=app_model.tenant_id,
  114. app_id=app_model.id,
  115. type=WorkflowType.from_app_mode(app_model.mode).value,
  116. version="draft",
  117. graph=json.dumps(graph),
  118. features=json.dumps(features),
  119. created_by=account.id,
  120. environment_variables=environment_variables,
  121. conversation_variables=conversation_variables,
  122. )
  123. db.session.add(workflow)
  124. # update draft workflow if found
  125. else:
  126. workflow.graph = json.dumps(graph)
  127. workflow.features = json.dumps(features)
  128. workflow.updated_by = account.id
  129. workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
  130. workflow.environment_variables = environment_variables
  131. workflow.conversation_variables = conversation_variables
  132. # commit db session changes
  133. db.session.commit()
  134. # trigger app workflow events
  135. app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=workflow)
  136. # return draft workflow
  137. return workflow
  138. def publish_workflow(self, app_model: App, account: Account, draft_workflow: Optional[Workflow] = None) -> Workflow:
  139. """
  140. Publish workflow from draft
  141. :param app_model: App instance
  142. :param account: Account instance
  143. :param draft_workflow: Workflow instance
  144. """
  145. if not draft_workflow:
  146. # fetch draft workflow by app_model
  147. draft_workflow = self.get_draft_workflow(app_model=app_model)
  148. if not draft_workflow:
  149. raise ValueError("No valid workflow found.")
  150. # create new workflow
  151. workflow = Workflow(
  152. tenant_id=app_model.tenant_id,
  153. app_id=app_model.id,
  154. type=draft_workflow.type,
  155. version=str(datetime.now(UTC).replace(tzinfo=None)),
  156. graph=draft_workflow.graph,
  157. features=draft_workflow.features,
  158. created_by=account.id,
  159. environment_variables=draft_workflow.environment_variables,
  160. conversation_variables=draft_workflow.conversation_variables,
  161. )
  162. # commit db session changes
  163. db.session.add(workflow)
  164. db.session.flush()
  165. db.session.commit()
  166. app_model.workflow_id = workflow.id
  167. db.session.commit()
  168. # trigger app workflow events
  169. app_published_workflow_was_updated.send(app_model, published_workflow=workflow)
  170. # return new workflow
  171. return workflow
  172. def get_default_block_configs(self) -> list[dict]:
  173. """
  174. Get default block configs
  175. """
  176. # return default block config
  177. default_block_configs = []
  178. for node_class_mapping in NODE_TYPE_CLASSES_MAPPING.values():
  179. node_class = node_class_mapping[LATEST_VERSION]
  180. default_config = node_class.get_default_config()
  181. if default_config:
  182. default_block_configs.append(default_config)
  183. return default_block_configs
  184. def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
  185. """
  186. Get default config of node.
  187. :param node_type: node type
  188. :param filters: filter by node config parameters.
  189. :return:
  190. """
  191. node_type_enum = NodeType(node_type)
  192. # return default block config
  193. if node_type_enum not in NODE_TYPE_CLASSES_MAPPING:
  194. return None
  195. node_class = NODE_TYPE_CLASSES_MAPPING[node_type_enum][LATEST_VERSION]
  196. default_config = node_class.get_default_config(filters=filters)
  197. if not default_config:
  198. return None
  199. return default_config
  200. def run_draft_workflow_node(
  201. self, app_model: App, node_id: str, user_inputs: dict, account: Account
  202. ) -> WorkflowNodeExecution:
  203. """
  204. Run draft workflow node
  205. """
  206. # fetch draft workflow by app_model
  207. draft_workflow = self.get_draft_workflow(app_model=app_model)
  208. if not draft_workflow:
  209. raise ValueError("Workflow not initialized")
  210. # run draft workflow node
  211. start_at = time.perf_counter()
  212. workflow_node_execution = self._handle_node_run_result(
  213. getter=lambda: WorkflowEntry.single_step_run(
  214. workflow=draft_workflow,
  215. node_id=node_id,
  216. user_inputs=user_inputs,
  217. user_id=account.id,
  218. ),
  219. start_at=start_at,
  220. tenant_id=app_model.tenant_id,
  221. node_id=node_id,
  222. )
  223. workflow_node_execution.app_id = app_model.id
  224. workflow_node_execution.created_by = account.id
  225. workflow_node_execution.workflow_id = draft_workflow.id
  226. db.session.add(workflow_node_execution)
  227. db.session.commit()
  228. return workflow_node_execution
  229. def run_free_workflow_node(
  230. self, node_data: dict, tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
  231. ) -> WorkflowNodeExecution:
  232. """
  233. Run draft workflow node
  234. """
  235. # run draft workflow node
  236. start_at = time.perf_counter()
  237. workflow_node_execution = self._handle_node_run_result(
  238. getter=lambda: WorkflowEntry.run_free_node(
  239. node_id=node_id,
  240. node_data=node_data,
  241. tenant_id=tenant_id,
  242. user_id=user_id,
  243. user_inputs=user_inputs,
  244. ),
  245. start_at=start_at,
  246. tenant_id=tenant_id,
  247. node_id=node_id,
  248. )
  249. return workflow_node_execution
  250. def _handle_node_run_result(
  251. self,
  252. getter: Callable[[], tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]],
  253. start_at: float,
  254. tenant_id: str,
  255. node_id: str,
  256. ) -> WorkflowNodeExecution:
  257. """
  258. Handle node run result
  259. :param getter: Callable[[], tuple[BaseNode, Generator[RunEvent | InNodeEvent, None, None]]]
  260. :param start_at: float
  261. :param tenant_id: str
  262. :param node_id: str
  263. """
  264. try:
  265. node_instance, generator = getter()
  266. node_run_result: NodeRunResult | None = None
  267. for event in generator:
  268. if isinstance(event, RunCompletedEvent):
  269. node_run_result = event.run_result
  270. # sign output files
  271. node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)
  272. break
  273. if not node_run_result:
  274. raise ValueError("Node run failed with no run result")
  275. # single step debug mode error handling return
  276. if node_run_result.status == WorkflowNodeExecutionStatus.FAILED and node_instance.should_continue_on_error:
  277. node_error_args: dict[str, Any] = {
  278. "status": WorkflowNodeExecutionStatus.EXCEPTION,
  279. "error": node_run_result.error,
  280. "inputs": node_run_result.inputs,
  281. "metadata": {"error_strategy": node_instance.node_data.error_strategy},
  282. }
  283. if node_instance.node_data.error_strategy is ErrorStrategy.DEFAULT_VALUE:
  284. node_run_result = NodeRunResult(
  285. **node_error_args,
  286. outputs={
  287. **node_instance.node_data.default_value_dict,
  288. "error_message": node_run_result.error,
  289. "error_type": node_run_result.error_type,
  290. },
  291. )
  292. else:
  293. node_run_result = NodeRunResult(
  294. **node_error_args,
  295. outputs={
  296. "error_message": node_run_result.error,
  297. "error_type": node_run_result.error_type,
  298. },
  299. )
  300. run_succeeded = node_run_result.status in (
  301. WorkflowNodeExecutionStatus.SUCCEEDED,
  302. WorkflowNodeExecutionStatus.EXCEPTION,
  303. )
  304. error = node_run_result.error if not run_succeeded else None
  305. except WorkflowNodeRunFailedError as e:
  306. node_instance = e.node_instance
  307. run_succeeded = False
  308. node_run_result = None
  309. error = e.error
  310. workflow_node_execution = WorkflowNodeExecution()
  311. workflow_node_execution.id = str(uuid4())
  312. workflow_node_execution.tenant_id = tenant_id
  313. workflow_node_execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
  314. workflow_node_execution.index = 1
  315. workflow_node_execution.node_id = node_id
  316. workflow_node_execution.node_type = node_instance.node_type
  317. workflow_node_execution.title = node_instance.node_data.title
  318. workflow_node_execution.elapsed_time = time.perf_counter() - start_at
  319. workflow_node_execution.created_by_role = CreatedByRole.ACCOUNT.value
  320. workflow_node_execution.created_at = datetime.now(UTC).replace(tzinfo=None)
  321. workflow_node_execution.finished_at = datetime.now(UTC).replace(tzinfo=None)
  322. if run_succeeded and node_run_result:
  323. # create workflow node execution
  324. inputs = WorkflowEntry.handle_special_values(node_run_result.inputs) if node_run_result.inputs else None
  325. process_data = (
  326. WorkflowEntry.handle_special_values(node_run_result.process_data)
  327. if node_run_result.process_data
  328. else None
  329. )
  330. outputs = WorkflowEntry.handle_special_values(node_run_result.outputs) if node_run_result.outputs else None
  331. workflow_node_execution.inputs = json.dumps(inputs)
  332. workflow_node_execution.process_data = json.dumps(process_data)
  333. workflow_node_execution.outputs = json.dumps(outputs)
  334. workflow_node_execution.execution_metadata = (
  335. json.dumps(jsonable_encoder(node_run_result.metadata)) if node_run_result.metadata else None
  336. )
  337. if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
  338. workflow_node_execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
  339. elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
  340. workflow_node_execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
  341. workflow_node_execution.error = node_run_result.error
  342. else:
  343. # create workflow node execution
  344. workflow_node_execution.status = WorkflowNodeExecutionStatus.FAILED.value
  345. workflow_node_execution.error = error
  346. return workflow_node_execution
  347. def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
  348. """
  349. Basic mode of chatbot app(expert mode) to workflow
  350. Completion App to Workflow App
  351. :param app_model: App instance
  352. :param account: Account instance
  353. :param args: dict
  354. :return:
  355. """
  356. # chatbot convert to workflow mode
  357. workflow_converter = WorkflowConverter()
  358. if app_model.mode not in {AppMode.CHAT.value, AppMode.COMPLETION.value}:
  359. raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")
  360. # convert to workflow
  361. new_app: App = workflow_converter.convert_to_workflow(
  362. app_model=app_model,
  363. account=account,
  364. name=args.get("name", "Default Name"),
  365. icon_type=args.get("icon_type", "emoji"),
  366. icon=args.get("icon", "🤖"),
  367. icon_background=args.get("icon_background", "#FFEAD5"),
  368. )
  369. return new_app
  370. def validate_features_structure(self, app_model: App, features: dict) -> dict:
  371. if app_model.mode == AppMode.ADVANCED_CHAT.value:
  372. return AdvancedChatAppConfigManager.config_validate(
  373. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  374. )
  375. elif app_model.mode == AppMode.WORKFLOW.value:
  376. return WorkflowAppConfigManager.config_validate(
  377. tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
  378. )
  379. else:
  380. raise ValueError(f"Invalid app mode: {app_model.mode}")