This commit is contained in:
Andrew Murray 2025-04-20 16:09:43 +10:00 committed by GitHub
commit fd84c132c5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
13 changed files with 381 additions and 21 deletions

View File

@ -1,5 +1,6 @@
from __future__ import annotations
from io import BytesIO
from pathlib import Path
import pytest
@ -705,6 +706,58 @@ def test_apng_save_blend(tmp_path: Path) -> None:
assert im.getpixel((0, 0)) == (0, 255, 0, 255)
def test_save_all_progress() -> None:
out = BytesIO()
progress = []
def callback(state):
if state["image_filename"]:
state["image_filename"] = (
state["image_filename"].replace("\\", "/").split("Tests/images/")[-1]
)
progress.append(state)
Image.new("RGB", (1, 1)).save(out, "PNG", save_all=True, progress=callback)
assert progress == [
{
"image_index": 0,
"image_filename": None,
"completed_frames": 1,
"total_frames": 1,
}
]
out = BytesIO()
progress = []
with Image.open("Tests/images/apng/single_frame.png") as im:
with Image.open("Tests/images/apng/delay.png") as im2:
im.save(
out, "PNG", save_all=True, append_images=[im, im2], progress=callback
)
expected = []
for i in range(2):
expected.append(
{
"image_index": i,
"image_filename": "apng/single_frame.png",
"completed_frames": i + 1,
"total_frames": 7,
}
)
for i in range(5):
expected.append(
{
"image_index": 2,
"image_filename": "apng/delay.png",
"completed_frames": i + 3,
"total_frames": 7,
}
)
assert progress == expected
def test_apng_save_size(tmp_path: Path) -> None:
test_file = tmp_path / "temp.png"

View File

