Added type hints

This commit is contained in:
Andrew Murray 2024-01-19 21:50:27 +11:00
parent 6dc6d6d71f
commit 1d63cffdad
11 changed files with 40 additions and 29 deletions

View File

@ -4,7 +4,7 @@ from PIL import Image
TEST_FILE = "Tests/images/fli_overflow.fli" TEST_FILE = "Tests/images/fli_overflow.fli"
def test_fli_overflow(): def test_fli_overflow() -> None:
# this should not crash with a malloc error or access violation # this should not crash with a malloc error or access violation
with Image.open(TEST_FILE) as im: with Image.open(TEST_FILE) as im:
im.load() im.load()

View File

@ -2,6 +2,8 @@
from __future__ import annotations from __future__ import annotations
import pytest import pytest
from typing import Any, Callable
from PIL import Image from PIL import Image
from .helper import is_win32 from .helper import is_win32
@ -12,31 +14,34 @@ max_iterations = 10000
pytestmark = pytest.mark.skipif(is_win32(), reason="requires Unix or macOS") pytestmark = pytest.mark.skipif(is_win32(), reason="requires Unix or macOS")
def _get_mem_usage(): def _get_mem_usage() -> float:
from resource import RUSAGE_SELF, getpagesize, getrusage from resource import RUSAGE_SELF, getpagesize, getrusage
mem = getrusage(RUSAGE_SELF).ru_maxrss mem = getrusage(RUSAGE_SELF).ru_maxrss
return mem * getpagesize() / 1024 / 1024 return mem * getpagesize() / 1024 / 1024
def _test_leak(min_iterations, max_iterations, fn, *args, **kwargs): def _test_leak(
min_iterations: int, max_iterations: int, fn: Callable[..., None], *args: Any
) -> None:
mem_limit = None mem_limit = None
for i in range(max_iterations): for i in range(max_iterations):
fn(*args, **kwargs) fn(*args)
mem = _get_mem_usage() mem = _get_mem_usage()
if i < min_iterations: if i < min_iterations:
mem_limit = mem + 1 mem_limit = mem + 1
continue continue
msg = f"memory usage limit exceeded after {i + 1} iterations" msg = f"memory usage limit exceeded after {i + 1} iterations"
assert mem_limit is not None
assert mem <= mem_limit, msg assert mem <= mem_limit, msg
def test_leak_putdata(): def test_leak_putdata() -> None:
im = Image.new("RGB", (25, 25)) im = Image.new("RGB", (25, 25))
_test_leak(min_iterations, max_iterations, im.putdata, im.getdata()) _test_leak(min_iterations, max_iterations, im.putdata, im.getdata())
def test_leak_getlist(): def test_leak_getlist() -> None:
im = Image.new("P", (25, 25)) im = Image.new("P", (25, 25))
_test_leak( _test_leak(
min_iterations, min_iterations,

View File

@ -19,7 +19,7 @@ pytestmark = [
] ]
def test_leak_load(): def test_leak_load() -> None:
from resource import RLIMIT_AS, RLIMIT_STACK, setrlimit from resource import RLIMIT_AS, RLIMIT_STACK, setrlimit
setrlimit(RLIMIT_STACK, (stack_size, stack_size)) setrlimit(RLIMIT_STACK, (stack_size, stack_size))
@ -29,7 +29,7 @@ def test_leak_load():
im.load() im.load()
def test_leak_save(): def test_leak_save() -> None:
from resource import RLIMIT_AS, RLIMIT_STACK, setrlimit from resource import RLIMIT_AS, RLIMIT_STACK, setrlimit
setrlimit(RLIMIT_STACK, (stack_size, stack_size)) setrlimit(RLIMIT_STACK, (stack_size, stack_size))

View File

@ -1,10 +1,11 @@
from __future__ import annotations from __future__ import annotations
from pathlib import PosixPath
import pytest import pytest
from PIL import Image from PIL import Image
def test_j2k_overflow(tmp_path): def test_j2k_overflow(tmp_path: PosixPath) -> None:
im = Image.new("RGBA", (1024, 131584)) im = Image.new("RGBA", (1024, 131584))
target = str(tmp_path / "temp.jpc") target = str(tmp_path / "temp.jpc")
with pytest.raises(OSError): with pytest.raises(OSError):

View File

@ -110,14 +110,14 @@ standard_chrominance_qtable = (
[standard_l_qtable, standard_chrominance_qtable], [standard_l_qtable, standard_chrominance_qtable],
), ),
) )
def test_qtables_leak(qtables): def test_qtables_leak(qtables: tuple[tuple[int, ...]] | list[tuple[int, ...]]) -> None:
im = hopper("RGB") im = hopper("RGB")
for _ in range(iterations): for _ in range(iterations):
test_output = BytesIO() test_output = BytesIO()
im.save(test_output, "JPEG", qtables=qtables) im.save(test_output, "JPEG", qtables=qtables)
def test_exif_leak(): def test_exif_leak() -> None:
""" """
pre patch: pre patch:
@ -180,7 +180,7 @@ def test_exif_leak():
im.save(test_output, "JPEG", exif=exif) im.save(test_output, "JPEG", exif=exif)
def test_base_save(): def test_base_save() -> None:
""" """
base case: base case:
MB MB

