From 652f447412b50e08bfb17e217783560c04a4de5c Mon Sep 17 00:00:00 2001 From: Piolie Date: Sat, 16 Jan 2021 16:53:24 -0300 Subject: [PATCH] Implement grayscale/color decoder --- src/PIL/PpmImagePlugin.py | 63 ++++++++++++++++++++++++++++++++++++++- 1 file changed, 62 insertions(+), 1 deletion(-) diff --git a/src/PIL/PpmImagePlugin.py b/src/PIL/PpmImagePlugin.py index 6bebec6de..86616b89f 100644 --- a/src/PIL/PpmImagePlugin.py +++ b/src/PIL/PpmImagePlugin.py @@ -217,7 +217,68 @@ class PpmPlainDecoder(ImageFile.PyDecoder): return decoded_data def _decode_blocks(self, channels=1, depth=8): - raise NotImplementedError + decoded_data = bytearray() + if depth == 32: + maxval = 2 ** 31 - 1 # HACK: 32-bit grayscale uses signed int + else: + maxval = 2 ** depth - 1 # FIXME: should be passed by _open + max_len = 10 + bytes_per_sample = depth // 8 + total_tokens = self.size * channels + + token_spans = False + comment_spans = False + half_token = False + tokens_read = 0 + while True: + block = self._read_block() # read next block + if not block: + if token_spans: + block = bytearray(b" ") # flush half_token + else: + raise ValueError("Reached EOF while reading data") + + while block and comment_spans: + comment_end = self._find_comment_end(block) + if comment_end != -1: # comment ends in this block + block = block[comment_end + 1 :] # delete tail of previous comment + break + else: # comment spans whole block + block = self._read_block() + + block, comment_spans = self._ignore_comments(block) + + if token_spans: + block = half_token + block # stitch half_token to new block + token_spans = False + + tokens = block.split() + + if block and not block[-1:].isspace(): # block might split token + token_spans = True + half_token = tokens.pop() # save half token for later + if len(half_token) > max_len: # prevent buildup of half_token + raise ValueError( + f"Token too long found in data: {half_token[:max_len + 1]}" + ) + + for token in tokens: + if len(token) > max_len: + raise ValueError( + f"Token too long found in data: {token[:max_len + 1]}" + ) + try: + token = int(token) + except ValueError: + raise ValueError( + f"Non-decimal-ASCII found in data: {token}" + ) from None + tokens_read += 1 + if token > maxval: + raise ValueError(f"Channel value too large for this mode: {token}") + decoded_data += token.to_bytes(bytes_per_sample, "big") + if tokens_read == total_tokens: # finished! + return decoded_data def decode(self, buffer): self.size = self.state.xsize * self.state.ysize