Added type hints

This commit is contained in:
Andrew Murray 2024-07-12 21:16:56 +10:00
parent 6a9acfa5ca
commit 5bae934317
9 changed files with 79 additions and 73 deletions

View File

@ -41,7 +41,7 @@ MAGIC = PngImagePlugin._MAGIC
def chunk(cid: bytes, *data: bytes) -> bytes: def chunk(cid: bytes, *data: bytes) -> bytes:
test_file = BytesIO() test_file = BytesIO()
PngImagePlugin.putchunk(*(test_file, cid) + data) PngImagePlugin.putchunk(test_file, cid, *data)
return test_file.getvalue() return test_file.getvalue()

View File

@ -127,7 +127,7 @@ class TestImageFile:
def test_raise_typeerror(self) -> None: def test_raise_typeerror(self) -> None:
with pytest.raises(TypeError): with pytest.raises(TypeError):
parser = ImageFile.Parser() parser = ImageFile.Parser()
parser.feed(1) parser.feed(1) # type: ignore[arg-type]
def test_negative_stride(self) -> None: def test_negative_stride(self) -> None:
with open("Tests/images/raw_negative_stride.bin", "rb") as f: with open("Tests/images/raw_negative_stride.bin", "rb") as f:
@ -305,7 +305,7 @@ class TestPyDecoder(CodecsTest):
im.load() im.load()
def test_decode(self) -> None: def test_decode(self) -> None:
decoder = ImageFile.PyDecoder(None) decoder = ImageFile.PyDecoder("")
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
decoder.decode(b"") decoder.decode(b"")
@ -383,7 +383,7 @@ class TestPyEncoder(CodecsTest):
) )
def test_encode(self) -> None: def test_encode(self) -> None:
encoder = ImageFile.PyEncoder(None) encoder = ImageFile.PyEncoder("")
with pytest.raises(NotImplementedError): with pytest.raises(NotImplementedError):
encoder.encode(0) encoder.encode(0)

View File

