import math
from collections import Counter
from typing import Optional

import numpy as np

from core.model_manager import ModelManager
from core.model_runtime.entities.model_entities import ModelType
from core.rag.datasource.keyword.jieba.jieba_keyword_table_handler import JiebaKeywordTableHandler
from core.rag.embedding.cached_embedding import CacheEmbedding
from core.rag.models.document import Document
from core.rag.rerank.entity.weight import VectorSetting, Weights
from core.rag.rerank.rerank_base import BaseRerankRunner


class WeightRerankRunner(BaseRerankRunner):
    """
    Rerank documents by a weighted sum of a vector (embedding) similarity
    score and a keyword (TF-IDF cosine) similarity score.
    """

    def __init__(self, tenant_id: str, weights: Weights) -> None:
        """
        :param tenant_id: tenant used to resolve the embedding model
        :param weights: vector / keyword weights and the embedding model setting
        """
        self.tenant_id = tenant_id
        self.weights = weights

    def run(
        self,
        query: str,
        documents: list[Document],
        score_threshold: Optional[float] = None,
        top_n: Optional[int] = None,
        user: Optional[str] = None,
    ) -> list[Document]:
        """
        Rerank documents by their weighted vector + keyword score

        Documents are de-duplicated by ``metadata["doc_id"]`` (first occurrence
        wins), scored, filtered by ``score_threshold`` and returned sorted by
        score in descending order. The combined score is written to
        ``metadata["score"]`` of every returned document.

        :param query: search query
        :param documents: documents for reranking
        :param score_threshold: documents scoring below it are dropped;
            a falsy value (None or 0) disables the filter
        :param top_n: maximum number of documents to return;
            a falsy value (None or 0) returns all of them
        :param user: unique user id if needed (not used by this runner)
        :return: reranked documents
        """
        # De-duplicate by doc_id, keeping the first occurrence and the input order.
        seen_doc_ids = set()
        unique_documents = []
        for document in documents:
            doc_id = document.metadata["doc_id"]
            if doc_id in seen_doc_ids:
                continue
            seen_doc_ids.add(doc_id)
            unique_documents.append(document)
        documents = unique_documents

        # Nothing to score: skip loading the embedding model and embedding the query.
        if not documents:
            return []

        query_scores = self._calculate_keyword_score(query, documents)
        query_vector_scores = self._calculate_cosine(self.tenant_id, query, documents, self.weights.vector_setting)

        vector_weight = self.weights.vector_setting.vector_weight
        keyword_weight = self.weights.keyword_setting.keyword_weight

        rerank_documents = []
        for document, query_score, query_vector_score in zip(documents, query_scores, query_vector_scores):
            score = vector_weight * query_vector_score + keyword_weight * query_score
            if score_threshold and score < score_threshold:
                continue
            document.metadata["score"] = score
            rerank_documents.append(document)

        rerank_documents.sort(key=lambda x: x.metadata["score"], reverse=True)
        return rerank_documents[:top_n] if top_n else rerank_documents

    def _calculate_keyword_score(self, query: str, documents: list[Document]) -> list[float]:
        """
        Calculate keyword scores: the cosine similarity between the TF-IDF
        vector of the query keywords and the TF-IDF vector of each document's
        keywords. IDF is computed over the given documents only.

        Side effect: the extracted keywords of every document are stored in
        ``document.metadata["keywords"]``.

        :param query: search query
        :param documents: documents for reranking
        :return: one similarity per document, in the order of ``documents``
        """
        keyword_table_handler = JiebaKeywordTableHandler()
        query_keywords = keyword_table_handler.extract_keywords(query, None)

        documents_keywords = []
        for document in documents:
            document_keywords = keyword_table_handler.extract_keywords(document.page_content, None)
            document.metadata["keywords"] = document_keywords
            documents_keywords.append(document_keywords)

        total_documents = len(documents)

        # Document frequency: in how many documents each keyword appears.
        # set() makes a keyword count once per document even if it is repeated.
        document_frequency = Counter()
        for document_keywords in documents_keywords:
            document_frequency.update(set(document_keywords))

        # Smoothed IDF; the +1 terms keep it finite and strictly positive.
        keyword_idf = {
            keyword: math.log((1 + total_documents) / (1 + doc_count)) + 1
            for keyword, doc_count in document_frequency.items()
        }

        def tfidf_vector(keywords):
            # Keywords that occur in no document get an IDF of 0 and therefore a weight of 0.
            return {keyword: count * keyword_idf.get(keyword, 0) for keyword, count in Counter(keywords).items()}

        def cosine_similarity(vec1, vec2):
            intersection = set(vec1.keys()) & set(vec2.keys())
            numerator = sum(vec1[x] * vec2[x] for x in intersection)

            sum1 = sum(value**2 for value in vec1.values())
            sum2 = sum(value**2 for value in vec2.values())
            denominator = math.sqrt(sum1) * math.sqrt(sum2)

            # An all-zero (or empty) vector has no direction; treat it as "no similarity".
            if not denominator:
                return 0.0
            return float(numerator) / denominator

        query_tfidf = tfidf_vector(query_keywords)
        return [
            cosine_similarity(query_tfidf, tfidf_vector(document_keywords))
            for document_keywords in documents_keywords
        ]

    def _calculate_cosine(
        self, tenant_id: str, query: str, documents: list[Document], vector_setting: VectorSetting
    ) -> list[float]:
        """
        Calculate vector scores for the documents

        A document whose metadata already contains a ``"score"`` entry keeps
        that value as its vector score. Otherwise the score is the cosine
        similarity between the query embedding and ``document.vector``; it is
        0.0 when either vector has zero norm.

        :param tenant_id: tenant used to resolve the embedding model
        :param query: search query
        :param documents: documents for reranking
        :param vector_setting: embedding provider / model to embed the query with
        :return: one score per document, in the order of ``documents``
        """
        model_manager = ModelManager()
        embedding_model = model_manager.get_model_instance(
            tenant_id=tenant_id,
            provider=vector_setting.embedding_provider_name,
            model_type=ModelType.TEXT_EMBEDDING,
            model=vector_setting.embedding_model_name,
        )
        cache_embedding = CacheEmbedding(embedding_model)
        query_vector = cache_embedding.embed_query(query)

        # The query side is the same for every document, so convert and measure it once.
        query_array = np.array(query_vector)
        query_norm = np.linalg.norm(query_array)

        query_vector_scores = []
        for document in documents:
            if "score" in document.metadata:
                query_vector_scores.append(document.metadata["score"])
                continue

            document_array = np.array(document.vector)
            denominator = query_norm * np.linalg.norm(document_array)
            if not denominator:
                # Dividing by a zero norm would yield NaN/inf, which breaks the
                # threshold comparison and the sort in run().
                query_vector_scores.append(0.0)
                continue

            cosine_sim = np.dot(query_array, document_array) / denominator
            query_vector_scores.append(float(cosine_sim))

        return query_vector_scores