Merge pull request #8240 from radarhere/containerio

Changed ContainerIO to subclass IO
This commit is contained in:
Hugo van Kemenade 2024-07-17 14:23:55 +03:00 committed by GitHub
commit d05262105a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 142 additions and 32 deletions

View File

@ -1,7 +1,5 @@
from __future__ import annotations
from typing import Literal
import pytest
from PIL import ContainerIO, Image
@ -23,6 +21,13 @@ def test_isatty() -> None:
assert container.isatty() is False
def test_seekable() -> None:
with hopper() as im:
container = ContainerIO.ContainerIO(im, 0, 0)
assert container.seekable() is True
@pytest.mark.parametrize(
"mode, expected_position",
(
@ -31,7 +36,7 @@ def test_isatty() -> None:
(2, 100),
),
)
def test_seek_mode(mode: Literal[0, 1, 2], expected_position: int) -> None:
def test_seek_mode(mode: int, expected_position: int) -> None:
# Arrange
with open(TEST_FILE, "rb") as fh:
container = ContainerIO.ContainerIO(fh, 22, 100)
@ -44,6 +49,14 @@ def test_seek_mode(mode: Literal[0, 1, 2], expected_position: int) -> None:
assert container.tell() == expected_position
@pytest.mark.parametrize("bytesmode", (True, False))
def test_readable(bytesmode: bool) -> None:
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
assert container.readable() is True
@pytest.mark.parametrize("bytesmode", (True, False))
def test_read_n0(bytesmode: bool) -> None:
# Arrange
@ -51,7 +64,7 @@ def test_read_n0(bytesmode: bool) -> None:
container = ContainerIO.ContainerIO(fh, 22, 100)
# Act
container.seek(81)
assert container.seek(81) == 81
data = container.read()
# Assert
@ -67,7 +80,7 @@ def test_read_n(bytesmode: bool) -> None:
container = ContainerIO.ContainerIO(fh, 22, 100)
# Act
container.seek(81)
assert container.seek(81) == 81
data = container.read(3)
# Assert
@ -83,7 +96,7 @@ def test_read_eof(bytesmode: bool) -> None:
container = ContainerIO.ContainerIO(fh, 22, 100)
# Act
container.seek(100)
assert container.seek(100) == 100
data = container.read()
# Assert
@ -94,21 +107,65 @@ def test_read_eof(bytesmode: bool) -> None:
@pytest.mark.parametrize("bytesmode", (True, False))
def test_readline(bytesmode: bool) -> None:
# Arrange
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
# Act
data = container.readline()
# Assert
if bytesmode:
data = data.decode()
assert data == "This is line 1\n"
data = container.readline(4)
if bytesmode:
data = data.decode()
assert data == "This"
@pytest.mark.parametrize("bytesmode", (True, False))
def test_readlines(bytesmode: bool) -> None:
expected = [
"This is line 1\n",
"This is line 2\n",
"This is line 3\n",
"This is line 4\n",
"This is line 5\n",
"This is line 6\n",
"This is line 7\n",
"This is line 8\n",
]
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
data = container.readlines()
if bytesmode:
data = [line.decode() for line in data]
assert data == expected
assert container.seek(0) == 0
data = container.readlines(2)
if bytesmode:
data = [line.decode() for line in data]
assert data == expected[:2]
@pytest.mark.parametrize("bytesmode", (True, False))
def test_write(bytesmode: bool) -> None:
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
assert container.writable() is False
with pytest.raises(NotImplementedError):
container.write(b"" if bytesmode else "")
with pytest.raises(NotImplementedError):
container.writelines([])
with pytest.raises(NotImplementedError):
container.truncate()
@pytest.mark.parametrize("bytesmode", (True, False))
def test_iter(bytesmode: bool) -> None:
# Arrange
expected = [
"This is line 1\n",
@ -124,9 +181,21 @@ def test_readlines(bytesmode: bool) -> None:
container = ContainerIO.ContainerIO(fh, 0, 120)
# Act
data = container.readlines()
data = []
for line in container:
data.append(line)
# Assert
if bytesmode:
data = [line.decode() for line in data]
assert data == expected
@pytest.mark.parametrize("bytesmode", (True, False))
def test_file(bytesmode: bool) -> None:
with open(TEST_FILE, "rb" if bytesmode else "r") as fh:
container = ContainerIO.ContainerIO(fh, 0, 120)
assert isinstance(container.fileno(), int)
container.flush()
container.close()

View File

@ -161,5 +161,4 @@ exclude = [
'^Tests/test_qt_image_qapplication.py$',
'^Tests/test_font_pcf_charsets.py$',
'^Tests/test_font_pcf.py$',
'^Tests/test_file_tar.py$',
]

View File

@ -16,10 +16,11 @@
from __future__ import annotations
import io
from typing import IO, AnyStr, Generic, Literal
from collections.abc import Iterable
from typing import IO, AnyStr, NoReturn
class ContainerIO(Generic[AnyStr]):
class ContainerIO(IO[AnyStr]):
"""
A file object that provides read access to a part of an existing
file (for example a TAR file).
@ -45,7 +46,10 @@ class ContainerIO(Generic[AnyStr]):
def isatty(self) -> bool:
return False
def seek(self, offset: int, mode: Literal[0, 1, 2] = io.SEEK_SET) -> None:
def seekable(self) -> bool:
return True
def seek(self, offset: int, mode: int = io.SEEK_SET) -> int:
"""
Move file pointer.
@ -53,6 +57,7 @@ class ContainerIO(Generic[AnyStr]):
:param mode: Starting position. Use 0 for beginning of region, 1
for current offset, and 2 for end of region. You cannot move
the pointer outside the defined region.
:returns: Offset from start of region, in bytes.
"""
if mode == 1:
self.pos = self.pos + offset
@ -63,6 +68,7 @@ class ContainerIO(Generic[AnyStr]):
# clamp
self.pos = max(0, min(self.pos, self.length))
self.fh.seek(self.offset + self.pos)
return self.pos
def tell(self) -> int:
"""
@ -72,27 +78,32 @@ class ContainerIO(Generic[AnyStr]):
"""
return self.pos
def read(self, n: int = 0) -> AnyStr:
def readable(self) -> bool:
return True
def read(self, n: int = -1) -> AnyStr:
"""
Read data.
:param n: Number of bytes to read. If omitted or zero,
:param n: Number of bytes to read. If omitted, zero or negative,
read until end of region.
:returns: An 8-bit string.
"""
if n:
if n > 0:
n = min(n, self.length - self.pos)
else:
n = self.length - self.pos
if not n: # EOF
if n <= 0: # EOF
return b"" if "b" in self.fh.mode else "" # type: ignore[return-value]
self.pos = self.pos + n
return self.fh.read(n)
def readline(self) -> AnyStr:
def readline(self, n: int = -1) -> AnyStr:
"""
Read a line of text.
:param n: Number of bytes to read. If omitted, zero or negative,
read until end of line.
:returns: An 8-bit string.
"""
s: AnyStr = b"" if "b" in self.fh.mode else "" # type: ignore[assignment]
@ -102,14 +113,16 @@ class ContainerIO(Generic[AnyStr]):
if not c:
break
s = s + c
if c == newline_character:
if c == newline_character or len(s) == n:
break
return s
def readlines(self) -> list[AnyStr]:
def readlines(self, n: int | None = -1) -> list[AnyStr]:
"""
Read multiple lines of text.
:param n: Number of lines to read. If omitted, zero, negative or None,
read until end of region.
:returns: A list of 8-bit strings.
"""
lines = []
@ -118,4 +131,43 @@ class ContainerIO(Generic[AnyStr]):
if not s:
break
lines.append(s)
if len(lines) == n:
break
return lines
def writable(self) -> bool:
return False
def write(self, b: AnyStr) -> NoReturn:
raise NotImplementedError()
def writelines(self, lines: Iterable[AnyStr]) -> NoReturn:
raise NotImplementedError()
def truncate(self, size: int | None = None) -> int:
raise NotImplementedError()
def __enter__(self) -> ContainerIO[AnyStr]:
return self
def __exit__(self, *args: object) -> None:
self.close()
def __iter__(self) -> ContainerIO[AnyStr]:
return self
def __next__(self) -> AnyStr:
line = self.readline()
if not line:
msg = "end of region"
raise StopIteration(msg)
return line
def fileno(self) -> int:
return self.fh.fileno()
def flush(self) -> None:
self.fh.flush()
def close(self) -> None:
self.fh.close()

View File

@ -55,13 +55,3 @@ class TarIO(ContainerIO.ContainerIO[bytes]):
# Open region
super().__init__(self.fh, self.fh.tell(), size)
# Context manager support
def __enter__(self) -> TarIO:
return self
def __exit__(self, *args: object) -> None:
self.close()
def close(self) -> None:
self.fh.close()