workflow.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496
  1. import json
  2. import logging
  3. from flask import abort, request
  4. from flask_restful import Resource, marshal_with, reqparse
  5. from werkzeug.exceptions import Forbidden, InternalServerError, NotFound
  6. import services
  7. from controllers.console import api
  8. from controllers.console.app.error import ConversationCompletedError, DraftWorkflowNotExist, DraftWorkflowNotSync
  9. from controllers.console.app.wraps import get_app_model
  10. from controllers.console.setup import setup_required
  11. from controllers.console.wraps import account_initialization_required
  12. from core.app.apps.base_app_queue_manager import AppQueueManager
  13. from core.app.entities.app_invoke_entities import InvokeFrom
  14. from core.app.segments import factory
  15. from core.errors.error import AppInvokeQuotaExceededError
  16. from fields.workflow_fields import workflow_fields
  17. from fields.workflow_run_fields import workflow_run_node_execution_fields
  18. from libs import helper
  19. from libs.helper import TimestampField, uuid_value
  20. from libs.login import current_user, login_required
  21. from models.model import App, AppMode
  22. from services.app_dsl_service import AppDslService
  23. from services.app_generate_service import AppGenerateService
  24. from services.errors.app import WorkflowHashNotEqualError
  25. from services.workflow_service import WorkflowService
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
  27. class DraftWorkflowApi(Resource):
  28. @setup_required
  29. @login_required
  30. @account_initialization_required
  31. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  32. @marshal_with(workflow_fields)
  33. def get(self, app_model: App):
  34. """
  35. Get draft workflow
  36. """
  37. # The role of the current user in the ta table must be admin, owner, or editor
  38. if not current_user.is_editor:
  39. raise Forbidden()
  40. # fetch draft workflow by app_model
  41. workflow_service = WorkflowService()
  42. workflow = workflow_service.get_draft_workflow(app_model=app_model)
  43. if not workflow:
  44. raise DraftWorkflowNotExist()
  45. # return workflow, if not found, return None (initiate graph by frontend)
  46. return workflow
  47. @setup_required
  48. @login_required
  49. @account_initialization_required
  50. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  51. def post(self, app_model: App):
  52. """
  53. Sync draft workflow
  54. """
  55. # The role of the current user in the ta table must be admin, owner, or editor
  56. if not current_user.is_editor:
  57. raise Forbidden()
  58. content_type = request.headers.get('Content-Type', '')
  59. if 'application/json' in content_type:
  60. parser = reqparse.RequestParser()
  61. parser.add_argument('graph', type=dict, required=True, nullable=False, location='json')
  62. parser.add_argument('features', type=dict, required=True, nullable=False, location='json')
  63. parser.add_argument('hash', type=str, required=False, location='json')
  64. # TODO: set this to required=True after frontend is updated
  65. parser.add_argument('environment_variables', type=list, required=False, location='json')
  66. parser.add_argument('conversation_variables', type=list, required=False, location='json')
  67. args = parser.parse_args()
  68. elif 'text/plain' in content_type:
  69. try:
  70. data = json.loads(request.data.decode('utf-8'))
  71. if 'graph' not in data or 'features' not in data:
  72. raise ValueError('graph or features not found in data')
  73. if not isinstance(data.get('graph'), dict) or not isinstance(data.get('features'), dict):
  74. raise ValueError('graph or features is not a dict')
  75. args = {
  76. 'graph': data.get('graph'),
  77. 'features': data.get('features'),
  78. 'hash': data.get('hash'),
  79. 'environment_variables': data.get('environment_variables'),
  80. 'conversation_variables': data.get('conversation_variables'),
  81. }
  82. except json.JSONDecodeError:
  83. return {'message': 'Invalid JSON data'}, 400
  84. else:
  85. abort(415)
  86. workflow_service = WorkflowService()
  87. try:
  88. environment_variables_list = args.get('environment_variables') or []
  89. environment_variables = [factory.build_variable_from_mapping(obj) for obj in environment_variables_list]
  90. conversation_variables_list = args.get('conversation_variables') or []
  91. conversation_variables = [factory.build_variable_from_mapping(obj) for obj in conversation_variables_list]
  92. workflow = workflow_service.sync_draft_workflow(
  93. app_model=app_model,
  94. graph=args['graph'],
  95. features=args['features'],
  96. unique_hash=args.get('hash'),
  97. account=current_user,
  98. environment_variables=environment_variables,
  99. conversation_variables=conversation_variables,
  100. )
  101. except WorkflowHashNotEqualError:
  102. raise DraftWorkflowNotSync()
  103. return {
  104. "result": "success",
  105. "hash": workflow.unique_hash,
  106. "updated_at": TimestampField().format(workflow.updated_at or workflow.created_at)
  107. }
  108. class DraftWorkflowImportApi(Resource):
  109. @setup_required
  110. @login_required
  111. @account_initialization_required
  112. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  113. @marshal_with(workflow_fields)
  114. def post(self, app_model: App):
  115. """
  116. Import draft workflow
  117. """
  118. # The role of the current user in the ta table must be admin, owner, or editor
  119. if not current_user.is_editor:
  120. raise Forbidden()
  121. parser = reqparse.RequestParser()
  122. parser.add_argument('data', type=str, required=True, nullable=False, location='json')
  123. args = parser.parse_args()
  124. workflow = AppDslService.import_and_overwrite_workflow(
  125. app_model=app_model,
  126. data=args['data'],
  127. account=current_user
  128. )
  129. return workflow
  130. class AdvancedChatDraftWorkflowRunApi(Resource):
  131. @setup_required
  132. @login_required
  133. @account_initialization_required
  134. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  135. def post(self, app_model: App):
  136. """
  137. Run draft workflow
  138. """
  139. # The role of the current user in the ta table must be admin, owner, or editor
  140. if not current_user.is_editor:
  141. raise Forbidden()
  142. parser = reqparse.RequestParser()
  143. parser.add_argument('inputs', type=dict, location='json')
  144. parser.add_argument('query', type=str, required=True, location='json', default='')
  145. parser.add_argument('files', type=list, location='json')
  146. parser.add_argument('conversation_id', type=uuid_value, location='json')
  147. args = parser.parse_args()
  148. try:
  149. response = AppGenerateService.generate(
  150. app_model=app_model,
  151. user=current_user,
  152. args=args,
  153. invoke_from=InvokeFrom.DEBUGGER,
  154. streaming=True
  155. )
  156. return helper.compact_generate_response(response)
  157. except services.errors.conversation.ConversationNotExistsError:
  158. raise NotFound("Conversation Not Exists.")
  159. except services.errors.conversation.ConversationCompletedError:
  160. raise ConversationCompletedError()
  161. except ValueError as e:
  162. raise e
  163. except Exception as e:
  164. logging.exception("internal server error.")
  165. raise InternalServerError()
  166. class AdvancedChatDraftRunIterationNodeApi(Resource):
  167. @setup_required
  168. @login_required
  169. @account_initialization_required
  170. @get_app_model(mode=[AppMode.ADVANCED_CHAT])
  171. def post(self, app_model: App, node_id: str):
  172. """
  173. Run draft workflow iteration node
  174. """
  175. # The role of the current user in the ta table must be admin, owner, or editor
  176. if not current_user.is_editor:
  177. raise Forbidden()
  178. parser = reqparse.RequestParser()
  179. parser.add_argument('inputs', type=dict, location='json')
  180. args = parser.parse_args()
  181. try:
  182. response = AppGenerateService.generate_single_iteration(
  183. app_model=app_model,
  184. user=current_user,
  185. node_id=node_id,
  186. args=args,
  187. streaming=True
  188. )
  189. return helper.compact_generate_response(response)
  190. except services.errors.conversation.ConversationNotExistsError:
  191. raise NotFound("Conversation Not Exists.")
  192. except services.errors.conversation.ConversationCompletedError:
  193. raise ConversationCompletedError()
  194. except ValueError as e:
  195. raise e
  196. except Exception as e:
  197. logging.exception("internal server error.")
  198. raise InternalServerError()
  199. class WorkflowDraftRunIterationNodeApi(Resource):
  200. @setup_required
  201. @login_required
  202. @account_initialization_required
  203. @get_app_model(mode=[AppMode.WORKFLOW])
  204. def post(self, app_model: App, node_id: str):
  205. """
  206. Run draft workflow iteration node
  207. """
  208. # The role of the current user in the ta table must be admin, owner, or editor
  209. if not current_user.is_editor:
  210. raise Forbidden()
  211. parser = reqparse.RequestParser()
  212. parser.add_argument('inputs', type=dict, location='json')
  213. args = parser.parse_args()
  214. try:
  215. response = AppGenerateService.generate_single_iteration(
  216. app_model=app_model,
  217. user=current_user,
  218. node_id=node_id,
  219. args=args,
  220. streaming=True
  221. )
  222. return helper.compact_generate_response(response)
  223. except services.errors.conversation.ConversationNotExistsError:
  224. raise NotFound("Conversation Not Exists.")
  225. except services.errors.conversation.ConversationCompletedError:
  226. raise ConversationCompletedError()
  227. except ValueError as e:
  228. raise e
  229. except Exception as e:
  230. logging.exception("internal server error.")
  231. raise InternalServerError()
  232. class DraftWorkflowRunApi(Resource):
  233. @setup_required
  234. @login_required
  235. @account_initialization_required
  236. @get_app_model(mode=[AppMode.WORKFLOW])
  237. def post(self, app_model: App):
  238. """
  239. Run draft workflow
  240. """
  241. # The role of the current user in the ta table must be admin, owner, or editor
  242. if not current_user.is_editor:
  243. raise Forbidden()
  244. parser = reqparse.RequestParser()
  245. parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
  246. parser.add_argument('files', type=list, required=False, location='json')
  247. args = parser.parse_args()
  248. try:
  249. response = AppGenerateService.generate(
  250. app_model=app_model,
  251. user=current_user,
  252. args=args,
  253. invoke_from=InvokeFrom.DEBUGGER,
  254. streaming=True
  255. )
  256. return helper.compact_generate_response(response)
  257. except (ValueError, AppInvokeQuotaExceededError) as e:
  258. raise e
  259. except Exception as e:
  260. logging.exception("internal server error.")
  261. raise InternalServerError()
  262. class WorkflowTaskStopApi(Resource):
  263. @setup_required
  264. @login_required
  265. @account_initialization_required
  266. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  267. def post(self, app_model: App, task_id: str):
  268. """
  269. Stop workflow task
  270. """
  271. # The role of the current user in the ta table must be admin, owner, or editor
  272. if not current_user.is_editor:
  273. raise Forbidden()
  274. AppQueueManager.set_stop_flag(task_id, InvokeFrom.DEBUGGER, current_user.id)
  275. return {
  276. "result": "success"
  277. }
  278. class DraftWorkflowNodeRunApi(Resource):
  279. @setup_required
  280. @login_required
  281. @account_initialization_required
  282. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  283. @marshal_with(workflow_run_node_execution_fields)
  284. def post(self, app_model: App, node_id: str):
  285. """
  286. Run draft workflow node
  287. """
  288. # The role of the current user in the ta table must be admin, owner, or editor
  289. if not current_user.is_editor:
  290. raise Forbidden()
  291. parser = reqparse.RequestParser()
  292. parser.add_argument('inputs', type=dict, required=True, nullable=False, location='json')
  293. args = parser.parse_args()
  294. workflow_service = WorkflowService()
  295. workflow_node_execution = workflow_service.run_draft_workflow_node(
  296. app_model=app_model,
  297. node_id=node_id,
  298. user_inputs=args.get('inputs'),
  299. account=current_user
  300. )
  301. return workflow_node_execution
  302. class PublishedWorkflowApi(Resource):
  303. @setup_required
  304. @login_required
  305. @account_initialization_required
  306. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  307. @marshal_with(workflow_fields)
  308. def get(self, app_model: App):
  309. """
  310. Get published workflow
  311. """
  312. # The role of the current user in the ta table must be admin, owner, or editor
  313. if not current_user.is_editor:
  314. raise Forbidden()
  315. # fetch published workflow by app_model
  316. workflow_service = WorkflowService()
  317. workflow = workflow_service.get_published_workflow(app_model=app_model)
  318. # return workflow, if not found, return None
  319. return workflow
  320. @setup_required
  321. @login_required
  322. @account_initialization_required
  323. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  324. def post(self, app_model: App):
  325. """
  326. Publish workflow
  327. """
  328. # The role of the current user in the ta table must be admin, owner, or editor
  329. if not current_user.is_editor:
  330. raise Forbidden()
  331. workflow_service = WorkflowService()
  332. workflow = workflow_service.publish_workflow(app_model=app_model, account=current_user)
  333. return {
  334. "result": "success",
  335. "created_at": TimestampField().format(workflow.created_at)
  336. }
  337. class DefaultBlockConfigsApi(Resource):
  338. @setup_required
  339. @login_required
  340. @account_initialization_required
  341. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  342. def get(self, app_model: App):
  343. """
  344. Get default block config
  345. """
  346. # The role of the current user in the ta table must be admin, owner, or editor
  347. if not current_user.is_editor:
  348. raise Forbidden()
  349. # Get default block configs
  350. workflow_service = WorkflowService()
  351. return workflow_service.get_default_block_configs()
  352. class DefaultBlockConfigApi(Resource):
  353. @setup_required
  354. @login_required
  355. @account_initialization_required
  356. @get_app_model(mode=[AppMode.ADVANCED_CHAT, AppMode.WORKFLOW])
  357. def get(self, app_model: App, block_type: str):
  358. """
  359. Get default block config
  360. """
  361. # The role of the current user in the ta table must be admin, owner, or editor
  362. if not current_user.is_editor:
  363. raise Forbidden()
  364. parser = reqparse.RequestParser()
  365. parser.add_argument('q', type=str, location='args')
  366. args = parser.parse_args()
  367. filters = None
  368. if args.get('q'):
  369. try:
  370. filters = json.loads(args.get('q'))
  371. except json.JSONDecodeError:
  372. raise ValueError('Invalid filters')
  373. # Get default block configs
  374. workflow_service = WorkflowService()
  375. return workflow_service.get_default_block_config(
  376. node_type=block_type,
  377. filters=filters
  378. )
  379. class ConvertToWorkflowApi(Resource):
  380. @setup_required
  381. @login_required
  382. @account_initialization_required
  383. @get_app_model(mode=[AppMode.CHAT, AppMode.COMPLETION])
  384. def post(self, app_model: App):
  385. """
  386. Convert basic mode of chatbot app to workflow mode
  387. Convert expert mode of chatbot app to workflow mode
  388. Convert Completion App to Workflow App
  389. """
  390. # The role of the current user in the ta table must be admin, owner, or editor
  391. if not current_user.is_editor:
  392. raise Forbidden()
  393. if request.data:
  394. parser = reqparse.RequestParser()
  395. parser.add_argument('name', type=str, required=False, nullable=True, location='json')
  396. parser.add_argument('icon_type', type=str, required=False, nullable=True, location='json')
  397. parser.add_argument('icon', type=str, required=False, nullable=True, location='json')
  398. parser.add_argument('icon_background', type=str, required=False, nullable=True, location='json')
  399. args = parser.parse_args()
  400. else:
  401. args = {}
  402. # convert to workflow mode
  403. workflow_service = WorkflowService()
  404. new_app_model = workflow_service.convert_to_workflow(
  405. app_model=app_model,
  406. account=current_user,
  407. args=args
  408. )
  409. # return app id
  410. return {
  411. 'new_app_id': new_app_model.id,
  412. }
  413. api.add_resource(DraftWorkflowApi, '/apps/<uuid:app_id>/workflows/draft')
  414. api.add_resource(DraftWorkflowImportApi, '/apps/<uuid:app_id>/workflows/draft/import')
  415. api.add_resource(AdvancedChatDraftWorkflowRunApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/run')
  416. api.add_resource(DraftWorkflowRunApi, '/apps/<uuid:app_id>/workflows/draft/run')
  417. api.add_resource(WorkflowTaskStopApi, '/apps/<uuid:app_id>/workflow-runs/tasks/<string:task_id>/stop')
  418. api.add_resource(DraftWorkflowNodeRunApi, '/apps/<uuid:app_id>/workflows/draft/nodes/<string:node_id>/run')
  419. api.add_resource(AdvancedChatDraftRunIterationNodeApi, '/apps/<uuid:app_id>/advanced-chat/workflows/draft/iteration/nodes/<string:node_id>/run')
  420. api.add_resource(WorkflowDraftRunIterationNodeApi, '/apps/<uuid:app_id>/workflows/draft/iteration/nodes/<string:node_id>/run')
  421. api.add_resource(PublishedWorkflowApi, '/apps/<uuid:app_id>/workflows/publish')
  422. api.add_resource(DefaultBlockConfigsApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs')
  423. api.add_resource(DefaultBlockConfigApi, '/apps/<uuid:app_id>/workflows/default-workflow-block-configs'
  424. '/<string:block_type>')
  425. api.add_resource(ConvertToWorkflowApi, '/apps/<uuid:app_id>/convert-to-workflow')