# indexing_runner.py

import datetime
import json
import logging
import re
import threading
import time
import uuid
from typing import Optional, cast

from flask import Flask, current_app
from flask_login import current_user
from sqlalchemy.orm.exc import ObjectDeletedError

from core.docstore.dataset_docstore import DatasetDocumentStore
from core.errors.error import ProviderTokenNotInitError
from core.generator.llm_generator import LLMGenerator
from core.model_manager import ModelInstance, ModelManager
from core.model_runtime.entities.model_entities import ModelType, PriceType
from core.model_runtime.model_providers.__base.large_language_model import LargeLanguageModel
from core.model_runtime.model_providers.__base.text_embedding_model import TextEmbeddingModel
from core.rag.extractor.entity.extract_setting import ExtractSetting
from core.rag.index_processor.index_processor_base import BaseIndexProcessor
from core.rag.index_processor.index_processor_factory import IndexProcessorFactory
from core.rag.models.document import Document
from core.splitter.fixed_text_splitter import (
    EnhanceRecursiveCharacterTextSplitter,
    FixedRecursiveCharacterTextSplitter,
)
from core.splitter.text_splitter import TextSplitter
from extensions.ext_database import db
from extensions.ext_redis import redis_client
from extensions.ext_storage import storage
from libs import helper
from models.dataset import Dataset, DatasetProcessRule, DocumentSegment
from models.dataset import Document as DatasetDocument
from models.model import UploadFile
from services.feature_service import FeatureService


class IndexingRunner:

    def __init__(self):
        self.storage = storage
        self.model_manager = ModelManager()

    def run(self, dataset_documents: list[DatasetDocument]):
        """Run the indexing process."""
        for dataset_document in dataset_documents:
            try:
                # get dataset
                dataset = Dataset.query.filter_by(
                    id=dataset_document.dataset_id
                ).first()

                if not dataset:
                    raise ValueError("no dataset found")

                # get the process rule
                processing_rule = db.session.query(DatasetProcessRule). \
                    filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id). \
                    first()

                index_type = dataset_document.doc_form
                index_processor = IndexProcessorFactory(index_type).init_index_processor()

                # extract
                text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())

                # transform
                documents = self._transform(index_processor, dataset, text_docs, processing_rule.to_dict())

                # save segments
                self._load_segments(dataset, dataset_document, documents)

                # load
                self._load(
                    index_processor=index_processor,
                    dataset=dataset,
                    dataset_document=dataset_document,
                    documents=documents
                )
            except DocumentIsPausedException:
                raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
            except ProviderTokenNotInitError as e:
                dataset_document.indexing_status = 'error'
                dataset_document.error = str(e.description)
                dataset_document.stopped_at = datetime.datetime.utcnow()
                db.session.commit()
            except ObjectDeletedError:
                logging.warning('Document deleted, document id: {}'.format(dataset_document.id))
            except Exception as e:
                logging.exception("consume document failed")
                dataset_document.indexing_status = 'error'
                dataset_document.error = str(e)
                dataset_document.stopped_at = datetime.datetime.utcnow()
                db.session.commit()

    def run_in_splitting_status(self, dataset_document: DatasetDocument):
        """Run the indexing process when the index_status is splitting."""
        try:
            # get dataset
            dataset = Dataset.query.filter_by(
                id=dataset_document.dataset_id
            ).first()

            if not dataset:
                raise ValueError("no dataset found")

            # get the existing document segments and delete them
            document_segments = DocumentSegment.query.filter_by(
                dataset_id=dataset.id,
                document_id=dataset_document.id
            ).all()

            for document_segment in document_segments:
                db.session.delete(document_segment)
            db.session.commit()

            # get the process rule
            processing_rule = db.session.query(DatasetProcessRule). \
                filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id). \
                first()

            index_type = dataset_document.doc_form
            index_processor = IndexProcessorFactory(index_type).init_index_processor()

            # extract
            text_docs = self._extract(index_processor, dataset_document, processing_rule.to_dict())

            # transform
            documents = self._transform(index_processor, dataset, text_docs, processing_rule.to_dict())

            # save segments
            self._load_segments(dataset, dataset_document, documents)

            # load
            self._load(
                index_processor=index_processor,
                dataset=dataset,
                dataset_document=dataset_document,
                documents=documents
            )
        except DocumentIsPausedException:
            raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
        except ProviderTokenNotInitError as e:
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e.description)
            dataset_document.stopped_at = datetime.datetime.utcnow()
            db.session.commit()
        except Exception as e:
            logging.exception("consume document failed")
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e)
            dataset_document.stopped_at = datetime.datetime.utcnow()
            db.session.commit()

    def run_in_indexing_status(self, dataset_document: DatasetDocument):
        """Run the indexing process when the index_status is indexing."""
        try:
            # get dataset
            dataset = Dataset.query.filter_by(
                id=dataset_document.dataset_id
            ).first()

            if not dataset:
                raise ValueError("no dataset found")

            # get the existing document segments
            document_segments = DocumentSegment.query.filter_by(
                dataset_id=dataset.id,
                document_id=dataset_document.id
            ).all()

            documents = []
            if document_segments:
                for document_segment in document_segments:
                    # transform segment to node
                    if document_segment.status != "completed":
                        document = Document(
                            page_content=document_segment.content,
                            metadata={
                                "doc_id": document_segment.index_node_id,
                                "doc_hash": document_segment.index_node_hash,
                                "document_id": document_segment.document_id,
                                "dataset_id": document_segment.dataset_id,
                            }
                        )
                        documents.append(document)

            # build index
            # get the process rule
            processing_rule = db.session.query(DatasetProcessRule). \
                filter(DatasetProcessRule.id == dataset_document.dataset_process_rule_id). \
                first()

            index_type = dataset_document.doc_form
            index_processor = IndexProcessorFactory(index_type, processing_rule.to_dict()).init_index_processor()
            self._load(
                index_processor=index_processor,
                dataset=dataset,
                dataset_document=dataset_document,
                documents=documents
            )
        except DocumentIsPausedException:
            raise DocumentIsPausedException('Document paused, document id: {}'.format(dataset_document.id))
        except ProviderTokenNotInitError as e:
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e.description)
            dataset_document.stopped_at = datetime.datetime.utcnow()
            db.session.commit()
        except Exception as e:
            logging.exception("consume document failed")
            dataset_document.indexing_status = 'error'
            dataset_document.error = str(e)
            dataset_document.stopped_at = datetime.datetime.utcnow()
            db.session.commit()

    def indexing_estimate(self, tenant_id: str, extract_settings: list[ExtractSetting], tmp_processing_rule: dict,
                          doc_form: str = None, doc_language: str = 'English', dataset_id: str = None,
                          indexing_technique: str = 'economy') -> dict:
        """
        Estimate the indexing for the document.
        """
        # check document limit
        features = FeatureService.get_features(tenant_id)
        if features.billing.enabled:
            count = len(extract_settings)
            batch_upload_limit = int(current_app.config['BATCH_UPLOAD_LIMIT'])
            if count > batch_upload_limit:
                raise ValueError(f"You have reached the batch upload limit of {batch_upload_limit}.")

        embedding_model_instance = None
        if dataset_id:
            dataset = Dataset.query.filter_by(
                id=dataset_id
            ).first()
            if not dataset:
                raise ValueError('Dataset not found.')
            if dataset.indexing_technique == 'high_quality' or indexing_technique == 'high_quality':
                if dataset.embedding_model_provider:
                    embedding_model_instance = self.model_manager.get_model_instance(
                        tenant_id=tenant_id,
                        provider=dataset.embedding_model_provider,
                        model_type=ModelType.TEXT_EMBEDDING,
                        model=dataset.embedding_model
                    )
                else:
                    embedding_model_instance = self.model_manager.get_default_model_instance(
                        tenant_id=tenant_id,
                        model_type=ModelType.TEXT_EMBEDDING,
                    )
        else:
            if indexing_technique == 'high_quality':
                embedding_model_instance = self.model_manager.get_default_model_instance(
                    tenant_id=tenant_id,
                    model_type=ModelType.TEXT_EMBEDDING,
                )

        tokens = 0
        preview_texts = []
        total_segments = 0
        total_price = 0
        currency = 'USD'

        index_type = doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()

        all_text_docs = []
        for extract_setting in extract_settings:
            # extract
            text_docs = index_processor.extract(extract_setting, process_rule_mode=tmp_processing_rule["mode"])
            all_text_docs.extend(text_docs)

            processing_rule = DatasetProcessRule(
                mode=tmp_processing_rule["mode"],
                rules=json.dumps(tmp_processing_rule["rules"])
            )

            # get splitter
            splitter = self._get_splitter(processing_rule, embedding_model_instance)

            # split to documents
            documents = self._split_to_documents_for_estimate(
                text_docs=text_docs,
                splitter=splitter,
                processing_rule=processing_rule
            )

            total_segments += len(documents)
            for document in documents:
                if len(preview_texts) < 5:
                    preview_texts.append(document.page_content)
                if indexing_technique == 'high_quality' or embedding_model_instance:
                    embedding_model_type_instance = embedding_model_instance.model_type_instance
                    embedding_model_type_instance = cast(TextEmbeddingModel, embedding_model_type_instance)
                    tokens += embedding_model_type_instance.get_num_tokens(
                        model=embedding_model_instance.model,
                        credentials=embedding_model_instance.credentials,
                        texts=[self.filter_string(document.page_content)]
                    )

        if doc_form and doc_form == 'qa_model':
            model_instance = self.model_manager.get_default_model_instance(
                tenant_id=tenant_id,
                model_type=ModelType.LLM
            )

            model_type_instance = model_instance.model_type_instance
            model_type_instance = cast(LargeLanguageModel, model_type_instance)

            if len(preview_texts) > 0:
                # qa model document
                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0],
                                                             doc_language)
                document_qa_list = self.format_split_text(response)

                price_info = model_type_instance.get_price(
                    model=model_instance.model,
                    credentials=model_instance.credentials,
                    price_type=PriceType.INPUT,
                    tokens=total_segments * 2000,
                )

                return {
                    "total_segments": total_segments * 20,
                    "tokens": total_segments * 2000,
                    "total_price": '{:f}'.format(price_info.total_amount),
                    "currency": price_info.currency,
                    "qa_preview": document_qa_list,
                    "preview": preview_texts
                }

        if embedding_model_instance:
            embedding_model_type_instance = cast(TextEmbeddingModel, embedding_model_instance.model_type_instance)
            embedding_price_info = embedding_model_type_instance.get_price(
                model=embedding_model_instance.model,
                credentials=embedding_model_instance.credentials,
                price_type=PriceType.INPUT,
                tokens=tokens
            )
            total_price = '{:f}'.format(embedding_price_info.total_amount)
            currency = embedding_price_info.currency

        return {
            "total_segments": total_segments,
            "tokens": tokens,
            "total_price": total_price,
            "currency": currency,
            "preview": preview_texts
        }

    def _extract(self, index_processor: BaseIndexProcessor, dataset_document: DatasetDocument, process_rule: dict) \
            -> list[Document]:
        # load file
        if dataset_document.data_source_type not in ["upload_file", "notion_import"]:
            return []

        data_source_info = dataset_document.data_source_info_dict
        text_docs = []
        if dataset_document.data_source_type == 'upload_file':
            if not data_source_info or 'upload_file_id' not in data_source_info:
                raise ValueError("no upload file found")

            file_detail = db.session.query(UploadFile). \
                filter(UploadFile.id == data_source_info['upload_file_id']). \
                one_or_none()

            if file_detail:
                extract_setting = ExtractSetting(
                    datasource_type="upload_file",
                    upload_file=file_detail,
                    document_model=dataset_document.doc_form
                )
                text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])
        elif dataset_document.data_source_type == 'notion_import':
            if (not data_source_info or 'notion_workspace_id' not in data_source_info
                    or 'notion_page_id' not in data_source_info):
                raise ValueError("no notion import info found")
            extract_setting = ExtractSetting(
                datasource_type="notion_import",
                notion_info={
                    "notion_workspace_id": data_source_info['notion_workspace_id'],
                    "notion_obj_id": data_source_info['notion_page_id'],
                    "notion_page_type": data_source_info['type'],
                    "document": dataset_document
                },
                document_model=dataset_document.doc_form
            )
            text_docs = index_processor.extract(extract_setting, process_rule_mode=process_rule['mode'])

        # update document status to splitting
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="splitting",
            extra_update_params={
                DatasetDocument.word_count: sum([len(text_doc.page_content) for text_doc in text_docs]),
                DatasetDocument.parsing_completed_at: datetime.datetime.utcnow()
            }
        )

        # replace doc id with document model id
        text_docs = cast(list[Document], text_docs)
        for text_doc in text_docs:
            text_doc.metadata['document_id'] = dataset_document.id
            text_doc.metadata['dataset_id'] = dataset_document.dataset_id

        return text_docs

    def filter_string(self, text):
        text = re.sub(r'<\|', '<', text)
        text = re.sub(r'\|>', '>', text)
        text = re.sub(r'[\x00-\x08\x0B\x0C\x0E-\x1F\x7F\xEF\xBF\xBE]', '', text)
        # Unicode U+FFFE
        text = re.sub('\uFFFE', '', text)
        return text

    def _get_splitter(self, processing_rule: DatasetProcessRule,
                      embedding_model_instance: Optional[ModelInstance]) -> TextSplitter:
        """
        Get the NodeParser object according to the processing rule.
        """
        if processing_rule.mode == "custom":
            # the user-defined segmentation rule
            rules = json.loads(processing_rule.rules)
            segmentation = rules["segmentation"]
            if segmentation["max_tokens"] < 50 or segmentation["max_tokens"] > 1000:
                raise ValueError("Custom segment length should be between 50 and 1000.")

            separator = segmentation["separator"]
            if separator:
                separator = separator.replace('\\n', '\n')

            character_splitter = FixedRecursiveCharacterTextSplitter.from_encoder(
                chunk_size=segmentation["max_tokens"],
                chunk_overlap=segmentation.get('chunk_overlap', 0),
                fixed_separator=separator,
                separators=["\n\n", "。", ".", " ", ""],
                embedding_model_instance=embedding_model_instance
            )
        else:
            # automatic segmentation
            character_splitter = EnhanceRecursiveCharacterTextSplitter.from_encoder(
                chunk_size=DatasetProcessRule.AUTOMATIC_RULES['segmentation']['max_tokens'],
                chunk_overlap=DatasetProcessRule.AUTOMATIC_RULES['segmentation']['chunk_overlap'],
                separators=["\n\n", "。", ".", " ", ""],
                embedding_model_instance=embedding_model_instance
            )

        return character_splitter

    def _step_split(self, text_docs: list[Document], splitter: TextSplitter,
                    dataset: Dataset, dataset_document: DatasetDocument, processing_rule: DatasetProcessRule) \
            -> list[Document]:
        """
        Split the text documents into documents and save them to the document segment.
        """
        documents = self._split_to_documents(
            text_docs=text_docs,
            splitter=splitter,
            processing_rule=processing_rule,
            tenant_id=dataset.tenant_id,
            document_form=dataset_document.doc_form,
            document_language=dataset_document.doc_language
        )

        # save nodes to document segments
        doc_store = DatasetDocumentStore(
            dataset=dataset,
            user_id=dataset_document.created_by,
            document_id=dataset_document.id
        )

        # add document segments
        doc_store.add_documents(documents)

        # update document status to indexing
        cur_time = datetime.datetime.utcnow()
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="indexing",
            extra_update_params={
                DatasetDocument.cleaning_completed_at: cur_time,
                DatasetDocument.splitting_completed_at: cur_time,
            }
        )

        # update segment status to indexing
        self._update_segments_by_document(
            dataset_document_id=dataset_document.id,
            update_params={
                DocumentSegment.status: "indexing",
                DocumentSegment.indexing_at: datetime.datetime.utcnow()
            }
        )

        return documents

    def _split_to_documents(self, text_docs: list[Document], splitter: TextSplitter,
                            processing_rule: DatasetProcessRule, tenant_id: str,
                            document_form: str, document_language: str) -> list[Document]:
        """
        Split the text documents into nodes.
        """
        all_documents = []
        all_qa_documents = []
        for text_doc in text_docs:
            # document clean
            document_text = self._document_clean(text_doc.page_content, processing_rule)
            text_doc.page_content = document_text

            # parse document to nodes
            documents = splitter.split_documents([text_doc])
            split_documents = []
            for document_node in documents:
                if document_node.page_content.strip():
                    doc_id = str(uuid.uuid4())
                    hash = helper.generate_text_hash(document_node.page_content)
                    document_node.metadata['doc_id'] = doc_id
                    document_node.metadata['doc_hash'] = hash
                    # remove the leading splitter character
                    page_content = document_node.page_content
                    if page_content.startswith(".") or page_content.startswith("。"):
                        page_content = page_content[1:]
                    document_node.page_content = page_content

                    if document_node.page_content:
                        split_documents.append(document_node)
            all_documents.extend(split_documents)

        # processing qa document
        if document_form == 'qa_model':
            for i in range(0, len(all_documents), 10):
                threads = []
                sub_documents = all_documents[i:i + 10]
                for doc in sub_documents:
                    document_format_thread = threading.Thread(target=self.format_qa_document, kwargs={
                        'flask_app': current_app._get_current_object(),
                        'tenant_id': tenant_id, 'document_node': doc, 'all_qa_documents': all_qa_documents,
                        'document_language': document_language})
                    threads.append(document_format_thread)
                    document_format_thread.start()
                for thread in threads:
                    thread.join()
            return all_qa_documents

        return all_documents

    def format_qa_document(self, flask_app: Flask, tenant_id: str, document_node, all_qa_documents, document_language):
        """Generate QA documents for a document node via the LLM and append them to the shared list."""
        format_documents = []
        if document_node.page_content is None or not document_node.page_content.strip():
            return
        with flask_app.app_context():
            try:
                # qa model document
                response = LLMGenerator.generate_qa_document(tenant_id, document_node.page_content, document_language)
                document_qa_list = self.format_split_text(response)
                qa_documents = []
                for result in document_qa_list:
                    qa_document = Document(page_content=result['question'], metadata=document_node.metadata.copy())
                    doc_id = str(uuid.uuid4())
                    hash = helper.generate_text_hash(result['question'])
                    qa_document.metadata['answer'] = result['answer']
                    qa_document.metadata['doc_id'] = doc_id
                    qa_document.metadata['doc_hash'] = hash
                    qa_documents.append(qa_document)
                format_documents.extend(qa_documents)
            except Exception as e:
                logging.exception(e)

            all_qa_documents.extend(format_documents)

    def _split_to_documents_for_estimate(self, text_docs: list[Document], splitter: TextSplitter,
                                         processing_rule: DatasetProcessRule) -> list[Document]:
        """
        Split the text documents into nodes.
        """
        all_documents = []
        for text_doc in text_docs:
            # document clean
            document_text = self._document_clean(text_doc.page_content, processing_rule)
            text_doc.page_content = document_text

            # parse document to nodes
            documents = splitter.split_documents([text_doc])

            split_documents = []
            for document in documents:
                if document.page_content is None or not document.page_content.strip():
                    continue
                doc_id = str(uuid.uuid4())
                hash = helper.generate_text_hash(document.page_content)
                document.metadata['doc_id'] = doc_id
                document.metadata['doc_hash'] = hash

                split_documents.append(document)

            all_documents.extend(split_documents)

        return all_documents

    def _document_clean(self, text: str, processing_rule: DatasetProcessRule) -> str:
        """
        Clean the document text according to the processing rules.
        """
        if processing_rule.mode == "automatic":
            rules = DatasetProcessRule.AUTOMATIC_RULES
        else:
            rules = json.loads(processing_rule.rules) if processing_rule.rules else {}

        if 'pre_processing_rules' in rules:
            pre_processing_rules = rules["pre_processing_rules"]
            for pre_processing_rule in pre_processing_rules:
                if pre_processing_rule["id"] == "remove_extra_spaces" and pre_processing_rule["enabled"] is True:
                    # Remove extra spaces
                    pattern = r'\n{3,}'
                    text = re.sub(pattern, '\n\n', text)
                    pattern = r'[\t\f\r\x20\u00a0\u1680\u180e\u2000-\u200a\u202f\u205f\u3000]{2,}'
                    text = re.sub(pattern, ' ', text)
                elif pre_processing_rule["id"] == "remove_urls_emails" and pre_processing_rule["enabled"] is True:
                    # Remove email
                    pattern = r'([a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\.[a-zA-Z0-9-.]+)'
                    text = re.sub(pattern, '', text)

                    # Remove URL
                    pattern = r'https?://[^\s]+'
                    text = re.sub(pattern, '', text)

        return text

    def format_split_text(self, text):
        regex = r"Q\d+:\s*(.*?)\s*A\d+:\s*([\s\S]*?)(?=Q\d+:|$)"
        matches = re.findall(regex, text, re.UNICODE)

        return [
            {
                "question": q,
                "answer": re.sub(r"\n\s*", "\n", a.strip())
            }
            for q, a in matches if q and a
        ]

    def _load(self, index_processor: BaseIndexProcessor, dataset: Dataset,
              dataset_document: DatasetDocument, documents: list[Document]) -> None:
        """
        Insert index and update document/segment status to completed.
        """
        embedding_model_instance = None
        if dataset.indexing_technique == 'high_quality':
            embedding_model_instance = self.model_manager.get_model_instance(
                tenant_id=dataset.tenant_id,
                provider=dataset.embedding_model_provider,
                model_type=ModelType.TEXT_EMBEDDING,
                model=dataset.embedding_model
            )

        # chunk nodes by chunk size
        indexing_start_at = time.perf_counter()
        tokens = 0
        chunk_size = 100

        embedding_model_type_instance = None
        if embedding_model_instance:
            embedding_model_type_instance = embedding_model_instance.model_type_instance
            embedding_model_type_instance = cast(TextEmbeddingModel, embedding_model_type_instance)

        for i in range(0, len(documents), chunk_size):
            # check whether the document has been paused
            self._check_document_paused_status(dataset_document.id)
            chunk_documents = documents[i:i + chunk_size]
            if dataset.indexing_technique == 'high_quality' or embedding_model_type_instance:
                tokens += sum(
                    embedding_model_type_instance.get_num_tokens(
                        embedding_model_instance.model,
                        embedding_model_instance.credentials,
                        [document.page_content]
                    )
                    for document in chunk_documents
                )

            # load index
            index_processor.load(dataset, chunk_documents)

            document_ids = [document.metadata['doc_id'] for document in chunk_documents]
            db.session.query(DocumentSegment).filter(
                DocumentSegment.document_id == dataset_document.id,
                DocumentSegment.index_node_id.in_(document_ids),
                DocumentSegment.status == "indexing"
            ).update({
                DocumentSegment.status: "completed",
                DocumentSegment.enabled: True,
                DocumentSegment.completed_at: datetime.datetime.utcnow()
            })

            db.session.commit()

        indexing_end_at = time.perf_counter()

        # update document status to completed
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="completed",
            extra_update_params={
                DatasetDocument.tokens: tokens,
                DatasetDocument.completed_at: datetime.datetime.utcnow(),
                DatasetDocument.indexing_latency: indexing_end_at - indexing_start_at,
            }
        )

    def _check_document_paused_status(self, document_id: str):
        indexing_cache_key = 'document_{}_is_paused'.format(document_id)
        result = redis_client.get(indexing_cache_key)
        if result:
            raise DocumentIsPausedException()

    def _update_document_index_status(self, document_id: str, after_indexing_status: str,
                                      extra_update_params: Optional[dict] = None) -> None:
        """
        Update the document indexing status.
        """
        count = DatasetDocument.query.filter_by(id=document_id, is_paused=True).count()
        if count > 0:
            raise DocumentIsPausedException()
        document = DatasetDocument.query.filter_by(id=document_id).first()
        if not document:
            raise DocumentIsDeletedPausedException()

        update_params = {
            DatasetDocument.indexing_status: after_indexing_status
        }

        if extra_update_params:
            update_params.update(extra_update_params)

        DatasetDocument.query.filter_by(id=document_id).update(update_params)
        db.session.commit()

    def _update_segments_by_document(self, dataset_document_id: str, update_params: dict) -> None:
        """
        Update the document segments by document id.
        """
        DocumentSegment.query.filter_by(document_id=dataset_document_id).update(update_params)
        db.session.commit()

    def batch_add_segments(self, segments: list[DocumentSegment], dataset: Dataset):
        """
        Batch add segments index processing.
        """
        documents = []
        for segment in segments:
            document = Document(
                page_content=segment.content,
                metadata={
                    "doc_id": segment.index_node_id,
                    "doc_hash": segment.index_node_hash,
                    "document_id": segment.document_id,
                    "dataset_id": segment.dataset_id,
                }
            )
            documents.append(document)

        # save vector index
        index_type = dataset.doc_form
        index_processor = IndexProcessorFactory(index_type).init_index_processor()
        index_processor.load(dataset, documents)

    def _transform(self, index_processor: BaseIndexProcessor, dataset: Dataset,
                   text_docs: list[Document], process_rule: dict) -> list[Document]:
        # get embedding model instance
        embedding_model_instance = None
        if dataset.indexing_technique == 'high_quality':
            if dataset.embedding_model_provider:
                embedding_model_instance = self.model_manager.get_model_instance(
                    tenant_id=dataset.tenant_id,
                    provider=dataset.embedding_model_provider,
                    model_type=ModelType.TEXT_EMBEDDING,
                    model=dataset.embedding_model
                )
            else:
                embedding_model_instance = self.model_manager.get_default_model_instance(
                    tenant_id=dataset.tenant_id,
                    model_type=ModelType.TEXT_EMBEDDING,
                )

        documents = index_processor.transform(text_docs, embedding_model_instance=embedding_model_instance,
                                              process_rule=process_rule)

        return documents

    def _load_segments(self, dataset, dataset_document, documents):
        # save nodes to document segments
        doc_store = DatasetDocumentStore(
            dataset=dataset,
            user_id=dataset_document.created_by,
            document_id=dataset_document.id
        )

        # add document segments
        doc_store.add_documents(documents)

        # update document status to indexing
        cur_time = datetime.datetime.utcnow()
        self._update_document_index_status(
            document_id=dataset_document.id,
            after_indexing_status="indexing",
            extra_update_params={
                DatasetDocument.cleaning_completed_at: cur_time,
                DatasetDocument.splitting_completed_at: cur_time,
            }
        )

        # update segment status to indexing
        self._update_segments_by_document(
            dataset_document_id=dataset_document.id,
            update_params={
                DocumentSegment.status: "indexing",
                DocumentSegment.indexing_at: datetime.datetime.utcnow()
            }
        )


class DocumentIsPausedException(Exception):
    pass


class DocumentIsDeletedPausedException(Exception):
    pass
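

# Usage sketch (illustrative only, not part of this module): a background worker task
# would typically fetch the DatasetDocument rows for a batch and hand them to the runner,
# catching DocumentIsPausedException so a user-initiated pause does not mark the task as
# failed. The `batch=batch_id` filter below is an assumption for illustration.
#
#     dataset_documents = DatasetDocument.query.filter_by(batch=batch_id).all()
#     try:
#         IndexingRunner().run(dataset_documents)
#     except DocumentIsPausedException:
#         logging.info("indexing paused by user, batch id: %s", batch_id)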