import logging
import re
from typing import List, Optional, Tuple, cast

from langchain.document_loaders.base import BaseLoader
from langchain.document_loaders.helpers import detect_file_encodings
from langchain.schema import Document

logger = logging.getLogger(__name__)

# A line is an ATX header when it starts with one or more '#' followed by
# whitespace.
_HEADER_LINE = re.compile(r"^#+\s")
# Optional closing run of '#' at the end of an ATX header ("## Title ##").
_CLOSING_HASHES = re.compile(r"(?:^|\s+)#+$")
# Inline HTML tags stripped from section bodies.
_HTML_TAG = re.compile(r"<.*?>")
# Wiki-style embeds (![[file.png]]) and standard Markdown images
# (![alt](url)).  Both alternatives are non-greedy / newline-bounded so that
# text between two images on the same line is preserved.
_IMAGE = re.compile(r"!\[\[.*?\]\]|!\[[^\]\n]*\]\([^)\n]*\)")
# [text](url) links.  The look-behind keeps the pattern from matching the
# "[alt](url)" tail of an image, which would otherwise leave a stray "!alt".
_HYPERLINK = re.compile(r"(?<!!)\[(.*?)\]\((.*?)\)")


class MarkdownLoader(BaseLoader):
    """Load a Markdown file as one ``Document`` per header section.

    Args:
        file_path: Path to the file to load.
        remove_hyperlinks: Whether to replace ``[text](url)`` links with
            their visible text.
        remove_images: Whether to remove images from the text.
        encoding: File encoding to use. If `None`, the file will be loaded
            with the default system encoding.
        autodetect_encoding: Whether to try to autodetect the file encoding
            if the specified encoding fails.
    """

    def __init__(
        self,
        file_path: str,
        remove_hyperlinks: bool = True,
        remove_images: bool = True,
        encoding: Optional[str] = None,
        autodetect_encoding: bool = True,
    ):
        """Initialize with file path."""
        self._file_path = file_path
        self._remove_hyperlinks = remove_hyperlinks
        self._remove_images = remove_images
        self._encoding = encoding
        self._autodetect_encoding = autodetect_encoding

    def load(self) -> List[Document]:
        """Read the file and return one ``Document`` per section.

        A section that has a header is rendered as the header text, a newline
        and the stripped body, prefixed with two newlines; a section without
        a header contains only the stripped body.

        Raises:
            RuntimeError: If the file cannot be read or decoded.
        """
        documents = []
        for header, value in self.parse_tups(self._file_path):
            value = value.strip()
            if header is None:
                documents.append(Document(page_content=value))
            else:
                documents.append(Document(page_content=f"\n\n{header}\n{value}"))
        return documents

    def markdown_to_tups(self, markdown_text: str) -> List[Tuple[Optional[str], str]]:
        """Split Markdown text into ``(header, body)`` tuples.

        Each ATX header line starts a new section whose body is the text up
        to the next header. Text that appears before the first header is kept
        as a section whose header is ``None``. When the text contains no
        header at all, a single ``(None, text)`` tuple is returned with all
        newlines removed from the text.

        Args:
            markdown_text: The Markdown source to split.

        Returns:
            Sections in document order. Headers have their ``#`` markers
            removed; bodies of documents that contain headers have HTML tags
            removed.
        """
        sections: List[Tuple[Optional[str], str]] = []
        current_header: Optional[str] = None
        body_lines: List[str] = []

        for line in markdown_text.split("\n"):
            if _HEADER_LINE.match(line):
                body = "".join(body_lines)
                # Flush the previous section.  A header-less preamble is kept
                # only when it has real content, so a file that starts
                # directly with a header does not gain an empty section.
                if current_header is not None or body.strip():
                    sections.append((current_header, body))
                current_header = line
                body_lines = []
            else:
                body_lines.append(line + "\n")
        sections.append((current_header, "".join(body_lines)))

        if current_header is None:
            # No header anywhere in the document: existing behaviour is to
            # return the text with its newlines removed.
            return [(key, value.replace("\n", "")) for key, value in sections]

        return [
            (
                None if key is None else self._clean_header(key),
                _HTML_TAG.sub("", value),
            )
            for key, value in sections
        ]

    @staticmethod
    def _clean_header(header_line: str) -> str:
        """Return the title of an ATX header line without its ``#`` markers.

        Only the leading marker and an optional closing run of ``#`` are
        removed, so a ``#`` that is part of the title (``C#``, ``issue #12``)
        is preserved.
        """
        title = header_line.strip().lstrip("#").strip()
        return _CLOSING_HASHES.sub("", title).strip()

    def remove_images(self, content: str) -> str:
        """Remove image references from Markdown text.

        Both wiki-style embeds (``![[file.png]]``) and standard Markdown
        images (``![alt](url)``) are removed. Surrounding text, including
        text between two images on the same line, is left untouched.
        """
        return _IMAGE.sub("", content)

    def remove_hyperlinks(self, content: str) -> str:
        """Replace ``[text](url)`` hyperlinks with their visible text.

        Image references (``![alt](url)``) are not hyperlinks and are left
        as they are.
        """
        return _HYPERLINK.sub(r"\1", content)

    def _read_text(self, filepath: str) -> str:
        """Read ``filepath`` as text, falling back to detected encodings.

        Raises:
            RuntimeError: If the file cannot be opened, or cannot be decoded
                with the configured encoding or any detected one.
        """
        try:
            with open(filepath, "r", encoding=self._encoding) as f:
                return f.read()
        except UnicodeDecodeError as e:
            if not self._autodetect_encoding:
                raise RuntimeError(f"Error loading {filepath}") from e
            for candidate in detect_file_encodings(filepath):
                logger.debug("Trying encoding: %s", candidate.encoding)
                try:
                    with open(filepath, encoding=candidate.encoding) as f:
                        return f.read()
                except UnicodeDecodeError:
                    continue
            # Every candidate failed: report it instead of silently
            # continuing with empty content.
            raise RuntimeError(f"Error loading {filepath}") from e
        except Exception as e:
            raise RuntimeError(f"Error loading {filepath}") from e

    def parse_tups(self, filepath: str) -> List[Tuple[Optional[str], str]]:
        """Parse the file at ``filepath`` into ``(header, body)`` tuples.

        The file is read, images and hyperlinks are removed according to the
        loader's settings, and the remaining text is split by header.

        Raises:
            RuntimeError: If the file cannot be read or decoded.
        """
        content = self._read_text(filepath)

        # Images go first so that a linked image such as
        # "[![alt](img)](url)" is removed entirely.
        if self._remove_images:
            content = self.remove_images(content)

        if self._remove_hyperlinks:
            content = self.remove_hyperlinks(content)

        return self.markdown_to_tups(content)