import math
from collections import Counter
from typing import Optional

import numpy as np

from core.embedding.cached_embedding import CacheEmbedding
from core.model_manager import ModelManager
from core.model_runtime.entities.model_entities import ModelType
from core.rag.datasource.keyword.jieba.jieba_keyword_table_handler import JiebaKeywordTableHandler
from core.rag.models.document import Document
from core.rag.rerank.entity.weight import VectorSetting, Weights


class WeightRerankRunner:
    def __init__(self, tenant_id: str, weights: Weights) -> None:
        self.tenant_id = tenant_id
        self.weights = weights
    def run(self, query: str, documents: list[Document], score_threshold: Optional[float] = None,
            top_n: Optional[int] = None, user: Optional[str] = None) -> list[Document]:
        """
        Run the weighted rerank: combine keyword and vector scores per document.

        :param query: search query
        :param documents: documents for reranking
        :param score_threshold: minimum combined score a document must reach to be kept
        :param top_n: maximum number of documents to return
        :param user: unique user id if needed
        :return: deduplicated documents sorted by combined score, descending
        """
        # deduplicate documents by doc_id, keeping the first occurrence
        doc_ids = []
        unique_documents = []
        for document in documents:
            if document.metadata['doc_id'] not in doc_ids:
                doc_ids.append(document.metadata['doc_id'])
                unique_documents.append(document)

        documents = unique_documents

        rerank_documents = []
        query_scores = self._calculate_keyword_score(query, documents)
        query_vector_scores = self._calculate_cosine(self.tenant_id, query, documents, self.weights.vector_setting)
        for document, query_score, query_vector_score in zip(documents, query_scores, query_vector_scores):
            # weighted combination of the vector and keyword scores
            score = self.weights.vector_setting.vector_weight * query_vector_score + \
                self.weights.keyword_setting.keyword_weight * query_score
            # check against None explicitly so a threshold of 0 is still honored
            if score_threshold is not None and score < score_threshold:
                continue
            document.metadata['score'] = score
            rerank_documents.append(document)

        rerank_documents = sorted(rerank_documents, key=lambda x: x.metadata['score'], reverse=True)
        return rerank_documents[:top_n] if top_n else rerank_documents
    def _calculate_keyword_score(self, query: str, documents: list[Document]) -> list[float]:
        """
        Calculate keyword scores as the TF-IDF cosine similarity between the
        query keywords and each document's keywords.

        :param query: search query
        :param documents: documents for reranking
        :return: one similarity score per document, in input order
        """
        keyword_table_handler = JiebaKeywordTableHandler()
        query_keywords = keyword_table_handler.extract_keywords(query, None)
        documents_keywords = []
        for document in documents:
            # extract and store each document's keywords
            document_keywords = keyword_table_handler.extract_keywords(document.page_content, None)
            document.metadata['keywords'] = document_keywords
            documents_keywords.append(document_keywords)

        # term frequency (TF) of the query keywords
        query_keyword_counts = Counter(query_keywords)

        total_documents = len(documents)

        # inverse document frequency (IDF) over all keywords in the corpus
        all_keywords = set()
        for document_keywords in documents_keywords:
            all_keywords.update(document_keywords)

        keyword_idf = {}
        for keyword in all_keywords:
            # number of documents containing this keyword
            doc_count_containing_keyword = sum(1 for doc_keywords in documents_keywords if keyword in doc_keywords)
            # smoothed IDF: log((1 + N) / (1 + df)) + 1
            keyword_idf[keyword] = math.log((1 + total_documents) / (1 + doc_count_containing_keyword)) + 1

        query_tfidf = {}
        for keyword, count in query_keyword_counts.items():
            tf = count
            idf = keyword_idf.get(keyword, 0)
            query_tfidf[keyword] = tf * idf

        # TF-IDF vector for each document
        documents_tfidf = []
        for document_keywords in documents_keywords:
            document_keyword_counts = Counter(document_keywords)
            document_tfidf = {}
            for keyword, count in document_keyword_counts.items():
                tf = count
                idf = keyword_idf.get(keyword, 0)
                document_tfidf[keyword] = tf * idf
            documents_tfidf.append(document_tfidf)

        def cosine_similarity(vec1, vec2):
            # sparse cosine similarity over the keywords the two vectors share
            intersection = set(vec1.keys()) & set(vec2.keys())
            numerator = sum(vec1[x] * vec2[x] for x in intersection)
            sum1 = sum(vec1[x] ** 2 for x in vec1.keys())
            sum2 = sum(vec2[x] ** 2 for x in vec2.keys())
            denominator = math.sqrt(sum1) * math.sqrt(sum2)
            if not denominator:
                return 0.0
            return float(numerator) / denominator

        similarities = []
        for document_tfidf in documents_tfidf:
            similarity = cosine_similarity(query_tfidf, document_tfidf)
            similarities.append(similarity)

        return similarities
    def _calculate_cosine(self, tenant_id: str, query: str, documents: list[Document],
                          vector_setting: VectorSetting) -> list[float]:
        """
        Calculate cosine similarity between the query embedding and each
        document's precomputed embedding (from metadata['vector']).

        :param tenant_id: tenant id used to resolve the embedding model
        :param query: search query
        :param documents: documents for reranking
        :param vector_setting: embedding provider/model configuration
        :return: one similarity score per document, in input order
        """
        query_vector_scores = []

        model_manager = ModelManager()
        embedding_model = model_manager.get_model_instance(
            tenant_id=tenant_id,
            provider=vector_setting.embedding_provider_name,
            model_type=ModelType.TEXT_EMBEDDING,
            model=vector_setting.embedding_model_name
        )
        cache_embedding = CacheEmbedding(embedding_model)
        query_vector = cache_embedding.embed_query(query)
        for document in documents:
            if 'score' in document.metadata:
                # a score is already attached; reuse it instead of recomputing
                query_vector_scores.append(document.metadata['score'])
            else:
                # cosine similarity between the query and document embeddings
                vec1 = np.array(query_vector)
                vec2 = np.array(document.metadata['vector'])
                dot_product = np.dot(vec1, vec2)
                norm_vec1 = np.linalg.norm(vec1)
                norm_vec2 = np.linalg.norm(vec2)
                cosine_sim = dot_product / (norm_vec1 * norm_vec2)
                query_vector_scores.append(float(cosine_sim))

        return query_vector_scores
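

# ---------------------------------------------------------------------------
# Standalone sketch of the keyword-scoring math used above. This is a minimal,
# illustrative demo of what _calculate_keyword_score computes: smoothed TF-IDF
# vectors for a query and each document, compared via cosine similarity over
# their shared terms. It deliberately substitutes plain whitespace tokenization
# for JiebaKeywordTableHandler, and the sample texts are made up for the demo;
# nothing here is required by, or part of, WeightRerankRunner itself.
if __name__ == "__main__":
    sample_query = "rerank documents by keyword overlap"
    sample_docs = [
        "rerank documents using keyword and vector scores",
        "an unrelated sentence about cooking pasta",
    ]

    def tokenize(text: str) -> list[str]:
        # stand-in for keyword extraction; the class uses jieba-based keywords
        return text.lower().split()

    docs_keywords = [tokenize(doc) for doc in sample_docs]
    total = len(docs_keywords)

    # smoothed IDF, matching the formula above: log((1 + N) / (1 + df)) + 1
    all_kw = set().union(*docs_keywords)
    idf = {
        kw: math.log((1 + total) / (1 + sum(kw in dk for dk in docs_keywords))) + 1
        for kw in all_kw
    }

    query_tfidf = {
        kw: count * idf.get(kw, 0.0)
        for kw, count in Counter(tokenize(sample_query)).items()
    }

    for doc, doc_keywords in zip(sample_docs, docs_keywords):
        doc_tfidf = {kw: count * idf[kw] for kw, count in Counter(doc_keywords).items()}
        shared = query_tfidf.keys() & doc_tfidf.keys()
        numerator = sum(query_tfidf[kw] * doc_tfidf[kw] for kw in shared)
        denominator = math.sqrt(sum(v ** 2 for v in query_tfidf.values())) * \
            math.sqrt(sum(v ** 2 for v in doc_tfidf.values()))
        similarity = numerator / denominator if denominator else 0.0
        print(f"{similarity:.4f}  {doc}")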