diff --git a/src/PIL/PpmImagePlugin.py b/src/PIL/PpmImagePlugin.py index c94b27a47..6bebec6de 100644 --- a/src/PIL/PpmImagePlugin.py +++ b/src/PIL/PpmImagePlugin.py @@ -142,6 +142,108 @@ class PpmImageFile(ImageFile.ImageFile): # -------------------------------------------------------------------- +class PpmPlainDecoder(ImageFile.PyDecoder): + _pulls_fd = True + + def _read_block(self, block_size=10 ** 6): + return bytearray(self.fd.read(block_size)) + # return self.fd.read(block_size) + + def _find_comment_end(self, block, start=0): + a = block.find(b"\n", start) + b = block.find(b"\r", start) + return min(a, b) if a * b > 0 else max(a, b) # lowest nonnegative index (or -1) + + def _ignore_comments(self, block): + """ + Deletes comments from block. If comment does not end in this + block, raises a flag. + """ + + comment_spans = False + while True: + comment_start = block.find(b"#") # look for next comment + if comment_start == -1: # no comment found + break + comment_end = self._find_comment_end(block, comment_start) + if comment_end != -1: # comment ends in this block + block = ( + block[:comment_start] + block[comment_end + 1 :] + ) # delete comment + else: # last comment continues to next block(s) + block = block[:comment_start] + comment_spans = True + break + return block, comment_spans + + def _decode_bitonal(self): + """ + The reason this is a separate method is that in the plain PBM + format all data tokens are exactly one byte, and so the + inter-token whitespace is optional. + """ + decoded_data = bytearray() + total_tokens = self.size + + comment_spans = False + tokens_read = 0 + while True: + block = self._read_block() # read next block + if not block: + raise ValueError("Reached EOF while reading data") + + while block and comment_spans: + comment_end = self._find_comment_end(block) + if comment_end != -1: # comment ends in this block + comment_spans = False + block = block[comment_end + 1 :] # delete tail of previous comment + else: # comment spans whole block + block = self._read_block() + + block, comment_spans = self._ignore_comments(block) + + tokens = b"".join(block.split()) + + for token in tokens: + if token in (48, 49): + tokens_read += 1 + else: + raise ValueError(f"Invalid token for this mode: {bytes([token])}") + + decoded_data.append(token) + if tokens_read == total_tokens: # finished! + invert = bytes.maketrans(b"01", b"\xFF\x00") + decoded_data = decoded_data.translate(invert) + return decoded_data + + def _decode_blocks(self, channels=1, depth=8): + raise NotImplementedError + + def decode(self, buffer): + self.size = self.state.xsize * self.state.ysize + rawmode = self.args[0] + + if self.mode == "1": + decoded_data = self._decode_bitonal() + rawmode = "1;8" + elif self.mode == "L": + decoded_data = self._decode_blocks(channels=1, depth=8) + elif self.mode == "I": + if rawmode == "I;16B": + decoded_data = self._decode_blocks(channels=1, depth=16) + elif rawmode == "I;32B": + decoded_data = self._decode_blocks(channels=1, depth=32) + elif self.mode == "RGB": + decoded_data = self._decode_blocks(channels=3, depth=8) + + self.set_as_raw(bytes(decoded_data), rawmode) + return -1, 0 + + +# +# -------------------------------------------------------------------- + + def _save(im, fp, filename): if im.mode == "1": rawmode, head = "1;I", b"P4" @@ -177,7 +279,7 @@ def _save(im, fp, filename): # # -------------------------------------------------------------------- - +Image.register_decoder("plain", PpmPlainDecoder) Image.register_open(PpmImageFile.format, PpmImageFile, _accept) Image.register_save(PpmImageFile.format, _save)