web_reader_tool.py

import hashlib
import json
import mimetypes
import os
import re
import site
import subprocess
import tempfile
import unicodedata
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Literal, Optional, cast
from urllib.parse import unquote

import chardet
import cloudscraper  # type: ignore
from bs4 import BeautifulSoup, CData, Comment, NavigableString  # type: ignore
from regex import regex  # type: ignore

from core.helper import ssrf_proxy
from core.rag.extractor import extract_processor
from core.rag.extractor.extract_processor import ExtractProcessor

FULL_TEMPLATE = """
TITLE: {title}
AUTHORS: {authors}
PUBLISH DATE: {publish_date}
TOP_IMAGE_URL: {top_image}
TEXT:
{text}
"""


def page_result(text: str, cursor: int, max_length: int) -> str:
    """Page through `text` and return a substring of `max_length` characters starting from `cursor`."""
    return text[cursor : cursor + max_length]
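
# Example of the paging helper above (illustrative values, not part of the original module):
#   page_result("abcdefgh", cursor=2, max_length=3) -> "cde"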


def get_url(url: str, user_agent: Optional[str] = None) -> str:
    """Fetch URL and return the contents as a string."""
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko)"
        " Chrome/91.0.4472.124 Safari/537.36"
    }
    if user_agent:
        headers["User-Agent"] = user_agent

    main_content_type = None
    supported_content_types = extract_processor.SUPPORT_URL_CONTENT_TYPES + ["text/html"]
    response = ssrf_proxy.head(url, headers=headers, follow_redirects=True, timeout=(5, 10))

    if response.status_code == 200:
        # check content-type
        content_type = response.headers.get("Content-Type")
        if content_type:
            main_content_type = response.headers.get("Content-Type").split(";")[0].strip()
        else:
            content_disposition = response.headers.get("Content-Disposition", "")
            filename_match = re.search(r'filename="([^"]+)"', content_disposition)
            if filename_match:
                filename = unquote(filename_match.group(1))
                extension = re.search(r"\.(\w+)$", filename)
                if extension:
                    main_content_type = mimetypes.guess_type(filename)[0]

        if main_content_type not in supported_content_types:
            return "Unsupported content-type [{}] of URL.".format(main_content_type)

        if main_content_type in extract_processor.SUPPORT_URL_CONTENT_TYPES:
            return cast(str, ExtractProcessor.load_from_url(url, return_text=True))

        response = ssrf_proxy.get(url, headers=headers, follow_redirects=True, timeout=(120, 300))
    elif response.status_code == 403:
        scraper = cloudscraper.create_scraper()
        scraper.perform_request = ssrf_proxy.make_request
        response = scraper.get(url, headers=headers, follow_redirects=True, timeout=(120, 300))

    if response.status_code != 200:
        return "URL returned status code {}.".format(response.status_code)

    # Detect encoding using chardet
    detected_encoding = chardet.detect(response.content)
    encoding = detected_encoding["encoding"]
    if encoding:
        try:
            content = response.content.decode(encoding)
        except (UnicodeDecodeError, TypeError):
            content = response.text
    else:
        content = response.text

    a = extract_using_readabilipy(content)

    if not a["plain_text"] or not a["plain_text"].strip():
        return ""

    res = FULL_TEMPLATE.format(
        title=a["title"],
        authors=a["byline"],
        publish_date=a["date"],
        top_image="",
        text=a["plain_text"] or "",
    )

    return res


def extract_using_readabilipy(html):
    """Extract article fields from `html` using Mozilla's Readability.js via the readabilipy package."""
    with tempfile.NamedTemporaryFile(delete=False, mode="w+") as f_html:
        f_html.write(html)
        f_html.close()
    html_path = f_html.name

    # Call Mozilla's Readability.js Readability.parse() function via node, writing output to a temporary file
    article_json_path = html_path + ".json"
    jsdir = os.path.join(find_module_path("readabilipy"), "javascript")
    with chdir(jsdir):
        subprocess.check_call(["node", "ExtractArticle.js", "-i", html_path, "-o", article_json_path])

    # Read output of call to Readability.parse() from JSON file and return as Python dictionary
    input_json = json.loads(Path(article_json_path).read_text(encoding="utf-8"))

    # Delete temporary files after processing
    os.unlink(article_json_path)
    os.unlink(html_path)

    article_json: dict[str, Any] = {
        "title": None,
        "byline": None,
        "date": None,
        "content": None,
        "plain_content": None,
        "plain_text": None,
    }
    # Populate article fields from readability fields where present
    if input_json:
        if input_json.get("title"):
            article_json["title"] = input_json["title"]
        if input_json.get("byline"):
            article_json["byline"] = input_json["byline"]
        if input_json.get("date"):
            article_json["date"] = input_json["date"]
        if input_json.get("content"):
            article_json["content"] = input_json["content"]
            article_json["plain_content"] = plain_content(article_json["content"], False, False)
            article_json["plain_text"] = extract_text_blocks_as_plain_text(article_json["plain_content"])
        if input_json.get("textContent"):
            article_json["plain_text"] = input_json["textContent"]
            article_json["plain_text"] = re.sub(r"\n\s*\n", "\n", article_json["plain_text"])

    return article_json


def find_module_path(module_name):
    """Return the path of `module_name` within site-packages, or None if it is not installed."""
    for package_path in site.getsitepackages():
        potential_path = os.path.join(package_path, module_name)
        if os.path.exists(potential_path):
            return potential_path

    return None


@contextmanager
def chdir(path):
    """Change directory in context and return to original on exit"""
    # From https://stackoverflow.com/a/37996581, couldn't find a built-in
    original_path = os.getcwd()
    os.chdir(path)
    try:
        yield
    finally:
        os.chdir(original_path)


def extract_text_blocks_as_plain_text(paragraph_html):
    """Convert simplified article HTML into a list of plain-text block dicts, flattening lists into bulleted paragraphs."""
    # Load article as DOM
    soup = BeautifulSoup(paragraph_html, "html.parser")
    # Select all lists
    list_elements = soup.find_all(["ul", "ol"])
    # Prefix text in all list items with "* " and make lists paragraphs
    for list_element in list_elements:
        plain_items = "".join(
            list(filter(None, [plain_text_leaf_node(li)["text"] for li in list_element.find_all("li")]))
        )
        list_element.string = plain_items
        list_element.name = "p"
    # Select all text blocks
    text_blocks = [s.parent for s in soup.find_all(string=True)]
    text_blocks = [plain_text_leaf_node(block) for block in text_blocks]
    # Drop empty paragraphs
    text_blocks = list(filter(lambda p: p["text"] is not None, text_blocks))
    return text_blocks


def plain_text_leaf_node(element):
    """Return the normalized text of an element as a dict, bulleting list items and keeping any node index."""
    # Extract all text, stripped of any child HTML elements and normalize it
    plain_text = normalize_text(element.get_text())
    if plain_text != "" and element.name == "li":
        plain_text = "* {}, ".format(plain_text)
    if plain_text == "":
        plain_text = None
    if "data-node-index" in element.attrs:
        plain = {"node_index": element["data-node-index"], "text": plain_text}
    else:
        plain = {"text": plain_text}
    return plain
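
# Illustrative example (assumed markup, not part of the original module):
#   plain_text_leaf_node(BeautifulSoup("<li>apples</li>", "html.parser").li)
#   returns {"text": "* apples, "} -- list items are bulleted and suffixed for joining.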


def plain_content(readability_content, content_digests, node_indexes):
    # Load article as DOM
    soup = BeautifulSoup(readability_content, "html.parser")
    # Make all elements plain
    elements = plain_elements(soup.contents, content_digests, node_indexes)
    if node_indexes:
        # Add node index attributes to nodes
        elements = [add_node_indexes(element) for element in elements]
    # Replace article contents with plain elements
    soup.contents = elements
    return str(soup)


def plain_elements(elements, content_digests, node_indexes):
    # Get plain content versions of all elements
    elements = [plain_element(element, content_digests, node_indexes) for element in elements]
    if content_digests:
        # Add content digest attribute to nodes
        elements = [add_content_digest(element) for element in elements]
    return elements


def plain_element(element, content_digests, node_indexes):
    # For lists, we make each item plain text
    if is_leaf(element):
        # For leaf node elements, extract the text content, discarding any HTML tags
        # 1. Get element contents as text
        plain_text = element.get_text()
        # 2. Normalize the extracted text string to a canonical representation
        plain_text = normalize_text(plain_text)
        # 3. Update element content to be plain text
        element.string = plain_text
    elif is_text(element):
        if is_non_printing(element):
            # The simplified HTML may have come from Readability.js so might
            # have non-printing text (e.g. Comment or CData). In this case, we
            # keep the structure, but ensure that the string is empty.
            element = type(element)("")
        else:
            plain_text = element.string
            plain_text = normalize_text(plain_text)
            element = type(element)(plain_text)
    else:
        # If not a leaf node or leaf type, call recursively on child nodes, replacing them
        element.contents = plain_elements(element.contents, content_digests, node_indexes)
    return element


def add_node_indexes(element, node_index="0"):
    # Can't add attributes to string types
    if is_text(element):
        return element
    # Add index to current element
    element["data-node-index"] = node_index
    # Add index to child elements
    for local_idx, child in enumerate([c for c in element.contents if not is_text(c)], start=1):
        # Can't add attributes to leaf string types
        child_index = "{stem}.{local}".format(stem=node_index, local=local_idx)
        add_node_indexes(child, node_index=child_index)
    return element
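
# Illustrative example (assumed markup, not part of the original module):
#   calling add_node_indexes on the <div> of <div><p>a</p><p>b</p></div> sets
#   data-node-index="0" on the <div> and "0.1" / "0.2" on its two <p> children.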


def normalize_text(text):
    """Normalize unicode and whitespace."""
    # Normalize unicode first to try and standardize whitespace characters as much as possible before normalizing them
    text = strip_control_characters(text)
    text = normalize_unicode(text)
    text = normalize_whitespace(text)
    return text


def strip_control_characters(text):
    """Strip out unicode control characters which might break the parsing."""
    # Unicode control characters
    # [Cc]: Other, Control [includes new lines]
    # [Cf]: Other, Format
    # [Cn]: Other, Not Assigned
    # [Co]: Other, Private Use
    # [Cs]: Other, Surrogate
    control_chars = {"Cc", "Cf", "Cn", "Co", "Cs"}
    retained_chars = ["\t", "\n", "\r", "\f"]

    # Remove non-printing control characters
    return "".join(
        [
            "" if (unicodedata.category(char) in control_chars) and (char not in retained_chars) else char
            for char in text
        ]
    )


def normalize_unicode(text):
    """Normalize unicode such that things that are visually equivalent map to the same unicode string where possible."""
    normal_form: Literal["NFC", "NFD", "NFKC", "NFKD"] = "NFKC"
    text = unicodedata.normalize(normal_form, text)
    return text


def normalize_whitespace(text):
    """Replace runs of whitespace characters with a single space as this is what happens when HTML text is displayed."""
    text = regex.sub(r"\s+", " ", text)
    # Remove leading and trailing whitespace
    text = text.strip()
    return text
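
# Illustrative example (assumed input, not part of the original module):
#   normalize_text("A\u00a0  B\nC") -> "A B C"
#   NFKC maps the no-break space to a regular space, then runs of whitespace
#   collapse to single spaces and the result is stripped.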


def is_leaf(element):
    return element.name in {"p", "li"}


def is_text(element):
    return isinstance(element, NavigableString)


def is_non_printing(element):
    return any(isinstance(element, _e) for _e in [Comment, CData])


def add_content_digest(element):
    if not is_text(element):
        element["data-content-digest"] = content_digest(element)
    return element


def content_digest(element):
    digest: Any
    if is_text(element):
        # Hash the trimmed string content
        trimmed_string = element.string.strip()
        if trimmed_string == "":
            digest = ""
        else:
            digest = hashlib.sha256(trimmed_string.encode("utf-8")).hexdigest()
    else:
        contents = element.contents
        num_contents = len(contents)
        if num_contents == 0:
            # No hash when no child elements exist
            digest = ""
        elif num_contents == 1:
            # If single child, use digest of child
            digest = content_digest(contents[0])
        else:
            # Build content digest from the "non-empty" digests of child nodes
            digest = hashlib.sha256()
            child_digests = list(filter(lambda x: x != "", [content_digest(content) for content in contents]))
            for child in child_digests:
                digest.update(child.encode("utf-8"))
            digest = digest.hexdigest()
    return digest
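
# Illustrative example (assumed markup, not part of the original module):
#   for <p>one<b>two</b></p>, the digest of the <p> is a SHA-256 built from the
#   digests of its non-empty text descendants ("one" and "two"); an element with
#   a single child simply reuses that child's digest.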


def get_image_upload_file_ids(content):
    """Extract upload file IDs from markdown image links that point at file/image preview URLs."""
    # Match markdown image links pointing at http(s) file-preview or image-preview URLs
    pattern = r"!\[image\]\((https?://.*?(file-preview|image-preview))\)"
    matches = re.findall(pattern, content)
    image_upload_file_ids = []
    for match in matches:
        if match[1] == "file-preview":
            content_pattern = r"files/([^/]+)/file-preview"
        else:
            content_pattern = r"files/([^/]+)/image-preview"
        content_match = re.search(content_pattern, match[0])
        if content_match:
            image_upload_file_id = content_match.group(1)
            image_upload_file_ids.append(image_upload_file_id)
    return image_upload_file_ids
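

# ---------------------------------------------------------------------------
# Illustrative usage sketch, not part of the original module. It assumes this
# file runs inside the Dify API service (so the `core.*` imports resolve and
# outbound requests go through ssrf_proxy); the URL below is a placeholder.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    example_url = "https://example.com/article"  # hypothetical URL

    # Fetch the page and extract the readable article text.
    article_text = get_url(example_url)

    # Page through the extracted text 4000 characters at a time.
    cursor, page_size = 0, 4000
    while cursor < len(article_text):
        print(page_result(article_text, cursor, page_size))
        cursor += page_size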