Fixed series of tuples as advanced argument (#15)

* Removed check_avif_leaks.py

* Removed _VALID_AVIF_MODES

* Fixed series of tuples as advanced argument

* Do not pass advanced values to C as bytes

* Simplified code

* Reuse size

* Destroy image on failure

* Rearranged image settings

* Fixed typo

* Test roundtrip colors from premultiplied alpha

---------

Co-authored-by: Andrew Murray <radarhere@users.noreply.github.com>
This commit is contained in:
Andrew Murray 2025-01-16 05:19:55 +11:00 committed by GitHub
parent 29c158d369
commit 4c63ea6186
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 60 additions and 123 deletions

View File

@ -1,43 +0,0 @@
from __future__ import annotations
from io import BytesIO
import pytest
from PIL import Image
from .helper import is_win32, skip_unless_feature
# Limits for testing the leak
mem_limit = 1024 * 1048576
stack_size = 8 * 1048576
iterations = int((mem_limit / stack_size) * 2)
test_file = "Tests/images/avif/hopper.avif"
pytestmark = [
pytest.mark.skipif(is_win32(), reason="requires Unix or macOS"),
skip_unless_feature("avif"),
]
def test_leak_load() -> None:
from resource import RLIMIT_AS, RLIMIT_STACK, setrlimit
setrlimit(RLIMIT_STACK, (stack_size, stack_size))
setrlimit(RLIMIT_AS, (mem_limit, mem_limit))
for _ in range(iterations):
with Image.open(test_file) as im:
im.load()
def test_leak_save() -> None:
from resource import RLIMIT_AS, RLIMIT_STACK, setrlimit
setrlimit(RLIMIT_STACK, (stack_size, stack_size))
setrlimit(RLIMIT_AS, (mem_limit, mem_limit))
for _ in range(iterations):
test_output = BytesIO()
with Image.open(test_file) as im:
im.save(test_output, "AVIF")
test_output.seek(0)
test_output.read()

View File

@ -8,7 +8,6 @@ from collections.abc import Generator
from contextlib import contextmanager from contextlib import contextmanager
from io import BytesIO from io import BytesIO
from pathlib import Path from pathlib import Path
from struct import unpack
from typing import Any from typing import Any
import pytest import pytest
@ -75,39 +74,6 @@ def is_docker_qemu() -> bool:
return "qemu" in init_proc_exe return "qemu" in init_proc_exe
def has_alpha_premultiplied(im_bytes: bytes) -> bool:
stream = BytesIO(im_bytes)
length = len(im_bytes)
while stream.tell() < length:
start = stream.tell()
size, boxtype = unpack(">L4s", stream.read(8))
if not all(0x20 <= c <= 0x7E for c in boxtype):
# Not ascii
return False
if size == 1: # 64bit size
(size,) = unpack(">Q", stream.read(8))
end = start + size
version, _ = unpack(">B3s", stream.read(4))
if boxtype in (b"ftyp", b"hdlr", b"pitm", b"iloc", b"iinf"):
# Skip these boxes
stream.seek(end)
continue
elif boxtype == b"meta":
# Container box possibly including iref prem, continue to parse boxes
# inside it
continue
elif boxtype == b"iref":
while stream.tell() < end:
_, iref_type = unpack(">L4s", stream.read(8))
version, _ = unpack(">B3s", stream.read(4))
if iref_type == b"prem":
return True
stream.read(2 if version == 0 else 4)
else:
return False
return False
class TestUnsupportedAvif: class TestUnsupportedAvif:
def test_unsupported(self, monkeypatch: pytest.MonkeyPatch) -> None: def test_unsupported(self, monkeypatch: pytest.MonkeyPatch) -> None:
monkeypatch.setattr(AvifImagePlugin, "SUPPORTED", False) monkeypatch.setattr(AvifImagePlugin, "SUPPORTED", False)
@ -170,10 +136,8 @@ class TestFileAvif:
# difference between the two images is less than the epsilon value, # difference between the two images is less than the epsilon value,
# then we're going to accept that it's a reasonable lossy version of # then we're going to accept that it's a reasonable lossy version of
# the image. # the image.
target = hopper(mode) expected = hopper()
if mode != "RGB": assert_image_similar(image, expected, epsilon)
target = target.convert("RGB")
assert_image_similar(image, target, epsilon)
def test_write_rgb(self, tmp_path: Path) -> None: def test_write_rgb(self, tmp_path: Path) -> None:
""" """
@ -479,7 +443,19 @@ class TestFileAvif:
@skip_unless_avif_encoder("aom") @skip_unless_avif_encoder("aom")
@skip_unless_feature("avif") @skip_unless_feature("avif")
def test_encoder_advanced_codec_options(self) -> None: @pytest.mark.parametrize(
"advanced",
[
{
"aq-mode": "1",
"enable-chroma-deltaq": "1",
},
(("aq-mode", "1"), ("enable-chroma-deltaq", "1")),
],
)
def test_encoder_advanced_codec_options(
self, advanced: dict[str, str] | tuple[tuple[str, str], ...]
) -> None:
with Image.open(TEST_AVIF_FILE) as im: with Image.open(TEST_AVIF_FILE) as im:
ctrl_buf = BytesIO() ctrl_buf = BytesIO()
im.save(ctrl_buf, "AVIF", codec="aom") im.save(ctrl_buf, "AVIF", codec="aom")
@ -488,10 +464,7 @@ class TestFileAvif:
test_buf, test_buf,
"AVIF", "AVIF",
codec="aom", codec="aom",
advanced={ advanced=advanced,
"aq-mode": "1",
"enable-chroma-deltaq": "1",
},
) )
assert ctrl_buf.getvalue() != test_buf.getvalue() assert ctrl_buf.getvalue() != test_buf.getvalue()
@ -699,13 +672,18 @@ class TestAvifAnimation:
with Image.open("Tests/images/avif/rgba10.heif"): with Image.open("Tests/images/avif/rgba10.heif"):
pass pass
@pytest.mark.parametrize("alpha_premultipled", [False, True]) @pytest.mark.parametrize("alpha_premultiplied", [False, True])
def test_alpha_premultiplied_true(self, alpha_premultipled: bool) -> None: def test_alpha_premultiplied(
im = Image.new("RGBA", (10, 10), (0, 0, 0, 0)) self, tmp_path: Path, alpha_premultiplied: bool
im_buf = BytesIO() ) -> None:
im.save(im_buf, "AVIF", alpha_premultiplied=alpha_premultipled) temp_file = str(tmp_path / "temp.avif")
im_bytes = im_buf.getvalue() color = (200, 200, 200, 1)
assert has_alpha_premultiplied(im_bytes) is alpha_premultipled im = Image.new("RGBA", (1, 1), color)
im.save(temp_file, alpha_premultiplied=alpha_premultiplied)
expected = (255, 255, 255, 1) if alpha_premultiplied else color
with Image.open(temp_file) as reloaded:
assert reloaded.getpixel((0, 0)) == expected
def test_timestamp_and_duration(self, tmp_path: Path) -> None: def test_timestamp_and_duration(self, tmp_path: Path) -> None:
""" """

