|
@@ -6,14 +6,17 @@ from sqlalchemy import func
|
|
|
from werkzeug.exceptions import NotFound
|
|
|
|
|
|
from extensions.ext_database import db
|
|
|
+from models.account import TenantAccountRole
|
|
|
from models.dataset import Dataset
|
|
|
-from models.model import App, Tag, TagBinding
|
|
|
+from models.model import App, AppPermissionAll, Tag, TagBinding
|
|
|
+from services.errors.account import NoPermissionError
|
|
|
from services.errors.tag import TagNameDuplicateError
|
|
|
|
|
|
|
|
|
class TagService:
|
|
|
@staticmethod
|
|
|
- def get_tags(tag_type: str, current_tenant_id: str, keyword: Optional[str] = None) -> list:
|
|
|
+ def get_tags(tag_type: str, current_tenant_id: str, keyword: Optional[str] = None,
|
|
|
+ label_map: Optional[str] = None) -> list:
|
|
|
query = (
|
|
|
db.session.query(Tag.id, Tag.type, Tag.name, func.count(TagBinding.id).label("binding_count"))
|
|
|
.outerjoin(TagBinding, Tag.id == TagBinding.tag_id)
|
|
@@ -22,10 +25,16 @@ class TagService:
|
|
|
if keyword:
|
|
|
query = query.filter(db.and_(Tag.name.ilike(f"%{keyword}%")))
|
|
|
query = query.group_by(Tag.id, Tag.type, Tag.name)
|
|
|
+ if TagService.str2bool(label_map):
|
|
|
+ query = query.having(func.count(TagBinding.id) >= 1)
|
|
|
results: list = query.order_by(Tag.created_at.desc()).all()
|
|
|
return results
|
|
|
|
|
|
@staticmethod
|
|
|
+ def str2bool(v: Optional[str]) -> bool:
|
|
|
+ return str(v).lower() in ("true", "1", "yes")
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
def get_tag_by_tag_name(tag_type: str, tenant_id: str, tag_name: str) -> Optional[Tag]:
|
|
|
tag: Optional[Tag] = (
|
|
|
db.session.query(Tag).filter(Tag.type == tag_type, Tag.tenant_id == tenant_id, Tag.name == tag_name).first()
|
|
@@ -145,6 +154,8 @@ class TagService:
|
|
|
|
|
|
@staticmethod
|
|
|
def save_tag_binding(args):
|
|
|
+ # 1.智能体设置可见授权的编辑权限一致,2.知识库的标签都能设置--修改为随设置权限一致
|
|
|
+ TagService.check_target_edit_auth(args["type"], args["target_id"])
|
|
|
# check if target exists
|
|
|
TagService.check_target_exists(args["type"], args["target_id"])
|
|
|
# save tag binding
|
|
@@ -167,6 +178,7 @@ class TagService:
|
|
|
|
|
|
@staticmethod
|
|
|
def delete_tag_binding(args):
|
|
|
+ TagService.check_target_edit_auth(args["type"], args["target_id"])
|
|
|
# check if target exists
|
|
|
TagService.check_target_exists(args["type"], args["target_id"])
|
|
|
# delete tag binding
|
|
@@ -199,3 +211,29 @@ class TagService:
|
|
|
raise NotFound("App not found")
|
|
|
else:
|
|
|
raise NotFound("Invalid binding type")
|
|
|
+
|
|
|
+ @staticmethod
|
|
|
+ def check_target_edit_auth(type: str, target_id: str):
|
|
|
+ if type in {"knowledge", "knowledge_category"}:
|
|
|
+ dataset = (
|
|
|
+ db.session.query(Dataset)
|
|
|
+ .filter(Dataset.id == target_id)
|
|
|
+ .first()
|
|
|
+ )
|
|
|
+ if (
|
|
|
+ current_user.current_role not in [TenantAccountRole.ADMIN, TenantAccountRole.OWNER]
|
|
|
+ and dataset.created_by != current_user.id
|
|
|
+ ):
|
|
|
+ raise NoPermissionError("You do not have permission to operate this dataset.")
|
|
|
+ elif type == "app":
|
|
|
+ app = (
|
|
|
+ db.session.query(AppPermissionAll)
|
|
|
+ .filter(AppPermissionAll.has_read_permission == True,
|
|
|
+ AppPermissionAll.account_id == current_user.id,
|
|
|
+ AppPermissionAll.app_id == target_id)
|
|
|
+ .first()
|
|
|
+ )
|
|
|
+ if not app:
|
|
|
+ raise NoPermissionError("You do not have permission to operate this app.")
|
|
|
+ else:
|
|
|
+ raise NotFound("Invalid binding type")
|