@ -86,7 +86,7 @@ def raise_oserror(error: int) -> OSError:
raise _get_oserror(error, encoder=False) raise _get_oserror(error, encoder=False)
def _tilesort(t): def _tilesort(t) -> int:
# sort on offset # sort on offset
return t[2] return t[2]
@ -161,7 +161,7 @@ class ImageFile(Image.Image):
return Image.MIME.get(self.format.upper()) return Image.MIME.get(self.format.upper())
return None return None
def __setstate__(self, state): def __setstate__(self, state) -> None:
self.tile = [] self.tile = []
super().__setstate__(state) super().__setstate__(state)
@ -333,14 +333,14 @@ class ImageFile(Image.Image):
# def load_read(self, read_bytes: int) -> bytes: # def load_read(self, read_bytes: int) -> bytes:
# pass # pass
def _seek_check(self, frame): def _seek_check(self, frame: int) -> bool:
if ( if (
frame < self._min_frame frame < self._min_frame
# Only check upper limit on frames if additional seek operations # Only check upper limit on frames if additional seek operations
# are not required to do so # are not required to do so
or ( or (
not (hasattr(self, "_n_frames") and self._n_frames is None) not (hasattr(self, "_n_frames") and self._n_frames is None)
and frame >= self.n_frames + self._min_frame and frame >= getattr(self, "n_frames") + self._min_frame
) )
): ):
msg = "attempt to seek outside sequence" msg = "attempt to seek outside sequence"
@ -370,7 +370,7 @@ class StubImageFile(ImageFile):
msg = "StubImageFile subclass must implement _open" msg = "StubImageFile subclass must implement _open"
raise NotImplementedError(msg) raise NotImplementedError(msg)
def load(self): def load(self) -> Image.core.PixelAccess | None:
loader = self._load() loader = self._load()
if loader is None: if loader is None:
msg = f"cannot find loader for this {self.format} file" msg = f"cannot find loader for this {self.format} file"
@ -378,7 +378,7 @@ class StubImageFile(ImageFile):
image = loader.load(self) image = loader.load(self)
assert image is not None assert image is not None
# become the other object (!) # become the other object (!)
self.__class__ = image.__class__ self.__class__ = image.__class__ # type: ignore[assignment]
self.__dict__ = image.__dict__ self.__dict__ = image.__dict__
return image.load() return image.load()
@ -396,8 +396,8 @@ class Parser:
incremental = None incremental = None
image: Image.Image | None = None image: Image.Image | None = None
data = None data: bytes | None = None
decoder = None decoder: Image.core.ImagingDecoder | PyDecoder | None = None
offset = 0 offset = 0
finished = 0 finished = 0
@ -409,7 +409,7 @@ class Parser:
""" """
assert self.data is None, "cannot reuse parsers" assert self.data is None, "cannot reuse parsers"
def feed(self, data): def feed(self, data: bytes) -> None:
""" """
(Consumer) Feed data to the parser. (Consumer) Feed data to the parser.
@ -491,7 +491,7 @@ class Parser:
def __exit__(self, *args: object) -> None: def __exit__(self, *args: object) -> None:
self.close() self.close()
def close(self): def close(self) -> Image.Image:
""" """
(Consumer) Close the stream. (Consumer) Close the stream.
@ -525,7 +525,7 @@ class Parser:
# -------------------------------------------------------------------- # --------------------------------------------------------------------
def _save(im, fp, tile, bufsize=0) -> None: def _save(im, fp, tile, bufsize: int = 0) -> None:
"""Helper to save image based on tile list """Helper to save image based on tile list
:param im: Image object. :param im: Image object.
@ -553,7 +553,7 @@ def _save(im, fp, tile, bufsize=0) -> None:
fp.flush() fp.flush()
def _encode_tile(im, fp, tile: list[_Tile], bufsize, fh, exc=None): def _encode_tile(im, fp, tile: list[_Tile], bufsize: int, fh, exc=None) -> None:
for encoder_name, extents, offset, args in tile: for encoder_name, extents, offset, args in tile:
if offset > 0: if offset > 0:
fp.seek(offset) fp.seek(offset)
@ -629,18 +629,18 @@ class PyCodecState:
class PyCodec: class PyCodec:
fd: IO[bytes] | None fd: IO[bytes] | None
def __init__(self, mode, *args): def __init__(self, mode: str, *args: Any) -> None:
self.im = None self.im: Image.core.ImagingCore | None = None
self.state = PyCodecState() self.state = PyCodecState()
self.fd = None self.fd = None
self.mode = mode self.mode = mode
self.init(args) self.init(args)
def init(self, args) -> None: def init(self, args: tuple[Any, ...]) -> None:
""" """
Override to perform codec specific initialization Override to perform codec specific initialization
:param args: Array of args items from the tile entry :param args: Tuple of arg items from the tile entry
:returns: None :returns: None
""" """
self.args = args self.args = args
@ -662,7 +662,7 @@ class PyCodec:
""" """
self.fd = fd self.fd = fd
def setimage(self, im, extents: tuple[int, int, int, int] | None = None) -> None: def setimage(self, im, extents=None):
""" """
Called from ImageFile to set the core output image for the codec Called from ImageFile to set the core output image for the codec

View File

@ -33,7 +33,7 @@ class Iterator:
:param im: An image object. :param im: An image object.
""" """
def __init__(self, im: Image.Image): def __init__(self, im: Image.Image) -> None:
if not hasattr(im, "seek"): if not hasattr(im, "seek"):
msg = "im must have seek method" msg = "im must have seek method"
raise AttributeError(msg) raise AttributeError(msg)

View File

