123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687 |
- import uuid
- from datetime import datetime
- from typing import Optional
- from flask_login import current_user
- from werkzeug.exceptions import NotFound
- from models import db
- from models.external_application import ExternalApplication
- from services.errors.external_application import ExternalApplicationNameDuplicateError
class ExternalApplicationService:
    """CRUD helpers for ``ExternalApplication`` rows, persisted through ``db.session``."""

    @staticmethod
    def get_external_applications(page, per_page, search=None, type=None, url=None, method=None):
        """Return one page of external applications, newest first.

        All supplied filters are combined (AND); falsy filter values are ignored.

        Args:
            page: Page number forwarded to ``paginate``.
            per_page: Page size forwarded to ``paginate`` (``max_per_page=100`` is passed).
            search: Substring matched case-insensitively against ``name`` (``ILIKE %search%``).
            type: Exact match on ``ExternalApplication.type``.
            url: Exact match on ``ExternalApplication.url``.
            method: Exact match on ``ExternalApplication.method``.

        Returns:
            A ``(items, total)`` tuple taken from the pagination object.
        """
        # NOTE: `type` shadows the builtin, but callers may pass it by keyword,
        # so the parameter name is kept for compatibility.
        query = ExternalApplication.query

        # Each filter must be chained onto the running query. Restarting from
        # ``ExternalApplication.query`` would discard earlier filters and the ordering.
        if search:
            query = query.filter(ExternalApplication.name.ilike(f"%{search}%"))
        if type:
            query = query.filter(ExternalApplication.type == type)
        if url:
            query = query.filter(ExternalApplication.url == url)
        if method:
            query = query.filter(ExternalApplication.method == method)

        query = query.order_by(ExternalApplication.created_at.desc())

        pagination = query.paginate(page=page, per_page=per_page, max_per_page=100, error_out=False)
        return pagination.items, pagination.total

    @staticmethod
    def get_external_application(external_application_id: str) -> Optional[ExternalApplication]:
        """Return the external application with the given id, or ``None`` if there is none."""
        return ExternalApplication.query.filter(
            ExternalApplication.id == external_application_id
        ).first()

    @staticmethod
    def get_external_application_by_name(name: str) -> Optional[ExternalApplication]:
        """Return the first external application whose name equals ``name``, or ``None``."""
        return ExternalApplication.query.filter_by(name=name).first()

    @staticmethod
    def _get_or_404(external_application_id: str) -> ExternalApplication:
        """Fetch an external application by id, raising ``NotFound`` when it does not exist."""
        external_application = ExternalApplicationService.get_external_application(external_application_id)
        if not external_application:
            raise NotFound("ExternalApplication not found")
        return external_application

    @staticmethod
    def save_external_application(args: dict) -> ExternalApplication:
        """Create and commit a new external application.

        Args:
            args: Mapping that must contain ``name``, ``type``, ``url``, ``method`` and ``status``.

        Returns:
            The newly committed ``ExternalApplication``.

        Raises:
            ExternalApplicationNameDuplicateError: If a record with the same name already exists.
            KeyError: If a required key is missing from ``args``.
        """
        name = args["name"]
        # NOTE(review): check-then-insert is not atomic; a database-level unique
        # constraint on name (if any) is what would actually guard concurrent creates.
        if ExternalApplicationService.get_external_application_by_name(name):
            raise ExternalApplicationNameDuplicateError(f"ExternalApplication with name {name} already exists.")

        user_id = current_user.id
        external_application = ExternalApplication(
            id=str(uuid.uuid4()),
            name=name,
            type=args["type"],
            url=args["url"],
            method=args["method"],
            status=args["status"],
            created_by=user_id,
            updated_by=user_id,
        )
        db.session.add(external_application)
        db.session.commit()
        return external_application

    @staticmethod
    def update_external_application(args: dict, external_application_id: str) -> ExternalApplication:
        """Overwrite the editable fields of an existing external application and commit.

        Args:
            args: Mapping that must contain ``name``, ``type``, ``url``, ``method`` and ``status``.
            external_application_id: Id of the record to update.

        Returns:
            The updated ``ExternalApplication``.

        Raises:
            NotFound: If no record has the given id.
            KeyError: If a required key is missing from ``args``.
        """
        external_application = ExternalApplicationService._get_or_404(external_application_id)

        external_application.name = args["name"]
        external_application.type = args["type"]
        external_application.url = args["url"]
        external_application.method = args["method"]
        external_application.status = args["status"]
        external_application.updated_by = current_user.id
        # NOTE(review): naive local timestamp; confirm this matches how the model's
        # other timestamp columns are populated.
        external_application.updated_at = datetime.now()

        db.session.add(external_application)
        db.session.commit()
        return external_application

    @staticmethod
    def delete_external_application(external_application_id):
        """Delete the external application with the given id and commit.

        Returns:
            The deleted ``ExternalApplication`` instance.

        Raises:
            NotFound: If no record has the given id.
        """
        external_application = ExternalApplicationService._get_or_404(external_application_id)
        db.session.delete(external_application)
        db.session.commit()
        return external_application
|