Support GZIP_1 compression

This commit is contained in:
Andrew Murray 2024-03-26 21:29:25 +11:00
parent 58c4c757a2
commit 142473c7b4
5 changed files with 459 additions and 13 deletions

BIN
Tests/images/m13.fits Normal file

Binary file not shown.

366
Tests/images/m13_gzip.fits Normal file

File diff suppressed because one or more lines are too long

View File

@ -6,7 +6,7 @@ import pytest
from PIL import FitsImagePlugin, Image from PIL import FitsImagePlugin, Image
from .helper import assert_image_equal, hopper from .helper import assert_image_equal, assert_image_equal_tofile, hopper
TEST_FILE = "Tests/images/hopper.fits" TEST_FILE = "Tests/images/hopper.fits"
@ -22,6 +22,11 @@ def test_open() -> None:
assert_image_equal(im, hopper("L")) assert_image_equal(im, hopper("L"))
def test_gzip1() -> None:
with Image.open("Tests/images/m13_gzip.fits") as im:
assert_image_equal_tofile(im, "Tests/images/m13.fits")
def test_invalid_file() -> None: def test_invalid_file() -> None:
# Arrange # Arrange
invalid_file = "Tests/images/flower.jpg" invalid_file = "Tests/images/flower.jpg"

View File

@ -1339,7 +1339,8 @@ FITS
.. versionadded:: 9.1.0 .. versionadded:: 9.1.0
Pillow identifies and reads FITS files, commonly used for astronomy. Pillow identifies and reads FITS files, commonly used for astronomy. Uncompressed and
GZIP_1 compressed images can be read.
FLI, FLC FLI, FLC
^^^^^^^^ ^^^^^^^^

View File

@ -10,6 +10,7 @@
# #
from __future__ import annotations from __future__ import annotations
import gzip
import math import math
from . import Image, ImageFile from . import Image, ImageFile
@ -27,14 +28,32 @@ class FitsImageFile(ImageFile.ImageFile):
assert self.fp is not None assert self.fp is not None
headers: dict[bytes, bytes] = {} headers: dict[bytes, bytes] = {}
header_in_progress = False
decoder_name = ""
while True: while True:
header = self.fp.read(80) header = self.fp.read(80)
if not header: if not header:
msg = "Truncated FITS file" msg = "Truncated FITS file"
raise OSError(msg) raise OSError(msg)
keyword = header[:8].strip() keyword = header[:8].strip()
if keyword == b"END": if keyword in (b"SIMPLE", b"XTENSION"):
header_in_progress = True
elif headers and not header_in_progress:
# This is now a data unit
break break
elif keyword == b"END":
# Seek to the end of the header unit
self.fp.seek(math.ceil(self.fp.tell() / 2880) * 2880)
if not decoder_name:
decoder_name, offset, args = self._parse_headers(headers)
header_in_progress = False
continue
if decoder_name:
# Keep going to read past the headers
continue
value = header[8:].split(b"/")[0].strip() value = header[8:].split(b"/")[0].strip()
if value.startswith(b"="): if value.startswith(b"="):
value = value[1:].strip() value = value[1:].strip()
@ -43,32 +62,87 @@ class FitsImageFile(ImageFile.ImageFile):
raise SyntaxError(msg) raise SyntaxError(msg)
headers[keyword] = value headers[keyword] = value
naxis = int(headers[b"NAXIS"]) if not decoder_name:
if naxis == 0:
msg = "No image data" msg = "No image data"
raise ValueError(msg) raise ValueError(msg)
elif naxis == 1:
self._size = 1, int(headers[b"NAXIS1"])
else:
self._size = int(headers[b"NAXIS1"]), int(headers[b"NAXIS2"])
number_of_bits = int(headers[b"BITPIX"]) offset += self.fp.tell() - 80
self.tile = [(decoder_name, (0, 0) + self.size, offset, args)]
def _get_size(
self, headers: dict[bytes, bytes], prefix: bytes
) -> tuple[int, int] | None:
naxis = int(headers[prefix + b"NAXIS"])
if naxis == 0:
return None
if naxis == 1:
return 1, int(headers[prefix + b"NAXIS1"])
else:
return int(headers[prefix + b"NAXIS1"]), int(headers[prefix + b"NAXIS2"])
def _parse_headers(
self, headers: dict[bytes, bytes]
) -> tuple[str, int, tuple[str | int, ...]]:
prefix = b""
decoder_name = "raw"
offset = 0
if (
headers.get(b"XTENSION") == b"'BINTABLE'"
and headers.get(b"ZIMAGE") == b"T"
and headers[b"ZCMPTYPE"] == b"'GZIP_1 '"
):
no_prefix_size = self._get_size(headers, prefix) or (0, 0)
number_of_bits = int(headers[b"BITPIX"])
offset = no_prefix_size[0] * no_prefix_size[1] * (number_of_bits // 8)
prefix = b"Z"
decoder_name = "fits_gzip"
size = self._get_size(headers, prefix)
if not size:
return "", 0, ()
self._size = size
number_of_bits = int(headers[prefix + b"BITPIX"])
if number_of_bits == 8: if number_of_bits == 8:
self._mode = "L" self._mode = "L"
elif number_of_bits == 16: elif number_of_bits == 16:
self._mode = "I" self._mode = "I;16"
elif number_of_bits == 32: elif number_of_bits == 32:
self._mode = "I" self._mode = "I"
elif number_of_bits in (-32, -64): elif number_of_bits in (-32, -64):
self._mode = "F" self._mode = "F"
offset = math.ceil(self.fp.tell() / 2880) * 2880 args = (self.mode, 0, -1) if decoder_name == "raw" else (number_of_bits,)
self.tile = [("raw", (0, 0) + self.size, offset, (self.mode, 0, -1))] return decoder_name, offset, args
class FitsGzipDecoder(ImageFile.PyDecoder):
_pulls_fd = True
def decode(self, buffer):
assert self.fd is not None
value = gzip.decompress(self.fd.read())
rows = []
offset = 0
number_of_bits = min(self.args[0] // 8, 4)
for y in range(self.state.ysize):
row = bytearray()
for x in range(self.state.xsize):
row += value[offset + (4 - number_of_bits) : offset + 4]
offset += 4
rows.append(row)
self.set_as_raw(bytes([pixel for row in rows[::-1] for pixel in row]))
return -1, 0
# -------------------------------------------------------------------- # --------------------------------------------------------------------
# Registry # Registry
Image.register_open(FitsImageFile.format, FitsImageFile, _accept) Image.register_open(FitsImageFile.format, FitsImageFile, _accept)
Image.register_decoder("fits_gzip", FitsGzipDecoder)
Image.register_extensions(FitsImageFile.format, [".fit", ".fits"]) Image.register_extensions(FitsImageFile.format, [".fit", ".fits"])