@ -827,11 +827,11 @@ def _save_cjpeg(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
## ##
# Factory for making JPEG and MPO instances # Factory for making JPEG and MPO instances
def jpeg_factory(fp=None, filename=None): def jpeg_factory(fp: IO[bytes] | None = None, filename: str | bytes | None = None):
im = JpegImageFile(fp, filename) im = JpegImageFile(fp, filename)
try: try:
mpheader = im._getmp() mpheader = im._getmp()
if mpheader[45057] > 1: if mpheader is not None and mpheader[45057] > 1:
for segment, content in im.applist: for segment, content in im.applist:
if segment == "APP1" and b' hdrgm:Version="' in content: if segment == "APP1" and b' hdrgm:Version="' in content:
# Ultra HDR images are not yet supported # Ultra HDR images are not yet supported

View File

@ -174,12 +174,15 @@ def _write_image(im, filename, existing_pdf, image_refs):
return image_ref, procset return image_ref, procset
def _save(im, fp, filename, save_all=False): def _save(
im: Image.Image, fp: IO[bytes], filename: str | bytes, save_all: bool = False
) -> None:
is_appending = im.encoderinfo.get("append", False) is_appending = im.encoderinfo.get("append", False)
filename_str = filename.decode() if isinstance(filename, bytes) else filename
if is_appending: if is_appending:
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename, mode="r+b") existing_pdf = PdfParser.PdfParser(f=fp, filename=filename_str, mode="r+b")
else: else:
existing_pdf = PdfParser.PdfParser(f=fp, filename=filename, mode="w+b") existing_pdf = PdfParser.PdfParser(f=fp, filename=filename_str, mode="w+b")
dpi = im.encoderinfo.get("dpi") dpi = im.encoderinfo.get("dpi")
if dpi: if dpi:
@ -228,12 +231,7 @@ def _save(im, fp, filename, save_all=False):
for im in ims: for im in ims:
im_number_of_pages = 1 im_number_of_pages = 1
if save_all: if save_all:
try: im_number_of_pages = getattr(im, "n_frames", 1)
im_number_of_pages = im.n_frames
except AttributeError:
# Image format does not have n_frames.
# It is a single frame image
pass
number_of_pages += im_number_of_pages number_of_pages += im_number_of_pages
for i in range(im_number_of_pages): for i in range(im_number_of_pages):
image_refs.append(existing_pdf.next_object_id(0)) image_refs.append(existing_pdf.next_object_id(0))
@ -250,7 +248,9 @@ def _save(im, fp, filename, save_all=False):
page_number = 0 page_number = 0
for im_sequence in ims: for im_sequence in ims:
im_pages = ImageSequence.Iterator(im_sequence) if save_all else [im_sequence] im_pages: ImageSequence.Iterator | list[Image.Image] = (
ImageSequence.Iterator(im_sequence) if save_all else [im_sequence]
)
for im in im_pages: for im in im_pages:
image_ref, procset = _write_image(im, filename, existing_pdf, image_refs) image_ref, procset = _write_image(im, filename, existing_pdf, image_refs)

View File

