workflow_app_service.py 2.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182
  1. import uuid
  2. from flask_sqlalchemy.pagination import Pagination
  3. from sqlalchemy import and_, or_
  4. from extensions.ext_database import db
  5. from models import CreatedByRole
  6. from models.model import App, EndUser
  7. from models.workflow import WorkflowAppLog, WorkflowRun, WorkflowRunStatus
  8. class WorkflowAppService:
  9. def get_paginate_workflow_app_logs(self, app_model: App, args: dict) -> Pagination:
  10. """
  11. Get paginate workflow app logs
  12. :param app: app model
  13. :param args: request args
  14. :return:
  15. """
  16. query = (
  17. db.select(WorkflowAppLog)
  18. .where(
  19. WorkflowAppLog.tenant_id == app_model.tenant_id,
  20. WorkflowAppLog.app_id == app_model.id
  21. )
  22. )
  23. status = WorkflowRunStatus.value_of(args.get('status')) if args.get('status') else None
  24. keyword = args['keyword']
  25. if keyword or status:
  26. query = query.join(
  27. WorkflowRun, WorkflowRun.id == WorkflowAppLog.workflow_run_id
  28. )
  29. if keyword:
  30. keyword_like_val = f"%{args['keyword'][:30]}%"
  31. keyword_conditions = [
  32. WorkflowRun.inputs.ilike(keyword_like_val),
  33. WorkflowRun.outputs.ilike(keyword_like_val),
  34. # filter keyword by end user session id if created by end user role
  35. and_(WorkflowRun.created_by_role == 'end_user', EndUser.session_id.ilike(keyword_like_val))
  36. ]
  37. # filter keyword by workflow run id
  38. keyword_uuid = self._safe_parse_uuid(keyword)
  39. if keyword_uuid:
  40. keyword_conditions.append(WorkflowRun.id == keyword_uuid)
  41. query = query.outerjoin(
  42. EndUser,
  43. and_(WorkflowRun.created_by == EndUser.id, WorkflowRun.created_by_role == CreatedByRole.END_USER.value)
  44. ).filter(or_(*keyword_conditions))
  45. if status:
  46. # join with workflow_run and filter by status
  47. query = query.filter(
  48. WorkflowRun.status == status.value
  49. )
  50. query = query.order_by(WorkflowAppLog.created_at.desc())
  51. pagination = db.paginate(
  52. query,
  53. page=args['page'],
  54. per_page=args['limit'],
  55. error_out=False
  56. )
  57. return pagination
  58. @staticmethod
  59. def _safe_parse_uuid(value: str):
  60. # fast check
  61. if len(value) < 32:
  62. return None
  63. try:
  64. return uuid.UUID(value)
  65. except ValueError:
  66. return None