# indexing_runner.py

import concurrent.futures
import datetime
import json
import logging
import re
import threading
import time
import uuid
from typing import Optional, cast

from flask import Flask, current_app
from flask_login import current_user
from sqlalchemy.orm.exc import ObjectDeletedError

from configs import dify_config
from core.errors.error import ProviderTokenNotInitError
from core.llm_generator.llm_generator import LLMGenerator
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities.model_entities import ModelType
from core.rag.datasource.keyword.keyword_factory import Keyword
from core.rag.docstore.dataset_docstore import DatasetDocumentStore
from core.rag.extractor.entity.extract_setting import ExtractSetting
from core.rag.index_processor.index_processor_base import BaseIndexProcessor
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.rag.models.document import Document
from core.rag.splitter.fixed_text_splitter import (
    EnhanceRecursiveCharacterTextSplitter,
    FixedRecursiveCharacterTextSplitter,
)
from core.rag.splitter.text_splitter import TextSplitter
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from libs import helper
from models.dataset import Dataset, DatasetProcessRule, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import UploadFile
from services.feature_service import FeatureService


class IndexingRunner:

    def __init__(self):
        self.storage = storage
        self.model_manager = ModelManager()

    def run(self, dataset_documents: list[DatasetDocument]):
        """Run the indexing process."""
        for dataset_document in dataset_documents:
            try:
                # get dataset
                dataset = Dataset.query.filter_by(
                    id=dataset_document.dataset_id
                ).first()

                if not dataset:
                    raise ValueError("no dataset found")

                # get the process rule
                processing_rule = db.session.query(DatasetProcessRule). \
                    filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id). \
                    first()
                index_type = dataset_document.doc_form
                index_processor = IndexProcessorFactory(index_type).init_index_processor()

                # extract
                text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())

                # transform
                documents = self._transform(index_processor, dataset, text_docs, dataset_document.doc_language,
                                            processing_rule.to_dict())

                # save segment
                self._load_segments(dataset, dataset_document, documents)

                # load
                self._load(
                    index_processor=index_processor,
                    dataset=dataset,
                    dataset_document=dataset_document,
                    documents=documents
                )
            except DocumentIsPausedException:
                raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
            except ProviderTokenNotInitError as e:
                dataset_document.indexing_status = 'error'
                dataset_document.error = str(e.description)
                dataset_document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
                db.session.commit()
            except ObjectDeletedError:
                logging.warning('Document deleted, document id: {}'.format(dataset_document.id))
            except Exception as e:
                logging.exception("consume document failed")
                dataset_document.indexing_status = 'error'
                dataset_document.error = str(e)
                dataset_document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
                db.session.commit()

    def run_in_splitting_status(self, dataset_document: DatasetDocument):
        """Run the indexing process when the index_status is splitting."""
        try:
            # get dataset
            dataset = Dataset.query.filter_by(
                id=dataset_document.dataset_id
            ).first()

            if not dataset:
                raise ValueError("no dataset found")

            # get exist document_segment list and delete
            document_segments = DocumentSegment.query.filter_by(
                dataset_id=dataset.id,
                document_id=dataset_document.id
            ).all()

            for document_segment in document_segments:
                db.session.delete(document_segment)
            db.session.commit()

            # get the process rule
            processing_rule = db.session.query(DatasetProcessRule). \
                filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id). \
                first()
            index_type = dataset_document.doc_form
            index_processor = IndexProcessorFactory(index_type).init_index_processor()

            # extract
            text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())

            # transform
            documents = self._transform(index_processor, dataset, text_docs, dataset_document.doc_language,
                                        processing_rule.to_dict())

            # save segment
            self._load_segments(dataset, dataset_document, documents)

            # load
            self._load(
                index_processor=index_processor,
                dataset=dataset,
                dataset_document=dataset_document,
                documents=documents
            )
        except DocumentIsPausedException:
            raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
        except ProviderTokenNotInitError as e:
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e.description)
            dataset_document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            db.session.commit()
        except Exception as e:
            logging.exception("consume document failed")
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e)
            dataset_document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            db.session.commit()

    def run_in_indexing_status(self, dataset_document: DatasetDocument):
        """Run the indexing process when the index_status is indexing."""
        try:
            # get dataset
            dataset = Dataset.query.filter_by(
                id=dataset_document.dataset_id
            ).first()

            if not dataset:
                raise ValueError("no dataset found")

            # get exist document_segment list and delete
            document_segments = DocumentSegment.query.filter_by(
                dataset_id=dataset.id,
                document_id=dataset_document.id
            ).all()

            documents = []
            if document_segments:
                for document_segment in document_segments:
                    # transform segment to node
                    if document_segment.status != "completed":
                        document = Document(
                            page_content=document_segment.content,
                            metadata={
                                "doc_id": document_segment.index_node_id,
                                "doc_hash": document_segment.index_node_hash,
                                "document_id": document_segment.document_id,
                                "dataset_id": document_segment.dataset_id,
                            }
                        )
                        documents.append(document)

            # build index
            # get the process rule
            processing_rule = db.session.query(DatasetProcessRule). \
                filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id). \
                first()

            index_type = dataset_document.doc_form
            index_processor = IndexProcessorFactory(index_type).init_index_processor()
            self._load(
                index_processor=index_processor,
                dataset=dataset,
                dataset_document=dataset_document,
                documents=documents
            )
        except DocumentIsPausedException:
            raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
        except ProviderTokenNotInitError as e:
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e.description)
            dataset_document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            db.session.commit()
        except Exception as e:
            logging.exception("consume document failed")
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e)
            dataset_document.stopped_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            db.session.commit()

    def indexing_estimate(self, tenant_id: str, extract_settings: list[ExtractSetting], tmp_processing_rule: dict,
                          doc_form: Optional[str] = None, doc_language: str = 'English',
                          dataset_id: Optional[str] = None,
                          indexing_technique: str = 'economy') -> dict:
        """
        Estimate the indexing for the document.
        """
        # check document limit
        features = FeatureService.get_features(tenant_id)
        if features.billing.enabled:
            count = len(extract_settings)
            batch_upload_limit = dify_config.BATCH_UPLOAD_LIMIT
            if count > batch_upload_limit:
                raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")

        embedding_model_instance = None
        if dataset_id:
            dataset = Dataset.query.filter_by(
                id=dataset_id
            ).first()
            if not dataset:
                raise ValueError('Dataset not found.')
            if dataset.indexing_technique == 'high_quality' or indexing_technique == 'high_quality':
                if dataset.embedding_model_provider:
                    embedding_model_instance = self.model_manager.get_model_instance(
                        tenant_id=tenant_id,
                        provider=dataset.embedding_model_provider,
                        model_type=ModelType.TEXT_EMBEDDING,
                        model=dataset.embedding_model
                    )
                else:
                    embedding_model_instance = self.model_manager.get_default_model_instance(
                        tenant_id=tenant_id,
                        model_type=ModelType.TEXT_EMBEDDING,
                    )
        else:
            if indexing_technique == 'high_quality':
                embedding_model_instance = self.model_manager.get_default_model_instance(
                    tenant_id=tenant_id,
                    model_type=ModelType.TEXT_EMBEDDING,
                )

        preview_texts = []
        total_segments = 0
        index_type = doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()
        all_text_docs = []
        for extract_setting in extract_settings:
            # extract
            text_docs = index_processor.extract(extract_setting, process_rule_mode=tmp_processing_rule["mode"])
            all_text_docs.extend(text_docs)

            processing_rule = DatasetProcessRule(
                mode=tmp_processing_rule["mode"],
                rules=json.dumps(tmp_processing_rule["rules"])
            )

            # get splitter
            splitter = self._get_splitter(processing_rule, embedding_model_instance)

            # split to documents
            documents = self._split_to_documents_for_estimate(
                text_docs=text_docs,
                splitter=splitter,
                processing_rule=processing_rule
            )

            total_segments += len(documents)
            for document in documents:
                if len(preview_texts) < 5:
                    preview_texts.append(document.page_content)

        if doc_form and doc_form == 'qa_model':
            if len(preview_texts) > 0:
                # qa model document
                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0],
                                                             doc_language)
                document_qa_list = self.format_split_text(response)

                return {
                    "total_segments": total_segments * 20,
                    "qa_preview": document_qa_list,
                    "preview": preview_texts
                }
        return {
            "total_segments": total_segments,
            "preview": preview_texts
        }

    def _extract(self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict) \
            -> list[Document]:
        # load file
        if dataset_document.data_source_type not in ["upload_file", "notion_import", "website_crawl"]:
            return []

        data_source_info = dataset_document.data_source_info_dict
        text_docs = []
        if dataset_document.data_source_type == 'upload_file':
            if not data_source_info or 'upload_file_id' not in data_source_info:
                raise ValueError("no upload file found")

            file_detail = db.session.query(UploadFile). \
                filter(UploadFile.id == data_source_info['upload_file_id']). \
                one_or_none()

            if file_detail:
                extract_setting = ExtractSetting(
                    datasource_type="upload_file",
                    upload_file=file_detail,
                    document_model=dataset_document.doc_form
                )
                text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
        elif dataset_document.data_source_type == 'notion_import':
            if (not data_source_info or 'notion_workspace_id' not in data_source_info
                    or 'notion_page_id' not in data_source_info):
                raise ValueError("no notion import info found")
            extract_setting = ExtractSetting(
                datasource_type="notion_import",
                notion_info={
                    "notion_workspace_id": data_source_info['notion_workspace_id'],
                    "notion_obj_id": data_source_info['notion_page_id'],
                    "notion_page_type": data_source_info['type'],
                    "document": dataset_document,
                    "tenant_id": dataset_document.tenant_id
                },
                document_model=dataset_document.doc_form
            )
            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
        elif dataset_document.data_source_type == 'website_crawl':
            if (not data_source_info or 'provider' not in data_source_info
                    or 'url' not in data_source_info or 'job_id' not in data_source_info):
                raise ValueError("no website import info found")
            extract_setting = ExtractSetting(
                datasource_type="website_crawl",
                website_info={
                    "provider": data_source_info['provider'],
                    "job_id": data_source_info['job_id'],
                    "tenant_id": dataset_document.tenant_id,
                    "url": data_source_info['url'],
                    "mode": data_source_info['mode'],
                    "only_main_content": data_source_info['only_main_content']
                },
                document_model=dataset_document.doc_form
            )
            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])

        # update document status to splitting
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="splitting",
            extra_update_params={
                DatasetDocument.word_count: sum(len(text_doc.page_content) for text_doc in text_docs),
                DatasetDocument.parsing_completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            }
        )

        # replace doc id to document model id
        text_docs = cast(list[Document], text_docs)
        for text_doc in text_docs:
            text_doc.metadata['document_id'] = dataset_document.id
            text_doc.metadata['dataset_id'] = dataset_document.dataset_id

        return text_docs

    @staticmethod
    def filter_string(text):
        text = re.sub(r'<\|', '<', text)
        text = re.sub(r'\|>', '>', text)
        text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F\xEF\xBF\xBE]', '', text)
        # Unicode U+FFFE
        text = re.sub('\uFFFE', '', text)
        return text

    @staticmethod
    def _get_splitter(processing_rule: DatasetProcessRule,
                      embedding_model_instance: Optional[ModelInstance]) -> TextSplitter:
        """
        Get the NodeParser object according to the processing rule.
        """
        if processing_rule.mode == "custom":
            # The user-defined segmentation rule
            rules = json.loads(processing_rule.rules)
            segmentation = rules["segmentation"]
            max_segmentation_tokens_length = dify_config.INDEXING_MAX_SEGMENTATION_TOKENS_LENGTH
            if segmentation["max_tokens"] < 50 or segmentation["max_tokens"] > max_segmentation_tokens_length:
                raise ValueError(f"Custom segment length should be between 50 and {max_segmentation_tokens_length}.")

            separator = segmentation["separator"]
            if separator:
                separator = separator.replace('\\n', '\n')

            if segmentation.get('chunk_overlap'):
                chunk_overlap = segmentation['chunk_overlap']
            else:
                chunk_overlap = 0

            character_splitter = FixedRecursiveCharacterTextSplitter.from_encoder(
                chunk_size=segmentation["max_tokens"],
                chunk_overlap=chunk_overlap,
                fixed_separator=separator,
                separators=["\n\n", "。", ". ", " ", ""],
                embedding_model_instance=embedding_model_instance
            )
        else:
            # Automatic segmentation
            character_splitter = EnhanceRecursiveCharacterTextSplitter.from_encoder(
                chunk_size=DatasetProcessRule.AUTOMATIC_RULES['segmentation']['max_tokens'],
                chunk_overlap=DatasetProcessRule.AUTOMATIC_RULES['segmentation']['chunk_overlap'],
                separators=["\n\n", "。", ". ", " ", ""],
                embedding_model_instance=embedding_model_instance
            )

        return character_splitter
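
    # Illustrative only, not part of the original module: a minimal sketch of the
    # "custom" process-rule payload that _get_splitter and _document_clean read
    # (key names inferred from the code above; the concrete values are assumptions,
    # not a fixture from this codebase):
    #
    #   {
    #       "mode": "custom",
    #       "rules": {
    #           "segmentation": {"max_tokens": 500, "chunk_overlap": 50, "separator": "\\n"},
    #           "pre_processing_rules": [
    #               {"id": "remove_extra_spaces", "enabled": True},
    #               {"id": "remove_urls_emails", "enabled": False},
    #           ],
    #       },
    #   }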

    def _step_split(self, text_docs: list[Document], splitter: TextSplitter,
                    dataset: Dataset, dataset_document: DatasetDocument, processing_rule: DatasetProcessRule) \
            -> list[Document]:
        """
        Split the text documents into documents and save them to the document segment.
        """
        documents = self._split_to_documents(
            text_docs=text_docs,
            splitter=splitter,
            processing_rule=processing_rule,
            tenant_id=dataset.tenant_id,
            document_form=dataset_document.doc_form,
            document_language=dataset_document.doc_language
        )

        # save node to document segment
        doc_store = DatasetDocumentStore(
            dataset=dataset,
            user_id=dataset_document.created_by,
            document_id=dataset_document.id
        )

        # add document segments
        doc_store.add_documents(documents)

        # update document status to indexing
        cur_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="indexing",
            extra_update_params={
                DatasetDocument.cleaning_completed_at: cur_time,
                DatasetDocument.splitting_completed_at: cur_time,
            }
        )

        # update segment status to indexing
        self._update_segments_by_document(
            dataset_document_id=dataset_document.id,
            update_params={
                DocumentSegment.status: "indexing",
                DocumentSegment.indexing_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            }
        )

        return documents

    def _split_to_documents(self, text_docs: list[Document], splitter: TextSplitter,
                            processing_rule: DatasetProcessRule, tenant_id: str,
                            document_form: str, document_language: str) -> list[Document]:
        """
        Split the text documents into nodes.
        """
        all_documents = []
        all_qa_documents = []
        for text_doc in text_docs:
            # document clean
            document_text = self._document_clean(text_doc.page_content, processing_rule)
            text_doc.page_content = document_text

            # parse document to nodes
            documents = splitter.split_documents([text_doc])
            split_documents = []
            for document_node in documents:
                if document_node.page_content.strip():
                    doc_id = str(uuid.uuid4())
                    hash = helper.generate_text_hash(document_node.page_content)
                    document_node.metadata['doc_id'] = doc_id
                    document_node.metadata['doc_hash'] = hash
                    # delete leading splitter character
                    page_content = document_node.page_content
                    if page_content.startswith(".") or page_content.startswith("。"):
                        page_content = page_content[1:]
                    document_node.page_content = page_content

                    if document_node.page_content:
                        split_documents.append(document_node)
            all_documents.extend(split_documents)

        # processing qa document
        if document_form == 'qa_model':
            for i in range(0, len(all_documents), 10):
                threads = []
                sub_documents = all_documents[i:i + 10]
                for doc in sub_documents:
                    document_format_thread = threading.Thread(target=self.format_qa_document, kwargs={
                        'flask_app': current_app._get_current_object(),
                        'tenant_id': tenant_id, 'document_node': doc, 'all_qa_documents': all_qa_documents,
                        'document_language': document_language})
                    threads.append(document_format_thread)
                    document_format_thread.start()
                for thread in threads:
                    thread.join()
            return all_qa_documents

        return all_documents

    def format_qa_document(self, flask_app: Flask, tenant_id: str, document_node, all_qa_documents, document_language):
        format_documents = []
        if document_node.page_content is None or not document_node.page_content.strip():
            return
        with flask_app.app_context():
            try:
                # qa model document
                response = LLMGenerator.generate_qa_document(tenant_id, document_node.page_content, document_language)
                document_qa_list = self.format_split_text(response)
                qa_documents = []
                for result in document_qa_list:
                    qa_document = Document(page_content=result['question'],
                                           metadata=document_node.metadata.model_copy())
                    doc_id = str(uuid.uuid4())
                    hash = helper.generate_text_hash(result['question'])
                    qa_document.metadata['answer'] = result['answer']
                    qa_document.metadata['doc_id'] = doc_id
                    qa_document.metadata['doc_hash'] = hash
                    qa_documents.append(qa_document)
                format_documents.extend(qa_documents)
            except Exception as e:
                logging.exception(e)

            all_qa_documents.extend(format_documents)

    def _split_to_documents_for_estimate(self, text_docs: list[Document], splitter: TextSplitter,
                                         processing_rule: DatasetProcessRule) -> list[Document]:
        """
        Split the text documents into nodes.
        """
        all_documents = []
        for text_doc in text_docs:
            # document clean
            document_text = self._document_clean(text_doc.page_content, processing_rule)
            text_doc.page_content = document_text

            # parse document to nodes
            documents = splitter.split_documents([text_doc])

            split_documents = []
            for document in documents:
                if document.page_content is None or not document.page_content.strip():
                    continue
                doc_id = str(uuid.uuid4())
                hash = helper.generate_text_hash(document.page_content)

                document.metadata['doc_id'] = doc_id
                document.metadata['doc_hash'] = hash

                split_documents.append(document)

            all_documents.extend(split_documents)

        return all_documents

    @staticmethod
    def _document_clean(text: str, processing_rule: DatasetProcessRule) -> str:
        """
        Clean the document text according to the processing rules.
        """
        if processing_rule.mode == "automatic":
            rules = DatasetProcessRule.AUTOMATIC_RULES
        else:
            rules = json.loads(processing_rule.rules) if processing_rule.rules else {}

        if 'pre_processing_rules' in rules:
            pre_processing_rules = rules["pre_processing_rules"]
            for pre_processing_rule in pre_processing_rules:
                if pre_processing_rule["id"] == "remove_extra_spaces" and pre_processing_rule["enabled"] is True:
                    # Remove extra spaces
                    pattern = r'\n{3,}'
                    text = re.sub(pattern, '\n\n', text)
                    pattern = r'[\t\f\r\x20\u00a0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]{2,}'
                    text = re.sub(pattern, ' ', text)
                elif pre_processing_rule["id"] == "remove_urls_emails" and pre_processing_rule["enabled"] is True:
                    # Remove email
                    pattern = r'([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)'
                    text = re.sub(pattern, '', text)

                    # Remove URL
                    pattern = r'https?://[^\s]+'
                    text = re.sub(pattern, '', text)

        return text

    @staticmethod
    def format_split_text(text):
        regex = r"Q\d+:\s*(.*?)\s*A\d+:\s*([\s\S]*?)(?=Q\d+:|$)"
        matches = re.findall(regex, text, re.UNICODE)

        return [
            {
                "question": q,
                "answer": re.sub(r"\n\s*", "\n", a.strip())
            }
            for q, a in matches if q and a
        ]
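
    # Illustrative only, not part of the original module: format_split_text expects the
    # LLM response as numbered "Qn:/An:" pairs (per the regex above). The sample text
    # below is an assumption used purely to show the parsed shape:
    #
    #   IndexingRunner.format_split_text("Q1: What is RAG? A1: Retrieval-augmented generation.")
    #   # -> [{"question": "What is RAG?", "answer": "Retrieval-augmented generation."}]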

    def _load(self, index_processor: BaseIndexProcessor, dataset: Dataset,
              dataset_document: DatasetDocument, documents: list[Document]) -> None:
        """
        insert index and update document/segment status to completed
        """
        embedding_model_instance = None
        if dataset.indexing_technique == 'high_quality':
            embedding_model_instance = self.model_manager.get_model_instance(
                tenant_id=dataset.tenant_id,
                provider=dataset.embedding_model_provider,
                model_type=ModelType.TEXT_EMBEDDING,
                model=dataset.embedding_model
            )

        # chunk nodes by chunk size
        indexing_start_at = time.perf_counter()
        tokens = 0
        chunk_size = 10

        # create keyword index
        create_keyword_thread = threading.Thread(target=self._process_keyword_index,
                                                 args=(current_app._get_current_object(),
                                                       dataset.id, dataset_document.id, documents))
        create_keyword_thread.start()

        if dataset.indexing_technique == 'high_quality':
            with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
                futures = []
                for i in range(0, len(documents), chunk_size):
                    chunk_documents = documents[i:i + chunk_size]
                    futures.append(executor.submit(self._process_chunk, current_app._get_current_object(),
                                                   index_processor, chunk_documents, dataset,
                                                   dataset_document, embedding_model_instance))

                for future in futures:
                    tokens += future.result()

        create_keyword_thread.join()
        indexing_end_at = time.perf_counter()

        # update document status to completed
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="completed",
            extra_update_params={
                DatasetDocument.tokens: tokens,
                DatasetDocument.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None),
                DatasetDocument.indexing_latency: indexing_end_at - indexing_start_at,
                DatasetDocument.error: None,
            }
        )

    @staticmethod
    def _process_keyword_index(flask_app, dataset_id, document_id, documents):
        with flask_app.app_context():
            dataset = Dataset.query.filter_by(id=dataset_id).first()
            if not dataset:
                raise ValueError("no dataset found")
            keyword = Keyword(dataset)
            keyword.create(documents)
            if dataset.indexing_technique != 'high_quality':
                document_ids = [document.metadata['doc_id'] for document in documents]
                db.session.query(DocumentSegment).filter(
                    DocumentSegment.document_id == document_id,
                    DocumentSegment.dataset_id == dataset_id,
                    DocumentSegment.index_node_id.in_(document_ids),
                    DocumentSegment.status == "indexing"
                ).update({
                    DocumentSegment.status: "completed",
                    DocumentSegment.enabled: True,
                    DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
                })

                db.session.commit()

    def _process_chunk(self, flask_app, index_processor, chunk_documents, dataset, dataset_document,
                       embedding_model_instance):
        with flask_app.app_context():
            # check document is paused
            self._check_document_paused_status(dataset_document.id)

            tokens = 0
            if embedding_model_instance:
                tokens += sum(
                    embedding_model_instance.get_text_embedding_num_tokens(
                        [document.page_content]
                    )
                    for document in chunk_documents
                )

            # load index
            index_processor.load(dataset, chunk_documents, with_keywords=False)

            document_ids = [document.metadata['doc_id'] for document in chunk_documents]
            db.session.query(DocumentSegment).filter(
                DocumentSegment.document_id == dataset_document.id,
                DocumentSegment.dataset_id == dataset.id,
                DocumentSegment.index_node_id.in_(document_ids),
                DocumentSegment.status == "indexing"
            ).update({
                DocumentSegment.status: "completed",
                DocumentSegment.enabled: True,
                DocumentSegment.completed_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            })

            db.session.commit()

            return tokens

    @staticmethod
    def _check_document_paused_status(document_id: str):
        indexing_cache_key = 'document_{}_is_paused'.format(document_id)
        result = redis_client.get(indexing_cache_key)
        if result:
            raise DocumentIsPausedException()

    @staticmethod
    def _update_document_index_status(document_id: str, after_indexing_status: str,
                                      extra_update_params: Optional[dict] = None) -> None:
        """
        Update the document indexing status.
        """
        count = DatasetDocument.query.filter_by(id=document_id, is_paused=True).count()
        if count > 0:
            raise DocumentIsPausedException()
        document = DatasetDocument.query.filter_by(id=document_id).first()
        if not document:
            raise DocumentIsDeletedPausedException()

        update_params = {
            DatasetDocument.indexing_status: after_indexing_status
        }

        if extra_update_params:
            update_params.update(extra_update_params)

        DatasetDocument.query.filter_by(id=document_id).update(update_params)
        db.session.commit()

    @staticmethod
    def _update_segments_by_document(dataset_document_id: str, update_params: dict) -> None:
        """
        Update the document segment by document id.
        """
        DocumentSegment.query.filter_by(document_id=dataset_document_id).update(update_params)
        db.session.commit()

    @staticmethod
    def batch_add_segments(segments: list[DocumentSegment], dataset: Dataset):
        """
        Batch add segments index processing
        """
        documents = []
        for segment in segments:
            document = Document(
                page_content=segment.content,
                metadata={
                    "doc_id": segment.index_node_id,
                    "doc_hash": segment.index_node_hash,
                    "document_id": segment.document_id,
                    "dataset_id": segment.dataset_id,
                }
            )
            documents.append(document)

        # save vector index
        index_type = dataset.doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()
        index_processor.load(dataset, documents)

    def _transform(self, index_processor: BaseIndexProcessor, dataset: Dataset,
                   text_docs: list[Document], doc_language: str, process_rule: dict) -> list[Document]:
        # get embedding model instance
        embedding_model_instance = None
        if dataset.indexing_technique == 'high_quality':
            if dataset.embedding_model_provider:
                embedding_model_instance = self.model_manager.get_model_instance(
                    tenant_id=dataset.tenant_id,
                    provider=dataset.embedding_model_provider,
                    model_type=ModelType.TEXT_EMBEDDING,
                    model=dataset.embedding_model
                )
            else:
                embedding_model_instance = self.model_manager.get_default_model_instance(
                    tenant_id=dataset.tenant_id,
                    model_type=ModelType.TEXT_EMBEDDING,
                )

        documents = index_processor.transform(text_docs, embedding_model_instance=embedding_model_instance,
                                              process_rule=process_rule, tenant_id=dataset.tenant_id,
                                              doc_language=doc_language)

        return documents

    def _load_segments(self, dataset, dataset_document, documents):
        # save node to document segment
        doc_store = DatasetDocumentStore(
            dataset=dataset,
            user_id=dataset_document.created_by,
            document_id=dataset_document.id
        )

        # add document segments
        doc_store.add_documents(documents)

        # update document status to indexing
        cur_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="indexing",
            extra_update_params={
                DatasetDocument.cleaning_completed_at: cur_time,
                DatasetDocument.splitting_completed_at: cur_time,
            }
        )

        # update segment status to indexing
        self._update_segments_by_document(
            dataset_document_id=dataset_document.id,
            update_params={
                DocumentSegment.status: "indexing",
                DocumentSegment.indexing_at: datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
            }
        )


class DocumentIsPausedException(Exception):
    pass


class DocumentIsDeletedPausedException(Exception):
    pass
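

# Illustrative only, not part of the original module: a minimal sketch of how the runner
# is typically driven. The surrounding task wiring (the index_documents helper and how it
# is scheduled) is an assumption for illustration; the runner must execute inside a Flask
# app context because it touches db.session, redis_client, and current_app.
#
#   from extensions.ext_database import db
#   from models.dataset import Document as DatasetDocument
#
#   def index_documents(document_ids: list[str]) -> None:
#       dataset_documents = (
#           db.session.query(DatasetDocument)
#           .filter(DatasetDocument.id.in_(document_ids))
#           .all()
#       )
#       try:
#           IndexingRunner().run(dataset_documents)
#       except DocumentIsPausedException:
#           # a paused document is not a failure; the task simply stops here
#           pass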