Pillow/src/PIL/PdfParser.py

Ignoring revisions in .git-blame-ignore-revs. Click here to bypass and see the normal blame view.

1074 lines
37 KiB
Python
Raw Normal View History

from __future__ import annotations
import calendar
import codecs
import collections
import mmap
import os
import re
import time
import zlib
2024-07-06 08:08:35 +03:00
from typing import IO, TYPE_CHECKING, Any, NamedTuple, Union
2018-06-24 15:32:25 +03:00
# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
# on page 656
2024-05-18 09:06:50 +03:00
def encode_text(s: str) -> bytes:
return codecs.BOM_UTF16_BE + s.encode("utf_16_be")
PDFDocEncoding = {
0x16: "\u0017",
0x18: "\u02D8",
0x19: "\u02C7",
0x1A: "\u02C6",
0x1B: "\u02D9",
0x1C: "\u02DD",
0x1D: "\u02DB",
0x1E: "\u02DA",
0x1F: "\u02DC",
0x80: "\u2022",
0x81: "\u2020",
0x82: "\u2021",
0x83: "\u2026",
0x84: "\u2014",
0x85: "\u2013",
0x86: "\u0192",
0x87: "\u2044",
0x88: "\u2039",
0x89: "\u203A",
0x8A: "\u2212",
0x8B: "\u2030",
0x8C: "\u201E",
0x8D: "\u201C",
0x8E: "\u201D",
0x8F: "\u2018",
0x90: "\u2019",
0x91: "\u201A",
0x92: "\u2122",
0x93: "\uFB01",
0x94: "\uFB02",
0x95: "\u0141",
0x96: "\u0152",
0x97: "\u0160",
0x98: "\u0178",
0x99: "\u017D",
0x9A: "\u0131",
0x9B: "\u0142",
0x9C: "\u0153",
0x9D: "\u0161",
0x9E: "\u017E",
0xA0: "\u20AC",
}
2024-07-06 08:08:35 +03:00
def decode_text(b: bytes) -> str:
if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE:
return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be")
2019-09-26 15:12:28 +03:00
else:
return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b)
class PdfFormatError(RuntimeError):
2018-06-24 15:32:25 +03:00
"""An error that probably indicates a syntactic or semantic error in the
PDF file structure"""
2019-03-21 16:28:20 +03:00
pass
2024-06-10 07:15:28 +03:00
def check_format_condition(condition: bool, error_message: str) -> None:
if not condition:
raise PdfFormatError(error_message)
class IndirectReferenceTuple(NamedTuple):
object_id: int
generation: int
class IndirectReference(IndirectReferenceTuple):
2024-05-13 11:47:51 +03:00
def __str__(self) -> str:
return f"{self.object_id} {self.generation} R"
2024-05-13 11:47:51 +03:00
def __bytes__(self) -> bytes:
return self.__str__().encode("us-ascii")
2024-06-10 07:15:28 +03:00
def __eq__(self, other: object) -> bool:
if self.__class__ is not other.__class__:
return False
assert isinstance(other, IndirectReference)
return other.object_id == self.object_id and other.generation == self.generation
2024-07-06 08:08:35 +03:00
def __ne__(self, other: object) -> bool:
return not (self == other)
2024-05-18 09:06:50 +03:00
def __hash__(self) -> int:
return hash((self.object_id, self.generation))
class IndirectObjectDef(IndirectReference):
2024-05-13 11:47:51 +03:00
def __str__(self) -> str:
return f"{self.object_id} {self.generation} obj"
class XrefTable:
2024-07-06 08:08:35 +03:00
def __init__(self) -> None:
self.existing_entries: dict[int, tuple[int, int]] = (
{}
) # object ID => (offset, generation)
self.new_entries: dict[int, tuple[int, int]] = (
{}
) # object ID => (offset, generation)
self.deleted_entries = {0: 65536} # object ID => generation
self.reading_finished = False
2024-07-06 08:08:35 +03:00
def __setitem__(self, key: int, value: tuple[int, int]) -> None:
if self.reading_finished:
self.new_entries[key] = value
else:
self.existing_entries[key] = value
if key in self.deleted_entries:
del self.deleted_entries[key]
2024-07-06 08:08:35 +03:00
def __getitem__(self, key: int) -> tuple[int, int]:
try:
return self.new_entries[key]
except KeyError:
return self.existing_entries[key]
2024-07-06 08:08:35 +03:00
def __delitem__(self, key: int) -> None:
if key in self.new_entries:
generation = self.new_entries[key][1] + 1
del self.new_entries[key]
self.deleted_entries[key] = generation
elif key in self.existing_entries:
generation = self.existing_entries[key][1] + 1
self.deleted_entries[key] = generation
elif key in self.deleted_entries:
generation = self.deleted_entries[key]
else:
msg = f"object ID {key} cannot be deleted because it doesn't exist"
raise IndexError(msg)
2024-07-06 08:08:35 +03:00
def __contains__(self, key: int) -> bool:
return key in self.existing_entries or key in self.new_entries
2024-05-13 11:47:51 +03:00
def __len__(self) -> int:
2018-06-24 15:32:25 +03:00
return len(
set(self.existing_entries.keys())
| set(self.new_entries.keys())
| set(self.deleted_entries.keys())
2019-03-21 16:28:20 +03:00
)
2024-07-06 08:08:35 +03:00
def keys(self) -> set[int]:
2018-06-24 15:32:25 +03:00
return (
set(self.existing_entries.keys()) - set(self.deleted_entries.keys())
) | set(self.new_entries.keys())
2024-07-06 08:08:35 +03:00
def write(self, f: IO[bytes]) -> int:
2018-06-24 15:32:25 +03:00
keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
deleted_keys = sorted(set(self.deleted_entries.keys()))
startxref = f.tell()
f.write(b"xref\n")
while keys:
# find a contiguous sequence of object IDs
2024-07-06 08:08:35 +03:00
prev: int | None = None
for index, key in enumerate(keys):
if prev is None or prev + 1 == key:
prev = key
else:
contiguous_keys = keys[:index]
keys = keys[index:]
break
else:
contiguous_keys = keys
2024-07-06 08:08:35 +03:00
keys = []
f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
for object_id in contiguous_keys:
if object_id in self.new_entries:
f.write(b"%010d %05d n \n" % self.new_entries[object_id])
else:
this_deleted_object_id = deleted_keys.pop(0)
check_format_condition(
object_id == this_deleted_object_id,
f"expected the next deleted object ID to be {object_id}, "
f"instead found {this_deleted_object_id}",
)
try:
next_in_linked_list = deleted_keys[0]
except IndexError:
next_in_linked_list = 0
2018-06-24 15:32:25 +03:00
f.write(
b"%010d %05d f \n"
% (next_in_linked_list, self.deleted_entries[object_id])
2019-03-21 16:28:20 +03:00
)
return startxref
class PdfName:
2024-07-06 08:08:35 +03:00
name: bytes
def __init__(self, name: PdfName | bytes | str) -> None:
if isinstance(name, PdfName):
self.name = name.name
elif isinstance(name, bytes):
self.name = name
else:
self.name = name.encode("us-ascii")
2024-05-13 11:47:51 +03:00
def name_as_str(self) -> str:
return self.name.decode("us-ascii")
2024-07-06 08:08:35 +03:00
def __eq__(self, other: object) -> bool:
2018-06-24 15:32:25 +03:00
return (
isinstance(other, PdfName) and other.name == self.name
) or other == self.name
2024-05-18 09:06:50 +03:00
def __hash__(self) -> int:
return hash(self.name)
2024-05-13 11:47:51 +03:00
def __repr__(self) -> str:
2024-05-04 19:26:22 +03:00
return f"{self.__class__.__name__}({repr(self.name)})"
@classmethod
2024-07-06 08:08:35 +03:00
def from_pdf_stream(cls, data: bytes) -> PdfName:
return cls(PdfParser.interpret_name(data))
allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"}
2024-05-13 11:47:51 +03:00
def __bytes__(self) -> bytes:
2018-04-20 02:19:13 +03:00
result = bytearray(b"/")
for b in self.name:
2019-09-26 15:12:28 +03:00
if b in self.allowed_chars:
result.append(b)
else:
result.extend(b"#%02X" % b)
return bytes(result)
class PdfArray(list[Any]):
2024-05-13 11:47:51 +03:00
def __bytes__(self) -> bytes:
return b"[ " + b" ".join(pdf_repr(x) for x in self) + b" ]"
2024-02-10 11:50:45 +03:00
if TYPE_CHECKING:
_DictBase = collections.UserDict[Union[str, bytes], Any]
2024-02-10 11:50:45 +03:00
else:
_DictBase = collections.UserDict
class PdfDict(_DictBase):
2024-07-06 08:08:35 +03:00
def __setattr__(self, key: str, value: Any) -> None:
if key == "data":
2019-10-07 15:34:12 +03:00
collections.UserDict.__setattr__(self, key, value)
else:
self[key.encode("us-ascii")] = value
2024-07-06 08:08:35 +03:00
def __getattr__(self, key: str) -> str | time.struct_time:
try:
value = self[key.encode("us-ascii")]
2020-06-12 00:34:40 +03:00
except KeyError as e:
raise AttributeError(key) from e
if isinstance(value, bytes):
value = decode_text(value)
if key.endswith("Date"):
if value.startswith("D:"):
value = value[2:]
relationship = "Z"
if len(value) > 17:
relationship = value[14]
offset = int(value[15:17]) * 60
if len(value) > 20:
offset += int(value[18:20])
format = "%Y%m%d%H%M%S"[: len(value) - 2]
value = time.strptime(value[: len(format) + 2], format)
if relationship in ["+", "-"]:
offset *= 60
if relationship == "+":
offset *= -1
value = time.gmtime(calendar.timegm(value) + offset)
return value
2024-05-13 11:47:51 +03:00
def __bytes__(self) -> bytes:
out = bytearray(b"<<")
for key, value in self.items():
if value is None:
continue
value = pdf_repr(value)
out.extend(b"\n")
out.extend(bytes(PdfName(key)))
out.extend(b" ")
out.extend(value)
out.extend(b"\n>>")
return bytes(out)
class PdfBinary:
2024-07-06 08:08:35 +03:00
def __init__(self, data: list[int] | bytes) -> None:
self.data = data
2024-05-13 11:47:51 +03:00
def __bytes__(self) -> bytes:
return b"<%s>" % b"".join(b"%02X" % b for b in self.data)
class PdfStream:
2024-07-06 08:08:35 +03:00
def __init__(self, dictionary: PdfDict, buf: bytes) -> None:
self.dictionary = dictionary
self.buf = buf
2024-07-06 08:08:35 +03:00
def decode(self) -> bytes:
try:
2024-07-06 08:08:35 +03:00
filter = self.dictionary[b"Filter"]
except KeyError:
return self.buf
if filter == b"FlateDecode":
try:
2024-07-06 08:08:35 +03:00
expected_length = self.dictionary[b"DL"]
except KeyError:
expected_length = self.dictionary[b"Length"]
return zlib.decompress(self.buf, bufsize=int(expected_length))
else:
2024-07-06 08:08:35 +03:00
msg = f"stream filter {repr(filter)} unknown/unsupported"
raise NotImplementedError(msg)
2024-07-06 08:08:35 +03:00
def pdf_repr(x: Any) -> bytes:
if x is True:
return b"true"
elif x is False:
return b"false"
elif x is None:
return b"null"
elif isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
return bytes(x)
elif isinstance(x, (int, float)):
return str(x).encode("us-ascii")
elif isinstance(x, time.struct_time):
return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
elif isinstance(x, dict):
return bytes(PdfDict(x))
elif isinstance(x, list):
return bytes(PdfArray(x))
2019-09-26 15:12:28 +03:00
elif isinstance(x, str):
return pdf_repr(encode_text(x))
elif isinstance(x, bytes):
2018-06-24 15:32:25 +03:00
# XXX escape more chars? handle binary garbage
x = x.replace(b"\\", b"\\\\")
x = x.replace(b"(", b"\\(")
x = x.replace(b")", b"\\)")
return b"(" + x + b")"
else:
return bytes(x)
class PdfParser:
2018-10-24 22:24:03 +03:00
"""Based on
https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
Supports PDF up to 1.4
"""
2024-07-06 08:08:35 +03:00
def __init__(
self,
filename: str | None = None,
f: IO[bytes] | None = None,
buf: bytes | bytearray | None = None,
start_offset: int = 0,
mode: str = "rb",
) -> None:
if buf and f:
msg = "specify buf or f or filename, but not both buf and f"
raise RuntimeError(msg)
self.filename = filename
2024-07-06 08:08:35 +03:00
self.buf: bytes | bytearray | mmap.mmap | None = buf
self.f = f
self.start_offset = start_offset
self.should_close_buf = False
self.should_close_file = False
if filename is not None and f is None:
self.f = f = open(filename, mode)
self.should_close_file = True
if f is not None:
2024-07-06 08:08:35 +03:00
self.buf = self.get_buf_from_file(f)
self.should_close_buf = True
if not filename and hasattr(f, "name"):
self.filename = f.name
2024-07-06 08:08:35 +03:00
self.cached_objects: dict[IndirectReference, Any] = {}
self.root_ref: IndirectReference | None
self.info_ref: IndirectReference | None
self.pages_ref: IndirectReference | None
self.last_xref_section_offset: int | None
if self.buf:
self.read_pdf_info()
else:
self.file_size_total = self.file_size_this = 0
self.root = PdfDict()
self.root_ref = None
self.info = PdfDict()
self.info_ref = None
2024-07-06 08:08:35 +03:00
self.page_tree_root = PdfDict()
self.pages: list[IndirectReference] = []
self.orig_pages: list[IndirectReference] = []
self.pages_ref = None
self.last_xref_section_offset = None
2024-07-06 08:08:35 +03:00
self.trailer_dict: dict[bytes, Any] = {}
self.xref_table = XrefTable()
self.xref_table.reading_finished = True
if f:
self.seek_end()
2024-05-18 09:06:50 +03:00
def __enter__(self) -> PdfParser:
return self
2024-06-11 16:26:00 +03:00
def __exit__(self, *args: object) -> None:
self.close()
2024-05-11 03:48:09 +03:00
def start_writing(self) -> None:
self.close_buf()
self.seek_end()
2024-05-11 03:48:09 +03:00
def close_buf(self) -> None:
2024-07-06 08:08:35 +03:00
if isinstance(self.buf, mmap.mmap):
self.buf.close()
self.buf = None
2024-05-11 03:48:09 +03:00
def close(self) -> None:
if self.should_close_buf:
self.close_buf()
if self.f is not None and self.should_close_file:
self.f.close()
self.f = None
2024-05-11 03:48:09 +03:00
def seek_end(self) -> None:
2024-07-06 08:08:35 +03:00
assert self.f is not None
self.f.seek(0, os.SEEK_END)
2024-05-11 03:48:09 +03:00
def write_header(self) -> None:
2024-07-06 08:08:35 +03:00
assert self.f is not None
self.f.write(b"%PDF-1.4\n")
2024-07-06 08:08:35 +03:00
def write_comment(self, s: str) -> None:
assert self.f is not None
self.f.write(f"% {s}\n".encode())
2024-05-18 09:06:50 +03:00
def write_catalog(self) -> IndirectReference:
2024-07-06 08:08:35 +03:00
assert self.f is not None
self.del_root()
self.root_ref = self.next_object_id(self.f.tell())
self.pages_ref = self.next_object_id(0)
self.rewrite_pages()
self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref)
self.write_obj(
self.pages_ref,
Type=PdfName(b"Pages"),
Count=len(self.pages),
Kids=self.pages,
)
return self.root_ref
2024-05-11 03:48:09 +03:00
def rewrite_pages(self) -> None:
pages_tree_nodes_to_delete = []
for i, page_ref in enumerate(self.orig_pages):
page_info = self.cached_objects[page_ref]
del self.xref_table[page_ref.object_id]
pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
if page_ref not in self.pages:
# the page has been deleted
continue
# make dict keys into strings for passing to write_page
stringified_page_info = {}
for key, value in page_info.items():
# key should be a PdfName
stringified_page_info[key.name_as_str()] = value
stringified_page_info["Parent"] = self.pages_ref
new_page_ref = self.write_page(None, **stringified_page_info)
for j, cur_page_ref in enumerate(self.pages):
if cur_page_ref == page_ref:
# replace the page reference with the new one
self.pages[j] = new_page_ref
# delete redundant Pages tree nodes from xref table
for pages_tree_node_ref in pages_tree_nodes_to_delete:
while pages_tree_node_ref:
pages_tree_node = self.cached_objects[pages_tree_node_ref]
if pages_tree_node_ref.object_id in self.xref_table:
del self.xref_table[pages_tree_node_ref.object_id]
pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
self.orig_pages = []
2024-07-06 08:08:35 +03:00
def write_xref_and_trailer(
self, new_root_ref: IndirectReference | None = None
) -> None:
assert self.f is not None
if new_root_ref:
self.del_root()
self.root_ref = new_root_ref
if self.info:
self.info_ref = self.write_obj(None, self.info)
start_xref = self.xref_table.write(self.f)
num_entries = len(self.xref_table)
2024-07-06 08:08:35 +03:00
trailer_dict: dict[str | bytes, Any] = {
b"Root": self.root_ref,
b"Size": num_entries,
}
if self.last_xref_section_offset is not None:
trailer_dict[b"Prev"] = self.last_xref_section_offset
if self.info:
trailer_dict[b"Info"] = self.info_ref
self.last_xref_section_offset = start_xref
2018-06-24 15:32:25 +03:00
self.f.write(
b"trailer\n"
+ bytes(PdfDict(trailer_dict))
+ b"\nstartxref\n%d\n%%%%EOF" % start_xref
2019-03-21 16:28:20 +03:00
)
2024-07-06 08:08:35 +03:00
def write_page(
self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any
) -> IndirectReference:
obj_ref = self.pages[ref] if isinstance(ref, int) else ref
if "Type" not in dict_obj:
dict_obj["Type"] = PdfName(b"Page")
if "Parent" not in dict_obj:
dict_obj["Parent"] = self.pages_ref
2024-07-06 08:08:35 +03:00
return self.write_obj(obj_ref, *objs, **dict_obj)
2024-07-06 08:08:35 +03:00
def write_obj(
self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any
) -> IndirectReference:
assert self.f is not None
f = self.f
if ref is None:
ref = self.next_object_id(f.tell())
else:
self.xref_table[ref.object_id] = (f.tell(), ref.generation)
f.write(bytes(IndirectObjectDef(*ref)))
stream = dict_obj.pop("stream", None)
if stream is not None:
dict_obj["Length"] = len(stream)
if dict_obj:
f.write(pdf_repr(dict_obj))
for obj in objs:
f.write(pdf_repr(obj))
if stream is not None:
f.write(b"stream\n")
f.write(stream)
f.write(b"\nendstream\n")
f.write(b"endobj\n")
return ref
2024-05-11 03:48:09 +03:00
def del_root(self) -> None:
if self.root_ref is None:
return
del self.xref_table[self.root_ref.object_id]
del self.xref_table[self.root[b"Pages"].object_id]
@staticmethod
2024-07-06 08:08:35 +03:00
def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap:
if hasattr(f, "getbuffer"):
return f.getbuffer()
elif hasattr(f, "getvalue"):
return f.getvalue()
else:
try:
return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
except ValueError: # cannot mmap an empty file
return b""
2024-05-11 03:48:09 +03:00
def read_pdf_info(self) -> None:
2024-07-06 08:08:35 +03:00
assert self.buf is not None
self.file_size_total = len(self.buf)
self.file_size_this = self.file_size_total - self.start_offset
self.read_trailer()
2024-07-06 08:08:35 +03:00
check_format_condition(
self.trailer_dict.get(b"Root") is not None, "Root is missing"
)
self.root_ref = self.trailer_dict[b"Root"]
2024-07-06 08:08:35 +03:00
assert self.root_ref is not None
self.info_ref = self.trailer_dict.get(b"Info", None)
self.root = PdfDict(self.read_indirect(self.root_ref))
if self.info_ref is None:
self.info = PdfDict()
else:
self.info = PdfDict(self.read_indirect(self.info_ref))
check_format_condition(b"Type" in self.root, "/Type missing in Root")
2018-06-24 15:32:25 +03:00
check_format_condition(
self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
)
2024-07-06 08:08:35 +03:00
check_format_condition(
self.root.get(b"Pages") is not None, "/Pages missing in Root"
)
2018-06-24 15:32:25 +03:00
check_format_condition(
isinstance(self.root[b"Pages"], IndirectReference),
"/Pages in Root is not an indirect reference",
)
self.pages_ref = self.root[b"Pages"]
2024-07-06 08:08:35 +03:00
assert self.pages_ref is not None
self.page_tree_root = self.read_indirect(self.pages_ref)
self.pages = self.linearize_page_tree(self.page_tree_root)
2018-06-24 15:32:25 +03:00
# save the original list of page references
# in case the user modifies, adds or deletes some pages
# and we need to rewrite the pages and their list
self.orig_pages = self.pages[:]
2024-07-06 08:08:35 +03:00
def next_object_id(self, offset: int | None = None) -> IndirectReference:
try:
# TODO: support reuse of deleted objects
reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
except ValueError:
reference = IndirectReference(1, 0)
if offset is not None:
self.xref_table[reference.object_id] = (offset, 0)
return reference
2022-03-04 08:42:24 +03:00
delimiter = rb"[][()<>{}/%]"
delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
whitespace = rb"[\000\011\012\014\015\040]"
whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
whitespace_optional = whitespace + b"*"
whitespace_mandatory = whitespace + b"+"
# No "\012" aka "\n" or "\015" aka "\r":
2022-03-04 08:42:24 +03:00
whitespace_optional_no_nl = rb"[\000\011\014\040]*"
newline_only = rb"[\r\n]+"
newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl
2018-06-24 15:32:25 +03:00
re_trailer_end = re.compile(
whitespace_mandatory
2022-03-04 08:42:24 +03:00
+ rb"trailer"
2018-06-24 15:32:25 +03:00
+ whitespace_optional
2022-04-10 18:17:20 +03:00
+ rb"<<(.*>>)"
2018-06-24 15:32:25 +03:00
+ newline
2022-03-04 08:42:24 +03:00
+ rb"startxref"
2018-06-24 15:32:25 +03:00
+ newline
2022-03-04 08:42:24 +03:00
+ rb"([0-9]+)"
2018-06-24 15:32:25 +03:00
+ newline
2022-03-04 08:42:24 +03:00
+ rb"%%EOF"
2018-06-24 15:32:25 +03:00
+ whitespace_optional
2022-03-04 08:42:24 +03:00
+ rb"$",
2018-06-24 15:32:25 +03:00
re.DOTALL,
)
re_trailer_prev = re.compile(
whitespace_optional
2022-03-04 08:42:24 +03:00
+ rb"trailer"
2018-06-24 15:32:25 +03:00
+ whitespace_optional
2022-04-10 18:17:20 +03:00
+ rb"<<(.*?>>)"
2018-06-24 15:32:25 +03:00
+ newline
2022-03-04 08:42:24 +03:00
+ rb"startxref"
2018-06-24 15:32:25 +03:00
+ newline
2022-03-04 08:42:24 +03:00
+ rb"([0-9]+)"
2018-06-24 15:32:25 +03:00
+ newline
2022-03-04 08:42:24 +03:00
+ rb"%%EOF"
2018-06-24 15:32:25 +03:00
+ whitespace_optional,
re.DOTALL,
)
2024-07-06 08:08:35 +03:00
def read_trailer(self) -> None:
assert self.buf is not None
search_start_offset = len(self.buf) - 16384
if search_start_offset < self.start_offset:
search_start_offset = self.start_offset
m = self.re_trailer_end.search(self.buf, search_start_offset)
2024-07-06 08:08:35 +03:00
check_format_condition(m is not None, "trailer end not found")
# make sure we found the LAST trailer
last_match = m
while m:
last_match = m
m = self.re_trailer_end.search(self.buf, m.start() + 16)
if not m:
m = last_match
2024-07-06 08:08:35 +03:00
assert m is not None
trailer_data = m.group(1)
self.last_xref_section_offset = int(m.group(2))
self.trailer_dict = self.interpret_trailer(trailer_data)
self.xref_table = XrefTable()
self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
if b"Prev" in self.trailer_dict:
self.read_prev_trailer(self.trailer_dict[b"Prev"])
2024-07-06 08:08:35 +03:00
def read_prev_trailer(self, xref_section_offset: int) -> None:
assert self.buf is not None
2018-06-24 15:32:25 +03:00
trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
m = self.re_trailer_prev.search(
self.buf[trailer_offset : trailer_offset + 16384]
)
2024-07-06 08:08:35 +03:00
check_format_condition(m is not None, "previous trailer not found")
assert m is not None
trailer_data = m.group(1)
2018-06-24 15:32:25 +03:00
check_format_condition(
int(m.group(2)) == xref_section_offset,
"xref section offset in previous trailer doesn't match what was expected",
)
trailer_dict = self.interpret_trailer(trailer_data)
if b"Prev" in trailer_dict:
self.read_prev_trailer(trailer_dict[b"Prev"])
re_whitespace_optional = re.compile(whitespace_optional)
2018-06-24 15:32:25 +03:00
re_name = re.compile(
whitespace_optional
2022-03-04 08:42:24 +03:00
+ rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?="
2018-06-24 15:32:25 +03:00
+ delimiter_or_ws
2022-03-04 08:42:24 +03:00
+ rb")"
2018-06-24 15:32:25 +03:00
)
2022-04-10 18:17:20 +03:00
re_dict_start = re.compile(whitespace_optional + rb"<<")
re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)
@classmethod
2024-07-06 08:08:35 +03:00
def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]:
trailer = {}
offset = 0
while True:
m = cls.re_name.match(trailer_data, offset)
if not m:
m = cls.re_dict_end.match(trailer_data, offset)
2018-06-24 15:32:25 +03:00
check_format_condition(
2024-07-06 08:08:35 +03:00
m is not None and m.end() == len(trailer_data),
2018-06-24 15:32:25 +03:00
"name not found in trailer, remaining data: "
+ repr(trailer_data[offset:]),
)
break
key = cls.interpret_name(m.group(1))
2024-07-06 08:08:35 +03:00
assert isinstance(key, bytes)
value, value_offset = cls.get_value(trailer_data, m.end())
trailer[key] = value
2024-07-06 08:08:35 +03:00
if value_offset is None:
break
offset = value_offset
2018-06-24 15:32:25 +03:00
check_format_condition(
b"Size" in trailer and isinstance(trailer[b"Size"], int),
"/Size not in trailer or not an integer",
)
check_format_condition(
b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference),
"/Root not in trailer or not an indirect reference",
)
return trailer
2022-03-04 08:42:24 +03:00
re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")
@classmethod
2024-07-06 08:08:35 +03:00
def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes:
name = b""
for m in cls.re_hashes_in_name.finditer(raw):
if m.group(3):
2018-06-24 15:32:25 +03:00
name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
else:
name += m.group(1)
if as_text:
return name.decode("utf-8")
else:
return bytes(name)
2022-03-04 08:42:24 +03:00
re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")")
re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")")
re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")")
2018-06-24 15:32:25 +03:00
re_int = re.compile(
2022-03-04 08:42:24 +03:00
whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")"
2018-06-24 15:32:25 +03:00
)
re_real = re.compile(
whitespace_optional
2022-03-04 08:42:24 +03:00
+ rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?="
2018-06-24 15:32:25 +03:00
+ delimiter_or_ws
2022-03-04 08:42:24 +03:00
+ rb")"
2018-06-24 15:32:25 +03:00
)
2022-03-04 08:42:24 +03:00
re_array_start = re.compile(whitespace_optional + rb"\[")
re_array_end = re.compile(whitespace_optional + rb"]")
2018-06-24 15:32:25 +03:00
re_string_hex = re.compile(
2022-04-10 18:17:20 +03:00
whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>"
2018-06-24 15:32:25 +03:00
)
2022-03-04 08:42:24 +03:00
re_string_lit = re.compile(whitespace_optional + rb"\(")
2018-06-24 15:32:25 +03:00
re_indirect_reference = re.compile(
whitespace_optional
2022-03-04 08:42:24 +03:00
+ rb"([-+]?[0-9]+)"
2018-06-24 15:32:25 +03:00
+ whitespace_mandatory
2022-03-04 08:42:24 +03:00
+ rb"([-+]?[0-9]+)"
2018-06-24 15:32:25 +03:00
+ whitespace_mandatory
2022-03-04 08:42:24 +03:00
+ rb"R(?="
2018-06-24 15:32:25 +03:00
+ delimiter_or_ws
2022-03-04 08:42:24 +03:00
+ rb")"
2018-06-24 15:32:25 +03:00
)
re_indirect_def_start = re.compile(
whitespace_optional
2022-03-04 08:42:24 +03:00
+ rb"([-+]?[0-9]+)"
2018-06-24 15:32:25 +03:00
+ whitespace_mandatory
2022-03-04 08:42:24 +03:00
+ rb"([-+]?[0-9]+)"
2018-06-24 15:32:25 +03:00
+ whitespace_mandatory
2022-03-04 08:42:24 +03:00
+ rb"obj(?="
2018-06-24 15:32:25 +03:00
+ delimiter_or_ws
2022-03-04 08:42:24 +03:00
+ rb")"
2018-06-24 15:32:25 +03:00
)
re_indirect_def_end = re.compile(
2022-03-04 08:42:24 +03:00
whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")"
2018-06-24 15:32:25 +03:00
)
re_comment = re.compile(
2022-03-04 08:42:24 +03:00
rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*"
2018-06-24 15:32:25 +03:00
)
2022-03-04 08:42:24 +03:00
re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n")
2018-06-24 15:32:25 +03:00
re_stream_end = re.compile(
2022-03-04 08:42:24 +03:00
whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")"
2018-06-24 15:32:25 +03:00
)
@classmethod
2024-07-06 08:08:35 +03:00
def get_value(
cls,
data: bytes | bytearray | mmap.mmap,
offset: int,
expect_indirect: IndirectReference | None = None,
max_nesting: int = -1,
) -> tuple[Any, int | None]:
if max_nesting == 0:
return None, None
m = cls.re_comment.match(data, offset)
if m:
offset = m.end()
m = cls.re_indirect_def_start.match(data, offset)
if m:
2018-06-24 15:32:25 +03:00
check_format_condition(
int(m.group(1)) > 0,
"indirect object definition: object ID must be greater than 0",
)
check_format_condition(
int(m.group(2)) >= 0,
"indirect object definition: generation must be non-negative",
)
check_format_condition(
expect_indirect is None
or expect_indirect
== IndirectReference(int(m.group(1)), int(m.group(2))),
"indirect object definition different than expected",
)
2024-07-06 08:08:35 +03:00
object, object_offset = cls.get_value(
data, m.end(), max_nesting=max_nesting - 1
)
if object_offset is None:
return object, None
2024-07-06 08:08:35 +03:00
m = cls.re_indirect_def_end.match(data, object_offset)
check_format_condition(
m is not None, "indirect object definition end not found"
)
assert m is not None
return object, m.end()
2018-06-24 15:32:25 +03:00
check_format_condition(
not expect_indirect, "indirect object definition not found"
)
m = cls.re_indirect_reference.match(data, offset)
if m:
2018-06-24 15:32:25 +03:00
check_format_condition(
int(m.group(1)) > 0,
"indirect object reference: object ID must be greater than 0",
)
check_format_condition(
int(m.group(2)) >= 0,
"indirect object reference: generation must be non-negative",
)
return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
m = cls.re_dict_start.match(data, offset)
if m:
offset = m.end()
2024-07-06 08:08:35 +03:00
result: dict[Any, Any] = {}
m = cls.re_dict_end.match(data, offset)
2024-07-06 08:08:35 +03:00
current_offset: int | None = offset
while not m:
2024-07-06 08:08:35 +03:00
assert current_offset is not None
key, current_offset = cls.get_value(
data, current_offset, max_nesting=max_nesting - 1
)
if current_offset is None:
return result, None
2024-07-06 08:08:35 +03:00
value, current_offset = cls.get_value(
data, current_offset, max_nesting=max_nesting - 1
)
result[key] = value
2024-07-06 08:08:35 +03:00
if current_offset is None:
return result, None
2024-07-06 08:08:35 +03:00
m = cls.re_dict_end.match(data, current_offset)
current_offset = m.end()
m = cls.re_stream_start.match(data, current_offset)
if m:
2024-07-06 08:08:35 +03:00
stream_len = result.get(b"Length")
if stream_len is None or not isinstance(stream_len, int):
msg = f"bad or missing Length in stream dict ({stream_len})"
raise PdfFormatError(msg)
stream_data = data[m.end() : m.end() + stream_len]
m = cls.re_stream_end.match(data, m.end() + stream_len)
2024-07-06 08:08:35 +03:00
check_format_condition(m is not None, "stream end not found")
assert m is not None
current_offset = m.end()
return PdfStream(PdfDict(result), stream_data), current_offset
return PdfDict(result), current_offset
m = cls.re_array_start.match(data, offset)
if m:
offset = m.end()
2024-07-06 08:08:35 +03:00
results = []
m = cls.re_array_end.match(data, offset)
2024-07-06 08:08:35 +03:00
current_offset = offset
while not m:
2024-07-06 08:08:35 +03:00
assert current_offset is not None
value, current_offset = cls.get_value(
data, current_offset, max_nesting=max_nesting - 1
)
results.append(value)
if current_offset is None:
return results, None
m = cls.re_array_end.match(data, current_offset)
return results, m.end()
m = cls.re_null.match(data, offset)
if m:
return None, m.end()
m = cls.re_true.match(data, offset)
if m:
return True, m.end()
m = cls.re_false.match(data, offset)
if m:
return False, m.end()
m = cls.re_name.match(data, offset)
if m:
return PdfName(cls.interpret_name(m.group(1))), m.end()
m = cls.re_int.match(data, offset)
if m:
return int(m.group(1)), m.end()
m = cls.re_real.match(data, offset)
if m:
2018-06-24 15:32:25 +03:00
# XXX Decimal instead of float???
return float(m.group(1)), m.end()
m = cls.re_string_hex.match(data, offset)
if m:
2018-06-24 15:32:25 +03:00
# filter out whitespace
hex_string = bytearray(
b for b in m.group(1) if b in b"0123456789abcdefABCDEF"
2018-06-24 15:32:25 +03:00
)
if len(hex_string) % 2 == 1:
2018-06-24 15:32:25 +03:00
# append a 0 if the length is not even - yes, at the end
hex_string.append(ord(b"0"))
return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
m = cls.re_string_lit.match(data, offset)
if m:
return cls.get_literal_string(data, m.end())
2018-06-15 12:40:04 +03:00
# return None, offset # fallback (only for debugging)
2024-05-04 19:21:49 +03:00
msg = f"unrecognized object: {repr(data[offset : offset + 32])}"
raise PdfFormatError(msg)
2018-10-24 22:24:03 +03:00
re_lit_str_token = re.compile(
2022-03-04 08:42:24 +03:00
rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
2018-10-24 22:24:03 +03:00
)
escaped_chars = {
b"n": b"\n",
b"r": b"\r",
b"t": b"\t",
b"b": b"\b",
b"f": b"\f",
b"(": b"(",
b")": b")",
b"\\": b"\\",
ord(b"n"): b"\n",
ord(b"r"): b"\r",
ord(b"t"): b"\t",
ord(b"b"): b"\b",
ord(b"f"): b"\f",
ord(b"("): b"(",
ord(b")"): b")",
ord(b"\\"): b"\\",
}
@classmethod
2024-07-06 08:08:35 +03:00
def get_literal_string(
cls, data: bytes | bytearray | mmap.mmap, offset: int
) -> tuple[bytes, int]:
nesting_depth = 0
result = bytearray()
for m in cls.re_lit_str_token.finditer(data, offset):
result.extend(data[offset : m.start()])
if m.group(1):
result.extend(cls.escaped_chars[m.group(1)[1]])
elif m.group(2):
result.append(int(m.group(2)[1:], 8))
elif m.group(3):
pass
elif m.group(5):
result.extend(b"\n")
elif m.group(6):
result.extend(b"(")
nesting_depth += 1
elif m.group(7):
if nesting_depth == 0:
return bytes(result), m.end()
result.extend(b")")
nesting_depth -= 1
offset = m.end()
msg = "unfinished literal string"
raise PdfFormatError(msg)
2022-03-04 08:42:24 +03:00
re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline)
2018-06-24 15:32:25 +03:00
re_xref_subsection_start = re.compile(
whitespace_optional
2022-03-04 08:42:24 +03:00
+ rb"([0-9]+)"
2018-06-24 15:32:25 +03:00
+ whitespace_mandatory
2022-03-04 08:42:24 +03:00
+ rb"([0-9]+)"
2018-06-24 15:32:25 +03:00
+ whitespace_optional
+ newline_only
)
2022-03-04 08:42:24 +03:00
re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")
2024-07-06 08:08:35 +03:00
def read_xref_table(self, xref_section_offset: int) -> int:
assert self.buf is not None
subsection_found = False
2018-06-24 15:32:25 +03:00
m = self.re_xref_section_start.match(
self.buf, xref_section_offset + self.start_offset
)
2024-07-06 08:08:35 +03:00
check_format_condition(m is not None, "xref section start not found")
assert m is not None
offset = m.end()
while True:
m = self.re_xref_subsection_start.match(self.buf, offset)
if not m:
2018-06-24 15:32:25 +03:00
check_format_condition(
subsection_found, "xref subsection start not found"
)
break
subsection_found = True
offset = m.end()
first_object = int(m.group(1))
num_objects = int(m.group(2))
for i in range(first_object, first_object + num_objects):
m = self.re_xref_entry.match(self.buf, offset)
2024-07-06 08:08:35 +03:00
check_format_condition(m is not None, "xref entry not found")
assert m is not None
offset = m.end()
is_free = m.group(3) == b"f"
if not is_free:
generation = int(m.group(2))
new_entry = (int(m.group(1)), generation)
if i not in self.xref_table:
self.xref_table[i] = new_entry
return offset
2024-07-06 08:08:35 +03:00
def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any:
offset, generation = self.xref_table[ref[0]]
2018-06-24 15:32:25 +03:00
check_format_condition(
generation == ref[1],
f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
f"table, instead found generation {generation} at offset {offset}",
)
2024-07-06 08:08:35 +03:00
assert self.buf is not None
2018-06-24 15:32:25 +03:00
value = self.get_value(
self.buf,
offset + self.start_offset,
expect_indirect=IndirectReference(*ref),
max_nesting=max_nesting,
)[0]
self.cached_objects[ref] = value
return value
2024-07-06 08:08:35 +03:00
def linearize_page_tree(
self, node: PdfDict | None = None
) -> list[IndirectReference]:
page_node = node if node is not None else self.page_tree_root
2018-06-24 15:32:25 +03:00
check_format_condition(
2024-07-06 08:08:35 +03:00
page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
2018-06-24 15:32:25 +03:00
)
pages = []
2024-07-06 08:08:35 +03:00
for kid in page_node[b"Kids"]:
kid_object = self.read_indirect(kid)
if kid_object[b"Type"] == b"Page":
pages.append(kid)
else:
pages.extend(self.linearize_page_tree(node=kid_object))
return pages