Move upload/download file methods to the TelegramClient

This commit is contained in:
Lonami Exo 2018-01-18 13:55:03 +01:00
parent 55efb2b104
commit 1a3feec481
2 changed files with 251 additions and 214 deletions

View File

@ -3,19 +3,16 @@ import os
import threading
import warnings
from datetime import timedelta, datetime
from hashlib import md5
from io import BytesIO
from signal import signal, SIGINT, SIGTERM, SIGABRT
from threading import Lock
from time import sleep
from . import helpers as utils, version
from .crypto import rsa, CdnDecrypter
from . import version
from .crypto import rsa
from .errors import (
RPCError, BrokenAuthKeyError, ServerError,
FloodWaitError, FloodTestPhoneWaitError, FileMigrateError,
TypeNotFoundError, UnauthorizedError, PhoneMigrateError,
NetworkMigrateError, UserMigrateError
RPCError, BrokenAuthKeyError, ServerError, FloodWaitError,
FloodTestPhoneWaitError, TypeNotFoundError, UnauthorizedError,
PhoneMigrateError, NetworkMigrateError, UserMigrateError
)
from .network import authenticator, MtProtoSender, Connection, ConnectionMode
from .tl import TLObject, Session
@ -30,15 +27,8 @@ from .tl.functions.help import (
GetCdnConfigRequest, GetConfigRequest
)
from .tl.functions.updates import GetStateRequest
from .tl.functions.upload import (
GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest
)
from .tl.types import InputFile, InputFileBig
from .tl.types.auth import ExportedAuthorization
from .tl.types.upload import FileCdnRedirect
from .update_state import UpdateState
from .utils import get_appropriated_part_size
DEFAULT_DC_ID = 4
DEFAULT_IPV4_IP = '149.154.167.51'
@ -565,203 +555,6 @@ class TelegramBareClient:
# endregion
# region Uploading media
def upload_file(self,
file,
part_size_kb=None,
file_name=None,
allow_cache=True,
progress_callback=None):
"""Uploads the specified file and returns a handle (an instance
of InputFile or InputFileBig, as required) which can be later used.
Uploading a file will simply return a "handle" to the file stored
remotely in the Telegram servers, which can be later used on. This
will NOT upload the file to your own chat.
'file' may be either a file path, a byte array, or a stream.
Note that if the file is a stream it will need to be read
entirely into memory to tell its size first.
If 'progress_callback' is not None, it should be a function that
takes two parameters, (bytes_uploaded, total_bytes).
Default values for the optional parameters if left as None are:
part_size_kb = get_appropriated_part_size(file_size)
file_name = os.path.basename(file_path)
"""
if isinstance(file, (InputFile, InputFileBig)):
return file # Already uploaded
if isinstance(file, str):
file_size = os.path.getsize(file)
elif isinstance(file, bytes):
file_size = len(file)
else:
file = file.read()
file_size = len(file)
# File will now either be a string or bytes
if not part_size_kb:
part_size_kb = get_appropriated_part_size(file_size)
if part_size_kb > 512:
raise ValueError('The part size must be less or equal to 512KB')
part_size = int(part_size_kb * 1024)
if part_size % 1024 != 0:
raise ValueError('The part size must be evenly divisible by 1024')
# Set a default file name if None was specified
file_id = utils.generate_random_long()
if not file_name:
if isinstance(file, str):
file_name = os.path.basename(file)
else:
file_name = str(file_id)
# Determine whether the file is too big (over 10MB) or not
# Telegram does make a distinction between smaller or larger files
is_large = file_size > 10 * 1024 * 1024
if not is_large:
# Calculate the MD5 hash before anything else.
# As this needs to be done always for small files,
# might as well do it before anything else and
# check the cache.
if isinstance(file, str):
with open(file, 'rb') as stream:
file = stream.read()
hash_md5 = md5(file)
else:
hash_md5 = None
part_count = (file_size + part_size - 1) // part_size
__log__.info('Uploading file of %d bytes in %d chunks of %d',
file_size, part_count, part_size)
with open(file, 'rb') if isinstance(file, str) else BytesIO(file) \
as stream:
for part_index in range(part_count):
# Read the file by in chunks of size part_size
part = stream.read(part_size)
# The SavePartRequest is different depending on whether
# the file is too large or not (over or less than 10MB)
if is_large:
request = SaveBigFilePartRequest(file_id, part_index,
part_count, part)
else:
request = SaveFilePartRequest(file_id, part_index, part)
result = self(request)
if result:
__log__.debug('Uploaded %d/%d', part_index + 1, part_count)
if progress_callback:
progress_callback(stream.tell(), file_size)
else:
raise RuntimeError(
'Failed to upload file part {}.'.format(part_index))
if is_large:
return InputFileBig(file_id, part_count, file_name)
else:
return InputFile(file_id, part_count, file_name,
md5_checksum=hash_md5.hexdigest())
# endregion
# region Downloading media
def download_file(self,
input_location,
file,
part_size_kb=None,
file_size=None,
progress_callback=None):
"""Downloads the given InputFileLocation to file (a stream or str).
If 'progress_callback' is not None, it should be a function that
takes two parameters, (bytes_downloaded, total_bytes). Note that
'total_bytes' simply equals 'file_size', and may be None.
"""
if not part_size_kb:
if not file_size:
part_size_kb = 64 # Reasonable default
else:
part_size_kb = get_appropriated_part_size(file_size)
part_size = int(part_size_kb * 1024)
# https://core.telegram.org/api/files says:
# > part_size % 1024 = 0 (divisible by 1KB)
#
# But https://core.telegram.org/cdn (more recent) says:
# > limit must be divisible by 4096 bytes
# So we just stick to the 4096 limit.
if part_size % 4096 != 0:
raise ValueError('The part size must be evenly divisible by 4096.')
if isinstance(file, str):
# Ensure that we'll be able to download the media
utils.ensure_parent_dir_exists(file)
f = open(file, 'wb')
else:
f = file
# The used client will change if FileMigrateError occurs
client = self
cdn_decrypter = None
__log__.info('Downloading file in chunks of %d bytes', part_size)
try:
offset = 0
while True:
try:
if cdn_decrypter:
result = cdn_decrypter.get_file()
else:
result = client(GetFileRequest(
input_location, offset, part_size
))
if isinstance(result, FileCdnRedirect):
__log__.info('File lives in a CDN')
cdn_decrypter, result = \
CdnDecrypter.prepare_decrypter(
client, self._get_cdn_client(result), result
)
except FileMigrateError as e:
__log__.info('File lives in another DC')
client = self._get_exported_client(e.new_dc)
continue
offset += part_size
# If we have received no data (0 bytes), the file is over
# So there is nothing left to download and write
if not result.bytes:
# Return some extra information, unless it's a CDN file
return getattr(result, 'type', '')
f.write(result.bytes)
__log__.debug('Saved %d more bytes', len(result.bytes))
if progress_callback:
progress_callback(f.tell(), file_size)
finally:
if client != self:
client.disconnect()
if cdn_decrypter:
try:
cdn_decrypter.client.disconnect()
except:
pass
if isinstance(file, str):
f.close()
# endregion
# region Updates handling
def sync_updates(self):

