Changed ContainerIO to subclass IO

This commit is contained in:
Andrew Murray 2024-07-17 16:23:29 +10:00
parent 86f4cfbbf1
commit a3f93b3f68
4 changed files with 142 additions and 32 deletions

View File

@ -1,7 +1,5 @@
from __future__ import annotations from __future__ import annotations
from typing import Literal
import pytest import pytest
from PIL import ContainerIO, Image from PIL import ContainerIO, Image
@ -23,6 +21,13 @@ def test_isatty() -> None:
assert container.isatty() is False assert container.isatty() is False
def test_seekable() -> None:
with hopper() as im:
container = ContainerIO.ContainerIO(im, 0, 0)
assert container.seekable() is True
@pytest.mark.parametrize( @pytest.mark.parametrize(
"mode, expected_position", "mode, expected_position",
( (
@ -31,7 +36,7 @@ def test_isatty() -> None:
(2, 100), (2, 100),
), ),
) )
def test_seek_mode(mode: Literal[0, 1, 2], expected_position: int) -> None: def test_seek_mode(mode: int, expected_position: int) -> None:
# Arrange # Arrange
with open(TEST_FILE, "rb") as fh: with open(TEST_FILE, "rb") as fh:
container = ContainerIO.ContainerIO(fh, 22, 100) container = ContainerIO.ContainerIO(fh, 22, 100)
@ -44,6 +49,14 @@ def test_seek_mode(mode: Literal[0, 1, 2], expected_position: int) -> None:
assert container.tell() == expected_position assert container.tell() == expected_position
@pytest.mark.parametrize("bytesmode", (True, False))
def test_readable(bytesmode: bool) -> None:
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
assert container.readable() is True
@pytest.mark.parametrize("bytesmode", (True, False)) @pytest.mark.parametrize("bytesmode", (True, False))
def test_read_n0(bytesmode: bool) -> None: def test_read_n0(bytesmode: bool) -> None:
# Arrange # Arrange
@ -51,7 +64,7 @@ def test_read_n0(bytesmode: bool) -> None:
container = ContainerIO.ContainerIO(fh, 22, 100) container = ContainerIO.ContainerIO(fh, 22, 100)
# Act # Act
container.seek(81) assert container.seek(81) == 81
data = container.read() data = container.read()
# Assert # Assert
@ -67,7 +80,7 @@ def test_read_n(bytesmode: bool) -> None:
container = ContainerIO.ContainerIO(fh, 22, 100) container = ContainerIO.ContainerIO(fh, 22, 100)
# Act # Act
container.seek(81) assert container.seek(81) == 81
data = container.read(3) data = container.read(3)
# Assert # Assert
@ -83,7 +96,7 @@ def test_read_eof(bytesmode: bool) -> None:
container = ContainerIO.ContainerIO(fh, 22, 100) container = ContainerIO.ContainerIO(fh, 22, 100)
# Act # Act
container.seek(100) assert container.seek(100) == 100
data = container.read() data = container.read()
# Assert # Assert
@ -94,21 +107,65 @@ def test_read_eof(bytesmode: bool) -> None:
@pytest.mark.parametrize("bytesmode", (True, False)) @pytest.mark.parametrize("bytesmode", (True, False))
def test_readline(bytesmode: bool) -> None: def test_readline(bytesmode: bool) -> None:
# Arrange
with open(TEST_FILE, "rb" if bytesmode else "r") as fh: with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120) container = ContainerIO.ContainerIO(fh, 0, 120)
# Act
data = container.readline() data = container.readline()
# Assert
if bytesmode: if bytesmode:
data = data.decode() data = data.decode()
assert data == "This is line 1\n" assert data == "This is line 1\n"
data = container.readline(4)
if bytesmode:
data = data.decode()
assert data == "This"
@pytest.mark.parametrize("bytesmode", (True, False)) @pytest.mark.parametrize("bytesmode", (True, False))
def test_readlines(bytesmode: bool) -> None: def test_readlines(bytesmode: bool) -> None:
expected = [
"This is line 1\n",
"This is line 2\n",
"This is line 3\n",
"This is line 4\n",
"This is line 5\n",
"This is line 6\n",
"This is line 7\n",
"This is line 8\n",
]
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
data = container.readlines()
if bytesmode:
data = [line.decode() for line in data]
assert data == expected
assert container.seek(0) == 0
data = container.readlines(2)
if bytesmode:
data = [line.decode() for line in data]
assert data == expected[:2]
@pytest.mark.parametrize("bytesmode", (True, False))
def test_write(bytesmode: bool) -> None:
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
assert container.writable() is False
with pytest.raises(NotImplementedError):
container.write(b"" if bytesmode else "")
with pytest.raises(NotImplementedError):
container.writelines([])
with pytest.raises(NotImplementedError):
container.truncate()
@pytest.mark.parametrize("bytesmode", (True, False))
def test_iter(bytesmode: bool) -> None:
# Arrange # Arrange
expected = [ expected = [
"This is line 1\n", "This is line 1\n",
@ -124,9 +181,21 @@ def test_readlines(bytesmode: bool) -> None:
container = ContainerIO.ContainerIO(fh, 0, 120) container = ContainerIO.ContainerIO(fh, 0, 120)
# Act # Act
data = container.readlines() data = []
for line in container:
data.append(line)
# Assert # Assert
if bytesmode: if bytesmode:
data = [line.decode() for line in data] data = [line.decode() for line in data]
assert data == expected assert data == expected
@pytest.mark.parametrize("bytesmode", (True, False))
def test_file(bytesmode: bool) -> None:
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
assert isinstance(container.fileno(), int)
container.flush()
container.close()

View File

@ -161,5 +161,4 @@ exclude = [
'^Tests/test_qt_image_qapplication.py$', '^Tests/test_qt_image_qapplication.py$',
'^Tests/test_font_pcf_charsets.py$', '^Tests/test_font_pcf_charsets.py$',
'^Tests/test_font_pcf.py$', '^Tests/test_font_pcf.py$',
'^Tests/test_file_tar.py$',
] ]

View File

@ -16,10 +16,11 @@
from __future__ import annotations from __future__ import annotations
import io import io
from typing import IO, AnyStr, Generic, Literal from collections.abc import Iterable
from typing import IO, AnyStr, NoReturn
class ContainerIO(Generic[AnyStr]): class ContainerIO(IO[AnyStr]):
""" """
A file object that provides read access to a part of an existing A file object that provides read access to a part of an existing
file (for example a TAR file). file (for example a TAR file).
@ -45,7 +46,10 @@ class ContainerIO(Generic[AnyStr]):
def isatty(self) -> bool: def isatty(self) -> bool:
return False return False
def seek(self, offset: int, mode: Literal[0, 1, 2] = io.SEEK_SET) -> None: def seekable(self) -> bool:
return True
def seek(self, offset: int, mode: int = io.SEEK_SET) -> int:
""" """
Move file pointer. Move file pointer.
@ -53,6 +57,7 @@ class ContainerIO(Generic[AnyStr]):
:param mode: Starting position. Use 0 for beginning of region, 1 :param mode: Starting position. Use 0 for beginning of region, 1
for current offset, and 2 for end of region. You cannot move for current offset, and 2 for end of region. You cannot move
the pointer outside the defined region. the pointer outside the defined region.
:returns: Offset from start of region, in bytes.
""" """
if mode == 1: if mode == 1:
self.pos = self.pos + offset self.pos = self.pos + offset
@ -63,6 +68,7 @@ class ContainerIO(Generic[AnyStr]):
# clamp # clamp
self.pos = max(0, min(self.pos, self.length)) self.pos = max(0, min(self.pos, self.length))
self.fh.seek(self.offset + self.pos) self.fh.seek(self.offset + self.pos)
return self.pos
def tell(self) -> int: def tell(self) -> int:
""" """
@ -72,27 +78,32 @@ class ContainerIO(Generic[AnyStr]):
""" """
return self.pos return self.pos
def read(self, n: int = 0) -> AnyStr: def readable(self) -> bool:
return True
def read(self, n: int = -1) -> AnyStr:
""" """
Read data. Read data.
:param n: Number of bytes to read. If omitted or zero, :param n: Number of bytes to read. If omitted, zero or negative,
read until end of region. read until end of region.
:returns: An 8-bit string. :returns: An 8-bit string.
""" """
if n: if n > 0:
n = min(n, self.length - self.pos) n = min(n, self.length - self.pos)
else: else:
n = self.length - self.pos n = self.length - self.pos
if not n: # EOF if n <= 0: # EOF
return b"" if "b" in self.fh.mode else "" # type: ignore[return-value] return b"" if "b" in self.fh.mode else "" # type: ignore[return-value]
self.pos = self.pos + n self.pos = self.pos + n
return self.fh.read(n) return self.fh.read(n)
def readline(self) -> AnyStr: def readline(self, n: int = -1) -> AnyStr:
""" """
Read a line of text. Read a line of text.
:param n: Number of bytes to read. If omitted, zero or negative,
read until end of line.
:returns: An 8-bit string. :returns: An 8-bit string.
""" """
s: AnyStr = b"" if "b" in self.fh.mode else "" # type: ignore[assignment] s: AnyStr = b"" if "b" in self.fh.mode else "" # type: ignore[assignment]
@ -102,14 +113,16 @@ class ContainerIO(Generic[AnyStr]):
if not c: if not c:
break break
s = s + c s = s + c
if c == newline_character: if c == newline_character or len(s) == n:
break break
return s return s
def readlines(self) -> list[AnyStr]: def readlines(self, n: int | None = -1) -> list[AnyStr]:
""" """
Read multiple lines of text. Read multiple lines of text.
:param n: Number of lines to read. If omitted, zero, negative or None,
read until end of region.
:returns: A list of 8-bit strings. :returns: A list of 8-bit strings.
""" """
lines = [] lines = []
@ -118,4 +131,43 @@ class ContainerIO(Generic[AnyStr]):
if not s: if not s:
break break
lines.append(s) lines.append(s)
if len(lines) == n:
break
return lines return lines
def writable(self) -> bool:
return False
def write(self, b: AnyStr) -> NoReturn:
raise NotImplementedError()
def writelines(self, lines: Iterable[AnyStr]) -> NoReturn:
raise NotImplementedError()
def truncate(self, size: int | None = None) -> int:
raise NotImplementedError()
def __enter__(self) -> ContainerIO[AnyStr]:
return self
def __exit__(self, *args: object) -> None:
self.close()
def __iter__(self) -> ContainerIO[AnyStr]:
return self
def __next__(self) -> AnyStr:
line = self.readline()
if not line:
msg = "end of region"
raise StopIteration(msg)
return line
def fileno(self) -> int:
return self.fh.fileno()
def flush(self) -> None:
self.fh.flush()
def close(self) -> None:
self.fh.close()

View File

@ -55,13 +55,3 @@ class TarIO(ContainerIO.ContainerIO[bytes]):
# Open region # Open region
super().__init__(self.fh, self.fh.tell(), size) super().__init__(self.fh, self.fh.tell(), size)
# Context manager support
def __enter__(self) -> TarIO:
return self
def __exit__(self, *args: object) -> None:
self.close()
def close(self) -> None:
self.fh.close()