From 67e3ccffeb09903e22c29d737e74070415890ed8 Mon Sep 17 00:00:00 2001 From: Peter Rowlands Date: Sun, 1 Dec 2019 12:50:21 +0900 Subject: [PATCH] Add APNG support See #3483 Adds support for reading APNG files and seeking through frames, and adds basic support for writing APNG files. --- src/PIL/PngImagePlugin.py | 401 ++++++++++++++++++++++++++++++++++++-- 1 file changed, 390 insertions(+), 11 deletions(-) diff --git a/src/PIL/PngImagePlugin.py b/src/PIL/PngImagePlugin.py index ee1400d67..5bb0e2fff 100644 --- a/src/PIL/PngImagePlugin.py +++ b/src/PIL/PngImagePlugin.py @@ -31,13 +31,15 @@ # See the README file for information on usage and redistribution. # +import itertools import logging import re import struct +import warnings import zlib -from . import Image, ImageFile, ImagePalette -from ._binary import i8, i16be as i16, i32be as i32, o16be as o16, o32be as o32 +from . import Image, ImageChops, ImageFile, ImagePalette, ImageSequence +from ._binary import i8, i16be as i16, i32be as i32, o8, o16be as o16, o32be as o32 logger = logging.getLogger(__name__) @@ -81,6 +83,16 @@ MAX_TEXT_CHUNK = ImageFile.SAFEBLOCK MAX_TEXT_MEMORY = 64 * MAX_TEXT_CHUNK +# APNG frame disposal modes +APNG_DISPOSE_OP_NONE = 0 +APNG_DISPOSE_OP_BACKGROUND = 1 +APNG_DISPOSE_OP_PREVIOUS = 2 + +# APNG frame blend modes +APNG_BLEND_OP_SOURCE = 0 +APNG_BLEND_OP_OVER = 1 + + def _safe_zlib_decompress(s): dobj = zlib.decompressobj() plaintext = dobj.decompress(s, MAX_TEXT_CHUNK) @@ -298,6 +310,9 @@ class PngStream(ChunkStream): self.im_tile = None self.im_palette = None self.im_custom_mimetype = None + self.im_n_frames = None + self._seq_num = None + self.rewind_state = None self.text_memory = 0 @@ -309,6 +324,18 @@ class PngStream(ChunkStream): % self.text_memory ) + def save_rewind(self): + self.rewind_state = { + "info": self.im_info.copy(), + "tile": self.im_tile, + "seq_num": self._seq_num, + } + + def rewind(self): + self.im_info = self.rewind_state["info"] + self.im_tile = self.rewind_state["tile"] + self._seq_num = self.rewind_state["seq_num"] + def chunk_iCCP(self, pos, length): # ICC profile @@ -356,7 +383,15 @@ class PngStream(ChunkStream): def chunk_IDAT(self, pos, length): # image data - self.im_tile = [("zip", (0, 0) + self.im_size, pos, self.im_rawmode)] + if "bbox" in self.im_info: + tile = [( + "zip", self.im_info["bbox"], pos, self.im_rawmode + )] + else: + if self.im_n_frames is not None: + self.im_info["default_image"] = True + tile = [("zip", (0, 0) + self.im_size, pos, self.im_rawmode)] + self.im_tile = tile self.im_idat = length raise EOFError @@ -537,9 +572,48 @@ class PngStream(ChunkStream): # APNG chunks def chunk_acTL(self, pos, length): s = ImageFile._safe_read(self.fp, length) + if self.im_n_frames is not None: + self.im_n_frames = None + warnings.warn("Invalid APNG, will use default PNG image if possible") + return s + n_frames = i32(s) + if n_frames == 0 or n_frames > 0x80000000: + warnings.warn("Invalid APNG, will use default PNG image if possible") + return s + self.im_n_frames = n_frames + self.im_info["loop"] = i32(s[4:]) self.im_custom_mimetype = "image/apng" return s + def chunk_fcTL(self, pos, length): + s = ImageFile._safe_read(self.fp, length) + seq = i32(s) + if (self._seq_num is None and seq != 0) or \ + (self._seq_num is not None and self._seq_num != seq - 1): + raise SyntaxError("APNG contains frame sequence errors") + self._seq_num = seq + width, height = i32(s[4:]), i32(s[8:]) + px, py = i32(s[12:]), i32(s[16:]) + im_w, im_h = self.im_size + if px + width > im_w or py + height > im_h: + raise SyntaxError("APNG contains invalid frames") + self.im_info["bbox"] = (px, py, px + width, py + height) + delay_num, delay_den = i16(s[20:]), i16(s[22:]) + if delay_den == 0: + delay_den = 100 + self.im_info["duration"] = float(delay_num) / float(delay_den) * 1000 + self.im_info["disposal"] = i8(s[24]) + self.im_info["blend"] = i8(s[25]) + return s + + def chunk_fdAT(self, pos, length): + s = ImageFile._safe_read(self.fp, 4) + seq = i32(s) + if self._seq_num != seq - 1: + raise SyntaxError("APNG contains frame sequence errors") + self._seq_num = seq + return self.chunk_IDAT(pos + 4, length - 4) + # -------------------------------------------------------------------- # PNG reader @@ -562,9 +636,11 @@ class PngImageFile(ImageFile.ImageFile): if self.fp.read(8) != _MAGIC: raise SyntaxError("not a PNG file") + self.__fp = self.fp # - # Parse headers up to the first IDAT chunk + # Parse headers up to the first IDAT chunk (or last fdAT chunk for + # APNG) self.png = PngStream(self.fp) @@ -598,12 +674,27 @@ class PngImageFile(ImageFile.ImageFile): self._text = None self.tile = self.png.im_tile self.custom_mimetype = self.png.im_custom_mimetype + self._n_frames = self.png.im_n_frames + self.default_image = self.info.get("default_image", False) if self.png.im_palette: rawmode, data = self.png.im_palette self.palette = ImagePalette.raw(rawmode, data) - self.__prepare_idat = length # used by load_prepare() + if cid == b"fdAT": + self.__prepare_idat = length - 4 + else: + self.__prepare_idat = length # used by load_prepare() + + if self._n_frames is not None: + self._close_exclusive_fp_after_loading = False + self.png.save_rewind() + self.__rewind_idat = self.__prepare_idat + self.__rewind = self.__fp.tell() + if self.default_image: + # IDAT chunk contains default image and not first animation frame + self._n_frames += 1 + self._seek(0) @property def text(self): @@ -611,9 +702,25 @@ class PngImageFile(ImageFile.ImageFile): if self._text is None: # iTxt, tEXt and zTXt chunks may appear at the end of the file # So load the file to ensure that they are read + if self.is_animated: + frame = self.__frame + # for APNG, seek to the final frame before loading + self.seek(self.n_frames - 1) self.load() + if self.is_animated: + self.seek(frame) return self._text + @property + def n_frames(self): + if self._n_frames is None: + return 1 + return self._n_frames + + @property + def is_animated(self): + return self._n_frames is not None + def verify(self): """Verify PNG file""" @@ -630,6 +737,95 @@ class PngImageFile(ImageFile.ImageFile): self.fp.close() self.fp = None + def seek(self, frame): + if not self._seek_check(frame): + return + if frame < self.__frame: + self._seek(0, True) + + last_frame = self.__frame + for f in range(self.__frame + 1, frame + 1): + try: + self._seek(f) + except EOFError: + self.seek(last_frame) + raise EOFError("no more images in APNG file") + + def _seek(self, frame, rewind=False): + if frame == 0: + if rewind: + self.__fp.seek(self.__rewind) + self.png.rewind() + self.__prepare_idat = self.__rewind_idat + self.im = None + self.info = self.png.im_info + self.tile = self.png.im_tile + self.fp = self.__fp + self._prev_im = None + self.dispose = None + self.default_image = self.info.get("default_image", False) + self.dispose_op = self.info.get("disposal") + self.blend_op = self.info.get("blend") + self.dispose_extent = self.info.get("bbox") + self.__frame = 0 + return + else: + if frame != self.__frame + 1: + raise ValueError("cannot seek to frame %d" % frame) + + # ensure previous frame was loaded + self.load() + + self.fp = self.__fp + + # advance to the next frame + if self.__prepare_idat: + ImageFile._safe_read(self.fp, self.__prepare_idat) + self.__prepare_idat = 0 + frame_start = False + while True: + self.fp.read(4) # CRC + + try: + cid, pos, length = self.png.read() + except (struct.error, SyntaxError): + break + + if cid == b"IEND": + raise EOFError("No more images in APNG file") + if cid == b"fcTL": + if frame_start: + # there must be at least one fdAT chunk between fcTL chunks + raise SyntaxError("APNG missing frame data") + frame_start = True + + try: + self.png.call(cid, pos, length) + except UnicodeDecodeError: + break + except EOFError: + if cid == b"fdAT": + length -= 4 + if frame_start: + self.__prepare_idat = length + break + ImageFile._safe_read(self.fp, length) + except AttributeError: + logger.debug("%r %s %s (unknown)", cid, pos, length) + ImageFile._safe_read(self.fp, length) + + self.__frame = frame + self.tile = self.png.im_tile + self.dispose_op = self.info.get("disposal") + self.blend_op = self.info.get("blend") + self.dispose_extent = self.info.get("bbox") + + if not self.tile: + raise EOFError + + def tell(self): + return self.__frame + def load_prepare(self): """internal: prepare to read PNG file""" @@ -649,11 +845,18 @@ class PngImageFile(ImageFile.ImageFile): cid, pos, length = self.png.read() - if cid not in [b"IDAT", b"DDAT"]: + if cid not in [b"IDAT", b"DDAT", b"fdAT"]: self.png.push(cid, pos, length) return b"" - self.__idat = length # empty chunks are allowed + if cid == b"fdAT": + try: + self.png.call(cid, pos, length) + except EOFError: + pass + self.__idat = length - 4 # sequence_num has already been read + else: + self.__idat = length # empty chunks are allowed # read more data from this chunk if read_bytes <= 0: @@ -677,19 +880,50 @@ class PngImageFile(ImageFile.ImageFile): if cid == b"IEND": break + elif cid == b"fcTL" and self.is_animated: + # start of the next frame, stop reading + self.__prepare_idat = 0 + self.png.push(cid, pos, length) + break try: self.png.call(cid, pos, length) except UnicodeDecodeError: break except EOFError: + if cid == b"fdAT": + length -= 4 ImageFile._safe_read(self.fp, length) except AttributeError: logger.debug("%r %s %s (unknown)", cid, pos, length) ImageFile._safe_read(self.fp, length) self._text = self.png.im_text - self.png.close() - self.png = None + if not self.is_animated: + self.png.close() + self.png = None + else: + # setup frame disposal (actual disposal done when needed in _seek()) + if self._prev_im is None and self.dispose_op == APNG_DISPOSE_OP_PREVIOUS: + self.dispose_op = APNG_DISPOSE_OP_BACKGROUND + + if self.dispose_op == APNG_DISPOSE_OP_PREVIOUS: + dispose = self._prev_im.copy() + dispose = self._crop(dispose, self.dispose_extent) + elif self.dispose_op == APNG_DISPOSE_OP_BACKGROUND: + dispose = Image.core.fill("RGBA", self.size, (0, 0, 0, 0)) + dispose = self._crop(dispose, self.dispose_extent) + else: + dispose = None + + if self._prev_im and self.blend_op == APNG_BLEND_OP_OVER: + updated = self._crop(self.im, self.dispose_extent) + self._prev_im.paste( + updated, self.dispose_extent, updated.convert("RGBA")) + self.im = self._prev_im + self._prev_im = self.im.copy() + + if dispose: + self._prev_im.paste(dispose, self.dispose_extent) def _getexif(self): if "exif" not in self.info: @@ -703,6 +937,15 @@ class PngImageFile(ImageFile.ImageFile): self.load() return ImageFile.ImageFile.getexif(self) + def _close__fp(self): + try: + if self.__fp != self.fp: + self.__fp.close() + except AttributeError: + pass + finally: + self.__fp = None + # -------------------------------------------------------------------- # PNG writer @@ -748,7 +991,139 @@ class _idat: self.chunk(self.fp, b"IDAT", data) -def _save(im, fp, filename, chunk=putchunk): +class _fdat: + # wrap encoder output in fdAT chunks + + def __init__(self, fp, chunk, seq_num): + self.fp = fp + self.chunk = chunk + self.seq_num = seq_num + + def write(self, data): + self.chunk(self.fp, b"fdAT", o32(self.seq_num), data) + + +def _write_multiple_frames(im, fp, chunk, rawmode): + default_image = im.encoderinfo.get("default_image", im.info.get("default_image")) + duration = im.encoderinfo.get("duration", im.info.get("duration", 0)) + loop = im.encoderinfo.get("loop", im.info.get("loop", 0)) + disposal = im.encoderinfo.get("disposal", im.info.get("disposal")) + blend = im.encoderinfo.get("blend", im.info.get("blend")) + + if default_image: + chain = itertools.chain(im.encoderinfo.get("append_images", [])) + else: + chain = itertools.chain([im], im.encoderinfo.get("append_images", [])) + + im_frames = [] + frame_count = 0 + for im_seq in chain: + for im_frame in ImageSequence.Iterator(im_seq): + im_frame = im_frame.copy() + if im_frame.mode != im.mode: + if im.mode == "P": + im_frame = im_frame.convert(im.mode, palette=im.palette) + else: + im_frame = im_frame.convert(im.mode) + encoderinfo = im.encoderinfo.copy() + if isinstance(duration, (list, tuple)): + encoderinfo["duration"] = duration[frame_count] + if isinstance(disposal, (list, tuple)): + encoderinfo["disposal"] = disposal[frame_count] + if isinstance(blend, (list, tuple)): + encoderinfo["blend"] = blend[frame_count] + frame_count += 1 + + if im_frames: + previous = im_frames[-1] + prev_disposal = previous["encoderinfo"].get("disposal") + prev_blend = previous["encoderinfo"].get("blend") + if prev_disposal == APNG_DISPOSE_OP_PREVIOUS and len(im_frames) < 2: + prev_disposal == APNG_DISPOSE_OP_BACKGROUND + + if prev_disposal == APNG_DISPOSE_OP_BACKGROUND: + base_im = previous["im"] + dispose = Image.core.fill("RGBA", im.size, (0, 0, 0, 0)) + bbox = previous["bbox"] + if bbox: + dispose = dispose.crop(bbox) + else: + bbox = (0, 0) + im.size + base_im.paste(dispose, bbox) + elif prev_disposal == APNG_DISPOSE_OP_PREVIOUS: + base_im = im_frames[-2]["im"] + else: + base_im = previous["im"] + delta = ImageChops.subtract_modulo(im_frame, base_im) + bbox = delta.getbbox() + if (not bbox and prev_disposal == encoderinfo.get("disposal") + and prev_blend == encoderinfo.get("blend")): + duration = encoderinfo.get("duration", 0) + if duration: + if "duration" in previous["encoderinfo"]: + previous["encoderinfo"]["duration"] += duration + else: + previous["encoderinfo"]["duration"] = duration + continue + else: + bbox = None + im_frames.append({"im": im_frame, "bbox": bbox, "encoderinfo": encoderinfo}) + + # animation control + chunk( + fp, + b"acTL", + o32(len(im_frames)), # 0: num_frames + o32(loop), # 4: num_plays + ) + + # default image IDAT (if it exists) + if default_image: + ImageFile._save(im, _idat(fp, chunk), [("zip", (0, 0) + im.size, 0, rawmode)]) + + seq_num = 0 + for frame, frame_data in enumerate(im_frames): + im_frame = frame_data["im"] + if not frame_data["bbox"]: + bbox = (0, 0) + im_frame.size + else: + bbox = frame_data["bbox"] + im_frame = im_frame.crop(bbox) + size = im_frame.size + duration = int(round(frame_data["encoderinfo"].get("duration", 0))) + disposal = frame_data["encoderinfo"].get("disposal", APNG_DISPOSE_OP_NONE) + blend = frame_data["encoderinfo"].get("blend", APNG_BLEND_OP_SOURCE) + # frame control + chunk( + fp, + b"fcTL", + o32(seq_num), # sequence_number + o32(size[0]), # width + o32(size[1]), # height + o32(bbox[0]), # x_offset + o32(bbox[1]), # y_offset + o16(duration), # delay_numerator + o16(1000), # delay_denominator + o8(disposal), # dispose_op + o8(blend), # blend_op + ) + seq_num += 1 + # frame data + if frame == 0 and not default_image: + # first frame must be in IDAT chunks for backwards compatibility + ImageFile._save(im_frame, _idat(fp, chunk), + [("zip", (0, 0) + im_frame.size, 0, rawmode)]) + else: + ImageFile._save(im_frame, _fdat(fp, chunk, seq_num), + [("zip", (0, 0) + im_frame.size, 0, rawmode)]) + seq_num += 1 + + +def _save_all(im, fp, filename): + _save(im, fp, filename, save_all=True) + + +def _save(im, fp, filename, chunk=putchunk, save_all=False): # save an image to disk (called by the save method) mode = im.mode @@ -897,7 +1272,10 @@ def _save(im, fp, filename, chunk=putchunk): exif = exif[6:] chunk(fp, b"eXIf", exif) - ImageFile._save(im, _idat(fp, chunk), [("zip", (0, 0) + im.size, 0, rawmode)]) + if save_all: + _write_multiple_frames(im, fp, chunk, rawmode) + else: + ImageFile._save(im, _idat(fp, chunk), [("zip", (0, 0) + im.size, 0, rawmode)]) chunk(fp, b"IEND", b"") @@ -942,6 +1320,7 @@ def getchunks(im, **params): Image.register_open(PngImageFile.format, PngImageFile, _accept) Image.register_save(PngImageFile.format, _save) +Image.register_save_all(PngImageFile.format, _save_all) Image.register_extensions(PngImageFile.format, [".png", ".apng"])