tag_service.py 8.5 KB


  1. import uuid
  2. from typing import Optional
  3. from flask_login import current_user # type: ignore
  4. from sqlalchemy import func
  5. from werkzeug.exceptions import NotFound
  6. from extensions.ext_database import db
  7. from models.account import TenantAccountRole
  8. from models.dataset import Dataset
  9. from models.model import App, AppPermissionAll, Tag, TagBinding
  10. from services.errors.account import NoPermissionError
  11. from services.errors.tag import TagNameDuplicateError
  12. class TagService:
  13. @staticmethod
  14. def get_tags(tag_type: str, current_tenant_id: str, keyword: Optional[str] = None) -> list:
  15. query = (
  16. db.session.query(Tag.id, Tag.type, Tag.name, func.count(TagBinding.id).label("binding_count"))
  17. .outerjoin(TagBinding, Tag.id == TagBinding.tag_id)
  18. .filter(Tag.type == tag_type, Tag.tenant_id == current_tenant_id)
  19. )
  20. if keyword:
  21. query = query.filter(db.and_(Tag.name.ilike(f"%{keyword}%")))
  22. query = query.group_by(Tag.id, Tag.type, Tag.name)
  23. results: list = query.order_by(Tag.created_at.desc()).all()
  24. return results
  25. @staticmethod
  26. def get_tag_by_tag_name(tag_type: str, tenant_id: str, tag_name: str) -> Optional[Tag]:
  27. tag: Optional[Tag] = (
  28. db.session.query(Tag).filter(Tag.type == tag_type, Tag.tenant_id == tenant_id, Tag.name == tag_name).first()
  29. )
  30. return tag
  31. @staticmethod
  32. def get_target_ids_by_tag_ids(tag_type: str, current_tenant_id: str, tag_ids: list) -> list:
  33. tags = (
  34. db.session.query(Tag)
  35. .filter(Tag.id.in_(tag_ids), Tag.tenant_id == current_tenant_id, Tag.type == tag_type)
  36. .all()
  37. )
  38. if not tags:
  39. return []
  40. tag_ids = [tag.id for tag in tags]
  41. tag_bindings = (
  42. db.session.query(TagBinding.target_id)
  43. .filter(TagBinding.tag_id.in_(tag_ids), TagBinding.tenant_id == current_tenant_id)
  44. .all()
  45. )
  46. if not tag_bindings:
  47. return []
  48. results = [tag_binding.target_id for tag_binding in tag_bindings]
  49. return results
  50. @staticmethod
  51. def get_tags_count(tenant_id: str, keyword: Optional[str] = None) -> int:
  52. query = db.session.query(Tag).filter(Tag.type == "knowledge")
  53. if tenant_id:
  54. query = query.filter(Tag.tenant_id == tenant_id)
  55. if keyword:
  56. query = query.filter(Tag.name.ilike(f"%{keyword}%"))
  57. return query.count()
  58. @staticmethod
  59. def get_tags_by_target_id(tag_type: str, current_tenant_id: str, target_id: str) -> list:
  60. tags = (
  61. db.session.query(Tag)
  62. .join(TagBinding, Tag.id == TagBinding.tag_id)
  63. .filter(
  64. TagBinding.target_id == target_id,
  65. TagBinding.tenant_id == current_tenant_id,
  66. Tag.tenant_id == current_tenant_id,
  67. Tag.type == tag_type,
  68. )
  69. .all()
  70. )
  71. return tags or []
  72. @staticmethod
  73. def get_tag(tag_id: str):
  74. tag = db.session.query(Tag).filter(Tag.id == tag_id).first()
  75. if not tag:
  76. raise NotFound("Tag not found")
  77. return tag
  78. @staticmethod
  79. def get_page_tags(page, per_page, tag_type, tenant_id):
  80. query = (
  81. db.session.query(Tag.id, Tag.type, Tag.name, func.count(TagBinding.id).label("binding_count"))
  82. .outerjoin(TagBinding, Tag.id == TagBinding.tag_id)
  83. .filter(Tag.type == tag_type, Tag.tenant_id == tenant_id)
  84. )
  85. query = query.group_by(Tag.id, Tag.type, Tag.name)
  86. tags = query.paginate(page=page, per_page=per_page, error_out=False)
  87. return tags.items, tags.total
  88. @staticmethod
  89. def save_tags(args: dict) -> Tag:
  90. name = args["name"]
  91. type = args["type"]
  92. tenant_id = current_user.current_tenant_id
  93. tag = TagService.get_tag_by_tag_name(type, tenant_id, name)
  94. if tag:
  95. raise TagNameDuplicateError(f"Tag with name {name} already exists.")
  96. tag = Tag(
  97. id=str(uuid.uuid4()),
  98. name=name,
  99. type=type,
  100. created_by=current_user.id,
  101. tenant_id=tenant_id,
  102. )
  103. db.session.add(tag)
  104. db.session.commit()
  105. return tag
  106. @staticmethod
  107. def update_tags(args: dict, tag_id: str) -> Tag:
  108. tag = db.session.query(Tag).filter(Tag.id == tag_id).first()
  109. if not tag:
  110. raise NotFound("Tag not found")
  111. tag.name = args["name"]
  112. db.session.commit()
  113. return tag
  114. @staticmethod
  115. def get_tag_binding_count(tag_id: str) -> int:
  116. count = db.session.query(TagBinding).filter(TagBinding.tag_id == tag_id).count()
  117. return count
  118. @staticmethod
  119. def delete_tag(tag_id: str):
  120. tag = db.session.query(Tag).filter(Tag.id == tag_id).first()
  121. if not tag:
  122. raise NotFound("Tag not found")
  123. db.session.delete(tag)
  124. # delete tag binding
  125. tag_bindings = db.session.query(TagBinding).filter(TagBinding.tag_id == tag_id).all()
  126. if tag_bindings:
  127. for tag_binding in tag_bindings:
  128. db.session.delete(tag_binding)
  129. db.session.commit()
  130. @staticmethod
  131. def save_tag_binding(args):
  132. # 1.智能体设置可见授权的编辑权限一致,2.知识库的标签都能设置--修改为随设置权限一致
  133. TagService.check_target_edit_auth(args["type"], args["target_id"])
  134. # check if target exists
  135. TagService.check_target_exists(args["type"], args["target_id"])
  136. # save tag binding
  137. for tag_id in args["tag_ids"]:
  138. tag_binding = (
  139. db.session.query(TagBinding)
  140. .filter(TagBinding.tag_id == tag_id, TagBinding.target_id == args["target_id"])
  141. .first()
  142. )
  143. if tag_binding:
  144. continue
  145. new_tag_binding = TagBinding(
  146. tag_id=tag_id,
  147. target_id=args["target_id"],
  148. tenant_id=current_user.current_tenant_id,
  149. created_by=current_user.id,
  150. )
  151. db.session.add(new_tag_binding)
  152. db.session.commit()
  153. @staticmethod
  154. def delete_tag_binding(args):
  155. TagService.check_target_edit_auth(args["type"], args["target_id"])
  156. # check if target exists
  157. TagService.check_target_exists(args["type"], args["target_id"])
  158. # delete tag binding
  159. tag_bindings = (
  160. db.session.query(TagBinding)
  161. .filter(TagBinding.target_id == args["target_id"], TagBinding.tag_id == (args["tag_id"]))
  162. .first()
  163. )
  164. if tag_bindings:
  165. db.session.delete(tag_bindings)
  166. db.session.commit()
  167. @staticmethod
  168. def check_target_exists(type: str, target_id: str):
  169. if type in {"knowledge", "knowledge_category"}:
  170. dataset = (
  171. db.session.query(Dataset)
  172. .filter(Dataset.tenant_id == current_user.current_tenant_id, Dataset.id == target_id)
  173. .first()
  174. )
  175. if not dataset:
  176. raise NotFound("Dataset not found")
  177. elif type == "app":
  178. app = (
  179. db.session.query(App)
  180. .filter(App.tenant_id == current_user.current_tenant_id, App.id == target_id)
  181. .first()
  182. )
  183. if not app:
  184. raise NotFound("App not found")
  185. else:
  186. raise NotFound("Invalid binding type")
  187. @staticmethod
  188. def check_target_edit_auth(type: str, target_id: str):
  189. if type in {"knowledge", "knowledge_category"}:
  190. dataset = (
  191. db.session.query(Dataset)
  192. .filter(Dataset.id == target_id)
  193. .first()
  194. )
  195. if (
  196. current_user.current_role not in [TenantAccountRole.ADMIN, TenantAccountRole.OWNER]
  197. and dataset.created_by != current_user.id
  198. ):
  199. raise NoPermissionError("You do not have permission to operate this dataset.")
  200. elif type == "app":
  201. app = (
  202. db.session.query(AppPermissionAll)
  203. .filter(AppPermissionAll.has_read_permission == True,
  204. AppPermissionAll.account_id == current_user.id,
  205. AppPermissionAll.app_id == target_id)
  206. .first()
  207. )
  208. if not app:
  209. raise NoPermissionError("You do not have permission to operate this app.")
  210. else:
  211. raise NotFound("Invalid binding type")