external_application_service.py 3.6 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. import uuid
  2. from datetime import datetime
  3. from typing import Optional
  4. from flask_login import current_user
  5. from werkzeug.exceptions import NotFound
  6. from models import db
  7. from models.external_application import ExternalApplication
  8. from services.errors.external_application import ExternalApplicationNameDuplicateError
  9. class ExternalApplicationService:
  10. @staticmethod
  11. def get_external_applications(page, per_page, search=None, type=None, url=None, method=None):
  12. query = ExternalApplication.query.order_by(ExternalApplication.created_at.desc())
  13. if search:
  14. query = ExternalApplication.query.filter(ExternalApplication.name.ilike(f"%{search}%"))
  15. if type:
  16. query = ExternalApplication.query.filter(ExternalApplication.type == type)
  17. if url:
  18. query = ExternalApplication.query.filter(ExternalApplication.url == url)
  19. if method:
  20. query = ExternalApplication.query.filter(ExternalApplication.method == method)
  21. external_applications = query.paginate(page=page, per_page=per_page, max_per_page=100, error_out=False)
  22. return external_applications.items, external_applications.total
  23. @staticmethod
  24. def get_external_application(external_application_id: str) -> Optional[ExternalApplication]:
  25. external_application = ExternalApplication.query.filter(
  26. ExternalApplication.id == external_application_id
  27. ).first()
  28. return external_application
  29. @staticmethod
  30. def get_external_application_by_name(name: str) -> Optional[ExternalApplication]:
  31. external_application = ExternalApplication.query.filter_by(name=name).first()
  32. return external_application
  33. @staticmethod
  34. def save_external_application(args: dict) -> ExternalApplication:
  35. name = args["name"]
  36. external_application = ExternalApplicationService.get_external_application_by_name(name)
  37. if external_application:
  38. raise ExternalApplicationNameDuplicateError(f"ExternalApplication with name {name} already exists.")
  39. external_application = ExternalApplication(
  40. id=str(uuid.uuid4()),
  41. name=name,
  42. type=args["type"],
  43. url=args["url"],
  44. method=args["method"],
  45. status=args["status"],
  46. created_by=current_user.id,
  47. updated_by=current_user.id,
  48. )
  49. db.session.add(external_application)
  50. db.session.commit()
  51. return external_application
  52. @staticmethod
  53. def update_external_application(args: dict, external_application_id: str) -> ExternalApplication:
  54. external_application = ExternalApplicationService.get_external_application(external_application_id)
  55. if not external_application:
  56. raise NotFound("ExternalApplication not found")
  57. external_application.name = args["name"]
  58. external_application.type = args["type"]
  59. external_application.url = args["url"]
  60. external_application.method = args["method"]
  61. external_application.status = args["status"]
  62. external_application.updated_by = current_user.id
  63. external_application.updated_at = datetime.now()
  64. db.session.add(external_application)
  65. db.session.commit()
  66. return external_application
  67. @staticmethod
  68. def delete_external_application(external_application_id):
  69. external_application = ExternalApplicationService.get_external_application(external_application_id)
  70. if not external_application:
  71. raise NotFound("ExternalApplication not found")
  72. db.session.delete(external_application)
  73. db.session.commit()
  74. return external_application