Source code for htmldate.utils

# pylint:disable-msg=E0611,I1101
"""
Module bundling functions related to HTML processing.
"""

import logging
import re

from datetime import datetime
from typing import Any, List, Optional, Set, Union

import urllib3


# CChardet is faster and can be more accurate
try:
    from cchardet import detect as cchardet_detect  # type: ignore
except ImportError:
    cchardet_detect = None
from charset_normalizer import from_bytes

from lxml.html import HtmlElement, HTMLParser, fromstring

from .settings import MAX_FILE_SIZE, MIN_FILE_SIZE


LOGGER = logging.getLogger(__name__)

UNICODE_ALIASES: Set[str] = {"utf-8", "utf_8"}

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
RETRY_STRATEGY = urllib3.util.Retry(
    total=3,
    connect=0,
    status_forcelist=[429, 500, 502, 503, 504],
)
HTTP_POOL = urllib3.PoolManager(retries=RETRY_STRATEGY)

HTML_PARSER = HTMLParser(
    collect_ids=False, default_doctype=False, encoding="utf-8", remove_pis=True
)

DOCTYPE_TAG = re.compile("^< ?! ?DOCTYPE.+?/ ?>", re.I)
FAULTY_HTML = re.compile(r"(<html.*?)\s*/>", re.I)
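
# Illustrative sketch, not part of the original module: the sample strings below
# are hypothetical and only show the kind of malformed markup these two patterns
# target (self-closing doctype declarations and self-closing <html .../> tags).
#   >>> bool(DOCTYPE_TAG.match("<!DOCTYPE html/>"))
#   True
#   >>> FAULTY_HTML.sub(r"\1>", '<html lang="en"/>', count=1)
#   '<html lang="en">'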


class Extractor:
    "Defines a class to store all extraction options."
    __slots__ = ["extensive", "format", "max", "min", "original"]

    # consider dataclasses for Python 3.7+
    def __init__(
        self,
        extensive_search: bool,
        max_date: datetime,
        min_date: datetime,
        original_date: bool,
        outputformat: str,
    ) -> None:
        self.extensive: bool = extensive_search
        self.format: str = outputformat
        self.max: datetime = max_date
        self.min: datetime = min_date
        self.original: bool = original_date
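
# Illustrative sketch, not part of the original module: how the option container
# above might be instantiated; the date bounds and output format chosen here are
# arbitrary examples.
#   >>> options = Extractor(
#   ...     extensive_search=True,
#   ...     max_date=datetime(2030, 1, 1),
#   ...     min_date=datetime(1995, 1, 1),
#   ...     original_date=False,
#   ...     outputformat="%Y-%m-%d",
#   ... )
#   >>> options.format
#   '%Y-%m-%d'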


def isutf8(data: bytes) -> bool:
    """Simple heuristic to determine if a bytestring uses standard unicode encoding"""
    try:
        data.decode("UTF-8")
    except UnicodeDecodeError:
        return False
    return True


def detect_encoding(bytesobject: bytes) -> List[str]:
    """Read all input or first chunk and return a list of encodings"""
    # alternatives: https://github.com/scrapy/w3lib/blob/master/w3lib/encoding.py
    # unicode-test
    if isutf8(bytesobject):
        return ["utf-8"]
    guesses = []
    # additional module
    if cchardet_detect is not None:
        cchardet_guess = cchardet_detect(bytesobject)["encoding"]
        if cchardet_guess is not None:
            guesses.append(cchardet_guess.lower())
    # try charset_normalizer on first part, fallback on full document
    detection_results = from_bytes(bytesobject[:15000]) or from_bytes(bytesobject)
    # return alternatives
    if len(detection_results) > 0:
        guesses.extend([r.encoding for r in detection_results])
    # it cannot be utf-8 (tested above)
    return [g for g in guesses if g not in UNICODE_ALIASES]
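
# Illustrative sketch, not part of the original module: a Latin-1 byte string is
# not valid UTF-8, so the cchardet/charset_normalizer cascade is consulted and a
# list of lower-cased candidate encodings comes back (exact guesses depend on the
# detectors installed); valid UTF-8 short-circuits to a single entry.
#   >>> detect_encoding("Köln".encode("latin-1"))
#   ['windows-1252']  # example output only
#   >>> detect_encoding("Köln".encode("utf-8"))
#   ['utf-8']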


def decode_file(filecontent: Union[bytes, str]) -> str:
    """Guess bytestring encoding and try to decode to Unicode string.
    Resort to destructive conversion otherwise."""
    # init
    if isinstance(filecontent, str):
        return filecontent
    htmltext = None
    # encoding
    for guessed_encoding in detect_encoding(filecontent):
        try:
            htmltext = filecontent.decode(guessed_encoding)
        except (LookupError, UnicodeDecodeError):  # VISCII: lookup
            LOGGER.warning("wrong encoding detected: %s", guessed_encoding)
            htmltext = None
        else:
            break
    # return original content if nothing else succeeded
    return htmltext or str(filecontent, encoding="utf-8", errors="replace")
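
# Illustrative sketch, not part of the original module: strings pass through
# unchanged, while byte strings are decoded with the first workable guess.
#   >>> decode_file("<html>déjà vu</html>")
#   '<html>déjà vu</html>'
#   >>> decode_file("<html>déjà vu</html>".encode("utf-8"))
#   '<html>déjà vu</html>'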


def decode_response(response: Any) -> str:
    """Read the urllib3 object corresponding to the server response, then
    try to guess its encoding and decode it to return a unicode string"""
    # urllib3 response object / bytes switch
    if isinstance(response, urllib3.response.HTTPResponse) or hasattr(response, "data"):
        resp_content = response.data
    else:
        resp_content = response
    return decode_file(resp_content)


def fetch_url(url: str) -> Optional[str]:
    """Fetches page using urllib3 and decodes the response.

    Args:
        url: URL of the page to fetch.

    Returns:
        HTML code as a string, or None in case the response is invalid
        or there was a problem with the network.

    """
    # send
    try:
        # read by streaming chunks (stream=True, iter_content=xx)
        # so we can stop downloading as soon as MAX_FILE_SIZE is reached
        response = HTTP_POOL.request("GET", url, timeout=30)  # type: ignore
    except Exception as err:
        LOGGER.error("download error: %s %s", url, err)  # sys.exc_info()[0]
    else:
        # safety checks
        if response.status != 200:
            LOGGER.error("not a 200 response: %s for URL %s", response.status, url)
        elif (
            response.data is None
            or len(response.data) < MIN_FILE_SIZE
            or len(response.data) > MAX_FILE_SIZE
        ):
            LOGGER.error("incorrect input data for URL %s", url)
        else:
            return decode_response(response.data)
    return None
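
# Illustrative sketch, not part of the original module: the URL below is a
# placeholder; on success the decoded HTML is returned, otherwise (network error,
# non-200 status, too small or too large body) the result is None.
#   >>> html = fetch_url("https://example.org/")  # hypothetical target
#   >>> isinstance(html, str) or html is None
#   True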


def is_dubious_html(beginning: str) -> bool:
    "Assess if the object is proper HTML (with a corresponding tag or declaration)."
    return "html" not in beginning


def repair_faulty_html(htmlstring: str, beginning: str) -> str:
    "Repair faulty HTML strings to make them palatable for libxml2."
    # libxml2/LXML issue: https://bugs.launchpad.net/lxml/+bug/1955915
    if "doctype" in beginning:
        firstline, _, rest = htmlstring.partition("\n")
        htmlstring = DOCTYPE_TAG.sub("", firstline, count=1) + "\n" + rest
    # other issue with malformed documents: check first three lines
    for i, line in enumerate(iter(htmlstring.splitlines())):
        if "<html" in line and line.endswith("/>"):
            htmlstring = FAULTY_HTML.sub(r"\1>", htmlstring, count=1)
            break
        if i > 2:
            break
    return htmlstring


def fromstring_bytes(htmlobject: str) -> Optional[HtmlElement]:
    "Try to pass bytes to LXML parser."
    tree = None
    try:
        tree = fromstring(htmlobject.encode("utf8"), parser=HTML_PARSER)
    except Exception as err:
        LOGGER.error("lxml parser bytestring %s", err)
    return tree
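
# Illustrative sketch, not part of the original module: a self-closing <html/>
# tag (which trips libxml2) is rewritten into a regular opening tag; the sample
# markup is hypothetical.
#   >>> broken = '<html lang="en"/>\n<body><p>Test</p></body>'
#   >>> repair_faulty_html(broken, broken[:50].lower())
#   '<html lang="en">\n<body><p>Test</p></body>'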


def load_html(htmlobject: Union[bytes, str, HtmlElement]) -> Optional[HtmlElement]:
    """Load object given as input and validate its type
    (accepted: lxml.html tree, bytestring and string)
    """
    # use tree directly
    if isinstance(htmlobject, HtmlElement):
        return htmlobject
    # do not accept any other type after this point
    if not isinstance(htmlobject, (bytes, str)):
        raise TypeError("incompatible input type: %s", type(htmlobject))
    # the string is a URL, download it
    if isinstance(htmlobject, str) and htmlobject.startswith("http"):
        htmltext = None
        if re.match(r"https?://[^ ]+$", htmlobject):
            LOGGER.info("URL detected, downloading: %s", htmlobject)
            htmltext = fetch_url(htmlobject)
            if htmltext is not None:
                htmlobject = htmltext
        # log the error and quit
        if htmltext is None:
            raise ValueError("URL couldn't be processed: %s", htmlobject)
    # start processing
    tree = None
    # try to guess encoding and decode file: if None then keep original
    htmlobject = decode_file(htmlobject)
    # sanity checks
    beginning = htmlobject[:50].lower()
    check_flag = is_dubious_html(beginning)
    # repair first
    htmlobject = repair_faulty_html(htmlobject, beginning)
    # first pass: use Unicode string
    fallback_parse = False
    try:
        tree = fromstring(htmlobject, parser=HTML_PARSER)
    except ValueError:
        # "Unicode strings with encoding declaration are not supported."
        fallback_parse = True
        tree = fromstring_bytes(htmlobject)
    except Exception as err:  # pragma: no cover
        LOGGER.error("lxml parsing failed: %s", err)
    # second pass: try passing bytes to LXML
    if (tree is None or len(tree) < 1) and not fallback_parse:
        tree = fromstring_bytes(htmlobject)
    # rejection test: is it (well-formed) HTML at all?
    # log parsing errors
    if tree is not None and check_flag is True and len(tree) < 2:
        LOGGER.error(
            "parsed tree length: %s, wrong data type or not valid HTML", len(tree)
        )
        tree = None
    return tree
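
# Illustrative sketch, not part of the original module: parsing a short,
# well-formed document yields an lxml tree, while input without any HTML markup
# is rejected; the sample strings are hypothetical.
#   >>> tree = load_html("<html><body><p>Hello</p></body></html>")
#   >>> tree.tag
#   'html'
#   >>> load_html(b"plain bytes, no markup") is None
#   True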


def clean_html(tree: HtmlElement, elemlist: List[str]) -> HtmlElement:
    "Delete selected elements."
    for element in tree.iter(elemlist):  # type: ignore[call-overload]
        try:
            element.drop_tree()
        except AttributeError:  # pragma: no cover
            element.getparent().remove(element)
    return tree


def trim_text(string: str) -> str:
    "Remove superfluous space and normalize remaining space."
    return " ".join(string.split()).strip()
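
# Illustrative sketch, not part of the original module: whitespace runs collapse
# to single spaces, and unwanted subtrees can be dropped by tag name; the sample
# document is hypothetical.
#   >>> trim_text("  2024-01-15 \n\t first published ")
#   '2024-01-15 first published'
#   >>> tree = load_html("<html><body><p>kept</p><script>skip()</script></body></html>")
#   >>> len(clean_html(tree, ["script"]).findall(".//script"))
#   0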