View File

@ -18,8 +18,6 @@ except ImportError:
DECODE_CODEC_CHOICE = "auto" DECODE_CODEC_CHOICE = "auto"
DEFAULT_MAX_THREADS = 0 DEFAULT_MAX_THREADS = 0
_VALID_AVIF_MODES = {"RGB", "RGBA"}
def _accept(prefix: bytes) -> bool | str: def _accept(prefix: bytes) -> bool | str:
if prefix[4:8] != b"ftyp": if prefix[4:8] != b"ftyp":
@ -41,8 +39,7 @@ def _accept(prefix: bytes) -> bool | str:
): ):
if not SUPPORTED: if not SUPPORTED:
return ( return (
"image file could not be identified because AVIF " "image file could not be identified because AVIF support not installed"
"support not installed"
) )
return True return True
return False return False
@ -63,9 +60,6 @@ class AvifImageFile(ImageFile.ImageFile):
__loaded = -1 __loaded = -1
__frame = 0 __frame = 0
def load_seek(self, pos: int) -> None:
pass
def _open(self) -> None: def _open(self) -> None:
if not SUPPORTED: if not SUPPORTED:
msg = ( msg = (
@ -136,6 +130,9 @@ class AvifImageFile(ImageFile.ImageFile):
return super().load() return super().load()
def load_seek(self, pos: int) -> None:
pass
def tell(self) -> int: def tell(self) -> int:
return self.__frame return self.__frame
@ -200,24 +197,21 @@ def _save(
xmp = xmp.encode("utf-8") xmp = xmp.encode("utf-8")
advanced = info.get("advanced") advanced = info.get("advanced")
if isinstance(advanced, dict):
advanced = tuple([k, v] for (k, v) in advanced.items())
if advanced is not None: if advanced is not None:
if isinstance(advanced, dict):
advanced = advanced.items()
try: try:
advanced = tuple(advanced) advanced = tuple(advanced)
except TypeError: except TypeError:
invalid = True invalid = True
else: else:
invalid = all(isinstance(v, tuple) and len(v) == 2 for v in advanced) invalid = any(not isinstance(v, tuple) or len(v) != 2 for v in advanced)
if invalid: if invalid:
msg = ( msg = (
"advanced codec options must be a dict of key-value string " "advanced codec options must be a dict of key-value string "
"pairs or a series of key-value two-tuples" "pairs or a series of key-value two-tuples"
) )
raise ValueError(msg) raise ValueError(msg)
advanced = tuple(
(str(k).encode("utf-8"), str(v).encode("utf-8")) for k, v in advanced
)
# Setup the AVIF encoder # Setup the AVIF encoder
enc = _avif.AvifEncoder( enc = _avif.AvifEncoder(
@ -257,7 +251,7 @@ def _save(
# Make sure image mode is supported # Make sure image mode is supported
frame = ims frame = ims
rawmode = ims.mode rawmode = ims.mode
if ims.mode not in _VALID_AVIF_MODES: if ims.mode not in {"RGB", "RGBA"}:
rawmode = "RGBA" if ims.has_transparency_data else "RGB" rawmode = "RGBA" if ims.has_transparency_data else "RGB"
frame = ims.convert(rawmode) frame = ims.convert(rawmode)

View File

@ -188,7 +188,6 @@ static int
_add_codec_specific_options(avifEncoder *encoder, PyObject *opts) { _add_codec_specific_options(avifEncoder *encoder, PyObject *opts) {
Py_ssize_t i, size; Py_ssize_t i, size;
PyObject *keyval, *py_key, *py_val; PyObject *keyval, *py_key, *py_val;
char *key, *val;
if (!PyTuple_Check(opts)) { if (!PyTuple_Check(opts)) {
PyErr_SetString(PyExc_ValueError, "Invalid advanced codec options"); PyErr_SetString(PyExc_ValueError, "Invalid advanced codec options");
return 1; return 1;
@ -203,12 +202,16 @@ _add_codec_specific_options(avifEncoder *encoder, PyObject *opts) {
} }
py_key = PyTuple_GetItem(keyval, 0); py_key = PyTuple_GetItem(keyval, 0);
py_val = PyTuple_GetItem(keyval, 1); py_val = PyTuple_GetItem(keyval, 1);
if (!PyBytes_Check(py_key) || !PyBytes_Check(py_val)) { if (!PyUnicode_Check(py_key) || !PyUnicode_Check(py_val)) {
PyErr_SetString(PyExc_ValueError, "Invalid advanced codec options");
return 1;
}
const char *key = PyUnicode_AsUTF8(py_key);
const char *val = PyUnicode_AsUTF8(py_val);
if (key == NULL || val == NULL) {
PyErr_SetString(PyExc_ValueError, "Invalid advanced codec options"); PyErr_SetString(PyExc_ValueError, "Invalid advanced codec options");
return 1; return 1;
} }
key = PyBytes_AsString(py_key);
val = PyBytes_AsString(py_val);
avifResult result = avifEncoderSetCodecSpecificOption(encoder, key, val); avifResult result = avifEncoderSetCodecSpecificOption(encoder, key, val);
if (result != AVIF_RESULT_OK) { if (result != AVIF_RESULT_OK) {
@ -286,6 +289,7 @@ AvifEncoderNew(PyObject *self_, PyObject *args) {
image->yuvRange = AVIF_RANGE_LIMITED; image->yuvRange = AVIF_RANGE_LIMITED;
} else { } else {
PyErr_SetString(PyExc_ValueError, "Invalid range"); PyErr_SetString(PyExc_ValueError, "Invalid range");
avifImageDestroy(image);
return NULL; return NULL;
} }
if (strcmp(subsampling, "4:0:0") == 0) { if (strcmp(subsampling, "4:0:0") == 0) {
@ -298,13 +302,10 @@ AvifEncoderNew(PyObject *self_, PyObject *args) {
image->yuvFormat = AVIF_PIXEL_FORMAT_YUV444; image->yuvFormat = AVIF_PIXEL_FORMAT_YUV444;
} else { } else {
PyErr_Format(PyExc_ValueError, "Invalid subsampling: %s", subsampling); PyErr_Format(PyExc_ValueError, "Invalid subsampling: %s", subsampling);
avifImageDestroy(image);
return NULL; return NULL;
} }
image->colorPrimaries = AVIF_COLOR_PRIMARIES_UNSPECIFIED;
image->transferCharacteristics = AVIF_TRANSFER_CHARACTERISTICS_UNSPECIFIED;
image->matrixCoefficients = AVIF_MATRIX_COEFFICIENTS_BT601;
// Validate canvas dimensions // Validate canvas dimensions
if (width <= 0 || height <= 0) { if (width <= 0 || height <= 0) {
PyErr_SetString(PyExc_ValueError, "invalid canvas dimensions"); PyErr_SetString(PyExc_ValueError, "invalid canvas dimensions");
@ -387,12 +388,13 @@ AvifEncoderNew(PyObject *self_, PyObject *args) {
self->xmp_bytes = NULL; self->xmp_bytes = NULL;
avifResult result; avifResult result;
if (PyBytes_GET_SIZE(icc_bytes)) { Py_ssize_t size = PyBytes_GET_SIZE(icc_bytes);
if (size) {
self->icc_bytes = icc_bytes; self->icc_bytes = icc_bytes;
Py_INCREF(icc_bytes); Py_INCREF(icc_bytes);
result = avifImageSetProfileICC( result = avifImageSetProfileICC(
image, (uint8_t *)PyBytes_AS_STRING(icc_bytes), PyBytes_GET_SIZE(icc_bytes) image, (uint8_t *)PyBytes_AS_STRING(icc_bytes), size
); );
if (result != AVIF_RESULT_OK) { if (result != AVIF_RESULT_OK) {
PyErr_Format( PyErr_Format(
@ -406,19 +408,23 @@ AvifEncoderNew(PyObject *self_, PyObject *args) {
PyObject_Del(self); PyObject_Del(self);
return NULL; return NULL;
} }
// colorPrimaries and transferCharacteristics are ignored when an ICC
// profile is present, so set them to UNSPECIFIED.
image->colorPrimaries = AVIF_COLOR_PRIMARIES_UNSPECIFIED;
image->transferCharacteristics = AVIF_TRANSFER_CHARACTERISTICS_UNSPECIFIED;
} else { } else {
image->colorPrimaries = AVIF_COLOR_PRIMARIES_BT709; image->colorPrimaries = AVIF_COLOR_PRIMARIES_BT709;
image->transferCharacteristics = AVIF_TRANSFER_CHARACTERISTICS_SRGB; image->transferCharacteristics = AVIF_TRANSFER_CHARACTERISTICS_SRGB;
} }
image->matrixCoefficients = AVIF_MATRIX_COEFFICIENTS_BT601;
if (PyBytes_GET_SIZE(exif_bytes)) { size = PyBytes_GET_SIZE(exif_bytes);
if (size) {
self->exif_bytes = exif_bytes; self->exif_bytes = exif_bytes;
Py_INCREF(exif_bytes); Py_INCREF(exif_bytes);
result = avifImageSetMetadataExif( result = avifImageSetMetadataExif(
image, image, (uint8_t *)PyBytes_AS_STRING(exif_bytes), size
(uint8_t *)PyBytes_AS_STRING(exif_bytes),
PyBytes_GET_SIZE(exif_bytes)
); );
if (result != AVIF_RESULT_OK) { if (result != AVIF_RESULT_OK) {
PyErr_Format( PyErr_Format(
@ -434,12 +440,14 @@ AvifEncoderNew(PyObject *self_, PyObject *args) {
return NULL; return NULL;
} }
} }
if (PyBytes_GET_SIZE(xmp_bytes)) {
size = PyBytes_GET_SIZE(xmp_bytes);
if (size) {
self->xmp_bytes = xmp_bytes; self->xmp_bytes = xmp_bytes;
Py_INCREF(xmp_bytes); Py_INCREF(xmp_bytes);
result = avifImageSetMetadataXMP( result = avifImageSetMetadataXMP(
image, (uint8_t *)PyBytes_AS_STRING(xmp_bytes), PyBytes_GET_SIZE(xmp_bytes) image, (uint8_t *)PyBytes_AS_STRING(xmp_bytes), size
); );
if (result != AVIF_RESULT_OK) { if (result != AVIF_RESULT_OK) {
PyErr_Format( PyErr_Format(