View File

@ -1,5 +1,7 @@
from __future__ import annotations from __future__ import annotations
from pathlib import PosixPath
import sys import sys
from types import ModuleType
import pytest import pytest
@ -15,6 +17,7 @@ from PIL import Image
# 2.7 and 3.2. # 2.7 and 3.2.
numpy: ModuleType | None
try: try:
import numpy import numpy
except ImportError: except ImportError:
@ -27,23 +30,24 @@ XDIM = 48000
pytestmark = pytest.mark.skipif(sys.maxsize <= 2**32, reason="requires 64-bit system") pytestmark = pytest.mark.skipif(sys.maxsize <= 2**32, reason="requires 64-bit system")
def _write_png(tmp_path, xdim, ydim): def _write_png(tmp_path: PosixPath, xdim: int, ydim: int) -> None:
f = str(tmp_path / "temp.png") f = str(tmp_path / "temp.png")
im = Image.new("L", (xdim, ydim), 0) im = Image.new("L", (xdim, ydim), 0)
im.save(f) im.save(f)
def test_large(tmp_path): def test_large(tmp_path: PosixPath) -> None:
"""succeeded prepatch""" """succeeded prepatch"""
_write_png(tmp_path, XDIM, YDIM) _write_png(tmp_path, XDIM, YDIM)
def test_2gpx(tmp_path): def test_2gpx(tmp_path: PosixPath) -> None:
"""failed prepatch""" """failed prepatch"""
_write_png(tmp_path, XDIM, XDIM) _write_png(tmp_path, XDIM, XDIM)
@pytest.mark.skipif(numpy is None, reason="Numpy is not installed") @pytest.mark.skipif(numpy is None, reason="Numpy is not installed")
def test_size_greater_than_int(): def test_size_greater_than_int() -> None:
assert numpy is not None
arr = numpy.ndarray(shape=(16394, 16394)) arr = numpy.ndarray(shape=(16394, 16394))
Image.fromarray(arr) Image.fromarray(arr)

View File

@ -1,4 +1,5 @@
from __future__ import annotations from __future__ import annotations
from pathlib import PosixPath
import sys import sys
import pytest import pytest
@ -23,7 +24,7 @@ XDIM = 48000
pytestmark = pytest.mark.skipif(sys.maxsize <= 2**32, reason="requires 64-bit system") pytestmark = pytest.mark.skipif(sys.maxsize <= 2**32, reason="requires 64-bit system")
def _write_png(tmp_path, xdim, ydim): def _write_png(tmp_path: PosixPath, xdim: int, ydim: int) -> None:
dtype = np.uint8 dtype = np.uint8
a = np.zeros((xdim, ydim), dtype=dtype) a = np.zeros((xdim, ydim), dtype=dtype)
f = str(tmp_path / "temp.png") f = str(tmp_path / "temp.png")
@ -31,11 +32,11 @@ def _write_png(tmp_path, xdim, ydim):
im.save(f) im.save(f)
def test_large(tmp_path): def test_large(tmp_path: PosixPath) -> None:
"""succeeded prepatch""" """succeeded prepatch"""
_write_png(tmp_path, XDIM, YDIM) _write_png(tmp_path, XDIM, YDIM)
def test_2gpx(tmp_path): def test_2gpx(tmp_path: PosixPath) -> None:
"""failed prepatch""" """failed prepatch"""
_write_png(tmp_path, XDIM, XDIM) _write_png(tmp_path, XDIM, XDIM)

