Merge pull request #8354 from radarhere/type_hint

This commit is contained in:
Hugo van Kemenade 2024-09-08 16:19:08 +03:00 committed by GitHub
commit f30eefaae2
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
20 changed files with 199 additions and 150 deletions

View File

@ -1011,7 +1011,9 @@ class TestFileJpeg:
# Even though this decoder never says that it is finished
# the image should still end when there is no new data
class InfiniteMockPyDecoder(ImageFile.PyDecoder):
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(
self, buffer: bytes | Image.SupportsArrayInterface
) -> tuple[int, int]:
return 0, 0
Image.register_decoder("INFINITE", InfiniteMockPyDecoder)

View File

@ -210,7 +210,7 @@ class MockPyDecoder(ImageFile.PyDecoder):
super().__init__(mode, *args)
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
# eof
return -1, 0

View File

@ -121,7 +121,7 @@ nitpicky = True
# generating warnings in “nitpicky mode”. Note that type should include the domain name
# if present. Example entries would be ('py:func', 'int') or
# ('envvar', 'LD_LIBRARY_PATH').
# nitpick_ignore = []
nitpick_ignore = [("py:class", "_io.BytesIO")]
# -- Options for HTML output ----------------------------------------------

View File

@ -257,7 +257,7 @@ class DdsImageFile(ImageFile.ImageFile):
class DXT1Decoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
try:
self.set_as_raw(_dxt1(self.fd, self.state.xsize, self.state.ysize))
@ -270,7 +270,7 @@ class DXT1Decoder(ImageFile.PyDecoder):
class DXT5Decoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
try:
self.set_as_raw(_dxt5(self.fd, self.state.xsize, self.state.ysize))

View File

@ -33,6 +33,10 @@ Internal Modules
Provides a convenient way to import type hints that are not available
on some Python versions.
.. py:class:: Buffer
Typing alias.
.. py:class:: IntegralLike
Typing alias.

257
setup.py
View File

