mirror of
https://github.com/python-pillow/Pillow.git
synced 2025-01-24 00:04:09 +03:00
1539 lines
49 KiB
Python
1539 lines
49 KiB
Python
#
|
||
# The Python Imaging Library.
|
||
# $Id$
|
||
#
|
||
# PNG support code
|
||
#
|
||
# See "PNG (Portable Network Graphics) Specification, version 1.0;
|
||
# W3C Recommendation", 1996-10-01, Thomas Boutell (ed.).
|
||
#
|
||
# history:
|
||
# 1996-05-06 fl Created (couldn't resist it)
|
||
# 1996-12-14 fl Upgraded, added read and verify support (0.2)
|
||
# 1996-12-15 fl Separate PNG stream parser
|
||
# 1996-12-29 fl Added write support, added getchunks
|
||
# 1996-12-30 fl Eliminated circular references in decoder (0.3)
|
||
# 1998-07-12 fl Read/write 16-bit images as mode I (0.4)
|
||
# 2001-02-08 fl Added transparency support (from Zircon) (0.5)
|
||
# 2001-04-16 fl Don't close data source in "open" method (0.6)
|
||
# 2004-02-24 fl Don't even pretend to support interlaced files (0.7)
|
||
# 2004-08-31 fl Do basic sanity check on chunk identifiers (0.8)
|
||
# 2004-09-20 fl Added PngInfo chunk container
|
||
# 2004-12-18 fl Added DPI read support (based on code by Niki Spahiev)
|
||
# 2008-08-13 fl Added tRNS support for RGB images
|
||
# 2009-03-06 fl Support for preserving ICC profiles (by Florian Hoech)
|
||
# 2009-03-08 fl Added zTXT support (from Lowell Alleman)
|
||
# 2009-03-29 fl Read interlaced PNG files (from Conrado Porto Lopes Gouvua)
|
||
#
|
||
# Copyright (c) 1997-2009 by Secret Labs AB
|
||
# Copyright (c) 1996 by Fredrik Lundh
|
||
#
|
||
# See the README file for information on usage and redistribution.
|
||
#
|
||
from __future__ import annotations
|
||
|
||
import itertools
|
||
import logging
|
||
import re
|
||
import struct
|
||
import warnings
|
||
import zlib
|
||
from collections.abc import Callable
|
||
from enum import IntEnum
|
||
from typing import IO, TYPE_CHECKING, Any, NamedTuple, NoReturn, cast
|
||
|
||
from . import Image, ImageChops, ImageFile, ImagePalette, ImageSequence
|
||
from ._binary import i16be as i16
|
||
from ._binary import i32be as i32
|
||
from ._binary import o8
|
||
from ._binary import o16be as o16
|
||
from ._binary import o32be as o32
|
||
|
||
if TYPE_CHECKING:
|
||
from . import _imaging
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
is_cid = re.compile(rb"\w\w\w\w").match
|
||
|
||
|
||
_MAGIC = b"\211PNG\r\n\032\n"
|
||
|
||
|
||
_MODES = {
|
||
# supported bits/color combinations, and corresponding modes/rawmodes
|
||
# Grayscale
|
||
(1, 0): ("1", "1"),
|
||
(2, 0): ("L", "L;2"),
|
||
(4, 0): ("L", "L;4"),
|
||
(8, 0): ("L", "L"),
|
||
(16, 0): ("I;16", "I;16B"),
|
||
# Truecolour
|
||
(8, 2): ("RGB", "RGB"),
|
||
(16, 2): ("RGB", "RGB;16B"),
|
||
# Indexed-colour
|
||
(1, 3): ("P", "P;1"),
|
||
(2, 3): ("P", "P;2"),
|
||
(4, 3): ("P", "P;4"),
|
||
(8, 3): ("P", "P"),
|
||
# Grayscale with alpha
|
||
(8, 4): ("LA", "LA"),
|
||
(16, 4): ("RGBA", "LA;16B"), # LA;16B->LA not yet available
|
||
# Truecolour with alpha
|
||
(8, 6): ("RGBA", "RGBA"),
|
||
(16, 6): ("RGBA", "RGBA;16B"),
|
||
}
|
||
|
||
|
||
_simple_palette = re.compile(b"^\xff*\x00\xff*$")
|
||
|
||
MAX_TEXT_CHUNK = ImageFile.SAFEBLOCK
|
||
"""
|
||
Maximum decompressed size for a iTXt or zTXt chunk.
|
||
Eliminates decompression bombs where compressed chunks can expand 1000x.
|
||
See :ref:`Text in PNG File Format<png-text>`.
|
||
"""
|
||
MAX_TEXT_MEMORY = 64 * MAX_TEXT_CHUNK
|
||
"""
|
||
Set the maximum total text chunk size.
|
||
See :ref:`Text in PNG File Format<png-text>`.
|
||
"""
|
||
|
||
|
||
# APNG frame disposal modes
|
||
class Disposal(IntEnum):
|
||
OP_NONE = 0
|
||
"""
|
||
No disposal is done on this frame before rendering the next frame.
|
||
See :ref:`Saving APNG sequences<apng-saving>`.
|
||
"""
|
||
OP_BACKGROUND = 1
|
||
"""
|
||
This frame’s modified region is cleared to fully transparent black before rendering
|
||
the next frame.
|
||
See :ref:`Saving APNG sequences<apng-saving>`.
|
||
"""
|
||
OP_PREVIOUS = 2
|
||
"""
|
||
This frame’s modified region is reverted to the previous frame’s contents before
|
||
rendering the next frame.
|
||
See :ref:`Saving APNG sequences<apng-saving>`.
|
||
"""
|
||
|
||
|
||
# APNG frame blend modes
|
||
class Blend(IntEnum):
|
||
OP_SOURCE = 0
|
||
"""
|
||
All color components of this frame, including alpha, overwrite the previous output
|
||
image contents.
|
||
See :ref:`Saving APNG sequences<apng-saving>`.
|
||
"""
|
||
OP_OVER = 1
|
||
"""
|
||
This frame should be alpha composited with the previous output image contents.
|
||
See :ref:`Saving APNG sequences<apng-saving>`.
|
||
"""
|
||
|
||
|
||
def _safe_zlib_decompress(s: bytes) -> bytes:
|
||
dobj = zlib.decompressobj()
|
||
plaintext = dobj.decompress(s, MAX_TEXT_CHUNK)
|
||
if dobj.unconsumed_tail:
|
||
msg = "Decompressed Data Too Large"
|
||
raise ValueError(msg)
|
||
return plaintext
|
||
|
||
|
||
def _crc32(data: bytes, seed: int = 0) -> int:
|
||
return zlib.crc32(data, seed) & 0xFFFFFFFF
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# Support classes. Suitable for PNG and related formats like MNG etc.
|
||
|
||
|
||
class ChunkStream:
|
||
def __init__(self, fp: IO[bytes]) -> None:
|
||
self.fp: IO[bytes] | None = fp
|
||
self.queue: list[tuple[bytes, int, int]] | None = []
|
||
|
||
def read(self) -> tuple[bytes, int, int]:
|
||
"""Fetch a new chunk. Returns header information."""
|
||
cid = None
|
||
|
||
assert self.fp is not None
|
||
if self.queue:
|
||
cid, pos, length = self.queue.pop()
|
||
self.fp.seek(pos)
|
||
else:
|
||
s = self.fp.read(8)
|
||
cid = s[4:]
|
||
pos = self.fp.tell()
|
||
length = i32(s)
|
||
|
||
if not is_cid(cid):
|
||
if not ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
msg = f"broken PNG file (chunk {repr(cid)})"
|
||
raise SyntaxError(msg)
|
||
|
||
return cid, pos, length
|
||
|
||
def __enter__(self) -> ChunkStream:
|
||
return self
|
||
|
||
def __exit__(self, *args: object) -> None:
|
||
self.close()
|
||
|
||
def close(self) -> None:
|
||
self.queue = self.fp = None
|
||
|
||
def push(self, cid: bytes, pos: int, length: int) -> None:
|
||
assert self.queue is not None
|
||
self.queue.append((cid, pos, length))
|
||
|
||
def call(self, cid: bytes, pos: int, length: int) -> bytes:
|
||
"""Call the appropriate chunk handler"""
|
||
|
||
logger.debug("STREAM %r %s %s", cid, pos, length)
|
||
return getattr(self, f"chunk_{cid.decode('ascii')}")(pos, length)
|
||
|
||
def crc(self, cid: bytes, data: bytes) -> None:
|
||
"""Read and verify checksum"""
|
||
|
||
# Skip CRC checks for ancillary chunks if allowed to load truncated
|
||
# images
|
||
# 5th byte of first char is 1 [specs, section 5.4]
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES and (cid[0] >> 5 & 1):
|
||
self.crc_skip(cid, data)
|
||
return
|
||
|
||
assert self.fp is not None
|
||
try:
|
||
crc1 = _crc32(data, _crc32(cid))
|
||
crc2 = i32(self.fp.read(4))
|
||
if crc1 != crc2:
|
||
msg = f"broken PNG file (bad header checksum in {repr(cid)})"
|
||
raise SyntaxError(msg)
|
||
except struct.error as e:
|
||
msg = f"broken PNG file (incomplete checksum in {repr(cid)})"
|
||
raise SyntaxError(msg) from e
|
||
|
||
def crc_skip(self, cid: bytes, data: bytes) -> None:
|
||
"""Read checksum"""
|
||
|
||
assert self.fp is not None
|
||
self.fp.read(4)
|
||
|
||
def verify(self, endchunk: bytes = b"IEND") -> list[bytes]:
|
||
# Simple approach; just calculate checksum for all remaining
|
||
# blocks. Must be called directly after open.
|
||
|
||
cids = []
|
||
|
||
assert self.fp is not None
|
||
while True:
|
||
try:
|
||
cid, pos, length = self.read()
|
||
except struct.error as e:
|
||
msg = "truncated PNG file"
|
||
raise OSError(msg) from e
|
||
|
||
if cid == endchunk:
|
||
break
|
||
self.crc(cid, ImageFile._safe_read(self.fp, length))
|
||
cids.append(cid)
|
||
|
||
return cids
|
||
|
||
|
||
class iTXt(str):
|
||
"""
|
||
Subclass of string to allow iTXt chunks to look like strings while
|
||
keeping their extra information
|
||
|
||
"""
|
||
|
||
lang: str | bytes | None
|
||
tkey: str | bytes | None
|
||
|
||
@staticmethod
|
||
def __new__(
|
||
cls, text: str, lang: str | None = None, tkey: str | None = None
|
||
) -> iTXt:
|
||
"""
|
||
:param cls: the class to use when creating the instance
|
||
:param text: value for this key
|
||
:param lang: language code
|
||
:param tkey: UTF-8 version of the key name
|
||
"""
|
||
|
||
self = str.__new__(cls, text)
|
||
self.lang = lang
|
||
self.tkey = tkey
|
||
return self
|
||
|
||
|
||
class PngInfo:
|
||
"""
|
||
PNG chunk container (for use with save(pnginfo=))
|
||
|
||
"""
|
||
|
||
def __init__(self) -> None:
|
||
self.chunks: list[tuple[bytes, bytes, bool]] = []
|
||
|
||
def add(self, cid: bytes, data: bytes, after_idat: bool = False) -> None:
|
||
"""Appends an arbitrary chunk. Use with caution.
|
||
|
||
:param cid: a byte string, 4 bytes long.
|
||
:param data: a byte string of the encoded data
|
||
:param after_idat: for use with private chunks. Whether the chunk
|
||
should be written after IDAT
|
||
|
||
"""
|
||
|
||
self.chunks.append((cid, data, after_idat))
|
||
|
||
def add_itxt(
|
||
self,
|
||
key: str | bytes,
|
||
value: str | bytes,
|
||
lang: str | bytes = "",
|
||
tkey: str | bytes = "",
|
||
zip: bool = False,
|
||
) -> None:
|
||
"""Appends an iTXt chunk.
|
||
|
||
:param key: latin-1 encodable text key name
|
||
:param value: value for this key
|
||
:param lang: language code
|
||
:param tkey: UTF-8 version of the key name
|
||
:param zip: compression flag
|
||
|
||
"""
|
||
|
||
if not isinstance(key, bytes):
|
||
key = key.encode("latin-1", "strict")
|
||
if not isinstance(value, bytes):
|
||
value = value.encode("utf-8", "strict")
|
||
if not isinstance(lang, bytes):
|
||
lang = lang.encode("utf-8", "strict")
|
||
if not isinstance(tkey, bytes):
|
||
tkey = tkey.encode("utf-8", "strict")
|
||
|
||
if zip:
|
||
self.add(
|
||
b"iTXt",
|
||
key + b"\0\x01\0" + lang + b"\0" + tkey + b"\0" + zlib.compress(value),
|
||
)
|
||
else:
|
||
self.add(b"iTXt", key + b"\0\0\0" + lang + b"\0" + tkey + b"\0" + value)
|
||
|
||
def add_text(
|
||
self, key: str | bytes, value: str | bytes | iTXt, zip: bool = False
|
||
) -> None:
|
||
"""Appends a text chunk.
|
||
|
||
:param key: latin-1 encodable text key name
|
||
:param value: value for this key, text or an
|
||
:py:class:`PIL.PngImagePlugin.iTXt` instance
|
||
:param zip: compression flag
|
||
|
||
"""
|
||
if isinstance(value, iTXt):
|
||
return self.add_itxt(
|
||
key,
|
||
value,
|
||
value.lang if value.lang is not None else b"",
|
||
value.tkey if value.tkey is not None else b"",
|
||
zip=zip,
|
||
)
|
||
|
||
# The tEXt chunk stores latin-1 text
|
||
if not isinstance(value, bytes):
|
||
try:
|
||
value = value.encode("latin-1", "strict")
|
||
except UnicodeError:
|
||
return self.add_itxt(key, value, zip=zip)
|
||
|
||
if not isinstance(key, bytes):
|
||
key = key.encode("latin-1", "strict")
|
||
|
||
if zip:
|
||
self.add(b"zTXt", key + b"\0\0" + zlib.compress(value))
|
||
else:
|
||
self.add(b"tEXt", key + b"\0" + value)
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# PNG image stream (IHDR/IEND)
|
||
|
||
|
||
class _RewindState(NamedTuple):
|
||
info: dict[str | tuple[int, int], Any]
|
||
tile: list[ImageFile._Tile]
|
||
seq_num: int | None
|
||
|
||
|
||
class PngStream(ChunkStream):
|
||
def __init__(self, fp: IO[bytes]) -> None:
|
||
super().__init__(fp)
|
||
|
||
# local copies of Image attributes
|
||
self.im_info: dict[str | tuple[int, int], Any] = {}
|
||
self.im_text: dict[str, str | iTXt] = {}
|
||
self.im_size = (0, 0)
|
||
self.im_mode = ""
|
||
self.im_tile: list[ImageFile._Tile] = []
|
||
self.im_palette: tuple[str, bytes] | None = None
|
||
self.im_custom_mimetype: str | None = None
|
||
self.im_n_frames: int | None = None
|
||
self._seq_num: int | None = None
|
||
self.rewind_state = _RewindState({}, [], None)
|
||
|
||
self.text_memory = 0
|
||
|
||
def check_text_memory(self, chunklen: int) -> None:
|
||
self.text_memory += chunklen
|
||
if self.text_memory > MAX_TEXT_MEMORY:
|
||
msg = (
|
||
"Too much memory used in text chunks: "
|
||
f"{self.text_memory}>MAX_TEXT_MEMORY"
|
||
)
|
||
raise ValueError(msg)
|
||
|
||
def save_rewind(self) -> None:
|
||
self.rewind_state = _RewindState(
|
||
self.im_info.copy(),
|
||
self.im_tile,
|
||
self._seq_num,
|
||
)
|
||
|
||
def rewind(self) -> None:
|
||
self.im_info = self.rewind_state.info.copy()
|
||
self.im_tile = self.rewind_state.tile
|
||
self._seq_num = self.rewind_state.seq_num
|
||
|
||
def chunk_iCCP(self, pos: int, length: int) -> bytes:
|
||
# ICC profile
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
# according to PNG spec, the iCCP chunk contains:
|
||
# Profile name 1-79 bytes (character string)
|
||
# Null separator 1 byte (null character)
|
||
# Compression method 1 byte (0)
|
||
# Compressed profile n bytes (zlib with deflate compression)
|
||
i = s.find(b"\0")
|
||
logger.debug("iCCP profile name %r", s[:i])
|
||
comp_method = s[i + 1]
|
||
logger.debug("Compression method %s", comp_method)
|
||
if comp_method != 0:
|
||
msg = f"Unknown compression method {comp_method} in iCCP chunk"
|
||
raise SyntaxError(msg)
|
||
try:
|
||
icc_profile = _safe_zlib_decompress(s[i + 2 :])
|
||
except ValueError:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
icc_profile = None
|
||
else:
|
||
raise
|
||
except zlib.error:
|
||
icc_profile = None # FIXME
|
||
self.im_info["icc_profile"] = icc_profile
|
||
return s
|
||
|
||
def chunk_IHDR(self, pos: int, length: int) -> bytes:
|
||
# image header
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if length < 13:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
return s
|
||
msg = "Truncated IHDR chunk"
|
||
raise ValueError(msg)
|
||
self.im_size = i32(s, 0), i32(s, 4)
|
||
try:
|
||
self.im_mode, self.im_rawmode = _MODES[(s[8], s[9])]
|
||
except Exception:
|
||
pass
|
||
if s[12]:
|
||
self.im_info["interlace"] = 1
|
||
if s[11]:
|
||
msg = "unknown filter category"
|
||
raise SyntaxError(msg)
|
||
return s
|
||
|
||
def chunk_IDAT(self, pos: int, length: int) -> NoReturn:
|
||
# image data
|
||
if "bbox" in self.im_info:
|
||
tile = [ImageFile._Tile("zip", self.im_info["bbox"], pos, self.im_rawmode)]
|
||
else:
|
||
if self.im_n_frames is not None:
|
||
self.im_info["default_image"] = True
|
||
tile = [ImageFile._Tile("zip", (0, 0) + self.im_size, pos, self.im_rawmode)]
|
||
self.im_tile = tile
|
||
self.im_idat = length
|
||
msg = "image data found"
|
||
raise EOFError(msg)
|
||
|
||
def chunk_IEND(self, pos: int, length: int) -> NoReturn:
|
||
msg = "end of PNG image"
|
||
raise EOFError(msg)
|
||
|
||
def chunk_PLTE(self, pos: int, length: int) -> bytes:
|
||
# palette
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if self.im_mode == "P":
|
||
self.im_palette = "RGB", s
|
||
return s
|
||
|
||
def chunk_tRNS(self, pos: int, length: int) -> bytes:
|
||
# transparency
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if self.im_mode == "P":
|
||
if _simple_palette.match(s):
|
||
# tRNS contains only one full-transparent entry,
|
||
# other entries are full opaque
|
||
i = s.find(b"\0")
|
||
if i >= 0:
|
||
self.im_info["transparency"] = i
|
||
else:
|
||
# otherwise, we have a byte string with one alpha value
|
||
# for each palette entry
|
||
self.im_info["transparency"] = s
|
||
elif self.im_mode in ("1", "L", "I;16"):
|
||
self.im_info["transparency"] = i16(s)
|
||
elif self.im_mode == "RGB":
|
||
self.im_info["transparency"] = i16(s), i16(s, 2), i16(s, 4)
|
||
return s
|
||
|
||
def chunk_gAMA(self, pos: int, length: int) -> bytes:
|
||
# gamma setting
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
self.im_info["gamma"] = i32(s) / 100000.0
|
||
return s
|
||
|
||
def chunk_cHRM(self, pos: int, length: int) -> bytes:
|
||
# chromaticity, 8 unsigned ints, actual value is scaled by 100,000
|
||
# WP x,y, Red x,y, Green x,y Blue x,y
|
||
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
raw_vals = struct.unpack(">%dI" % (len(s) // 4), s)
|
||
self.im_info["chromaticity"] = tuple(elt / 100000.0 for elt in raw_vals)
|
||
return s
|
||
|
||
def chunk_sRGB(self, pos: int, length: int) -> bytes:
|
||
# srgb rendering intent, 1 byte
|
||
# 0 perceptual
|
||
# 1 relative colorimetric
|
||
# 2 saturation
|
||
# 3 absolute colorimetric
|
||
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if length < 1:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
return s
|
||
msg = "Truncated sRGB chunk"
|
||
raise ValueError(msg)
|
||
self.im_info["srgb"] = s[0]
|
||
return s
|
||
|
||
def chunk_pHYs(self, pos: int, length: int) -> bytes:
|
||
# pixels per unit
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if length < 9:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
return s
|
||
msg = "Truncated pHYs chunk"
|
||
raise ValueError(msg)
|
||
px, py = i32(s, 0), i32(s, 4)
|
||
unit = s[8]
|
||
if unit == 1: # meter
|
||
dpi = px * 0.0254, py * 0.0254
|
||
self.im_info["dpi"] = dpi
|
||
elif unit == 0:
|
||
self.im_info["aspect"] = px, py
|
||
return s
|
||
|
||
def chunk_tEXt(self, pos: int, length: int) -> bytes:
|
||
# text
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
try:
|
||
k, v = s.split(b"\0", 1)
|
||
except ValueError:
|
||
# fallback for broken tEXt tags
|
||
k = s
|
||
v = b""
|
||
if k:
|
||
k_str = k.decode("latin-1", "strict")
|
||
v_str = v.decode("latin-1", "replace")
|
||
|
||
self.im_info[k_str] = v if k == b"exif" else v_str
|
||
self.im_text[k_str] = v_str
|
||
self.check_text_memory(len(v_str))
|
||
|
||
return s
|
||
|
||
def chunk_zTXt(self, pos: int, length: int) -> bytes:
|
||
# compressed text
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
try:
|
||
k, v = s.split(b"\0", 1)
|
||
except ValueError:
|
||
k = s
|
||
v = b""
|
||
if v:
|
||
comp_method = v[0]
|
||
else:
|
||
comp_method = 0
|
||
if comp_method != 0:
|
||
msg = f"Unknown compression method {comp_method} in zTXt chunk"
|
||
raise SyntaxError(msg)
|
||
try:
|
||
v = _safe_zlib_decompress(v[1:])
|
||
except ValueError:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
v = b""
|
||
else:
|
||
raise
|
||
except zlib.error:
|
||
v = b""
|
||
|
||
if k:
|
||
k_str = k.decode("latin-1", "strict")
|
||
v_str = v.decode("latin-1", "replace")
|
||
|
||
self.im_info[k_str] = self.im_text[k_str] = v_str
|
||
self.check_text_memory(len(v_str))
|
||
|
||
return s
|
||
|
||
def chunk_iTXt(self, pos: int, length: int) -> bytes:
|
||
# international text
|
||
assert self.fp is not None
|
||
r = s = ImageFile._safe_read(self.fp, length)
|
||
try:
|
||
k, r = r.split(b"\0", 1)
|
||
except ValueError:
|
||
return s
|
||
if len(r) < 2:
|
||
return s
|
||
cf, cm, r = r[0], r[1], r[2:]
|
||
try:
|
||
lang, tk, v = r.split(b"\0", 2)
|
||
except ValueError:
|
||
return s
|
||
if cf != 0:
|
||
if cm == 0:
|
||
try:
|
||
v = _safe_zlib_decompress(v)
|
||
except ValueError:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
return s
|
||
else:
|
||
raise
|
||
except zlib.error:
|
||
return s
|
||
else:
|
||
return s
|
||
if k == b"XML:com.adobe.xmp":
|
||
self.im_info["xmp"] = v
|
||
try:
|
||
k_str = k.decode("latin-1", "strict")
|
||
lang_str = lang.decode("utf-8", "strict")
|
||
tk_str = tk.decode("utf-8", "strict")
|
||
v_str = v.decode("utf-8", "strict")
|
||
except UnicodeError:
|
||
return s
|
||
|
||
self.im_info[k_str] = self.im_text[k_str] = iTXt(v_str, lang_str, tk_str)
|
||
self.check_text_memory(len(v_str))
|
||
|
||
return s
|
||
|
||
def chunk_eXIf(self, pos: int, length: int) -> bytes:
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
self.im_info["exif"] = b"Exif\x00\x00" + s
|
||
return s
|
||
|
||
# APNG chunks
|
||
def chunk_acTL(self, pos: int, length: int) -> bytes:
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if length < 8:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
return s
|
||
msg = "APNG contains truncated acTL chunk"
|
||
raise ValueError(msg)
|
||
if self.im_n_frames is not None:
|
||
self.im_n_frames = None
|
||
warnings.warn("Invalid APNG, will use default PNG image if possible")
|
||
return s
|
||
n_frames = i32(s)
|
||
if n_frames == 0 or n_frames > 0x80000000:
|
||
warnings.warn("Invalid APNG, will use default PNG image if possible")
|
||
return s
|
||
self.im_n_frames = n_frames
|
||
self.im_info["loop"] = i32(s, 4)
|
||
self.im_custom_mimetype = "image/apng"
|
||
return s
|
||
|
||
def chunk_fcTL(self, pos: int, length: int) -> bytes:
|
||
assert self.fp is not None
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if length < 26:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
return s
|
||
msg = "APNG contains truncated fcTL chunk"
|
||
raise ValueError(msg)
|
||
seq = i32(s)
|
||
if (self._seq_num is None and seq != 0) or (
|
||
self._seq_num is not None and self._seq_num != seq - 1
|
||
):
|
||
msg = "APNG contains frame sequence errors"
|
||
raise SyntaxError(msg)
|
||
self._seq_num = seq
|
||
width, height = i32(s, 4), i32(s, 8)
|
||
px, py = i32(s, 12), i32(s, 16)
|
||
im_w, im_h = self.im_size
|
||
if px + width > im_w or py + height > im_h:
|
||
msg = "APNG contains invalid frames"
|
||
raise SyntaxError(msg)
|
||
self.im_info["bbox"] = (px, py, px + width, py + height)
|
||
delay_num, delay_den = i16(s, 20), i16(s, 22)
|
||
if delay_den == 0:
|
||
delay_den = 100
|
||
self.im_info["duration"] = float(delay_num) / float(delay_den) * 1000
|
||
self.im_info["disposal"] = s[24]
|
||
self.im_info["blend"] = s[25]
|
||
return s
|
||
|
||
def chunk_fdAT(self, pos: int, length: int) -> bytes:
|
||
assert self.fp is not None
|
||
if length < 4:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
return s
|
||
msg = "APNG contains truncated fDAT chunk"
|
||
raise ValueError(msg)
|
||
s = ImageFile._safe_read(self.fp, 4)
|
||
seq = i32(s)
|
||
if self._seq_num != seq - 1:
|
||
msg = "APNG contains frame sequence errors"
|
||
raise SyntaxError(msg)
|
||
self._seq_num = seq
|
||
return self.chunk_IDAT(pos + 4, length - 4)
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# PNG reader
|
||
|
||
|
||
def _accept(prefix: bytes) -> bool:
|
||
return prefix[:8] == _MAGIC
|
||
|
||
|
||
##
|
||
# Image plugin for PNG images.
|
||
|
||
|
||
class PngImageFile(ImageFile.ImageFile):
|
||
format = "PNG"
|
||
format_description = "Portable network graphics"
|
||
|
||
def _open(self) -> None:
|
||
if not _accept(self.fp.read(8)):
|
||
msg = "not a PNG file"
|
||
raise SyntaxError(msg)
|
||
self._fp = self.fp
|
||
self.__frame = 0
|
||
|
||
#
|
||
# Parse headers up to the first IDAT or fDAT chunk
|
||
|
||
self.private_chunks: list[tuple[bytes, bytes] | tuple[bytes, bytes, bool]] = []
|
||
self.png: PngStream | None = PngStream(self.fp)
|
||
|
||
while True:
|
||
#
|
||
# get next chunk
|
||
|
||
cid, pos, length = self.png.read()
|
||
|
||
try:
|
||
s = self.png.call(cid, pos, length)
|
||
except EOFError:
|
||
break
|
||
except AttributeError:
|
||
logger.debug("%r %s %s (unknown)", cid, pos, length)
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if cid[1:2].islower():
|
||
self.private_chunks.append((cid, s))
|
||
|
||
self.png.crc(cid, s)
|
||
|
||
#
|
||
# Copy relevant attributes from the PngStream. An alternative
|
||
# would be to let the PngStream class modify these attributes
|
||
# directly, but that introduces circular references which are
|
||
# difficult to break if things go wrong in the decoder...
|
||
# (believe me, I've tried ;-)
|
||
|
||
self._mode = self.png.im_mode
|
||
self._size = self.png.im_size
|
||
self.info = self.png.im_info
|
||
self._text: dict[str, str | iTXt] | None = None
|
||
self.tile = self.png.im_tile
|
||
self.custom_mimetype = self.png.im_custom_mimetype
|
||
self.n_frames = self.png.im_n_frames or 1
|
||
self.default_image = self.info.get("default_image", False)
|
||
|
||
if self.png.im_palette:
|
||
rawmode, data = self.png.im_palette
|
||
self.palette = ImagePalette.raw(rawmode, data)
|
||
|
||
if cid == b"fdAT":
|
||
self.__prepare_idat = length - 4
|
||
else:
|
||
self.__prepare_idat = length # used by load_prepare()
|
||
|
||
if self.png.im_n_frames is not None:
|
||
self._close_exclusive_fp_after_loading = False
|
||
self.png.save_rewind()
|
||
self.__rewind_idat = self.__prepare_idat
|
||
self.__rewind = self._fp.tell()
|
||
if self.default_image:
|
||
# IDAT chunk contains default image and not first animation frame
|
||
self.n_frames += 1
|
||
self._seek(0)
|
||
self.is_animated = self.n_frames > 1
|
||
|
||
@property
|
||
def text(self) -> dict[str, str | iTXt]:
|
||
# experimental
|
||
if self._text is None:
|
||
# iTxt, tEXt and zTXt chunks may appear at the end of the file
|
||
# So load the file to ensure that they are read
|
||
if self.is_animated:
|
||
frame = self.__frame
|
||
# for APNG, seek to the final frame before loading
|
||
self.seek(self.n_frames - 1)
|
||
self.load()
|
||
if self.is_animated:
|
||
self.seek(frame)
|
||
assert self._text is not None
|
||
return self._text
|
||
|
||
def verify(self) -> None:
|
||
"""Verify PNG file"""
|
||
|
||
if self.fp is None:
|
||
msg = "verify must be called directly after open"
|
||
raise RuntimeError(msg)
|
||
|
||
# back up to beginning of IDAT block
|
||
self.fp.seek(self.tile[0][2] - 8)
|
||
|
||
assert self.png is not None
|
||
self.png.verify()
|
||
self.png.close()
|
||
|
||
if self._exclusive_fp:
|
||
self.fp.close()
|
||
self.fp = None
|
||
|
||
def seek(self, frame: int) -> None:
|
||
if not self._seek_check(frame):
|
||
return
|
||
if frame < self.__frame:
|
||
self._seek(0, True)
|
||
|
||
last_frame = self.__frame
|
||
for f in range(self.__frame + 1, frame + 1):
|
||
try:
|
||
self._seek(f)
|
||
except EOFError as e:
|
||
self.seek(last_frame)
|
||
msg = "no more images in APNG file"
|
||
raise EOFError(msg) from e
|
||
|
||
def _seek(self, frame: int, rewind: bool = False) -> None:
|
||
assert self.png is not None
|
||
|
||
self.dispose: _imaging.ImagingCore | None
|
||
dispose_extent = None
|
||
if frame == 0:
|
||
if rewind:
|
||
self._fp.seek(self.__rewind)
|
||
self.png.rewind()
|
||
self.__prepare_idat = self.__rewind_idat
|
||
self._im = None
|
||
self.info = self.png.im_info
|
||
self.tile = self.png.im_tile
|
||
self.fp = self._fp
|
||
self._prev_im = None
|
||
self.dispose = None
|
||
self.default_image = self.info.get("default_image", False)
|
||
self.dispose_op = self.info.get("disposal")
|
||
self.blend_op = self.info.get("blend")
|
||
dispose_extent = self.info.get("bbox")
|
||
self.__frame = 0
|
||
else:
|
||
if frame != self.__frame + 1:
|
||
msg = f"cannot seek to frame {frame}"
|
||
raise ValueError(msg)
|
||
|
||
# ensure previous frame was loaded
|
||
self.load()
|
||
|
||
if self.dispose:
|
||
self.im.paste(self.dispose, self.dispose_extent)
|
||
self._prev_im = self.im.copy()
|
||
|
||
self.fp = self._fp
|
||
|
||
# advance to the next frame
|
||
if self.__prepare_idat:
|
||
ImageFile._safe_read(self.fp, self.__prepare_idat)
|
||
self.__prepare_idat = 0
|
||
frame_start = False
|
||
while True:
|
||
self.fp.read(4) # CRC
|
||
|
||
try:
|
||
cid, pos, length = self.png.read()
|
||
except (struct.error, SyntaxError):
|
||
break
|
||
|
||
if cid == b"IEND":
|
||
msg = "No more images in APNG file"
|
||
raise EOFError(msg)
|
||
if cid == b"fcTL":
|
||
if frame_start:
|
||
# there must be at least one fdAT chunk between fcTL chunks
|
||
msg = "APNG missing frame data"
|
||
raise SyntaxError(msg)
|
||
frame_start = True
|
||
|
||
try:
|
||
self.png.call(cid, pos, length)
|
||
except UnicodeDecodeError:
|
||
break
|
||
except EOFError:
|
||
if cid == b"fdAT":
|
||
length -= 4
|
||
if frame_start:
|
||
self.__prepare_idat = length
|
||
break
|
||
ImageFile._safe_read(self.fp, length)
|
||
except AttributeError:
|
||
logger.debug("%r %s %s (unknown)", cid, pos, length)
|
||
ImageFile._safe_read(self.fp, length)
|
||
|
||
self.__frame = frame
|
||
self.tile = self.png.im_tile
|
||
self.dispose_op = self.info.get("disposal")
|
||
self.blend_op = self.info.get("blend")
|
||
dispose_extent = self.info.get("bbox")
|
||
|
||
if not self.tile:
|
||
msg = "image not found in APNG frame"
|
||
raise EOFError(msg)
|
||
if dispose_extent:
|
||
self.dispose_extent: tuple[float, float, float, float] = dispose_extent
|
||
|
||
# setup frame disposal (actual disposal done when needed in the next _seek())
|
||
if self._prev_im is None and self.dispose_op == Disposal.OP_PREVIOUS:
|
||
self.dispose_op = Disposal.OP_BACKGROUND
|
||
|
||
self.dispose = None
|
||
if self.dispose_op == Disposal.OP_PREVIOUS:
|
||
if self._prev_im:
|
||
self.dispose = self._prev_im.copy()
|
||
self.dispose = self._crop(self.dispose, self.dispose_extent)
|
||
elif self.dispose_op == Disposal.OP_BACKGROUND:
|
||
self.dispose = Image.core.fill(self.mode, self.size)
|
||
self.dispose = self._crop(self.dispose, self.dispose_extent)
|
||
|
||
def tell(self) -> int:
|
||
return self.__frame
|
||
|
||
def load_prepare(self) -> None:
|
||
"""internal: prepare to read PNG file"""
|
||
|
||
if self.info.get("interlace"):
|
||
self.decoderconfig = self.decoderconfig + (1,)
|
||
|
||
self.__idat = self.__prepare_idat # used by load_read()
|
||
ImageFile.ImageFile.load_prepare(self)
|
||
|
||
def load_read(self, read_bytes: int) -> bytes:
|
||
"""internal: read more image data"""
|
||
|
||
assert self.png is not None
|
||
while self.__idat == 0:
|
||
# end of chunk, skip forward to next one
|
||
|
||
self.fp.read(4) # CRC
|
||
|
||
cid, pos, length = self.png.read()
|
||
|
||
if cid not in [b"IDAT", b"DDAT", b"fdAT"]:
|
||
self.png.push(cid, pos, length)
|
||
return b""
|
||
|
||
if cid == b"fdAT":
|
||
try:
|
||
self.png.call(cid, pos, length)
|
||
except EOFError:
|
||
pass
|
||
self.__idat = length - 4 # sequence_num has already been read
|
||
else:
|
||
self.__idat = length # empty chunks are allowed
|
||
|
||
# read more data from this chunk
|
||
if read_bytes <= 0:
|
||
read_bytes = self.__idat
|
||
else:
|
||
read_bytes = min(read_bytes, self.__idat)
|
||
|
||
self.__idat = self.__idat - read_bytes
|
||
|
||
return self.fp.read(read_bytes)
|
||
|
||
def load_end(self) -> None:
|
||
"""internal: finished reading image data"""
|
||
assert self.png is not None
|
||
if self.__idat != 0:
|
||
self.fp.read(self.__idat)
|
||
while True:
|
||
self.fp.read(4) # CRC
|
||
|
||
try:
|
||
cid, pos, length = self.png.read()
|
||
except (struct.error, SyntaxError):
|
||
break
|
||
|
||
if cid == b"IEND":
|
||
break
|
||
elif cid == b"fcTL" and self.is_animated:
|
||
# start of the next frame, stop reading
|
||
self.__prepare_idat = 0
|
||
self.png.push(cid, pos, length)
|
||
break
|
||
|
||
try:
|
||
self.png.call(cid, pos, length)
|
||
except UnicodeDecodeError:
|
||
break
|
||
except EOFError:
|
||
if cid == b"fdAT":
|
||
length -= 4
|
||
try:
|
||
ImageFile._safe_read(self.fp, length)
|
||
except OSError as e:
|
||
if ImageFile.LOAD_TRUNCATED_IMAGES:
|
||
break
|
||
else:
|
||
raise e
|
||
except AttributeError:
|
||
logger.debug("%r %s %s (unknown)", cid, pos, length)
|
||
s = ImageFile._safe_read(self.fp, length)
|
||
if cid[1:2].islower():
|
||
self.private_chunks.append((cid, s, True))
|
||
self._text = self.png.im_text
|
||
if not self.is_animated:
|
||
self.png.close()
|
||
self.png = None
|
||
else:
|
||
if self._prev_im and self.blend_op == Blend.OP_OVER:
|
||
updated = self._crop(self.im, self.dispose_extent)
|
||
if self.im.mode == "RGB" and "transparency" in self.info:
|
||
mask = updated.convert_transparent(
|
||
"RGBA", self.info["transparency"]
|
||
)
|
||
else:
|
||
mask = updated.convert("RGBA")
|
||
self._prev_im.paste(updated, self.dispose_extent, mask)
|
||
self.im = self._prev_im
|
||
|
||
def _getexif(self) -> dict[int, Any] | None:
|
||
if "exif" not in self.info:
|
||
self.load()
|
||
if "exif" not in self.info and "Raw profile type exif" not in self.info:
|
||
return None
|
||
return self.getexif()._get_merged_dict()
|
||
|
||
def getexif(self) -> Image.Exif:
|
||
if "exif" not in self.info:
|
||
self.load()
|
||
|
||
return super().getexif()
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# PNG writer
|
||
|
||
_OUTMODES = {
|
||
# supported PIL modes, and corresponding rawmode, bit depth and color type
|
||
"1": ("1", b"\x01", b"\x00"),
|
||
"L;1": ("L;1", b"\x01", b"\x00"),
|
||
"L;2": ("L;2", b"\x02", b"\x00"),
|
||
"L;4": ("L;4", b"\x04", b"\x00"),
|
||
"L": ("L", b"\x08", b"\x00"),
|
||
"LA": ("LA", b"\x08", b"\x04"),
|
||
"I": ("I;16B", b"\x10", b"\x00"),
|
||
"I;16": ("I;16B", b"\x10", b"\x00"),
|
||
"I;16B": ("I;16B", b"\x10", b"\x00"),
|
||
"P;1": ("P;1", b"\x01", b"\x03"),
|
||
"P;2": ("P;2", b"\x02", b"\x03"),
|
||
"P;4": ("P;4", b"\x04", b"\x03"),
|
||
"P": ("P", b"\x08", b"\x03"),
|
||
"RGB": ("RGB", b"\x08", b"\x02"),
|
||
"RGBA": ("RGBA", b"\x08", b"\x06"),
|
||
}
|
||
|
||
|
||
def putchunk(fp: IO[bytes], cid: bytes, *data: bytes) -> None:
|
||
"""Write a PNG chunk (including CRC field)"""
|
||
|
||
byte_data = b"".join(data)
|
||
|
||
fp.write(o32(len(byte_data)) + cid)
|
||
fp.write(byte_data)
|
||
crc = _crc32(byte_data, _crc32(cid))
|
||
fp.write(o32(crc))
|
||
|
||
|
||
class _idat:
|
||
# wrap output from the encoder in IDAT chunks
|
||
|
||
def __init__(self, fp: IO[bytes], chunk: Callable[..., None]) -> None:
|
||
self.fp = fp
|
||
self.chunk = chunk
|
||
|
||
def write(self, data: bytes) -> None:
|
||
self.chunk(self.fp, b"IDAT", data)
|
||
|
||
|
||
class _fdat:
|
||
# wrap encoder output in fdAT chunks
|
||
|
||
def __init__(self, fp: IO[bytes], chunk: Callable[..., None], seq_num: int) -> None:
|
||
self.fp = fp
|
||
self.chunk = chunk
|
||
self.seq_num = seq_num
|
||
|
||
def write(self, data: bytes) -> None:
|
||
self.chunk(self.fp, b"fdAT", o32(self.seq_num), data)
|
||
self.seq_num += 1
|
||
|
||
|
||
class _Frame(NamedTuple):
|
||
im: Image.Image
|
||
bbox: tuple[int, int, int, int] | None
|
||
encoderinfo: dict[str, Any]
|
||
|
||
|
||
def _write_multiple_frames(
|
||
im: Image.Image,
|
||
fp: IO[bytes],
|
||
chunk: Callable[..., None],
|
||
mode: str,
|
||
rawmode: str,
|
||
default_image: Image.Image | None,
|
||
append_images: list[Image.Image],
|
||
) -> Image.Image | None:
|
||
duration = im.encoderinfo.get("duration")
|
||
loop = im.encoderinfo.get("loop", im.info.get("loop", 0))
|
||
disposal = im.encoderinfo.get("disposal", im.info.get("disposal", Disposal.OP_NONE))
|
||
blend = im.encoderinfo.get("blend", im.info.get("blend", Blend.OP_SOURCE))
|
||
|
||
if default_image:
|
||
chain = itertools.chain(append_images)
|
||
else:
|
||
chain = itertools.chain([im], append_images)
|
||
|
||
im_frames: list[_Frame] = []
|
||
frame_count = 0
|
||
for im_seq in chain:
|
||
for im_frame in ImageSequence.Iterator(im_seq):
|
||
if im_frame.mode == mode:
|
||
im_frame = im_frame.copy()
|
||
else:
|
||
im_frame = im_frame.convert(mode)
|
||
encoderinfo = im.encoderinfo.copy()
|
||
if isinstance(duration, (list, tuple)):
|
||
encoderinfo["duration"] = duration[frame_count]
|
||
elif duration is None and "duration" in im_frame.info:
|
||
encoderinfo["duration"] = im_frame.info["duration"]
|
||
if isinstance(disposal, (list, tuple)):
|
||
encoderinfo["disposal"] = disposal[frame_count]
|
||
if isinstance(blend, (list, tuple)):
|
||
encoderinfo["blend"] = blend[frame_count]
|
||
frame_count += 1
|
||
|
||
if im_frames:
|
||
previous = im_frames[-1]
|
||
prev_disposal = previous.encoderinfo.get("disposal")
|
||
prev_blend = previous.encoderinfo.get("blend")
|
||
if prev_disposal == Disposal.OP_PREVIOUS and len(im_frames) < 2:
|
||
prev_disposal = Disposal.OP_BACKGROUND
|
||
|
||
if prev_disposal == Disposal.OP_BACKGROUND:
|
||
base_im = previous.im.copy()
|
||
dispose = Image.core.fill("RGBA", im.size, (0, 0, 0, 0))
|
||
bbox = previous.bbox
|
||
if bbox:
|
||
dispose = dispose.crop(bbox)
|
||
else:
|
||
bbox = (0, 0) + im.size
|
||
base_im.paste(dispose, bbox)
|
||
elif prev_disposal == Disposal.OP_PREVIOUS:
|
||
base_im = im_frames[-2].im
|
||
else:
|
||
base_im = previous.im
|
||
delta = ImageChops.subtract_modulo(
|
||
im_frame.convert("RGBA"), base_im.convert("RGBA")
|
||
)
|
||
bbox = delta.getbbox(alpha_only=False)
|
||
if (
|
||
not bbox
|
||
and prev_disposal == encoderinfo.get("disposal")
|
||
and prev_blend == encoderinfo.get("blend")
|
||
and "duration" in encoderinfo
|
||
):
|
||
previous.encoderinfo["duration"] += encoderinfo["duration"]
|
||
continue
|
||
else:
|
||
bbox = None
|
||
im_frames.append(_Frame(im_frame, bbox, encoderinfo))
|
||
|
||
if len(im_frames) == 1 and not default_image:
|
||
return im_frames[0].im
|
||
|
||
# animation control
|
||
chunk(
|
||
fp,
|
||
b"acTL",
|
||
o32(len(im_frames)), # 0: num_frames
|
||
o32(loop), # 4: num_plays
|
||
)
|
||
|
||
# default image IDAT (if it exists)
|
||
if default_image:
|
||
if im.mode != mode:
|
||
im = im.convert(mode)
|
||
ImageFile._save(
|
||
im,
|
||
cast(IO[bytes], _idat(fp, chunk)),
|
||
[ImageFile._Tile("zip", (0, 0) + im.size, 0, rawmode)],
|
||
)
|
||
|
||
seq_num = 0
|
||
for frame, frame_data in enumerate(im_frames):
|
||
im_frame = frame_data.im
|
||
if not frame_data.bbox:
|
||
bbox = (0, 0) + im_frame.size
|
||
else:
|
||
bbox = frame_data.bbox
|
||
im_frame = im_frame.crop(bbox)
|
||
size = im_frame.size
|
||
encoderinfo = frame_data.encoderinfo
|
||
frame_duration = int(round(encoderinfo.get("duration", 0)))
|
||
frame_disposal = encoderinfo.get("disposal", disposal)
|
||
frame_blend = encoderinfo.get("blend", blend)
|
||
# frame control
|
||
chunk(
|
||
fp,
|
||
b"fcTL",
|
||
o32(seq_num), # sequence_number
|
||
o32(size[0]), # width
|
||
o32(size[1]), # height
|
||
o32(bbox[0]), # x_offset
|
||
o32(bbox[1]), # y_offset
|
||
o16(frame_duration), # delay_numerator
|
||
o16(1000), # delay_denominator
|
||
o8(frame_disposal), # dispose_op
|
||
o8(frame_blend), # blend_op
|
||
)
|
||
seq_num += 1
|
||
# frame data
|
||
if frame == 0 and not default_image:
|
||
# first frame must be in IDAT chunks for backwards compatibility
|
||
ImageFile._save(
|
||
im_frame,
|
||
cast(IO[bytes], _idat(fp, chunk)),
|
||
[ImageFile._Tile("zip", (0, 0) + im_frame.size, 0, rawmode)],
|
||
)
|
||
else:
|
||
fdat_chunks = _fdat(fp, chunk, seq_num)
|
||
ImageFile._save(
|
||
im_frame,
|
||
cast(IO[bytes], fdat_chunks),
|
||
[ImageFile._Tile("zip", (0, 0) + im_frame.size, 0, rawmode)],
|
||
)
|
||
seq_num = fdat_chunks.seq_num
|
||
return None
|
||
|
||
|
||
def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
|
||
_save(im, fp, filename, save_all=True)
|
||
|
||
|
||
def _save(
|
||
im: Image.Image,
|
||
fp: IO[bytes],
|
||
filename: str | bytes,
|
||
chunk: Callable[..., None] = putchunk,
|
||
save_all: bool = False,
|
||
) -> None:
|
||
# save an image to disk (called by the save method)
|
||
|
||
if save_all:
|
||
default_image = im.encoderinfo.get(
|
||
"default_image", im.info.get("default_image")
|
||
)
|
||
modes = set()
|
||
sizes = set()
|
||
append_images = im.encoderinfo.get("append_images", [])
|
||
for im_seq in itertools.chain([im], append_images):
|
||
for im_frame in ImageSequence.Iterator(im_seq):
|
||
modes.add(im_frame.mode)
|
||
sizes.add(im_frame.size)
|
||
for mode in ("RGBA", "RGB", "P"):
|
||
if mode in modes:
|
||
break
|
||
else:
|
||
mode = modes.pop()
|
||
size = tuple(max(frame_size[i] for frame_size in sizes) for i in range(2))
|
||
else:
|
||
size = im.size
|
||
mode = im.mode
|
||
|
||
outmode = mode
|
||
if mode == "P":
|
||
#
|
||
# attempt to minimize storage requirements for palette images
|
||
if "bits" in im.encoderinfo:
|
||
# number of bits specified by user
|
||
colors = min(1 << im.encoderinfo["bits"], 256)
|
||
else:
|
||
# check palette contents
|
||
if im.palette:
|
||
colors = max(min(len(im.palette.getdata()[1]) // 3, 256), 1)
|
||
else:
|
||
colors = 256
|
||
|
||
if colors <= 16:
|
||
if colors <= 2:
|
||
bits = 1
|
||
elif colors <= 4:
|
||
bits = 2
|
||
else:
|
||
bits = 4
|
||
outmode += f";{bits}"
|
||
|
||
# encoder options
|
||
im.encoderconfig = (
|
||
im.encoderinfo.get("optimize", False),
|
||
im.encoderinfo.get("compress_level", -1),
|
||
im.encoderinfo.get("compress_type", -1),
|
||
im.encoderinfo.get("dictionary", b""),
|
||
)
|
||
|
||
# get the corresponding PNG mode
|
||
try:
|
||
rawmode, bit_depth, color_type = _OUTMODES[outmode]
|
||
except KeyError as e:
|
||
msg = f"cannot write mode {mode} as PNG"
|
||
raise OSError(msg) from e
|
||
|
||
#
|
||
# write minimal PNG file
|
||
|
||
fp.write(_MAGIC)
|
||
|
||
chunk(
|
||
fp,
|
||
b"IHDR",
|
||
o32(size[0]), # 0: size
|
||
o32(size[1]),
|
||
bit_depth,
|
||
color_type,
|
||
b"\0", # 10: compression
|
||
b"\0", # 11: filter category
|
||
b"\0", # 12: interlace flag
|
||
)
|
||
|
||
chunks = [b"cHRM", b"gAMA", b"sBIT", b"sRGB", b"tIME"]
|
||
|
||
icc = im.encoderinfo.get("icc_profile", im.info.get("icc_profile"))
|
||
if icc:
|
||
# ICC profile
|
||
# according to PNG spec, the iCCP chunk contains:
|
||
# Profile name 1-79 bytes (character string)
|
||
# Null separator 1 byte (null character)
|
||
# Compression method 1 byte (0)
|
||
# Compressed profile n bytes (zlib with deflate compression)
|
||
name = b"ICC Profile"
|
||
data = name + b"\0\0" + zlib.compress(icc)
|
||
chunk(fp, b"iCCP", data)
|
||
|
||
# You must either have sRGB or iCCP.
|
||
# Disallow sRGB chunks when an iCCP-chunk has been emitted.
|
||
chunks.remove(b"sRGB")
|
||
|
||
info = im.encoderinfo.get("pnginfo")
|
||
if info:
|
||
chunks_multiple_allowed = [b"sPLT", b"iTXt", b"tEXt", b"zTXt"]
|
||
for info_chunk in info.chunks:
|
||
cid, data = info_chunk[:2]
|
||
if cid in chunks:
|
||
chunks.remove(cid)
|
||
chunk(fp, cid, data)
|
||
elif cid in chunks_multiple_allowed:
|
||
chunk(fp, cid, data)
|
||
elif cid[1:2].islower():
|
||
# Private chunk
|
||
after_idat = len(info_chunk) == 3 and info_chunk[2]
|
||
if not after_idat:
|
||
chunk(fp, cid, data)
|
||
|
||
if im.mode == "P":
|
||
palette_byte_number = colors * 3
|
||
palette_bytes = im.im.getpalette("RGB")[:palette_byte_number]
|
||
while len(palette_bytes) < palette_byte_number:
|
||
palette_bytes += b"\0"
|
||
chunk(fp, b"PLTE", palette_bytes)
|
||
|
||
transparency = im.encoderinfo.get("transparency", im.info.get("transparency", None))
|
||
|
||
if transparency or transparency == 0:
|
||
if im.mode == "P":
|
||
# limit to actual palette size
|
||
alpha_bytes = colors
|
||
if isinstance(transparency, bytes):
|
||
chunk(fp, b"tRNS", transparency[:alpha_bytes])
|
||
else:
|
||
transparency = max(0, min(255, transparency))
|
||
alpha = b"\xFF" * transparency + b"\0"
|
||
chunk(fp, b"tRNS", alpha[:alpha_bytes])
|
||
elif im.mode in ("1", "L", "I", "I;16"):
|
||
transparency = max(0, min(65535, transparency))
|
||
chunk(fp, b"tRNS", o16(transparency))
|
||
elif im.mode == "RGB":
|
||
red, green, blue = transparency
|
||
chunk(fp, b"tRNS", o16(red) + o16(green) + o16(blue))
|
||
else:
|
||
if "transparency" in im.encoderinfo:
|
||
# don't bother with transparency if it's an RGBA
|
||
# and it's in the info dict. It's probably just stale.
|
||
msg = "cannot use transparency for this mode"
|
||
raise OSError(msg)
|
||
else:
|
||
if im.mode == "P" and im.im.getpalettemode() == "RGBA":
|
||
alpha = im.im.getpalette("RGBA", "A")
|
||
alpha_bytes = colors
|
||
chunk(fp, b"tRNS", alpha[:alpha_bytes])
|
||
|
||
dpi = im.encoderinfo.get("dpi")
|
||
if dpi:
|
||
chunk(
|
||
fp,
|
||
b"pHYs",
|
||
o32(int(dpi[0] / 0.0254 + 0.5)),
|
||
o32(int(dpi[1] / 0.0254 + 0.5)),
|
||
b"\x01",
|
||
)
|
||
|
||
if info:
|
||
chunks = [b"bKGD", b"hIST"]
|
||
for info_chunk in info.chunks:
|
||
cid, data = info_chunk[:2]
|
||
if cid in chunks:
|
||
chunks.remove(cid)
|
||
chunk(fp, cid, data)
|
||
|
||
exif = im.encoderinfo.get("exif")
|
||
if exif:
|
||
if isinstance(exif, Image.Exif):
|
||
exif = exif.tobytes(8)
|
||
if exif.startswith(b"Exif\x00\x00"):
|
||
exif = exif[6:]
|
||
chunk(fp, b"eXIf", exif)
|
||
|
||
single_im: Image.Image | None = im
|
||
if save_all:
|
||
single_im = _write_multiple_frames(
|
||
im, fp, chunk, mode, rawmode, default_image, append_images
|
||
)
|
||
if single_im:
|
||
ImageFile._save(
|
||
single_im,
|
||
cast(IO[bytes], _idat(fp, chunk)),
|
||
[ImageFile._Tile("zip", (0, 0) + single_im.size, 0, rawmode)],
|
||
)
|
||
|
||
if info:
|
||
for info_chunk in info.chunks:
|
||
cid, data = info_chunk[:2]
|
||
if cid[1:2].islower():
|
||
# Private chunk
|
||
after_idat = len(info_chunk) == 3 and info_chunk[2]
|
||
if after_idat:
|
||
chunk(fp, cid, data)
|
||
|
||
chunk(fp, b"IEND", b"")
|
||
|
||
if hasattr(fp, "flush"):
|
||
fp.flush()
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# PNG chunk converter
|
||
|
||
|
||
def getchunks(im: Image.Image, **params: Any) -> list[tuple[bytes, bytes, bytes]]:
|
||
"""Return a list of PNG chunks representing this image."""
|
||
from io import BytesIO
|
||
|
||
chunks = []
|
||
|
||
def append(fp: IO[bytes], cid: bytes, *data: bytes) -> None:
|
||
byte_data = b"".join(data)
|
||
crc = o32(_crc32(byte_data, _crc32(cid)))
|
||
chunks.append((cid, byte_data, crc))
|
||
|
||
fp = BytesIO()
|
||
|
||
try:
|
||
im.encoderinfo = params
|
||
_save(im, fp, "", append)
|
||
finally:
|
||
del im.encoderinfo
|
||
|
||
return chunks
|
||
|
||
|
||
# --------------------------------------------------------------------
|
||
# Registry
|
||
|
||
Image.register_open(PngImageFile.format, PngImageFile, _accept)
|
||
Image.register_save(PngImageFile.format, _save)
|
||
Image.register_save_all(PngImageFile.format, _save_all)
|
||
|
||
Image.register_extensions(PngImageFile.format, [".png", ".apng"])
|
||
|
||
Image.register_mime(PngImageFile.format, "image/png")
|