From a3f93b3f68804d7c459faee165edf021e54f850a Mon Sep 17 00:00:00 2001 From: Andrew Murray Date: Wed, 17 Jul 2024 16:23:29 +1000 Subject: [PATCH] Changed ContainerIO to subclass IO --- Tests/test_file_container.py | 91 +++++++++++++++++++++++++++++++----- pyproject.toml | 1 - src/PIL/ContainerIO.py | 72 ++++++++++++++++++++++++---- src/PIL/TarIO.py | 10 ---- 4 files changed, 142 insertions(+), 32 deletions(-) diff --git a/Tests/test_file_container.py b/Tests/test_file_container.py index 7f76fb47a..237045acc 100644 --- a/Tests/test_file_container.py +++ b/Tests/test_file_container.py @@ -1,7 +1,5 @@ from __future__ import annotations -from typing import Literal - import pytest from PIL import ContainerIO, Image @@ -23,6 +21,13 @@ def test_isatty() -> None: assert container.isatty() is False +def test_seekable() -> None: + with hopper() as im: + container = ContainerIO.ContainerIO(im, 0, 0) + + assert container.seekable() is True + + @pytest.mark.parametrize( "mode, expected_position", ( @@ -31,7 +36,7 @@ def test_isatty() -> None: (2, 100), ), ) -def test_seek_mode(mode: Literal[0, 1, 2], expected_position: int) -> None: +def test_seek_mode(mode: int, expected_position: int) -> None: # Arrange with open(TEST_FILE, "rb") as fh: container = ContainerIO.ContainerIO(fh, 22, 100) @@ -44,6 +49,14 @@ def test_seek_mode(mode: Literal[0, 1, 2], expected_position: int) -> None: assert container.tell() == expected_position +@pytest.mark.parametrize("bytesmode", (True, False)) +def test_readable(bytesmode: bool) -> None: + with open(TEST_FILE, "rb" if bytesmode else "r") as fh: + container = ContainerIO.ContainerIO(fh, 0, 120) + + assert container.readable() is True + + @pytest.mark.parametrize("bytesmode", (True, False)) def test_read_n0(bytesmode: bool) -> None: # Arrange @@ -51,7 +64,7 @@ def test_read_n0(bytesmode: bool) -> None: container = ContainerIO.ContainerIO(fh, 22, 100) # Act - container.seek(81) + assert container.seek(81) == 81 data = container.read() # Assert @@ -67,7 +80,7 @@ def test_read_n(bytesmode: bool) -> None: container = ContainerIO.ContainerIO(fh, 22, 100) # Act - container.seek(81) + assert container.seek(81) == 81 data = container.read(3) # Assert @@ -83,7 +96,7 @@ def test_read_eof(bytesmode: bool) -> None: container = ContainerIO.ContainerIO(fh, 22, 100) # Act - container.seek(100) + assert container.seek(100) == 100 data = container.read() # Assert @@ -94,21 +107,65 @@ def test_read_eof(bytesmode: bool) -> None: @pytest.mark.parametrize("bytesmode", (True, False)) def test_readline(bytesmode: bool) -> None: - # Arrange with open(TEST_FILE, "rb" if bytesmode else "r") as fh: container = ContainerIO.ContainerIO(fh, 0, 120) - # Act data = container.readline() - - # Assert if bytesmode: data = data.decode() assert data == "This is line 1\n" + data = container.readline(4) + if bytesmode: + data = data.decode() + assert data == "This" + @pytest.mark.parametrize("bytesmode", (True, False)) def test_readlines(bytesmode: bool) -> None: + expected = [ + "This is line 1\n", + "This is line 2\n", + "This is line 3\n", + "This is line 4\n", + "This is line 5\n", + "This is line 6\n", + "This is line 7\n", + "This is line 8\n", + ] + with open(TEST_FILE, "rb" if bytesmode else "r") as fh: + container = ContainerIO.ContainerIO(fh, 0, 120) + + data = container.readlines() + if bytesmode: + data = [line.decode() for line in data] + assert data == expected + + assert container.seek(0) == 0 + + data = container.readlines(2) + if bytesmode: + data = [line.decode() for line in data] + assert data == expected[:2] + + +@pytest.mark.parametrize("bytesmode", (True, False)) +def test_write(bytesmode: bool) -> None: + with open(TEST_FILE, "rb" if bytesmode else "r") as fh: + container = ContainerIO.ContainerIO(fh, 0, 120) + + assert container.writable() is False + + with pytest.raises(NotImplementedError): + container.write(b"" if bytesmode else "") + with pytest.raises(NotImplementedError): + container.writelines([]) + with pytest.raises(NotImplementedError): + container.truncate() + + +@pytest.mark.parametrize("bytesmode", (True, False)) +def test_iter(bytesmode: bool) -> None: # Arrange expected = [ "This is line 1\n", @@ -124,9 +181,21 @@ def test_readlines(bytesmode: bool) -> None: container = ContainerIO.ContainerIO(fh, 0, 120) # Act - data = container.readlines() + data = [] + for line in container: + data.append(line) # Assert if bytesmode: data = [line.decode() for line in data] assert data == expected + + +@pytest.mark.parametrize("bytesmode", (True, False)) +def test_file(bytesmode: bool) -> None: + with open(TEST_FILE, "rb" if bytesmode else "r") as fh: + container = ContainerIO.ContainerIO(fh, 0, 120) + + assert isinstance(container.fileno(), int) + container.flush() + container.close() diff --git a/pyproject.toml b/pyproject.toml index cd7248669..b76f3c24d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -161,5 +161,4 @@ exclude = [ '^Tests/test_qt_image_qapplication.py$', '^Tests/test_font_pcf_charsets.py$', '^Tests/test_font_pcf.py$', - '^Tests/test_file_tar.py$', ] diff --git a/src/PIL/ContainerIO.py b/src/PIL/ContainerIO.py index 0035296a4..ec9e66c71 100644 --- a/src/PIL/ContainerIO.py +++ b/src/PIL/ContainerIO.py @@ -16,10 +16,11 @@ from __future__ import annotations import io -from typing import IO, AnyStr, Generic, Literal +from collections.abc import Iterable +from typing import IO, AnyStr, NoReturn -class ContainerIO(Generic[AnyStr]): +class ContainerIO(IO[AnyStr]): """ A file object that provides read access to a part of an existing file (for example a TAR file). @@ -45,7 +46,10 @@ class ContainerIO(Generic[AnyStr]): def isatty(self) -> bool: return False - def seek(self, offset: int, mode: Literal[0, 1, 2] = io.SEEK_SET) -> None: + def seekable(self) -> bool: + return True + + def seek(self, offset: int, mode: int = io.SEEK_SET) -> int: """ Move file pointer. @@ -53,6 +57,7 @@ class ContainerIO(Generic[AnyStr]): :param mode: Starting position. Use 0 for beginning of region, 1 for current offset, and 2 for end of region. You cannot move the pointer outside the defined region. + :returns: Offset from start of region, in bytes. """ if mode == 1: self.pos = self.pos + offset @@ -63,6 +68,7 @@ class ContainerIO(Generic[AnyStr]): # clamp self.pos = max(0, min(self.pos, self.length)) self.fh.seek(self.offset + self.pos) + return self.pos def tell(self) -> int: """ @@ -72,27 +78,32 @@ class ContainerIO(Generic[AnyStr]): """ return self.pos - def read(self, n: int = 0) -> AnyStr: + def readable(self) -> bool: + return True + + def read(self, n: int = -1) -> AnyStr: """ Read data. - :param n: Number of bytes to read. If omitted or zero, + :param n: Number of bytes to read. If omitted, zero or negative, read until end of region. :returns: An 8-bit string. """ - if n: + if n > 0: n = min(n, self.length - self.pos) else: n = self.length - self.pos - if not n: # EOF + if n <= 0: # EOF return b"" if "b" in self.fh.mode else "" # type: ignore[return-value] self.pos = self.pos + n return self.fh.read(n) - def readline(self) -> AnyStr: + def readline(self, n: int = -1) -> AnyStr: """ Read a line of text. + :param n: Number of bytes to read. If omitted, zero or negative, + read until end of line. :returns: An 8-bit string. """ s: AnyStr = b"" if "b" in self.fh.mode else "" # type: ignore[assignment] @@ -102,14 +113,16 @@ class ContainerIO(Generic[AnyStr]): if not c: break s = s + c - if c == newline_character: + if c == newline_character or len(s) == n: break return s - def readlines(self) -> list[AnyStr]: + def readlines(self, n: int | None = -1) -> list[AnyStr]: """ Read multiple lines of text. + :param n: Number of lines to read. If omitted, zero, negative or None, + read until end of region. :returns: A list of 8-bit strings. """ lines = [] @@ -118,4 +131,43 @@ class ContainerIO(Generic[AnyStr]): if not s: break lines.append(s) + if len(lines) == n: + break return lines + + def writable(self) -> bool: + return False + + def write(self, b: AnyStr) -> NoReturn: + raise NotImplementedError() + + def writelines(self, lines: Iterable[AnyStr]) -> NoReturn: + raise NotImplementedError() + + def truncate(self, size: int | None = None) -> int: + raise NotImplementedError() + + def __enter__(self) -> ContainerIO[AnyStr]: + return self + + def __exit__(self, *args: object) -> None: + self.close() + + def __iter__(self) -> ContainerIO[AnyStr]: + return self + + def __next__(self) -> AnyStr: + line = self.readline() + if not line: + msg = "end of region" + raise StopIteration(msg) + return line + + def fileno(self) -> int: + return self.fh.fileno() + + def flush(self) -> None: + self.fh.flush() + + def close(self) -> None: + self.fh.close() diff --git a/src/PIL/TarIO.py b/src/PIL/TarIO.py index cba26d4b0..779288b1c 100644 --- a/src/PIL/TarIO.py +++ b/src/PIL/TarIO.py @@ -55,13 +55,3 @@ class TarIO(ContainerIO.ContainerIO[bytes]): # Open region super().__init__(self.fh, self.fh.tell(), size) - - # Context manager support - def __enter__(self) -> TarIO: - return self - - def __exit__(self, *args: object) -> None: - self.close() - - def close(self) -> None: - self.fh.close()