@ -15,18 +15,20 @@ import struct
import subprocess
import sys
import warnings
from collections.abc import Iterator
from typing import Any
from setuptools import Extension, setup
from setuptools.command.build_ext import build_ext
def get_version():
def get_version() -> str:
version_file = "src/PIL/_version.py"
with open(version_file, encoding="utf-8") as f:
return f.read().split('"')[1]
configuration = {}
configuration: dict[str, list[str]] = {}
PILLOW_VERSION = get_version()
@ -143,7 +145,7 @@ class RequiredDependencyException(Exception):
PLATFORM_MINGW = os.name == "nt" and "GCC" in sys.version
def _dbg(s, tp=None):
def _dbg(s: str, tp: Any = None) -> None:
if DEBUG:
if tp:
print(s % tp)
@ -151,10 +153,13 @@ def _dbg(s, tp=None):
print(s)
def _find_library_dirs_ldconfig():
def _find_library_dirs_ldconfig() -> list[str]:
# Based on ctypes.util from Python 2
ldconfig = "ldconfig" if shutil.which("ldconfig") else "/sbin/ldconfig"
args: list[str]
env: dict[str, str]
expr: str
if sys.platform.startswith("linux") or sys.platform.startswith("gnu"):
if struct.calcsize("l") == 4:
machine = os.uname()[4] + "-32"
@ -184,13 +189,11 @@ def _find_library_dirs_ldconfig():
try:
p = subprocess.Popen(
args, stderr=subprocess.DEVNULL, stdout=subprocess.PIPE, env=env
args, stderr=subprocess.DEVNULL, stdout=subprocess.PIPE, env=env, text=True
)
except OSError: # E.g. command not found
return []
[data, _] = p.communicate()
if isinstance(data, bytes):
data = data.decode("latin1")
data = p.communicate()[0]
dirs = []
for dll in re.findall(expr, data):
@ -200,7 +203,9 @@ def _find_library_dirs_ldconfig():
return dirs
def _add_directory(path, subdir, where=None):
def _add_directory(
path: list[str], subdir: str | None, where: int | None = None
) -> None:
if subdir is None:
return
subdir = os.path.realpath(subdir)
@ -216,7 +221,7 @@ def _add_directory(path, subdir, where=None):
path.insert(where, subdir)
def _find_include_file(self, include):
def _find_include_file(self: pil_build_ext, include: str) -> int:
for directory in self.compiler.include_dirs:
_dbg("Checking for include file %s in %s", (include, directory))
if os.path.isfile(os.path.join(directory, include)):
@ -225,7 +230,7 @@ def _find_include_file(self, include):
return 0
def _find_library_file(self, library):
def _find_library_file(self: pil_build_ext, library: str) -> str | None:
ret = self.compiler.find_library_file(self.compiler.library_dirs, library)
if ret:
_dbg("Found library %s at %s", (library, ret))
@ -234,7 +239,7 @@ def _find_library_file(self, library):
return ret
def _find_include_dir(self, dirname, include):
def _find_include_dir(self: pil_build_ext, dirname: str, include: str) -> bool | str:
for directory in self.compiler.include_dirs:
_dbg("Checking for include file %s in %s", (include, directory))
if os.path.isfile(os.path.join(directory, include)):
@ -245,6 +250,7 @@ def _find_include_dir(self, dirname, include):
if os.path.isfile(os.path.join(subdir, include)):
_dbg("Found %s in %s", (include, subdir))
return subdir
return False
def _cmd_exists(cmd: str) -> bool:
@ -256,7 +262,7 @@ def _cmd_exists(cmd: str) -> bool:
)
def _pkg_config(name):
def _pkg_config(name: str) -> tuple[list[str], list[str]] | None:
command = os.environ.get("PKG_CONFIG", "pkg-config")
for keep_system in (True, False):
try:
@ -283,10 +289,11 @@ def _pkg_config(name):
return libs, cflags
except Exception:
pass
return None
class pil_build_ext(build_ext):
class feature:
class ext_feature:
features = [
"zlib",
"jpeg",
@ -301,25 +308,32 @@ class pil_build_ext(build_ext):
]
required = {"jpeg", "zlib"}
vendor = set()
vendor: set[str] = set()
def __init__(self):
def __init__(self) -> None:
self._settings: dict[str, str | bool | None] = {}
for f in self.features:
setattr(self, f, None)
self.set(f, None)
def require(self, feat):
def require(self, feat: str) -> bool:
return feat in self.required
def want(self, feat):
return getattr(self, feat) is None
def get(self, feat: str) -> str | bool | None:
return self._settings[feat]
def want_vendor(self, feat):
def set(self, feat: str, value: str | bool | None) -> None:
self._settings[feat] = value
def want(self, feat: str) -> bool:
return self._settings[feat] is None
def want_vendor(self, feat: str) -> bool:
return feat in self.vendor
def __iter__(self):
def __iter__(self) -> Iterator[str]:
yield from self.features
feature = feature()
feature = ext_feature()
user_options = (
build_ext.user_options
@ -337,10 +351,10 @@ class pil_build_ext(build_ext):
)
@staticmethod
def check_configuration(option, value):
def check_configuration(option: str, value: str) -> bool | None:
return True if value in configuration.get(option, []) else None
def initialize_options(self):
def initialize_options(self) -> None:
self.disable_platform_guessing = self.check_configuration(
"platform-guessing", "disable"
)
@ -355,7 +369,7 @@ class pil_build_ext(build_ext):
self.debug = True
self.parallel = configuration.get("parallel", [None])[-1]
def finalize_options(self):
def finalize_options(self) -> None:
build_ext.finalize_options(self)
if self.debug:
global DEBUG
@ -363,12 +377,16 @@ class pil_build_ext(build_ext):
if not self.parallel:
# If --parallel (or -j) wasn't specified, we want to reproduce the same
# behavior as before, that is, auto-detect the number of jobs.
try:
self.parallel = int(
os.environ.get("MAX_CONCURRENCY", min(4, os.cpu_count()))
)
except TypeError:
self.parallel = None
self.parallel = None
cpu_count = os.cpu_count()
if cpu_count is not None:
try:
self.parallel = int(
os.environ.get("MAX_CONCURRENCY", min(4, cpu_count))
)
except TypeError:
pass
for x in self.feature:
if getattr(self, f"disable_{x}"):
setattr(self.feature, x, False)
@ -402,7 +420,13 @@ class pil_build_ext(build_ext):
_dbg("Using vendored version of %s", x)
self.feature.vendor.add(x)
def _update_extension(self, name, libraries, define_macros=None, sources=None):
def _update_extension(
self,
name: str,
libraries: list[str] | list[str | bool | None],
define_macros: list[tuple[str, str | None]] | None = None,
sources: list[str] | None = None,
) -> None:
for extension in self.extensions:
if extension.name == name:
extension.libraries += libraries
@ -415,13 +439,13 @@ class pil_build_ext(build_ext):
extension.extra_link_args = ["--stdlib=libc++"]
break
def _remove_extension(self, name):
def _remove_extension(self, name: str) -> None:
for extension in self.extensions:
if extension.name == name:
self.extensions.remove(extension)
break
def get_macos_sdk_path(self):
def get_macos_sdk_path(self) -> str | None:
try:
sdk_path = (
subprocess.check_output(["xcrun", "--show-sdk-path"])
@ -442,9 +466,9 @@ class pil_build_ext(build_ext):
sdk_path = commandlinetools_sdk_path
return sdk_path
def build_extensions(self):
library_dirs = []
include_dirs = []
def build_extensions(self) -> None:
library_dirs: list[str] = []
include_dirs: list[str] = []
pkg_config = None
if _cmd_exists(os.environ.get("PKG_CONFIG", "pkg-config")):
@ -468,19 +492,22 @@ class pil_build_ext(build_ext):
root = globals()[root_name]
if root is None and root_name in os.environ:
prefix = os.environ[root_name]
root = (os.path.join(prefix, "lib"), os.path.join(prefix, "include"))
root_prefix = os.environ[root_name]
root = (
os.path.join(root_prefix, "lib"),
os.path.join(root_prefix, "include"),
)
if root is None and pkg_config:
if isinstance(lib_name, tuple):
if isinstance(lib_name, str):
_dbg(f"Looking for `{lib_name}` using pkg-config.")
root = pkg_config(lib_name)
else:
for lib_name2 in lib_name:
_dbg(f"Looking for `{lib_name2}` using pkg-config.")
root = pkg_config(lib_name2)
if root:
break
else:
_dbg(f"Looking for `{lib_name}` using pkg-config.")
root = pkg_config(lib_name)
if isinstance(root, tuple):
lib_root, include_root = root
@ -660,22 +687,22 @@ class pil_build_ext(build_ext):
_dbg("Looking for zlib")
if _find_include_file(self, "zlib.h"):
if _find_library_file(self, "z"):
feature.zlib = "z"
feature.set("zlib", "z")
elif sys.platform == "win32" and _find_library_file(self, "zlib"):
feature.zlib = "zlib" # alternative name
feature.set("zlib", "zlib") # alternative name
if feature.want("jpeg"):
_dbg("Looking for jpeg")
if _find_include_file(self, "jpeglib.h"):
if _find_library_file(self, "jpeg"):
feature.jpeg = "jpeg"
feature.set("jpeg", "jpeg")
elif sys.platform == "win32" and _find_library_file(self, "libjpeg"):
feature.jpeg = "libjpeg" # alternative name
feature.set("jpeg", "libjpeg") # alternative name
feature.openjpeg_version = None
feature.set("openjpeg_version", None)
if feature.want("jpeg2000"):
_dbg("Looking for jpeg2000")
best_version = None
best_version: tuple[int, ...] | None = None
best_path = None
# Find the best version
@ -705,26 +732,26 @@ class pil_build_ext(build_ext):
# <openjpeg.h> rather than having to cope with the versioned
# include path
_add_directory(self.compiler.include_dirs, best_path, 0)
feature.jpeg2000 = "openjp2"
feature.openjpeg_version = ".".join(str(x) for x in best_version)
feature.set("jpeg2000", "openjp2")
feature.set("openjpeg_version", ".".join(str(x) for x in best_version))
if feature.want("imagequant"):
_dbg("Looking for imagequant")
if _find_include_file(self, "libimagequant.h"):
if _find_library_file(self, "imagequant"):
feature.imagequant = "imagequant"
feature.set("imagequant", "imagequant")
elif _find_library_file(self, "libimagequant"):
feature.imagequant = "libimagequant"
feature.set("imagequant", "libimagequant")
if feature.want("tiff"):
_dbg("Looking for tiff")
if _find_include_file(self, "tiff.h"):
if _find_library_file(self, "tiff"):
feature.tiff = "tiff"
feature.set("tiff", "tiff")
if sys.platform in ["win32", "darwin"] and _find_library_file(
self, "libtiff"
):
feature.tiff = "libtiff"
feature.set("tiff", "libtiff")
if feature.want("freetype"):
_dbg("Looking for freetype")
@ -745,31 +772,31 @@ class pil_build_ext(build_ext):
freetype_version = 21
break
if freetype_version:
feature.freetype = "freetype"
feature.set("freetype", "freetype")
if subdir:
_add_directory(self.compiler.include_dirs, subdir, 0)
if feature.freetype and feature.want("raqm"):
if feature.get("freetype") and feature.want("raqm"):
if not feature.want_vendor("raqm"): # want system Raqm
_dbg("Looking for Raqm")
if _find_include_file(self, "raqm.h"):
if _find_library_file(self, "raqm"):
feature.raqm = "raqm"
feature.set("raqm", "raqm")
elif _find_library_file(self, "libraqm"):
feature.raqm = "libraqm"
feature.set("raqm", "libraqm")
else: # want to build Raqm from src/thirdparty
_dbg("Looking for HarfBuzz")
feature.harfbuzz = None
feature.set("harfbuzz", None)
hb_dir = _find_include_dir(self, "harfbuzz", "hb.h")
if hb_dir:
if isinstance(hb_dir, str):
_add_directory(self.compiler.include_dirs, hb_dir, 0)
if _find_library_file(self, "harfbuzz"):
feature.harfbuzz = "harfbuzz"
if feature.harfbuzz:
feature.set("harfbuzz", "harfbuzz")
if feature.get("harfbuzz"):
if not feature.want_vendor("fribidi"): # want system FriBiDi
_dbg("Looking for FriBiDi")
feature.fribidi = None
feature.set("fribidi", None)
fribidi_dir = _find_include_dir(self, "fribidi", "fribidi.h")
if fribidi_dir:
if isinstance(fribidi_dir, str):
@ -777,19 +804,19 @@ class pil_build_ext(build_ext):
self.compiler.include_dirs, fribidi_dir, 0
)
if _find_library_file(self, "fribidi"):
feature.fribidi = "fribidi"
feature.raqm = True
feature.set("fribidi", "fribidi")
feature.set("raqm", True)
else: # want to build FriBiDi shim from src/thirdparty
feature.raqm = True
feature.set("raqm", True)
if feature.want("lcms"):
_dbg("Looking for lcms")
if _find_include_file(self, "lcms2.h"):
if _find_library_file(self, "lcms2"):
feature.lcms = "lcms2"
feature.set("lcms", "lcms2")
elif _find_library_file(self, "lcms2_static"):
# alternate Windows name.
feature.lcms = "lcms2_static"
feature.set("lcms", "lcms2_static")
if feature.want("webp"):
_dbg("Looking for webp")
@ -803,17 +830,17 @@ class pil_build_ext(build_ext):
_find_library_file(self, prefix + library)
for library in ("webp", "webpmux", "webpdemux")
):
feature.webp = prefix + "webp"
feature.set("webp", prefix + "webp")
break
if feature.want("xcb"):
_dbg("Looking for xcb")
if _find_include_file(self, "xcb/xcb.h"):
if _find_library_file(self, "xcb"):
feature.xcb = "xcb"
feature.set("xcb", "xcb")
for f in feature:
if not getattr(feature, f) and feature.require(f):
if not feature.get(f) and feature.require(f):
if f in ("jpeg", "zlib"):
raise RequiredDependencyException(f)
raise DependencyException(f)
@ -821,10 +848,11 @@ class pil_build_ext(build_ext):
#
# core library
libs = self.add_imaging_libs.split()
defs = []
if feature.tiff:
libs.append(feature.tiff)
libs: list[str | bool | None] = []
libs.extend(self.add_imaging_libs.split())
defs: list[tuple[str, str | None]] = []
if feature.get("tiff"):
libs.append(feature.get("tiff"))
defs.append(("HAVE_LIBTIFF", None))
if sys.platform == "win32":
# This define needs to be defined if-and-only-if it was defined
@ -832,22 +860,22 @@ class pil_build_ext(build_ext):
# so we have to guess; by default it is defined in all Windows builds.
# See #4237, #5243, #5359 for more information.
defs.append(("USE_WIN32_FILEIO", None))
if feature.jpeg:
libs.append(feature.jpeg)
if feature.get("jpeg"):
libs.append(feature.get("jpeg"))
defs.append(("HAVE_LIBJPEG", None))
if feature.jpeg2000:
libs.append(feature.jpeg2000)
if feature.get("jpeg2000"):
libs.append(feature.get("jpeg2000"))
defs.append(("HAVE_OPENJPEG", None))
if sys.platform == "win32" and not PLATFORM_MINGW:
defs.append(("OPJ_STATIC", None))
if feature.zlib:
libs.append(feature.zlib)
if feature.get("zlib"):
libs.append(feature.get("zlib"))
defs.append(("HAVE_LIBZ", None))
if feature.imagequant:
libs.append(feature.imagequant)
if feature.get("imagequant"):
libs.append(feature.get("imagequant"))
defs.append(("HAVE_LIBIMAGEQUANT", None))
if feature.xcb:
libs.append(feature.xcb)
if feature.get("xcb"):
libs.append(feature.get("xcb"))
defs.append(("HAVE_XCB", None))
if sys.platform == "win32":
libs.extend(["kernel32", "user32", "gdi32"])
@ -861,22 +889,22 @@ class pil_build_ext(build_ext):
#
# additional libraries
if feature.freetype:
if feature.get("freetype"):
srcs = []
libs = ["freetype"]
defs = []
if feature.raqm:
if feature.get("raqm"):
if not feature.want_vendor("raqm"): # using system Raqm
defs.append(("HAVE_RAQM", None))
defs.append(("HAVE_RAQM_SYSTEM", None))
libs.append(feature.raqm)
libs.append(feature.get("raqm"))
else: # building Raqm from src/thirdparty
defs.append(("HAVE_RAQM", None))
srcs.append("src/thirdparty/raqm/raqm.c")
libs.append(feature.harfbuzz)
libs.append(feature.get("harfbuzz"))
if not feature.want_vendor("fribidi"): # using system FriBiDi
defs.append(("HAVE_FRIBIDI_SYSTEM", None))
libs.append(feature.fribidi)
libs.append(feature.get("fribidi"))
else: # building FriBiDi shim from src/thirdparty
srcs.append("src/thirdparty/fribidi-shim/fribidi.c")
self._update_extension("PIL._imagingft", libs, defs, srcs)
@ -884,16 +912,17 @@ class pil_build_ext(build_ext):
else:
self._remove_extension("PIL._imagingft")
if feature.lcms:
extra = []
if feature.get("lcms"):
libs = [feature.get("lcms")]
if sys.platform == "win32":
extra.extend(["user32", "gdi32"])
self._update_extension("PIL._imagingcms", [feature.lcms] + extra)
libs.extend(["user32", "gdi32"])
self._update_extension("PIL._imagingcms", libs)
else:
self._remove_extension("PIL._imagingcms")
if feature.webp:
libs = [feature.webp, feature.webp + "mux", feature.webp + "demux"]
webp = feature.get("webp")
if isinstance(webp, str):
libs = [webp, webp + "mux", webp + "demux"]
self._update_extension("PIL._webp", libs)
else:
self._remove_extension("PIL._webp")
@ -908,14 +937,14 @@ class pil_build_ext(build_ext):
self.summary_report(feature)
def summary_report(self, feature):
def summary_report(self, feature: ext_feature) -> None:
print("-" * 68)
print("PIL SETUP SUMMARY")
print("-" * 68)
print(f"version Pillow {PILLOW_VERSION}")
v = sys.version.split("[")
print(f"platform {sys.platform} {v[0].strip()}")
for v in v[1:]:
version = sys.version.split("[")
print(f"platform {sys.platform} {version[0].strip()}")
for v in version[1:]:
print(f" [{v.strip()}")
print("-" * 68)
@ -926,16 +955,20 @@ class pil_build_ext(build_ext):
raqm_extra_info += ", FriBiDi shim"
options = [
(feature.jpeg, "JPEG"),
(feature.jpeg2000, "OPENJPEG (JPEG2000)", feature.openjpeg_version),
(feature.zlib, "ZLIB (PNG/ZIP)"),
(feature.imagequant, "LIBIMAGEQUANT"),
(feature.tiff, "LIBTIFF"),
(feature.freetype, "FREETYPE2"),
(feature.raqm, "RAQM (Text shaping)", raqm_extra_info),
(feature.lcms, "LITTLECMS2"),
(feature.webp, "WEBP"),
(feature.xcb, "XCB (X protocol)"),
(feature.get("jpeg"), "JPEG"),
(
feature.get("jpeg2000"),
"OPENJPEG (JPEG2000)",
feature.get("openjpeg_version"),
),
(feature.get("zlib"), "ZLIB (PNG/ZIP)"),
(feature.get("imagequant"), "LIBIMAGEQUANT"),
(feature.get("tiff"), "LIBTIFF"),
(feature.get("freetype"), "FREETYPE2"),
(feature.get("raqm"), "RAQM (Text shaping)", raqm_extra_info),
(feature.get("lcms"), "LITTLECMS2"),
(feature.get("webp"), "WEBP"),
(feature.get("xcb"), "XCB (X protocol)"),
]
all = 1
@ -964,7 +997,7 @@ class pil_build_ext(build_ext):
print("")
def debug_build():
def debug_build() -> bool:
return hasattr(sys, "gettotalrefcount") or FUZZING_BUILD

