Merge pull request #8228 from radarhere/type_hint

This commit is contained in:
Hugo van Kemenade 2024-07-13 11:09:48 +02:00 committed by GitHub
commit 8c9eea9d07
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 79 additions and 73 deletions

View File

@ -41,7 +41,7 @@ MAGIC = PngImagePlugin._MAGIC
def chunk(cid: bytes, *data: bytes) -> bytes:
test_file = BytesIO()
PngImagePlugin.putchunk(*(test_file, cid) + data)
PngImagePlugin.putchunk(test_file, cid, *data)
return test_file.getvalue()

View File

@ -127,7 +127,7 @@ class TestImageFile:
def test_raise_typeerror(self) -> None:
with pytest.raises(TypeError):
parser = ImageFile.Parser()
parser.feed(1)
parser.feed(1) # type: ignore[arg-type]
def test_negative_stride(self) -> None:
with open("Tests/images/raw_negative_stride.bin", "rb") as f:
@ -305,7 +305,7 @@ class TestPyDecoder(CodecsTest):
im.load()
def test_decode(self) -> None:
decoder = ImageFile.PyDecoder(None)
decoder = ImageFile.PyDecoder("")
with pytest.raises(NotImplementedError):
decoder.decode(b"")
@ -383,7 +383,7 @@ class TestPyEncoder(CodecsTest):
)
def test_encode(self) -> None:
encoder = ImageFile.PyEncoder(None)
encoder = ImageFile.PyEncoder("")
with pytest.raises(NotImplementedError):
encoder.encode(0)

View File

@ -86,7 +86,7 @@ def raise_oserror(error: int) -> OSError:
raise _get_oserror(error, encoder=False)
def _tilesort(t):
def _tilesort(t) -> int:
# sort on offset
return t[2]
@ -161,7 +161,7 @@ class ImageFile(Image.Image):
return Image.MIME.get(self.format.upper())
return None
def __setstate__(self, state):
def __setstate__(self, state) -> None:
self.tile = []
super().__setstate__(state)
@ -333,14 +333,14 @@ class ImageFile(Image.Image):
# def load_read(self, read_bytes: int) -> bytes:
# pass
def _seek_check(self, frame):
def _seek_check(self, frame: int) -> bool:
if (
frame < self._min_frame
# Only check upper limit on frames if additional seek operations
# are not required to do so
or (
not (hasattr(self, "_n_frames") and self._n_frames is None)
and frame >= self.n_frames + self._min_frame
and frame >= getattr(self, "n_frames") + self._min_frame
)
):
msg = "attempt to seek outside sequence"
@ -370,7 +370,7 @@ class StubImageFile(ImageFile):
msg = "StubImageFile subclass must implement _open"
raise NotImplementedError(msg)
def load(self):
def load(self) -> Image.core.PixelAccess | None:
loader = self._load()
if loader is None:
msg = f"cannot find loader for this {self.format} file"
@ -378,7 +378,7 @@ class StubImageFile(ImageFile):
image = loader.load(self)
assert image is not None
# become the other object (!)
self.__class__ = image.__class__
self.__class__ = image.__class__ # type: ignore[assignment]
self.__dict__ = image.__dict__
return image.load()
@ -396,8 +396,8 @@ class Parser:
incremental = None
image: Image.Image | None = None
data = None
decoder = None
data: bytes | None = None
decoder: Image.core.ImagingDecoder | PyDecoder | None = None
offset = 0
finished = 0
@ -409,7 +409,7 @@ class Parser:
"""
assert self.data is None, "cannot reuse parsers"
def feed(self, data):
def feed(self, data: bytes) -> None:
"""
(Consumer) Feed data to the parser.
@ -491,7 +491,7 @@ class Parser:
def __exit__(self, *args: object) -> None:
self.close()
def close(self):
def close(self) -> Image.Image:
"""
(Consumer) Close the stream.
@ -525,7 +525,7 @@ class Parser:
# --------------------------------------------------------------------
def _save(im, fp, tile, bufsize=0) -> None:
def _save(im, fp, tile, bufsize: int = 0) -> None:
"""Helper to save image based on tile list
:param im: Image object.
@ -553,7 +553,7 @@ def _save(im, fp, tile, bufsize=0) -> None:
fp.flush()
def _encode_tile(im, fp, tile: list[_Tile], bufsize, fh, exc=None):
def _encode_tile(im, fp, tile: list[_Tile], bufsize: int, fh, exc=None) -> None:
for encoder_name, extents, offset, args in tile:
if offset > 0:
fp.seek(offset)
@ -629,18 +629,18 @@ class PyCodecState:
class PyCodec:
fd: IO[bytes] | None
def __init__(self, mode, *args):
self.im = None
def __init__(self, mode: str, *args: Any) -> None:
self.im: Image.core.ImagingCore | None = None
self.state = PyCodecState()
self.fd = None
self.mode = mode
self.init(args)
def init(self, args) -> None:
def init(self, args: tuple[Any, ...]) -> None:
"""
Override to perform codec specific initialization
:param args: Array of args items from the tile entry
:param args: Tuple of arg items from the tile entry
:returns: None
"""
self.args = args
@ -662,7 +662,7 @@ class PyCodec:
"""
self.fd = fd
def setimage(self, im, extents: tuple[int, int, int, int] | None = None) -> None:
def setimage(self, im, extents=None):
"""
Called from ImageFile to set the core output image for the codec

