mirror of
https://github.com/python-pillow/Pillow.git
synced 2024-11-10 19:56:47 +03:00
Merge pull request #2965 from vashek/master
Support appending to existing PDFs
This commit is contained in:
commit
ddc9e73b47
|
@ -1,24 +1,31 @@
|
|||
from helper import unittest, PillowTestCase, hopper
|
||||
from PIL import Image
|
||||
from PIL import Image, PdfParser
|
||||
import io
|
||||
import os
|
||||
import os.path
|
||||
import tempfile
|
||||
|
||||
|
||||
class TestFilePdf(PillowTestCase):
|
||||
|
||||
def helper_save_as_pdf(self, mode, save_all=False):
|
||||
def helper_save_as_pdf(self, mode, **kwargs):
|
||||
# Arrange
|
||||
im = hopper(mode)
|
||||
outfile = self.tempfile("temp_" + mode + ".pdf")
|
||||
|
||||
# Act
|
||||
if save_all:
|
||||
im.save(outfile, save_all=True)
|
||||
else:
|
||||
im.save(outfile)
|
||||
im.save(outfile, **kwargs)
|
||||
|
||||
# Assert
|
||||
self.assertTrue(os.path.isfile(outfile))
|
||||
self.assertGreater(os.path.getsize(outfile), 0)
|
||||
with PdfParser.PdfParser(outfile) as pdf:
|
||||
if kwargs.get("append_images", False) or kwargs.get("append", False):
|
||||
self.assertGreater(len(pdf.pages), 1)
|
||||
else:
|
||||
self.assertGreater(len(pdf.pages), 0)
|
||||
|
||||
return outfile
|
||||
|
||||
def test_monochrome(self):
|
||||
# Arrange
|
||||
|
@ -97,6 +104,135 @@ class TestFilePdf(PillowTestCase):
|
|||
self.assertTrue(os.path.isfile(outfile))
|
||||
self.assertGreater(os.path.getsize(outfile), 0)
|
||||
|
||||
def test_pdf_open(self):
|
||||
# fail on a buffer full of null bytes
|
||||
self.assertRaises(PdfParser.PdfFormatError, PdfParser.PdfParser, buf=bytearray(65536))
|
||||
|
||||
# make an empty PDF object
|
||||
with PdfParser.PdfParser() as empty_pdf:
|
||||
self.assertEqual(len(empty_pdf.pages), 0)
|
||||
self.assertEqual(len(empty_pdf.info), 0)
|
||||
self.assertFalse(empty_pdf.should_close_buf)
|
||||
self.assertFalse(empty_pdf.should_close_file)
|
||||
|
||||
# make a PDF file
|
||||
pdf_filename = self.helper_save_as_pdf("RGB")
|
||||
|
||||
# open the PDF file
|
||||
with PdfParser.PdfParser(filename=pdf_filename) as hopper_pdf:
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
self.assertTrue(hopper_pdf.should_close_buf)
|
||||
self.assertTrue(hopper_pdf.should_close_file)
|
||||
|
||||
# read a PDF file from a buffer with a non-zero offset
|
||||
with open(pdf_filename, "rb") as f:
|
||||
content = b"xyzzy" + f.read()
|
||||
with PdfParser.PdfParser(buf=content, start_offset=5) as hopper_pdf:
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
self.assertFalse(hopper_pdf.should_close_buf)
|
||||
self.assertFalse(hopper_pdf.should_close_file)
|
||||
|
||||
# read a PDF file from an already open file
|
||||
with open(pdf_filename, "rb") as f:
|
||||
with PdfParser.PdfParser(f=f) as hopper_pdf:
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
self.assertTrue(hopper_pdf.should_close_buf)
|
||||
self.assertFalse(hopper_pdf.should_close_file)
|
||||
|
||||
def test_pdf_append_fails_on_nonexistent_file(self):
|
||||
im = hopper("RGB")
|
||||
temp_dir = tempfile.mkdtemp()
|
||||
try:
|
||||
self.assertRaises(IOError, im.save, os.path.join(temp_dir, "nonexistent.pdf"), append=True)
|
||||
finally:
|
||||
os.rmdir(temp_dir)
|
||||
|
||||
def check_pdf_pages_consistency(self, pdf):
|
||||
pages_info = pdf.read_indirect(pdf.pages_ref)
|
||||
self.assertNotIn(b"Parent", pages_info)
|
||||
self.assertIn(b"Kids", pages_info)
|
||||
kids_not_used = pages_info[b"Kids"]
|
||||
for page_ref in pdf.pages:
|
||||
while True:
|
||||
if page_ref in kids_not_used:
|
||||
kids_not_used.remove(page_ref)
|
||||
page_info = pdf.read_indirect(page_ref)
|
||||
self.assertIn(b"Parent", page_info)
|
||||
page_ref = page_info[b"Parent"]
|
||||
if page_ref == pdf.pages_ref:
|
||||
break
|
||||
self.assertEqual(pdf.pages_ref, page_info[b"Parent"])
|
||||
self.assertEqual(kids_not_used, [])
|
||||
|
||||
def test_pdf_append(self):
|
||||
# make a PDF file
|
||||
pdf_filename = self.helper_save_as_pdf("RGB", producer="PdfParser")
|
||||
|
||||
# open it, check pages and info
|
||||
with PdfParser.PdfParser(pdf_filename, mode="r+b") as pdf:
|
||||
self.assertEqual(len(pdf.pages), 1)
|
||||
self.assertEqual(len(pdf.info), 1)
|
||||
self.assertEqual(pdf.info.Producer, "PdfParser")
|
||||
self.check_pdf_pages_consistency(pdf)
|
||||
|
||||
# append some info
|
||||
pdf.info.Title = "abc"
|
||||
pdf.info.Author = "def"
|
||||
pdf.info.Subject = u"ghi\uABCD"
|
||||
pdf.info.Keywords = "qw)e\\r(ty"
|
||||
pdf.info.Creator = "hopper()"
|
||||
pdf.start_writing()
|
||||
pdf.write_xref_and_trailer()
|
||||
|
||||
# open it again, check pages and info again
|
||||
with PdfParser.PdfParser(pdf_filename) as pdf:
|
||||
self.assertEqual(len(pdf.pages), 1)
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(pdf.info.Title, "abc")
|
||||
self.check_pdf_pages_consistency(pdf)
|
||||
|
||||
# append two images
|
||||
mode_CMYK = hopper("CMYK")
|
||||
mode_P = hopper("P")
|
||||
mode_CMYK.save(pdf_filename, append=True, save_all=True, append_images=[mode_P])
|
||||
|
||||
# open the PDF again, check pages and info again
|
||||
with PdfParser.PdfParser(pdf_filename) as pdf:
|
||||
self.assertEqual(len(pdf.pages), 3)
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(PdfParser.decode_text(pdf.info[b"Title"]), "abc")
|
||||
self.assertEqual(pdf.info.Title, "abc")
|
||||
self.assertEqual(pdf.info.Producer, "PdfParser")
|
||||
self.assertEqual(pdf.info.Keywords, "qw)e\\r(ty")
|
||||
self.assertEqual(pdf.info.Subject, u"ghi\uABCD")
|
||||
self.check_pdf_pages_consistency(pdf)
|
||||
|
||||
def test_pdf_info(self):
|
||||
# make a PDF file
|
||||
pdf_filename = self.helper_save_as_pdf("RGB", title="title", author="author", subject="subject", keywords="keywords", creator="creator", producer="producer")
|
||||
|
||||
# open it, check pages and info
|
||||
with PdfParser.PdfParser(pdf_filename) as pdf:
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(pdf.info.Title, "title")
|
||||
self.assertEqual(pdf.info.Author, "author")
|
||||
self.assertEqual(pdf.info.Subject, "subject")
|
||||
self.assertEqual(pdf.info.Keywords, "keywords")
|
||||
self.assertEqual(pdf.info.Creator, "creator")
|
||||
self.assertEqual(pdf.info.Producer, "producer")
|
||||
self.check_pdf_pages_consistency(pdf)
|
||||
|
||||
def test_pdf_append_to_bytesio(self):
|
||||
im = hopper("RGB")
|
||||
f = io.BytesIO()
|
||||
im.save(f, format="PDF")
|
||||
initial_size = len(f.getvalue())
|
||||
self.assertGreater(initial_size, 0)
|
||||
im = hopper("P")
|
||||
f = io.BytesIO(f.getvalue())
|
||||
im.save(f, format="PDF", append=True)
|
||||
self.assertGreater(len(f.getvalue()), initial_size)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
89
Tests/test_pdfparser.py
Normal file
89
Tests/test_pdfparser.py
Normal file
|
@ -0,0 +1,89 @@
|
|||
from helper import unittest, PillowTestCase
|
||||
|
||||
from PIL.PdfParser import IndirectObjectDef, IndirectReference, PdfBinary, PdfDict, PdfFormatError, PdfName, PdfParser, PdfStream, decode_text, encode_text, pdf_repr
|
||||
|
||||
|
||||
class TestPdfParser(PillowTestCase):
|
||||
|
||||
def test_text_encode_decode(self):
|
||||
self.assertEqual(encode_text("abc"), b"\xFE\xFF\x00a\x00b\x00c")
|
||||
self.assertEqual(decode_text(b"\xFE\xFF\x00a\x00b\x00c"), "abc")
|
||||
self.assertEqual(decode_text(b"abc"), "abc")
|
||||
self.assertEqual(decode_text(b"\x1B a \x1C"), u"\u02D9 a \u02DD")
|
||||
|
||||
def test_indirect_refs(self):
|
||||
self.assertEqual(IndirectReference(1, 2), IndirectReference(1, 2))
|
||||
self.assertNotEqual(IndirectReference(1, 2), IndirectReference(1, 3))
|
||||
self.assertNotEqual(IndirectReference(1, 2), IndirectObjectDef(1, 2))
|
||||
self.assertNotEqual(IndirectReference(1, 2), (1, 2))
|
||||
self.assertEqual(IndirectObjectDef(1, 2), IndirectObjectDef(1, 2))
|
||||
self.assertNotEqual(IndirectObjectDef(1, 2), IndirectObjectDef(1, 3))
|
||||
self.assertNotEqual(IndirectObjectDef(1, 2), IndirectReference(1, 2))
|
||||
self.assertNotEqual(IndirectObjectDef(1, 2), (1, 2))
|
||||
|
||||
def test_parsing(self):
|
||||
self.assertEqual(PdfParser.interpret_name(b"Name#23Hash"), b"Name#Hash")
|
||||
self.assertEqual(PdfParser.interpret_name(b"Name#23Hash", as_text=True), "Name#Hash")
|
||||
self.assertEqual(PdfParser.get_value(b"1 2 R ", 0), (IndirectReference(1, 2), 5))
|
||||
self.assertEqual(PdfParser.get_value(b"true[", 0), (True, 4))
|
||||
self.assertEqual(PdfParser.get_value(b"false%", 0), (False, 5))
|
||||
self.assertEqual(PdfParser.get_value(b"null<", 0), (None, 4))
|
||||
self.assertEqual(PdfParser.get_value(b"%cmt\n %cmt\n 123\n", 0), (123, 15))
|
||||
self.assertEqual(PdfParser.get_value(b"<901FA3>", 0), (b"\x90\x1F\xA3", 8))
|
||||
self.assertEqual(PdfParser.get_value(b"asd < 9 0 1 f A > qwe", 3), (b"\x90\x1F\xA0", 17))
|
||||
self.assertEqual(PdfParser.get_value(b"(asd)", 0), (b"asd", 5))
|
||||
self.assertEqual(PdfParser.get_value(b"(asd(qwe)zxc)zzz(aaa)", 0), (b"asd(qwe)zxc", 13))
|
||||
self.assertEqual(PdfParser.get_value(b"(Two \\\nwords.)", 0), (b"Two words.", 14))
|
||||
self.assertEqual(PdfParser.get_value(b"(Two\nlines.)", 0), (b"Two\nlines.", 12))
|
||||
self.assertEqual(PdfParser.get_value(b"(Two\r\nlines.)", 0), (b"Two\nlines.", 13))
|
||||
self.assertEqual(PdfParser.get_value(b"(Two\\nlines.)", 0), (b"Two\nlines.", 13))
|
||||
self.assertEqual(PdfParser.get_value(b"(One\\(paren).", 0), (b"One(paren", 12))
|
||||
self.assertEqual(PdfParser.get_value(b"(One\\)paren).", 0), (b"One)paren", 12))
|
||||
self.assertEqual(PdfParser.get_value(b"(\\0053)", 0), (b"\x053", 7))
|
||||
self.assertEqual(PdfParser.get_value(b"(\\053)", 0), (b"\x2B", 6))
|
||||
self.assertEqual(PdfParser.get_value(b"(\\53)", 0), (b"\x2B", 5))
|
||||
self.assertEqual(PdfParser.get_value(b"(\\53a)", 0), (b"\x2Ba", 6))
|
||||
self.assertEqual(PdfParser.get_value(b"(\\1111)", 0), (b"\x491", 7))
|
||||
self.assertEqual(PdfParser.get_value(b" 123 (", 0), (123, 4))
|
||||
self.assertAlmostEqual(PdfParser.get_value(b" 123.4 %", 0)[0], 123.4)
|
||||
self.assertEqual(PdfParser.get_value(b" 123.4 %", 0)[1], 6)
|
||||
self.assertRaises(PdfFormatError, PdfParser.get_value, b"]", 0)
|
||||
d = PdfParser.get_value(b"<</Name (value) /N /V>>", 0)[0]
|
||||
self.assertIsInstance(d, PdfDict)
|
||||
self.assertEqual(len(d), 2)
|
||||
self.assertEqual(d.Name, "value")
|
||||
self.assertEqual(d[b"Name"], b"value")
|
||||
self.assertEqual(d.N, PdfName("V"))
|
||||
a = PdfParser.get_value(b"[/Name (value) /N /V]", 0)[0]
|
||||
self.assertIsInstance(a, list)
|
||||
self.assertEqual(len(a), 4)
|
||||
self.assertEqual(a[0], PdfName("Name"))
|
||||
s = PdfParser.get_value(b"<</Name (value) /Length 5>>\nstream\nabcde\nendstream<<...", 0)[0]
|
||||
self.assertIsInstance(s, PdfStream)
|
||||
self.assertEqual(s.dictionary.Name, "value")
|
||||
self.assertEqual(s.decode(), b"abcde")
|
||||
|
||||
def test_pdf_repr(self):
|
||||
self.assertEqual(bytes(IndirectReference(1, 2)), b"1 2 R")
|
||||
self.assertEqual(bytes(IndirectObjectDef(*IndirectReference(1, 2))), b"1 2 obj")
|
||||
self.assertEqual(bytes(PdfName(b"Name#Hash")), b"/Name#23Hash")
|
||||
self.assertEqual(bytes(PdfName("Name#Hash")), b"/Name#23Hash")
|
||||
self.assertEqual(bytes(PdfDict({b"Name": IndirectReference(1, 2)})), b"<<\n/Name 1 2 R\n>>")
|
||||
self.assertEqual(bytes(PdfDict({"Name": IndirectReference(1, 2)})), b"<<\n/Name 1 2 R\n>>")
|
||||
self.assertEqual(pdf_repr(IndirectReference(1, 2)), b"1 2 R")
|
||||
self.assertEqual(pdf_repr(IndirectObjectDef(*IndirectReference(1, 2))), b"1 2 obj")
|
||||
self.assertEqual(pdf_repr(PdfName(b"Name#Hash")), b"/Name#23Hash")
|
||||
self.assertEqual(pdf_repr(PdfName("Name#Hash")), b"/Name#23Hash")
|
||||
self.assertEqual(pdf_repr(PdfDict({b"Name": IndirectReference(1, 2)})), b"<<\n/Name 1 2 R\n>>")
|
||||
self.assertEqual(pdf_repr(PdfDict({"Name": IndirectReference(1, 2)})), b"<<\n/Name 1 2 R\n>>")
|
||||
self.assertEqual(pdf_repr(123), b"123")
|
||||
self.assertEqual(pdf_repr(True), b"true")
|
||||
self.assertEqual(pdf_repr(False), b"false")
|
||||
self.assertEqual(pdf_repr(None), b"null")
|
||||
self.assertEqual(pdf_repr(b"a)/b\\(c"), br"(a\)/b\\\(c)")
|
||||
self.assertEqual(pdf_repr([123, True, {"a": PdfName(b"b")}]), b"[ 123 true <<\n/a /b\n>> ]")
|
||||
self.assertEqual(pdf_repr(PdfBinary(b"\x90\x1F\xA0")), b"<901FA0>")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
|
@ -612,6 +612,14 @@ The :py:meth:`~PIL.Image.Image.save` method can take the following keyword argum
|
|||
|
||||
.. versionadded:: 3.4.0
|
||||
|
||||
**append_images**
|
||||
A list of images to append as additional frames. Each of the
|
||||
images in the list can be single or multiframe images. Note however, that for
|
||||
correct results, all the appended images should have the same
|
||||
``encoderinfo`` and ``encoderconfig`` properties.
|
||||
|
||||
.. versionadded:: 4.2.0
|
||||
|
||||
**tiffinfo**
|
||||
A :py:class:`~PIL.TiffImagePlugin.ImageFileDirectory_v2` object or dict
|
||||
object containing tiff tags and values. The TIFF field type is
|
||||
|
@ -944,14 +952,68 @@ The format code is ``Palm``, the extension is ``.palm``.
|
|||
PDF
|
||||
^^^
|
||||
|
||||
PIL can write PDF (Acrobat) images. Such images are written as binary PDF 1.1
|
||||
PIL can write PDF (Acrobat) images. Such images are written as binary PDF 1.4
|
||||
files, using either JPEG or HEX encoding depending on the image mode (and
|
||||
whether JPEG support is available or not).
|
||||
|
||||
When calling :py:meth:`~PIL.Image.Image.save`, if a multiframe image is used,
|
||||
by default, only the first image will be saved. To save all frames, each frame
|
||||
to a separate page of the PDF, the ``save_all`` parameter must be present and
|
||||
set to ``True``.
|
||||
The :py:meth:`~PIL.Image.Image.save` method can take the following keyword arguments:
|
||||
|
||||
**save_all**
|
||||
If a multiframe image is used, by default, only the first image will be saved.
|
||||
To save all frames, each frame to a separate page of the PDF, the ``save_all``
|
||||
parameter must be present and set to ``True``.
|
||||
|
||||
.. versionadded:: 3.0.0
|
||||
|
||||
**append_images**
|
||||
A list of images to append as additional pages. Each of the
|
||||
images in the list can be single or multiframe images.
|
||||
|
||||
.. versionadded:: 4.2.0
|
||||
|
||||
**append**
|
||||
Set to True to append pages to an existing PDF file. If the file doesn't
|
||||
exist, an :py:exc:`IOError` will be raised.
|
||||
|
||||
.. versionadded:: 5.1.0
|
||||
|
||||
**resolution**
|
||||
Image resolution in DPI. This, together with the number of pixels in the
|
||||
image, will determine the physical dimensions of the page that will be
|
||||
saved in the PDF.
|
||||
|
||||
**title**
|
||||
The document’s title.
|
||||
|
||||
.. versionadded:: 5.1.0
|
||||
|
||||
**author**
|
||||
The name of the person who created the document.
|
||||
|
||||
.. versionadded:: 5.1.0
|
||||
|
||||
**subject**
|
||||
The subject of the document.
|
||||
|
||||
.. versionadded:: 5.1.0
|
||||
|
||||
**keywords**
|
||||
Keywords associated with the document.
|
||||
|
||||
.. versionadded:: 5.1.0
|
||||
|
||||
**creator**
|
||||
If the document was converted to PDF from another format, the name of the
|
||||
conforming product that created the original document from which it was
|
||||
converted.
|
||||
|
||||
.. versionadded:: 5.1.0
|
||||
|
||||
**producer**
|
||||
If the document was converted to PDF from another format, the name of the
|
||||
conforming product that converted it to PDF.
|
||||
|
||||
.. versionadded:: 5.1.0
|
||||
|
||||
XV Thumbnails
|
||||
^^^^^^^^^^^^^
|
||||
|
|
|
@ -1924,9 +1924,12 @@ class Image(object):
|
|||
save_handler = SAVE[format.upper()]
|
||||
|
||||
if open_fp:
|
||||
# Open also for reading ("+"), because TIFF save_all
|
||||
# writer needs to go back and edit the written data.
|
||||
fp = builtins.open(filename, "w+b")
|
||||
if params.get('append', False):
|
||||
fp = builtins.open(filename, "r+b")
|
||||
else:
|
||||
# Open also for reading ("+"), because TIFF save_all
|
||||
# writer needs to go back and edit the written data.
|
||||
fp = builtins.open(filename, "w+b")
|
||||
|
||||
try:
|
||||
save_handler(self, fp, filename)
|
||||
|
|
|
@ -20,11 +20,10 @@
|
|||
# Image plugin for PDF images (output only).
|
||||
##
|
||||
|
||||
from . import Image, ImageFile, ImageSequence
|
||||
from ._binary import i8
|
||||
from . import Image, ImageFile, ImageSequence, PdfParser
|
||||
import io
|
||||
|
||||
__version__ = "0.4"
|
||||
__version__ = "0.5"
|
||||
|
||||
|
||||
#
|
||||
|
@ -37,19 +36,6 @@ __version__ = "0.4"
|
|||
# 4. page
|
||||
# 5. page contents
|
||||
|
||||
def _obj(fp, obj, **dictionary):
|
||||
fp.write("%d 0 obj\n" % obj)
|
||||
if dictionary:
|
||||
fp.write("<<\n")
|
||||
for k, v in dictionary.items():
|
||||
if v is not None:
|
||||
fp.write("/%s %s\n" % (k, v))
|
||||
fp.write(">>\n")
|
||||
|
||||
|
||||
def _endobj(fp):
|
||||
fp.write("endobj\n")
|
||||
|
||||
|
||||
def _save_all(im, fp, filename):
|
||||
_save(im, fp, filename, save_all=True)
|
||||
|
@ -60,76 +46,39 @@ def _save_all(im, fp, filename):
|
|||
|
||||
def _save(im, fp, filename, save_all=False):
|
||||
resolution = im.encoderinfo.get("resolution", 72.0)
|
||||
is_appending = im.encoderinfo.get("append", False)
|
||||
title = im.encoderinfo.get("title", None)
|
||||
author = im.encoderinfo.get("author", None)
|
||||
subject = im.encoderinfo.get("subject", None)
|
||||
keywords = im.encoderinfo.get("keywords", None)
|
||||
creator = im.encoderinfo.get("creator", None)
|
||||
producer = im.encoderinfo.get("producer", None)
|
||||
|
||||
if is_appending:
|
||||
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename, mode="r+b")
|
||||
else:
|
||||
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename, mode="w+b")
|
||||
|
||||
if title:
|
||||
existing_pdf.info.Title = title
|
||||
if author:
|
||||
existing_pdf.info.Author = author
|
||||
if subject:
|
||||
existing_pdf.info.Subject = subject
|
||||
if keywords:
|
||||
existing_pdf.info.Keywords = keywords
|
||||
if creator:
|
||||
existing_pdf.info.Creator = creator
|
||||
if producer:
|
||||
existing_pdf.info.Producer = producer
|
||||
|
||||
#
|
||||
# make sure image data is available
|
||||
im.load()
|
||||
|
||||
xref = [0]
|
||||
|
||||
class TextWriter(object):
|
||||
def __init__(self, fp):
|
||||
self.fp = fp
|
||||
|
||||
def __getattr__(self, name):
|
||||
return getattr(self.fp, name)
|
||||
|
||||
def write(self, value):
|
||||
self.fp.write(value.encode('latin-1'))
|
||||
|
||||
fp = TextWriter(fp)
|
||||
|
||||
fp.write("%PDF-1.2\n")
|
||||
fp.write("% created by PIL PDF driver " + __version__ + "\n")
|
||||
|
||||
# FIXME: Should replace ASCIIHexDecode with RunLengthDecode (packbits)
|
||||
# or LZWDecode (tiff/lzw compression). Note that PDF 1.2 also supports
|
||||
# Flatedecode (zip compression).
|
||||
|
||||
bits = 8
|
||||
params = None
|
||||
|
||||
if im.mode == "1":
|
||||
filter = "/ASCIIHexDecode"
|
||||
colorspace = "/DeviceGray"
|
||||
procset = "/ImageB" # grayscale
|
||||
bits = 1
|
||||
elif im.mode == "L":
|
||||
filter = "/DCTDecode"
|
||||
# params = "<< /Predictor 15 /Columns %d >>" % (width-2)
|
||||
colorspace = "/DeviceGray"
|
||||
procset = "/ImageB" # grayscale
|
||||
elif im.mode == "P":
|
||||
filter = "/ASCIIHexDecode"
|
||||
colorspace = "[ /Indexed /DeviceRGB 255 <"
|
||||
palette = im.im.getpalette("RGB")
|
||||
for i in range(256):
|
||||
r = i8(palette[i*3])
|
||||
g = i8(palette[i*3+1])
|
||||
b = i8(palette[i*3+2])
|
||||
colorspace += "%02x%02x%02x " % (r, g, b)
|
||||
colorspace += "> ]"
|
||||
procset = "/ImageI" # indexed color
|
||||
elif im.mode == "RGB":
|
||||
filter = "/DCTDecode"
|
||||
colorspace = "/DeviceRGB"
|
||||
procset = "/ImageC" # color images
|
||||
elif im.mode == "CMYK":
|
||||
filter = "/DCTDecode"
|
||||
colorspace = "/DeviceCMYK"
|
||||
procset = "/ImageC" # color images
|
||||
else:
|
||||
raise ValueError("cannot save mode %s" % im.mode)
|
||||
|
||||
#
|
||||
# catalogue
|
||||
|
||||
xref.append(fp.tell())
|
||||
_obj(
|
||||
fp, 1,
|
||||
Type="/Catalog",
|
||||
Pages="2 0 R")
|
||||
_endobj(fp)
|
||||
existing_pdf.start_writing()
|
||||
existing_pdf.write_header()
|
||||
existing_pdf.write_comment("created by PIL PDF driver " + __version__)
|
||||
|
||||
#
|
||||
# pages
|
||||
|
@ -137,11 +86,12 @@ def _save(im, fp, filename, save_all=False):
|
|||
if save_all:
|
||||
append_images = im.encoderinfo.get("append_images", [])
|
||||
for append_im in append_images:
|
||||
if append_im.mode != im.mode:
|
||||
append_im = append_im.convert(im.mode)
|
||||
append_im.encoderinfo = im.encoderinfo.copy()
|
||||
ims.append(append_im)
|
||||
numberOfPages = 0
|
||||
image_refs = []
|
||||
page_refs = []
|
||||
contents_refs = []
|
||||
for im in ims:
|
||||
im_numberOfPages = 1
|
||||
if save_all:
|
||||
|
@ -151,26 +101,58 @@ def _save(im, fp, filename, save_all=False):
|
|||
# Image format does not have n_frames. It is a single frame image
|
||||
pass
|
||||
numberOfPages += im_numberOfPages
|
||||
pages = [str(pageNumber*3+4)+" 0 R"
|
||||
for pageNumber in range(0, numberOfPages)]
|
||||
for i in range(im_numberOfPages):
|
||||
image_refs.append(existing_pdf.next_object_id(0))
|
||||
page_refs.append(existing_pdf.next_object_id(0))
|
||||
contents_refs.append(existing_pdf.next_object_id(0))
|
||||
existing_pdf.pages.append(page_refs[-1])
|
||||
|
||||
xref.append(fp.tell())
|
||||
_obj(
|
||||
fp, 2,
|
||||
Type="/Pages",
|
||||
Count=len(pages),
|
||||
Kids="["+"\n".join(pages)+"]")
|
||||
_endobj(fp)
|
||||
#
|
||||
# catalog and list of pages
|
||||
existing_pdf.write_catalog()
|
||||
|
||||
pageNumber = 0
|
||||
for imSequence in ims:
|
||||
for im in ImageSequence.Iterator(imSequence):
|
||||
# FIXME: Should replace ASCIIHexDecode with RunLengthDecode (packbits)
|
||||
# or LZWDecode (tiff/lzw compression). Note that PDF 1.2 also supports
|
||||
# Flatedecode (zip compression).
|
||||
|
||||
bits = 8
|
||||
params = None
|
||||
|
||||
if im.mode == "1":
|
||||
filter = "ASCIIHexDecode"
|
||||
colorspace = PdfParser.PdfName("DeviceGray")
|
||||
procset = "ImageB" # grayscale
|
||||
bits = 1
|
||||
elif im.mode == "L":
|
||||
filter = "DCTDecode"
|
||||
# params = "<< /Predictor 15 /Columns %d >>" % (width-2)
|
||||
colorspace = PdfParser.PdfName("DeviceGray")
|
||||
procset = "ImageB" # grayscale
|
||||
elif im.mode == "P":
|
||||
filter = "ASCIIHexDecode"
|
||||
palette = im.im.getpalette("RGB")
|
||||
colorspace = [PdfParser.PdfName("Indexed"), PdfParser.PdfName("DeviceRGB"), 255, PdfParser.PdfBinary(palette)]
|
||||
procset = "ImageI" # indexed color
|
||||
elif im.mode == "RGB":
|
||||
filter = "DCTDecode"
|
||||
colorspace = PdfParser.PdfName("DeviceRGB")
|
||||
procset = "ImageC" # color images
|
||||
elif im.mode == "CMYK":
|
||||
filter = "DCTDecode"
|
||||
colorspace = PdfParser.PdfName("DeviceCMYK")
|
||||
procset = "ImageC" # color images
|
||||
else:
|
||||
raise ValueError("cannot save mode %s" % im.mode)
|
||||
|
||||
#
|
||||
# image
|
||||
|
||||
op = io.BytesIO()
|
||||
|
||||
if filter == "/ASCIIHexDecode":
|
||||
if filter == "ASCIIHexDecode":
|
||||
if bits == 1:
|
||||
# FIXME: the hex encoder doesn't support packed 1-bit
|
||||
# images; do things the hard way...
|
||||
|
@ -178,11 +160,11 @@ def _save(im, fp, filename, save_all=False):
|
|||
im = Image.new("L", (len(data), 1), None)
|
||||
im.putdata(data)
|
||||
ImageFile._save(im, op, [("hex", (0, 0)+im.size, 0, im.mode)])
|
||||
elif filter == "/DCTDecode":
|
||||
elif filter == "DCTDecode":
|
||||
Image.SAVE["JPEG"](im, op, filename)
|
||||
elif filter == "/FlateDecode":
|
||||
elif filter == "FlateDecode":
|
||||
ImageFile._save(im, op, [("zip", (0, 0)+im.size, 0, im.mode)])
|
||||
elif filter == "/RunLengthDecode":
|
||||
elif filter == "RunLengthDecode":
|
||||
ImageFile._save(im, op, [("packbits", (0, 0)+im.size, 0, im.mode)])
|
||||
else:
|
||||
raise ValueError("unsupported PDF filter (%s)" % filter)
|
||||
|
@ -192,73 +174,45 @@ def _save(im, fp, filename, save_all=False):
|
|||
|
||||
width, height = im.size
|
||||
|
||||
xref.append(fp.tell())
|
||||
_obj(
|
||||
fp, pageNumber*3+3,
|
||||
Type="/XObject",
|
||||
Subtype="/Image",
|
||||
existing_pdf.write_obj(image_refs[pageNumber], stream=op.getvalue(),
|
||||
Type=PdfParser.PdfName("XObject"),
|
||||
Subtype=PdfParser.PdfName("Image"),
|
||||
Width=width, # * 72.0 / resolution,
|
||||
Height=height, # * 72.0 / resolution,
|
||||
Length=len(op.getvalue()),
|
||||
Filter=filter,
|
||||
Filter=PdfParser.PdfName(filter),
|
||||
BitsPerComponent=bits,
|
||||
DecodeParams=params,
|
||||
ColorSpace=colorspace)
|
||||
|
||||
fp.write("stream\n")
|
||||
fp.fp.write(op.getvalue())
|
||||
fp.write("\nendstream\n")
|
||||
|
||||
_endobj(fp)
|
||||
|
||||
#
|
||||
# page
|
||||
|
||||
xref.append(fp.tell())
|
||||
_obj(fp, pageNumber*3+4)
|
||||
fp.write(
|
||||
"<<\n/Type /Page\n/Parent 2 0 R\n"
|
||||
"/Resources <<\n/ProcSet [ /PDF %s ]\n"
|
||||
"/XObject << /image %d 0 R >>\n>>\n"
|
||||
"/MediaBox [ 0 0 %d %d ]\n/Contents %d 0 R\n>>\n" % (
|
||||
procset,
|
||||
pageNumber*3+3,
|
||||
int(width * 72.0 / resolution),
|
||||
int(height * 72.0 / resolution),
|
||||
pageNumber*3+5))
|
||||
_endobj(fp)
|
||||
existing_pdf.write_page(page_refs[pageNumber],
|
||||
Resources=PdfParser.PdfDict(
|
||||
ProcSet=[PdfParser.PdfName("PDF"), PdfParser.PdfName(procset)],
|
||||
XObject=PdfParser.PdfDict(image=image_refs[pageNumber])),
|
||||
MediaBox=[0, 0, int(width * 72.0 / resolution), int(height * 72.0 / resolution)],
|
||||
Contents=contents_refs[pageNumber]
|
||||
)
|
||||
|
||||
#
|
||||
# page contents
|
||||
|
||||
op = TextWriter(io.BytesIO())
|
||||
|
||||
op.write(
|
||||
page_contents = PdfParser.make_bytes(
|
||||
"q %d 0 0 %d 0 0 cm /image Do Q\n" % (
|
||||
int(width * 72.0 / resolution),
|
||||
int(height * 72.0 / resolution)))
|
||||
|
||||
xref.append(fp.tell())
|
||||
_obj(fp, pageNumber*3+5, Length=len(op.fp.getvalue()))
|
||||
|
||||
fp.write("stream\n")
|
||||
fp.fp.write(op.fp.getvalue())
|
||||
fp.write("\nendstream\n")
|
||||
|
||||
_endobj(fp)
|
||||
existing_pdf.write_obj(contents_refs[pageNumber], stream=page_contents)
|
||||
|
||||
pageNumber += 1
|
||||
|
||||
#
|
||||
# trailer
|
||||
startxref = fp.tell()
|
||||
fp.write("xref\n0 %d\n0000000000 65535 f \n" % len(xref))
|
||||
for x in xref[1:]:
|
||||
fp.write("%010d 00000 n \n" % x)
|
||||
fp.write("trailer\n<<\n/Size %d\n/Root 1 0 R\n>>\n" % len(xref))
|
||||
fp.write("startxref\n%d\n%%%%EOF\n" % startxref)
|
||||
existing_pdf.write_xref_and_trailer()
|
||||
if hasattr(fp, "flush"):
|
||||
fp.flush()
|
||||
existing_pdf.close()
|
||||
|
||||
#
|
||||
# --------------------------------------------------------------------
|
||||
|
|
846
src/PIL/PdfParser.py
Normal file
846
src/PIL/PdfParser.py
Normal file
|
@ -0,0 +1,846 @@
|
|||
import codecs
|
||||
import collections
|
||||
import mmap
|
||||
import os
|
||||
import re
|
||||
import zlib
|
||||
|
||||
try:
|
||||
from UserDict import UserDict # Python 2.x
|
||||
except ImportError:
|
||||
UserDict = collections.UserDict # Python 3.x
|
||||
|
||||
|
||||
if str == bytes: # Python 2.x
|
||||
def make_bytes(s): # pragma: no cover
|
||||
return s # pragma: no cover
|
||||
else: # Python 3.x
|
||||
def make_bytes(s):
|
||||
return s.encode("us-ascii")
|
||||
|
||||
|
||||
# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set on page 656
|
||||
def encode_text(s):
|
||||
return codecs.BOM_UTF16_BE + s.encode("utf_16_be")
|
||||
|
||||
|
||||
PDFDocEncoding = {
|
||||
0x16: u"\u0017",
|
||||
0x18: u"\u02D8",
|
||||
0x19: u"\u02C7",
|
||||
0x1A: u"\u02C6",
|
||||
0x1B: u"\u02D9",
|
||||
0x1C: u"\u02DD",
|
||||
0x1D: u"\u02DB",
|
||||
0x1E: u"\u02DA",
|
||||
0x1F: u"\u02DC",
|
||||
0x80: u"\u2022",
|
||||
0x81: u"\u2020",
|
||||
0x82: u"\u2021",
|
||||
0x83: u"\u2026",
|
||||
0x84: u"\u2014",
|
||||
0x85: u"\u2013",
|
||||
0x86: u"\u0192",
|
||||
0x87: u"\u2044",
|
||||
0x88: u"\u2039",
|
||||
0x89: u"\u203A",
|
||||
0x8A: u"\u2212",
|
||||
0x8B: u"\u2030",
|
||||
0x8C: u"\u201E",
|
||||
0x8D: u"\u201C",
|
||||
0x8E: u"\u201D",
|
||||
0x8F: u"\u2018",
|
||||
0x90: u"\u2019",
|
||||
0x91: u"\u201A",
|
||||
0x92: u"\u2122",
|
||||
0x93: u"\uFB01",
|
||||
0x94: u"\uFB02",
|
||||
0x95: u"\u0141",
|
||||
0x96: u"\u0152",
|
||||
0x97: u"\u0160",
|
||||
0x98: u"\u0178",
|
||||
0x99: u"\u017D",
|
||||
0x9A: u"\u0131",
|
||||
0x9B: u"\u0142",
|
||||
0x9C: u"\u0153",
|
||||
0x9D: u"\u0161",
|
||||
0x9E: u"\u017E",
|
||||
0xA0: u"\u20AC",
|
||||
}
|
||||
|
||||
|
||||
def decode_text(b):
|
||||
if b[:len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE:
|
||||
return b[len(codecs.BOM_UTF16_BE):].decode("utf_16_be")
|
||||
elif str == bytes: # Python 2.x
|
||||
return u"".join(PDFDocEncoding.get(ord(byte), byte) for byte in b)
|
||||
else:
|
||||
return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b)
|
||||
|
||||
|
||||
class PdfFormatError(RuntimeError):
|
||||
"""An error that probably indicates a syntactic or semantic error in the PDF file structure"""
|
||||
pass
|
||||
|
||||
|
||||
def check_format_condition(condition, error_message):
|
||||
if not condition:
|
||||
raise PdfFormatError(error_message)
|
||||
|
||||
|
||||
class IndirectReference(collections.namedtuple("IndirectReferenceTuple", ["object_id", "generation"])):
|
||||
def __str__(self):
|
||||
return "%s %s R" % self
|
||||
|
||||
def __bytes__(self):
|
||||
return self.__str__().encode("us-ascii")
|
||||
|
||||
def __eq__(self, other):
|
||||
return other.__class__ is self.__class__ and other.object_id == self.object_id and other.generation == self.generation
|
||||
|
||||
def __ne__(self, other):
|
||||
return not (self == other)
|
||||
|
||||
def __hash__(self):
|
||||
return hash((self.object_id, self.generation))
|
||||
|
||||
|
||||
class IndirectObjectDef(IndirectReference):
|
||||
def __str__(self):
|
||||
return "%s %s obj" % self
|
||||
|
||||
|
||||
class XrefTable:
|
||||
def __init__(self):
|
||||
self.existing_entries = {} # object ID => (offset, generation)
|
||||
self.new_entries = {} # object ID => (offset, generation)
|
||||
self.deleted_entries = {0: 65536} # object ID => generation
|
||||
self.reading_finished = False
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if self.reading_finished:
|
||||
self.new_entries[key] = value
|
||||
else:
|
||||
self.existing_entries[key] = value
|
||||
if key in self.deleted_entries:
|
||||
del self.deleted_entries[key]
|
||||
|
||||
def __getitem__(self, key):
|
||||
try:
|
||||
return self.new_entries[key]
|
||||
except KeyError:
|
||||
return self.existing_entries[key]
|
||||
|
||||
def __delitem__(self, key):
|
||||
if key in self.new_entries:
|
||||
generation = self.new_entries[key][1] + 1
|
||||
del self.new_entries[key]
|
||||
self.deleted_entries[key] = generation
|
||||
elif key in self.existing_entries:
|
||||
generation = self.existing_entries[key][1] + 1
|
||||
self.deleted_entries[key] = generation
|
||||
elif key in self.deleted_entries:
|
||||
generation = self.deleted_entries[key]
|
||||
else:
|
||||
raise IndexError("object ID " + str(key) + " cannot be deleted because it doesn't exist")
|
||||
|
||||
def __contains__(self, key):
|
||||
return key in self.existing_entries or key in self.new_entries
|
||||
|
||||
def __len__(self):
|
||||
return len(set(self.existing_entries.keys()) | set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
|
||||
|
||||
def keys(self):
|
||||
return (set(self.existing_entries.keys()) - set(self.deleted_entries.keys())) | set(self.new_entries.keys())
|
||||
|
||||
def write(self, f):
|
||||
keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
|
||||
deleted_keys = sorted(set(self.deleted_entries.keys()))
|
||||
startxref = f.tell()
|
||||
f.write(b"xref\n")
|
||||
while keys:
|
||||
# find a contiguous sequence of object IDs
|
||||
prev = None
|
||||
for index, key in enumerate(keys):
|
||||
if prev is None or prev+1 == key:
|
||||
prev = key
|
||||
else:
|
||||
contiguous_keys = keys[:index]
|
||||
keys = keys[index:]
|
||||
break
|
||||
else:
|
||||
contiguous_keys = keys
|
||||
keys = None
|
||||
f.write(make_bytes("%d %d\n" % (contiguous_keys[0], len(contiguous_keys))))
|
||||
for object_id in contiguous_keys:
|
||||
if object_id in self.new_entries:
|
||||
f.write(make_bytes("%010d %05d n \n" % self.new_entries[object_id]))
|
||||
else:
|
||||
this_deleted_object_id = deleted_keys.pop(0)
|
||||
check_format_condition(object_id == this_deleted_object_id, "expected the next deleted object ID to be %s, instead found %s" % (object_id, this_deleted_object_id))
|
||||
try:
|
||||
next_in_linked_list = deleted_keys[0]
|
||||
except IndexError:
|
||||
next_in_linked_list = 0
|
||||
f.write(make_bytes("%010d %05d f \n" % (next_in_linked_list, self.deleted_entries[object_id])))
|
||||
return startxref
|
||||
|
||||
|
||||
class PdfName:
|
||||
def __init__(self, name):
|
||||
if isinstance(name, PdfName):
|
||||
self.name = name.name
|
||||
elif isinstance(name, bytes):
|
||||
self.name = name
|
||||
else:
|
||||
self.name = name.encode("us-ascii")
|
||||
|
||||
def name_as_str(self):
|
||||
return self.name.decode("us-ascii")
|
||||
|
||||
def __eq__(self, other):
|
||||
return (isinstance(other, PdfName) and other.name == self.name) or other == self.name
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.name)
|
||||
|
||||
def __repr__(self):
|
||||
return "PdfName(%s)" % repr(self.name)
|
||||
|
||||
@classmethod
|
||||
def from_pdf_stream(klass, data):
|
||||
return klass(PdfParser.interpret_name(data))
|
||||
|
||||
allowed_chars = set(range(33,127)) - set(ord(c) for c in "#%/()<>[]{}")
|
||||
|
||||
def __bytes__(self):
|
||||
if str == bytes: # Python 2.x
|
||||
result = bytearray(b"/")
|
||||
for b in self.name:
|
||||
if ord(b) in self.allowed_chars:
|
||||
result.append(b)
|
||||
else:
|
||||
result.extend(b"#%02X" % ord(b))
|
||||
else: # Python 3.x
|
||||
result = bytearray(b"/")
|
||||
for b in self.name:
|
||||
if b in self.allowed_chars:
|
||||
result.append(b)
|
||||
else:
|
||||
result.extend(make_bytes("#%02X" % b))
|
||||
return bytes(result)
|
||||
|
||||
__str__ = __bytes__
|
||||
|
||||
|
||||
class PdfArray(list):
|
||||
def __bytes__(self):
|
||||
return b"[ " + b" ".join(pdf_repr(x) for x in self) + b" ]"
|
||||
|
||||
__str__ = __bytes__
|
||||
|
||||
|
||||
class PdfDict(UserDict):
|
||||
def __init__(self, *args, **kwargs):
|
||||
UserDict.__init__(self, *args, **kwargs)
|
||||
|
||||
def __setattr__(self, key, value):
|
||||
if key == "data":
|
||||
if hasattr(UserDict, "__setattr__"):
|
||||
UserDict.__setattr__(self, key, value)
|
||||
else:
|
||||
self.__dict__[key] = value
|
||||
else:
|
||||
if isinstance(key, str):
|
||||
key = key.encode("us-ascii")
|
||||
self[key] = value
|
||||
|
||||
def __getattr__(self, key):
|
||||
try:
|
||||
value = self[key]
|
||||
except KeyError:
|
||||
try:
|
||||
value = self[key.encode("us-ascii")]
|
||||
except KeyError:
|
||||
raise AttributeError(key)
|
||||
if isinstance(value, bytes):
|
||||
return decode_text(value)
|
||||
else:
|
||||
return value
|
||||
|
||||
def __bytes__(self):
|
||||
out = bytearray(b"<<")
|
||||
for key, value in self.items():
|
||||
if value is None:
|
||||
continue
|
||||
value = pdf_repr(value)
|
||||
out.extend(b"\n")
|
||||
out.extend(bytes(PdfName(key)))
|
||||
out.extend(b" ")
|
||||
out.extend(value)
|
||||
out.extend(b"\n>>")
|
||||
return bytes(out)
|
||||
|
||||
if str == bytes:
|
||||
__str__ = __bytes__
|
||||
|
||||
|
||||
class PdfBinary:
|
||||
def __init__(self, data):
|
||||
self.data = data
|
||||
|
||||
if str == bytes: # Python 2.x
|
||||
def __str__(self):
|
||||
return "<%s>" % "".join("%02X" % ord(b) for b in self.data)
|
||||
|
||||
else: # Python 3.x
|
||||
def __bytes__(self):
|
||||
return make_bytes("<%s>" % "".join("%02X" % b for b in self.data))
|
||||
|
||||
|
||||
class PdfStream:
|
||||
def __init__(self, dictionary, buf):
|
||||
self.dictionary = dictionary
|
||||
self.buf = buf
|
||||
|
||||
def decode(self):
|
||||
try:
|
||||
filter = self.dictionary.Filter
|
||||
except AttributeError:
|
||||
return self.buf
|
||||
if filter == b"FlateDecode":
|
||||
try:
|
||||
expected_length = self.dictionary.DL
|
||||
except AttributeError:
|
||||
expected_length = self.dictionary.Length
|
||||
return zlib.decompress(self.buf, bufsize=int(expected_length))
|
||||
else:
|
||||
raise NotImplementedError("stream filter %s unknown/unsupported" % repr(self.dictionary.Filter))
|
||||
|
||||
|
||||
def pdf_repr(x):
|
||||
if x is True:
|
||||
return b"true"
|
||||
elif x is False:
|
||||
return b"false"
|
||||
elif x is None:
|
||||
return b"null"
|
||||
elif isinstance(x, PdfName) or isinstance(x, PdfDict) or isinstance(x, PdfArray) or isinstance(x, PdfBinary):
|
||||
return bytes(x)
|
||||
elif isinstance(x, int):
|
||||
return str(x).encode("us-ascii")
|
||||
elif isinstance(x, dict):
|
||||
return bytes(PdfDict(x))
|
||||
elif isinstance(x, list):
|
||||
return bytes(PdfArray(x))
|
||||
elif (str == bytes and isinstance(x, unicode)) or (str != bytes and isinstance(x, str)):
|
||||
return pdf_repr(encode_text(x))
|
||||
elif isinstance(x, bytes):
|
||||
return b"(" + x.replace(b"\\", b"\\\\").replace(b"(", b"\\(").replace(b")", b"\\)") + b")" # XXX escape more chars? handle binary garbage
|
||||
else:
|
||||
return bytes(x)
|
||||
|
||||
|
||||
class PdfParser:
|
||||
"""Based on http://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
|
||||
Supports PDF up to 1.4
|
||||
"""
|
||||
|
||||
def __init__(self, filename=None, f=None, buf=None, start_offset=0, mode="rb"):
|
||||
# type: (PdfParser, str, file, Union[bytes, bytearray], int, str) -> None
|
||||
if buf and f:
|
||||
raise RuntimeError("specify buf or f or filename, but not both buf and f")
|
||||
self.filename = filename
|
||||
self.buf = buf
|
||||
self.f = f
|
||||
self.start_offset = start_offset
|
||||
self.should_close_buf = False
|
||||
self.should_close_file = False
|
||||
if filename is not None and f is None:
|
||||
self.f = f = open(filename, mode)
|
||||
self.should_close_file = True
|
||||
if f is not None:
|
||||
self.buf = buf = self.get_buf_from_file(f)
|
||||
self.should_close_buf = True
|
||||
if not filename and hasattr(f, "name"):
|
||||
self.filename = f.name
|
||||
self.cached_objects = {}
|
||||
if buf:
|
||||
self.read_pdf_info()
|
||||
else:
|
||||
self.file_size_total = self.file_size_this = 0
|
||||
self.root = PdfDict()
|
||||
self.root_ref = None
|
||||
self.info = PdfDict()
|
||||
self.info_ref = None
|
||||
self.page_tree_root = {}
|
||||
self.pages = []
|
||||
self.orig_pages = []
|
||||
self.pages_ref = None
|
||||
self.last_xref_section_offset = None
|
||||
self.trailer_dict = {}
|
||||
self.xref_table = XrefTable()
|
||||
self.xref_table.reading_finished = True
|
||||
if f:
|
||||
self.seek_end()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
return False # do not suppress exceptions
|
||||
|
||||
def start_writing(self):
|
||||
self.close_buf()
|
||||
self.seek_end()
|
||||
|
||||
def close_buf(self):
|
||||
try:
|
||||
self.buf.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
self.buf = None
|
||||
|
||||
def close(self):
|
||||
if self.should_close_buf:
|
||||
self.close_buf()
|
||||
if self.f is not None and self.should_close_file:
|
||||
self.f.close()
|
||||
self.f = None
|
||||
|
||||
def seek_end(self):
|
||||
self.f.seek(0, os.SEEK_END)
|
||||
|
||||
def write_header(self):
|
||||
self.f.write(b"%PDF-1.4\n")
|
||||
|
||||
def write_comment(self, s):
|
||||
self.f.write(("%% %s\n" % (s,)).encode("utf-8"))
|
||||
|
||||
def write_catalog(self):
|
||||
self.del_root()
|
||||
self.root_ref = self.next_object_id(self.f.tell())
|
||||
self.pages_ref = self.next_object_id(0)
|
||||
self.rewrite_pages()
|
||||
self.write_obj(self.root_ref,
|
||||
Type=PdfName(b"Catalog"),
|
||||
Pages=self.pages_ref)
|
||||
self.write_obj(self.pages_ref,
|
||||
Type=PdfName(b"Pages"),
|
||||
Count=len(self.pages),
|
||||
Kids=self.pages)
|
||||
return self.root_ref
|
||||
|
||||
def rewrite_pages(self):
|
||||
pages_tree_nodes_to_delete = []
|
||||
for i, page_ref in enumerate(self.orig_pages):
|
||||
page_info = self.cached_objects[page_ref]
|
||||
del self.xref_table[page_ref.object_id]
|
||||
pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
|
||||
if page_ref not in self.pages:
|
||||
# the page has been deleted
|
||||
continue
|
||||
# make dict keys into strings for passing to write_page
|
||||
stringified_page_info = {}
|
||||
for key, value in page_info.items():
|
||||
# key should be a PdfName
|
||||
stringified_page_info[key.name_as_str()] = value
|
||||
stringified_page_info["Parent"] = self.pages_ref
|
||||
new_page_ref = self.write_page(None, **stringified_page_info)
|
||||
for j, cur_page_ref in enumerate(self.pages):
|
||||
if cur_page_ref == page_ref:
|
||||
# replace the page reference with the new one
|
||||
self.pages[j] = new_page_ref
|
||||
# delete redundant Pages tree nodes from xref table
|
||||
for pages_tree_node_ref in pages_tree_nodes_to_delete:
|
||||
while pages_tree_node_ref:
|
||||
pages_tree_node = self.cached_objects[pages_tree_node_ref]
|
||||
if pages_tree_node_ref.object_id in self.xref_table:
|
||||
del self.xref_table[pages_tree_node_ref.object_id]
|
||||
pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
|
||||
self.orig_pages = []
|
||||
|
||||
def write_xref_and_trailer(self, new_root_ref=None):
|
||||
if new_root_ref:
|
||||
self.del_root()
|
||||
self.root_ref = new_root_ref
|
||||
if self.info:
|
||||
self.info_ref = self.write_obj(None, self.info)
|
||||
start_xref = self.xref_table.write(self.f)
|
||||
num_entries = len(self.xref_table)
|
||||
trailer_dict = {b"Root": self.root_ref, b"Size": num_entries}
|
||||
if self.last_xref_section_offset is not None:
|
||||
trailer_dict[b"Prev"] = self.last_xref_section_offset
|
||||
if self.info:
|
||||
trailer_dict[b"Info"] = self.info_ref
|
||||
self.last_xref_section_offset = start_xref
|
||||
self.f.write(b"trailer\n" + bytes(PdfDict(trailer_dict)) + make_bytes("\nstartxref\n%d\n%%%%EOF" % start_xref))
|
||||
|
||||
def write_page(self, ref, *objs, **dict_obj):
|
||||
if isinstance(ref, int):
|
||||
ref = self.pages[ref]
|
||||
if "Type" not in dict_obj:
|
||||
dict_obj["Type"] = PdfName(b"Page")
|
||||
if "Parent" not in dict_obj:
|
||||
dict_obj["Parent"] = self.pages_ref
|
||||
return self.write_obj(ref, *objs, **dict_obj)
|
||||
|
||||
def write_obj(self, ref, *objs, **dict_obj):
|
||||
f = self.f
|
||||
if ref is None:
|
||||
ref = self.next_object_id(f.tell())
|
||||
else:
|
||||
self.xref_table[ref.object_id] = (f.tell(), ref.generation)
|
||||
f.write(bytes(IndirectObjectDef(*ref)))
|
||||
stream = dict_obj.pop("stream", None)
|
||||
if stream is not None:
|
||||
dict_obj["Length"] = len(stream)
|
||||
if dict_obj:
|
||||
f.write(pdf_repr(dict_obj))
|
||||
for obj in objs:
|
||||
f.write(pdf_repr(obj))
|
||||
if stream is not None:
|
||||
f.write(b"stream\n")
|
||||
f.write(stream)
|
||||
f.write(b"\nendstream\n")
|
||||
f.write(b"endobj\n")
|
||||
return ref
|
||||
|
||||
def del_root(self):
|
||||
if self.root_ref is None:
|
||||
return
|
||||
del self.xref_table[self.root_ref.object_id]
|
||||
del self.xref_table[self.root[b"Pages"].object_id]
|
||||
|
||||
@staticmethod
|
||||
def get_buf_from_file(f):
|
||||
if hasattr(f, "getbuffer"):
|
||||
return f.getbuffer()
|
||||
elif hasattr(f, "getvalue"):
|
||||
return f.getvalue()
|
||||
else:
|
||||
try:
|
||||
return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
|
||||
except ValueError: # cannot mmap an empty file
|
||||
return b""
|
||||
|
||||
def read_pdf_info(self):
|
||||
self.file_size_total = len(self.buf)
|
||||
self.file_size_this = self.file_size_total - self.start_offset
|
||||
self.read_trailer()
|
||||
self.root_ref = self.trailer_dict[b"Root"]
|
||||
self.info_ref = self.trailer_dict.get(b"Info", None)
|
||||
self.root = PdfDict(self.read_indirect(self.root_ref))
|
||||
if self.info_ref is None:
|
||||
self.info = PdfDict()
|
||||
else:
|
||||
self.info = PdfDict(self.read_indirect(self.info_ref))
|
||||
check_format_condition(b"Type" in self.root, "/Type missing in Root")
|
||||
check_format_condition(self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog")
|
||||
check_format_condition(b"Pages" in self.root, "/Pages missing in Root")
|
||||
check_format_condition(isinstance(self.root[b"Pages"], IndirectReference), "/Pages in Root is not an indirect reference")
|
||||
self.pages_ref = self.root[b"Pages"]
|
||||
self.page_tree_root = self.read_indirect(self.pages_ref)
|
||||
self.pages = self.linearize_page_tree(self.page_tree_root)
|
||||
# save the original list of page references in case the user modifies, adds or deletes some pages and we need to rewrite the pages and their list
|
||||
self.orig_pages = self.pages[:]
|
||||
|
||||
def next_object_id(self, offset=None):
|
||||
try:
|
||||
# TODO: support reuse of deleted objects
|
||||
reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
|
||||
except ValueError:
|
||||
reference = IndirectReference(1, 0)
|
||||
if offset is not None:
|
||||
self.xref_table[reference.object_id] = (offset, 0)
|
||||
return reference
|
||||
|
||||
delimiter = br"[][()<>{}/%]"
|
||||
delimiter_or_ws = br"[][()<>{}/%\000\011\012\014\015\040]"
|
||||
whitespace = br"[\000\011\012\014\015\040]"
|
||||
whitespace_or_hex = br"[\000\011\012\014\015\0400-9a-fA-F]"
|
||||
whitespace_optional = whitespace + b"*"
|
||||
whitespace_mandatory = whitespace + b"+"
|
||||
newline_only = br"[\r\n]+"
|
||||
newline = whitespace_optional + newline_only + whitespace_optional
|
||||
re_trailer_end = re.compile(whitespace_mandatory + br"trailer" + whitespace_optional + br"\<\<(.*\>\>)" + newline
|
||||
+ br"startxref" + newline + br"([0-9]+)" + newline + br"%%EOF" + whitespace_optional + br"$", re.DOTALL)
|
||||
re_trailer_prev = re.compile(whitespace_optional + br"trailer" + whitespace_optional + br"\<\<(.*?\>\>)" + newline
|
||||
+ br"startxref" + newline + br"([0-9]+)" + newline + br"%%EOF" + whitespace_optional, re.DOTALL)
|
||||
|
||||
def read_trailer(self):
|
||||
search_start_offset = len(self.buf) - 16384
|
||||
if search_start_offset < self.start_offset:
|
||||
search_start_offset = self.start_offset
|
||||
m = self.re_trailer_end.search(self.buf, search_start_offset)
|
||||
check_format_condition(m, "trailer end not found")
|
||||
# make sure we found the LAST trailer
|
||||
last_match = m
|
||||
while m:
|
||||
last_match = m
|
||||
m = self.re_trailer_end.search(self.buf, m.start()+16)
|
||||
if not m:
|
||||
m = last_match
|
||||
trailer_data = m.group(1)
|
||||
self.last_xref_section_offset = int(m.group(2))
|
||||
self.trailer_dict = self.interpret_trailer(trailer_data)
|
||||
self.xref_table = XrefTable()
|
||||
self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
|
||||
if b"Prev" in self.trailer_dict:
|
||||
self.read_prev_trailer(self.trailer_dict[b"Prev"])
|
||||
|
||||
def read_prev_trailer(self, xref_section_offset):
|
||||
trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
|
||||
m = self.re_trailer_prev.search(self.buf[trailer_offset:trailer_offset+16384])
|
||||
check_format_condition(m, "previous trailer not found")
|
||||
trailer_data = m.group(1)
|
||||
check_format_condition(int(m.group(2)) == xref_section_offset, "xref section offset in previous trailer doesn't match what was expected")
|
||||
trailer_dict = self.interpret_trailer(trailer_data)
|
||||
if b"Prev" in trailer_dict:
|
||||
self.read_prev_trailer(trailer_dict[b"Prev"])
|
||||
|
||||
re_whitespace_optional = re.compile(whitespace_optional)
|
||||
re_name = re.compile(whitespace_optional + br"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?=" + delimiter_or_ws + br")")
|
||||
re_dict_start = re.compile(whitespace_optional + br"\<\<")
|
||||
re_dict_end = re.compile(whitespace_optional + br"\>\>" + whitespace_optional)
|
||||
|
||||
@classmethod
|
||||
def interpret_trailer(klass, trailer_data):
|
||||
trailer = {}
|
||||
offset = 0
|
||||
while True:
|
||||
m = klass.re_name.match(trailer_data, offset)
|
||||
if not m:
|
||||
m = klass.re_dict_end.match(trailer_data, offset)
|
||||
check_format_condition(m and m.end() == len(trailer_data), "name not found in trailer, remaining data: " + repr(trailer_data[offset:]))
|
||||
break
|
||||
key = klass.interpret_name(m.group(1))
|
||||
value, offset = klass.get_value(trailer_data, m.end())
|
||||
trailer[key] = value
|
||||
check_format_condition(b"Size" in trailer and isinstance(trailer[b"Size"], int), "/Size not in trailer or not an integer")
|
||||
check_format_condition(b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference), "/Root not in trailer or not an indirect reference")
|
||||
return trailer
|
||||
|
||||
re_hashes_in_name = re.compile(br"([^#]*)(#([0-9a-fA-F]{2}))?")
|
||||
|
||||
@classmethod
|
||||
def interpret_name(klass, raw, as_text=False):
|
||||
name = b""
|
||||
for m in klass.re_hashes_in_name.finditer(raw):
|
||||
if m.group(3):
|
||||
name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
|
||||
else:
|
||||
name += m.group(1)
|
||||
if as_text:
|
||||
return name.decode("utf-8")
|
||||
else:
|
||||
return bytes(name)
|
||||
|
||||
re_null = re.compile(whitespace_optional + br"null(?=" + delimiter_or_ws + br")")
|
||||
re_true = re.compile(whitespace_optional + br"true(?=" + delimiter_or_ws + br")")
|
||||
re_false = re.compile(whitespace_optional + br"false(?=" + delimiter_or_ws + br")")
|
||||
re_int = re.compile(whitespace_optional + br"([-+]?[0-9]+)(?=" + delimiter_or_ws + br")")
|
||||
re_real = re.compile(whitespace_optional + br"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?=" + delimiter_or_ws + br")")
|
||||
re_array_start = re.compile(whitespace_optional + br"\[")
|
||||
re_array_end = re.compile(whitespace_optional + br"]")
|
||||
re_string_hex = re.compile(whitespace_optional + br"\<(" + whitespace_or_hex + br"*)\>")
|
||||
re_string_lit = re.compile(whitespace_optional + br"\(")
|
||||
re_indirect_reference = re.compile(whitespace_optional + br"([-+]?[0-9]+)" + whitespace_mandatory + br"([-+]?[0-9]+)" + whitespace_mandatory + br"R(?=" + delimiter_or_ws + br")")
|
||||
re_indirect_def_start = re.compile(whitespace_optional + br"([-+]?[0-9]+)" + whitespace_mandatory + br"([-+]?[0-9]+)" + whitespace_mandatory + br"obj(?=" + delimiter_or_ws + br")")
|
||||
re_indirect_def_end = re.compile(whitespace_optional + br"endobj(?=" + delimiter_or_ws + br")")
|
||||
re_comment = re.compile(br"(" + whitespace_optional + br"%[^\r\n]*" + newline + br")*")
|
||||
re_stream_start = re.compile(whitespace_optional + br"stream\r?\n")
|
||||
re_stream_end = re.compile(whitespace_optional + br"endstream(?=" + delimiter_or_ws + br")")
|
||||
|
||||
@classmethod
|
||||
def get_value(klass, data, offset, expect_indirect=None, max_nesting=-1):
|
||||
if max_nesting == 0:
|
||||
return None, None
|
||||
m = klass.re_comment.match(data, offset)
|
||||
if m:
|
||||
offset = m.end()
|
||||
m = klass.re_indirect_def_start.match(data, offset)
|
||||
if m:
|
||||
check_format_condition(int(m.group(1)) > 0, "indirect object definition: object ID must be greater than 0")
|
||||
check_format_condition(int(m.group(2)) >= 0, "indirect object definition: generation must be non-negative")
|
||||
check_format_condition(expect_indirect is None or expect_indirect == IndirectReference(int(m.group(1)), int(m.group(2))),
|
||||
"indirect object definition different than expected")
|
||||
object, offset = klass.get_value(data, m.end(), max_nesting=max_nesting-1)
|
||||
if offset is None:
|
||||
return object, None
|
||||
m = klass.re_indirect_def_end.match(data, offset)
|
||||
check_format_condition(m, "indirect object definition end not found")
|
||||
return object, m.end()
|
||||
check_format_condition(not expect_indirect, "indirect object definition not found")
|
||||
m = klass.re_indirect_reference.match(data, offset)
|
||||
if m:
|
||||
check_format_condition(int(m.group(1)) > 0, "indirect object reference: object ID must be greater than 0")
|
||||
check_format_condition(int(m.group(2)) >= 0, "indirect object reference: generation must be non-negative")
|
||||
return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
|
||||
m = klass.re_dict_start.match(data, offset)
|
||||
if m:
|
||||
offset = m.end()
|
||||
result = {}
|
||||
m = klass.re_dict_end.match(data, offset)
|
||||
while not m:
|
||||
key, offset = klass.get_value(data, offset, max_nesting=max_nesting-1)
|
||||
if offset is None:
|
||||
return result, None
|
||||
value, offset = klass.get_value(data, offset, max_nesting=max_nesting-1)
|
||||
result[key] = value
|
||||
if offset is None:
|
||||
return result, None
|
||||
m = klass.re_dict_end.match(data, offset)
|
||||
offset = m.end()
|
||||
m = klass.re_stream_start.match(data, offset)
|
||||
if m:
|
||||
try:
|
||||
stream_len = int(result[b"Length"])
|
||||
except (TypeError, KeyError, ValueError):
|
||||
raise PdfFormatError("bad or missing Length in stream dict (%r)" % result.get(b"Length", None))
|
||||
stream_data = data[m.end():m.end() + stream_len]
|
||||
m = klass.re_stream_end.match(data, m.end() + stream_len)
|
||||
check_format_condition(m, "stream end not found")
|
||||
offset = m.end()
|
||||
result = PdfStream(PdfDict(result), stream_data)
|
||||
else:
|
||||
result = PdfDict(result)
|
||||
return result, offset
|
||||
m = klass.re_array_start.match(data, offset)
|
||||
if m:
|
||||
offset = m.end()
|
||||
result = []
|
||||
m = klass.re_array_end.match(data, offset)
|
||||
while not m:
|
||||
value, offset = klass.get_value(data, offset, max_nesting=max_nesting-1)
|
||||
result.append(value)
|
||||
if offset is None:
|
||||
return result, None
|
||||
m = klass.re_array_end.match(data, offset)
|
||||
return result, m.end()
|
||||
m = klass.re_null.match(data, offset)
|
||||
if m:
|
||||
return None, m.end()
|
||||
m = klass.re_true.match(data, offset)
|
||||
if m:
|
||||
return True, m.end()
|
||||
m = klass.re_false.match(data, offset)
|
||||
if m:
|
||||
return False, m.end()
|
||||
m = klass.re_name.match(data, offset)
|
||||
if m:
|
||||
return PdfName(klass.interpret_name(m.group(1))), m.end()
|
||||
m = klass.re_int.match(data, offset)
|
||||
if m:
|
||||
return int(m.group(1)), m.end()
|
||||
m = klass.re_real.match(data, offset)
|
||||
if m:
|
||||
return float(m.group(1)), m.end() # XXX Decimal instead of float???
|
||||
m = klass.re_string_hex.match(data, offset)
|
||||
if m:
|
||||
hex_string = bytearray([b for b in m.group(1) if b in b"0123456789abcdefABCDEF"]) # filter out whitespace
|
||||
if len(hex_string) % 2 == 1:
|
||||
hex_string.append(ord(b"0")) # append a 0 if the length is not even - yes, at the end
|
||||
return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
|
||||
m = klass.re_string_lit.match(data, offset)
|
||||
if m:
|
||||
return klass.get_literal_string(data, m.end())
|
||||
#return None, offset # fallback (only for debugging)
|
||||
raise PdfFormatError("unrecognized object: " + repr(data[offset:offset+32]))
|
||||
|
||||
re_lit_str_token = re.compile(br"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))")
|
||||
escaped_chars = {
|
||||
b"n": b"\n",
|
||||
b"r": b"\r",
|
||||
b"t": b"\t",
|
||||
b"b": b"\b",
|
||||
b"f": b"\f",
|
||||
b"(": b"(",
|
||||
b")": b")",
|
||||
b"\\": b"\\",
|
||||
ord(b"n"): b"\n",
|
||||
ord(b"r"): b"\r",
|
||||
ord(b"t"): b"\t",
|
||||
ord(b"b"): b"\b",
|
||||
ord(b"f"): b"\f",
|
||||
ord(b"("): b"(",
|
||||
ord(b")"): b")",
|
||||
ord(b"\\"): b"\\",
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def get_literal_string(klass, data, offset):
|
||||
nesting_depth = 0
|
||||
result = bytearray()
|
||||
for m in klass.re_lit_str_token.finditer(data, offset):
|
||||
result.extend(data[offset:m.start()])
|
||||
if m.group(1):
|
||||
result.extend(klass.escaped_chars[m.group(1)[1]])
|
||||
elif m.group(2):
|
||||
result.append(int(m.group(2)[1:], 8))
|
||||
elif m.group(3):
|
||||
pass
|
||||
elif m.group(5):
|
||||
result.extend(b"\n")
|
||||
elif m.group(6):
|
||||
result.extend(b"(")
|
||||
nesting_depth += 1
|
||||
elif m.group(7):
|
||||
if nesting_depth == 0:
|
||||
return bytes(result), m.end()
|
||||
result.extend(b")")
|
||||
nesting_depth -= 1
|
||||
offset = m.end()
|
||||
raise PdfFormatError("unfinished literal string")
|
||||
|
||||
re_xref_section_start = re.compile(whitespace_optional + br"xref" + newline)
|
||||
re_xref_subsection_start = re.compile(whitespace_optional + br"([0-9]+)" + whitespace_mandatory + br"([0-9]+)" + whitespace_optional + newline_only)
|
||||
re_xref_entry = re.compile(br"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")
|
||||
|
||||
def read_xref_table(self, xref_section_offset):
|
||||
subsection_found = False
|
||||
m = self.re_xref_section_start.match(self.buf, xref_section_offset + self.start_offset)
|
||||
check_format_condition(m, "xref section start not found")
|
||||
offset = m.end()
|
||||
while True:
|
||||
m = self.re_xref_subsection_start.match(self.buf, offset)
|
||||
if not m:
|
||||
check_format_condition(subsection_found, "xref subsection start not found")
|
||||
break
|
||||
subsection_found = True
|
||||
offset = m.end()
|
||||
first_object = int(m.group(1))
|
||||
num_objects = int(m.group(2))
|
||||
for i in range(first_object, first_object+num_objects):
|
||||
m = self.re_xref_entry.match(self.buf, offset)
|
||||
check_format_condition(m, "xref entry not found")
|
||||
offset = m.end()
|
||||
is_free = m.group(3) == b"f"
|
||||
generation = int(m.group(2))
|
||||
if not is_free:
|
||||
new_entry = (int(m.group(1)), generation)
|
||||
check_format_condition(i not in self.xref_table or self.xref_table[i] == new_entry, "xref entry duplicated (and not identical)")
|
||||
self.xref_table[i] = new_entry
|
||||
return offset
|
||||
|
||||
def read_indirect(self, ref, max_nesting=-1):
|
||||
offset, generation = self.xref_table[ref[0]]
|
||||
check_format_condition(generation == ref[1], "expected to find generation %s for object ID %s in xref table, instead found generation %s at offset %s" \
|
||||
% (ref[1], ref[0], generation, offset))
|
||||
value = self.get_value(self.buf, offset + self.start_offset, expect_indirect=IndirectReference(*ref), max_nesting=max_nesting)[0]
|
||||
self.cached_objects[ref] = value
|
||||
return value
|
||||
|
||||
def linearize_page_tree(self, node=None):
|
||||
if node is None:
|
||||
node = self.page_tree_root
|
||||
check_format_condition(node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages")
|
||||
pages = []
|
||||
for kid in node[b"Kids"]:
|
||||
kid_object = self.read_indirect(kid)
|
||||
if kid_object[b"Type"] == b"Page":
|
||||
pages.append(kid)
|
||||
else:
|
||||
pages.extend(self.linearize_page_tree(node=kid_object))
|
||||
return pages
|
Loading…
Reference in New Issue
Block a user