Support extracting metadata from bytes and stream objects (#1547)

This should enable more accurate uploads of in-memory files.
This commit is contained in:
Allerter 2020-09-08 02:50:37 +04:30 committed by GitHub
parent 02b8f1d007
commit 1ed0f75c49
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

View File

@ -601,17 +601,53 @@ def get_message_id(message):
def _get_metadata(file):
# `hachoir` only deals with paths to in-disk files, while
# `_get_extension` supports a few other things. The parser
# may also fail in any case and we don't want to crash if
if not hachoir:
return
stream = None
close_stream = True
seekable = True
# The parser may fail and we don't want to crash if
# the extraction process fails.
if hachoir and isinstance(file, str) and os.path.isfile(file):
try:
with hachoir.parser.createParser(file) as parser:
# Note: aiofiles are intentionally left out for simplicity
if isinstance(file, str):
stream = open(file, 'rb')
elif isinstance(file, bytes):
stream = io.BytesIO(file)
else:
stream = file
close_stream = False
if getattr(file, 'seekable', None):
seekable = file.seekable()
else:
seekable = False
if not seekable:
return None
pos = stream.tell()
filename = getattr(file, 'name', '')
parser = hachoir.parser.guess.guessParser(hachoir.stream.InputIOStream(
stream,
source='file:' + filename,
tags=[],
filename=filename
))
return hachoir.metadata.extractMetadata(parser)
except Exception as e:
_log.warning('Failed to analyze %s: %s %s', file, e.__class__, e)
finally:
if stream and close_stream:
stream.close()
elif stream and seekable:
stream.seek(pos)
def get_attributes(file, *, attributes=None, mime_type=None,
force_document=False, voice_note=False, video_note=False,
@ -803,14 +839,30 @@ def is_gif(file):
def is_audio(file):
"""Returns `True` if the file extension looks like an audio file."""
file = 'a' + _get_extension(file)
"""Returns `True` if the file has an audio mime type."""
ext = _get_extension(file)
if not ext:
metadata = _get_metadata(file)
if metadata and metadata.has('mime_type'):
return metadata.get('mime_type').startswith('audio/')
else:
return False
else:
file = 'a' + ext
return (mimetypes.guess_type(file)[0] or '').startswith('audio/')
def is_video(file):
"""Returns `True` if the file extension looks like a video file."""
file = 'a' + _get_extension(file)
"""Returns `True` if the file has a video mime type."""
ext = _get_extension(file)
if not ext:
metadata = _get_metadata(file)
if metadata and metadata.has('mime_type'):
return metadata.get('mime_type').startswith('video/')
else:
return False
else:
file = 'a' + ext
return (mimetypes.guess_type(file)[0] or '').startswith('video/')