@ -144,7 +144,7 @@ def _safe_zlib_decompress(s):
return plaintext return plaintext
def _crc32(data, seed=0): def _crc32(data: bytes, seed: int = 0) -> int:
return zlib.crc32(data, seed) & 0xFFFFFFFF return zlib.crc32(data, seed) & 0xFFFFFFFF
@ -191,7 +191,7 @@ class ChunkStream:
assert self.queue is not None assert self.queue is not None
self.queue.append((cid, pos, length)) self.queue.append((cid, pos, length))
def call(self, cid, pos, length): def call(self, cid: bytes, pos: int, length: int) -> bytes:
"""Call the appropriate chunk handler""" """Call the appropriate chunk handler"""
logger.debug("STREAM %r %s %s", cid, pos, length) logger.debug("STREAM %r %s %s", cid, pos, length)
@ -1091,21 +1091,21 @@ _OUTMODES = {
} }
def putchunk(fp, cid, *data): def putchunk(fp: IO[bytes], cid: bytes, *data: bytes) -> None:
"""Write a PNG chunk (including CRC field)""" """Write a PNG chunk (including CRC field)"""
data = b"".join(data) byte_data = b"".join(data)
fp.write(o32(len(data)) + cid) fp.write(o32(len(byte_data)) + cid)
fp.write(data) fp.write(byte_data)
crc = _crc32(data, _crc32(cid)) crc = _crc32(byte_data, _crc32(cid))
fp.write(o32(crc)) fp.write(o32(crc))
class _idat: class _idat:
# wrap output from the encoder in IDAT chunks # wrap output from the encoder in IDAT chunks
def __init__(self, fp, chunk): def __init__(self, fp, chunk) -> None:
self.fp = fp self.fp = fp
self.chunk = chunk self.chunk = chunk
@ -1116,7 +1116,7 @@ class _idat:
class _fdat: class _fdat:
# wrap encoder output in fdAT chunks # wrap encoder output in fdAT chunks
def __init__(self, fp, chunk, seq_num): def __init__(self, fp: IO[bytes], chunk, seq_num: int) -> None:
self.fp = fp self.fp = fp
self.chunk = chunk self.chunk = chunk
self.seq_num = seq_num self.seq_num = seq_num
@ -1259,7 +1259,9 @@ def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
_save(im, fp, filename, save_all=True) _save(im, fp, filename, save_all=True)
def _save(im, fp, filename, chunk=putchunk, save_all=False): def _save(
im: Image.Image, fp, filename: str | bytes, chunk=putchunk, save_all: bool = False
) -> None:
# save an image to disk (called by the save method) # save an image to disk (called by the save method)
if save_all: if save_all:
@ -1461,7 +1463,7 @@ def _save(im, fp, filename, chunk=putchunk, save_all=False):
# PNG chunk converter # PNG chunk converter
def getchunks(im, **params): def getchunks(im: Image.Image, **params: Any) -> list[tuple[bytes, bytes, bytes]]:
"""Return a list of PNG chunks representing this image.""" """Return a list of PNG chunks representing this image."""
class collector: class collector:
@ -1470,19 +1472,19 @@ def getchunks(im, **params):
def write(self, data: bytes) -> None: def write(self, data: bytes) -> None:
pass pass
def append(self, chunk: bytes) -> None: def append(self, chunk: tuple[bytes, bytes, bytes]) -> None:
self.data.append(chunk) self.data.append(chunk)
def append(fp, cid, *data): def append(fp: collector, cid: bytes, *data: bytes) -> None:
data = b"".join(data) byte_data = b"".join(data)
crc = o32(_crc32(data, _crc32(cid))) crc = o32(_crc32(byte_data, _crc32(cid)))
fp.append((cid, data, crc)) fp.append((cid, byte_data, crc))
fp = collector() fp = collector()
try: try:
im.encoderinfo = params im.encoderinfo = params
_save(im, fp, None, append) _save(im, fp, "", append)
finally: finally:
del im.encoderinfo del im.encoderinfo

View File