View File

@ -33,7 +33,7 @@ class Iterator:
:param im: An image object.
"""
def __init__(self, im: Image.Image):
def __init__(self, im: Image.Image) -> None:
if not hasattr(im, "seek"):
msg = "im must have seek method"
raise AttributeError(msg)

View File

@ -827,11 +827,11 @@ def _save_cjpeg(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
##
# Factory for making JPEG and MPO instances
def jpeg_factory(fp=None, filename=None):
def jpeg_factory(fp: IO[bytes] | None = None, filename: str | bytes | None = None):
im = JpegImageFile(fp, filename)
try:
mpheader = im._getmp()
if mpheader[45057] > 1:
if mpheader is not None and mpheader[45057] > 1:
for segment, content in im.applist:
if segment == "APP1" and b' hdrgm:Version="' in content:
# Ultra HDR images are not yet supported

View File

@ -174,12 +174,15 @@ def _write_image(im, filename, existing_pdf, image_refs):
return image_ref, procset
def _save(im, fp, filename, save_all=False):
def _save(
im: Image.Image, fp: IO[bytes], filename: str | bytes, save_all: bool = False
) -> None:
is_appending = im.encoderinfo.get("append", False)
filename_str = filename.decode() if isinstance(filename, bytes) else filename
if is_appending:
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename, mode="r+b")
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename_str, mode="r+b")
else:
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename, mode="w+b")
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename_str, mode="w+b")
dpi = im.encoderinfo.get("dpi")
if dpi:
@ -228,12 +231,7 @@ def _save(im, fp, filename, save_all=False):
for im in ims:
im_number_of_pages = 1
if save_all:
try:
im_number_of_pages = im.n_frames
except AttributeError:
# Image format does not have n_frames.
# It is a single frame image
pass
im_number_of_pages = getattr(im, "n_frames", 1)
number_of_pages += im_number_of_pages
for i in range(im_number_of_pages):
image_refs.append(existing_pdf.next_object_id(0))
@ -250,7 +248,9 @@ def _save(im, fp, filename, save_all=False):
page_number = 0
for im_sequence in ims:
im_pages = ImageSequence.Iterator(im_sequence) if save_all else [im_sequence]
im_pages: ImageSequence.Iterator | list[Image.Image] = (
ImageSequence.Iterator(im_sequence) if save_all else [im_sequence]
)
for im in im_pages:
image_ref, procset = _write_image(im, filename, existing_pdf, image_refs)

View File

@ -144,7 +144,7 @@ def _safe_zlib_decompress(s):
return plaintext
def _crc32(data, seed=0):
def _crc32(data: bytes, seed: int = 0) -> int:
return zlib.crc32(data, seed) & 0xFFFFFFFF
@ -191,7 +191,7 @@ class ChunkStream:
assert self.queue is not None
self.queue.append((cid, pos, length))
def call(self, cid, pos, length):
def call(self, cid: bytes, pos: int, length: int) -> bytes:
"""Call the appropriate chunk handler"""
logger.debug("STREAM %r %s %s", cid, pos, length)
@ -1091,21 +1091,21 @@ _OUTMODES = {
}
def putchunk(fp, cid, *data):
def putchunk(fp: IO[bytes], cid: bytes, *data: bytes) -> None:
"""Write a PNG chunk (including CRC field)"""
data = b"".join(data)
byte_data = b"".join(data)
fp.write(o32(len(data)) + cid)
fp.write(data)
crc = _crc32(data, _crc32(cid))
fp.write(o32(len(byte_data)) + cid)
fp.write(byte_data)
crc = _crc32(byte_data, _crc32(cid))
fp.write(o32(crc))
class _idat:
# wrap output from the encoder in IDAT chunks
def __init__(self, fp, chunk):
def __init__(self, fp, chunk) -> None:
self.fp = fp
self.chunk = chunk
@ -1116,7 +1116,7 @@ class _idat:
class _fdat:
# wrap encoder output in fdAT chunks
def __init__(self, fp, chunk, seq_num):
def __init__(self, fp: IO[bytes], chunk, seq_num: int) -> None:
self.fp = fp
self.chunk = chunk
self.seq_num = seq_num
@ -1259,7 +1259,9 @@ def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
_save(im, fp, filename, save_all=True)
def _save(im, fp, filename, chunk=putchunk, save_all=False):
def _save(
im: Image.Image, fp, filename: str | bytes, chunk=putchunk, save_all: bool = False
) -> None:
# save an image to disk (called by the save method)
if save_all:
@ -1461,7 +1463,7 @@ def _save(im, fp, filename, chunk=putchunk, save_all=False):
# PNG chunk converter
def getchunks(im, **params):
def getchunks(im: Image.Image, **params: Any) -> list[tuple[bytes, bytes, bytes]]:
"""Return a list of PNG chunks representing this image."""
class collector:
@ -1470,19 +1472,19 @@ def getchunks(im, **params):
def write(self, data: bytes) -> None:
pass
def append(self, chunk: bytes) -> None:
def append(self, chunk: tuple[bytes, bytes, bytes]) -> None:
self.data.append(chunk)
def append(fp, cid, *data):
data = b"".join(data)
crc = o32(_crc32(data, _crc32(cid)))
fp.append((cid, data, crc))
def append(fp: collector, cid: bytes, *data: bytes) -> None:
byte_data = b"".join(data)
crc = o32(_crc32(byte_data, _crc32(cid)))
fp.append((cid, byte_data, crc))
fp = collector()
try:
im.encoderinfo = params
_save(im, fp, None, append)
_save(im, fp, "", append)
finally:
del im.encoderinfo

View File

@ -334,12 +334,13 @@ class IFDRational(Rational):
__slots__ = ("_numerator", "_denominator", "_val")
def __init__(self, value, denominator=1):
def __init__(self, value, denominator: int = 1) -> None:
"""
:param value: either an integer numerator, a
float/rational/other number, or an IFDRational
:param denominator: Optional integer denominator
"""
self._val: Fraction | float
if isinstance(value, IFDRational):
self._numerator = value.numerator
self._denominator = value.denominator
@ -636,13 +637,13 @@ class ImageFileDirectory_v2(_IFDv2Base):
val = (val,)
return val
def __contains__(self, tag):
def __contains__(self, tag: object) -> bool:
return tag in self._tags_v2 or tag in self._tagdata
def __setitem__(self, tag, value):
def __setitem__(self, tag, value) -> None:
self._setitem(tag, value, self.legacy_api)
def _setitem(self, tag, value, legacy_api):
def _setitem(self, tag, value, legacy_api) -> None:
basetypes = (Number, bytes, str)
info = TiffTags.lookup(tag, self.group)
@ -758,7 +759,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return data
@_register_writer(1) # Basic type, except for the legacy API.
def write_byte(self, data):
def write_byte(self, data) -> bytes:
if isinstance(data, IFDRational):
data = int(data)
if isinstance(data, int):
@ -772,7 +773,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return data.decode("latin-1", "replace")
@_register_writer(2)
def write_string(self, value):
def write_string(self, value) -> bytes:
# remerge of https://github.com/python-pillow/Pillow/pull/1416
if isinstance(value, int):
value = str(value)
@ -790,7 +791,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return tuple(combine(num, denom) for num, denom in zip(vals[::2], vals[1::2]))
@_register_writer(5)
def write_rational(self, *values):
def write_rational(self, *values) -> bytes:
return b"".join(
self._pack("2L", *_limit_rational(frac, 2**32 - 1)) for frac in values
)
@ -800,7 +801,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return data
@_register_writer(7)
def write_undefined(self, value):
def write_undefined(self, value) -> bytes:
if isinstance(value, IFDRational):
value = int(value)
if isinstance(value, int):
@ -817,13 +818,13 @@ class ImageFileDirectory_v2(_IFDv2Base):
return tuple(combine(num, denom) for num, denom in zip(vals[::2], vals[1::2]))
@_register_writer(10)
def write_signed_rational(self, *values):
def write_signed_rational(self, *values) -> bytes:
return b"".join(
self._pack("2l", *_limit_signed_rational(frac, 2**31 - 1, -(2**31)))
for frac in values
)
def _ensure_read(self, fp, size):
def _ensure_read(self, fp: IO[bytes], size: int) -> bytes:
ret = fp.read(size)
if len(ret) != size:
msg = (
@ -977,7 +978,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return result
def save(self, fp):
def save(self, fp: IO[bytes]) -> int:
if fp.tell() == 0: # skip TIFF header on subsequent pages
# tiff header -- PIL always starts the first IFD at offset 8
fp.write(self._prefix + self._pack("HL", 42, 8))
@ -1017,7 +1018,7 @@ class ImageFileDirectory_v1(ImageFileDirectory_v2):
.. deprecated:: 3.0.0
"""
def __init__(self, *args, **kwargs):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._legacy_api = True
@ -1029,7 +1030,7 @@ class ImageFileDirectory_v1(ImageFileDirectory_v2):
"""Dictionary of tag types"""
@classmethod
def from_v2(cls, original):
def from_v2(cls, original) -> ImageFileDirectory_v1:
"""Returns an
:py:class:`~PIL.TiffImagePlugin.ImageFileDirectory_v1`
instance with the same data as is contained in the original
@ -1063,7 +1064,7 @@ class ImageFileDirectory_v1(ImageFileDirectory_v2):
ifd._tags_v2 = dict(self._tags_v2)
return ifd
def __contains__(self, tag):
def __contains__(self, tag: object) -> bool:
return tag in self._tags_v1 or tag in self._tagdata
def __len__(self) -> int:
@ -1072,7 +1073,7 @@ class ImageFileDirectory_v1(ImageFileDirectory_v2):
def __iter__(self):
return iter(set(self._tagdata) | set(self._tags_v1))
def __setitem__(self, tag, value):
def __setitem__(self, tag, value) -> None:
for legacy_api in (False, True):
self._setitem(tag, value, legacy_api)
@ -1122,7 +1123,7 @@ class TiffImageFile(ImageFile.ImageFile):
self.tag_v2 = ImageFileDirectory_v2(ifh)
# legacy IFD entries will be filled in later
self.ifd = None
self.ifd: ImageFileDirectory_v1 | None = None
# setup frame pointers
self.__first = self.__next = self.tag_v2.next
@ -1343,7 +1344,7 @@ class TiffImageFile(ImageFile.ImageFile):
return Image.Image.load(self)
def _setup(self):
def _setup(self) -> None:
"""Setup this image object based on current tags"""
if 0xBC01 in self.tag_v2:
@ -1537,13 +1538,13 @@ class TiffImageFile(ImageFile.ImageFile):
# adjust stride width accordingly
stride /= bps_count
a = (tile_rawmode, int(stride), 1)
args = (tile_rawmode, int(stride), 1)
self.tile.append(
(
self._compression,
(x, y, min(x + w, xsize), min(y + h, ysize)),
offset,
a,
args,
)
)
x = x + w
@ -1938,7 +1939,7 @@ class AppendingTiffWriter:
521, # JPEGACTables
}
def __init__(self, fn, new=False):
def __init__(self, fn, new: bool = False) -> None:
if hasattr(fn, "read"):
self.f = fn
self.close_fp = False
@ -2015,7 +2016,7 @@ class AppendingTiffWriter:
def tell(self) -> int:
return self.f.tell() - self.offsetOfNewPage
def seek(self, offset, whence=io.SEEK_SET):
def seek(self, offset: int, whence=io.SEEK_SET) -> int:
if whence == os.SEEK_SET:
offset += self.offsetOfNewPage

View File

@ -24,8 +24,11 @@ and has been tested with a few sample files found using google.
"""
from __future__ import annotations
from typing import IO
from . import Image, ImageFile
from ._binary import i32le as i32
from ._typing import StrOrBytesPath
class WalImageFile(ImageFile.ImageFile):
@ -58,7 +61,7 @@ class WalImageFile(ImageFile.ImageFile):
return Image.Image.load(self)
def open(filename):
def open(filename: StrOrBytesPath | IO[bytes]) -> WalImageFile:
"""
Load texture from a Quake2 WAL texture file.