import hashlib
import json
import os
import re
import site
import subprocess
import tempfile
import unicodedata
from contextlib import contextmanager
from typing import Any, Type

import requests
from bs4 import BeautifulSoup, CData, Comment, NavigableString
from langchain.chains import RefineDocumentsChain
from langchain.chains.summarize import refine_prompts
from langchain.schema import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.tools.base import BaseTool
from newspaper import Article
from pydantic import BaseModel, Field
from regex import regex

from core.chain.llm_chain import LLMChain
from core.data_loader import file_extractor
from core.data_loader.file_extractor import FileExtractor
from core.entities.application_entities import ModelConfigEntity


FULL_TEMPLATE = """
TITLE: {title}
AUTHORS: {authors}
PUBLISH DATE: {publish_date}
TOP_IMAGE_URL: {top_image}
TEXT:

{text}
"""
 
class WebReaderToolInput(BaseModel):
    url: str = Field(..., description="URL of the website to read")
    summary: bool = Field(
        default=False,
        description="When the user's question requires a summary of the webpage's content, "
                    "set it to true."
    )
    cursor: int = Field(
        default=0,
        description="Start reading from this character. "
                    "Use when the first response was truncated "
                    "and you want to continue reading the page. "
                    "The value cannot exceed 24000.",
    )
 
class WebReaderTool(BaseTool):
    """Reader tool for getting website title and contents. Gives more control than SimpleReaderTool."""

    name: str = "web_reader"
    args_schema: Type[BaseModel] = WebReaderToolInput
    description: str = "use this to read a website. " \
                       "If you can answer the question based on the information provided, " \
                       "there is no need to use it."
    page_contents: str = None
    url: str = None
    max_chunk_length: int = 4000
    summary_chunk_tokens: int = 4000
    summary_chunk_overlap: int = 0
    summary_separators: list[str] = ["\n\n", "。", ".", " ", ""]
    continue_reading: bool = True
    model_config: ModelConfigEntity
    model_parameters: dict[str, Any]

    def _run(self, url: str, summary: bool = False, cursor: int = 0) -> str:
        try:
            if not self.page_contents or self.url != url:
                page_contents = get_url(url)
                self.page_contents = page_contents
                self.url = url
            else:
                page_contents = self.page_contents
        except Exception as e:
            return f'Failed to read this website, caused by: {str(e)}.'

        if summary:
            character_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(
                chunk_size=self.summary_chunk_tokens,
                chunk_overlap=self.summary_chunk_overlap,
                separators=self.summary_separators
            )

            texts = character_splitter.split_text(page_contents)
            docs = [Document(page_content=t) for t in texts]

            if len(docs) == 0 or docs[0].page_content.endswith('TEXT:'):
                return "No content found."

            # only use the first 5 docs
            if len(docs) > 5:
                docs = docs[:5]

            chain = self.get_summary_chain()
            try:
                page_contents = chain.run(docs)
            except Exception as e:
                return f'Failed to read this website, caused by: {str(e)}.'
        else:
            page_contents = page_result(page_contents, cursor, self.max_chunk_length)

            if self.continue_reading and len(page_contents) >= self.max_chunk_length:
                page_contents += f"\nPAGE WAS TRUNCATED. IF YOU FIND INFORMATION THAT CAN ANSWER THE QUESTION, " \
                                 f"ANSWER DIRECTLY AND STOP INVOKING THE web_reader TOOL; OTHERWISE USE " \
                                 f"CURSOR={cursor+len(page_contents)} TO CONTINUE READING."

        return page_contents

    async def _arun(self, url: str) -> str:
        raise NotImplementedError

    def get_summary_chain(self) -> RefineDocumentsChain:
        initial_chain = LLMChain(
            model_config=self.model_config,
            prompt=refine_prompts.PROMPT,
            parameters=self.model_parameters
        )
        refine_chain = LLMChain(
            model_config=self.model_config,
            prompt=refine_prompts.REFINE_PROMPT,
            parameters=self.model_parameters
        )
        return RefineDocumentsChain(
            initial_llm_chain=initial_chain,
            refine_llm_chain=refine_chain,
            document_variable_name="text",
            initial_response_name="existing_answer",
            callbacks=self.callbacks
        )
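
# Illustrative usage sketch (assumption, not part of the original module): the
# real model_config (a ModelConfigEntity) and model_parameters are supplied by
# the host application that assembles the agent's tools.
#
#   tool = WebReaderTool(model_config=model_config, model_parameters={})
#   excerpt = tool.run({"url": "https://example.com", "summary": False, "cursor": 0})
#
# Repeated calls with the same url reuse the cached page_contents, so a larger
# cursor only re-slices the already fetched text.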
 
def page_result(text: str, cursor: int, max_length: int) -> str:
    """Page through `text` and return a substring of `max_length` characters starting from `cursor`."""
    return text[cursor: cursor + max_length]
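
# Illustrative example (made-up values):
#   page_result("abcdefghij", cursor=3, max_length=4)  -> "defg"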
 
def get_url(url: str) -> str:
    """Fetch URL and return the contents as a string."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36"
    }
    supported_content_types = file_extractor.SUPPORT_URL_CONTENT_TYPES + ["text/html"]

    head_response = requests.head(url, headers=headers, allow_redirects=True, timeout=(5, 10))

    if head_response.status_code != 200:
        return "URL returned status code {}.".format(head_response.status_code)

    # check content-type
    main_content_type = head_response.headers.get('Content-Type', '').split(';')[0].strip()
    if main_content_type not in supported_content_types:
        return "Unsupported content-type [{}] of URL.".format(main_content_type)

    if main_content_type in file_extractor.SUPPORT_URL_CONTENT_TYPES:
        return FileExtractor.load_from_url(url, return_text=True)

    response = requests.get(url, headers=headers, allow_redirects=True, timeout=(5, 30))
    a = extract_using_readabilipy(response.text)

    if not a['plain_text'] or not a['plain_text'].strip():
        return get_url_from_newspaper3k(url)

    res = FULL_TEMPLATE.format(
        title=a['title'],
        authors=a['byline'],
        publish_date=a['date'],
        top_image="",
        text=a['plain_text'] if a['plain_text'] else "",
    )

    return res
 
def get_url_from_newspaper3k(url: str) -> str:
    """Fallback: fetch and parse the article with newspaper3k when Readability finds no text."""
    a = Article(url)
    a.download()
    a.parse()

    res = FULL_TEMPLATE.format(
        title=a.title,
        authors=a.authors,
        publish_date=a.publish_date,
        top_image=a.top_image,
        text=a.text,
    )

    return res
 
def extract_using_readabilipy(html):
    """Extract article fields from raw HTML using Mozilla's Readability.js via readabilipy's Node.js wrapper."""
    with tempfile.NamedTemporaryFile(delete=False, mode='w+') as f_html:
        f_html.write(html)
        f_html.close()
    html_path = f_html.name

    # Call Mozilla's Readability.js Readability.parse() function via node, writing output to a temporary file
    article_json_path = html_path + ".json"
    jsdir = os.path.join(find_module_path('readabilipy'), 'javascript')
    with chdir(jsdir):
        subprocess.check_call(["node", "ExtractArticle.js", "-i", html_path, "-o", article_json_path])

    # Read output of call to Readability.parse() from JSON file and return as Python dictionary
    with open(article_json_path, "r", encoding="utf-8") as json_file:
        input_json = json.loads(json_file.read())

    # Deleting files after processing
    os.unlink(article_json_path)
    os.unlink(html_path)

    article_json = {
        "title": None,
        "byline": None,
        "date": None,
        "content": None,
        "plain_content": None,
        "plain_text": None
    }
    # Populate article fields from readability fields where present
    if input_json:
        if "title" in input_json and input_json["title"]:
            article_json["title"] = input_json["title"]
        if "byline" in input_json and input_json["byline"]:
            article_json["byline"] = input_json["byline"]
        if "date" in input_json and input_json["date"]:
            article_json["date"] = input_json["date"]
        if "content" in input_json and input_json["content"]:
            article_json["content"] = input_json["content"]
            article_json["plain_content"] = plain_content(article_json["content"], False, False)
            article_json["plain_text"] = extract_text_blocks_as_plain_text(article_json["plain_content"])
        if "textContent" in input_json and input_json["textContent"]:
            article_json["plain_text"] = input_json["textContent"]
            article_json["plain_text"] = re.sub(r'\n\s*\n', '\n', article_json["plain_text"])

    return article_json
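
# Note (descriptive): extract_using_readabilipy shells out to Node.js; it assumes
# `node` is on PATH and that the installed readabilipy package ships its
# Readability.js wrapper (ExtractArticle.js) in a javascript/ directory.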
 
def find_module_path(module_name):
    """Locate an installed module's directory by scanning site-packages; return None if not found."""
    for package_path in site.getsitepackages():
        potential_path = os.path.join(package_path, module_name)
        if os.path.exists(potential_path):
            return potential_path

    return None
 
@contextmanager
def chdir(path):
    """Change directory in context and return to original on exit"""
    # From https://stackoverflow.com/a/37996581, couldn't find a built-in
    original_path = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(original_path)
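
# Illustrative usage (hypothetical path):
#   with chdir("/tmp"):
#       ...  # os.getcwd() == "/tmp" inside the block
#   # the original working directory is restored here, even if the block raised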
 
def extract_text_blocks_as_plain_text(paragraph_html):
    # Load article as DOM
    soup = BeautifulSoup(paragraph_html, 'html.parser')
    # Select all lists
    list_elements = soup.find_all(['ul', 'ol'])
    # Prefix text in all list items with "* " and make lists paragraphs
    for list_element in list_elements:
        plain_items = "".join(list(filter(None, [plain_text_leaf_node(li)["text"] for li in list_element.find_all('li')])))
        list_element.string = plain_items
        list_element.name = "p"
    # Select all text blocks
    text_blocks = [s.parent for s in soup.find_all(string=True)]
    text_blocks = [plain_text_leaf_node(block) for block in text_blocks]
    # Drop empty paragraphs
    text_blocks = list(filter(lambda p: p["text"] is not None, text_blocks))
    return text_blocks
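
# Illustrative example (made-up HTML): blocks whose text normalises to the empty
# string are dropped, so only the first paragraph survives.
#   extract_text_blocks_as_plain_text("<p>Hello</p><p> </p>")
#   -> [{'text': 'Hello'}]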
 
def plain_text_leaf_node(element):
    # Extract all text, stripped of any child HTML elements and normalise it
    plain_text = normalise_text(element.get_text())
    if plain_text != "" and element.name == "li":
        plain_text = "* {}, ".format(plain_text)
    if plain_text == "":
        plain_text = None
    if "data-node-index" in element.attrs:
        plain = {"node_index": element["data-node-index"], "text": plain_text}
    else:
        plain = {"text": plain_text}
    return plain
 
def plain_content(readability_content, content_digests, node_indexes):
    # Load article as DOM
    soup = BeautifulSoup(readability_content, 'html.parser')
    # Make all elements plain
    elements = plain_elements(soup.contents, content_digests, node_indexes)
    if node_indexes:
        # Add node index attributes to nodes
        elements = [add_node_indexes(element) for element in elements]
    # Replace article contents with plain elements
    soup.contents = elements
    return str(soup)
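
# Illustrative example (made-up HTML): leaf paragraphs are reduced to
# whitespace-normalised text; digests and node indexes are switched off here.
#   plain_content("<p> Some   text </p>", False, False)  -> "<p>Some text</p>"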
 
def plain_elements(elements, content_digests, node_indexes):
    # Get plain content versions of all elements
    elements = [plain_element(element, content_digests, node_indexes)
                for element in elements]
    if content_digests:
        # Add content digest attribute to nodes
        elements = [add_content_digest(element) for element in elements]
    return elements
 
def plain_element(element, content_digests, node_indexes):
    # For lists, we make each item plain text
    if is_leaf(element):
        # For leaf node elements, extract the text content, discarding any HTML tags
        # 1. Get element contents as text
        plain_text = element.get_text()
        # 2. Normalise the extracted text string to a canonical representation
        plain_text = normalise_text(plain_text)
        # 3. Update element content to be plain text
        element.string = plain_text
    elif is_text(element):
        if is_non_printing(element):
            # The simplified HTML may have come from Readability.js so might
            # have non-printing text (e.g. Comment or CData). In this case, we
            # keep the structure, but ensure that the string is empty.
            element = type(element)("")
        else:
            plain_text = element.string
            plain_text = normalise_text(plain_text)
            element = type(element)(plain_text)
    else:
        # If not a leaf node or a leaf text type, recurse on the child nodes,
        # replacing them with their plain-content versions
        element.contents = plain_elements(element.contents, content_digests, node_indexes)
    return element
 
def add_node_indexes(element, node_index="0"):
    # Can't add attributes to string types
    if is_text(element):
        return element
    # Add index to current element
    element["data-node-index"] = node_index
    # Add index to child elements
    for local_idx, child in enumerate(
            [c for c in element.contents if not is_text(c)], start=1):
        # Can't add attributes to leaf string types
        child_index = "{stem}.{local}".format(
            stem=node_index, local=local_idx)
        add_node_indexes(child, node_index=child_index)
    return element
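
# Illustrative numbering (made-up HTML): the element passed in gets index "0" and
# its non-text children get "0.1", "0.2", ... recursively, e.g.
#   <div data-node-index="0"><p data-node-index="0.1">a</p><p data-node-index="0.2">b</p></div>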
 
def normalise_text(text):
    """Normalise unicode and whitespace."""
    # Normalise unicode first to try and standardise whitespace characters as much as possible before normalising them
    text = strip_control_characters(text)
    text = normalise_unicode(text)
    text = normalise_whitespace(text)
    return text
 
def strip_control_characters(text):
    """Strip out unicode control characters which might break the parsing."""
    # Unicode control characters
    #   [Cc]: Other, Control [includes new lines]
    #   [Cf]: Other, Format
    #   [Cn]: Other, Not Assigned
    #   [Co]: Other, Private Use
    #   [Cs]: Other, Surrogate
    control_chars = set(['Cc', 'Cf', 'Cn', 'Co', 'Cs'])
    retained_chars = ['\t', '\n', '\r', '\f']

    # Remove non-printing control characters
    return "".join(["" if (unicodedata.category(char) in control_chars) and (char not in retained_chars) else char for char in text])
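
# Illustrative example: zero-width spaces (category Cf) are removed while tabs
# and newlines are retained.
#   strip_control_characters("a\u200bb\nc")  -> "ab\nc"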
 
def normalise_unicode(text):
    """Normalise unicode such that things that are visually equivalent map to the same unicode string where possible."""
    normal_form = "NFKC"
    text = unicodedata.normalize(normal_form, text)
    return text
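
# Illustrative example: NFKC folds compatibility characters into their canonical
# equivalents, e.g. the "fi" ligature U+FB01.
#   normalise_unicode("\ufb01le")  -> "file"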
 
def normalise_whitespace(text):
    """Replace runs of whitespace characters with a single space as this is what happens when HTML text is displayed."""
    text = regex.sub(r"\s+", " ", text)
    # Remove leading and trailing whitespace
    text = text.strip()
    return text
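
# Illustrative example: runs of whitespace collapse to single spaces and the
# result is trimmed.
#   normalise_whitespace("  Hello \t\n  world  ")  -> "Hello world"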
 
def is_leaf(element):
    return (element.name in ['p', 'li'])


def is_text(element):
    return isinstance(element, NavigableString)


def is_non_printing(element):
    return any(isinstance(element, _e) for _e in [Comment, CData])


def add_content_digest(element):
    if not is_text(element):
        element["data-content-digest"] = content_digest(element)
    return element
 
def content_digest(element):
    if is_text(element):
        # Hash
        trimmed_string = element.string.strip()
        if trimmed_string == "":
            digest = ""
        else:
            digest = hashlib.sha256(trimmed_string.encode('utf-8')).hexdigest()
    else:
        contents = element.contents
        num_contents = len(contents)
        if num_contents == 0:
            # No hash when no child elements exist
            digest = ""
        elif num_contents == 1:
            # If single child, use digest of child
            digest = content_digest(contents[0])
        else:
            # Build content digest from the "non-empty" digests of child nodes
            digest = hashlib.sha256()
            child_digests = list(
                filter(lambda x: x != "", [content_digest(content) for content in contents]))
            for child in child_digests:
                digest.update(child.encode('utf-8'))
            digest = digest.hexdigest()
    return digest
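
# Behaviour note (descriptive, no new code): a text node is hashed as the sha256
# of its stripped text; an element with a single child inherits that child's
# digest; an element with several children hashes the concatenation of its
# children's non-empty digests, so identical subtrees yield identical digests.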
 
 