mirror of
https://github.com/python-pillow/Pillow.git
synced 2025-01-26 17:24:31 +03:00
issue #2959: keep file open, add context manager, add methods to support writing, eliminate the passing of file or buffer
This commit is contained in:
parent
78fe32a431
commit
ede57b91e0
|
@ -19,6 +19,11 @@ class TestFilePdf(PillowTestCase):
|
|||
# Assert
|
||||
self.assertTrue(os.path.isfile(outfile))
|
||||
self.assertGreater(os.path.getsize(outfile), 0)
|
||||
with pdfParser.PdfParser(outfile) as pdf:
|
||||
if kwargs.get("append_images", False) or kwargs.get("append", False):
|
||||
self.assertGreater(len(pdf.pages), 1)
|
||||
else:
|
||||
self.assertGreater(len(pdf.pages), 0)
|
||||
|
||||
return outfile
|
||||
|
||||
|
@ -100,27 +105,34 @@ class TestFilePdf(PillowTestCase):
|
|||
self.assertGreater(os.path.getsize(outfile), 0)
|
||||
|
||||
def test_pdf_open(self):
|
||||
# fail on empty buffer
|
||||
self.assertRaises(pdfParser.PdfFormatError, pdfParser.PdfParser, buf=bytearray())
|
||||
# fail on a buffer full of null bytes
|
||||
self.assertRaises(pdfParser.PdfFormatError, pdfParser.PdfParser, buf=bytearray(65536))
|
||||
# make an empty PDF object
|
||||
empty_pdf = pdfParser.PdfParser()
|
||||
self.assertEqual(len(empty_pdf.pages), 0)
|
||||
with pdfParser.PdfParser() as empty_pdf:
|
||||
self.assertEqual(len(empty_pdf.pages), 0)
|
||||
self.assertEqual(len(empty_pdf.info), 0)
|
||||
self.assertFalse(empty_pdf.should_close_buf)
|
||||
self.assertFalse(empty_pdf.should_close_file)
|
||||
# make a PDF file
|
||||
pdf_filename = self.helper_save_as_pdf("RGB")
|
||||
# open the PDF file
|
||||
hopper_pdf = pdfParser.PdfParser(filename=pdf_filename)
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
with pdfParser.PdfParser(filename=pdf_filename) as hopper_pdf:
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
self.assertTrue(hopper_pdf.should_close_buf)
|
||||
self.assertTrue(hopper_pdf.should_close_file)
|
||||
# read a PDF file from a buffer with a non-zero offset
|
||||
with open(pdf_filename, "rb") as f:
|
||||
content = b"xyzzy" + f.read()
|
||||
hopper_pdf = pdfParser.PdfParser(buf=content, start_offset=5)
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
with pdfParser.PdfParser(buf=content, start_offset=5) as hopper_pdf:
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
self.assertFalse(hopper_pdf.should_close_buf)
|
||||
self.assertFalse(hopper_pdf.should_close_file)
|
||||
# read a PDF file from an already open file
|
||||
with open(pdf_filename, "rb") as f:
|
||||
hopper_pdf = pdfParser.PdfParser(f=f)
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
with pdfParser.PdfParser(f=f) as hopper_pdf:
|
||||
self.assertEqual(len(hopper_pdf.pages), 1)
|
||||
self.assertTrue(hopper_pdf.should_close_buf)
|
||||
self.assertFalse(hopper_pdf.should_close_file)
|
||||
|
||||
def test_pdf_append_fails_on_nonexistent_file(self):
|
||||
im = hopper("RGB")
|
||||
|
@ -134,50 +146,49 @@ class TestFilePdf(PillowTestCase):
|
|||
# make a PDF file
|
||||
pdf_filename = self.helper_save_as_pdf("RGB", producer="pdfParser")
|
||||
# open it, check pages and info
|
||||
pdf = pdfParser.PdfParser(pdf_filename)
|
||||
self.assertEqual(len(pdf.pages), 1)
|
||||
self.assertEqual(len(pdf.info), 1)
|
||||
self.assertEqual(pdf.info.Producer, "pdfParser")
|
||||
# append some info
|
||||
pdf.info.Title = "abc"
|
||||
pdf.info.Author = "def"
|
||||
pdf.info.Subject = u"ghi\uABCD"
|
||||
pdf.info.Keywords = "qw)e\\r(ty"
|
||||
pdf.info.Creator = "hopper()"
|
||||
with open(pdf_filename, "r+b") as f:
|
||||
f.seek(0, os.SEEK_END)
|
||||
with pdfParser.PdfParser(pdf_filename, mode="r+b") as pdf:
|
||||
self.assertEqual(len(pdf.pages), 1)
|
||||
self.assertEqual(len(pdf.info), 1)
|
||||
self.assertEqual(pdf.info.Producer, "pdfParser")
|
||||
# append some info
|
||||
pdf.info.Title = "abc"
|
||||
pdf.info.Author = "def"
|
||||
pdf.info.Subject = u"ghi\uABCD"
|
||||
pdf.info.Keywords = "qw)e\\r(ty"
|
||||
pdf.info.Creator = "hopper()"
|
||||
pdf.start_writing()
|
||||
pdf.write_xref_and_trailer(f)
|
||||
# open it again, check pages and info again
|
||||
pdf = pdfParser.PdfParser(pdf_filename)
|
||||
self.assertEqual(len(pdf.pages), 1)
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(pdf.info.Title, "abc")
|
||||
with pdfParser.PdfParser(pdf_filename) as pdf:
|
||||
self.assertEqual(len(pdf.pages), 1)
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(pdf.info.Title, "abc")
|
||||
# append two images
|
||||
mode_CMYK = hopper("CMYK")
|
||||
mode_P = hopper("P")
|
||||
mode_CMYK.save(pdf_filename, append=True, save_all=True, append_images=[mode_P])
|
||||
# open the PDF again, check pages and info again
|
||||
pdf = pdfParser.PdfParser(pdf_filename)
|
||||
self.assertEqual(len(pdf.pages), 3)
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(pdfParser.decode_text(pdf.info[b"Title"]), "abc")
|
||||
self.assertEqual(pdf.info.Title, "abc")
|
||||
self.assertEqual(pdf.info.Producer, "pdfParser")
|
||||
self.assertEqual(pdf.info.Keywords, "qw)e\\r(ty")
|
||||
self.assertEqual(pdf.info.Subject, u"ghi\uABCD")
|
||||
with pdfParser.PdfParser(pdf_filename) as pdf:
|
||||
self.assertEqual(len(pdf.pages), 3)
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(pdfParser.decode_text(pdf.info[b"Title"]), "abc")
|
||||
self.assertEqual(pdf.info.Title, "abc")
|
||||
self.assertEqual(pdf.info.Producer, "pdfParser")
|
||||
self.assertEqual(pdf.info.Keywords, "qw)e\\r(ty")
|
||||
self.assertEqual(pdf.info.Subject, u"ghi\uABCD")
|
||||
|
||||
def test_pdf_append(self):
|
||||
# make a PDF file
|
||||
pdf_filename = self.helper_save_as_pdf("RGB", title="title", author="author", subject="subject", keywords="keywords", creator="creator", producer="producer")
|
||||
# open it, check pages and info
|
||||
pdf = pdfParser.PdfParser(pdf_filename)
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(pdf.info.Title, "title")
|
||||
self.assertEqual(pdf.info.Author, "author")
|
||||
self.assertEqual(pdf.info.Subject, "subject")
|
||||
self.assertEqual(pdf.info.Keywords, "keywords")
|
||||
self.assertEqual(pdf.info.Creator, "creator")
|
||||
self.assertEqual(pdf.info.Producer, "producer")
|
||||
with pdfParser.PdfParser(pdf_filename) as pdf:
|
||||
self.assertEqual(len(pdf.info), 6)
|
||||
self.assertEqual(pdf.info.Title, "title")
|
||||
self.assertEqual(pdf.info.Author, "author")
|
||||
self.assertEqual(pdf.info.Subject, "subject")
|
||||
self.assertEqual(pdf.info.Keywords, "keywords")
|
||||
self.assertEqual(pdf.info.Creator, "creator")
|
||||
self.assertEqual(pdf.info.Producer, "producer")
|
||||
|
||||
def test_pdf_append_to_bytesio(self):
|
||||
im = hopper("RGB")
|
||||
|
|
|
@ -56,10 +56,9 @@ def _save(im, fp, filename, save_all=False):
|
|||
producer = im.encoderinfo.get("producer", None)
|
||||
|
||||
if is_appending:
|
||||
existing_pdf = pdfParser.PdfParser(f=fp, filename=filename)
|
||||
fp.seek(0, io.SEEK_END)
|
||||
existing_pdf = pdfParser.PdfParser(f=fp, filename=filename, mode="r+b")
|
||||
else:
|
||||
existing_pdf = pdfParser.PdfParser()
|
||||
existing_pdf = pdfParser.PdfParser(f=fp, filename=filename, mode="w+b")
|
||||
|
||||
if title:
|
||||
existing_pdf.info.Title = title
|
||||
|
@ -78,8 +77,9 @@ def _save(im, fp, filename, save_all=False):
|
|||
# make sure image data is available
|
||||
im.load()
|
||||
|
||||
existing_pdf.write_header(fp)
|
||||
existing_pdf.write_comment(fp, "created by PIL PDF driver " + __version__)
|
||||
existing_pdf.start_writing()
|
||||
existing_pdf.write_header()
|
||||
existing_pdf.write_comment("created by PIL PDF driver " + __version__)
|
||||
|
||||
#
|
||||
# pages
|
||||
|
@ -110,7 +110,7 @@ def _save(im, fp, filename, save_all=False):
|
|||
|
||||
#
|
||||
# catalog and list of pages
|
||||
existing_pdf.write_catalog(fp)
|
||||
existing_pdf.write_catalog()
|
||||
|
||||
pageNumber = 0
|
||||
for imSequence in ims:
|
||||
|
@ -175,7 +175,7 @@ def _save(im, fp, filename, save_all=False):
|
|||
|
||||
width, height = im.size
|
||||
|
||||
existing_pdf.write_obj(fp, image_refs[pageNumber], stream=op.getvalue(),
|
||||
existing_pdf.write_obj(image_refs[pageNumber], stream=op.getvalue(),
|
||||
Type=pdfParser.PdfName("XObject"),
|
||||
Subtype=pdfParser.PdfName("Image"),
|
||||
Width=width, # * 72.0 / resolution,
|
||||
|
@ -188,7 +188,7 @@ def _save(im, fp, filename, save_all=False):
|
|||
#
|
||||
# page
|
||||
|
||||
existing_pdf.write_page(fp, page_refs[pageNumber],
|
||||
existing_pdf.write_page(page_refs[pageNumber],
|
||||
Resources=pdfParser.PdfDict(
|
||||
ProcSet=[pdfParser.PdfName("PDF"), pdfParser.PdfName(procset)],
|
||||
XObject=pdfParser.PdfDict(image=image_refs[pageNumber])),
|
||||
|
@ -204,15 +204,16 @@ def _save(im, fp, filename, save_all=False):
|
|||
int(width * 72.0 / resolution),
|
||||
int(height * 72.0 / resolution)))
|
||||
|
||||
existing_pdf.write_obj(fp, contents_refs[pageNumber], stream=page_contents)
|
||||
existing_pdf.write_obj(contents_refs[pageNumber], stream=page_contents)
|
||||
|
||||
pageNumber += 1
|
||||
|
||||
#
|
||||
# trailer
|
||||
existing_pdf.write_xref_and_trailer(fp)
|
||||
existing_pdf.write_xref_and_trailer()
|
||||
if hasattr(fp, "flush"):
|
||||
fp.flush()
|
||||
existing_pdf.close()
|
||||
|
||||
#
|
||||
# --------------------------------------------------------------------
|
||||
|
|
|
@ -2,6 +2,7 @@ import codecs
|
|||
import collections
|
||||
import io
|
||||
import mmap
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import zlib
|
||||
|
@ -336,17 +337,25 @@ class PdfParser:
|
|||
Supports PDF up to 1.4
|
||||
"""
|
||||
|
||||
def __init__(self, filename=None, f=None, buf=None, start_offset=0):
|
||||
def __init__(self, filename=None, f=None, buf=None, start_offset=0, mode="rb"):
|
||||
# type: (PdfParser, str, file, Union[bytes, bytearray], int, str) -> None
|
||||
assert not (buf and f)
|
||||
self.filename = filename
|
||||
self.buf = buf
|
||||
self.f = f
|
||||
self.start_offset = start_offset
|
||||
if buf is not None:
|
||||
self.should_close_buf = False
|
||||
self.should_close_file = False
|
||||
if filename is not None and f is None:
|
||||
self.f = f = open(filename, mode)
|
||||
self.should_close_file = True
|
||||
if f is not None:
|
||||
self.buf = buf = self.get_buf_from_file(f)
|
||||
self.should_close_buf = True
|
||||
if not filename and hasattr(f, "name"):
|
||||
self.filename = f.name
|
||||
if buf:
|
||||
self.read_pdf_info()
|
||||
elif f is not None:
|
||||
self.read_pdf_info_from_file(f)
|
||||
elif filename is not None:
|
||||
with open(filename, "rb") as f:
|
||||
self.read_pdf_info_from_file(f)
|
||||
else:
|
||||
self.file_size_total = self.file_size_this = 0
|
||||
self.root = PdfDict()
|
||||
|
@ -360,33 +369,63 @@ class PdfParser:
|
|||
self.trailer_dict = {}
|
||||
self.xref_table = XrefTable()
|
||||
self.xref_table.reading_finished = True
|
||||
if f:
|
||||
self.seek_end()
|
||||
|
||||
def write_header(self, f):
|
||||
f.write(b"%PDF-1.4\n")
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def write_comment(self, f, s):
|
||||
f.write(("%% %s\n" % (s,)).encode("utf-8"))
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
self.close()
|
||||
return False # do not suppress exceptions
|
||||
|
||||
def write_catalog(self, f):
|
||||
def start_writing(self):
|
||||
self.close_buf()
|
||||
self.seek_end()
|
||||
|
||||
def close_buf(self):
|
||||
try:
|
||||
self.buf.close()
|
||||
except AttributeError:
|
||||
pass
|
||||
self.buf = None
|
||||
|
||||
def close(self):
|
||||
if self.should_close_buf:
|
||||
self.close_buf()
|
||||
if self.f is not None and self.should_close_file:
|
||||
self.f.close()
|
||||
self.f = None
|
||||
|
||||
def seek_end(self):
|
||||
self.f.seek(0, os.SEEK_END)
|
||||
|
||||
def write_header(self):
|
||||
self.f.write(b"%PDF-1.4\n")
|
||||
|
||||
def write_comment(self, s):
|
||||
self.f.write(("%% %s\n" % (s,)).encode("utf-8"))
|
||||
|
||||
def write_catalog(self):
|
||||
self.del_root()
|
||||
self.root_ref = self.next_object_id(f.tell())
|
||||
self.root_ref = self.next_object_id(self.f.tell())
|
||||
self.pages_ref = self.next_object_id(0)
|
||||
self.write_obj(f, self.root_ref,
|
||||
self.write_obj(self.root_ref,
|
||||
Type=PdfName(b"Catalog"),
|
||||
Pages=self.pages_ref)
|
||||
self.write_obj(f, self.pages_ref,
|
||||
self.write_obj(self.pages_ref,
|
||||
Type=PdfName("Pages"),
|
||||
Count=len(self.pages),
|
||||
Kids=self.pages)
|
||||
return self.root_ref
|
||||
|
||||
def write_xref_and_trailer(self, f, new_root_ref=None):
|
||||
def write_xref_and_trailer(self, new_root_ref=None):
|
||||
if new_root_ref:
|
||||
self.del_root()
|
||||
self.root_ref = new_root_ref
|
||||
if self.info:
|
||||
self.info_ref = self.write_obj(f, None, self.info)
|
||||
start_xref = self.xref_table.write(f)
|
||||
self.info_ref = self.write_obj(None, self.info)
|
||||
start_xref = self.xref_table.write(self.f)
|
||||
num_entries = len(self.xref_table)
|
||||
trailer_dict = {b"Root": self.root_ref, b"Size": num_entries}
|
||||
if self.last_xref_section_offset is not None:
|
||||
|
@ -394,18 +433,19 @@ class PdfParser:
|
|||
if self.info:
|
||||
trailer_dict[b"Info"] = self.info_ref
|
||||
self.last_xref_section_offset = start_xref
|
||||
f.write(b"trailer\n" + bytes(PdfDict(trailer_dict)) + make_bytes("\nstartxref\n%d\n%%%%EOF" % start_xref))
|
||||
self.f.write(b"trailer\n" + bytes(PdfDict(trailer_dict)) + make_bytes("\nstartxref\n%d\n%%%%EOF" % start_xref))
|
||||
|
||||
def write_page(self, f, ref, *objs, **dict_obj):
|
||||
def write_page(self, ref, *objs, **dict_obj):
|
||||
if isinstance(ref, int):
|
||||
ref = self.pages[ref]
|
||||
if "Type" not in dict_obj:
|
||||
dict_obj["Type"] = PdfName("Page")
|
||||
if "Parent" not in dict_obj:
|
||||
dict_obj["Parent"] = self.pages_ref
|
||||
return self.write_obj(f, ref, *objs, **dict_obj)
|
||||
return self.write_obj(ref, *objs, **dict_obj)
|
||||
|
||||
def write_obj(self, f, ref, *objs, **dict_obj):
|
||||
def write_obj(self, ref, *objs, **dict_obj):
|
||||
f = self.f
|
||||
if ref is None:
|
||||
ref = self.next_object_id(f.tell())
|
||||
else:
|
||||
|
@ -432,22 +472,17 @@ class PdfParser:
|
|||
del self.xref_table[self.root[b"Pages"].object_id]
|
||||
# XXX TODO delete Pages tree recursively
|
||||
|
||||
def read_pdf_info_from_file(self, f):
|
||||
@staticmethod
|
||||
def get_buf_from_file(f):
|
||||
if hasattr(f, "getbuffer"):
|
||||
self.buf = f.getbuffer()
|
||||
need_close = False
|
||||
return f.getbuffer()
|
||||
elif hasattr(f, "getvalue"):
|
||||
self.buf = f.getvalue()
|
||||
need_close = False
|
||||
return f.getvalue()
|
||||
else:
|
||||
self.buf = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
|
||||
need_close = True
|
||||
try:
|
||||
self.read_pdf_info()
|
||||
finally:
|
||||
if need_close:
|
||||
self.buf.close()
|
||||
self.buf = None
|
||||
try:
|
||||
return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
|
||||
except ValueError: # cannot mmap an empty file
|
||||
return b""
|
||||
|
||||
def read_pdf_info(self):
|
||||
self.file_size_total = len(self.buf)
|
||||
|
|
Loading…
Reference in New Issue
Block a user