View File

@ -6,7 +6,7 @@ from PIL import Image
TEST_FILE = "Tests/images/libtiff_segfault.tif" TEST_FILE = "Tests/images/libtiff_segfault.tif"
def test_libtiff_segfault(): def test_libtiff_segfault() -> None:
"""This test should not segfault. It will on Pillow <= 3.1.0 and """This test should not segfault. It will on Pillow <= 3.1.0 and
libtiff >= 4.0.0 libtiff >= 4.0.0
""" """

View File

@ -7,7 +7,7 @@ from PIL import Image, ImageFile, PngImagePlugin
TEST_FILE = "Tests/images/png_decompression_dos.png" TEST_FILE = "Tests/images/png_decompression_dos.png"
def test_ignore_dos_text(): def test_ignore_dos_text() -> None:
ImageFile.LOAD_TRUNCATED_IMAGES = True ImageFile.LOAD_TRUNCATED_IMAGES = True
try: try:
@ -23,7 +23,7 @@ def test_ignore_dos_text():
assert len(s) < 1024 * 1024, "Text chunk larger than 1M" assert len(s) < 1024 * 1024, "Text chunk larger than 1M"
def test_dos_text(): def test_dos_text() -> None:
try: try:
im = Image.open(TEST_FILE) im = Image.open(TEST_FILE)
im.load() im.load()
@ -35,7 +35,7 @@ def test_dos_text():
assert len(s) < 1024 * 1024, "Text chunk larger than 1M" assert len(s) < 1024 * 1024, "Text chunk larger than 1M"
def test_dos_total_memory(): def test_dos_total_memory() -> None:
im = Image.new("L", (1, 1)) im = Image.new("L", (1, 1))
compressed_data = zlib.compress(b"a" * 1024 * 1023) compressed_data = zlib.compress(b"a" * 1024 * 1023)
@ -52,7 +52,7 @@ def test_dos_total_memory():
try: try:
im2 = Image.open(b) im2 = Image.open(b)
except ValueError as msg: except ValueError as msg:
assert "Too much memory" in msg assert "Too much memory" in str(msg)
return return
total_len = 0 total_len = 0

View File

@ -4,7 +4,7 @@ import sys
from PIL import features from PIL import features
def test_wheel_modules(): def test_wheel_modules() -> None:
expected_modules = {"pil", "tkinter", "freetype2", "littlecms2", "webp"} expected_modules = {"pil", "tkinter", "freetype2", "littlecms2", "webp"}
# tkinter is not available in cibuildwheel installed CPython on Windows # tkinter is not available in cibuildwheel installed CPython on Windows
@ -18,13 +18,13 @@ def test_wheel_modules():
assert set(features.get_supported_modules()) == expected_modules assert set(features.get_supported_modules()) == expected_modules
def test_wheel_codecs(): def test_wheel_codecs() -> None:
expected_codecs = {"jpg", "jpg_2000", "zlib", "libtiff"} expected_codecs = {"jpg", "jpg_2000", "zlib", "libtiff"}
assert set(features.get_supported_codecs()) == expected_codecs assert set(features.get_supported_codecs()) == expected_codecs
def test_wheel_features(): def test_wheel_features() -> None:
expected_features = { expected_features = {
"webp_anim", "webp_anim",
"webp_mux", "webp_mux",

View File

@ -238,7 +238,7 @@ def tostring(im, string_format, **options):
return out.getvalue() return out.getvalue()
def hopper(mode=None, cache={}): def hopper(mode: str | None = None, cache: dict[str, Image.Image] = {}) -> Image.Image:
if mode is None: if mode is None:
# Always return fresh not-yet-loaded version of image. # Always return fresh not-yet-loaded version of image.
# Operations on not-yet-loaded images is separate class of errors # Operations on not-yet-loaded images is separate class of errors
@ -323,7 +323,7 @@ def is_ppc64le():
return platform.machine() == "ppc64le" return platform.machine() == "ppc64le"
def is_win32(): def is_win32() -> bool:
return sys.platform.startswith("win32") return sys.platform.startswith("win32")