|
@@ -15,7 +15,8 @@ from services.errors.tag import TagNameDuplicateError
|
|
|
|
|
|
class TagService:
|
|
|
@staticmethod
|
|
|
- def get_tags(tag_type: str, current_tenant_id: str, keyword: Optional[str] = None) -> list:
|
|
|
+ def get_tags(tag_type: str, current_tenant_id: str, keyword: Optional[str] = None,
|
|
|
+ label_map: Optional[str] = None) -> list:
|
|
|
query = (
|
|
|
db.session.query(Tag.id, Tag.type, Tag.name, func.count(TagBinding.id).label("binding_count"))
|
|
|
.outerjoin(TagBinding, Tag.id == TagBinding.tag_id)
|
|
@@ -24,10 +25,16 @@ class TagService:
|
|
|
if keyword:
|
|
|
query = query.filter(db.and_(Tag.name.ilike(f"%{keyword}%")))
|
|
|
query = query.group_by(Tag.id, Tag.type, Tag.name)
|
|
|
+ if TagService.str2bool(label_map):
|
|
|
+ query = query.having(func.count(TagBinding.id) >= 1)
|
|
|
results: list = query.order_by(Tag.created_at.desc()).all()
|
|
|
return results
|
|
|
|
|
|
@staticmethod
|
|
|
+ def str2bool(v: Optional[str]) -> bool:
|
|
|
+ return str(v).lower() in ("true", "1", "yes")
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
def get_tag_by_tag_name(tag_type: str, tenant_id: str, tag_name: str) -> Optional[Tag]:
|
|
|
tag: Optional[Tag] = (
|
|
|
db.session.query(Tag).filter(Tag.type == tag_type, Tag.tenant_id == tenant_id, Tag.name == tag_name).first()
|