mirror of
https://github.com/python-pillow/Pillow.git
synced 2025-01-25 00:34:14 +03:00
Add TIFF save_all writer.
This commit is contained in:
parent
8f49022ac5
commit
92c7337df0
|
@ -1686,7 +1686,9 @@ class Image(object):
|
|||
save_handler = SAVE[format.upper()]
|
||||
|
||||
if open_fp:
|
||||
fp = builtins.open(filename, "wb")
|
||||
# Open also for reading ("+"), because TIFF save_all
|
||||
# writer needs to go back and edit the written data.
|
||||
fp = builtins.open(filename, "w+b")
|
||||
|
||||
try:
|
||||
save_handler(self, fp, filename)
|
||||
|
|
|
@ -1404,7 +1404,6 @@ def _save(im, fp, filename):
|
|||
if im.mode == "P":
|
||||
lut = im.im.getpalette("RGB", "RGB;L")
|
||||
ifd[COLORMAP] = tuple(i8(v) * 256 for v in lut)
|
||||
|
||||
# data orientation
|
||||
stride = len(bits) * ((im.size[0]*bits[0]+7)//8)
|
||||
ifd[ROWSPERSTRIP] = im.size[1]
|
||||
|
@ -1491,12 +1490,273 @@ def _save(im, fp, filename):
|
|||
# just to access o32 and o16 (using correct byte order)
|
||||
im._debug_multipage = ifd
|
||||
|
||||
class AppendingTiffWriter:
|
||||
fieldSizes = [
|
||||
0, # None
|
||||
1, # byte
|
||||
1, # ascii
|
||||
2, # short
|
||||
4, # long
|
||||
8, # rational
|
||||
1, # sbyte
|
||||
1, # undefined
|
||||
2, # sshort
|
||||
4, # slong
|
||||
8, # srational
|
||||
4, # float
|
||||
8, # double
|
||||
]
|
||||
|
||||
# StripOffsets = 273
|
||||
# FreeOffsets = 288
|
||||
# TileOffsets = 324
|
||||
# JPEGQTables = 519
|
||||
# JPEGDCTables = 520
|
||||
# JPEGACTables = 521
|
||||
Tags = set((273, 288, 324, 519, 520, 521))
|
||||
|
||||
def __init__(self, fn, new=False):
|
||||
if hasattr(fn, 'read'):
|
||||
self.f = fn
|
||||
self.close_fp = False
|
||||
else:
|
||||
self.name = fn
|
||||
self.close_fp = True
|
||||
try:
|
||||
self.f = io.open(fn, "w+b" if new else "r+b")
|
||||
except IOError:
|
||||
self.f = io.open(fn, "w+b")
|
||||
self.beginning = self.f.tell()
|
||||
self.setup()
|
||||
|
||||
def setup(self):
|
||||
# Reset everything.
|
||||
self.f.seek(self.beginning, os.SEEK_SET)
|
||||
|
||||
self.whereToWriteNewIFDOffset = None
|
||||
self.offsetOfNewPage = 0
|
||||
|
||||
self.IIMM = IIMM = self.f.read(4)
|
||||
if not IIMM:
|
||||
# empty file - first page
|
||||
self.isFirst = True
|
||||
return
|
||||
|
||||
self.isFirst = False
|
||||
if IIMM == b"II\x2a\x00":
|
||||
self.setEndian("<")
|
||||
elif IIMM == b"MM\x00\x2a":
|
||||
self.setEndian(">")
|
||||
else:
|
||||
raise RuntimeError("Invalid TIFF file header")
|
||||
|
||||
self.skipIFDs()
|
||||
self.goToEnd()
|
||||
|
||||
def finalize(self):
|
||||
if self.isFirst:
|
||||
return
|
||||
|
||||
# fix offsets
|
||||
self.f.seek(self.offsetOfNewPage)
|
||||
|
||||
IIMM = self.f.read(4)
|
||||
if not IIMM:
|
||||
# raise RuntimeError("nothing written into new page")
|
||||
# Make it easy to finish a frame without committing to a new one.
|
||||
return
|
||||
|
||||
if IIMM != self.IIMM:
|
||||
raise RuntimeError("IIMM of new page doesn't match IIMM of "
|
||||
"first page")
|
||||
|
||||
IFDoffset = self.readLong()
|
||||
IFDoffset += self.offsetOfNewPage
|
||||
self.f.seek(self.whereToWriteNewIFDOffset)
|
||||
self.writeLong(IFDoffset)
|
||||
self.f.seek(IFDoffset)
|
||||
self.fixIFD()
|
||||
|
||||
def newFrame(self):
|
||||
# Call this to finish a frame.
|
||||
self.finalize()
|
||||
self.setup()
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc_value, traceback):
|
||||
if self.close_fp:
|
||||
self.close()
|
||||
return False
|
||||
|
||||
def tell(self):
|
||||
return self.f.tell() - self.offsetOfNewPage
|
||||
|
||||
def seek(self, offset, whence):
|
||||
if whence == os.SEEK_SET:
|
||||
offset += self.offsetOfNewPage
|
||||
|
||||
self.f.seek(offset, whence)
|
||||
return self.tell()
|
||||
|
||||
def goToEnd(self):
|
||||
self.f.seek(0, os.SEEK_END)
|
||||
pos = self.f.tell()
|
||||
|
||||
# pad to 16 byte boundary
|
||||
padBytes = 16 - pos % 16
|
||||
if 0 < padBytes < 16:
|
||||
self.f.write(bytes(bytearray(padBytes)))
|
||||
self.offsetOfNewPage = self.f.tell()
|
||||
|
||||
def setEndian(self, endian):
|
||||
self.endian = endian
|
||||
self.longFmt = self.endian + "L"
|
||||
self.shortFmt = self.endian + "H"
|
||||
self.tagFormat = self.endian + "HHL"
|
||||
|
||||
def skipIFDs(self):
|
||||
while True:
|
||||
IFDoffset = self.readLong()
|
||||
if IFDoffset == 0:
|
||||
self.whereToWriteNewIFDOffset = self.f.tell() - 4
|
||||
break
|
||||
|
||||
self.f.seek(IFDoffset)
|
||||
numTags = self.readShort()
|
||||
self.f.seek(numTags * 12, os.SEEK_CUR)
|
||||
|
||||
def write(self, data):
|
||||
return self.f.write(data)
|
||||
|
||||
def readShort(self):
|
||||
value, = struct.unpack(self.shortFmt, self.f.read(2))
|
||||
return value
|
||||
|
||||
def readLong(self):
|
||||
value, = struct.unpack(self.longFmt, self.f.read(4))
|
||||
return value
|
||||
|
||||
def rewriteLastShortToLong(self, value):
|
||||
self.f.seek(-2, os.SEEK_CUR)
|
||||
bytesWritten = self.f.write(struct.pack(self.longFmt, value))
|
||||
if bytesWritten is not None and bytesWritten != 4:
|
||||
raise RuntimeError("wrote only %u bytes but wanted 4" %
|
||||
bytesWritten)
|
||||
|
||||
def rewriteLastShort(self, value):
|
||||
self.f.seek(-2, os.SEEK_CUR)
|
||||
bytesWritten = self.f.write(struct.pack(self.shortFmt, value))
|
||||
if bytesWritten is not None and bytesWritten != 2:
|
||||
raise RuntimeError("wrote only %u bytes but wanted 2" %
|
||||
bytesWritten)
|
||||
|
||||
def rewriteLastLong(self, value):
|
||||
self.f.seek(-4, os.SEEK_CUR)
|
||||
bytesWritten = self.f.write(struct.pack(self.longFmt, value))
|
||||
if bytesWritten is not None and bytesWritten != 4:
|
||||
raise RuntimeError("wrote only %u bytes but wanted 4" %
|
||||
bytesWritten)
|
||||
|
||||
def writeShort(self, value):
|
||||
bytesWritten = self.f.write(struct.pack(self.shortFmt, value))
|
||||
if bytesWritten is not None and bytesWritten != 2:
|
||||
raise RuntimeError("wrote only %u bytes but wanted 2" %
|
||||
bytesWritten)
|
||||
|
||||
def writeLong(self, value):
|
||||
bytesWritten = self.f.write(struct.pack(self.longFmt, value))
|
||||
if bytesWritten is not None and bytesWritten != 4:
|
||||
raise RuntimeError("wrote only %u bytes but wanted 4" %
|
||||
bytesWritten)
|
||||
|
||||
def close(self):
|
||||
self.finalize()
|
||||
self.f.close()
|
||||
|
||||
def fixIFD(self):
|
||||
numTags = self.readShort()
|
||||
#trace("fixing IFD at %X; number of tags: %u (0x%X)", self.f.tell()-2,
|
||||
# numTags, numTags)
|
||||
|
||||
for i in range(numTags):
|
||||
tag, fieldType, count = struct.unpack(self.tagFormat, self.f.read(8))
|
||||
#trace(" at %X: tag %u (0x%X), type %u, count %u", self.f.tell()-8,
|
||||
# tag, tag, fieldType, count)
|
||||
|
||||
fieldSize = self.fieldSizes[fieldType]
|
||||
totalSize = fieldSize * count
|
||||
isLocal = (totalSize <= 4)
|
||||
if not isLocal:
|
||||
offset = self.readLong()
|
||||
offset += self.offsetOfNewPage
|
||||
self.rewriteLastLong(offset)
|
||||
|
||||
if tag in self.Tags:
|
||||
curPos = self.f.tell()
|
||||
|
||||
if isLocal:
|
||||
self.fixOffsets(count, isShort=(fieldSize == 2),
|
||||
isLong=(fieldSize == 4))
|
||||
self.f.seek(curPos + 4)
|
||||
else:
|
||||
self.f.seek(offset)
|
||||
self.fixOffsets(count, isShort=(fieldSize == 2),
|
||||
isLong=(fieldSize == 4))
|
||||
self.f.seek(curPos)
|
||||
|
||||
offset = curPos = None
|
||||
|
||||
elif isLocal:
|
||||
# skip the locally stored value that is not an offset
|
||||
self.f.seek(4, os.SEEK_CUR)
|
||||
|
||||
def fixOffsets(self, count, isShort=False, isLong=False):
|
||||
if not isShort and not isLong:
|
||||
raise RuntimeError("offset is neither short nor long")
|
||||
|
||||
for i in range(count):
|
||||
offset = self.readShort() if isShort else self.readLong()
|
||||
offset += self.offsetOfNewPage
|
||||
if isShort and offset >= 65536:
|
||||
# offset is now too large - we must convert shorts to longs
|
||||
if count != 1:
|
||||
raise RuntimeError("not implemented") # XXX TODO
|
||||
|
||||
# simple case - the offset is just one and therefore it is
|
||||
# local (not referenced with another offset)
|
||||
self.rewriteLastShortToLong(offset)
|
||||
self.f.seek(-10, os.SEEK_CUR)
|
||||
self.writeShort(4) # rewrite the type to LONG
|
||||
self.f.seek(8, os.SEEK_CUR)
|
||||
elif isShort:
|
||||
self.rewriteLastShort(offset)
|
||||
else:
|
||||
self.rewriteLastLong(offset)
|
||||
|
||||
def _save_all(im, fp, filename):
|
||||
if not hasattr(im, "n_frames"):
|
||||
return _save(im, fp, filename)
|
||||
|
||||
cur_idx = im.tell()
|
||||
try:
|
||||
with AppendingTiffWriter(fp) as tf:
|
||||
for idx in range(im.n_frames):
|
||||
im.seek(idx)
|
||||
im.load()
|
||||
_save(im, tf, filename)
|
||||
tf.newFrame()
|
||||
finally:
|
||||
im.seek(cur_idx)
|
||||
|
||||
#
|
||||
# --------------------------------------------------------------------
|
||||
# Register
|
||||
|
||||
Image.register_open(TiffImageFile.format, TiffImageFile, _accept)
|
||||
Image.register_save(TiffImageFile.format, _save)
|
||||
Image.register_save_all(TiffImageFile.format, _save_all)
|
||||
|
||||
Image.register_extension(TiffImageFile.format, ".tif")
|
||||
Image.register_extension(TiffImageFile.format, ".tiff")
|
||||
|
|
|
@ -487,6 +487,17 @@ class TestFileTiff(PillowTestCase):
|
|||
|
||||
self.assert_image_equal(im, reloaded)
|
||||
|
||||
def test_tiff_save_all(self):
|
||||
import io
|
||||
import os
|
||||
|
||||
mp = io.BytesIO()
|
||||
with Image.open("Tests/images/multipage.tiff") as im:
|
||||
im.save(mp, format="tiff", save_all=True)
|
||||
|
||||
mp.seek(0, os.SEEK_SET)
|
||||
with Image.open(mp) as im:
|
||||
self.assertEqual(im.n_frames, 3)
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
Loading…
Reference in New Issue
Block a user