mirror of
https://github.com/python-pillow/Pillow.git
synced 2025-01-27 09:44:31 +03:00
Implement bitonal decoder
This commit is contained in:
parent
1ed05715d2
commit
ea7e108ca3
|
@ -142,6 +142,108 @@ class PpmImageFile(ImageFile.ImageFile):
|
||||||
# --------------------------------------------------------------------
|
# --------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
class PpmPlainDecoder(ImageFile.PyDecoder):
|
||||||
|
_pulls_fd = True
|
||||||
|
|
||||||
|
def _read_block(self, block_size=10 ** 6):
|
||||||
|
return bytearray(self.fd.read(block_size))
|
||||||
|
# return self.fd.read(block_size)
|
||||||
|
|
||||||
|
def _find_comment_end(self, block, start=0):
|
||||||
|
a = block.find(b"\n", start)
|
||||||
|
b = block.find(b"\r", start)
|
||||||
|
return min(a, b) if a * b > 0 else max(a, b) # lowest nonnegative index (or -1)
|
||||||
|
|
||||||
|
def _ignore_comments(self, block):
|
||||||
|
"""
|
||||||
|
Deletes comments from block. If comment does not end in this
|
||||||
|
block, raises a flag.
|
||||||
|
"""
|
||||||
|
|
||||||
|
comment_spans = False
|
||||||
|
while True:
|
||||||
|
comment_start = block.find(b"#") # look for next comment
|
||||||
|
if comment_start == -1: # no comment found
|
||||||
|
break
|
||||||
|
comment_end = self._find_comment_end(block, comment_start)
|
||||||
|
if comment_end != -1: # comment ends in this block
|
||||||
|
block = (
|
||||||
|
block[:comment_start] + block[comment_end + 1 :]
|
||||||
|
) # delete comment
|
||||||
|
else: # last comment continues to next block(s)
|
||||||
|
block = block[:comment_start]
|
||||||
|
comment_spans = True
|
||||||
|
break
|
||||||
|
return block, comment_spans
|
||||||
|
|
||||||
|
def _decode_bitonal(self):
|
||||||
|
"""
|
||||||
|
The reason this is a separate method is that in the plain PBM
|
||||||
|
format all data tokens are exactly one byte, and so the
|
||||||
|
inter-token whitespace is optional.
|
||||||
|
"""
|
||||||
|
decoded_data = bytearray()
|
||||||
|
total_tokens = self.size
|
||||||
|
|
||||||
|
comment_spans = False
|
||||||
|
tokens_read = 0
|
||||||
|
while True:
|
||||||
|
block = self._read_block() # read next block
|
||||||
|
if not block:
|
||||||
|
raise ValueError("Reached EOF while reading data")
|
||||||
|
|
||||||
|
while block and comment_spans:
|
||||||
|
comment_end = self._find_comment_end(block)
|
||||||
|
if comment_end != -1: # comment ends in this block
|
||||||
|
comment_spans = False
|
||||||
|
block = block[comment_end + 1 :] # delete tail of previous comment
|
||||||
|
else: # comment spans whole block
|
||||||
|
block = self._read_block()
|
||||||
|
|
||||||
|
block, comment_spans = self._ignore_comments(block)
|
||||||
|
|
||||||
|
tokens = b"".join(block.split())
|
||||||
|
|
||||||
|
for token in tokens:
|
||||||
|
if token in (48, 49):
|
||||||
|
tokens_read += 1
|
||||||
|
else:
|
||||||
|
raise ValueError(f"Invalid token for this mode: {bytes([token])}")
|
||||||
|
|
||||||
|
decoded_data.append(token)
|
||||||
|
if tokens_read == total_tokens: # finished!
|
||||||
|
invert = bytes.maketrans(b"01", b"\xFF\x00")
|
||||||
|
decoded_data = decoded_data.translate(invert)
|
||||||
|
return decoded_data
|
||||||
|
|
||||||
|
def _decode_blocks(self, channels=1, depth=8):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def decode(self, buffer):
|
||||||
|
self.size = self.state.xsize * self.state.ysize
|
||||||
|
rawmode = self.args[0]
|
||||||
|
|
||||||
|
if self.mode == "1":
|
||||||
|
decoded_data = self._decode_bitonal()
|
||||||
|
rawmode = "1;8"
|
||||||
|
elif self.mode == "L":
|
||||||
|
decoded_data = self._decode_blocks(channels=1, depth=8)
|
||||||
|
elif self.mode == "I":
|
||||||
|
if rawmode == "I;16B":
|
||||||
|
decoded_data = self._decode_blocks(channels=1, depth=16)
|
||||||
|
elif rawmode == "I;32B":
|
||||||
|
decoded_data = self._decode_blocks(channels=1, depth=32)
|
||||||
|
elif self.mode == "RGB":
|
||||||
|
decoded_data = self._decode_blocks(channels=3, depth=8)
|
||||||
|
|
||||||
|
self.set_as_raw(bytes(decoded_data), rawmode)
|
||||||
|
return -1, 0
|
||||||
|
|
||||||
|
|
||||||
|
#
|
||||||
|
# --------------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
def _save(im, fp, filename):
|
def _save(im, fp, filename):
|
||||||
if im.mode == "1":
|
if im.mode == "1":
|
||||||
rawmode, head = "1;I", b"P4"
|
rawmode, head = "1;I", b"P4"
|
||||||
|
@ -177,7 +279,7 @@ def _save(im, fp, filename):
|
||||||
#
|
#
|
||||||
# --------------------------------------------------------------------
|
# --------------------------------------------------------------------
|
||||||
|
|
||||||
|
Image.register_decoder("plain", PpmPlainDecoder)
|
||||||
Image.register_open(PpmImageFile.format, PpmImageFile, _accept)
|
Image.register_open(PpmImageFile.format, PpmImageFile, _accept)
|
||||||
Image.register_save(PpmImageFile.format, _save)
|
Image.register_save(PpmImageFile.format, _save)
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user