Add APNG support

See #3483

Adds support for reading APNG files and seeking through frames,
and adds basic support for writing APNG files.
This commit is contained in:
Peter Rowlands 2019-12-01 12:50:21 +09:00 committed by Andrew Murray
parent dab94e69d1
commit 67e3ccffeb

View File

@ -31,13 +31,15 @@
# See the README file for information on usage and redistribution.
#
import itertools
import logging
import re
import struct
import warnings
import zlib
from . import Image, ImageFile, ImagePalette
from ._binary import i8, i16be as i16, i32be as i32, o16be as o16, o32be as o32
from . import Image, ImageChops, ImageFile, ImagePalette, ImageSequence
from ._binary import i8, i16be as i16, i32be as i32, o8, o16be as o16, o32be as o32
logger = logging.getLogger(__name__)
@ -81,6 +83,16 @@ MAX_TEXT_CHUNK = ImageFile.SAFEBLOCK
MAX_TEXT_MEMORY = 64 * MAX_TEXT_CHUNK
# APNG frame disposal modes
APNG_DISPOSE_OP_NONE = 0
APNG_DISPOSE_OP_BACKGROUND = 1
APNG_DISPOSE_OP_PREVIOUS = 2
# APNG frame blend modes
APNG_BLEND_OP_SOURCE = 0
APNG_BLEND_OP_OVER = 1
def _safe_zlib_decompress(s):
dobj = zlib.decompressobj()
plaintext = dobj.decompress(s, MAX_TEXT_CHUNK)
@ -298,6 +310,9 @@ class PngStream(ChunkStream):
self.im_tile = None
self.im_palette = None
self.im_custom_mimetype = None
self.im_n_frames = None
self._seq_num = None
self.rewind_state = None
self.text_memory = 0
@ -309,6 +324,18 @@ class PngStream(ChunkStream):
% self.text_memory
)
def save_rewind(self):
self.rewind_state = {
"info": self.im_info.copy(),
"tile": self.im_tile,
"seq_num": self._seq_num,
}
def rewind(self):
self.im_info = self.rewind_state["info"]
self.im_tile = self.rewind_state["tile"]
self._seq_num = self.rewind_state["seq_num"]
def chunk_iCCP(self, pos, length):
# ICC profile
@ -356,7 +383,15 @@ class PngStream(ChunkStream):
def chunk_IDAT(self, pos, length):
# image data
self.im_tile = [("zip", (0, 0) + self.im_size, pos, self.im_rawmode)]
if "bbox" in self.im_info:
tile = [(
"zip", self.im_info["bbox"], pos, self.im_rawmode
)]
else:
if self.im_n_frames is not None:
self.im_info["default_image"] = True
tile = [("zip", (0, 0) + self.im_size, pos, self.im_rawmode)]
self.im_tile = tile
self.im_idat = length
raise EOFError
@ -537,9 +572,48 @@ class PngStream(ChunkStream):
# APNG chunks
def chunk_acTL(self, pos, length):
s = ImageFile._safe_read(self.fp, length)
if self.im_n_frames is not None:
self.im_n_frames = None
warnings.warn("Invalid APNG, will use default PNG image if possible")
return s
n_frames = i32(s)
if n_frames == 0 or n_frames > 0x80000000:
warnings.warn("Invalid APNG, will use default PNG image if possible")
return s
self.im_n_frames = n_frames
self.im_info["loop"] = i32(s[4:])
self.im_custom_mimetype = "image/apng"
return s
def chunk_fcTL(self, pos, length):
s = ImageFile._safe_read(self.fp, length)
seq = i32(s)
if (self._seq_num is None and seq != 0) or \
(self._seq_num is not None and self._seq_num != seq - 1):
raise SyntaxError("APNG contains frame sequence errors")
self._seq_num = seq
width, height = i32(s[4:]), i32(s[8:])
px, py = i32(s[12:]), i32(s[16:])
im_w, im_h = self.im_size
if px + width > im_w or py + height > im_h:
raise SyntaxError("APNG contains invalid frames")
self.im_info["bbox"] = (px, py, px + width, py + height)
delay_num, delay_den = i16(s[20:]), i16(s[22:])
if delay_den == 0:
delay_den = 100
self.im_info["duration"] = float(delay_num) / float(delay_den) * 1000
self.im_info["disposal"] = i8(s[24])
self.im_info["blend"] = i8(s[25])
return s
def chunk_fdAT(self, pos, length):
s = ImageFile._safe_read(self.fp, 4)
seq = i32(s)
if self._seq_num != seq - 1:
raise SyntaxError("APNG contains frame sequence errors")
self._seq_num = seq
return self.chunk_IDAT(pos + 4, length - 4)
# --------------------------------------------------------------------
# PNG reader
@ -562,9 +636,11 @@ class PngImageFile(ImageFile.ImageFile):
if self.fp.read(8) != _MAGIC:
raise SyntaxError("not a PNG file")
self.__fp = self.fp
#
# Parse headers up to the first IDAT chunk
# Parse headers up to the first IDAT chunk (or last fdAT chunk for
# APNG)
self.png = PngStream(self.fp)
@ -598,12 +674,27 @@ class PngImageFile(ImageFile.ImageFile):
self._text = None
self.tile = self.png.im_tile
self.custom_mimetype = self.png.im_custom_mimetype
self._n_frames = self.png.im_n_frames
self.default_image = self.info.get("default_image", False)
if self.png.im_palette:
rawmode, data = self.png.im_palette
self.palette = ImagePalette.raw(rawmode, data)
self.__prepare_idat = length # used by load_prepare()
if cid == b"fdAT":
self.__prepare_idat = length - 4
else:
self.__prepare_idat = length # used by load_prepare()
if self._n_frames is not None:
self._close_exclusive_fp_after_loading = False
self.png.save_rewind()
self.__rewind_idat = self.__prepare_idat
self.__rewind = self.__fp.tell()
if self.default_image:
# IDAT chunk contains default image and not first animation frame
self._n_frames += 1
self._seek(0)
@property
def text(self):
@ -611,9 +702,25 @@ class PngImageFile(ImageFile.ImageFile):
if self._text is None:
# iTxt, tEXt and zTXt chunks may appear at the end of the file
# So load the file to ensure that they are read
if self.is_animated:
frame = self.__frame
# for APNG, seek to the final frame before loading
self.seek(self.n_frames - 1)
self.load()
if self.is_animated:
self.seek(frame)
return self._text
@property
def n_frames(self):
if self._n_frames is None:
return 1
return self._n_frames
@property
def is_animated(self):
return self._n_frames is not None
def verify(self):
"""Verify PNG file"""
@ -630,6 +737,95 @@ class PngImageFile(ImageFile.ImageFile):
self.fp.close()
self.fp = None
def seek(self, frame):
if not self._seek_check(frame):
return
if frame < self.__frame:
self._seek(0, True)
last_frame = self.__frame
for f in range(self.__frame + 1, frame + 1):
try:
self._seek(f)
except EOFError:
self.seek(last_frame)
raise EOFError("no more images in APNG file")
def _seek(self, frame, rewind=False):
if frame == 0:
if rewind:
self.__fp.seek(self.__rewind)
self.png.rewind()
self.__prepare_idat = self.__rewind_idat
self.im = None
self.info = self.png.im_info
self.tile = self.png.im_tile
self.fp = self.__fp
self._prev_im = None
self.dispose = None
self.default_image = self.info.get("default_image", False)
self.dispose_op = self.info.get("disposal")
self.blend_op = self.info.get("blend")
self.dispose_extent = self.info.get("bbox")
self.__frame = 0
return
else:
if frame != self.__frame + 1:
raise ValueError("cannot seek to frame %d" % frame)
# ensure previous frame was loaded
self.load()
self.fp = self.__fp
# advance to the next frame
if self.__prepare_idat:
ImageFile._safe_read(self.fp, self.__prepare_idat)
self.__prepare_idat = 0
frame_start = False
while True:
self.fp.read(4) # CRC
try:
cid, pos, length = self.png.read()
except (struct.error, SyntaxError):
break
if cid == b"IEND":
raise EOFError("No more images in APNG file")
if cid == b"fcTL":
if frame_start:
# there must be at least one fdAT chunk between fcTL chunks
raise SyntaxError("APNG missing frame data")
frame_start = True
try:
self.png.call(cid, pos, length)
except UnicodeDecodeError:
break
except EOFError:
if cid == b"fdAT":
length -= 4
if frame_start:
self.__prepare_idat = length
break
ImageFile._safe_read(self.fp, length)
except AttributeError:
logger.debug("%r %s %s (unknown)", cid, pos, length)
ImageFile._safe_read(self.fp, length)
self.__frame = frame
self.tile = self.png.im_tile
self.dispose_op = self.info.get("disposal")
self.blend_op = self.info.get("blend")
self.dispose_extent = self.info.get("bbox")
if not self.tile:
raise EOFError
def tell(self):
return self.__frame
def load_prepare(self):
"""internal: prepare to read PNG file"""
@ -649,11 +845,18 @@ class PngImageFile(ImageFile.ImageFile):
cid, pos, length = self.png.read()
if cid not in [b"IDAT", b"DDAT"]:
if cid not in [b"IDAT", b"DDAT", b"fdAT"]:
self.png.push(cid, pos, length)
return b""
self.__idat = length # empty chunks are allowed
if cid == b"fdAT":
try:
self.png.call(cid, pos, length)
except EOFError:
pass
self.__idat = length - 4 # sequence_num has already been read
else:
self.__idat = length # empty chunks are allowed
# read more data from this chunk
if read_bytes <= 0:
@ -677,19 +880,50 @@ class PngImageFile(ImageFile.ImageFile):
if cid == b"IEND":
break
elif cid == b"fcTL" and self.is_animated:
# start of the next frame, stop reading
self.__prepare_idat = 0
self.png.push(cid, pos, length)
break
try:
self.png.call(cid, pos, length)
except UnicodeDecodeError:
break
except EOFError:
if cid == b"fdAT":
length -= 4
ImageFile._safe_read(self.fp, length)
except AttributeError:
logger.debug("%r %s %s (unknown)", cid, pos, length)
ImageFile._safe_read(self.fp, length)
self._text = self.png.im_text
self.png.close()
self.png = None
if not self.is_animated:
self.png.close()
self.png = None
else:
# setup frame disposal (actual disposal done when needed in _seek())
if self._prev_im is None and self.dispose_op == APNG_DISPOSE_OP_PREVIOUS:
self.dispose_op = APNG_DISPOSE_OP_BACKGROUND
if self.dispose_op == APNG_DISPOSE_OP_PREVIOUS:
dispose = self._prev_im.copy()
dispose = self._crop(dispose, self.dispose_extent)
elif self.dispose_op == APNG_DISPOSE_OP_BACKGROUND:
dispose = Image.core.fill("RGBA", self.size, (0, 0, 0, 0))
dispose = self._crop(dispose, self.dispose_extent)
else:
dispose = None
if self._prev_im and self.blend_op == APNG_BLEND_OP_OVER:
updated = self._crop(self.im, self.dispose_extent)
self._prev_im.paste(
updated, self.dispose_extent, updated.convert("RGBA"))
self.im = self._prev_im
self._prev_im = self.im.copy()
if dispose:
self._prev_im.paste(dispose, self.dispose_extent)
def _getexif(self):
if "exif" not in self.info:
@ -703,6 +937,15 @@ class PngImageFile(ImageFile.ImageFile):
self.load()
return ImageFile.ImageFile.getexif(self)
def _close__fp(self):
try:
if self.__fp != self.fp:
self.__fp.close()
except AttributeError:
pass
finally:
self.__fp = None
# --------------------------------------------------------------------
# PNG writer
@ -748,7 +991,139 @@ class _idat:
self.chunk(self.fp, b"IDAT", data)
def _save(im, fp, filename, chunk=putchunk):
class _fdat:
# wrap encoder output in fdAT chunks
def __init__(self, fp, chunk, seq_num):
self.fp = fp
self.chunk = chunk
self.seq_num = seq_num
def write(self, data):
self.chunk(self.fp, b"fdAT", o32(self.seq_num), data)
def _write_multiple_frames(im, fp, chunk, rawmode):
default_image = im.encoderinfo.get("default_image", im.info.get("default_image"))
duration = im.encoderinfo.get("duration", im.info.get("duration", 0))
loop = im.encoderinfo.get("loop", im.info.get("loop", 0))
disposal = im.encoderinfo.get("disposal", im.info.get("disposal"))
blend = im.encoderinfo.get("blend", im.info.get("blend"))
if default_image:
chain = itertools.chain(im.encoderinfo.get("append_images", []))
else:
chain = itertools.chain([im], im.encoderinfo.get("append_images", []))
im_frames = []
frame_count = 0
for im_seq in chain:
for im_frame in ImageSequence.Iterator(im_seq):
im_frame = im_frame.copy()
if im_frame.mode != im.mode:
if im.mode == "P":
im_frame = im_frame.convert(im.mode, palette=im.palette)
else:
im_frame = im_frame.convert(im.mode)
encoderinfo = im.encoderinfo.copy()
if isinstance(duration, (list, tuple)):
encoderinfo["duration"] = duration[frame_count]
if isinstance(disposal, (list, tuple)):
encoderinfo["disposal"] = disposal[frame_count]
if isinstance(blend, (list, tuple)):
encoderinfo["blend"] = blend[frame_count]
frame_count += 1
if im_frames:
previous = im_frames[-1]
prev_disposal = previous["encoderinfo"].get("disposal")
prev_blend = previous["encoderinfo"].get("blend")
if prev_disposal == APNG_DISPOSE_OP_PREVIOUS and len(im_frames) < 2:
prev_disposal == APNG_DISPOSE_OP_BACKGROUND
if prev_disposal == APNG_DISPOSE_OP_BACKGROUND:
base_im = previous["im"]
dispose = Image.core.fill("RGBA", im.size, (0, 0, 0, 0))
bbox = previous["bbox"]
if bbox:
dispose = dispose.crop(bbox)
else:
bbox = (0, 0) + im.size
base_im.paste(dispose, bbox)
elif prev_disposal == APNG_DISPOSE_OP_PREVIOUS:
base_im = im_frames[-2]["im"]
else:
base_im = previous["im"]
delta = ImageChops.subtract_modulo(im_frame, base_im)
bbox = delta.getbbox()
if (not bbox and prev_disposal == encoderinfo.get("disposal")
and prev_blend == encoderinfo.get("blend")):
duration = encoderinfo.get("duration", 0)
if duration:
if "duration" in previous["encoderinfo"]:
previous["encoderinfo"]["duration"] += duration
else:
previous["encoderinfo"]["duration"] = duration
continue
else:
bbox = None
im_frames.append({"im": im_frame, "bbox": bbox, "encoderinfo": encoderinfo})
# animation control
chunk(
fp,
b"acTL",
o32(len(im_frames)), # 0: num_frames
o32(loop), # 4: num_plays
)
# default image IDAT (if it exists)
if default_image:
ImageFile._save(im, _idat(fp, chunk), [("zip", (0, 0) + im.size, 0, rawmode)])
seq_num = 0
for frame, frame_data in enumerate(im_frames):
im_frame = frame_data["im"]
if not frame_data["bbox"]:
bbox = (0, 0) + im_frame.size
else:
bbox = frame_data["bbox"]
im_frame = im_frame.crop(bbox)
size = im_frame.size
duration = int(round(frame_data["encoderinfo"].get("duration", 0)))
disposal = frame_data["encoderinfo"].get("disposal", APNG_DISPOSE_OP_NONE)
blend = frame_data["encoderinfo"].get("blend", APNG_BLEND_OP_SOURCE)
# frame control
chunk(
fp,
b"fcTL",
o32(seq_num), # sequence_number
o32(size[0]), # width
o32(size[1]), # height
o32(bbox[0]), # x_offset
o32(bbox[1]), # y_offset
o16(duration), # delay_numerator
o16(1000), # delay_denominator
o8(disposal), # dispose_op
o8(blend), # blend_op
)
seq_num += 1
# frame data
if frame == 0 and not default_image:
# first frame must be in IDAT chunks for backwards compatibility
ImageFile._save(im_frame, _idat(fp, chunk),
[("zip", (0, 0) + im_frame.size, 0, rawmode)])
else:
ImageFile._save(im_frame, _fdat(fp, chunk, seq_num),
[("zip", (0, 0) + im_frame.size, 0, rawmode)])
seq_num += 1
def _save_all(im, fp, filename):
_save(im, fp, filename, save_all=True)
def _save(im, fp, filename, chunk=putchunk, save_all=False):
# save an image to disk (called by the save method)
mode = im.mode
@ -897,7 +1272,10 @@ def _save(im, fp, filename, chunk=putchunk):
exif = exif[6:]
chunk(fp, b"eXIf", exif)
ImageFile._save(im, _idat(fp, chunk), [("zip", (0, 0) + im.size, 0, rawmode)])
if save_all:
_write_multiple_frames(im, fp, chunk, rawmode)
else:
ImageFile._save(im, _idat(fp, chunk), [("zip", (0, 0) + im.size, 0, rawmode)])
chunk(fp, b"IEND", b"")
@ -942,6 +1320,7 @@ def getchunks(im, **params):
Image.register_open(PngImageFile.format, PngImageFile, _accept)
Image.register_save(PngImageFile.format, _save)
Image.register_save_all(PngImageFile.format, _save_all)
Image.register_extensions(PngImageFile.format, [".png", ".apng"])