View File

@ -279,7 +279,7 @@ class BlpImageFile(ImageFile.ImageFile):
class _BLPBaseDecoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
try:
self._read_blp_header()
self._load()

View File

@ -321,7 +321,7 @@ class BmpImageFile(ImageFile.ImageFile):
class BmpRleDecoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
rle4 = self.args[1]
data = bytearray()

View File

@ -481,7 +481,7 @@ class DdsImageFile(ImageFile.ImageFile):
class DdsRgbDecoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
bitcount, masks = self.args

View File

@ -126,7 +126,7 @@ class FitsImageFile(ImageFile.ImageFile):
class FitsGzipDecoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
value = gzip.decompress(self.fd.read())

View File

@ -29,7 +29,6 @@ import itertools
import math
import os
import subprocess
import sys
from enum import IntEnum
from functools import cached_property
from typing import IO, TYPE_CHECKING, Any, Literal, NamedTuple, Union
@ -49,6 +48,7 @@ from ._binary import o16le as o16
if TYPE_CHECKING:
from . import _imaging
from ._typing import Buffer
class LoadingStrategy(IntEnum):
@ -1157,18 +1157,9 @@ def getdata(
class Collector(BytesIO):
data = []
if sys.version_info >= (3, 12):
from collections.abc import Buffer
def write(self, data: Buffer) -> int:
self.data.append(data)
return len(data)
else:
def write(self, data: Any) -> int:
self.data.append(data)
return len(data)
def write(self, data: Buffer) -> int:
self.data.append(data)
return len(data)
im.load() # make sure raster data is available

View File

@ -856,7 +856,10 @@ class Image:
)
def frombytes(
self, data: bytes | bytearray, decoder_name: str = "raw", *args: Any
self,
data: bytes | bytearray | SupportsArrayInterface,
decoder_name: str = "raw",
*args: Any,
) -> None:
"""
Loads this image with pixel data from a bytes object.
@ -3145,7 +3148,7 @@ def new(
def frombytes(
mode: str,
size: tuple[int, int],
data: bytes | bytearray,
data: bytes | bytearray | SupportsArrayInterface,
decoder_name: str = "raw",
*args: Any,
) -> Image:
@ -3189,7 +3192,11 @@ def frombytes(
def frombuffer(
mode: str, size: tuple[int, int], data, decoder_name: str = "raw", *args: Any
mode: str,
size: tuple[int, int],
data: bytes | SupportsArrayInterface,
decoder_name: str = "raw",
*args: Any,
) -> Image:
"""
Creates an image memory referencing pixel data in a byte buffer.

