workflow.py 17 KB


  1. import json
  2. import logging
  3. from flask import abort, request
  4. from flask_restful import Resource, marshal_with, reqparse
  5. from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
  6. import services
  7. from controllers.console import api
  8. from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
  9. from controllers.console.app.wraps import get_app_model
  10. from controllers.console.setup import setup_required
  11. from controllers.console.wraps import account_initialization_required
  12. from core.app.apps.base_app_queue_manager import AppQueueManager
  13. from core.app.entities.app_invoke_entities import InvokeFrom
  14. from core.errors.error import AppInvokeQuotaExceededError
  15. from fields.workflow_fields import workflow_fields
  16. from fields.workflow_run_fields import workflow_run_node_execution_fields
  17. from libs import helper
  18. from libs.helper import TimestampField, uuid_value
  19. from libs.login import current_user, login_required
  20. from models.model import App, AppMode
  21. from services.app_dsl_service import AppDslService
  22. from services.app_generate_service import AppGenerateService
  23. from services.errors.app import WorkflowHashNotEqualError
  24. from services.workflow_service import WorkflowService
  25. logger = logging.getLogger(__name__)
  class DraftWorkflowApi(Resource):
      """Console endpoints for reading and syncing an app's draft workflow."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      @marshal_with(workflow_fields)
      def get(self, app_model: App):
          """
          Get draft workflow

          Raises:
              Forbidden: the current user is not an editor.
              DraftWorkflowNotExist: the service returned no draft workflow.
          """
          # The current user must be an admin, owner, or editor
          if not current_user.is_editor:
              raise Forbidden()

          # fetch draft workflow by app_model
          workflow_service = WorkflowService()
          workflow = workflow_service.get_draft_workflow(app_model=app_model)

          # a missing draft is reported as an error rather than returned as None
          if not workflow:
              raise DraftWorkflowNotExist()

          return workflow

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      def post(self, app_model: App):
          """
          Sync draft workflow

          The payload may be sent as ``application/json`` or as a JSON
          document with a ``text/plain`` content type. Both forms must carry
          ``graph`` and ``features`` objects and may carry ``hash``.

          Returns:
              A dict with ``result``, the workflow ``hash`` and ``updated_at``;
              or ``({'message': 'Invalid JSON data'}, 400)`` when a text/plain
              body is not valid JSON.

          Raises:
              Forbidden: the current user is not an editor.
              ValueError: a text/plain body lacks ``graph``/``features`` or
                  they are not dicts.
              DraftWorkflowNotSync: the service raised WorkflowHashNotEqualError.
          """
          # The current user must be an admin, owner, or editor
          if not current_user.is_editor:
              raise Forbidden()

          # headers.get() returns None when the header is absent; fall back to ''
          # so the membership tests below cannot raise TypeError and the request
          # ends in the 415 branch instead of a 500.
          content_type = request.headers.get('Content-Type') or ''

          if 'application/json' in content_type:
              parser = reqparse.RequestParser()
              parser.add_argument('graph', type=dict, required=True, nullable=False, location='json')
              parser.add_argument('features', type=dict, required=True, nullable=False, location='json')
              parser.add_argument('hash', type=str, required=False, location='json')
              args = parser.parse_args()
          elif 'text/plain' in content_type:
              try:
                  data = json.loads(request.data.decode('utf-8'))
              except json.JSONDecodeError:
                  return {'message': 'Invalid JSON data'}, 400

              # Valid JSON that is not an object (e.g. a number or null) would make
              # the `in` checks raise TypeError, so reject it up front.
              if not isinstance(data, dict) or 'graph' not in data or 'features' not in data:
                  raise ValueError('graph or features not found in data')

              if not isinstance(data.get('graph'), dict) or not isinstance(data.get('features'), dict):
                  raise ValueError('graph or features is not a dict')

              args = {
                  'graph': data.get('graph'),
                  'features': data.get('features'),
                  'hash': data.get('hash')
              }
          else:
              abort(415)

          workflow_service = WorkflowService()

          try:
              workflow = workflow_service.sync_draft_workflow(
                  app_model=app_model,
                  graph=args.get('graph'),
                  features=args.get('features'),
                  unique_hash=args.get('hash'),
                  account=current_user
              )
          except WorkflowHashNotEqualError:
              raise DraftWorkflowNotSync()

          return {
              "result": "success",
              "hash": workflow.unique_hash,
              # fall back to the creation time when the workflow has no update time
              "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at)
          }
  class DraftWorkflowImportApi(Resource):
      """Console endpoint that imports a draft workflow for an app."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      @marshal_with(workflow_fields)
      def post(self, app_model: App):
          """
          Import draft workflow

          Reads the required ``data`` string from the JSON body and hands it
          to ``AppDslService.import_and_overwrite_workflow``; the workflow it
          returns is marshalled with ``workflow_fields``.

          Raises:
              Forbidden: the current user is not an editor.
          """
          # Editors only (admin, owner, or editor)
          if not current_user.is_editor:
              raise Forbidden()

          import_parser = reqparse.RequestParser()
          import_parser.add_argument('data', type=str, required=True, nullable=False, location='json')
          payload = import_parser.parse_args()

          return AppDslService.import_and_overwrite_workflow(
              app_model=app_model,
              data=payload['data'],
              account=current_user
          )
  class AdvancedChatDraftWorkflowRunApi(Resource):
      """Console endpoint that runs the draft workflow of an advanced-chat app."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT])
      def post(self, app_model: App):
          """
          Run draft workflow

          Parses ``inputs``, ``query``, ``files`` and ``conversation_id`` from
          the JSON body and invokes ``AppGenerateService.generate`` in
          debugger mode with streaming enabled.

          Raises:
              Forbidden: the current user is not an editor.
              NotFound: the conversation does not exist.
              ConversationCompletedError: the conversation is already completed.
              ValueError: re-raised unchanged from the generate call.
              InternalServerError: any other exception (logged first).
          """
          # The current user must be an admin, owner, or editor
          if not current_user.is_editor:
              raise Forbidden()

          parser = reqparse.RequestParser()
          parser.add_argument('inputs', type=dict, location='json')
          parser.add_argument('query', type=str, required=True, location='json', default='')
          parser.add_argument('files', type=list, location='json')
          parser.add_argument('conversation_id', type=uuid_value, location='json')
          args = parser.parse_args()

          try:
              response = AppGenerateService.generate(
                  app_model=app_model,
                  user=current_user,
                  args=args,
                  invoke_from=InvokeFrom.DEBUGGER,
                  streaming=True
              )

              return helper.compact_generate_response(response)
          except services.errors.conversation.ConversationNotExistsError:
              raise NotFound("Conversation Not Exists.")
          except services.errors.conversation.ConversationCompletedError:
              raise ConversationCompletedError()
          except ValueError:
              # let ValueError through untouched; bare raise keeps the traceback
              raise
          except Exception:
              # use the module-level logger so the record carries this module's name
              logger.exception("internal server error.")
              raise InternalServerError()
  class AdvancedChatDraftRunIterationNodeApi(Resource):
      """Console endpoint that runs one iteration node of an advanced-chat draft workflow."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT])
      def post(self, app_model: App, node_id: str):
          """
          Run draft workflow iteration node

          Parses ``inputs`` from the JSON body and invokes
          ``AppGenerateService.generate_single_iteration`` for ``node_id``
          with streaming enabled.

          Raises:
              Forbidden: the current user is not an editor.
              NotFound: the conversation does not exist.
              ConversationCompletedError: the conversation is already completed.
              ValueError: re-raised unchanged from the generate call.
              InternalServerError: any other exception (logged first).
          """
          # The current user must be an admin, owner, or editor
          if not current_user.is_editor:
              raise Forbidden()

          parser = reqparse.RequestParser()
          parser.add_argument('inputs', type=dict, location='json')
          args = parser.parse_args()

          try:
              response = AppGenerateService.generate_single_iteration(
                  app_model=app_model,
                  user=current_user,
                  node_id=node_id,
                  args=args,
                  streaming=True
              )

              return helper.compact_generate_response(response)
          except services.errors.conversation.ConversationNotExistsError:
              raise NotFound("Conversation Not Exists.")
          except services.errors.conversation.ConversationCompletedError:
              raise ConversationCompletedError()
          except ValueError:
              # let ValueError through untouched; bare raise keeps the traceback
              raise
          except Exception:
              # use the module-level logger so the record carries this module's name
              logger.exception("internal server error.")
              raise InternalServerError()
  class WorkflowDraftRunIterationNodeApi(Resource):
      """Console endpoint that runs one iteration node of a workflow app's draft workflow."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.WORKFLOW])
      def post(self, app_model: App, node_id: str):
          """
          Run draft workflow iteration node

          Parses ``inputs`` from the JSON body and invokes
          ``AppGenerateService.generate_single_iteration`` for ``node_id``
          with streaming enabled.

          Raises:
              Forbidden: the current user is not an editor.
              NotFound: the conversation does not exist.
              ConversationCompletedError: the conversation is already completed.
              ValueError: re-raised unchanged from the generate call.
              InternalServerError: any other exception (logged first).
          """
          # The current user must be an admin, owner, or editor
          if not current_user.is_editor:
              raise Forbidden()

          parser = reqparse.RequestParser()
          parser.add_argument('inputs', type=dict, location='json')
          args = parser.parse_args()

          try:
              response = AppGenerateService.generate_single_iteration(
                  app_model=app_model,
                  user=current_user,
                  node_id=node_id,
                  args=args,
                  streaming=True
              )

              return helper.compact_generate_response(response)
          except services.errors.conversation.ConversationNotExistsError:
              raise NotFound("Conversation Not Exists.")
          except services.errors.conversation.ConversationCompletedError:
              raise ConversationCompletedError()
          except ValueError:
              # let ValueError through untouched; bare raise keeps the traceback
              raise
          except Exception:
              # use the module-level logger so the record carries this module's name
              logger.exception("internal server error.")
              raise InternalServerError()
  class DraftWorkflowRunApi(Resource):
      """Console endpoint that runs the draft workflow of a workflow app."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.WORKFLOW])
      def post(self, app_model: App):
          """
          Run draft workflow

          Parses the required ``inputs`` dict and optional ``files`` list from
          the JSON body and invokes ``AppGenerateService.generate`` in
          debugger mode with streaming enabled.

          Raises:
              Forbidden: the current user is not an editor.
              ValueError, AppInvokeQuotaExceededError: re-raised unchanged.
              InternalServerError: any other exception (logged first).
          """
          # The current user must be an admin, owner, or editor
          if not current_user.is_editor:
              raise Forbidden()

          parser = reqparse.RequestParser()
          parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
          parser.add_argument('files', type=list, required=False, location='json')
          args = parser.parse_args()

          try:
              response = AppGenerateService.generate(
                  app_model=app_model,
                  user=current_user,
                  args=args,
                  invoke_from=InvokeFrom.DEBUGGER,
                  streaming=True
              )

              return helper.compact_generate_response(response)
          except (ValueError, AppInvokeQuotaExceededError):
              # let these through untouched; bare raise keeps the traceback
              raise
          except Exception:
              # use the module-level logger so the record carries this module's name
              logger.exception("internal server error.")
              raise InternalServerError()
  class WorkflowTaskStopApi(Resource):
      """Console endpoint that requests a running workflow task to stop."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      def post(self, app_model: App, task_id: str):
          """
          Stop workflow task

          Sets the stop flag for ``task_id`` through ``AppQueueManager`` on
          behalf of the current user and reports success.

          Raises:
              Forbidden: the current user is not an editor.
          """
          # Editors only (admin, owner, or editor)
          if not current_user.is_editor:
              raise Forbidden()

          AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id)

          return {"result": "success"}
  class DraftWorkflowNodeRunApi(Resource):
      """Console endpoint that runs a single node of the draft workflow."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      @marshal_with(workflow_run_node_execution_fields)
      def post(self, app_model: App, node_id: str):
          """
          Run draft workflow node

          Takes the required ``inputs`` dict from the JSON body and returns
          the node execution produced by
          ``WorkflowService.run_draft_workflow_node``.

          Raises:
              Forbidden: the current user is not an editor.
          """
          # Editors only (admin, owner, or editor)
          if not current_user.is_editor:
              raise Forbidden()

          node_parser = reqparse.RequestParser()
          node_parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
          payload = node_parser.parse_args()

          service = WorkflowService()
          return service.run_draft_workflow_node(
              app_model=app_model,
              node_id=node_id,
              user_inputs=payload.get('inputs'),
              account=current_user
          )
  class PublishedWorkflowApi(Resource):
      """Console endpoints for reading and publishing an app's workflow."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      @marshal_with(workflow_fields)
      def get(self, app_model: App):
          """
          Get published workflow

          Returns whatever ``WorkflowService.get_published_workflow`` yields
          for the app, without a not-found check.

          Raises:
              Forbidden: the current user is not an editor.
          """
          # Editors only (admin, owner, or editor)
          if not current_user.is_editor:
              raise Forbidden()

          # look the published workflow up by app_model and pass it through as-is
          service = WorkflowService()
          return service.get_published_workflow(app_model=app_model)

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      def post(self, app_model: App):
          """
          Publish workflow

          Returns a dict with ``result`` and the formatted ``created_at`` of
          the workflow returned by ``WorkflowService.publish_workflow``.

          Raises:
              Forbidden: the current user is not an editor.
          """
          # Editors only (admin, owner, or editor)
          if not current_user.is_editor:
              raise Forbidden()

          service = WorkflowService()
          published = service.publish_workflow(app_model=app_model, account=current_user)
          created_at = TimestampField().format(published.created_at)

          return {
              "result": "success",
              "created_at": created_at
          }
  class DefaultBlockConfigsApi(Resource):
      """Console endpoint that lists the default workflow block configs."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      def get(self, app_model: App):
          """
          Get default block config

          Returns the result of ``WorkflowService.get_default_block_configs``.

          Raises:
              Forbidden: the current user is not an editor.
          """
          # Editors only (admin, owner, or editor)
          if not current_user.is_editor:
              raise Forbidden()

          service = WorkflowService()
          default_configs = service.get_default_block_configs()
          return default_configs
  class DefaultBlockConfigApi(Resource):
      """Console endpoint that returns the default config of one block type."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
      def get(self, app_model: App, block_type: str):
          """
          Get default block config

          The optional ``q`` query-string argument is decoded as JSON and
          passed to the service as ``filters``; when absent or empty,
          ``filters`` is None.

          Raises:
              Forbidden: the current user is not an editor.
              ValueError: ``q`` is present but is not valid JSON.
          """
          # Editors only (admin, owner, or editor)
          if not current_user.is_editor:
              raise Forbidden()

          query_parser = reqparse.RequestParser()
          query_parser.add_argument('q', type=str, location='args')
          query_args = query_parser.parse_args()

          raw_filters = query_args.get('q')
          filters = None
          if raw_filters:
              try:
                  filters = json.loads(raw_filters)
              except json.JSONDecodeError:
                  raise ValueError('Invalid filters')

          service = WorkflowService()
          return service.get_default_block_config(
              node_type=block_type,
              filters=filters
          )
  class ConvertToWorkflowApi(Resource):
      """Console endpoint that converts a chat or completion app to workflow mode."""

      @setup_required
      @login_required
      @account_initialization_required
      @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION])
      def post(self, app_model: App):
          """
          Convert basic mode of chatbot app to workflow mode
          Convert expert mode of chatbot app to workflow mode
          Convert Completion App to Workflow App

          When the request has a body, the optional ``name``, ``icon`` and
          ``icon_background`` JSON fields are parsed and forwarded; otherwise
          an empty dict is forwarded. Returns the id of the new app.

          Raises:
              Forbidden: the current user is not an editor.
          """
          # Editors only (admin, owner, or editor)
          if not current_user.is_editor:
              raise Forbidden()

          # start from no overrides; parse them only when a body was sent
          args = {}
          if request.data:
              override_parser = reqparse.RequestParser()
              for field_name in ('name', 'icon', 'icon_background'):
                  override_parser.add_argument(
                      field_name, type=str, required=False, nullable=True, location='json'
                  )
              args = override_parser.parse_args()

          # convert to workflow mode
          service = WorkflowService()
          converted_app = service.convert_to_workflow(
              app_model=app_model,
              account=current_user,
              args=args
          )

          # return app id
          return {'new_app_id': converted_app.id}
  # Route table for the workflow console resources; registered in the listed order.
  for _resource, _route in (
      (DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft'),
      (DraftWorkflowImportApi, '/apps/<uuid:app_id>/workflows/draft/import'),
      (AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run'),
      (DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run'),
      (WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop'),
      (DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run'),
      (AdvancedChatDraftRunIterationNodeApi,
       '/apps/<uuid:app_id>/advanced-chat/workflows/draft/iteration/nodes/<string:node_id>/run'),
      (WorkflowDraftRunIterationNodeApi,
       '/apps/<uuid:app_id>/workflows/draft/iteration/nodes/<string:node_id>/run'),
      (PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/publish'),
      (DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'),
      (DefaultBlockConfigApi,
       '/apps/<uuid:app_id>/workflows/default-workflow-block-configs/<string:block_type>'),
      (ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow'),
  ):
      api.add_resource(_resource, _route)