From e419fd7ab4a71bc269a9c624c92d8955b372aaed Mon Sep 17 00:00:00 2001 From: Andrew Murray Date: Wed, 15 May 2024 20:19:09 +1000 Subject: [PATCH] Added type hints --- src/PIL/DdsImagePlugin.py | 2 +- src/PIL/EpsImagePlugin.py | 4 +-- src/PIL/FtexImagePlugin.py | 2 +- src/PIL/IcoImagePlugin.py | 2 +- src/PIL/ImageFile.py | 4 +-- src/PIL/JpegImagePlugin.py | 2 +- src/PIL/MpoImagePlugin.py | 2 +- src/PIL/PngImagePlugin.py | 74 +++++++++++++++++++++----------------- src/PIL/WebPImagePlugin.py | 2 +- src/PIL/XpmImagePlugin.py | 7 ++-- src/PIL/features.py | 27 +++++++------- 11 files changed, 65 insertions(+), 63 deletions(-) diff --git a/src/PIL/DdsImagePlugin.py b/src/PIL/DdsImagePlugin.py index b2ddbe44f..1575f2d88 100644 --- a/src/PIL/DdsImagePlugin.py +++ b/src/PIL/DdsImagePlugin.py @@ -472,7 +472,7 @@ class DdsImageFile(ImageFile.ImageFile): else: self.tile = [ImageFile._Tile("raw", extents, 0, rawmode or self.mode)] - def load_seek(self, pos): + def load_seek(self, pos: int) -> None: pass diff --git a/src/PIL/EpsImagePlugin.py b/src/PIL/EpsImagePlugin.py index b57daca56..5a44baa49 100644 --- a/src/PIL/EpsImagePlugin.py +++ b/src/PIL/EpsImagePlugin.py @@ -42,7 +42,7 @@ gs_binary: str | bool | None = None gs_windows_binary = None -def has_ghostscript(): +def has_ghostscript() -> bool: global gs_binary, gs_windows_binary if gs_binary is None: if sys.platform.startswith("win"): @@ -404,7 +404,7 @@ class EpsImageFile(ImageFile.ImageFile): self.tile = [] return Image.Image.load(self) - def load_seek(self, pos): + def load_seek(self, pos: int) -> None: # we can't incrementally load, so force ImageFile.parser to # use our custom load method by defining this method. pass diff --git a/src/PIL/FtexImagePlugin.py b/src/PIL/FtexImagePlugin.py index 7fcf81376..5acbb4912 100644 --- a/src/PIL/FtexImagePlugin.py +++ b/src/PIL/FtexImagePlugin.py @@ -103,7 +103,7 @@ class FtexImageFile(ImageFile.ImageFile): self.fp.close() self.fp = BytesIO(data) - def load_seek(self, pos): + def load_seek(self, pos: int) -> None: pass diff --git a/src/PIL/IcoImagePlugin.py b/src/PIL/IcoImagePlugin.py index eacffbae6..cea093f9c 100644 --- a/src/PIL/IcoImagePlugin.py +++ b/src/PIL/IcoImagePlugin.py @@ -341,7 +341,7 @@ class IcoImageFile(ImageFile.ImageFile): self.size = im.size - def load_seek(self, pos): + def load_seek(self, pos: int) -> None: # Flag the ImageFile.Parser so that it # just does all the decode at the end. pass diff --git a/src/PIL/ImageFile.py b/src/PIL/ImageFile.py index b93e2ad2c..33467fc4f 100644 --- a/src/PIL/ImageFile.py +++ b/src/PIL/ImageFile.py @@ -324,11 +324,11 @@ class ImageFile(Image.Image): pass # may be defined for contained formats - # def load_seek(self, pos): + # def load_seek(self, pos: int) -> None: # pass # may be defined for blocked formats (e.g. PNG) - # def load_read(self, read_bytes): + # def load_read(self, read_bytes: int) -> bytes: # pass def _seek_check(self, frame): diff --git a/src/PIL/JpegImagePlugin.py b/src/PIL/JpegImagePlugin.py index 7a3c99b6c..909911dfe 100644 --- a/src/PIL/JpegImagePlugin.py +++ b/src/PIL/JpegImagePlugin.py @@ -408,7 +408,7 @@ class JpegImageFile(ImageFile.ImageFile): msg = "no marker found" raise SyntaxError(msg) - def load_read(self, read_bytes): + def load_read(self, read_bytes: int) -> bytes: """ internal: read more image data For premature EOF and LOAD_TRUNCATED_IMAGES adds EOI marker diff --git a/src/PIL/MpoImagePlugin.py b/src/PIL/MpoImagePlugin.py index eba35fb4d..766e1290c 100644 --- a/src/PIL/MpoImagePlugin.py +++ b/src/PIL/MpoImagePlugin.py @@ -124,7 +124,7 @@ class MpoImageFile(JpegImagePlugin.JpegImageFile): # for now we can only handle reading and individual frame extraction self.readonly = 1 - def load_seek(self, pos): + def load_seek(self, pos: int) -> None: self._fp.seek(pos) def seek(self, frame: int) -> None: diff --git a/src/PIL/PngImagePlugin.py b/src/PIL/PngImagePlugin.py index 76e0abc31..c74cbccf1 100644 --- a/src/PIL/PngImagePlugin.py +++ b/src/PIL/PngImagePlugin.py @@ -39,6 +39,7 @@ import struct import warnings import zlib from enum import IntEnum +from typing import IO from . import Image, ImageChops, ImageFile, ImagePalette, ImageSequence from ._binary import i16be as i16 @@ -149,14 +150,15 @@ def _crc32(data, seed=0): class ChunkStream: - def __init__(self, fp): - self.fp = fp - self.queue = [] + def __init__(self, fp: IO[bytes]) -> None: + self.fp: IO[bytes] | None = fp + self.queue: list[tuple[bytes, int, int]] | None = [] - def read(self): + def read(self) -> tuple[bytes, int, int]: """Fetch a new chunk. Returns header information.""" cid = None + assert self.fp is not None if self.queue: cid, pos, length = self.queue.pop() self.fp.seek(pos) @@ -173,7 +175,7 @@ class ChunkStream: return cid, pos, length - def __enter__(self): + def __enter__(self) -> ChunkStream: return self def __exit__(self, *args): @@ -182,7 +184,8 @@ class ChunkStream: def close(self) -> None: self.queue = self.fp = None - def push(self, cid, pos, length): + def push(self, cid: bytes, pos: int, length: int) -> None: + assert self.queue is not None self.queue.append((cid, pos, length)) def call(self, cid, pos, length): @@ -191,7 +194,7 @@ class ChunkStream: logger.debug("STREAM %r %s %s", cid, pos, length) return getattr(self, f"chunk_{cid.decode('ascii')}")(pos, length) - def crc(self, cid, data): + def crc(self, cid: bytes, data: bytes) -> None: """Read and verify checksum""" # Skip CRC checks for ancillary chunks if allowed to load truncated @@ -201,6 +204,7 @@ class ChunkStream: self.crc_skip(cid, data) return + assert self.fp is not None try: crc1 = _crc32(data, _crc32(cid)) crc2 = i32(self.fp.read(4)) @@ -211,12 +215,13 @@ class ChunkStream: msg = f"broken PNG file (incomplete checksum in {repr(cid)})" raise SyntaxError(msg) from e - def crc_skip(self, cid, data): + def crc_skip(self, cid: bytes, data: bytes) -> None: """Read checksum""" + assert self.fp is not None self.fp.read(4) - def verify(self, endchunk=b"IEND"): + def verify(self, endchunk: bytes = b"IEND") -> list[bytes]: # Simple approach; just calculate checksum for all remaining # blocks. Must be called directly after open. @@ -361,7 +366,7 @@ class PngStream(ChunkStream): self.text_memory = 0 - def check_text_memory(self, chunklen): + def check_text_memory(self, chunklen: int) -> None: self.text_memory += chunklen if self.text_memory > MAX_TEXT_MEMORY: msg = ( @@ -382,7 +387,7 @@ class PngStream(ChunkStream): self.im_tile = self.rewind_state["tile"] self._seq_num = self.rewind_state["seq_num"] - def chunk_iCCP(self, pos, length): + def chunk_iCCP(self, pos: int, length: int) -> bytes: # ICC profile s = ImageFile._safe_read(self.fp, length) # according to PNG spec, the iCCP chunk contains: @@ -409,7 +414,7 @@ class PngStream(ChunkStream): self.im_info["icc_profile"] = icc_profile return s - def chunk_IHDR(self, pos, length): + def chunk_IHDR(self, pos: int, length: int) -> bytes: # image header s = ImageFile._safe_read(self.fp, length) if length < 13: @@ -446,14 +451,14 @@ class PngStream(ChunkStream): msg = "end of PNG image" raise EOFError(msg) - def chunk_PLTE(self, pos, length): + def chunk_PLTE(self, pos: int, length: int) -> bytes: # palette s = ImageFile._safe_read(self.fp, length) if self.im_mode == "P": self.im_palette = "RGB", s return s - def chunk_tRNS(self, pos, length): + def chunk_tRNS(self, pos: int, length: int) -> bytes: # transparency s = ImageFile._safe_read(self.fp, length) if self.im_mode == "P": @@ -473,13 +478,13 @@ class PngStream(ChunkStream): self.im_info["transparency"] = i16(s), i16(s, 2), i16(s, 4) return s - def chunk_gAMA(self, pos, length): + def chunk_gAMA(self, pos: int, length: int) -> bytes: # gamma setting s = ImageFile._safe_read(self.fp, length) self.im_info["gamma"] = i32(s) / 100000.0 return s - def chunk_cHRM(self, pos, length): + def chunk_cHRM(self, pos: int, length: int) -> bytes: # chromaticity, 8 unsigned ints, actual value is scaled by 100,000 # WP x,y, Red x,y, Green x,y Blue x,y @@ -488,7 +493,7 @@ class PngStream(ChunkStream): self.im_info["chromaticity"] = tuple(elt / 100000.0 for elt in raw_vals) return s - def chunk_sRGB(self, pos, length): + def chunk_sRGB(self, pos: int, length: int) -> bytes: # srgb rendering intent, 1 byte # 0 perceptual # 1 relative colorimetric @@ -504,7 +509,7 @@ class PngStream(ChunkStream): self.im_info["srgb"] = s[0] return s - def chunk_pHYs(self, pos, length): + def chunk_pHYs(self, pos: int, length: int) -> bytes: # pixels per unit s = ImageFile._safe_read(self.fp, length) if length < 9: @@ -521,7 +526,7 @@ class PngStream(ChunkStream): self.im_info["aspect"] = px, py return s - def chunk_tEXt(self, pos, length): + def chunk_tEXt(self, pos: int, length: int) -> bytes: # text s = ImageFile._safe_read(self.fp, length) try: @@ -540,7 +545,7 @@ class PngStream(ChunkStream): return s - def chunk_zTXt(self, pos, length): + def chunk_zTXt(self, pos: int, length: int) -> bytes: # compressed text s = ImageFile._safe_read(self.fp, length) try: @@ -574,7 +579,7 @@ class PngStream(ChunkStream): return s - def chunk_iTXt(self, pos, length): + def chunk_iTXt(self, pos: int, length: int) -> bytes: # international text r = s = ImageFile._safe_read(self.fp, length) try: @@ -614,13 +619,13 @@ class PngStream(ChunkStream): return s - def chunk_eXIf(self, pos, length): + def chunk_eXIf(self, pos: int, length: int) -> bytes: s = ImageFile._safe_read(self.fp, length) self.im_info["exif"] = b"Exif\x00\x00" + s return s # APNG chunks - def chunk_acTL(self, pos, length): + def chunk_acTL(self, pos: int, length: int) -> bytes: s = ImageFile._safe_read(self.fp, length) if length < 8: if ImageFile.LOAD_TRUNCATED_IMAGES: @@ -640,7 +645,7 @@ class PngStream(ChunkStream): self.im_custom_mimetype = "image/apng" return s - def chunk_fcTL(self, pos, length): + def chunk_fcTL(self, pos: int, length: int) -> bytes: s = ImageFile._safe_read(self.fp, length) if length < 26: if ImageFile.LOAD_TRUNCATED_IMAGES: @@ -669,7 +674,7 @@ class PngStream(ChunkStream): self.im_info["blend"] = s[25] return s - def chunk_fdAT(self, pos, length): + def chunk_fdAT(self, pos: int, length: int) -> bytes: if length < 4: if ImageFile.LOAD_TRUNCATED_IMAGES: s = ImageFile._safe_read(self.fp, length) @@ -701,7 +706,7 @@ class PngImageFile(ImageFile.ImageFile): format = "PNG" format_description = "Portable network graphics" - def _open(self): + def _open(self) -> None: if not _accept(self.fp.read(8)): msg = "not a PNG file" raise SyntaxError(msg) @@ -711,8 +716,8 @@ class PngImageFile(ImageFile.ImageFile): # # Parse headers up to the first IDAT or fDAT chunk - self.private_chunks = [] - self.png = PngStream(self.fp) + self.private_chunks: list[tuple[bytes, bytes] | tuple[bytes, bytes, bool]] = [] + self.png: PngStream | None = PngStream(self.fp) while True: # @@ -793,6 +798,7 @@ class PngImageFile(ImageFile.ImageFile): # back up to beginning of IDAT block self.fp.seek(self.tile[0][2] - 8) + assert self.png is not None self.png.verify() self.png.close() @@ -921,9 +927,10 @@ class PngImageFile(ImageFile.ImageFile): self.__idat = self.__prepare_idat # used by load_read() ImageFile.ImageFile.load_prepare(self) - def load_read(self, read_bytes): + def load_read(self, read_bytes: int) -> bytes: """internal: read more image data""" + assert self.png is not None while self.__idat == 0: # end of chunk, skip forward to next one @@ -956,6 +963,7 @@ class PngImageFile(ImageFile.ImageFile): def load_end(self) -> None: """internal: finished reading image data""" + assert self.png is not None if self.__idat != 0: self.fp.read(self.__idat) while True: @@ -1079,7 +1087,7 @@ class _idat: self.fp = fp self.chunk = chunk - def write(self, data): + def write(self, data: bytes) -> None: self.chunk(self.fp, b"IDAT", data) @@ -1091,7 +1099,7 @@ class _fdat: self.chunk = chunk self.seq_num = seq_num - def write(self, data): + def write(self, data: bytes) -> None: self.chunk(self.fp, b"fdAT", o32(self.seq_num), data) self.seq_num += 1 @@ -1436,10 +1444,10 @@ def getchunks(im, **params): class collector: data = [] - def write(self, data): + def write(self, data: bytes) -> None: pass - def append(self, chunk): + def append(self, chunk: bytes) -> None: self.data.append(chunk) def append(fp, cid, *data): diff --git a/src/PIL/WebPImagePlugin.py b/src/PIL/WebPImagePlugin.py index 4b8cfe65c..cae124e9f 100644 --- a/src/PIL/WebPImagePlugin.py +++ b/src/PIL/WebPImagePlugin.py @@ -171,7 +171,7 @@ class WebPImageFile(ImageFile.ImageFile): return super().load() - def load_seek(self, pos): + def load_seek(self, pos: int) -> None: pass def tell(self) -> int: diff --git a/src/PIL/XpmImagePlugin.py b/src/PIL/XpmImagePlugin.py index 88d14e9c2..8d56331e6 100644 --- a/src/PIL/XpmImagePlugin.py +++ b/src/PIL/XpmImagePlugin.py @@ -103,16 +103,13 @@ class XpmImageFile(ImageFile.ImageFile): self.tile = [("raw", (0, 0) + self.size, self.fp.tell(), ("P", 0, 1))] - def load_read(self, read_bytes): + def load_read(self, read_bytes: int) -> bytes: # # load all image data in one chunk xsize, ysize = self.size - s = [None] * ysize - - for i in range(ysize): - s[i] = self.fp.readline()[1 : xsize + 1].ljust(xsize) + s = [self.fp.readline()[1 : xsize + 1].ljust(xsize) for i in range(ysize)] return b"".join(s) diff --git a/src/PIL/features.py b/src/PIL/features.py index 95c6c84cc..16c749f14 100644 --- a/src/PIL/features.py +++ b/src/PIL/features.py @@ -18,7 +18,7 @@ modules = { } -def check_module(feature): +def check_module(feature: str) -> bool: """ Checks if a module is available. @@ -42,7 +42,7 @@ def check_module(feature): return False -def version_module(feature): +def version_module(feature: str) -> str | None: """ :param feature: The module to check for. :returns: @@ -54,13 +54,10 @@ def version_module(feature): module, ver = modules[feature] - if ver is None: - return None - return getattr(__import__(module, fromlist=[ver]), ver) -def get_supported_modules(): +def get_supported_modules() -> list[str]: """ :returns: A list of all supported modules. """ @@ -75,7 +72,7 @@ codecs = { } -def check_codec(feature): +def check_codec(feature: str) -> bool: """ Checks if a codec is available. @@ -92,7 +89,7 @@ def check_codec(feature): return f"{codec}_encoder" in dir(Image.core) -def version_codec(feature): +def version_codec(feature: str) -> str | None: """ :param feature: The codec to check for. :returns: @@ -113,7 +110,7 @@ def version_codec(feature): return version -def get_supported_codecs(): +def get_supported_codecs() -> list[str]: """ :returns: A list of all supported codecs. """ @@ -133,7 +130,7 @@ features = { } -def check_feature(feature): +def check_feature(feature: str) -> bool | None: """ Checks if a feature is available. @@ -157,7 +154,7 @@ def check_feature(feature): return None -def version_feature(feature): +def version_feature(feature: str) -> str | None: """ :param feature: The feature to check for. :returns: The version number as a string, or ``None`` if not available. @@ -174,14 +171,14 @@ def version_feature(feature): return getattr(__import__(module, fromlist=[ver]), ver) -def get_supported_features(): +def get_supported_features() -> list[str]: """ :returns: A list of all supported features. """ return [f for f in features if check_feature(f)] -def check(feature): +def check(feature: str) -> bool | None: """ :param feature: A module, codec, or feature name. :returns: @@ -199,7 +196,7 @@ def check(feature): return False -def version(feature): +def version(feature: str) -> str | None: """ :param feature: The module, codec, or feature to check for. @@ -215,7 +212,7 @@ def version(feature): return None -def get_supported(): +def get_supported() -> list[str]: """ :returns: A list of all supported modules, features, and codecs. """