| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755 | import concurrent.futuresimport datetimeimport jsonimport loggingimport reimport threadingimport timeimport uuidfrom typing import Any, Optional, castfrom flask import current_appfrom flask_login import current_user  # type: ignorefrom sqlalchemy.orm.exc import ObjectDeletedErrorfrom configs import dify_configfrom core.entities.knowledge_entities import IndexingEstimate, PreviewDetail, QAPreviewDetailfrom core.errors.error import ProviderTokenNotInitErrorfrom core.model_manager import ModelInstance, ModelManagerfrom core.model_runtime.entities.model_entities import ModelTypefrom core.rag.cleaner.clean_processor import CleanProcessorfrom core.rag.datasource.keyword.keyword_factory import Keywordfrom core.rag.docstore.dataset_docstore import DatasetDocumentStorefrom core.rag.extractor.entity.extract_setting import ExtractSettingfrom core.rag.index_processor.constant.index_type import IndexTypefrom core.rag.index_processor.index_processor_base import BaseIndexProcessorfrom core.rag.index_processor.index_processor_factory import IndexProcessorFactoryfrom core.rag.models.document import ChildDocument, Documentfrom core.rag.splitter.fixed_text_splitter import (    EnhanceRecursiveCharacterTextSplitter,    FixedRecursiveCharacterTextSplitter,)from core.rag.splitter.text_splitter import TextSplitterfrom core.tools.utils.web_reader_tool import get_image_upload_file_idsfrom extensions.ext_database import dbfrom extensions.ext_redis import redis_clientfrom extensions.ext_storage import storagefrom libs import helperfrom models.dataset import ChildChunk, Dataset, DatasetProcessRule, DocumentSegmentfrom models.dataset import Document as DatasetDocumentfrom models.model import UploadFilefrom services.feature_service import FeatureServiceclass IndexingRunner:    def __init__(self):        self.storage = storage        self.model_manager = ModelManager()    def run(self, dataset_documents: list[DatasetDocument]):        """Run the indexing process."""        for dataset_document in dataset_documents:            try:                # get dataset                dataset = Dataset.query.filter_by(id=dataset_document.dataset_id).first()                if not dataset:                    raise ValueError("no dataset found")                # get the process rule                processing_rule = (                    db.session.query(DatasetProcessRule)                    .filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id)                    .first()                )                if not processing_rule:                    raise ValueError("no process rule found")                index_type = dataset_document.doc_form                index_processor = IndexProcessorFactory(index_type).init_index_processor()                # extract                text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())                # transform                documents = self._transform(                    index_processor, dataset, text_docs, dataset_document.doc_language, processing_rule.to_dict()                )                # save segment                self._load_segments(dataset, dataset_document, documents)                # load                self._load(                    index_processor=index_processor,                    dataset=dataset,                    dataset_document=dataset_document,                    documents=documents,                )            except DocumentIsPausedError:                raise DocumentIsPausedError("Document paused, document id: {}".format(dataset_document.id))            except ProviderTokenNotInitError as e:                dataset_document.indexing_status = "error"                dataset_document.error = str(e.description)                dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)                db.session.commit()            except ObjectDeletedError:                logging.warning("Document deleted, document id: {}".format(dataset_document.id))            except Exception as e:                logging.exception("consume document failed")                dataset_document.indexing_status = "error"                dataset_document.error = str(e)                dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)                db.session.commit()    def run_in_splitting_status(self, dataset_document: DatasetDocument):        """Run the indexing process when the index_status is splitting."""        try:            # get dataset            dataset = Dataset.query.filter_by(id=dataset_document.dataset_id).first()            if not dataset:                raise ValueError("no dataset found")            # get exist document_segment list and delete            document_segments = DocumentSegment.query.filter_by(                dataset_id=dataset.id, document_id=dataset_document.id            ).all()            for document_segment in document_segments:                db.session.delete(document_segment)                if dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:                    # delete child chunks                    db.session.query(ChildChunk).filter(ChildChunk.segment_id == document_segment.id).delete()            db.session.commit()            # get the process rule            processing_rule = (                db.session.query(DatasetProcessRule)                .filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id)                .first()            )            if not processing_rule:                raise ValueError("no process rule found")            index_type = dataset_document.doc_form            index_processor = IndexProcessorFactory(index_type).init_index_processor()            # extract            text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())            # transform            documents = self._transform(                index_processor, dataset, text_docs, dataset_document.doc_language, processing_rule.to_dict()            )            # save segment            self._load_segments(dataset, dataset_document, documents)            # load            self._load(                index_processor=index_processor, dataset=dataset, dataset_document=dataset_document, documents=documents            )        except DocumentIsPausedError:            raise DocumentIsPausedError("Document paused, document id: {}".format(dataset_document.id))        except ProviderTokenNotInitError as e:            dataset_document.indexing_status = "error"            dataset_document.error = str(e.description)            dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)            db.session.commit()        except Exception as e:            logging.exception("consume document failed")            dataset_document.indexing_status = "error"            dataset_document.error = str(e)            dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)            db.session.commit()    def run_in_indexing_status(self, dataset_document: DatasetDocument):        """Run the indexing process when the index_status is indexing."""        try:            # get dataset            dataset = Dataset.query.filter_by(id=dataset_document.dataset_id).first()            if not dataset:                raise ValueError("no dataset found")            # get exist document_segment list and delete            document_segments = DocumentSegment.query.filter_by(                dataset_id=dataset.id, document_id=dataset_document.id            ).all()            documents = []            if document_segments:                for document_segment in document_segments:                    # transform segment to node                    if document_segment.status != "completed":                        document = Document(                            page_content=document_segment.content,                            metadata={                                "doc_id": document_segment.index_node_id,                                "doc_hash": document_segment.index_node_hash,                                "document_id": document_segment.document_id,                                "dataset_id": document_segment.dataset_id,                            },                        )                        if dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX:                            child_chunks = document_segment.child_chunks                            if child_chunks:                                child_documents = []                                for child_chunk in child_chunks:                                    child_document = ChildDocument(                                        page_content=child_chunk.content,                                        metadata={                                            "doc_id": child_chunk.index_node_id,                                            "doc_hash": child_chunk.index_node_hash,                                            "document_id": document_segment.document_id,                                            "dataset_id": document_segment.dataset_id,                                        },                                    )                                    child_documents.append(child_document)                                document.children = child_documents                        documents.append(document)            # build index            # get the process rule            processing_rule = (                db.session.query(DatasetProcessRule)                .filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id)                .first()            )            index_type = dataset_document.doc_form            index_processor = IndexProcessorFactory(index_type).init_index_processor()            self._load(                index_processor=index_processor, dataset=dataset, dataset_document=dataset_document, documents=documents            )        except DocumentIsPausedError:            raise DocumentIsPausedError("Document paused, document id: {}".format(dataset_document.id))        except ProviderTokenNotInitError as e:            dataset_document.indexing_status = "error"            dataset_document.error = str(e.description)            dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)            db.session.commit()        except Exception as e:            logging.exception("consume document failed")            dataset_document.indexing_status = "error"            dataset_document.error = str(e)            dataset_document.stopped_at = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)            db.session.commit()    def indexing_estimate(        self,        tenant_id: str,        extract_settings: list[ExtractSetting],        tmp_processing_rule: dict,        doc_form: Optional[str] = None,        doc_language: str = "English",        dataset_id: Optional[str] = None,        indexing_technique: str = "economy",    ) -> IndexingEstimate:        """        Estimate the indexing for the document.        """        # check document limit        features = FeatureService.get_features(tenant_id)        if features.billing.enabled:            count = len(extract_settings)            batch_upload_limit = dify_config.BATCH_UPLOAD_LIMIT            if count > batch_upload_limit:                raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")        embedding_model_instance = None        if dataset_id:            dataset = Dataset.query.filter_by(id=dataset_id).first()            if not dataset:                raise ValueError("Dataset not found.")            if dataset.indexing_technique == "high_quality" or indexing_technique == "high_quality":                if dataset.embedding_model_provider:                    embedding_model_instance = self.model_manager.get_model_instance(                        tenant_id=tenant_id,                        provider=dataset.embedding_model_provider,                        model_type=ModelType.TEXT_EMBEDDING,                        model=dataset.embedding_model,                    )                else:                    embedding_model_instance = self.model_manager.get_default_model_instance(                        tenant_id=tenant_id,                        model_type=ModelType.TEXT_EMBEDDING,                    )        else:            if indexing_technique == "high_quality":                embedding_model_instance = self.model_manager.get_default_model_instance(                    tenant_id=tenant_id,                    model_type=ModelType.TEXT_EMBEDDING,                )        preview_texts = []  # type: ignore        total_segments = 0        index_type = doc_form        index_processor = IndexProcessorFactory(index_type).init_index_processor()        for extract_setting in extract_settings:            # extract            processing_rule = DatasetProcessRule(                mode=tmp_processing_rule["mode"], rules=json.dumps(tmp_processing_rule["rules"])            )            text_docs = index_processor.extract(extract_setting, process_rule_mode=tmp_processing_rule["mode"])            documents = index_processor.transform(                text_docs,                embedding_model_instance=embedding_model_instance,                process_rule=processing_rule.to_dict(),                tenant_id=current_user.current_tenant_id,                doc_language=doc_language,                preview=True,            )            total_segments += len(documents)            for document in documents:                if len(preview_texts) < 10:                    if doc_form and doc_form == "qa_model":                        preview_detail = QAPreviewDetail(                            question=document.page_content, answer=document.metadata.get("answer") or ""                        )                        preview_texts.append(preview_detail)                    else:                        preview_detail = PreviewDetail(content=document.page_content)  # type: ignore                        if document.children:                            preview_detail.child_chunks = [child.page_content for child in document.children]  # type: ignore                        preview_texts.append(preview_detail)                # delete image files and related db records                image_upload_file_ids = get_image_upload_file_ids(document.page_content)                for upload_file_id in image_upload_file_ids:                    image_file = db.session.query(UploadFile).filter(UploadFile.id == upload_file_id).first()                    try:                        if image_file:                            storage.delete(image_file.key)                    except Exception:                        logging.exception(                            "Delete image_files failed while indexing_estimate, \                                          image_upload_file_is: {}".format(upload_file_id)                        )                    db.session.delete(image_file)        if doc_form and doc_form == "qa_model":            return IndexingEstimate(total_segments=total_segments * 20, qa_preview=preview_texts, preview=[])        return IndexingEstimate(total_segments=total_segments, preview=preview_texts)  # type: ignore    def _extract(        self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict    ) -> list[Document]:        # load file        if dataset_document.data_source_type not in {"upload_file", "notion_import", "website_crawl"}:            return []        data_source_info = dataset_document.data_source_info_dict        text_docs = []        if dataset_document.data_source_type == "upload_file":            if not data_source_info or "upload_file_id" not in data_source_info:                raise ValueError("no upload file found")            file_detail = (                db.session.query(UploadFile).filter(UploadFile.id == data_source_info["upload_file_id"]).one_or_none()            )            if file_detail:                extract_setting = ExtractSetting(                    datasource_type="upload_file", upload_file=file_detail, document_model=dataset_document.doc_form                )                text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule["mode"])        elif dataset_document.data_source_type == "notion_import":            if (                not data_source_info                or "notion_workspace_id" not in data_source_info                or "notion_page_id" not in data_source_info            ):                raise ValueError("no notion import info found")            extract_setting = ExtractSetting(                datasource_type="notion_import",                notion_info={                    "notion_workspace_id": data_source_info["notion_workspace_id"],                    "notion_obj_id": data_source_info["notion_page_id"],                    "notion_page_type": data_source_info["type"],                    "document": dataset_document,                    "tenant_id": dataset_document.tenant_id,                },                document_model=dataset_document.doc_form,            )            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule["mode"])        elif dataset_document.data_source_type == "website_crawl":            if (                not data_source_info                or "provider" not in data_source_info                or "url" not in data_source_info                or "job_id" not in data_source_info            ):                raise ValueError("no website import info found")            extract_setting = ExtractSetting(                datasource_type="website_crawl",                website_info={                    "provider": data_source_info["provider"],                    "job_id": data_source_info["job_id"],                    "tenant_id": dataset_document.tenant_id,                    "url": data_source_info["url"],                    "mode": data_source_info["mode"],                    "only_main_content": data_source_info["only_main_content"],                },                document_model=dataset_document.doc_form,            )            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule["mode"])        # update document status to splitting        self._update_document_index_status(            document_id=dataset_document.id,            after_indexing_status="splitting",            extra_update_params={                DatasetDocument.word_count: sum(len(text_doc.page_content) for text_doc in text_docs),                DatasetDocument.parsing_completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None),            },        )        # replace doc id to document model id        text_docs = cast(list[Document], text_docs)        for text_doc in text_docs:            if text_doc.metadata is not None:                text_doc.metadata["document_id"] = dataset_document.id                text_doc.metadata["dataset_id"] = dataset_document.dataset_id        return text_docs    @staticmethod    def filter_string(text):        text = re.sub(r"<\|", "<", text)        text = re.sub(r"\|>", ">", text)        text = re.sub(r"[\x00-\x08\x0B\x0C\x0E-\x1F\x7F\xEF\xBF\xBE]", "", text)        # Unicode  U+FFFE        text = re.sub("\ufffe", "", text)        return text    @staticmethod    def _get_splitter(        processing_rule_mode: str,        max_tokens: int,        chunk_overlap: int,        separator: str,        embedding_model_instance: Optional[ModelInstance],    ) -> TextSplitter:        """        Get the NodeParser object according to the processing rule.        """        if processing_rule_mode in ["custom", "hierarchical"]:            # The user-defined segmentation rule            max_segmentation_tokens_length = dify_config.INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH            if max_tokens < 50 or max_tokens > max_segmentation_tokens_length:                raise ValueError(f"Custom segment length should be between 50 and {max_segmentation_tokens_length}.")            if separator:                separator = separator.replace("\\n", "\n")            character_splitter = FixedRecursiveCharacterTextSplitter.from_encoder(                chunk_size=max_tokens,                chunk_overlap=chunk_overlap,                fixed_separator=separator,                separators=["\n\n", "。", ". ", " ", ""],                embedding_model_instance=embedding_model_instance,            )        else:            # Automatic segmentation            automatic_rules: dict[str, Any] = dict(DatasetProcessRule.AUTOMATIC_RULES["segmentation"])            character_splitter = EnhanceRecursiveCharacterTextSplitter.from_encoder(                chunk_size=automatic_rules["max_tokens"],                chunk_overlap=automatic_rules["chunk_overlap"],                separators=["\n\n", "。", ". ", " ", ""],                embedding_model_instance=embedding_model_instance,            )        return character_splitter  # type: ignore    def _split_to_documents_for_estimate(        self, text_docs: list[Document], splitter: TextSplitter, processing_rule: DatasetProcessRule    ) -> list[Document]:        """        Split the text documents into nodes.        """        all_documents: list[Document] = []        for text_doc in text_docs:            # document clean            document_text = self._document_clean(text_doc.page_content, processing_rule)            text_doc.page_content = document_text            # parse document to nodes            documents = splitter.split_documents([text_doc])            split_documents = []            for document in documents:                if document.page_content is None or not document.page_content.strip():                    continue                if document.metadata is not None:                    doc_id = str(uuid.uuid4())                    hash = helper.generate_text_hash(document.page_content)                    document.metadata["doc_id"] = doc_id                    document.metadata["doc_hash"] = hash                split_documents.append(document)            all_documents.extend(split_documents)        return all_documents    @staticmethod    def _document_clean(text: str, processing_rule: DatasetProcessRule) -> str:        """        Clean the document text according to the processing rules.        """        if processing_rule.mode == "automatic":            rules = DatasetProcessRule.AUTOMATIC_RULES        else:            rules = json.loads(processing_rule.rules) if processing_rule.rules else {}        document_text = CleanProcessor.clean(text, {"rules": rules})        return document_text    @staticmethod    def format_split_text(text: str) -> list[QAPreviewDetail]:        regex = r"Q\d+:\s*(.*?)\s*A\d+:\s*([\s\S]*?)(?=Q\d+:|$)"        matches = re.findall(regex, text, re.UNICODE)        return [QAPreviewDetail(question=q, answer=re.sub(r"\n\s*", "\n", a.strip())) for q, a in matches if q and a]    def _load(        self,        index_processor: BaseIndexProcessor,        dataset: Dataset,        dataset_document: DatasetDocument,        documents: list[Document],    ) -> None:        """        insert index and update document/segment status to completed        """        embedding_model_instance = None        if dataset.indexing_technique == "high_quality":            embedding_model_instance = self.model_manager.get_model_instance(                tenant_id=dataset.tenant_id,                provider=dataset.embedding_model_provider,                model_type=ModelType.TEXT_EMBEDDING,                model=dataset.embedding_model,            )        # chunk nodes by chunk size        indexing_start_at = time.perf_counter()        tokens = 0        if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX:            # create keyword index            create_keyword_thread = threading.Thread(                target=self._process_keyword_index,                args=(current_app._get_current_object(), dataset.id, dataset_document.id, documents),  # type: ignore            )            create_keyword_thread.start()        max_workers = 10        if dataset.indexing_technique == "high_quality":            with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:                futures = []                # Distribute documents into multiple groups based on the hash values of page_content                # This is done to prevent multiple threads from processing the same document,                # Thereby avoiding potential database insertion deadlocks                document_groups: list[list[Document]] = [[] for _ in range(max_workers)]                for document in documents:                    hash = helper.generate_text_hash(document.page_content)                    group_index = int(hash, 16) % max_workers                    document_groups[group_index].append(document)                for chunk_documents in document_groups:                    if len(chunk_documents) == 0:                        continue                    futures.append(                        executor.submit(                            self._process_chunk,                            current_app._get_current_object(),  # type: ignore                            index_processor,                            chunk_documents,                            dataset,                            dataset_document,                            embedding_model_instance,                        )                    )                for future in futures:                    tokens += future.result()        if dataset_document.doc_form != IndexType.PARENT_CHILD_INDEX:            create_keyword_thread.join()        indexing_end_at = time.perf_counter()        # update document status to completed        self._update_document_index_status(            document_id=dataset_document.id,            after_indexing_status="completed",            extra_update_params={                DatasetDocument.tokens: tokens,                DatasetDocument.completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None),                DatasetDocument.indexing_latency: indexing_end_at - indexing_start_at,                DatasetDocument.error: None,            },        )    @staticmethod    def _process_keyword_index(flask_app, dataset_id, document_id, documents):        with flask_app.app_context():            dataset = Dataset.query.filter_by(id=dataset_id).first()            if not dataset:                raise ValueError("no dataset found")            keyword = Keyword(dataset)            keyword.create(documents)            if dataset.indexing_technique != "high_quality":                document_ids = [document.metadata["doc_id"] for document in documents]                db.session.query(DocumentSegment).filter(                    DocumentSegment.document_id == document_id,                    DocumentSegment.dataset_id == dataset_id,                    DocumentSegment.index_node_id.in_(document_ids),                    DocumentSegment.status == "indexing",                ).update(                    {                        DocumentSegment.status: "completed",                        DocumentSegment.enabled: True,                        DocumentSegment.completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None),                    }                )                db.session.commit()    def _process_chunk(        self, flask_app, index_processor, chunk_documents, dataset, dataset_document, embedding_model_instance    ):        with flask_app.app_context():            # check document is paused            self._check_document_paused_status(dataset_document.id)            tokens = 0            if embedding_model_instance:                tokens += sum(                    embedding_model_instance.get_text_embedding_num_tokens([document.page_content])                    for document in chunk_documents                )            # load index            index_processor.load(dataset, chunk_documents, with_keywords=False)            document_ids = [document.metadata["doc_id"] for document in chunk_documents]            db.session.query(DocumentSegment).filter(                DocumentSegment.document_id == dataset_document.id,                DocumentSegment.dataset_id == dataset.id,                DocumentSegment.index_node_id.in_(document_ids),                DocumentSegment.status == "indexing",            ).update(                {                    DocumentSegment.status: "completed",                    DocumentSegment.enabled: True,                    DocumentSegment.completed_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None),                }            )            db.session.commit()            return tokens    @staticmethod    def _check_document_paused_status(document_id: str):        indexing_cache_key = "document_{}_is_paused".format(document_id)        result = redis_client.get(indexing_cache_key)        if result:            raise DocumentIsPausedError()    @staticmethod    def _update_document_index_status(        document_id: str, after_indexing_status: str, extra_update_params: Optional[dict] = None    ) -> None:        """        Update the document indexing status.        """        count = DatasetDocument.query.filter_by(id=document_id, is_paused=True).count()        if count > 0:            raise DocumentIsPausedError()        document = DatasetDocument.query.filter_by(id=document_id).first()        if not document:            raise DocumentIsDeletedPausedError()        update_params = {DatasetDocument.indexing_status: after_indexing_status}        if extra_update_params:            update_params.update(extra_update_params)        DatasetDocument.query.filter_by(id=document_id).update(update_params)        db.session.commit()    @staticmethod    def _update_segments_by_document(dataset_document_id: str, update_params: dict) -> None:        """        Update the document segment by document id.        """        DocumentSegment.query.filter_by(document_id=dataset_document_id).update(update_params)        db.session.commit()    def _transform(        self,        index_processor: BaseIndexProcessor,        dataset: Dataset,        text_docs: list[Document],        doc_language: str,        process_rule: dict,    ) -> list[Document]:        # get embedding model instance        embedding_model_instance = None        if dataset.indexing_technique == "high_quality":            if dataset.embedding_model_provider:                embedding_model_instance = self.model_manager.get_model_instance(                    tenant_id=dataset.tenant_id,                    provider=dataset.embedding_model_provider,                    model_type=ModelType.TEXT_EMBEDDING,                    model=dataset.embedding_model,                )            else:                embedding_model_instance = self.model_manager.get_default_model_instance(                    tenant_id=dataset.tenant_id,                    model_type=ModelType.TEXT_EMBEDDING,                )        documents = index_processor.transform(            text_docs,            embedding_model_instance=embedding_model_instance,            process_rule=process_rule,            tenant_id=dataset.tenant_id,            doc_language=doc_language,        )        return documents    def _load_segments(self, dataset, dataset_document, documents):        # save node to document segment        doc_store = DatasetDocumentStore(            dataset=dataset, user_id=dataset_document.created_by, document_id=dataset_document.id        )        # add document segments        doc_store.add_documents(docs=documents, save_child=dataset_document.doc_form == IndexType.PARENT_CHILD_INDEX)        # update document status to indexing        cur_time = datetime.datetime.now(datetime.UTC).replace(tzinfo=None)        self._update_document_index_status(            document_id=dataset_document.id,            after_indexing_status="indexing",            extra_update_params={                DatasetDocument.cleaning_completed_at: cur_time,                DatasetDocument.splitting_completed_at: cur_time,            },        )        # update segment status to indexing        self._update_segments_by_document(            dataset_document_id=dataset_document.id,            update_params={                DocumentSegment.status: "indexing",                DocumentSegment.indexing_at: datetime.datetime.now(datetime.UTC).replace(tzinfo=None),            },        )        passclass DocumentIsPausedError(Exception):    passclass DocumentIsDeletedPausedError(Exception):    pass
 |