external_application_service.py 3.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. import uuid
  2. from datetime import datetime
  3. from typing import Optional
  4. from flask_login import current_user
  5. from werkzeug.exceptions import NotFound
  6. from models import db
  7. from models.external_application import ExternalApplication
  8. from services.errors.external_application import ExternalApplicationNameDuplicateError
  class ExternalApplicationService:
      """Service-layer helpers for listing, reading, creating, updating and deleting
      ``ExternalApplication`` records through the shared ``db.session``.

      All methods are static; the class only serves as a namespace.
      """

      @staticmethod
      def get_external_applications(page, per_page, search=None, type=None, url=None, method=None):
          """Return one page of external applications, newest first.

          All supplied filters are combined (AND-ed) on a single query. Falsy filter
          values (``None``, ``""``) are ignored.

          Args:
              page: Page number passed to ``paginate``.
              per_page: Page size passed to ``paginate`` (``max_per_page`` is 100).
              search: Optional substring matched case-insensitively against ``name``.
              type: Optional exact match on ``ExternalApplication.type``.
              url: Optional exact match on ``ExternalApplication.url``.
              method: Optional exact match on ``ExternalApplication.method``.

          Returns:
              A ``(items, total)`` tuple taken from the pagination object.
          """
          # NOTE: ``type`` shadows the builtin, but it is part of the keyword
          # interface callers may already use, so the name is kept.
          #
          # Build on the same query object in every branch. Restarting from
          # ``ExternalApplication.query`` inside each branch would silently drop
          # the ordering and every previously applied filter.
          query = ExternalApplication.query
          if search:
              query = query.filter(ExternalApplication.name.ilike(f"%{search}%"))
          if type:
              query = query.filter(ExternalApplication.type == type)
          if url:
              query = query.filter(ExternalApplication.url == url)
          if method:
              query = query.filter(ExternalApplication.method == method)

          # Apply ordering last so it always holds, whichever filters were used.
          query = query.order_by(ExternalApplication.created_at.desc())

          pagination = query.paginate(page=page, per_page=per_page, max_per_page=100, error_out=False)
          return pagination.items, pagination.total

      @staticmethod
      def get_external_application(external_application_id: str) -> Optional[ExternalApplication]:
          """Return the external application with the given id, or ``None`` if absent."""
          return ExternalApplication.query.filter(ExternalApplication.id == external_application_id).first()

      @staticmethod
      def get_external_application_by_name(name: str) -> Optional[ExternalApplication]:
          """Return the first external application whose name equals ``name``, or ``None``."""
          return ExternalApplication.query.filter_by(name=name).first()

      @staticmethod
      def _get_or_raise(external_application_id: str) -> ExternalApplication:
          """Return the external application with the given id or raise ``NotFound``."""
          external_application = ExternalApplicationService.get_external_application(external_application_id)
          if not external_application:
              raise NotFound("ExternalApplication not found")
          return external_application

      @staticmethod
      def save_external_application(args: dict) -> ExternalApplication:
          """Create and commit a new external application.

          Args:
              args: Mapping that must contain ``name``, ``type``, ``url``, ``method``
                  and ``status``.

          Returns:
              The newly created ``ExternalApplication``.

          Raises:
              ExternalApplicationNameDuplicateError: If a record with the same name
                  already exists.
              KeyError: If a required key is missing from ``args``.
          """
          name = args["name"]
          if ExternalApplicationService.get_external_application_by_name(name):
              raise ExternalApplicationNameDuplicateError(f"ExternalApplication with name {name} already exists.")

          external_application = ExternalApplication(
              id=str(uuid.uuid4()),
              name=name,
              type=args["type"],
              url=args["url"],
              method=args["method"],
              status=args["status"],
              created_by=current_user.id,
              updated_by=current_user.id,
          )
          db.session.add(external_application)
          db.session.commit()
          return external_application

      @staticmethod
      def update_external_application(args: dict, external_application_id: str) -> ExternalApplication:
          """Overwrite the editable fields of an existing external application and commit.

          Args:
              args: Mapping that must contain ``name``, ``type``, ``url``, ``method``
                  and ``status``.
              external_application_id: Id of the record to update.

          Returns:
              The updated ``ExternalApplication``.

          Raises:
              NotFound: If no record has the given id.
              ExternalApplicationNameDuplicateError: If a *different* record already
                  uses the requested name.
              KeyError: If a required key is missing from ``args``.
          """
          external_application = ExternalApplicationService._get_or_raise(external_application_id)

          # Enforce the same name-uniqueness rule as save_external_application.
          # The check runs before any attribute is mutated so the lookup query
          # cannot flush a half-updated record; the record being edited is
          # excluded so keeping its own name is allowed.
          name = args["name"]
          same_name = ExternalApplicationService.get_external_application_by_name(name)
          if same_name and same_name.id != external_application.id:
              raise ExternalApplicationNameDuplicateError(f"ExternalApplication with name {name} already exists.")

          external_application.name = name
          external_application.type = args["type"]
          external_application.url = args["url"]
          external_application.method = args["method"]
          external_application.status = args["status"]
          external_application.updated_by = current_user.id
          # NOTE(review): naive local timestamp, as in the original code; confirm
          # it matches how the model populates created_at/updated_at.
          external_application.updated_at = datetime.now()
          db.session.add(external_application)
          db.session.commit()
          return external_application

      @staticmethod
      def delete_external_application(external_application_id):
          """Delete the external application with the given id and commit.

          Returns:
              The deleted ``ExternalApplication`` instance.

          Raises:
              NotFound: If no record has the given id.
          """
          external_application = ExternalApplicationService._get_or_raise(external_application_id)
          db.session.delete(external_application)
          db.session.commit()
          return external_application
  73. return external_application