Merge pull request #8207 from radarhere/type_hints_pdfparser

Added type hints to PdfParser
This commit is contained in:
Hugo van Kemenade 2024-07-06 05:54:16 -06:00 committed by GitHub
commit ae25d23c34
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -8,7 +8,7 @@ import os
import re import re
import time import time
import zlib import zlib
from typing import TYPE_CHECKING, Any, NamedTuple, Union from typing import IO, TYPE_CHECKING, Any, NamedTuple, Union
# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set # see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
@ -62,7 +62,7 @@ PDFDocEncoding = {
} }
def decode_text(b): def decode_text(b: bytes) -> str:
if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE: if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE:
return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be") return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be")
else: else:
@ -99,7 +99,7 @@ class IndirectReference(IndirectReferenceTuple):
assert isinstance(other, IndirectReference) assert isinstance(other, IndirectReference)
return other.object_id == self.object_id and other.generation == self.generation return other.object_id == self.object_id and other.generation == self.generation
def __ne__(self, other): def __ne__(self, other: object) -> bool:
return not (self == other) return not (self == other)
def __hash__(self) -> int: def __hash__(self) -> int:
@ -112,13 +112,17 @@ class IndirectObjectDef(IndirectReference):
class XrefTable: class XrefTable:
def __init__(self): def __init__(self) -> None:
self.existing_entries = {} # object ID => (offset, generation) self.existing_entries: dict[int, tuple[int, int]] = (
self.new_entries = {} # object ID => (offset, generation) {}
) # object ID => (offset, generation)
self.new_entries: dict[int, tuple[int, int]] = (
{}
) # object ID => (offset, generation)
self.deleted_entries = {0: 65536} # object ID => generation self.deleted_entries = {0: 65536} # object ID => generation
self.reading_finished = False self.reading_finished = False
def __setitem__(self, key, value): def __setitem__(self, key: int, value: tuple[int, int]) -> None:
if self.reading_finished: if self.reading_finished:
self.new_entries[key] = value self.new_entries[key] = value
else: else:
@ -126,13 +130,13 @@ class XrefTable:
if key in self.deleted_entries: if key in self.deleted_entries:
del self.deleted_entries[key] del self.deleted_entries[key]
def __getitem__(self, key): def __getitem__(self, key: int) -> tuple[int, int]:
try: try:
return self.new_entries[key] return self.new_entries[key]
except KeyError: except KeyError:
return self.existing_entries[key] return self.existing_entries[key]
def __delitem__(self, key): def __delitem__(self, key: int) -> None:
if key in self.new_entries: if key in self.new_entries:
generation = self.new_entries[key][1] + 1 generation = self.new_entries[key][1] + 1
del self.new_entries[key] del self.new_entries[key]
@ -146,7 +150,7 @@ class XrefTable:
msg = f"object ID {key} cannot be deleted because it doesn't exist" msg = f"object ID {key} cannot be deleted because it doesn't exist"
raise IndexError(msg) raise IndexError(msg)
def __contains__(self, key): def __contains__(self, key: int) -> bool:
return key in self.existing_entries or key in self.new_entries return key in self.existing_entries or key in self.new_entries
def __len__(self) -> int: def __len__(self) -> int:
@ -156,19 +160,19 @@ class XrefTable:
| set(self.deleted_entries.keys()) | set(self.deleted_entries.keys())
) )
def keys(self): def keys(self) -> set[int]:
return ( return (
set(self.existing_entries.keys()) - set(self.deleted_entries.keys()) set(self.existing_entries.keys()) - set(self.deleted_entries.keys())
) | set(self.new_entries.keys()) ) | set(self.new_entries.keys())
def write(self, f): def write(self, f: IO[bytes]) -> int:
keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys())) keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
deleted_keys = sorted(set(self.deleted_entries.keys())) deleted_keys = sorted(set(self.deleted_entries.keys()))
startxref = f.tell() startxref = f.tell()
f.write(b"xref\n") f.write(b"xref\n")
while keys: while keys:
# find a contiguous sequence of object IDs # find a contiguous sequence of object IDs
prev = None prev: int | None = None
for index, key in enumerate(keys): for index, key in enumerate(keys):
if prev is None or prev + 1 == key: if prev is None or prev + 1 == key:
prev = key prev = key
@ -178,7 +182,7 @@ class XrefTable:
break break
else: else:
contiguous_keys = keys contiguous_keys = keys
keys = None keys = []
f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys))) f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
for object_id in contiguous_keys: for object_id in contiguous_keys:
if object_id in self.new_entries: if object_id in self.new_entries:
@ -202,7 +206,9 @@ class XrefTable:
class PdfName: class PdfName:
def __init__(self, name): name: bytes
def __init__(self, name: PdfName | bytes | str) -> None:
if isinstance(name, PdfName): if isinstance(name, PdfName):
self.name = name.name self.name = name.name
elif isinstance(name, bytes): elif isinstance(name, bytes):
@ -213,7 +219,7 @@ class PdfName:
def name_as_str(self) -> str: def name_as_str(self) -> str:
return self.name.decode("us-ascii") return self.name.decode("us-ascii")
def __eq__(self, other): def __eq__(self, other: object) -> bool:
return ( return (
isinstance(other, PdfName) and other.name == self.name isinstance(other, PdfName) and other.name == self.name
) or other == self.name ) or other == self.name
@ -225,7 +231,7 @@ class PdfName:
return f"{self.__class__.__name__}({repr(self.name)})" return f"{self.__class__.__name__}({repr(self.name)})"
@classmethod @classmethod
def from_pdf_stream(cls, data): def from_pdf_stream(cls, data: bytes) -> PdfName:
return cls(PdfParser.interpret_name(data)) return cls(PdfParser.interpret_name(data))
allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"} allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"}
@ -252,13 +258,13 @@ else:
class PdfDict(_DictBase): class PdfDict(_DictBase):
def __setattr__(self, key, value): def __setattr__(self, key: str, value: Any) -> None:
if key == "data": if key == "data":
collections.UserDict.__setattr__(self, key, value) collections.UserDict.__setattr__(self, key, value)
else: else:
self[key.encode("us-ascii")] = value self[key.encode("us-ascii")] = value
def __getattr__(self, key): def __getattr__(self, key: str) -> str | time.struct_time:
try: try:
value = self[key.encode("us-ascii")] value = self[key.encode("us-ascii")]
except KeyError as e: except KeyError as e:
@ -300,7 +306,7 @@ class PdfDict(_DictBase):
class PdfBinary: class PdfBinary:
def __init__(self, data): def __init__(self, data: list[int] | bytes) -> None:
self.data = data self.data = data
def __bytes__(self) -> bytes: def __bytes__(self) -> bytes:
@ -308,27 +314,27 @@ class PdfBinary:
class PdfStream: class PdfStream:
def __init__(self, dictionary, buf): def __init__(self, dictionary: PdfDict, buf: bytes) -> None:
self.dictionary = dictionary self.dictionary = dictionary
self.buf = buf self.buf = buf
def decode(self): def decode(self) -> bytes:
try: try:
filter = self.dictionary.Filter filter = self.dictionary[b"Filter"]
except AttributeError: except KeyError:
return self.buf return self.buf
if filter == b"FlateDecode": if filter == b"FlateDecode":
try: try:
expected_length = self.dictionary.DL expected_length = self.dictionary[b"DL"]
except AttributeError: except KeyError:
expected_length = self.dictionary.Length expected_length = self.dictionary[b"Length"]
return zlib.decompress(self.buf, bufsize=int(expected_length)) return zlib.decompress(self.buf, bufsize=int(expected_length))
else: else:
msg = f"stream filter {repr(self.dictionary.Filter)} unknown/unsupported" msg = f"stream filter {repr(filter)} unknown/unsupported"
raise NotImplementedError(msg) raise NotImplementedError(msg)
def pdf_repr(x): def pdf_repr(x: Any) -> bytes:
if x is True: if x is True:
return b"true" return b"true"
elif x is False: elif x is False:
@ -363,12 +369,19 @@ class PdfParser:
Supports PDF up to 1.4 Supports PDF up to 1.4
""" """
def __init__(self, filename=None, f=None, buf=None, start_offset=0, mode="rb"): def __init__(
self,
filename: str | None = None,
f: IO[bytes] | None = None,
buf: bytes | bytearray | None = None,
start_offset: int = 0,
mode: str = "rb",
) -> None:
if buf and f: if buf and f:
msg = "specify buf or f or filename, but not both buf and f" msg = "specify buf or f or filename, but not both buf and f"
raise RuntimeError(msg) raise RuntimeError(msg)
self.filename = filename self.filename = filename
self.buf = buf self.buf: bytes | bytearray | mmap.mmap | None = buf
self.f = f self.f = f
self.start_offset = start_offset self.start_offset = start_offset
self.should_close_buf = False self.should_close_buf = False
@ -377,12 +390,16 @@ class PdfParser:
self.f = f = open(filename, mode) self.f = f = open(filename, mode)
self.should_close_file = True self.should_close_file = True
if f is not None: if f is not None:
self.buf = buf = self.get_buf_from_file(f) self.buf = self.get_buf_from_file(f)
self.should_close_buf = True self.should_close_buf = True
if not filename and hasattr(f, "name"): if not filename and hasattr(f, "name"):
self.filename = f.name self.filename = f.name
self.cached_objects = {} self.cached_objects: dict[IndirectReference, Any] = {}
if buf: self.root_ref: IndirectReference | None
self.info_ref: IndirectReference | None
self.pages_ref: IndirectReference | None
self.last_xref_section_offset: int | None
if self.buf:
self.read_pdf_info() self.read_pdf_info()
else: else:
self.file_size_total = self.file_size_this = 0 self.file_size_total = self.file_size_this = 0
@ -390,12 +407,12 @@ class PdfParser:
self.root_ref = None self.root_ref = None
self.info = PdfDict() self.info = PdfDict()
self.info_ref = None self.info_ref = None
self.page_tree_root = {} self.page_tree_root = PdfDict()
self.pages = [] self.pages: list[IndirectReference] = []
self.orig_pages = [] self.orig_pages: list[IndirectReference] = []
self.pages_ref = None self.pages_ref = None
self.last_xref_section_offset = None self.last_xref_section_offset = None
self.trailer_dict = {} self.trailer_dict: dict[bytes, Any] = {}
self.xref_table = XrefTable() self.xref_table = XrefTable()
self.xref_table.reading_finished = True self.xref_table.reading_finished = True
if f: if f:
@ -412,10 +429,8 @@ class PdfParser:
self.seek_end() self.seek_end()
def close_buf(self) -> None: def close_buf(self) -> None:
try: if isinstance(self.buf, mmap.mmap):
self.buf.close() self.buf.close()
except AttributeError:
pass
self.buf = None self.buf = None
def close(self) -> None: def close(self) -> None:
@ -426,15 +441,19 @@ class PdfParser:
self.f = None self.f = None
def seek_end(self) -> None: def seek_end(self) -> None:
assert self.f is not None
self.f.seek(0, os.SEEK_END) self.f.seek(0, os.SEEK_END)
def write_header(self) -> None: def write_header(self) -> None:
assert self.f is not None
self.f.write(b"%PDF-1.4\n") self.f.write(b"%PDF-1.4\n")
def write_comment(self, s): def write_comment(self, s: str) -> None:
assert self.f is not None
self.f.write(f"% {s}\n".encode()) self.f.write(f"% {s}\n".encode())
def write_catalog(self) -> IndirectReference: def write_catalog(self) -> IndirectReference:
assert self.f is not None
self.del_root() self.del_root()
self.root_ref = self.next_object_id(self.f.tell()) self.root_ref = self.next_object_id(self.f.tell())
self.pages_ref = self.next_object_id(0) self.pages_ref = self.next_object_id(0)
@ -477,7 +496,10 @@ class PdfParser:
pages_tree_node_ref = pages_tree_node.get(b"Parent", None) pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
self.orig_pages = [] self.orig_pages = []
def write_xref_and_trailer(self, new_root_ref=None): def write_xref_and_trailer(
self, new_root_ref: IndirectReference | None = None
) -> None:
assert self.f is not None
if new_root_ref: if new_root_ref:
self.del_root() self.del_root()
self.root_ref = new_root_ref self.root_ref = new_root_ref
@ -485,7 +507,10 @@ class PdfParser:
self.info_ref = self.write_obj(None, self.info) self.info_ref = self.write_obj(None, self.info)
start_xref = self.xref_table.write(self.f) start_xref = self.xref_table.write(self.f)
num_entries = len(self.xref_table) num_entries = len(self.xref_table)
trailer_dict = {b"Root": self.root_ref, b"Size": num_entries} trailer_dict: dict[str | bytes, Any] = {
b"Root": self.root_ref,
b"Size": num_entries,
}
if self.last_xref_section_offset is not None: if self.last_xref_section_offset is not None:
trailer_dict[b"Prev"] = self.last_xref_section_offset trailer_dict[b"Prev"] = self.last_xref_section_offset
if self.info: if self.info:
@ -497,16 +522,20 @@ class PdfParser:
+ b"\nstartxref\n%d\n%%%%EOF" % start_xref + b"\nstartxref\n%d\n%%%%EOF" % start_xref
) )
def write_page(self, ref, *objs, **dict_obj): def write_page(
if isinstance(ref, int): self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any
ref = self.pages[ref] ) -> IndirectReference:
obj_ref = self.pages[ref] if isinstance(ref, int) else ref
if "Type" not in dict_obj: if "Type" not in dict_obj:
dict_obj["Type"] = PdfName(b"Page") dict_obj["Type"] = PdfName(b"Page")
if "Parent" not in dict_obj: if "Parent" not in dict_obj:
dict_obj["Parent"] = self.pages_ref dict_obj["Parent"] = self.pages_ref
return self.write_obj(ref, *objs, **dict_obj) return self.write_obj(obj_ref, *objs, **dict_obj)
def write_obj(self, ref, *objs, **dict_obj): def write_obj(
self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any
) -> IndirectReference:
assert self.f is not None
f = self.f f = self.f
if ref is None: if ref is None:
ref = self.next_object_id(f.tell()) ref = self.next_object_id(f.tell())
@ -534,7 +563,7 @@ class PdfParser:
del self.xref_table[self.root[b"Pages"].object_id] del self.xref_table[self.root[b"Pages"].object_id]
@staticmethod @staticmethod
def get_buf_from_file(f): def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap:
if hasattr(f, "getbuffer"): if hasattr(f, "getbuffer"):
return f.getbuffer() return f.getbuffer()
elif hasattr(f, "getvalue"): elif hasattr(f, "getvalue"):
@ -546,10 +575,15 @@ class PdfParser:
return b"" return b""
def read_pdf_info(self) -> None: def read_pdf_info(self) -> None:
assert self.buf is not None
self.file_size_total = len(self.buf) self.file_size_total = len(self.buf)
self.file_size_this = self.file_size_total - self.start_offset self.file_size_this = self.file_size_total - self.start_offset
self.read_trailer() self.read_trailer()
check_format_condition(
self.trailer_dict.get(b"Root") is not None, "Root is missing"
)
self.root_ref = self.trailer_dict[b"Root"] self.root_ref = self.trailer_dict[b"Root"]
assert self.root_ref is not None
self.info_ref = self.trailer_dict.get(b"Info", None) self.info_ref = self.trailer_dict.get(b"Info", None)
self.root = PdfDict(self.read_indirect(self.root_ref)) self.root = PdfDict(self.read_indirect(self.root_ref))
if self.info_ref is None: if self.info_ref is None:
@ -560,12 +594,15 @@ class PdfParser:
check_format_condition( check_format_condition(
self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog" self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
) )
check_format_condition(b"Pages" in self.root, "/Pages missing in Root") check_format_condition(
self.root.get(b"Pages") is not None, "/Pages missing in Root"
)
check_format_condition( check_format_condition(
isinstance(self.root[b"Pages"], IndirectReference), isinstance(self.root[b"Pages"], IndirectReference),
"/Pages in Root is not an indirect reference", "/Pages in Root is not an indirect reference",
) )
self.pages_ref = self.root[b"Pages"] self.pages_ref = self.root[b"Pages"]
assert self.pages_ref is not None
self.page_tree_root = self.read_indirect(self.pages_ref) self.page_tree_root = self.read_indirect(self.pages_ref)
self.pages = self.linearize_page_tree(self.page_tree_root) self.pages = self.linearize_page_tree(self.page_tree_root)
# save the original list of page references # save the original list of page references
@ -573,7 +610,7 @@ class PdfParser:
# and we need to rewrite the pages and their list # and we need to rewrite the pages and their list
self.orig_pages = self.pages[:] self.orig_pages = self.pages[:]
def next_object_id(self, offset=None): def next_object_id(self, offset: int | None = None) -> IndirectReference:
try: try:
# TODO: support reuse of deleted objects # TODO: support reuse of deleted objects
reference = IndirectReference(max(self.xref_table.keys()) + 1, 0) reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
@ -623,12 +660,13 @@ class PdfParser:
re.DOTALL, re.DOTALL,
) )
def read_trailer(self): def read_trailer(self) -> None:
assert self.buf is not None
search_start_offset = len(self.buf) - 16384 search_start_offset = len(self.buf) - 16384
if search_start_offset < self.start_offset: if search_start_offset < self.start_offset:
search_start_offset = self.start_offset search_start_offset = self.start_offset
m = self.re_trailer_end.search(self.buf, search_start_offset) m = self.re_trailer_end.search(self.buf, search_start_offset)
check_format_condition(m, "trailer end not found") check_format_condition(m is not None, "trailer end not found")
# make sure we found the LAST trailer # make sure we found the LAST trailer
last_match = m last_match = m
while m: while m:
@ -636,6 +674,7 @@ class PdfParser:
m = self.re_trailer_end.search(self.buf, m.start() + 16) m = self.re_trailer_end.search(self.buf, m.start() + 16)
if not m: if not m:
m = last_match m = last_match
assert m is not None
trailer_data = m.group(1) trailer_data = m.group(1)
self.last_xref_section_offset = int(m.group(2)) self.last_xref_section_offset = int(m.group(2))
self.trailer_dict = self.interpret_trailer(trailer_data) self.trailer_dict = self.interpret_trailer(trailer_data)
@ -644,12 +683,14 @@ class PdfParser:
if b"Prev" in self.trailer_dict: if b"Prev" in self.trailer_dict:
self.read_prev_trailer(self.trailer_dict[b"Prev"]) self.read_prev_trailer(self.trailer_dict[b"Prev"])
def read_prev_trailer(self, xref_section_offset): def read_prev_trailer(self, xref_section_offset: int) -> None:
assert self.buf is not None
trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset) trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
m = self.re_trailer_prev.search( m = self.re_trailer_prev.search(
self.buf[trailer_offset : trailer_offset + 16384] self.buf[trailer_offset : trailer_offset + 16384]
) )
check_format_condition(m, "previous trailer not found") check_format_condition(m is not None, "previous trailer not found")
assert m is not None
trailer_data = m.group(1) trailer_data = m.group(1)
check_format_condition( check_format_condition(
int(m.group(2)) == xref_section_offset, int(m.group(2)) == xref_section_offset,
@ -670,7 +711,7 @@ class PdfParser:
re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional) re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)
@classmethod @classmethod
def interpret_trailer(cls, trailer_data): def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]:
trailer = {} trailer = {}
offset = 0 offset = 0
while True: while True:
@ -678,14 +719,18 @@ class PdfParser:
if not m: if not m:
m = cls.re_dict_end.match(trailer_data, offset) m = cls.re_dict_end.match(trailer_data, offset)
check_format_condition( check_format_condition(
m and m.end() == len(trailer_data), m is not None and m.end() == len(trailer_data),
"name not found in trailer, remaining data: " "name not found in trailer, remaining data: "
+ repr(trailer_data[offset:]), + repr(trailer_data[offset:]),
) )
break break
key = cls.interpret_name(m.group(1)) key = cls.interpret_name(m.group(1))
value, offset = cls.get_value(trailer_data, m.end()) assert isinstance(key, bytes)
value, value_offset = cls.get_value(trailer_data, m.end())
trailer[key] = value trailer[key] = value
if value_offset is None:
break
offset = value_offset
check_format_condition( check_format_condition(
b"Size" in trailer and isinstance(trailer[b"Size"], int), b"Size" in trailer and isinstance(trailer[b"Size"], int),
"/Size not in trailer or not an integer", "/Size not in trailer or not an integer",
@ -699,7 +744,7 @@ class PdfParser:
re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?") re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")
@classmethod @classmethod
def interpret_name(cls, raw, as_text=False): def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes:
name = b"" name = b""
for m in cls.re_hashes_in_name.finditer(raw): for m in cls.re_hashes_in_name.finditer(raw):
if m.group(3): if m.group(3):
@ -761,7 +806,13 @@ class PdfParser:
) )
@classmethod @classmethod
def get_value(cls, data, offset, expect_indirect=None, max_nesting=-1): def get_value(
cls,
data: bytes | bytearray | mmap.mmap,
offset: int,
expect_indirect: IndirectReference | None = None,
max_nesting: int = -1,
) -> tuple[Any, int | None]:
if max_nesting == 0: if max_nesting == 0:
return None, None return None, None
m = cls.re_comment.match(data, offset) m = cls.re_comment.match(data, offset)
@ -783,11 +834,16 @@ class PdfParser:
== IndirectReference(int(m.group(1)), int(m.group(2))), == IndirectReference(int(m.group(1)), int(m.group(2))),
"indirect object definition different than expected", "indirect object definition different than expected",
) )
object, offset = cls.get_value(data, m.end(), max_nesting=max_nesting - 1) object, object_offset = cls.get_value(
if offset is None: data, m.end(), max_nesting=max_nesting - 1
)
if object_offset is None:
return object, None return object, None
m = cls.re_indirect_def_end.match(data, offset) m = cls.re_indirect_def_end.match(data, object_offset)
check_format_condition(m, "indirect object definition end not found") check_format_condition(
m is not None, "indirect object definition end not found"
)
assert m is not None
return object, m.end() return object, m.end()
check_format_condition( check_format_condition(
not expect_indirect, "indirect object definition not found" not expect_indirect, "indirect object definition not found"
@ -806,46 +862,53 @@ class PdfParser:
m = cls.re_dict_start.match(data, offset) m = cls.re_dict_start.match(data, offset)
if m: if m:
offset = m.end() offset = m.end()
result = {} result: dict[Any, Any] = {}
m = cls.re_dict_end.match(data, offset) m = cls.re_dict_end.match(data, offset)
current_offset: int | None = offset
while not m: while not m:
key, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1) assert current_offset is not None
if offset is None: key, current_offset = cls.get_value(
data, current_offset, max_nesting=max_nesting - 1
)
if current_offset is None:
return result, None return result, None
value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1) value, current_offset = cls.get_value(
data, current_offset, max_nesting=max_nesting - 1
)
result[key] = value result[key] = value
if offset is None: if current_offset is None:
return result, None return result, None
m = cls.re_dict_end.match(data, offset) m = cls.re_dict_end.match(data, current_offset)
offset = m.end() current_offset = m.end()
m = cls.re_stream_start.match(data, offset) m = cls.re_stream_start.match(data, current_offset)
if m: if m:
try: stream_len = result.get(b"Length")
stream_len_str = result.get(b"Length") if stream_len is None or not isinstance(stream_len, int):
stream_len = int(stream_len_str) msg = f"bad or missing Length in stream dict ({stream_len})"
except (TypeError, ValueError) as e: raise PdfFormatError(msg)
msg = f"bad or missing Length in stream dict ({stream_len_str})"
raise PdfFormatError(msg) from e
stream_data = data[m.end() : m.end() + stream_len] stream_data = data[m.end() : m.end() + stream_len]
m = cls.re_stream_end.match(data, m.end() + stream_len) m = cls.re_stream_end.match(data, m.end() + stream_len)
check_format_condition(m, "stream end not found") check_format_condition(m is not None, "stream end not found")
offset = m.end() assert m is not None
result = PdfStream(PdfDict(result), stream_data) current_offset = m.end()
else: return PdfStream(PdfDict(result), stream_data), current_offset
result = PdfDict(result) return PdfDict(result), current_offset
return result, offset
m = cls.re_array_start.match(data, offset) m = cls.re_array_start.match(data, offset)
if m: if m:
offset = m.end() offset = m.end()
result = [] results = []
m = cls.re_array_end.match(data, offset) m = cls.re_array_end.match(data, offset)
current_offset = offset
while not m: while not m:
value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1) assert current_offset is not None
result.append(value) value, current_offset = cls.get_value(
if offset is None: data, current_offset, max_nesting=max_nesting - 1
return result, None )
m = cls.re_array_end.match(data, offset) results.append(value)
return result, m.end() if current_offset is None:
return results, None
m = cls.re_array_end.match(data, current_offset)
return results, m.end()
m = cls.re_null.match(data, offset) m = cls.re_null.match(data, offset)
if m: if m:
return None, m.end() return None, m.end()
@ -905,7 +968,9 @@ class PdfParser:
} }
@classmethod @classmethod
def get_literal_string(cls, data, offset): def get_literal_string(
cls, data: bytes | bytearray | mmap.mmap, offset: int
) -> tuple[bytes, int]:
nesting_depth = 0 nesting_depth = 0
result = bytearray() result = bytearray()
for m in cls.re_lit_str_token.finditer(data, offset): for m in cls.re_lit_str_token.finditer(data, offset):
@ -941,12 +1006,14 @@ class PdfParser:
) )
re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)") re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")
def read_xref_table(self, xref_section_offset): def read_xref_table(self, xref_section_offset: int) -> int:
assert self.buf is not None
subsection_found = False subsection_found = False
m = self.re_xref_section_start.match( m = self.re_xref_section_start.match(
self.buf, xref_section_offset + self.start_offset self.buf, xref_section_offset + self.start_offset
) )
check_format_condition(m, "xref section start not found") check_format_condition(m is not None, "xref section start not found")
assert m is not None
offset = m.end() offset = m.end()
while True: while True:
m = self.re_xref_subsection_start.match(self.buf, offset) m = self.re_xref_subsection_start.match(self.buf, offset)
@ -961,7 +1028,8 @@ class PdfParser:
num_objects = int(m.group(2)) num_objects = int(m.group(2))
for i in range(first_object, first_object + num_objects): for i in range(first_object, first_object + num_objects):
m = self.re_xref_entry.match(self.buf, offset) m = self.re_xref_entry.match(self.buf, offset)
check_format_condition(m, "xref entry not found") check_format_condition(m is not None, "xref entry not found")
assert m is not None
offset = m.end() offset = m.end()
is_free = m.group(3) == b"f" is_free = m.group(3) == b"f"
if not is_free: if not is_free:
@ -971,13 +1039,14 @@ class PdfParser:
self.xref_table[i] = new_entry self.xref_table[i] = new_entry
return offset return offset
def read_indirect(self, ref, max_nesting=-1): def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any:
offset, generation = self.xref_table[ref[0]] offset, generation = self.xref_table[ref[0]]
check_format_condition( check_format_condition(
generation == ref[1], generation == ref[1],
f"expected to find generation {ref[1]} for object ID {ref[0]} in xref " f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
f"table, instead found generation {generation} at offset {offset}", f"table, instead found generation {generation} at offset {offset}",
) )
assert self.buf is not None
value = self.get_value( value = self.get_value(
self.buf, self.buf,
offset + self.start_offset, offset + self.start_offset,
@ -987,14 +1056,15 @@ class PdfParser:
self.cached_objects[ref] = value self.cached_objects[ref] = value
return value return value
def linearize_page_tree(self, node=None): def linearize_page_tree(
if node is None: self, node: PdfDict | None = None
node = self.page_tree_root ) -> list[IndirectReference]:
page_node = node if node is not None else self.page_tree_root
check_format_condition( check_format_condition(
node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages" page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
) )
pages = [] pages = []
for kid in node[b"Kids"]: for kid in page_node[b"Kids"]:
kid_object = self.read_indirect(kid) kid_object = self.read_indirect(kid)
if kid_object[b"Type"] == b"Page": if kid_object[b"Type"] == b"Page":
pages.append(kid) pages.append(kid)