| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242 | import copyimport datetimeimport loggingfrom typing import Optionalfrom flask_login import current_user  # type: ignorefrom core.rag.index_processor.constant.built_in_field import BuiltInField, MetadataDataSourcefrom extensions.ext_database import dbfrom extensions.ext_redis import redis_clientfrom models.dataset import Dataset, DatasetMetadata, DatasetMetadataBindingfrom services.dataset_service import DocumentServicefrom services.entities.knowledge_entities.knowledge_entities import (    MetadataArgs,    MetadataOperationData,)class MetadataService:    @staticmethod    def create_metadata(dataset_id: str, metadata_args: MetadataArgs) -> DatasetMetadata:        # check if metadata name already exists        if DatasetMetadata.query.filter_by(            tenant_id=current_user.current_tenant_id, dataset_id=dataset_id, name=metadata_args.name        ).first():            raise ValueError("Metadata name already exists.")        for field in BuiltInField:            if field.value == metadata_args.name:                raise ValueError("Metadata name already exists in Built-in fields.")        metadata = DatasetMetadata(            tenant_id=current_user.current_tenant_id,            dataset_id=dataset_id,            type=metadata_args.type,            name=metadata_args.name,            created_by=current_user.id,        )        db.session.add(metadata)        db.session.commit()        return metadata    @staticmethod    def update_metadata_name(dataset_id: str, metadata_id: str, name: str) -> DatasetMetadata:  # type: ignore        lock_key = f"dataset_metadata_lock_{dataset_id}"        # check if metadata name already exists        if DatasetMetadata.query.filter_by(            tenant_id=current_user.current_tenant_id, dataset_id=dataset_id, name=name        ).first():            raise ValueError("Metadata name already exists.")        for field in BuiltInField:            if field.value == name:                raise ValueError("Metadata name already exists in Built-in fields.")        try:            MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)            metadata = DatasetMetadata.query.filter_by(id=metadata_id).first()            if metadata is None:                raise ValueError("Metadata not found.")            old_name = metadata.name            metadata.name = name            metadata.updated_by = current_user.id            metadata.updated_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)            # update related documents            dataset_metadata_bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all()            if dataset_metadata_bindings:                document_ids = [binding.document_id for binding in dataset_metadata_bindings]                documents = DocumentService.get_document_by_ids(document_ids)                for document in documents:                    doc_metadata = copy.deepcopy(document.doc_metadata)                    value = doc_metadata.pop(old_name, None)                    doc_metadata[name] = value                    document.doc_metadata = doc_metadata                    db.session.add(document)            db.session.commit()            return metadata  # type: ignore        except Exception:            logging.exception("Update metadata name failed")        finally:            redis_client.delete(lock_key)    @staticmethod    def delete_metadata(dataset_id: str, metadata_id: str):        lock_key = f"dataset_metadata_lock_{dataset_id}"        try:            MetadataService.knowledge_base_metadata_lock_check(dataset_id, None)            metadata = DatasetMetadata.query.filter_by(id=metadata_id).first()            if metadata is None:                raise ValueError("Metadata not found.")            db.session.delete(metadata)            # deal related documents            dataset_metadata_bindings = DatasetMetadataBinding.query.filter_by(metadata_id=metadata_id).all()            if dataset_metadata_bindings:                document_ids = [binding.document_id for binding in dataset_metadata_bindings]                documents = DocumentService.get_document_by_ids(document_ids)                for document in documents:                    doc_metadata = copy.deepcopy(document.doc_metadata)                    doc_metadata.pop(metadata.name, None)                    document.doc_metadata = doc_metadata                    db.session.add(document)            db.session.commit()            return metadata        except Exception:            logging.exception("Delete metadata failed")        finally:            redis_client.delete(lock_key)    @staticmethod    def get_built_in_fields():        return [            {"name": BuiltInField.document_name.value, "type": "string"},            {"name": BuiltInField.uploader.value, "type": "string"},            {"name": BuiltInField.upload_date.value, "type": "time"},            {"name": BuiltInField.last_update_date.value, "type": "time"},            {"name": BuiltInField.source.value, "type": "string"},        ]    @staticmethod    def enable_built_in_field(dataset: Dataset):        if dataset.built_in_field_enabled:            return        lock_key = f"dataset_metadata_lock_{dataset.id}"        try:            MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)            dataset.built_in_field_enabled = True            db.session.add(dataset)            documents = DocumentService.get_working_documents_by_dataset_id(dataset.id)            if documents:                for document in documents:                    if not document.doc_metadata:                        doc_metadata = {}                    else:                        doc_metadata = copy.deepcopy(document.doc_metadata)                    doc_metadata[BuiltInField.document_name.value] = document.name                    doc_metadata[BuiltInField.uploader.value] = document.uploader                    doc_metadata[BuiltInField.upload_date.value] = document.upload_date.timestamp()                    doc_metadata[BuiltInField.last_update_date.value] = document.last_update_date.timestamp()                    doc_metadata[BuiltInField.source.value] = MetadataDataSource[document.data_source_type].value                    document.doc_metadata = doc_metadata                    db.session.add(document)            db.session.commit()        except Exception:            logging.exception("Enable built-in field failed")        finally:            redis_client.delete(lock_key)    @staticmethod    def disable_built_in_field(dataset: Dataset):        if not dataset.built_in_field_enabled:            return        lock_key = f"dataset_metadata_lock_{dataset.id}"        try:            MetadataService.knowledge_base_metadata_lock_check(dataset.id, None)            dataset.built_in_field_enabled = False            db.session.add(dataset)            documents = DocumentService.get_working_documents_by_dataset_id(dataset.id)            document_ids = []            if documents:                for document in documents:                    doc_metadata = copy.deepcopy(document.doc_metadata)                    doc_metadata.pop(BuiltInField.document_name.value, None)                    doc_metadata.pop(BuiltInField.uploader.value, None)                    doc_metadata.pop(BuiltInField.upload_date.value, None)                    doc_metadata.pop(BuiltInField.last_update_date.value, None)                    doc_metadata.pop(BuiltInField.source.value, None)                    document.doc_metadata = doc_metadata                    db.session.add(document)                    document_ids.append(document.id)            db.session.commit()        except Exception:            logging.exception("Disable built-in field failed")        finally:            redis_client.delete(lock_key)    @staticmethod    def update_documents_metadata(dataset: Dataset, metadata_args: MetadataOperationData):        for operation in metadata_args.operation_data:            lock_key = f"document_metadata_lock_{operation.document_id}"            try:                MetadataService.knowledge_base_metadata_lock_check(None, operation.document_id)                document = DocumentService.get_document(dataset.id, operation.document_id)                if document is None:                    raise ValueError("Document not found.")                doc_metadata = {}                for metadata_value in operation.metadata_list:                    doc_metadata[metadata_value.name] = metadata_value.value                if dataset.built_in_field_enabled:                    doc_metadata[BuiltInField.document_name.value] = document.name                    doc_metadata[BuiltInField.uploader.value] = document.uploader                    doc_metadata[BuiltInField.upload_date.value] = document.upload_date.timestamp()                    doc_metadata[BuiltInField.last_update_date.value] = document.last_update_date.timestamp()                    doc_metadata[BuiltInField.source.value] = MetadataDataSource[document.data_source_type].value                document.doc_metadata = doc_metadata                db.session.add(document)                db.session.commit()                # deal metadata binding                DatasetMetadataBinding.query.filter_by(document_id=operation.document_id).delete()                for metadata_value in operation.metadata_list:                    dataset_metadata_binding = DatasetMetadataBinding(                        tenant_id=current_user.current_tenant_id,                        dataset_id=dataset.id,                        document_id=operation.document_id,                        metadata_id=metadata_value.id,                        created_by=current_user.id,                    )                    db.session.add(dataset_metadata_binding)                db.session.commit()            except Exception:                logging.exception("Update documents metadata failed")            finally:                redis_client.delete(lock_key)    @staticmethod    def knowledge_base_metadata_lock_check(dataset_id: Optional[str], document_id: Optional[str]):        if dataset_id:            lock_key = f"dataset_metadata_lock_{dataset_id}"            if redis_client.get(lock_key):                raise ValueError("Another knowledge base metadata operation is running, please wait a moment.")            redis_client.set(lock_key, 1, ex=3600)        if document_id:            lock_key = f"document_metadata_lock_{document_id}"            if redis_client.get(lock_key):                raise ValueError("Another document metadata operation is running, please wait a moment.")            redis_client.set(lock_key, 1, ex=3600)    @staticmethod    def get_dataset_metadatas(dataset: Dataset):        return {            "doc_metadata": [                {                    "id": item.get("id"),                    "name": item.get("name"),                    "type": item.get("type"),                    "count": DatasetMetadataBinding.query.filter_by(                        metadata_id=item.get("id"), dataset_id=dataset.id                    ).count(),                }                for item in dataset.doc_metadata or []                if item.get("id") != "built-in"            ],            "built_in_field_enabled": dataset.built_in_field_enabled,        }
 |