View File

@ -733,7 +733,7 @@ class PyDecoder(PyCodec):
def pulls_fd(self) -> bool:
return self._pulls_fd
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
"""
Override to perform the decoding process.

View File

@ -112,7 +112,7 @@ class MspDecoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
img = io.BytesIO()

View File

@ -284,7 +284,7 @@ class PpmPlainDecoder(ImageFile.PyDecoder):
break
return data
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
self._comment_spans = False
if self.mode == "1":
data = self._decode_bitonal()
@ -300,7 +300,7 @@ class PpmPlainDecoder(ImageFile.PyDecoder):
class PpmDecoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
data = bytearray()

View File

@ -47,7 +47,7 @@ class QoiDecoder(ImageFile.PyDecoder):
hash_value = (r * 3 + g * 5 + b * 7 + a * 11) % 64
self._previously_seen_pixels[hash_value] = value
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
self._previously_seen_pixels = {}

View File

@ -214,7 +214,7 @@ def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
class SGI16Decoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer: bytes) -> tuple[int, int]:
def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
assert self.fd is not None
assert self.im is not None

View File

@ -62,7 +62,7 @@ from ._util import is_path
from .TiffTags import TYPES
if TYPE_CHECKING:
from ._typing import IntegralLike
from ._typing import Buffer, IntegralLike
logger = logging.getLogger(__name__)
@ -1646,7 +1646,7 @@ SAVE_INFO = {
}
def _save(im: Image.Image, fp, filename: str | bytes) -> None:
def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
try:
rawmode, prefix, photo, format, bits, extra = SAVE_INFO[im.mode]
except KeyError as e:
@ -1957,7 +1957,7 @@ def _save(im: Image.Image, fp, filename: str | bytes) -> None:
setattr(im, "_debug_multipage", ifd)
class AppendingTiffWriter:
class AppendingTiffWriter(io.BytesIO):
fieldSizes = [
0, # None
1, # byte
@ -2067,6 +2067,12 @@ class AppendingTiffWriter:
return self.f.tell() - self.offsetOfNewPage
def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
"""
:param offset: Distance to seek.
:param whence: Whether the distance is relative to the start,
end or current position.
:returns: The resulting position, relative to the start.
"""
if whence == os.SEEK_SET:
offset += self.offsetOfNewPage
@ -2100,7 +2106,7 @@ class AppendingTiffWriter:
num_tags = self.readShort()
self.f.seek(num_tags * 12, os.SEEK_CUR)
def write(self, data: bytes) -> int | None:
def write(self, data: Buffer, /) -> int:
return self.f.write(data)
def readShort(self) -> int:
@ -2142,7 +2148,8 @@ class AppendingTiffWriter:
def close(self) -> None:
self.finalize()
self.f.close()
if self.close_fp:
self.f.close()
def fixIFD(self) -> None:
num_tags = self.readShort()

View File

@ -15,6 +15,11 @@ if TYPE_CHECKING:
except (ImportError, AttributeError):
pass
if sys.version_info >= (3, 12):
from collections.abc import Buffer
else:
Buffer = Any
if sys.version_info >= (3, 10):
from typing import TypeGuard
else:
@ -40,4 +45,4 @@ class SupportsRead(Protocol[_T_co]):
StrOrBytesPath = Union[str, bytes, "os.PathLike[str]", "os.PathLike[bytes]"]
__all__ = ["IntegralLike", "StrOrBytesPath", "SupportsRead", "TypeGuard"]
__all__ = ["Buffer", "IntegralLike", "StrOrBytesPath", "SupportsRead", "TypeGuard"]

View File

@ -36,4 +36,4 @@ deps =
extras =
typing
commands =
mypy docs src winbuild Tests {posargs}
mypy conftest.py selftest.py setup.py docs src winbuild Tests {posargs}