瀏覽代碼

Feat/improve document delete logic (#1325)

Co-authored-by: jyong <jyong@dify.ai>
Jyong 1 年之前
父節點
當前提交
289c93d081
共有 4 個文件被更改,包括 34 次插入26 次删除
  1. 14 3
      api/core/indexing_runner.py
  2. 0 3
      api/services/dataset_service.py
  3. 15 13
      api/tasks/clean_document_task.py
  4. 5 7
      api/tasks/document_indexing_task.py

+ 14 - 3
api/core/indexing_runner.py

@@ -11,6 +11,7 @@ from flask import current_app, Flask
 from flask_login import current_user
 from langchain.schema import Document
 from langchain.text_splitter import RecursiveCharacterTextSplitter, TextSplitter
+from sqlalchemy.orm.exc import ObjectDeletedError
 
 from core.data_loader.file_extractor import FileExtractor
 from core.data_loader.loader.notion import NotionLoader
@@ -79,6 +80,8 @@ class IndexingRunner:
                 dataset_document.error = str(e.description)
                 dataset_document.stopped_at = datetime.datetime.utcnow()
                 db.session.commit()
+            except ObjectDeletedError:
+                logging.warning('Document deleted, document id: {}'.format(dataset_document.id))
             except Exception as e:
                 logging.exception("consume document failed")
                 dataset_document.indexing_status = 'error'
@@ -276,7 +279,8 @@ class IndexingRunner:
             )
             if len(preview_texts) > 0:
                 # qa model document
-                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0], doc_language)
+                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0],
+                                                             doc_language)
                 document_qa_list = self.format_split_text(response)
                 return {
                     "total_segments": total_segments * 20,
@@ -372,7 +376,8 @@ class IndexingRunner:
             )
             if len(preview_texts) > 0:
                 # qa model document
-                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0], doc_language)
+                response = LLMGenerator.generate_qa_document(current_user.current_tenant_id, preview_texts[0],
+                                                             doc_language)
                 document_qa_list = self.format_split_text(response)
                 return {
                     "total_segments": total_segments * 20,
@@ -582,7 +587,6 @@ class IndexingRunner:
 
             all_qa_documents.extend(format_documents)
 
-
     def _split_to_documents_for_estimate(self, text_docs: List[Document], splitter: TextSplitter,
                                          processing_rule: DatasetProcessRule) -> List[Document]:
         """
@@ -734,6 +738,9 @@ class IndexingRunner:
         count = DatasetDocument.query.filter_by(id=document_id, is_paused=True).count()
         if count > 0:
             raise DocumentIsPausedException()
+        document = DatasetDocument.query.filter_by(id=document_id).first()
+        if not document:
+            raise DocumentIsDeletedPausedException()
 
         update_params = {
             DatasetDocument.indexing_status: after_indexing_status
@@ -781,3 +788,7 @@ class IndexingRunner:
 
 class DocumentIsPausedException(Exception):
     pass
+
+
+class DocumentIsDeletedPausedException(Exception):
+    pass

+ 0 - 3
api/services/dataset_service.py

@@ -385,9 +385,6 @@ class DocumentService:
 
     @staticmethod
     def delete_document(document):
-        if document.indexing_status in ["parsing", "cleaning", "splitting", "indexing"]:
-            raise DocumentIndexingError()
-
         # trigger document_was_deleted signal
         document_was_deleted.send(document.id, dataset_id=document.dataset_id)
 

+ 15 - 13
api/tasks/clean_document_task.py

@@ -31,22 +31,24 @@ def clean_document_task(document_id: str, dataset_id: str):
         kw_index = IndexBuilder.get_index(dataset, 'economy')
 
         segments = db.session.query(DocumentSegment).filter(DocumentSegment.document_id == document_id).all()
-        index_node_ids = [segment.index_node_id for segment in segments]
+        # check segment is exist
+        if segments:
+            index_node_ids = [segment.index_node_id for segment in segments]
 
-        # delete from vector index
-        if vector_index:
-            vector_index.delete_by_document_id(document_id)
+            # delete from vector index
+            if vector_index:
+                vector_index.delete_by_document_id(document_id)
 
-        # delete from keyword index
-        if index_node_ids:
-            kw_index.delete_by_ids(index_node_ids)
+            # delete from keyword index
+            if index_node_ids:
+                kw_index.delete_by_ids(index_node_ids)
 
-        for segment in segments:
-            db.session.delete(segment)
+            for segment in segments:
+                db.session.delete(segment)
 
-        db.session.commit()
-        end_at = time.perf_counter()
-        logging.info(
-            click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green'))
+            db.session.commit()
+            end_at = time.perf_counter()
+            logging.info(
+                click.style('Cleaned document when document deleted: {} latency: {}'.format(document_id, end_at - start_at), fg='green'))
     except Exception:
         logging.exception("Cleaned document when document deleted failed")

+ 5 - 7
api/tasks/document_indexing_task.py

@@ -30,13 +30,11 @@ def document_indexing_task(dataset_id: str, document_ids: list):
             Document.dataset_id == dataset_id
         ).first()
 
-        if not document:
-            raise NotFound('Document not found')
-
-        document.indexing_status = 'parsing'
-        document.processing_started_at = datetime.datetime.utcnow()
-        documents.append(document)
-        db.session.add(document)
+        if document:
+            document.indexing_status = 'parsing'
+            document.processing_started_at = datetime.datetime.utcnow()
+            documents.append(document)
+            db.session.add(document)
     db.session.commit()
 
     try: