workflow.py 23 KB


  1. import json
  2. from collections.abc import Mapping, Sequence
  3. from enum import Enum
  4. from typing import Any, Optional, Union
  5. import contexts
  6. from constants import HIDDEN_VALUE
  7. from core.app.segments import (
  8. SecretVariable,
  9. Variable,
  10. factory,
  11. )
  12. from core.helper import encrypter
  13. from extensions.ext_database import db
  14. from libs import helper
  15. from models import StringUUID
  16. from models.account import Account
  17. class CreatedByRole(Enum):
  18. """
  19. Created By Role Enum
  20. """
  21. ACCOUNT = 'account'
  22. END_USER = 'end_user'
  23. @classmethod
  24. def value_of(cls, value: str) -> 'CreatedByRole':
  25. """
  26. Get value of given mode.
  27. :param value: mode value
  28. :return: mode
  29. """
  30. for mode in cls:
  31. if mode.value == value:
  32. return mode
  33. raise ValueError(f'invalid created by role value {value}')
  34. class WorkflowType(Enum):
  35. """
  36. Workflow Type Enum
  37. """
  38. WORKFLOW = 'workflow'
  39. CHAT = 'chat'
  40. @classmethod
  41. def value_of(cls, value: str) -> 'WorkflowType':
  42. """
  43. Get value of given mode.
  44. :param value: mode value
  45. :return: mode
  46. """
  47. for mode in cls:
  48. if mode.value == value:
  49. return mode
  50. raise ValueError(f'invalid workflow type value {value}')
  51. @classmethod
  52. def from_app_mode(cls, app_mode: Union[str, 'AppMode']) -> 'WorkflowType':
  53. """
  54. Get workflow type from app mode.
  55. :param app_mode: app mode
  56. :return: workflow type
  57. """
  58. from models.model import AppMode
  59. app_mode = app_mode if isinstance(app_mode, AppMode) else AppMode.value_of(app_mode)
  60. return cls.WORKFLOW if app_mode == AppMode.WORKFLOW else cls.CHAT
  61. class Workflow(db.Model):
  62. """
  63. Workflow, for `Workflow App` and `Chat App workflow mode`.
  64. Attributes:
  65. - id (uuid) Workflow ID, pk
  66. - tenant_id (uuid) Workspace ID
  67. - app_id (uuid) App ID
  68. - type (string) Workflow type
  69. `workflow` for `Workflow App`
  70. `chat` for `Chat App workflow mode`
  71. - version (string) Version
  72. `draft` for draft version (only one for each app), other for version number (redundant)
  73. - graph (text) Workflow canvas configuration (JSON)
  74. The entire canvas configuration JSON, including Node, Edge, and other configurations
  75. - nodes (array[object]) Node list, see Node Schema
  76. - edges (array[object]) Edge list, see Edge Schema
  77. - created_by (uuid) Creator ID
  78. - created_at (timestamp) Creation time
  79. - updated_by (uuid) `optional` Last updater ID
  80. - updated_at (timestamp) `optional` Last update time
  81. """
  82. __tablename__ = 'workflows'
  83. __table_args__ = (
  84. db.PrimaryKeyConstraint('id', name='workflow_pkey'),
  85. db.Index('workflow_version_idx', 'tenant_id', 'app_id', 'version'),
  86. )
  87. id = db.Column(StringUUID, server_default=db.text('uuid_generate_v4()'))
  88. tenant_id = db.Column(StringUUID, nullable=False)
  89. app_id = db.Column(StringUUID, nullable=False)
  90. type = db.Column(db.String(255), nullable=False)
  91. version = db.Column(db.String(255), nullable=False)
  92. graph = db.Column(db.Text)
  93. features = db.Column(db.Text)
  94. created_by = db.Column(StringUUID, nullable=False)
  95. created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP(0)'))
  96. updated_by = db.Column(StringUUID)
  97. updated_at = db.Column(db.DateTime)
  98. _environment_variables = db.Column('environment_variables', db.Text, nullable=False, server_default='{}')
  99. @property
  100. def created_by_account(self):
  101. return db.session.get(Account, self.created_by)
  102. @property
  103. def updated_by_account(self):
  104. return db.session.get(Account, self.updated_by) if self.updated_by else None
  105. @property
  106. def graph_dict(self) -> Mapping[str, Any]:
  107. return json.loads(self.graph) if self.graph else {}
  108. @property
  109. def features_dict(self) -> Mapping[str, Any]:
  110. return json.loads(self.features) if self.features else {}
  111. def user_input_form(self, to_old_structure: bool = False) -> list:
  112. # get start node from graph
  113. if not self.graph:
  114. return []
  115. graph_dict = self.graph_dict
  116. if 'nodes' not in graph_dict:
  117. return []
  118. start_node = next((node for node in graph_dict['nodes'] if node['data']['type'] == 'start'), None)
  119. if not start_node:
  120. return []
  121. # get user_input_form from start node
  122. variables = start_node.get('data', {}).get('variables', [])
  123. if to_old_structure:
  124. old_structure_variables = []
  125. for variable in variables:
  126. old_structure_variables.append({
  127. variable['type']: variable
  128. })
  129. return old_structure_variables
  130. return variables
  131. @property
  132. def unique_hash(self) -> str:
  133. """
  134. Get hash of workflow.
  135. :return: hash
  136. """
  137. entity = {
  138. 'graph': self.graph_dict,
  139. 'features': self.features_dict
  140. }
  141. return helper.generate_text_hash(json.dumps(entity, sort_keys=True))
  142. @property
  143. def tool_published(self) -> bool:
  144. from models.tools import WorkflowToolProvider
  145. return db.session.query(WorkflowToolProvider).filter(
  146. WorkflowToolProvider.app_id == self.app_id
  147. ).first() is not None
  148. @property
  149. def environment_variables(self) -> Sequence[Variable]:
  150. # TODO: find some way to init `self._environment_variables` when instance created.
  151. if self._environment_variables is None:
  152. self._environment_variables = '{}'
  153. tenant_id = contexts.tenant_id.get()
  154. environment_variables_dict: dict[str, Any] = json.loads(self._environment_variables)
  155. results = [factory.build_variable_from_mapping(v) for v in environment_variables_dict.values()]
  156. # decrypt secret variables value
  157. decrypt_func = (
  158. lambda var: var.model_copy(
  159. update={'value': encrypter.decrypt_token(tenant_id=tenant_id, token=var.value)}
  160. )
  161. if isinstance(var, SecretVariable)
  162. else var
  163. )
  164. results = list(map(decrypt_func, results))
  165. return results
  166. @environment_variables.setter
  167. def environment_variables(self, value: Sequence[Variable]):
  168. tenant_id = contexts.tenant_id.get()
  169. value = list(value)
  170. if any(var for var in value if not var.id):
  171. raise ValueError('environment variable require a unique id')
  172. # Compare inputs and origin variables, if the value is HIDDEN_VALUE, use the origin variable value (only update `name`).
  173. origin_variables_dictionary = {var.id: var for var in self.environment_variables}
  174. for i, variable in enumerate(value):
  175. if variable.id in origin_variables_dictionary and variable.value == HIDDEN_VALUE:
  176. value[i] = origin_variables_dictionary[variable.id].model_copy(update={'name': variable.name})
  177. # encrypt secret variables value
  178. encrypt_func = (
  179. lambda var: var.model_copy(
  180. update={'value': encrypter.encrypt_token(tenant_id=tenant_id, token=var.value)}
  181. )
  182. if isinstance(var, SecretVariable)
  183. else var
  184. )
  185. encrypted_vars = list(map(encrypt_func, value))
  186. environment_variables_json = json.dumps(
  187. {var.name: var.model_dump() for var in encrypted_vars},
  188. ensure_ascii=False,
  189. )
  190. self._environment_variables = environment_variables_json
  191. def to_dict(self, *, include_secret: bool = False) -> Mapping[str, Any]:
  192. environment_variables = list(self.environment_variables)
  193. environment_variables = [
  194. v if not isinstance(v, SecretVariable) or include_secret else v.model_copy(update={'value': ''})
  195. for v in environment_variables
  196. ]
  197. result = {
  198. 'graph': self.graph_dict,
  199. 'features': self.features_dict,
  200. 'environment_variables': [var.model_dump(mode='json') for var in environment_variables],
  201. }
  202. return result
  203. class WorkflowRunTriggeredFrom(Enum):
  204. """
  205. Workflow Run Triggered From Enum
  206. """
  207. DEBUGGING = 'debugging'
  208. APP_RUN = 'app-run'
  209. @classmethod
  210. def value_of(cls, value: str) -> 'WorkflowRunTriggeredFrom':
  211. """
  212. Get value of given mode.
  213. :param value: mode value
  214. :return: mode
  215. """
  216. for mode in cls:
  217. if mode.value == value:
  218. return mode
  219. raise ValueError(f'invalid workflow run triggered from value {value}')
  220. class WorkflowRunStatus(Enum):
  221. """
  222. Workflow Run Status Enum
  223. """
  224. RUNNING = 'running'
  225. SUCCEEDED = 'succeeded'
  226. FAILED = 'failed'
  227. STOPPED = 'stopped'
  228. @classmethod
  229. def value_of(cls, value: str) -> 'WorkflowRunStatus':
  230. """
  231. Get value of given mode.
  232. :param value: mode value
  233. :return: mode
  234. """
  235. for mode in cls:
  236. if mode.value == value:
  237. return mode
  238. raise ValueError(f'invalid workflow run status value {value}')
  239. class WorkflowRun(db.Model):
  240. """
  241. Workflow Run
  242. Attributes:
  243. - id (uuid) Run ID
  244. - tenant_id (uuid) Workspace ID
  245. - app_id (uuid) App ID
  246. - sequence_number (int) Auto-increment sequence number, incremented within the App, starting from 1
  247. - workflow_id (uuid) Workflow ID
  248. - type (string) Workflow type
  249. - triggered_from (string) Trigger source
  250. `debugging` for canvas debugging
  251. `app-run` for (published) app execution
  252. - version (string) Version
  253. - graph (text) Workflow canvas configuration (JSON)
  254. - inputs (text) Input parameters
  255. - status (string) Execution status, `running` / `succeeded` / `failed` / `stopped`
  256. - outputs (text) `optional` Output content
  257. - error (string) `optional` Error reason
  258. - elapsed_time (float) `optional` Time consumption (s)
  259. - total_tokens (int) `optional` Total tokens used
  260. - total_steps (int) Total steps (redundant), default 0
  261. - created_by_role (string) Creator role
  262. - `account` Console account
  263. - `end_user` End user
  264. - created_by (uuid) Runner ID
  265. - created_at (timestamp) Run time
  266. - finished_at (timestamp) End time
  267. """
  268. __tablename__ = 'workflow_runs'
  269. __table_args__ = (
  270. db.PrimaryKeyConstraint('id', name='workflow_run_pkey'),
  271. db.Index('workflow_run_triggerd_from_idx', 'tenant_id', 'app_id', 'triggered_from'),
  272. db.Index('workflow_run_tenant_app_sequence_idx', 'tenant_id', 'app_id', 'sequence_number'),
  273. )
  274. id = db.Column(StringUUID, server_default=db.text('uuid_generate_v4()'))
  275. tenant_id = db.Column(StringUUID, nullable=False)
  276. app_id = db.Column(StringUUID, nullable=False)
  277. sequence_number = db.Column(db.Integer, nullable=False)
  278. workflow_id = db.Column(StringUUID, nullable=False)
  279. type = db.Column(db.String(255), nullable=False)
  280. triggered_from = db.Column(db.String(255), nullable=False)
  281. version = db.Column(db.String(255), nullable=False)
  282. graph = db.Column(db.Text)
  283. inputs = db.Column(db.Text)
  284. status = db.Column(db.String(255), nullable=False)
  285. outputs = db.Column(db.Text)
  286. error = db.Column(db.Text)
  287. elapsed_time = db.Column(db.Float, nullable=False, server_default=db.text('0'))
  288. total_tokens = db.Column(db.Integer, nullable=False, server_default=db.text('0'))
  289. total_steps = db.Column(db.Integer, server_default=db.text('0'))
  290. created_by_role = db.Column(db.String(255), nullable=False)
  291. created_by = db.Column(StringUUID, nullable=False)
  292. created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP(0)'))
  293. finished_at = db.Column(db.DateTime)
  294. @property
  295. def created_by_account(self):
  296. created_by_role = CreatedByRole.value_of(self.created_by_role)
  297. return db.session.get(Account, self.created_by) \
  298. if created_by_role == CreatedByRole.ACCOUNT else None
  299. @property
  300. def created_by_end_user(self):
  301. from models.model import EndUser
  302. created_by_role = CreatedByRole.value_of(self.created_by_role)
  303. return db.session.get(EndUser, self.created_by) \
  304. if created_by_role == CreatedByRole.END_USER else None
  305. @property
  306. def graph_dict(self):
  307. return json.loads(self.graph) if self.graph else None
  308. @property
  309. def inputs_dict(self):
  310. return json.loads(self.inputs) if self.inputs else None
  311. @property
  312. def outputs_dict(self):
  313. return json.loads(self.outputs) if self.outputs else None
  314. @property
  315. def message(self) -> Optional['Message']:
  316. from models.model import Message
  317. return db.session.query(Message).filter(
  318. Message.app_id == self.app_id,
  319. Message.workflow_run_id == self.id
  320. ).first()
  321. @property
  322. def workflow(self):
  323. return db.session.query(Workflow).filter(Workflow.id == self.workflow_id).first()
  324. def to_dict(self):
  325. return {
  326. 'id': self.id,
  327. 'tenant_id': self.tenant_id,
  328. 'app_id': self.app_id,
  329. 'sequence_number': self.sequence_number,
  330. 'workflow_id': self.workflow_id,
  331. 'type': self.type,
  332. 'triggered_from': self.triggered_from,
  333. 'version': self.version,
  334. 'graph': self.graph_dict,
  335. 'inputs': self.inputs_dict,
  336. 'status': self.status,
  337. 'outputs': self.outputs_dict,
  338. 'error': self.error,
  339. 'elapsed_time': self.elapsed_time,
  340. 'total_tokens': self.total_tokens,
  341. 'total_steps': self.total_steps,
  342. 'created_by_role': self.created_by_role,
  343. 'created_by': self.created_by,
  344. 'created_at': self.created_at,
  345. 'finished_at': self.finished_at,
  346. }
  347. @classmethod
  348. def from_dict(cls, data: dict) -> 'WorkflowRun':
  349. return cls(
  350. id=data.get('id'),
  351. tenant_id=data.get('tenant_id'),
  352. app_id=data.get('app_id'),
  353. sequence_number=data.get('sequence_number'),
  354. workflow_id=data.get('workflow_id'),
  355. type=data.get('type'),
  356. triggered_from=data.get('triggered_from'),
  357. version=data.get('version'),
  358. graph=json.dumps(data.get('graph')),
  359. inputs=json.dumps(data.get('inputs')),
  360. status=data.get('status'),
  361. outputs=json.dumps(data.get('outputs')),
  362. error=data.get('error'),
  363. elapsed_time=data.get('elapsed_time'),
  364. total_tokens=data.get('total_tokens'),
  365. total_steps=data.get('total_steps'),
  366. created_by_role=data.get('created_by_role'),
  367. created_by=data.get('created_by'),
  368. created_at=data.get('created_at'),
  369. finished_at=data.get('finished_at'),
  370. )
  371. class WorkflowNodeExecutionTriggeredFrom(Enum):
  372. """
  373. Workflow Node Execution Triggered From Enum
  374. """
  375. SINGLE_STEP = 'single-step'
  376. WORKFLOW_RUN = 'workflow-run'
  377. @classmethod
  378. def value_of(cls, value: str) -> 'WorkflowNodeExecutionTriggeredFrom':
  379. """
  380. Get value of given mode.
  381. :param value: mode value
  382. :return: mode
  383. """
  384. for mode in cls:
  385. if mode.value == value:
  386. return mode
  387. raise ValueError(f'invalid workflow node execution triggered from value {value}')
  388. class WorkflowNodeExecutionStatus(Enum):
  389. """
  390. Workflow Node Execution Status Enum
  391. """
  392. RUNNING = 'running'
  393. SUCCEEDED = 'succeeded'
  394. FAILED = 'failed'
  395. @classmethod
  396. def value_of(cls, value: str) -> 'WorkflowNodeExecutionStatus':
  397. """
  398. Get value of given mode.
  399. :param value: mode value
  400. :return: mode
  401. """
  402. for mode in cls:
  403. if mode.value == value:
  404. return mode
  405. raise ValueError(f'invalid workflow node execution status value {value}')
  406. class WorkflowNodeExecution(db.Model):
  407. """
  408. Workflow Node Execution
  409. - id (uuid) Execution ID
  410. - tenant_id (uuid) Workspace ID
  411. - app_id (uuid) App ID
  412. - workflow_id (uuid) Workflow ID
  413. - triggered_from (string) Trigger source
  414. `single-step` for single-step debugging
  415. `workflow-run` for workflow execution (debugging / user execution)
  416. - workflow_run_id (uuid) `optional` Workflow run ID
  417. Null for single-step debugging.
  418. - index (int) Execution sequence number, used for displaying Tracing Node order
  419. - predecessor_node_id (string) `optional` Predecessor node ID, used for displaying execution path
  420. - node_id (string) Node ID
  421. - node_type (string) Node type, such as `start`
  422. - title (string) Node title
  423. - inputs (json) All predecessor node variable content used in the node
  424. - process_data (json) Node process data
  425. - outputs (json) `optional` Node output variables
  426. - status (string) Execution status, `running` / `succeeded` / `failed`
  427. - error (string) `optional` Error reason
  428. - elapsed_time (float) `optional` Time consumption (s)
  429. - execution_metadata (text) Metadata
  430. - total_tokens (int) `optional` Total tokens used
  431. - total_price (decimal) `optional` Total cost
  432. - currency (string) `optional` Currency, such as USD / RMB
  433. - created_at (timestamp) Run time
  434. - created_by_role (string) Creator role
  435. - `account` Console account
  436. - `end_user` End user
  437. - created_by (uuid) Runner ID
  438. - finished_at (timestamp) End time
  439. """
  440. __tablename__ = 'workflow_node_executions'
  441. __table_args__ = (
  442. db.PrimaryKeyConstraint('id', name='workflow_node_execution_pkey'),
  443. db.Index('workflow_node_execution_workflow_run_idx', 'tenant_id', 'app_id', 'workflow_id',
  444. 'triggered_from', 'workflow_run_id'),
  445. db.Index('workflow_node_execution_node_run_idx', 'tenant_id', 'app_id', 'workflow_id',
  446. 'triggered_from', 'node_id'),
  447. )
  448. id = db.Column(StringUUID, server_default=db.text('uuid_generate_v4()'))
  449. tenant_id = db.Column(StringUUID, nullable=False)
  450. app_id = db.Column(StringUUID, nullable=False)
  451. workflow_id = db.Column(StringUUID, nullable=False)
  452. triggered_from = db.Column(db.String(255), nullable=False)
  453. workflow_run_id = db.Column(StringUUID)
  454. index = db.Column(db.Integer, nullable=False)
  455. predecessor_node_id = db.Column(db.String(255))
  456. node_id = db.Column(db.String(255), nullable=False)
  457. node_type = db.Column(db.String(255), nullable=False)
  458. title = db.Column(db.String(255), nullable=False)
  459. inputs = db.Column(db.Text)
  460. process_data = db.Column(db.Text)
  461. outputs = db.Column(db.Text)
  462. status = db.Column(db.String(255), nullable=False)
  463. error = db.Column(db.Text)
  464. elapsed_time = db.Column(db.Float, nullable=False, server_default=db.text('0'))
  465. execution_metadata = db.Column(db.Text)
  466. created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP(0)'))
  467. created_by_role = db.Column(db.String(255), nullable=False)
  468. created_by = db.Column(StringUUID, nullable=False)
  469. finished_at = db.Column(db.DateTime)
  470. @property
  471. def created_by_account(self):
  472. created_by_role = CreatedByRole.value_of(self.created_by_role)
  473. return db.session.get(Account, self.created_by) \
  474. if created_by_role == CreatedByRole.ACCOUNT else None
  475. @property
  476. def created_by_end_user(self):
  477. from models.model import EndUser
  478. created_by_role = CreatedByRole.value_of(self.created_by_role)
  479. return db.session.get(EndUser, self.created_by) \
  480. if created_by_role == CreatedByRole.END_USER else None
  481. @property
  482. def inputs_dict(self):
  483. return json.loads(self.inputs) if self.inputs else None
  484. @property
  485. def outputs_dict(self):
  486. return json.loads(self.outputs) if self.outputs else None
  487. @property
  488. def process_data_dict(self):
  489. return json.loads(self.process_data) if self.process_data else None
  490. @property
  491. def execution_metadata_dict(self):
  492. return json.loads(self.execution_metadata) if self.execution_metadata else None
  493. @property
  494. def extras(self):
  495. from core.tools.tool_manager import ToolManager
  496. extras = {}
  497. if self.execution_metadata_dict:
  498. from core.workflow.entities.node_entities import NodeType
  499. if self.node_type == NodeType.TOOL.value and 'tool_info' in self.execution_metadata_dict:
  500. tool_info = self.execution_metadata_dict['tool_info']
  501. extras['icon'] = ToolManager.get_tool_icon(
  502. tenant_id=self.tenant_id,
  503. provider_type=tool_info['provider_type'],
  504. provider_id=tool_info['provider_id']
  505. )
  506. return extras
  507. class WorkflowAppLogCreatedFrom(Enum):
  508. """
  509. Workflow App Log Created From Enum
  510. """
  511. SERVICE_API = 'service-api'
  512. WEB_APP = 'web-app'
  513. INSTALLED_APP = 'installed-app'
  514. @classmethod
  515. def value_of(cls, value: str) -> 'WorkflowAppLogCreatedFrom':
  516. """
  517. Get value of given mode.
  518. :param value: mode value
  519. :return: mode
  520. """
  521. for mode in cls:
  522. if mode.value == value:
  523. return mode
  524. raise ValueError(f'invalid workflow app log created from value {value}')
  525. class WorkflowAppLog(db.Model):
  526. """
  527. Workflow App execution log, excluding workflow debugging records.
  528. Attributes:
  529. - id (uuid) run ID
  530. - tenant_id (uuid) Workspace ID
  531. - app_id (uuid) App ID
  532. - workflow_id (uuid) Associated Workflow ID
  533. - workflow_run_id (uuid) Associated Workflow Run ID
  534. - created_from (string) Creation source
  535. `service-api` App Execution OpenAPI
  536. `web-app` WebApp
  537. `installed-app` Installed App
  538. - created_by_role (string) Creator role
  539. - `account` Console account
  540. - `end_user` End user
  541. - created_by (uuid) Creator ID, depends on the user table according to created_by_role
  542. - created_at (timestamp) Creation time
  543. """
  544. __tablename__ = 'workflow_app_logs'
  545. __table_args__ = (
  546. db.PrimaryKeyConstraint('id', name='workflow_app_log_pkey'),
  547. db.Index('workflow_app_log_app_idx', 'tenant_id', 'app_id'),
  548. )
  549. id = db.Column(StringUUID, server_default=db.text('uuid_generate_v4()'))
  550. tenant_id = db.Column(StringUUID, nullable=False)
  551. app_id = db.Column(StringUUID, nullable=False)
  552. workflow_id = db.Column(StringUUID, nullable=False)
  553. workflow_run_id = db.Column(StringUUID, nullable=False)
  554. created_from = db.Column(db.String(255), nullable=False)
  555. created_by_role = db.Column(db.String(255), nullable=False)
  556. created_by = db.Column(StringUUID, nullable=False)
  557. created_at = db.Column(db.DateTime, nullable=False, server_default=db.text('CURRENT_TIMESTAMP(0)'))
  558. @property
  559. def workflow_run(self):
  560. return db.session.get(WorkflowRun, self.workflow_run_id)
  561. @property
  562. def created_by_account(self):
  563. created_by_role = CreatedByRole.value_of(self.created_by_role)
  564. return db.session.get(Account, self.created_by) \
  565. if created_by_role == CreatedByRole.ACCOUNT else None
  566. @property
  567. def created_by_end_user(self):
  568. from models.model import EndUser
  569. created_by_role = CreatedByRole.value_of(self.created_by_role)
  570. return db.session.get(EndUser, self.created_by) \
  571. if created_by_role == CreatedByRole.END_USER else None