View File

@ -1,4 +1,6 @@
import hashlib
import itertools
import logging
import os
import sys
import time
@ -6,6 +8,14 @@ from collections import OrderedDict, UserList
from datetime import datetime, timedelta
from mimetypes import guess_type
from io import BytesIO
from telethon.crypto import CdnDecrypter
from telethon.tl.functions.upload import (
SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest
)
from telethon.tl.types.upload import FileCdnRedirect
try:
import socks
except ImportError:
@ -16,7 +26,8 @@ from . import helpers, utils
from .errors import (
RPCError, UnauthorizedError, PhoneCodeEmptyError, PhoneCodeExpiredError,
PhoneCodeHashEmptyError, PhoneCodeInvalidError, LocationInvalidError,
SessionPasswordNeededError, FilePartMissingError)
SessionPasswordNeededError, FilePartMissingError, FileMigrateError
)
from .network import ConnectionMode
from .tl import TLObject
from .tl.custom import Draft, Dialog
@ -55,11 +66,13 @@ from .tl.types import (
UpdateNewChannelMessage, UpdateNewMessage, UpdateShortSentMessage,
PeerUser, InputPeerUser, InputPeerChat, InputPeerChannel, MessageEmpty,
ChatInvite, ChatInviteAlready, PeerChannel, Photo, InputPeerSelf,
InputSingleMedia, InputMediaPhoto, InputPhoto
InputSingleMedia, InputMediaPhoto, InputPhoto, InputFile, InputFileBig
)
from .tl.types.messages import DialogsSlice
from .extensions import markdown
__log__ = logging.getLogger(__name__)
class TelegramClient(TelegramBareClient):
"""
@ -1011,6 +1024,130 @@ class TelegramClient(TelegramBareClient):
progress_callback=progress_callback, reply_to=reply_to
)
def upload_file(self,
file,
part_size_kb=None,
file_name=None,
allow_cache=True,
progress_callback=None):
"""
Uploads the specified file and returns a handle (an instance of
InputFile or InputFileBig, as required) which can be later used
before it expires (they are usable during less than a day).
Uploading a file will simply return a "handle" to the file stored
remotely in the Telegram servers, which can be later used on. This
will **not** upload the file to your own chat or any chat at all.
Args:
file (:obj:`str` | :obj:`bytes` | :obj:`file`):
The path of the file, byte array, or stream that will be sent.
Note that if a byte array or a stream is given, a filename
or its type won't be inferred, and it will be sent as an
"unnamed application/octet-stream".
Subsequent calls with the very same file will result in
immediate uploads, unless ``.clear_file_cache()`` is called.
part_size_kb (:obj:`int`, optional):
Chunk size when uploading files. The larger, the less
requests will be made (up to 512KB maximum).
file_name (:obj:`str`, optional):
The file name which will be used on the resulting InputFile.
If not specified, the name will be taken from the ``file``
and if this is not a ``str``, it will be ``"unnamed"``.
allow_cache (:obj:`bool`, optional):
Whether to allow reusing the file from cache or not. Unused.
progress_callback (:obj:`callable`, optional):
A callback function accepting two parameters:
``(sent bytes, total)``.
Returns:
The InputFile (or InputFileBig if >10MB).
"""
if isinstance(file, (InputFile, InputFileBig)):
return file # Already uploaded
if isinstance(file, str):
file_size = os.path.getsize(file)
elif isinstance(file, bytes):
file_size = len(file)
else:
file = file.read()
file_size = len(file)
# File will now either be a string or bytes
if not part_size_kb:
part_size_kb = utils.get_appropriated_part_size(file_size)
if part_size_kb > 512:
raise ValueError('The part size must be less or equal to 512KB')
part_size = int(part_size_kb * 1024)
if part_size % 1024 != 0:
raise ValueError(
'The part size must be evenly divisible by 1024')
# Set a default file name if None was specified
file_id = helpers.generate_random_long()
if not file_name:
if isinstance(file, str):
file_name = os.path.basename(file)
else:
file_name = str(file_id)
# Determine whether the file is too big (over 10MB) or not
# Telegram does make a distinction between smaller or larger files
is_large = file_size > 10 * 1024 * 1024
if not is_large:
# Calculate the MD5 hash before anything else.
# As this needs to be done always for small files,
# might as well do it before anything else and
# check the cache.
if isinstance(file, str):
with open(file, 'rb') as stream:
file = stream.read()
hash_md5 = hashlib.md5(file)
else:
hash_md5 = None
part_count = (file_size + part_size - 1) // part_size
__log__.info('Uploading file of %d bytes in %d chunks of %d',
file_size, part_count, part_size)
with open(file, 'rb') if isinstance(file, str) else BytesIO(file) \
as stream:
for part_index in range(part_count):
# Read the file by in chunks of size part_size
part = stream.read(part_size)
# The SavePartRequest is different depending on whether
# the file is too large or not (over or less than 10MB)
if is_large:
request = SaveBigFilePartRequest(file_id, part_index,
part_count, part)
else:
request = SaveFilePartRequest(file_id, part_index, part)
result = self(request)
if result:
__log__.debug('Uploaded %d/%d', part_index + 1,
part_count)
if progress_callback:
progress_callback(stream.tell(), file_size)
else:
raise RuntimeError(
'Failed to upload file part {}.'.format(part_index))
if is_large:
return InputFileBig(file_id, part_count, file_name)
else:
return InputFile(file_id, part_count, file_name,
md5_checksum=hash_md5.hexdigest())
# endregion
# region Downloading media requests
@ -1292,6 +1429,113 @@ class TelegramClient(TelegramBareClient):
return result
i += 1
def download_file(self,
input_location,
file,
part_size_kb=None,
file_size=None,
progress_callback=None):
"""
Downloads the given input location to a file.
Args:
input_location (:obj:`InputFileLocation`):
The file location from which the file will be downloaded.
file (:obj:`str` | :obj:`file`, optional):
The output file path, directory, or stream-like object.
If the path exists and is a file, it will be overwritten.
part_size_kb (:obj:`int`, optional):
Chunk size when downloading files. The larger, the less
requests will be made (up to 512KB maximum).
file_size (:obj:`int`, optional):
The file size that is about to be downloaded, if known.
Only used if ``progress_callback`` is specified.
progress_callback (:obj:`callable`, optional):
A callback function accepting two parameters:
``(downloaded bytes, total)``. Note that the
``total`` is the provided ``file_size``.
"""
if not part_size_kb:
if not file_size:
part_size_kb = 64 # Reasonable default
else:
part_size_kb = utils.get_appropriated_part_size(file_size)
part_size = int(part_size_kb * 1024)
# https://core.telegram.org/api/files says:
# > part_size % 1024 = 0 (divisible by 1KB)
#
# But https://core.telegram.org/cdn (more recent) says:
# > limit must be divisible by 4096 bytes
# So we just stick to the 4096 limit.
if part_size % 4096 != 0:
raise ValueError(
'The part size must be evenly divisible by 4096.')
if isinstance(file, str):
# Ensure that we'll be able to download the media
helpers.ensure_parent_dir_exists(file)
f = open(file, 'wb')
else:
f = file
# The used client will change if FileMigrateError occurs
client = self
cdn_decrypter = None
__log__.info('Downloading file in chunks of %d bytes', part_size)
try:
offset = 0
while True:
try:
if cdn_decrypter:
result = cdn_decrypter.get_file()
else:
result = client(GetFileRequest(
input_location, offset, part_size
))
if isinstance(result, FileCdnRedirect):
__log__.info('File lives in a CDN')
cdn_decrypter, result = \
CdnDecrypter.prepare_decrypter(
client, self._get_cdn_client(result),
result
)
except FileMigrateError as e:
__log__.info('File lives in another DC')
client = self._get_exported_client(e.new_dc)
continue
offset += part_size
# If we have received no data (0 bytes), the file is over
# So there is nothing left to download and write
if not result.bytes:
# Return some extra information, unless it's a CDN file
return getattr(result, 'type', '')
f.write(result.bytes)
__log__.debug('Saved %d more bytes', len(result.bytes))
if progress_callback:
progress_callback(f.tell(), file_size)
finally:
if client != self:
client.disconnect()
if cdn_decrypter:
try:
cdn_decrypter.client.disconnect()
except:
pass
if isinstance(file, str):
f.close()
# endregion
# endregion