import datetime from sqlalchemy import or_ from extensions.ext_database import db from models.account import Account, TenantAccountJoin from models.dept import Dept from services.account_service import AccountService class DeptService: @staticmethod def get_dept_account_list(self): dept_list = [] condition = or_(Dept.parent_dept_id == None, Dept.parent_dept_id == "") dept_results = db.session.query(Dept).filter(Dept.status == "active", condition).all() for dept_row in dept_results: children_dept_list = [] children_depts = ( db.session.query(Dept).filter(Dept.status == "active", Dept.parent_dept_id == dept_row.dept_id).all() ) for children_dept in children_depts: if self == 0: account_list = DeptService.get_dept_account(children_dept.dept_id) else: account_list = DeptService.get_dept_edit_account(children_dept.dept_id) children_dept_list.append( { "dept_id": children_dept.dept_id, "dept_name": children_dept.dept_name, "parent_dept_id": children_dept.parent_dept_id, "account_list": account_list, } ) if self == 0: account_list = DeptService.get_dept_account(dept_row.dept_id) else: account_list = DeptService.get_dept_edit_account(dept_row.dept_id) dept_list.append( { "parent_dept_id": "", "dept_id": dept_row.dept_id, "dept_name": dept_row.dept_name, "account_list": account_list, "children": children_dept_list, } ) return dept_list @staticmethod def get_dept_account(self): dept_account = [] account_results = ( db.session.query(Account.dept_id, Account.id, Account.email, Account.name) .filter(Account.status == "active", Account.dept_id == str(self)) .all() ) for row in account_results: dept_account.append({"account_id": row.id, "email": row.email, "name": row.name}) return dept_account @staticmethod def get_dept_edit_account(self): dept_account = [] account_results = ( db.session.query(Account.dept_id, Account.id, Account.email, Account.name) .join(TenantAccountJoin, Account.id == TenantAccountJoin.account_id) .filter(Account.status == "active", Account.dept_id == str(self), TenantAccountJoin.role != "normal") .all() ) for row in account_results: dept_account.append({"account_id": row.id, "email": row.email, "name": row.name}) return dept_account @staticmethod def get_dept_list(): dept_list = [] condition = or_(Dept.parent_dept_id == None, Dept.parent_dept_id == "") dept_results = db.session.query(Dept).filter(Dept.status == "active", condition).all() for dept_row in dept_results: children_dept_list = [] children_depts = ( db.session.query(Dept).filter(Dept.status == "active", Dept.parent_dept_id == dept_row.dept_id).all() ) for children_dept in children_depts: children_dept_list.append( { "dept_id": children_dept.dept_id, "dept_name": children_dept.dept_name, "parent_dept_id": children_dept.parent_dept_id, } ) dept_list.append( { "parent_dept_id": "", "dept_id": dept_row.dept_id, "dept_name": dept_row.dept_name, "children": children_dept_list, } ) return dept_list @staticmethod def get_dept_by_name(dept_name): dept = db.session.query(Dept).filter(Dept.status == "active", Dept.dept_name == dept_name).first() return dept @staticmethod def get_dept_by_id_name(dept_id, dept_name): dept = ( db.session.query(Dept) .filter(Dept.status == "active", Dept.dept_name == dept_name, Dept.dept_id == dept_id) .first() ) return dept @staticmethod def get_dept_by_id(dept_id): dept = db.session.query(Dept).filter(Dept.status == "active", Dept.dept_id == dept_id).first() return dept @staticmethod def save_dept(dept_name, current_user, parent_dept_id): dept = Dept( dept_name=dept_name, status="active", created_at=datetime.datetime.now(), created_by=current_user.id, parent_dept_id=parent_dept_id, ) db.session.add(dept) db.session.flush() db.session.commit() @staticmethod def update_dept(dept_id, dept_name, current_user, parent_dept_id): sql = ( db.session.query(Dept) .filter(Dept.dept_id == dept_id) .update( { "updated_by": current_user.id, "updated_at": datetime.datetime.now(), "dept_name": dept_name, "parent_dept_id": parent_dept_id, } ) ) db.session.commit() @staticmethod def delete_dept(dept): AccountService.delete_account_dept(dept) db.session.delete(dept) db.session.commit() @staticmethod def save_dept_account_list(dept_id, dept_account_list): for dept_account in dept_account_list: account_id = dept_account.get("account_id") AccountService.update_account_dept(dept_id, account_id) @staticmethod def delete_dept_account_list(dept_id, dept_account_list): for dept_account in dept_account_list: account_id = dept_account.get("account_id") AccountService.update_account_dept("", account_id) @staticmethod def get_dept_account_edit_list(): dept_list = [] condition = or_(Dept.parent_dept_id == None, Dept.parent_dept_id == "") dept_results = db.session.query(Dept).filter(Dept.status == "active", condition).all() for dept_row in dept_results: children_dept_list = [] children_depts = ( db.session.query(Dept).filter(Dept.status == "active", Dept.parent_dept_id == dept_row.dept_id).all() ) for children_dept in children_depts: account_list = DeptService.get_dept_account(children_dept.dept_id) children_dept_list.append( { "dept_id": children_dept.dept_id, "dept_name": children_dept.dept_name, "parent_dept_id": children_dept.parent_dept_id, "account_list": account_list, } ) account_list = DeptService.get_dept_account(dept_row.dept_id.dept_id) dept_list.append( { "parent_dept_id": "", "dept_id": dept_row.dept_id, "dept_name": dept_row.dept_name, "account_list": account_list, "children": children_dept_list, } ) return dept_list @staticmethod def get_depts_count(): count = db.session.query(Dept).filter(Dept.status == "active").count() return count