@ -296,6 +296,54 @@ def test_roundtrip_save_all_1(tmp_path: Path) -> None:
assert reloaded.getpixel((0, 0)) == 255
def test_save_all_progress():
out = BytesIO()
progress = []
def callback(state):
if state["image_filename"]:
state["image_filename"] = (
state["image_filename"].replace("\\", "/").split("Tests/images/")[-1]
)
progress.append(state)
Image.new("RGB", (1, 1)).save(out, "GIF", save_all=True, progress=callback)
assert progress == [
{
"image_index": 0,
"image_filename": None,
"completed_frames": 1,
"total_frames": 1,
}
]
out = BytesIO()
progress = []
with Image.open("Tests/images/chi.gif") as im2:
im = Image.new("RGB", im2.size)
im.save(out, "GIF", save_all=True, append_images=[im2], progress=callback)
expected = [
{
"image_index": 0,
"image_filename": None,
"completed_frames": 1,
"total_frames": 32,
}
]
for i in range(31):
expected.append(
{
"image_index": 1,
"image_filename": "chi.gif",
"completed_frames": i + 2,
"total_frames": 32,
}
)
assert progress == expected
@pytest.mark.parametrize(
"path, mode",
(

View File

@ -309,6 +309,48 @@ def test_save_all() -> None:
assert "mp" not in jpg.info
def test_save_all_progress() -> None:
out = BytesIO()
progress = []
def callback(state) -> None:
if state["image_filename"]:
state["image_filename"] = (
state["image_filename"].replace("\\", "/").split("Tests/images/")[-1]
)
progress.append(state)
Image.new("RGB", (1, 1)).save(out, "MPO", save_all=True, progress=callback)
assert progress == [
{
"image_index": 0,
"image_filename": None,
"completed_frames": 1,
"total_frames": 1,
}
]
out = BytesIO()
progress = []
with Image.open("Tests/images/sugarshack.mpo") as im:
with Image.open("Tests/images/frozenpond.mpo") as im2:
im.save(out, "MPO", save_all=True, append_images=[im2], progress=callback)
expected = []
for i, filename in enumerate(["sugarshack.mpo", "frozenpond.mpo"]):
for j in range(2):
expected.append(
{
"image_index": i,
"image_filename": filename,
"completed_frames": i * 2 + j + 1,
"total_frames": 4,
}
)
assert progress == expected
def test_save_xmp() -> None:
im = Image.new("RGB", (1, 1))
im2 = Image.new("RGB", (1, 1), "#f00")

View File

@ -1,11 +1,11 @@
from __future__ import annotations
import io
import os
import os.path
import tempfile
import time
from collections.abc import Generator
from io import BytesIO
from pathlib import Path
from typing import Any
@ -174,6 +174,48 @@ def test_save_all(tmp_path: Path) -> None:
assert os.path.getsize(outfile) > 0
def test_save_all_progress() -> None:
out = BytesIO()
progress = []
def callback(state):
if state["image_filename"]:
state["image_filename"] = (
state["image_filename"].replace("\\", "/").split("Tests/images/")[-1]
)
progress.append(state)
Image.new("RGB", (1, 1)).save(out, "PDF", save_all=True, progress=callback)
assert progress == [
{
"image_index": 0,
"image_filename": None,
"completed_frames": 1,
"total_frames": 1,
}
]
out = BytesIO()
progress = []
with Image.open("Tests/images/sugarshack.mpo") as im:
with Image.open("Tests/images/frozenpond.mpo") as im2:
im.save(out, "PDF", save_all=True, append_images=[im2], progress=callback)
expected = []
for i, filename in enumerate(["sugarshack.mpo", "frozenpond.mpo"]):
for j in range(2):
expected.append(
{
"image_index": i,
"image_filename": filename,
"completed_frames": i * 2 + j + 1,
"total_frames": 4,
}
)
assert progress == expected
def test_multiframe_normal_save(tmp_path: Path) -> None:
# Test saving a multiframe image without save_all
with Image.open("Tests/images/dispose_bgnd.gif") as im:
@ -329,12 +371,12 @@ def test_pdf_info(tmp_path: Path) -> None:
def test_pdf_append_to_bytesio() -> None:
im = hopper("RGB")
f = io.BytesIO()
f = BytesIO()
im.save(f, format="PDF")
initial_size = len(f.getvalue())
assert initial_size > 0
im = hopper("P")
f = io.BytesIO(f.getvalue())
f = BytesIO(f.getvalue())
im.save(f, format="PDF", append=True)
assert len(f.getvalue()) > initial_size

View File

@ -760,7 +760,7 @@ class TestFileTiff:
with Image.open(outfile) as reloaded:
assert_image_equal(im.convert("RGB"), reloaded.convert("RGB"))
def test_tiff_save_all(self) -> None:
def test_save_all(self) -> None:
mp = BytesIO()
with Image.open("Tests/images/multipage.tiff") as im:
im.save(mp, format="tiff", save_all=True)
@ -793,6 +793,57 @@ class TestFileTiff:
assert isinstance(reread, TiffImagePlugin.TiffImageFile)
assert reread.n_frames == 3
def test_save_all_progress(self) -> None:
out = BytesIO()
progress = []
def callback(state):
if state["image_filename"]:
state["image_filename"] = (
state["image_filename"]
.replace("\\", "/")
.split("Tests/images/")[-1]
)
progress.append(state)
Image.new("RGB", (1, 1)).save(out, "TIFF", save_all=True, progress=callback)
assert progress == [
{
"image_index": 0,
"image_filename": None,
"completed_frames": 1,
"total_frames": 1,
}
]
out = BytesIO()
progress = []
with Image.open("Tests/images/hopper.tif") as im:
with Image.open("Tests/images/multipage.tiff") as im2:
im.save(
out, "TIFF", save_all=True, append_images=[im2], progress=callback
)
expected = [
{
"image_index": 0,
"image_filename": "hopper.tif",
"completed_frames": 1,
"total_frames": 4,
}
]
for i in range(3):
expected.append(
{
"image_index": 1,
"image_filename": "multipage.tiff",
"completed_frames": i + 2,
"total_frames": 4,
}
)
assert progress == expected
def test_fixoffsets(self) -> None:
b = BytesIO(b"II\x2a\x00\x00\x00\x00\x00")
with TiffImagePlugin.AppendingTiffWriter(b) as a:

View File

@ -1,9 +1,9 @@
from __future__ import annotations
import io
import re
import sys
import warnings
from io import BytesIO
from pathlib import Path
from typing import Any
@ -105,10 +105,10 @@ class TestFileWebp:
def test_write_method(self, tmp_path: Path) -> None:
self._roundtrip(tmp_path, self.rgb_mode, 12.0, {"method": 6})
buffer_no_args = io.BytesIO()
buffer_no_args = BytesIO()
hopper().save(buffer_no_args, format="WEBP")
buffer_method = io.BytesIO()
buffer_method = BytesIO()
hopper().save(buffer_method, format="WEBP", method=6)
assert buffer_no_args.getbuffer() != buffer_method.getbuffer()
@ -129,6 +129,57 @@ class TestFileWebp:
with pytest.raises(ValueError):
_webp.WebPEncode(im.getim(), False, 0, 0, "", 4, 0, b"", "")
@skip_unless_feature("webp_anim")
def test_save_all_progress(self) -> None:
out = BytesIO()
progress = []
def callback(state):
if state["image_filename"]:
state["image_filename"] = (
state["image_filename"]
.replace("\\", "/")
.split("Tests/images/")[-1]
)
progress.append(state)
Image.new("RGB", (1, 1)).save(out, "WEBP", save_all=True, progress=callback)
assert progress == [
{
"image_index": 0,
"image_filename": None,
"completed_frames": 1,
"total_frames": 1,
}
]
out = BytesIO()
progress = []
with Image.open("Tests/images/iss634.webp") as im:
im2 = Image.new("RGB", im.size)
im.save(out, "WEBP", save_all=True, append_images=[im2], progress=callback)
expected = []
for i in range(42):
expected.append(
{
"image_index": 0,
"image_filename": "iss634.webp",
"completed_frames": i + 1,
"total_frames": 43,
}
)
expected.append(
{
"image_index": 1,
"image_filename": None,
"completed_frames": 43,
"total_frames": 43,
}
)
assert progress == expected
def test_icc_profile(self, tmp_path: Path) -> None:
self._roundtrip(tmp_path, self.rgb_mode, 12.5, {"icc_profile": None})
self._roundtrip(

View File

@ -25,7 +25,6 @@
#
from __future__ import annotations
import itertools
import math
import os
import subprocess
@ -656,11 +655,18 @@ def _write_multiple_frames(
duration = im.encoderinfo.get("duration")
disposal = im.encoderinfo.get("disposal", im.info.get("disposal"))
imSequences = [im] + list(im.encoderinfo.get("append_images", []))
progress = im.encoderinfo.get("progress")
if progress:
total = 0
for imSequence in imSequences:
total += getattr(imSequence, "n_frames", 1)
im_frames: list[_Frame] = []
previous_im: Image.Image | None = None
frame_count = 0
background_im = None
for imSequence in itertools.chain([im], im.encoderinfo.get("append_images", [])):
for i, imSequence in enumerate(imSequences):
for im_frame in ImageSequence.Iterator(imSequence):
# a copy is required here since seek can still mutate the image
im_frame = _normalize_mode(im_frame.copy())
@ -691,6 +697,10 @@ def _write_multiple_frames(
# This frame is identical to the previous frame
if encoderinfo.get("duration"):
im_frames[-1].encoderinfo["duration"] += encoderinfo["duration"]
if progress:
im._save_all_progress(
progress, imSequence, i, frame_count, total
)
continue
if im_frames[-1].encoderinfo.get("disposal") == 2:
# To appear correctly in viewers using a convention,
@ -754,6 +764,8 @@ def _write_multiple_frames(
bbox = None
previous_im = im_frame
im_frames.append(_Frame(diff_frame or im_frame, bbox, encoderinfo))
if progress:
im._save_all_progress(progress, imSequence, i, frame_count, total)
if len(im_frames) == 1:
if "duration" in im.encoderinfo:

View File

@ -2596,6 +2596,26 @@ class Image:
if open_fp:
fp.close()
def _save_all_progress(
self,
progress,
im: Image | None = None,
im_index: int = 0,
completed: int = 1,
total: int = 1,
) -> None:
if not progress:
return
progress(
{
"image_index": im_index,
"image_filename": getattr(im or self, "filename", None),
"completed_frames": completed,
"total_frames": total,
}
)
def seek(self, frame: int) -> None:
"""
Seeks to the given frame in this sequence file. If you seek

View File

@ -19,7 +19,6 @@
#
from __future__ import annotations
import itertools
import os
import struct
from typing import IO, Any, cast
@ -41,13 +40,21 @@ def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
append_images = im.encoderinfo.get("append_images", [])
progress = im.encoderinfo.get("progress")
if not append_images and not getattr(im, "is_animated", False):
_save(im, fp, filename)
im._save_all_progress(progress)
return
mpf_offset = 28
offsets: list[int] = []
for imSequence in itertools.chain([im], append_images):
imSequences = [im] + list(append_images)
if progress:
completed = 0
total = 0
for imSequence in imSequences:
total += getattr(imSequence, "n_frames", 1)
for i, imSequence in enumerate(imSequences):
for im_frame in ImageSequence.Iterator(imSequence):
if not offsets:
# APP2 marker
@ -66,6 +73,9 @@ def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
else:
im_frame.save(fp, "JPEG")
offsets.append(fp.tell() - offsets[-1])
if progress:
completed += 1
im._save_all_progress(progress, imSequence, i, completed, total)
ifd = TiffImagePlugin.ImageFileDirectory_v2()
ifd[0xB000] = b"0100"

View File

@ -254,7 +254,8 @@ def _save(
existing_pdf.write_catalog()
page_number = 0
for im_sequence in ims:
progress = im.encoderinfo.get("progress")
for i, im_sequence in enumerate(ims):
im_pages: ImageSequence.Iterator | list[Image.Image] = (
ImageSequence.Iterator(im_sequence) if save_all else [im_sequence]
)
@ -290,6 +291,9 @@ def _save(
existing_pdf.write_obj(contents_refs[page_number], stream=page_contents)
page_number += 1
im._save_all_progress(
progress, im_sequence, i, page_number, number_of_pages
)
#
# trailer

View File

@ -1168,16 +1168,21 @@ def _write_multiple_frames(
loop = im.encoderinfo.get("loop", im.info.get("loop", 0))
disposal = im.encoderinfo.get("disposal", im.info.get("disposal", Disposal.OP_NONE))
blend = im.encoderinfo.get("blend", im.info.get("blend", Blend.OP_SOURCE))
progress = im.encoderinfo.get("progress")
if default_image:
chain = itertools.chain(append_images)
else:
chain = itertools.chain([im], append_images)
imSequences = []
if not default_image:
imSequences.append(im)
imSequences += append_images
if progress:
total = 0
for imSequence in imSequences:
total += getattr(imSequence, "n_frames", 1)
im_frames: list[_Frame] = []
frame_count = 0
for im_seq in chain:
for im_frame in ImageSequence.Iterator(im_seq):
for i, imSequence in enumerate(imSequences):
for im_frame in ImageSequence.Iterator(imSequence):
if im_frame.mode == mode:
im_frame = im_frame.copy()
else:
@ -1224,10 +1229,16 @@ def _write_multiple_frames(
and "duration" in encoderinfo
):
previous.encoderinfo["duration"] += encoderinfo["duration"]
if progress:
im._save_all_progress(
progress, imSequence, i, frame_count, total
)
continue
else:
bbox = None
im_frames.append(_Frame(im_frame, bbox, encoderinfo))
if progress:
im._save_all_progress(progress, imSequence, i, frame_count, total)
if len(im_frames) == 1 and not default_image:
return im_frames[0].im

View File

@ -2302,14 +2302,23 @@ class AppendingTiffWriter(io.BytesIO):
def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
progress = im.encoderinfo.get("progress")
append_images = list(im.encoderinfo.get("append_images", []))
if not hasattr(im, "n_frames") and not append_images:
return _save(im, fp, filename)
_save(im, fp, filename)
im._save_all_progress(progress)
return
cur_idx = im.tell()
imSequences = [im] + append_images
if progress:
completed = 0
total = 0
for ims in imSequences:
total += getattr(ims, "n_frames", 1)
try:
with AppendingTiffWriter(fp) as tf:
for ims in [im] + append_images:
for i, ims in enumerate(imSequences):
if not hasattr(ims, "encoderinfo"):
ims.encoderinfo = {}
if not hasattr(ims, "encoderconfig"):
@ -2320,6 +2329,10 @@ def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
ims.seek(idx)
ims.load()
_save(ims, tf, filename)
if progress:
completed += 1
im._save_all_progress(progress, ims, i, completed, total)
tf.newFrame()
finally:
im.seek(cur_idx)

View File

@ -156,6 +156,7 @@ def _convert_frame(im: Image.Image) -> Image.Image:
def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
encoderinfo = im.encoderinfo.copy()
progress = encoderinfo.get("progress")
append_images = list(encoderinfo.get("append_images", []))
# If total frame count is 1, then save using the legacy API, which
@ -165,6 +166,7 @@ def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
total += getattr(ims, "n_frames", 1)
if total == 1:
_save(im, fp, filename)
im._save_all_progress(progress)
return
background: int | tuple[int, ...] = (0, 0, 0, 0)
@ -237,7 +239,7 @@ def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
timestamp = 0
cur_idx = im.tell()
try:
for ims in [im] + append_images:
for i, ims in enumerate([im] + append_images):
# Get number of frames in this image
nfr = getattr(ims, "n_frames", 1)
@ -262,6 +264,7 @@ def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
else:
timestamp += duration
frame_idx += 1
im._save_all_progress(progress, ims, i, frame_idx, total)
finally:
im.seek(cur_idx)