From 69cca2a10364a6128415815d3fc14df7314db596 Mon Sep 17 00:00:00 2001 From: Andrew Murray Date: Sat, 6 Jul 2024 15:08:35 +1000 Subject: [PATCH] Added type hints --- src/PIL/PdfParser.py | 274 +++++++++++++++++++++++++++---------------- 1 file changed, 172 insertions(+), 102 deletions(-) diff --git a/src/PIL/PdfParser.py b/src/PIL/PdfParser.py index 622dc7de9..7cb2d241b 100644 --- a/src/PIL/PdfParser.py +++ b/src/PIL/PdfParser.py @@ -8,7 +8,7 @@ import os import re import time import zlib -from typing import TYPE_CHECKING, Any, NamedTuple, Union +from typing import IO, TYPE_CHECKING, Any, NamedTuple, Union # see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set @@ -62,7 +62,7 @@ PDFDocEncoding = { } -def decode_text(b): +def decode_text(b: bytes) -> str: if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE: return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be") else: @@ -99,7 +99,7 @@ class IndirectReference(IndirectReferenceTuple): assert isinstance(other, IndirectReference) return other.object_id == self.object_id and other.generation == self.generation - def __ne__(self, other): + def __ne__(self, other: object) -> bool: return not (self == other) def __hash__(self) -> int: @@ -112,13 +112,17 @@ class IndirectObjectDef(IndirectReference): class XrefTable: - def __init__(self): - self.existing_entries = {} # object ID => (offset, generation) - self.new_entries = {} # object ID => (offset, generation) + def __init__(self) -> None: + self.existing_entries: dict[int, tuple[int, int]] = ( + {} + ) # object ID => (offset, generation) + self.new_entries: dict[int, tuple[int, int]] = ( + {} + ) # object ID => (offset, generation) self.deleted_entries = {0: 65536} # object ID => generation self.reading_finished = False - def __setitem__(self, key, value): + def __setitem__(self, key: int, value: tuple[int, int]) -> None: if self.reading_finished: self.new_entries[key] = value else: @@ -126,13 +130,13 @@ class XrefTable: if key in self.deleted_entries: del self.deleted_entries[key] - def __getitem__(self, key): + def __getitem__(self, key: int) -> tuple[int, int]: try: return self.new_entries[key] except KeyError: return self.existing_entries[key] - def __delitem__(self, key): + def __delitem__(self, key: int) -> None: if key in self.new_entries: generation = self.new_entries[key][1] + 1 del self.new_entries[key] @@ -146,7 +150,7 @@ class XrefTable: msg = f"object ID {key} cannot be deleted because it doesn't exist" raise IndexError(msg) - def __contains__(self, key): + def __contains__(self, key: int) -> bool: return key in self.existing_entries or key in self.new_entries def __len__(self) -> int: @@ -156,19 +160,19 @@ class XrefTable: | set(self.deleted_entries.keys()) ) - def keys(self): + def keys(self) -> set[int]: return ( set(self.existing_entries.keys()) - set(self.deleted_entries.keys()) ) | set(self.new_entries.keys()) - def write(self, f): + def write(self, f: IO[bytes]) -> int: keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys())) deleted_keys = sorted(set(self.deleted_entries.keys())) startxref = f.tell() f.write(b"xref\n") while keys: # find a contiguous sequence of object IDs - prev = None + prev: int | None = None for index, key in enumerate(keys): if prev is None or prev + 1 == key: prev = key @@ -178,7 +182,7 @@ class XrefTable: break else: contiguous_keys = keys - keys = None + keys = [] f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys))) for object_id in contiguous_keys: if object_id in self.new_entries: @@ -202,7 +206,9 @@ class XrefTable: class PdfName: - def __init__(self, name): + name: bytes + + def __init__(self, name: PdfName | bytes | str) -> None: if isinstance(name, PdfName): self.name = name.name elif isinstance(name, bytes): @@ -213,7 +219,7 @@ class PdfName: def name_as_str(self) -> str: return self.name.decode("us-ascii") - def __eq__(self, other): + def __eq__(self, other: object) -> bool: return ( isinstance(other, PdfName) and other.name == self.name ) or other == self.name @@ -225,7 +231,7 @@ class PdfName: return f"{self.__class__.__name__}({repr(self.name)})" @classmethod - def from_pdf_stream(cls, data): + def from_pdf_stream(cls, data: bytes) -> PdfName: return cls(PdfParser.interpret_name(data)) allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"} @@ -252,13 +258,13 @@ else: class PdfDict(_DictBase): - def __setattr__(self, key, value): + def __setattr__(self, key: str, value: Any) -> None: if key == "data": collections.UserDict.__setattr__(self, key, value) else: self[key.encode("us-ascii")] = value - def __getattr__(self, key): + def __getattr__(self, key: str) -> str | time.struct_time: try: value = self[key.encode("us-ascii")] except KeyError as e: @@ -300,7 +306,7 @@ class PdfDict(_DictBase): class PdfBinary: - def __init__(self, data): + def __init__(self, data: list[int] | bytes) -> None: self.data = data def __bytes__(self) -> bytes: @@ -308,27 +314,27 @@ class PdfBinary: class PdfStream: - def __init__(self, dictionary, buf): + def __init__(self, dictionary: PdfDict, buf: bytes) -> None: self.dictionary = dictionary self.buf = buf - def decode(self): + def decode(self) -> bytes: try: - filter = self.dictionary.Filter - except AttributeError: + filter = self.dictionary[b"Filter"] + except KeyError: return self.buf if filter == b"FlateDecode": try: - expected_length = self.dictionary.DL - except AttributeError: - expected_length = self.dictionary.Length + expected_length = self.dictionary[b"DL"] + except KeyError: + expected_length = self.dictionary[b"Length"] return zlib.decompress(self.buf, bufsize=int(expected_length)) else: - msg = f"stream filter {repr(self.dictionary.Filter)} unknown/unsupported" + msg = f"stream filter {repr(filter)} unknown/unsupported" raise NotImplementedError(msg) -def pdf_repr(x): +def pdf_repr(x: Any) -> bytes: if x is True: return b"true" elif x is False: @@ -363,12 +369,19 @@ class PdfParser: Supports PDF up to 1.4 """ - def __init__(self, filename=None, f=None, buf=None, start_offset=0, mode="rb"): + def __init__( + self, + filename: str | None = None, + f: IO[bytes] | None = None, + buf: bytes | bytearray | None = None, + start_offset: int = 0, + mode: str = "rb", + ) -> None: if buf and f: msg = "specify buf or f or filename, but not both buf and f" raise RuntimeError(msg) self.filename = filename - self.buf = buf + self.buf: bytes | bytearray | mmap.mmap | None = buf self.f = f self.start_offset = start_offset self.should_close_buf = False @@ -377,12 +390,16 @@ class PdfParser: self.f = f = open(filename, mode) self.should_close_file = True if f is not None: - self.buf = buf = self.get_buf_from_file(f) + self.buf = self.get_buf_from_file(f) self.should_close_buf = True if not filename and hasattr(f, "name"): self.filename = f.name - self.cached_objects = {} - if buf: + self.cached_objects: dict[IndirectReference, Any] = {} + self.root_ref: IndirectReference | None + self.info_ref: IndirectReference | None + self.pages_ref: IndirectReference | None + self.last_xref_section_offset: int | None + if self.buf: self.read_pdf_info() else: self.file_size_total = self.file_size_this = 0 @@ -390,12 +407,12 @@ class PdfParser: self.root_ref = None self.info = PdfDict() self.info_ref = None - self.page_tree_root = {} - self.pages = [] - self.orig_pages = [] + self.page_tree_root = PdfDict() + self.pages: list[IndirectReference] = [] + self.orig_pages: list[IndirectReference] = [] self.pages_ref = None self.last_xref_section_offset = None - self.trailer_dict = {} + self.trailer_dict: dict[bytes, Any] = {} self.xref_table = XrefTable() self.xref_table.reading_finished = True if f: @@ -412,10 +429,8 @@ class PdfParser: self.seek_end() def close_buf(self) -> None: - try: + if isinstance(self.buf, mmap.mmap): self.buf.close() - except AttributeError: - pass self.buf = None def close(self) -> None: @@ -426,15 +441,19 @@ class PdfParser: self.f = None def seek_end(self) -> None: + assert self.f is not None self.f.seek(0, os.SEEK_END) def write_header(self) -> None: + assert self.f is not None self.f.write(b"%PDF-1.4\n") - def write_comment(self, s): + def write_comment(self, s: str) -> None: + assert self.f is not None self.f.write(f"% {s}\n".encode()) def write_catalog(self) -> IndirectReference: + assert self.f is not None self.del_root() self.root_ref = self.next_object_id(self.f.tell()) self.pages_ref = self.next_object_id(0) @@ -477,7 +496,10 @@ class PdfParser: pages_tree_node_ref = pages_tree_node.get(b"Parent", None) self.orig_pages = [] - def write_xref_and_trailer(self, new_root_ref=None): + def write_xref_and_trailer( + self, new_root_ref: IndirectReference | None = None + ) -> None: + assert self.f is not None if new_root_ref: self.del_root() self.root_ref = new_root_ref @@ -485,7 +507,10 @@ class PdfParser: self.info_ref = self.write_obj(None, self.info) start_xref = self.xref_table.write(self.f) num_entries = len(self.xref_table) - trailer_dict = {b"Root": self.root_ref, b"Size": num_entries} + trailer_dict: dict[str | bytes, Any] = { + b"Root": self.root_ref, + b"Size": num_entries, + } if self.last_xref_section_offset is not None: trailer_dict[b"Prev"] = self.last_xref_section_offset if self.info: @@ -497,16 +522,20 @@ class PdfParser: + b"\nstartxref\n%d\n%%%%EOF" % start_xref ) - def write_page(self, ref, *objs, **dict_obj): - if isinstance(ref, int): - ref = self.pages[ref] + def write_page( + self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any + ) -> IndirectReference: + obj_ref = self.pages[ref] if isinstance(ref, int) else ref if "Type" not in dict_obj: dict_obj["Type"] = PdfName(b"Page") if "Parent" not in dict_obj: dict_obj["Parent"] = self.pages_ref - return self.write_obj(ref, *objs, **dict_obj) + return self.write_obj(obj_ref, *objs, **dict_obj) - def write_obj(self, ref, *objs, **dict_obj): + def write_obj( + self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any + ) -> IndirectReference: + assert self.f is not None f = self.f if ref is None: ref = self.next_object_id(f.tell()) @@ -534,7 +563,7 @@ class PdfParser: del self.xref_table[self.root[b"Pages"].object_id] @staticmethod - def get_buf_from_file(f): + def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap: if hasattr(f, "getbuffer"): return f.getbuffer() elif hasattr(f, "getvalue"): @@ -546,10 +575,15 @@ class PdfParser: return b"" def read_pdf_info(self) -> None: + assert self.buf is not None self.file_size_total = len(self.buf) self.file_size_this = self.file_size_total - self.start_offset self.read_trailer() + check_format_condition( + self.trailer_dict.get(b"Root") is not None, "Root is missing" + ) self.root_ref = self.trailer_dict[b"Root"] + assert self.root_ref is not None self.info_ref = self.trailer_dict.get(b"Info", None) self.root = PdfDict(self.read_indirect(self.root_ref)) if self.info_ref is None: @@ -560,12 +594,15 @@ class PdfParser: check_format_condition( self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog" ) - check_format_condition(b"Pages" in self.root, "/Pages missing in Root") + check_format_condition( + self.root.get(b"Pages") is not None, "/Pages missing in Root" + ) check_format_condition( isinstance(self.root[b"Pages"], IndirectReference), "/Pages in Root is not an indirect reference", ) self.pages_ref = self.root[b"Pages"] + assert self.pages_ref is not None self.page_tree_root = self.read_indirect(self.pages_ref) self.pages = self.linearize_page_tree(self.page_tree_root) # save the original list of page references @@ -573,7 +610,7 @@ class PdfParser: # and we need to rewrite the pages and their list self.orig_pages = self.pages[:] - def next_object_id(self, offset=None): + def next_object_id(self, offset: int | None = None) -> IndirectReference: try: # TODO: support reuse of deleted objects reference = IndirectReference(max(self.xref_table.keys()) + 1, 0) @@ -623,12 +660,13 @@ class PdfParser: re.DOTALL, ) - def read_trailer(self): + def read_trailer(self) -> None: + assert self.buf is not None search_start_offset = len(self.buf) - 16384 if search_start_offset < self.start_offset: search_start_offset = self.start_offset m = self.re_trailer_end.search(self.buf, search_start_offset) - check_format_condition(m, "trailer end not found") + check_format_condition(m is not None, "trailer end not found") # make sure we found the LAST trailer last_match = m while m: @@ -636,6 +674,7 @@ class PdfParser: m = self.re_trailer_end.search(self.buf, m.start() + 16) if not m: m = last_match + assert m is not None trailer_data = m.group(1) self.last_xref_section_offset = int(m.group(2)) self.trailer_dict = self.interpret_trailer(trailer_data) @@ -644,12 +683,14 @@ class PdfParser: if b"Prev" in self.trailer_dict: self.read_prev_trailer(self.trailer_dict[b"Prev"]) - def read_prev_trailer(self, xref_section_offset): + def read_prev_trailer(self, xref_section_offset: int) -> None: + assert self.buf is not None trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset) m = self.re_trailer_prev.search( self.buf[trailer_offset : trailer_offset + 16384] ) - check_format_condition(m, "previous trailer not found") + check_format_condition(m is not None, "previous trailer not found") + assert m is not None trailer_data = m.group(1) check_format_condition( int(m.group(2)) == xref_section_offset, @@ -670,7 +711,7 @@ class PdfParser: re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional) @classmethod - def interpret_trailer(cls, trailer_data): + def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]: trailer = {} offset = 0 while True: @@ -678,14 +719,18 @@ class PdfParser: if not m: m = cls.re_dict_end.match(trailer_data, offset) check_format_condition( - m and m.end() == len(trailer_data), + m is not None and m.end() == len(trailer_data), "name not found in trailer, remaining data: " + repr(trailer_data[offset:]), ) break key = cls.interpret_name(m.group(1)) - value, offset = cls.get_value(trailer_data, m.end()) + assert isinstance(key, bytes) + value, value_offset = cls.get_value(trailer_data, m.end()) trailer[key] = value + if value_offset is None: + break + offset = value_offset check_format_condition( b"Size" in trailer and isinstance(trailer[b"Size"], int), "/Size not in trailer or not an integer", @@ -699,7 +744,7 @@ class PdfParser: re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?") @classmethod - def interpret_name(cls, raw, as_text=False): + def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes: name = b"" for m in cls.re_hashes_in_name.finditer(raw): if m.group(3): @@ -761,7 +806,13 @@ class PdfParser: ) @classmethod - def get_value(cls, data, offset, expect_indirect=None, max_nesting=-1): + def get_value( + cls, + data: bytes | bytearray | mmap.mmap, + offset: int, + expect_indirect: IndirectReference | None = None, + max_nesting: int = -1, + ) -> tuple[Any, int | None]: if max_nesting == 0: return None, None m = cls.re_comment.match(data, offset) @@ -783,11 +834,16 @@ class PdfParser: == IndirectReference(int(m.group(1)), int(m.group(2))), "indirect object definition different than expected", ) - object, offset = cls.get_value(data, m.end(), max_nesting=max_nesting - 1) - if offset is None: + object, object_offset = cls.get_value( + data, m.end(), max_nesting=max_nesting - 1 + ) + if object_offset is None: return object, None - m = cls.re_indirect_def_end.match(data, offset) - check_format_condition(m, "indirect object definition end not found") + m = cls.re_indirect_def_end.match(data, object_offset) + check_format_condition( + m is not None, "indirect object definition end not found" + ) + assert m is not None return object, m.end() check_format_condition( not expect_indirect, "indirect object definition not found" @@ -806,46 +862,53 @@ class PdfParser: m = cls.re_dict_start.match(data, offset) if m: offset = m.end() - result = {} + result: dict[Any, Any] = {} m = cls.re_dict_end.match(data, offset) + current_offset: int | None = offset while not m: - key, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1) - if offset is None: + assert current_offset is not None + key, current_offset = cls.get_value( + data, current_offset, max_nesting=max_nesting - 1 + ) + if current_offset is None: return result, None - value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1) + value, current_offset = cls.get_value( + data, current_offset, max_nesting=max_nesting - 1 + ) result[key] = value - if offset is None: + if current_offset is None: return result, None - m = cls.re_dict_end.match(data, offset) - offset = m.end() - m = cls.re_stream_start.match(data, offset) + m = cls.re_dict_end.match(data, current_offset) + current_offset = m.end() + m = cls.re_stream_start.match(data, current_offset) if m: - try: - stream_len_str = result.get(b"Length") - stream_len = int(stream_len_str) - except (TypeError, ValueError) as e: - msg = f"bad or missing Length in stream dict ({stream_len_str})" - raise PdfFormatError(msg) from e + stream_len = result.get(b"Length") + if stream_len is None or not isinstance(stream_len, int): + msg = f"bad or missing Length in stream dict ({stream_len})" + raise PdfFormatError(msg) stream_data = data[m.end() : m.end() + stream_len] m = cls.re_stream_end.match(data, m.end() + stream_len) - check_format_condition(m, "stream end not found") - offset = m.end() - result = PdfStream(PdfDict(result), stream_data) - else: - result = PdfDict(result) - return result, offset + check_format_condition(m is not None, "stream end not found") + assert m is not None + current_offset = m.end() + return PdfStream(PdfDict(result), stream_data), current_offset + return PdfDict(result), current_offset m = cls.re_array_start.match(data, offset) if m: offset = m.end() - result = [] + results = [] m = cls.re_array_end.match(data, offset) + current_offset = offset while not m: - value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1) - result.append(value) - if offset is None: - return result, None - m = cls.re_array_end.match(data, offset) - return result, m.end() + assert current_offset is not None + value, current_offset = cls.get_value( + data, current_offset, max_nesting=max_nesting - 1 + ) + results.append(value) + if current_offset is None: + return results, None + m = cls.re_array_end.match(data, current_offset) + return results, m.end() m = cls.re_null.match(data, offset) if m: return None, m.end() @@ -905,7 +968,9 @@ class PdfParser: } @classmethod - def get_literal_string(cls, data, offset): + def get_literal_string( + cls, data: bytes | bytearray | mmap.mmap, offset: int + ) -> tuple[bytes, int]: nesting_depth = 0 result = bytearray() for m in cls.re_lit_str_token.finditer(data, offset): @@ -941,12 +1006,14 @@ class PdfParser: ) re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)") - def read_xref_table(self, xref_section_offset): + def read_xref_table(self, xref_section_offset: int) -> int: + assert self.buf is not None subsection_found = False m = self.re_xref_section_start.match( self.buf, xref_section_offset + self.start_offset ) - check_format_condition(m, "xref section start not found") + check_format_condition(m is not None, "xref section start not found") + assert m is not None offset = m.end() while True: m = self.re_xref_subsection_start.match(self.buf, offset) @@ -961,7 +1028,8 @@ class PdfParser: num_objects = int(m.group(2)) for i in range(first_object, first_object + num_objects): m = self.re_xref_entry.match(self.buf, offset) - check_format_condition(m, "xref entry not found") + check_format_condition(m is not None, "xref entry not found") + assert m is not None offset = m.end() is_free = m.group(3) == b"f" if not is_free: @@ -971,13 +1039,14 @@ class PdfParser: self.xref_table[i] = new_entry return offset - def read_indirect(self, ref, max_nesting=-1): + def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any: offset, generation = self.xref_table[ref[0]] check_format_condition( generation == ref[1], f"expected to find generation {ref[1]} for object ID {ref[0]} in xref " f"table, instead found generation {generation} at offset {offset}", ) + assert self.buf is not None value = self.get_value( self.buf, offset + self.start_offset, @@ -987,14 +1056,15 @@ class PdfParser: self.cached_objects[ref] = value return value - def linearize_page_tree(self, node=None): - if node is None: - node = self.page_tree_root + def linearize_page_tree( + self, node: PdfDict | None = None + ) -> list[IndirectReference]: + page_node = node if node is not None else self.page_tree_root check_format_condition( - node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages" + page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages" ) pages = [] - for kid in node[b"Kids"]: + for kid in page_node[b"Kids"]: kid_object = self.read_indirect(kid) if kid_object[b"Type"] == b"Page": pages.append(kid)