import uuid from flask_login import current_user from sqlalchemy import func from werkzeug.exceptions import NotFound from extensions.ext_database import db from models.dataset import Dataset from models.model import App, Tag, TagBinding class TagService: @staticmethod def get_tags(tag_type: str, current_tenant_id: str, keyword: str = None) -> list: query = ( db.session.query(Tag.id, Tag.type, Tag.name, func.count(TagBinding.id).label("binding_count")) .outerjoin(TagBinding, Tag.id == TagBinding.tag_id) .filter(Tag.type == tag_type, Tag.tenant_id == current_tenant_id) ) if keyword: query = query.filter(db.and_(Tag.name.ilike(f"%{keyword}%"))) query = query.group_by(Tag.id) results = query.order_by(Tag.created_at.desc()).all() return results @staticmethod def get_target_ids_by_tag_ids(tag_type: str, current_tenant_id: str, tag_ids: list) -> list: tags = ( db.session.query(Tag) .filter(Tag.id.in_(tag_ids), Tag.tenant_id == current_tenant_id, Tag.type == tag_type) .all() ) if not tags: return [] tag_ids = [tag.id for tag in tags] tag_bindings = ( db.session.query(TagBinding.target_id) .filter(TagBinding.tag_id.in_(tag_ids), TagBinding.tenant_id == current_tenant_id) .all() ) if not tag_bindings: return [] results = [tag_binding.target_id for tag_binding in tag_bindings] return results @staticmethod def get_tags_by_target_id(tag_type: str, current_tenant_id: str, target_id: str) -> list: tags = ( db.session.query(Tag) .join(TagBinding, Tag.id == TagBinding.tag_id) .filter( TagBinding.target_id == target_id, TagBinding.tenant_id == current_tenant_id, Tag.tenant_id == current_tenant_id, Tag.type == tag_type, ) .all() ) return tags or [] @staticmethod def save_tags(args: dict) -> Tag: tag = Tag( id=str(uuid.uuid4()), name=args["name"], type=args["type"], created_by=current_user.id, tenant_id=current_user.current_tenant_id, ) db.session.add(tag) db.session.commit() return tag @staticmethod def update_tags(args: dict, tag_id: str) -> Tag: tag = db.session.query(Tag).filter(Tag.id == tag_id).first() if not tag: raise NotFound("Tag not found") tag.name = args["name"] db.session.commit() return tag @staticmethod def get_tag_binding_count(tag_id: str) -> int: count = db.session.query(TagBinding).filter(TagBinding.tag_id == tag_id).count() return count @staticmethod def delete_tag(tag_id: str): tag = db.session.query(Tag).filter(Tag.id == tag_id).first() if not tag: raise NotFound("Tag not found") db.session.delete(tag) # delete tag binding tag_bindings = db.session.query(TagBinding).filter(TagBinding.tag_id == tag_id).all() if tag_bindings: for tag_binding in tag_bindings: db.session.delete(tag_binding) db.session.commit() @staticmethod def save_tag_binding(args): # check if target exists TagService.check_target_exists(args["type"], args["target_id"]) # save tag binding for tag_id in args["tag_ids"]: tag_binding = ( db.session.query(TagBinding) .filter(TagBinding.tag_id == tag_id, TagBinding.target_id == args["target_id"]) .first() ) if tag_binding: continue new_tag_binding = TagBinding( tag_id=tag_id, target_id=args["target_id"], tenant_id=current_user.current_tenant_id, created_by=current_user.id, ) db.session.add(new_tag_binding) db.session.commit() @staticmethod def delete_tag_binding(args): # check if target exists TagService.check_target_exists(args["type"], args["target_id"]) # delete tag binding tag_bindings = ( db.session.query(TagBinding) .filter(TagBinding.target_id == args["target_id"], TagBinding.tag_id == (args["tag_id"])) .first() ) if tag_bindings: db.session.delete(tag_bindings) db.session.commit() @staticmethod def check_target_exists(type: str, target_id: str): if type == "knowledge": dataset = ( db.session.query(Dataset) .filter(Dataset.tenant_id == current_user.current_tenant_id, Dataset.id == target_id) .first() ) if not dataset: raise NotFound("Dataset not found") elif type == "app": app = ( db.session.query(App) .filter(App.tenant_id == current_user.current_tenant_id, App.id == target_id) .first() ) if not app: raise NotFound("App not found") else: raise NotFound("Invalid binding type")