workflow_service.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. import json
  2. import time
  3. from collections.abc import Callable, Generator, Sequence
  4. from datetime import UTC, datetime
  5. from typing import Any, Optional
  6. from uuid import uuid4
  7. from sqlalchemy import select
  8. from sqlalchemy.orm import Session
  9. from core.app.apps.advanced_chat.app_config_manager import AdvancedChatAppConfigManager
  10. from core.app.apps.workflow.app_config_manager import WorkflowAppConfigManager
  11. from core.model_runtime.utils.encoders import jsonable_encoder
  12. from core.variables import Variable
  13. from core.workflow.entities.node_entities import NodeRunResult
  14. from core.workflow.errors import WorkflowNodeRunFailedError
  15. from core.workflow.graph_engine.entities.event import InNodeEvent
  16. from core.workflow.nodes import NodeType
  17. from core.workflow.nodes.base.node import BaseNode
  18. from core.workflow.nodes.enums import ErrorStrategy
  19. from core.workflow.nodes.event import RunCompletedEvent
  20. from core.workflow.nodes.event.types import NodeEvent
  21. from core.workflow.nodes.node_mapping import LATEST_VERSION, NODE_TYPE_CLASSES_MAPPING
  22. from core.workflow.workflow_entry import WorkflowEntry
  23. from events.app_event import app_draft_workflow_was_synced, app_published_workflow_was_updated
  24. from extensions.ext_database import db
  25. from models.account import Account
  26. from models.enums import CreatedByRole
  27. from models.model import App, AppMode
  28. from models.workflow import (
  29. Workflow,
  30. WorkflowNodeExecution,
  31. WorkflowNodeExecutionStatus,
  32. WorkflowNodeExecutionTriggeredFrom,
  33. WorkflowType,
  34. )
  35. from services.errors.app import WorkflowHashNotEqualError
  36. from services.workflow.workflow_converter import WorkflowConverter
  37. from .errors.workflow_service import DraftWorkflowDeletionError, WorkflowInUseError
  38. class WorkflowService:
  39. """
  40. Workflow Service
  41. """
  def get_draft_workflow(self, app_model: App) -> Optional[Workflow]:
      """
      Look up the draft workflow that belongs to an app.

      :param app_model: app whose ``tenant_id`` and ``id`` scope the query
      :return: the first workflow row whose version is ``"draft"``, or None when nothing matches
      """
      draft_query = db.session.query(Workflow).filter(
          Workflow.tenant_id == app_model.tenant_id,
          Workflow.app_id == app_model.id,
          Workflow.version == "draft",
      )
      return draft_query.first()
  def get_published_workflow(self, app_model: App) -> Optional[Workflow]:
      """
      Look up the workflow an app currently points at through ``workflow_id``.

      :param app_model: app whose ``tenant_id``, ``id`` and ``workflow_id`` scope the query
      :return: the matching workflow row, or None when the app has no ``workflow_id``
          or the query matches nothing
      """
      published_id = app_model.workflow_id
      if not published_id:
          # nothing has been published for this app yet, so skip the query
          return None

      published_query = db.session.query(Workflow).filter(
          Workflow.tenant_id == app_model.tenant_id,
          Workflow.app_id == app_model.id,
          Workflow.id == published_id,
      )
      return published_query.first()
  def get_all_published_workflow(
      self,
      *,
      session: Session,
      app_model: App,
      page: int,
      limit: int,
      user_id: str | None,
      named_only: bool = False,
  ) -> tuple[Sequence[Workflow], bool]:
      """
      Return one page of an app's workflow rows, ordered by version descending.

      :param session: SQLAlchemy session used to execute the query
      :param app_model: app whose workflows are listed
      :param page: 1-based page number; the offset is ``(page - 1) * limit``
      :param limit: maximum number of rows returned for the page
      :param user_id: when truthy, keep only rows whose ``created_by`` equals it
      :param named_only: when True, keep only rows whose ``marked_name`` is not ``""``
      :return: ``(workflows, has_more)``; ``([], False)`` when the app has no ``workflow_id``
      """
      if not app_model.workflow_id:
          return [], False

      stmt = (
          select(Workflow)
          .where(
              # scope by tenant as well as app, like every other workflow lookup in this service
              Workflow.tenant_id == app_model.tenant_id,
              Workflow.app_id == app_model.id,
          )
          .order_by(Workflow.version.desc())
          # fetch one extra row so "is there another page?" needs no second query
          .limit(limit + 1)
          .offset((page - 1) * limit)
      )
      if user_id:
          stmt = stmt.where(Workflow.created_by == user_id)
      if named_only:
          stmt = stmt.where(Workflow.marked_name != "")

      workflows = session.scalars(stmt).all()
      has_more = len(workflows) > limit
      if has_more:
          # drop the look-ahead row; it belongs to the next page
          workflows = workflows[:limit]
      return workflows, has_more
  def sync_draft_workflow(
      self,
      *,
      app_model: App,
      graph: dict,
      features: dict,
      unique_hash: Optional[str],
      account: Account,
      environment_variables: Sequence[Variable],
      conversation_variables: Sequence[Variable],
  ) -> Workflow:
      """
      Create the app's draft workflow, or overwrite the existing one, and commit.

      :param app_model: app that owns the draft
      :param graph: graph definition, stored JSON-encoded
      :param features: features definition, structure-validated then stored JSON-encoded
      :param unique_hash: hash the caller last saw; compared with the existing draft's hash
      :param account: account recorded as creator (new draft) or updater (existing draft)
      :param environment_variables: environment variables assigned to the draft
      :param conversation_variables: conversation variables assigned to the draft
      :return: the created or updated draft workflow
      :raises WorkflowHashNotEqualError: an existing draft's ``unique_hash`` differs from ``unique_hash``
      """
      draft = self.get_draft_workflow(app_model=app_model)

      # refuse to overwrite a draft the caller has not seen the latest version of
      if draft and draft.unique_hash != unique_hash:
          raise WorkflowHashNotEqualError()

      self.validate_features_structure(app_model=app_model, features=features)

      graph_json = json.dumps(graph)
      features_json = json.dumps(features)

      if draft:
          # existing draft: overwrite content and stamp the updater
          draft.graph = graph_json
          draft.features = features_json
          draft.updated_by = account.id
          draft.updated_at = datetime.now(UTC).replace(tzinfo=None)
          draft.environment_variables = environment_variables
          draft.conversation_variables = conversation_variables
      else:
          # first sync for this app: create the draft row
          draft = Workflow(
              tenant_id=app_model.tenant_id,
              app_id=app_model.id,
              type=WorkflowType.from_app_mode(app_model.mode).value,
              version="draft",
              graph=graph_json,
              features=features_json,
              created_by=account.id,
              environment_variables=environment_variables,
              conversation_variables=conversation_variables,
          )
          db.session.add(draft)

      db.session.commit()

      # notify listeners once the draft has been committed
      app_draft_workflow_was_synced.send(app_model, synced_draft_workflow=draft)
      return draft
  def publish_workflow(
      self,
      *,
      session: Session,
      app_model: App,
      account: Account,
      marked_name: str = "",
      marked_comment: str = "",
  ) -> Workflow:
      """
      Snapshot the app's draft workflow as a new versioned workflow.

      The new row is only added to ``session``; this method does not commit.

      :param session: SQLAlchemy session used to read the draft and add the new row
      :param app_model: app whose draft is published
      :param account: account recorded as the creator of the new version
      :param marked_name: name stored on the new version
      :param marked_comment: comment stored on the new version
      :return: the newly created workflow
      :raises ValueError: the app has no draft workflow
      """
      draft_workflow = session.scalar(
          select(Workflow).where(
              Workflow.tenant_id == app_model.tenant_id,
              Workflow.app_id == app_model.id,
              Workflow.version == "draft",
          )
      )
      if not draft_workflow:
          raise ValueError("No valid workflow found.")

      # copy the draft's content; the version label is the current naive-UTC timestamp as a string
      published = Workflow.new(
          tenant_id=app_model.tenant_id,
          app_id=app_model.id,
          type=draft_workflow.type,
          version=str(datetime.now(UTC).replace(tzinfo=None)),
          graph=draft_workflow.graph,
          features=draft_workflow.features,
          created_by=account.id,
          environment_variables=draft_workflow.environment_variables,
          conversation_variables=draft_workflow.conversation_variables,
          marked_name=marked_name,
          marked_comment=marked_comment,
      )
      session.add(published)

      # notify listeners about the new published version
      app_published_workflow_was_updated.send(app_model, published_workflow=published)
      return published
  def get_default_block_configs(self) -> list[dict]:
      """
      Collect the default config of the latest version of every registered node class.

      :return: the truthy results of ``get_default_config()``, in mapping order
      """
      latest_node_classes = (
          versions[LATEST_VERSION] for versions in NODE_TYPE_CLASSES_MAPPING.values()
      )
      # node classes whose default config is falsy are left out
      return [
          config
          for node_class in latest_node_classes
          if (config := node_class.get_default_config())
      ]
  def get_default_block_config(self, node_type: str, filters: Optional[dict] = None) -> Optional[dict]:
      """
      Return the default config of the latest version of one node type.

      :param node_type: node type value, converted with ``NodeType(node_type)``
      :param filters: passed through to the node class's ``get_default_config``
      :return: the default config, or None when the node type has no registered
          class or the config is falsy
      """
      node_kind = NodeType(node_type)
      if node_kind in NODE_TYPE_CLASSES_MAPPING:
          latest_class = NODE_TYPE_CLASSES_MAPPING[node_kind][LATEST_VERSION]
          # a falsy config is reported as None
          return latest_class.get_default_config(filters=filters) or None
      return None
  def run_draft_workflow_node(
      self, app_model: App, node_id: str, user_inputs: dict, account: Account
  ) -> WorkflowNodeExecution:
      """
      Run a single node of the app's draft workflow and persist the execution record.

      :param app_model: app whose draft workflow contains the node
      :param node_id: ID of the node to run
      :param user_inputs: inputs forwarded to ``WorkflowEntry.single_step_run``
      :param account: account running the node; recorded as ``created_by``
      :return: the committed node execution record
      :raises ValueError: the app has no draft workflow
      """
      draft_workflow = self.get_draft_workflow(app_model=app_model)
      if not draft_workflow:
          raise ValueError("Workflow not initialized")

      def _single_step():
          # deferred so the shared result handler controls when the run starts
          return WorkflowEntry.single_step_run(
              workflow=draft_workflow,
              node_id=node_id,
              user_inputs=user_inputs,
              user_id=account.id,
          )

      started = time.perf_counter()
      execution = self._handle_node_run_result(
          getter=_single_step,
          start_at=started,
          tenant_id=app_model.tenant_id,
          node_id=node_id,
      )

      # attach the ownership fields the shared handler does not set
      execution.app_id = app_model.id
      execution.created_by = account.id
      execution.workflow_id = draft_workflow.id

      db.session.add(execution)
      db.session.commit()
      return execution
  def run_free_workflow_node(
      self, node_data: dict, tenant_id: str, user_id: str, node_id: str, user_inputs: dict[str, Any]
  ) -> WorkflowNodeExecution:
      """
      Run a node from raw node data through ``WorkflowEntry.run_free_node``.

      The execution record is returned without being added to a session here.

      :param node_data: node definition forwarded to ``WorkflowEntry.run_free_node``
      :param tenant_id: tenant the run is attributed to
      :param user_id: user the run is attributed to
      :param node_id: ID of the node to run
      :param user_inputs: inputs forwarded to the node
      :return: the node execution record built from the run result
      """

      def _free_run():
          return WorkflowEntry.run_free_node(
              node_id=node_id,
              node_data=node_data,
              tenant_id=tenant_id,
              user_id=user_id,
              user_inputs=user_inputs,
          )

      started = time.perf_counter()
      return self._handle_node_run_result(
          getter=_free_run,
          start_at=started,
          tenant_id=tenant_id,
          node_id=node_id,
      )
  def _handle_node_run_result(
      self,
      getter: Callable[[], tuple[BaseNode, Generator[NodeEvent | InNodeEvent, None, None]]],
      start_at: float,
      tenant_id: str,
      node_id: str,
  ) -> WorkflowNodeExecution:
      """
      Run a node through ``getter`` and turn the outcome into a ``WorkflowNodeExecution``.

      :param getter: callable returning the node instance and its event generator
      :param start_at: ``time.perf_counter()`` reading taken before the run, used for ``elapsed_time``
      :param tenant_id: tenant recorded on the execution
      :param node_id: node ID recorded on the execution
      :return: an unsaved execution record with status SUCCEEDED, EXCEPTION or FAILED
      :raises ValueError: the generator finished without a ``RunCompletedEvent`` carrying a result
      """
      try:
          node_instance, events = getter()

          # only the first RunCompletedEvent matters; later events are not consumed
          completed = next((ev for ev in events if isinstance(ev, RunCompletedEvent)), None)
          node_run_result: NodeRunResult | None = None
          if completed is not None:
              node_run_result = completed.run_result
              # the original comment describes this step as signing output files
              node_run_result.outputs = WorkflowEntry.handle_special_values(node_run_result.outputs)

          if not node_run_result:
              raise ValueError("Node run failed with no run result")

          # single-step debug: a failed node configured to continue on error is
          # reported as EXCEPTION with the error details exposed as outputs
          continue_on_error = (
              node_run_result.status == WorkflowNodeExecutionStatus.FAILED
              and node_instance.should_continue_on_error
          )
          if continue_on_error:
              strategy = node_instance.node_data.error_strategy
              failure_outputs: dict[str, Any] = {
                  "error_message": node_run_result.error,
                  "error_type": node_run_result.error_type,
              }
              if strategy is ErrorStrategy.DEFAULT_VALUE:
                  # configured defaults come first; the error keys win on conflict
                  failure_outputs = {**node_instance.node_data.default_value_dict, **failure_outputs}
              node_run_result = NodeRunResult(
                  status=WorkflowNodeExecutionStatus.EXCEPTION,
                  error=node_run_result.error,
                  inputs=node_run_result.inputs,
                  metadata={"error_strategy": strategy},
                  outputs=failure_outputs,
              )

          run_succeeded = node_run_result.status in (
              WorkflowNodeExecutionStatus.SUCCEEDED,
              WorkflowNodeExecutionStatus.EXCEPTION,
          )
          error = None if run_succeeded else node_run_result.error
      except WorkflowNodeRunFailedError as exc:
          # the exception carries the node instance, so a record can still be built
          node_instance = exc.node_instance
          run_succeeded = False
          node_run_result = None
          error = exc.error

      execution = WorkflowNodeExecution()
      execution.id = str(uuid4())
      execution.tenant_id = tenant_id
      execution.triggered_from = WorkflowNodeExecutionTriggeredFrom.SINGLE_STEP.value
      execution.index = 1
      execution.node_id = node_id
      execution.node_type = node_instance.node_type
      execution.title = node_instance.node_data.title
      execution.elapsed_time = time.perf_counter() - start_at
      execution.created_by_role = CreatedByRole.ACCOUNT.value
      execution.created_at = datetime.now(UTC).replace(tzinfo=None)
      execution.finished_at = datetime.now(UTC).replace(tzinfo=None)

      if not (run_succeeded and node_run_result):
          # failed run: only the status and error are recorded
          execution.status = WorkflowNodeExecutionStatus.FAILED.value
          execution.error = error
          return execution

      def _special_or_none(values):
          # falsy payloads are stored as JSON null instead of being processed
          return WorkflowEntry.handle_special_values(values) if values else None

      inputs = _special_or_none(node_run_result.inputs)
      process_data = _special_or_none(node_run_result.process_data)
      outputs = _special_or_none(node_run_result.outputs)

      execution.inputs = json.dumps(inputs)
      execution.process_data = json.dumps(process_data)
      execution.outputs = json.dumps(outputs)

      run_metadata = node_run_result.metadata
      execution.execution_metadata = json.dumps(jsonable_encoder(run_metadata)) if run_metadata else None

      if node_run_result.status == WorkflowNodeExecutionStatus.SUCCEEDED:
          execution.status = WorkflowNodeExecutionStatus.SUCCEEDED.value
      elif node_run_result.status == WorkflowNodeExecutionStatus.EXCEPTION:
          execution.status = WorkflowNodeExecutionStatus.EXCEPTION.value
          execution.error = node_run_result.error

      return execution
  def convert_to_workflow(self, app_model: App, account: Account, args: dict) -> App:
      """
      Convert a chat or completion app into a workflow-based app.

      :param app_model: app to convert; its mode must be chat or completion
      :param account: account forwarded to the converter
      :param args: optional overrides for ``name``, ``icon_type``, ``icon`` and ``icon_background``
      :return: the app returned by ``WorkflowConverter.convert_to_workflow``
      :raises ValueError: the app's mode is neither chat nor completion
      """
      converter = WorkflowConverter()

      convertible_modes = {AppMode.CHAT.value, AppMode.COMPLETION.value}
      if app_model.mode not in convertible_modes:
          raise ValueError(f"Current App mode: {app_model.mode} is not supported convert to workflow.")

      # missing keys in args fall back to the defaults below
      name = args.get("name", "Default Name")
      icon_type = args.get("icon_type", "emoji")
      icon = args.get("icon", "🤖")
      icon_background = args.get("icon_background", "#FFEAD5")

      converted_app: App = converter.convert_to_workflow(
          app_model=app_model,
          account=account,
          name=name,
          icon_type=icon_type,
          icon=icon,
          icon_background=icon_background,
      )
      return converted_app
  def validate_features_structure(self, app_model: App, features: dict) -> dict:
      """
      Structure-validate a features dict with the config manager for the app's mode.

      :param app_model: app whose ``mode`` selects the config manager
      :param features: features config to validate
      :return: the result of the selected manager's ``config_validate``
      :raises ValueError: the app's mode is neither advanced-chat nor workflow
      """
      mode = app_model.mode
      if mode == AppMode.ADVANCED_CHAT.value:
          config_manager = AdvancedChatAppConfigManager
      elif mode == AppMode.WORKFLOW.value:
          config_manager = WorkflowAppConfigManager
      else:
          raise ValueError(f"Invalid app mode: {app_model.mode}")

      return config_manager.config_validate(
          tenant_id=app_model.tenant_id, config=features, only_structure_validate=True
      )
  def update_workflow(
      self, *, session: Session, workflow_id: str, tenant_id: str, account_id: str, data: dict
  ) -> Optional[Workflow]:
      """
      Update the editable attributes of a workflow.

      Only ``marked_name`` and ``marked_comment`` are copied from ``data``; other
      keys are ignored. The change is not committed here.

      :param session: SQLAlchemy session used to load the workflow
      :param workflow_id: ID of the workflow to update
      :param tenant_id: tenant the workflow must belong to
      :param account_id: account recorded as ``updated_by`` (no permission check happens in this method)
      :param data: mapping of field name to new value
      :return: the updated workflow, or None when no workflow matches
      """
      workflow = session.scalar(
          select(Workflow).where(Workflow.id == workflow_id, Workflow.tenant_id == tenant_id)
      )
      if not workflow:
          return None

      editable_fields = ("marked_name", "marked_comment")
      for field in data:
          if field in editable_fields:
              setattr(workflow, field, data[field])

      # the audit fields are stamped even when no editable field was supplied
      workflow.updated_by = account_id
      workflow.updated_at = datetime.now(UTC).replace(tzinfo=None)
      return workflow
  def delete_workflow(self, *, session: Session, workflow_id: str, tenant_id: str) -> bool:
      """
      Delete a non-draft workflow that no app currently references.

      The deletion is only registered on ``session``; this method does not commit.

      :param session: SQLAlchemy session used for the lookups and the delete
      :param workflow_id: ID of the workflow to delete
      :param tenant_id: tenant the workflow must belong to
      :return: True once the workflow has been marked for deletion
      :raises ValueError: no workflow matches ``workflow_id`` within the tenant
      :raises DraftWorkflowDeletionError: the workflow's version is ``"draft"``
      :raises WorkflowInUseError: an app's ``workflow_id`` points at this workflow
      """
      workflow = session.scalar(
          select(Workflow).where(Workflow.id == workflow_id, Workflow.tenant_id == tenant_id)
      )
      if not workflow:
          raise ValueError(f"Workflow with ID {workflow_id} not found")

      if workflow.version == "draft":
          raise DraftWorkflowDeletionError("Cannot delete draft workflow versions")

      # a workflow an app still points at must not be removed
      referencing_app = session.scalar(select(App).where(App.workflow_id == workflow_id))
      if referencing_app:
          raise WorkflowInUseError(
              f"Cannot delete workflow that is currently in use by app '{referencing_app.name}'"
          )

      session.delete(workflow)
      return True