@ -334,12 +334,13 @@ class IFDRational(Rational):
__slots__ = ("_numerator", "_denominator", "_val") __slots__ = ("_numerator", "_denominator", "_val")
def __init__(self, value, denominator=1): def __init__(self, value, denominator: int = 1) -> None:
""" """
:param value: either an integer numerator, a :param value: either an integer numerator, a
float/rational/other number, or an IFDRational float/rational/other number, or an IFDRational
:param denominator: Optional integer denominator :param denominator: Optional integer denominator
""" """
self._val: Fraction | float
if isinstance(value, IFDRational): if isinstance(value, IFDRational):
self._numerator = value.numerator self._numerator = value.numerator
self._denominator = value.denominator self._denominator = value.denominator
@ -636,13 +637,13 @@ class ImageFileDirectory_v2(_IFDv2Base):
val = (val,) val = (val,)
return val return val
def __contains__(self, tag): def __contains__(self, tag: object) -> bool:
return tag in self._tags_v2 or tag in self._tagdata return tag in self._tags_v2 or tag in self._tagdata
def __setitem__(self, tag, value): def __setitem__(self, tag, value) -> None:
self._setitem(tag, value, self.legacy_api) self._setitem(tag, value, self.legacy_api)
def _setitem(self, tag, value, legacy_api): def _setitem(self, tag, value, legacy_api) -> None:
basetypes = (Number, bytes, str) basetypes = (Number, bytes, str)
info = TiffTags.lookup(tag, self.group) info = TiffTags.lookup(tag, self.group)
@ -758,7 +759,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return data return data
@_register_writer(1) # Basic type, except for the legacy API. @_register_writer(1) # Basic type, except for the legacy API.
def write_byte(self, data): def write_byte(self, data) -> bytes:
if isinstance(data, IFDRational): if isinstance(data, IFDRational):
data = int(data) data = int(data)
if isinstance(data, int): if isinstance(data, int):
@ -772,7 +773,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return data.decode("latin-1", "replace") return data.decode("latin-1", "replace")
@_register_writer(2) @_register_writer(2)
def write_string(self, value): def write_string(self, value) -> bytes:
# remerge of https://github.com/python-pillow/Pillow/pull/1416 # remerge of https://github.com/python-pillow/Pillow/pull/1416
if isinstance(value, int): if isinstance(value, int):
value = str(value) value = str(value)
@ -790,7 +791,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return tuple(combine(num, denom) for num, denom in zip(vals[::2], vals[1::2])) return tuple(combine(num, denom) for num, denom in zip(vals[::2], vals[1::2]))
@_register_writer(5) @_register_writer(5)
def write_rational(self, *values): def write_rational(self, *values) -> bytes:
return b"".join( return b"".join(
self._pack("2L", *_limit_rational(frac, 2**32 - 1)) for frac in values self._pack("2L", *_limit_rational(frac, 2**32 - 1)) for frac in values
) )
@ -800,7 +801,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return data return data
@_register_writer(7) @_register_writer(7)
def write_undefined(self, value): def write_undefined(self, value) -> bytes:
if isinstance(value, IFDRational): if isinstance(value, IFDRational):
value = int(value) value = int(value)
if isinstance(value, int): if isinstance(value, int):
@ -817,13 +818,13 @@ class ImageFileDirectory_v2(_IFDv2Base):
return tuple(combine(num, denom) for num, denom in zip(vals[::2], vals[1::2])) return tuple(combine(num, denom) for num, denom in zip(vals[::2], vals[1::2]))
@_register_writer(10) @_register_writer(10)
def write_signed_rational(self, *values): def write_signed_rational(self, *values) -> bytes:
return b"".join( return b"".join(
self._pack("2l", *_limit_signed_rational(frac, 2**31 - 1, -(2**31))) self._pack("2l", *_limit_signed_rational(frac, 2**31 - 1, -(2**31)))
for frac in values for frac in values
) )
def _ensure_read(self, fp, size): def _ensure_read(self, fp: IO[bytes], size: int) -> bytes:
ret = fp.read(size) ret = fp.read(size)
if len(ret) != size: if len(ret) != size:
msg = ( msg = (
@ -977,7 +978,7 @@ class ImageFileDirectory_v2(_IFDv2Base):
return result return result
def save(self, fp): def save(self, fp: IO[bytes]) -> int:
if fp.tell() == 0: # skip TIFF header on subsequent pages if fp.tell() == 0: # skip TIFF header on subsequent pages
# tiff header -- PIL always starts the first IFD at offset 8 # tiff header -- PIL always starts the first IFD at offset 8
fp.write(self._prefix + self._pack("HL", 42, 8)) fp.write(self._prefix + self._pack("HL", 42, 8))
@ -1017,7 +1018,7 @@ class ImageFileDirectory_v1(ImageFileDirectory_v2):
.. deprecated:: 3.0.0 .. deprecated:: 3.0.0
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self._legacy_api = True self._legacy_api = True
@ -1029,7 +1030,7 @@ class ImageFileDirectory_v1(ImageFileDirectory_v2):
"""Dictionary of tag types""" """Dictionary of tag types"""
@classmethod @classmethod
def from_v2(cls, original): def from_v2(cls, original) -> ImageFileDirectory_v1:
"""Returns an """Returns an
:py:class:`~PIL.TiffImagePlugin.ImageFileDirectory_v1` :py:class:`~PIL.TiffImagePlugin.ImageFileDirectory_v1`
instance with the same data as is contained in the original instance with the same data as is contained in the original
@ -1063,7 +1064,7 @@ class ImageFileDirectory_v1(ImageFileDirectory_v2):
ifd._tags_v2 = dict(self._tags_v2) ifd._tags_v2 = dict(self._tags_v2)
return ifd return ifd
def __contains__(self, tag): def __contains__(self, tag: object) -> bool:
return tag in self._tags_v1 or tag in self._tagdata return tag in self._tags_v1 or tag in self._tagdata
def __len__(self) -> int: def __len__(self) -> int:
@ -1072,7 +1073,7 @@ class ImageFileDirectory_v1(ImageFileDirectory_v2):
def __iter__(self): def __iter__(self):
return iter(set(self._tagdata) | set(self._tags_v1)) return iter(set(self._tagdata) | set(self._tags_v1))
def __setitem__(self, tag, value): def __setitem__(self, tag, value) -> None:
for legacy_api in (False, True): for legacy_api in (False, True):
self._setitem(tag, value, legacy_api) self._setitem(tag, value, legacy_api)
@ -1122,7 +1123,7 @@ class TiffImageFile(ImageFile.ImageFile):
self.tag_v2 = ImageFileDirectory_v2(ifh) self.tag_v2 = ImageFileDirectory_v2(ifh)
# legacy IFD entries will be filled in later # legacy IFD entries will be filled in later
self.ifd = None self.ifd: ImageFileDirectory_v1 | None = None
# setup frame pointers # setup frame pointers
self.__first = self.__next = self.tag_v2.next self.__first = self.__next = self.tag_v2.next
@ -1343,7 +1344,7 @@ class TiffImageFile(ImageFile.ImageFile):
return Image.Image.load(self) return Image.Image.load(self)
def _setup(self): def _setup(self) -> None:
"""Setup this image object based on current tags""" """Setup this image object based on current tags"""
if 0xBC01 in self.tag_v2: if 0xBC01 in self.tag_v2:
@ -1537,13 +1538,13 @@ class TiffImageFile(ImageFile.ImageFile):
# adjust stride width accordingly # adjust stride width accordingly
stride /= bps_count stride /= bps_count
a = (tile_rawmode, int(stride), 1) args = (tile_rawmode, int(stride), 1)
self.tile.append( self.tile.append(
( (
self._compression, self._compression,
(x, y, min(x + w, xsize), min(y + h, ysize)), (x, y, min(x + w, xsize), min(y + h, ysize)),
offset, offset,
a, args,
) )
) )
x = x + w x = x + w
@ -1938,7 +1939,7 @@ class AppendingTiffWriter:
521, # JPEGACTables 521, # JPEGACTables
} }
def __init__(self, fn, new=False): def __init__(self, fn, new: bool = False) -> None:
if hasattr(fn, "read"): if hasattr(fn, "read"):
self.f = fn self.f = fn
self.close_fp = False self.close_fp = False
@ -2015,7 +2016,7 @@ class AppendingTiffWriter:
def tell(self) -> int: def tell(self) -> int:
return self.f.tell() - self.offsetOfNewPage return self.f.tell() - self.offsetOfNewPage
def seek(self, offset, whence=io.SEEK_SET): def seek(self, offset: int, whence=io.SEEK_SET) -> int:
if whence == os.SEEK_SET: if whence == os.SEEK_SET:
offset += self.offsetOfNewPage offset += self.offsetOfNewPage

View File

@ -24,8 +24,11 @@ and has been tested with a few sample files found using google.
""" """
from __future__ import annotations from __future__ import annotations
from typing import IO
from . import Image, ImageFile from . import Image, ImageFile
from ._binary import i32le as i32 from ._binary import i32le as i32
from ._typing import StrOrBytesPath
class WalImageFile(ImageFile.ImageFile): class WalImageFile(ImageFile.ImageFile):
@ -58,7 +61,7 @@ class WalImageFile(ImageFile.ImageFile):
return Image.Image.load(self) return Image.Image.load(self)
def open(filename): def open(filename: StrOrBytesPath | IO[bytes]) -> WalImageFile:
""" """
Load texture from a Quake2 WAL texture file. Load texture from a Quake2 WAL texture file.