From 55efb2b104f8e1d827a7e947983938df173a77b9 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 18 Jan 2018 09:52:39 +0100 Subject: [PATCH 1/5] Use a different schema for file cache which actually persists Caching the inputFile values would not persist accross several days so the cache was nearly unnecessary. Saving the id/hash of the actual inputMedia sent is a much better/persistent idea. --- telethon/telegram_bare_client.py | 10 ---- telethon/tl/session.py | 84 ++++++++++++++++++++------------ 2 files changed, 53 insertions(+), 41 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index d2e84ee6..af86c0f1 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -633,13 +633,6 @@ class TelegramBareClient: with open(file, 'rb') as stream: file = stream.read() hash_md5 = md5(file) - tuple_ = self.session.get_file(hash_md5.digest(), file_size) - if tuple_ and allow_cache: - __log__.info('File was already cached, not uploading again') - return InputFile(name=file_name, - md5_checksum=tuple_[0], id=tuple_[2], parts=tuple_[3]) - elif tuple_ and not allow_cache: - self.session.clear_file(hash_md5.digest(), file_size) else: hash_md5 = None @@ -673,9 +666,6 @@ class TelegramBareClient: if is_large: return InputFileBig(file_id, part_count, file_name) else: - self.session.cache_file( - hash_md5.digest(), file_size, file_id, part_count) - return InputFile(file_id, part_count, file_name, md5_checksum=hash_md5.hexdigest()) diff --git a/telethon/tl/session.py b/telethon/tl/session.py index 1dbf99c5..5d89a5f7 100644 --- a/telethon/tl/session.py +++ b/telethon/tl/session.py @@ -5,6 +5,7 @@ import sqlite3 import struct import time from base64 import b64decode +from enum import Enum from os.path import isfile as file_exists from threading import Lock @@ -12,11 +13,26 @@ from .. import utils from ..tl import TLObject from ..tl.types import ( PeerUser, PeerChat, PeerChannel, - InputPeerUser, InputPeerChat, InputPeerChannel + InputPeerUser, InputPeerChat, InputPeerChannel, + InputPhoto, InputDocument ) EXTENSION = '.session' -CURRENT_VERSION = 2 # database version +CURRENT_VERSION = 3 # database version + + +class _SentFileType(Enum): + DOCUMENT = 0 + PHOTO = 1 + + @staticmethod + def from_type(cls): + if cls == InputDocument: + return _SentFileType.DOCUMENT + elif cls == InputPhoto: + return _SentFileType.PHOTO + else: + raise ValueError('The cls must be either InputDocument/InputPhoto') class Session: @@ -130,9 +146,10 @@ class Session: """sent_files ( md5_digest blob, file_size integer, - file_id integer, - part_count integer, - primary key(md5_digest, file_size) + type integer, + id integer, + hash integer, + primary key(md5_digest, file_size, type) )""" ) c.execute("insert into version values (?)", (CURRENT_VERSION,)) @@ -171,18 +188,22 @@ class Session: def _upgrade_database(self, old): c = self._conn.cursor() - if old == 1: - self._create_table(c,"""sent_files ( - md5_digest blob, - file_size integer, - file_id integer, - part_count integer, - primary key(md5_digest, file_size) - )""") - old = 2 + # old == 1 doesn't have the old sent_files so no need to drop + if old == 2: + # Old cache from old sent_files lasts then a day anyway, drop + c.execute('drop table sent_files') + self._create_table(c, """sent_files ( + md5_digest blob, + file_size integer, + type integer, + id integer, + hash integer, + primary key(md5_digest, file_size, type) + )""") c.close() - def _create_table(self, c, *definitions): + @staticmethod + def _create_table(c, *definitions): """ Creates a table given its definition 'name (columns). If the sqlite version is >= 3.8.2, it will use "without rowid". @@ -420,24 +441,25 @@ class Session: # File processing - def get_file(self, md5_digest, file_size): - return self._conn.execute( - 'select * from sent_files ' - 'where md5_digest = ? and file_size = ?', (md5_digest, file_size) + def get_file(self, md5_digest, file_size, cls): + tuple_ = self._conn.execute( + 'select id, hash from sent_files ' + 'where md5_digest = ? and file_size = ? and type = ?', + (md5_digest, file_size, _SentFileType.from_type(cls)) ).fetchone() + if tuple_: + # Both allowed classes have (id, access_hash) as parameters + return cls(tuple_[0], tuple_[1]) + + def cache_file(self, md5_digest, file_size, instance): + if not isinstance(instance, (InputDocument, InputPhoto)): + raise TypeError('Cannot cache %s instance' % type(instance)) - def cache_file(self, md5_digest, file_size, file_id, part_count): with self._db_lock: self._conn.execute( - 'insert into sent_files values (?,?,?,?)', - (md5_digest, file_size, file_id, part_count) - ) - self.save() - - def clear_file(self, md5_digest, file_size): - with self._db_lock: - self._conn.execute( - 'delete from sent_files where ' - 'md5_digest = ? and file_size = ?', (md5_digest, file_size) - ) + 'insert into sent_files values (?,?,?,?,?)', ( + md5_digest, file_size, + _SentFileType.from_type(type(instance)), + instance.id, instance.access_hash + )) self.save() From 1a3feec481f33035f356971ef1f8a7f7cc9d0d48 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 18 Jan 2018 13:55:03 +0100 Subject: [PATCH 2/5] Move upload/download file methods to the TelegramClient --- telethon/telegram_bare_client.py | 217 +-------------------------- telethon/telegram_client.py | 248 ++++++++++++++++++++++++++++++- 2 files changed, 251 insertions(+), 214 deletions(-) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index af86c0f1..9684a034 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -3,19 +3,16 @@ import os import threading import warnings from datetime import timedelta, datetime -from hashlib import md5 -from io import BytesIO from signal import signal, SIGINT, SIGTERM, SIGABRT from threading import Lock from time import sleep -from . import helpers as utils, version -from .crypto import rsa, CdnDecrypter +from . import version +from .crypto import rsa from .errors import ( - RPCError, BrokenAuthKeyError, ServerError, - FloodWaitError, FloodTestPhoneWaitError, FileMigrateError, - TypeNotFoundError, UnauthorizedError, PhoneMigrateError, - NetworkMigrateError, UserMigrateError + RPCError, BrokenAuthKeyError, ServerError, FloodWaitError, + FloodTestPhoneWaitError, TypeNotFoundError, UnauthorizedError, + PhoneMigrateError, NetworkMigrateError, UserMigrateError ) from .network import authenticator, MtProtoSender, Connection, ConnectionMode from .tl import TLObject, Session @@ -30,15 +27,8 @@ from .tl.functions.help import ( GetCdnConfigRequest, GetConfigRequest ) from .tl.functions.updates import GetStateRequest -from .tl.functions.upload import ( - GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest -) -from .tl.types import InputFile, InputFileBig from .tl.types.auth import ExportedAuthorization -from .tl.types.upload import FileCdnRedirect from .update_state import UpdateState -from .utils import get_appropriated_part_size - DEFAULT_DC_ID = 4 DEFAULT_IPV4_IP = '149.154.167.51' @@ -565,203 +555,6 @@ class TelegramBareClient: # endregion - # region Uploading media - - def upload_file(self, - file, - part_size_kb=None, - file_name=None, - allow_cache=True, - progress_callback=None): - """Uploads the specified file and returns a handle (an instance - of InputFile or InputFileBig, as required) which can be later used. - - Uploading a file will simply return a "handle" to the file stored - remotely in the Telegram servers, which can be later used on. This - will NOT upload the file to your own chat. - - 'file' may be either a file path, a byte array, or a stream. - Note that if the file is a stream it will need to be read - entirely into memory to tell its size first. - - If 'progress_callback' is not None, it should be a function that - takes two parameters, (bytes_uploaded, total_bytes). - - Default values for the optional parameters if left as None are: - part_size_kb = get_appropriated_part_size(file_size) - file_name = os.path.basename(file_path) - """ - if isinstance(file, (InputFile, InputFileBig)): - return file # Already uploaded - - if isinstance(file, str): - file_size = os.path.getsize(file) - elif isinstance(file, bytes): - file_size = len(file) - else: - file = file.read() - file_size = len(file) - - # File will now either be a string or bytes - if not part_size_kb: - part_size_kb = get_appropriated_part_size(file_size) - - if part_size_kb > 512: - raise ValueError('The part size must be less or equal to 512KB') - - part_size = int(part_size_kb * 1024) - if part_size % 1024 != 0: - raise ValueError('The part size must be evenly divisible by 1024') - - # Set a default file name if None was specified - file_id = utils.generate_random_long() - if not file_name: - if isinstance(file, str): - file_name = os.path.basename(file) - else: - file_name = str(file_id) - - # Determine whether the file is too big (over 10MB) or not - # Telegram does make a distinction between smaller or larger files - is_large = file_size > 10 * 1024 * 1024 - if not is_large: - # Calculate the MD5 hash before anything else. - # As this needs to be done always for small files, - # might as well do it before anything else and - # check the cache. - if isinstance(file, str): - with open(file, 'rb') as stream: - file = stream.read() - hash_md5 = md5(file) - else: - hash_md5 = None - - part_count = (file_size + part_size - 1) // part_size - __log__.info('Uploading file of %d bytes in %d chunks of %d', - file_size, part_count, part_size) - - with open(file, 'rb') if isinstance(file, str) else BytesIO(file) \ - as stream: - for part_index in range(part_count): - # Read the file by in chunks of size part_size - part = stream.read(part_size) - - # The SavePartRequest is different depending on whether - # the file is too large or not (over or less than 10MB) - if is_large: - request = SaveBigFilePartRequest(file_id, part_index, - part_count, part) - else: - request = SaveFilePartRequest(file_id, part_index, part) - - result = self(request) - if result: - __log__.debug('Uploaded %d/%d', part_index + 1, part_count) - if progress_callback: - progress_callback(stream.tell(), file_size) - else: - raise RuntimeError( - 'Failed to upload file part {}.'.format(part_index)) - - if is_large: - return InputFileBig(file_id, part_count, file_name) - else: - return InputFile(file_id, part_count, file_name, - md5_checksum=hash_md5.hexdigest()) - - # endregion - - # region Downloading media - - def download_file(self, - input_location, - file, - part_size_kb=None, - file_size=None, - progress_callback=None): - """Downloads the given InputFileLocation to file (a stream or str). - - If 'progress_callback' is not None, it should be a function that - takes two parameters, (bytes_downloaded, total_bytes). Note that - 'total_bytes' simply equals 'file_size', and may be None. - """ - if not part_size_kb: - if not file_size: - part_size_kb = 64 # Reasonable default - else: - part_size_kb = get_appropriated_part_size(file_size) - - part_size = int(part_size_kb * 1024) - # https://core.telegram.org/api/files says: - # > part_size % 1024 = 0 (divisible by 1KB) - # - # But https://core.telegram.org/cdn (more recent) says: - # > limit must be divisible by 4096 bytes - # So we just stick to the 4096 limit. - if part_size % 4096 != 0: - raise ValueError('The part size must be evenly divisible by 4096.') - - if isinstance(file, str): - # Ensure that we'll be able to download the media - utils.ensure_parent_dir_exists(file) - f = open(file, 'wb') - else: - f = file - - # The used client will change if FileMigrateError occurs - client = self - cdn_decrypter = None - - __log__.info('Downloading file in chunks of %d bytes', part_size) - try: - offset = 0 - while True: - try: - if cdn_decrypter: - result = cdn_decrypter.get_file() - else: - result = client(GetFileRequest( - input_location, offset, part_size - )) - - if isinstance(result, FileCdnRedirect): - __log__.info('File lives in a CDN') - cdn_decrypter, result = \ - CdnDecrypter.prepare_decrypter( - client, self._get_cdn_client(result), result - ) - - except FileMigrateError as e: - __log__.info('File lives in another DC') - client = self._get_exported_client(e.new_dc) - continue - - offset += part_size - - # If we have received no data (0 bytes), the file is over - # So there is nothing left to download and write - if not result.bytes: - # Return some extra information, unless it's a CDN file - return getattr(result, 'type', '') - - f.write(result.bytes) - __log__.debug('Saved %d more bytes', len(result.bytes)) - if progress_callback: - progress_callback(f.tell(), file_size) - finally: - if client != self: - client.disconnect() - - if cdn_decrypter: - try: - cdn_decrypter.client.disconnect() - except: - pass - if isinstance(file, str): - f.close() - - # endregion - # region Updates handling def sync_updates(self): diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 84c9beea..6e69e3f6 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -1,4 +1,6 @@ +import hashlib import itertools +import logging import os import sys import time @@ -6,6 +8,14 @@ from collections import OrderedDict, UserList from datetime import datetime, timedelta from mimetypes import guess_type +from io import BytesIO + +from telethon.crypto import CdnDecrypter +from telethon.tl.functions.upload import ( + SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest +) +from telethon.tl.types.upload import FileCdnRedirect + try: import socks except ImportError: @@ -16,7 +26,8 @@ from . import helpers, utils from .errors import ( RPCError, UnauthorizedError, PhoneCodeEmptyError, PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError, LocationInvalidError, - SessionPasswordNeededError, FilePartMissingError) + SessionPasswordNeededError, FilePartMissingError, FileMigrateError +) from .network import ConnectionMode from .tl import TLObject from .tl.custom import Draft, Dialog @@ -55,11 +66,13 @@ from .tl.types import ( UpdateNewChannelMessage, UpdateNewMessage, UpdateShortSentMessage, PeerUser, InputPeerUser, InputPeerChat, InputPeerChannel, MessageEmpty, ChatInvite, ChatInviteAlready, PeerChannel, Photo, InputPeerSelf, - InputSingleMedia, InputMediaPhoto, InputPhoto + InputSingleMedia, InputMediaPhoto, InputPhoto, InputFile, InputFileBig ) from .tl.types.messages import DialogsSlice from .extensions import markdown +__log__ = logging.getLogger(__name__) + class TelegramClient(TelegramBareClient): """ @@ -1011,6 +1024,130 @@ class TelegramClient(TelegramBareClient): progress_callback=progress_callback, reply_to=reply_to ) + def upload_file(self, + file, + part_size_kb=None, + file_name=None, + allow_cache=True, + progress_callback=None): + """ + Uploads the specified file and returns a handle (an instance of + InputFile or InputFileBig, as required) which can be later used + before it expires (they are usable during less than a day). + + Uploading a file will simply return a "handle" to the file stored + remotely in the Telegram servers, which can be later used on. This + will **not** upload the file to your own chat or any chat at all. + + Args: + file (:obj:`str` | :obj:`bytes` | :obj:`file`): + The path of the file, byte array, or stream that will be sent. + Note that if a byte array or a stream is given, a filename + or its type won't be inferred, and it will be sent as an + "unnamed application/octet-stream". + + Subsequent calls with the very same file will result in + immediate uploads, unless ``.clear_file_cache()`` is called. + + part_size_kb (:obj:`int`, optional): + Chunk size when uploading files. The larger, the less + requests will be made (up to 512KB maximum). + + file_name (:obj:`str`, optional): + The file name which will be used on the resulting InputFile. + If not specified, the name will be taken from the ``file`` + and if this is not a ``str``, it will be ``"unnamed"``. + + allow_cache (:obj:`bool`, optional): + Whether to allow reusing the file from cache or not. Unused. + + progress_callback (:obj:`callable`, optional): + A callback function accepting two parameters: + ``(sent bytes, total)``. + + Returns: + The InputFile (or InputFileBig if >10MB). + """ + if isinstance(file, (InputFile, InputFileBig)): + return file # Already uploaded + + if isinstance(file, str): + file_size = os.path.getsize(file) + elif isinstance(file, bytes): + file_size = len(file) + else: + file = file.read() + file_size = len(file) + + # File will now either be a string or bytes + if not part_size_kb: + part_size_kb = utils.get_appropriated_part_size(file_size) + + if part_size_kb > 512: + raise ValueError('The part size must be less or equal to 512KB') + + part_size = int(part_size_kb * 1024) + if part_size % 1024 != 0: + raise ValueError( + 'The part size must be evenly divisible by 1024') + + # Set a default file name if None was specified + file_id = helpers.generate_random_long() + if not file_name: + if isinstance(file, str): + file_name = os.path.basename(file) + else: + file_name = str(file_id) + + # Determine whether the file is too big (over 10MB) or not + # Telegram does make a distinction between smaller or larger files + is_large = file_size > 10 * 1024 * 1024 + if not is_large: + # Calculate the MD5 hash before anything else. + # As this needs to be done always for small files, + # might as well do it before anything else and + # check the cache. + if isinstance(file, str): + with open(file, 'rb') as stream: + file = stream.read() + hash_md5 = hashlib.md5(file) + else: + hash_md5 = None + + part_count = (file_size + part_size - 1) // part_size + __log__.info('Uploading file of %d bytes in %d chunks of %d', + file_size, part_count, part_size) + + with open(file, 'rb') if isinstance(file, str) else BytesIO(file) \ + as stream: + for part_index in range(part_count): + # Read the file by in chunks of size part_size + part = stream.read(part_size) + + # The SavePartRequest is different depending on whether + # the file is too large or not (over or less than 10MB) + if is_large: + request = SaveBigFilePartRequest(file_id, part_index, + part_count, part) + else: + request = SaveFilePartRequest(file_id, part_index, part) + + result = self(request) + if result: + __log__.debug('Uploaded %d/%d', part_index + 1, + part_count) + if progress_callback: + progress_callback(stream.tell(), file_size) + else: + raise RuntimeError( + 'Failed to upload file part {}.'.format(part_index)) + + if is_large: + return InputFileBig(file_id, part_count, file_name) + else: + return InputFile(file_id, part_count, file_name, + md5_checksum=hash_md5.hexdigest()) + # endregion # region Downloading media requests @@ -1292,6 +1429,113 @@ class TelegramClient(TelegramBareClient): return result i += 1 + def download_file(self, + input_location, + file, + part_size_kb=None, + file_size=None, + progress_callback=None): + """ + Downloads the given input location to a file. + + Args: + input_location (:obj:`InputFileLocation`): + The file location from which the file will be downloaded. + + file (:obj:`str` | :obj:`file`, optional): + The output file path, directory, or stream-like object. + If the path exists and is a file, it will be overwritten. + + part_size_kb (:obj:`int`, optional): + Chunk size when downloading files. The larger, the less + requests will be made (up to 512KB maximum). + + file_size (:obj:`int`, optional): + The file size that is about to be downloaded, if known. + Only used if ``progress_callback`` is specified. + + progress_callback (:obj:`callable`, optional): + A callback function accepting two parameters: + ``(downloaded bytes, total)``. Note that the + ``total`` is the provided ``file_size``. + """ + if not part_size_kb: + if not file_size: + part_size_kb = 64 # Reasonable default + else: + part_size_kb = utils.get_appropriated_part_size(file_size) + + part_size = int(part_size_kb * 1024) + # https://core.telegram.org/api/files says: + # > part_size % 1024 = 0 (divisible by 1KB) + # + # But https://core.telegram.org/cdn (more recent) says: + # > limit must be divisible by 4096 bytes + # So we just stick to the 4096 limit. + if part_size % 4096 != 0: + raise ValueError( + 'The part size must be evenly divisible by 4096.') + + if isinstance(file, str): + # Ensure that we'll be able to download the media + helpers.ensure_parent_dir_exists(file) + f = open(file, 'wb') + else: + f = file + + # The used client will change if FileMigrateError occurs + client = self + cdn_decrypter = None + + __log__.info('Downloading file in chunks of %d bytes', part_size) + try: + offset = 0 + while True: + try: + if cdn_decrypter: + result = cdn_decrypter.get_file() + else: + result = client(GetFileRequest( + input_location, offset, part_size + )) + + if isinstance(result, FileCdnRedirect): + __log__.info('File lives in a CDN') + cdn_decrypter, result = \ + CdnDecrypter.prepare_decrypter( + client, self._get_cdn_client(result), + result + ) + + except FileMigrateError as e: + __log__.info('File lives in another DC') + client = self._get_exported_client(e.new_dc) + continue + + offset += part_size + + # If we have received no data (0 bytes), the file is over + # So there is nothing left to download and write + if not result.bytes: + # Return some extra information, unless it's a CDN file + return getattr(result, 'type', '') + + f.write(result.bytes) + __log__.debug('Saved %d more bytes', len(result.bytes)) + if progress_callback: + progress_callback(f.tell(), file_size) + finally: + if client != self: + client.disconnect() + + if cdn_decrypter: + try: + cdn_decrypter.client.disconnect() + except: + pass + if isinstance(file, str): + f.close() + # endregion # endregion From 7e707dbbd991ba22cdf70a1346683837ba970a2f Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 18 Jan 2018 19:35:46 +0100 Subject: [PATCH 3/5] Fix using enum on sqlite instead its value --- telethon/tl/session.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/telethon/tl/session.py b/telethon/tl/session.py index 5d89a5f7..e2c653d4 100644 --- a/telethon/tl/session.py +++ b/telethon/tl/session.py @@ -445,7 +445,7 @@ class Session: tuple_ = self._conn.execute( 'select id, hash from sent_files ' 'where md5_digest = ? and file_size = ? and type = ?', - (md5_digest, file_size, _SentFileType.from_type(cls)) + (md5_digest, file_size, _SentFileType.from_type(cls).value) ).fetchone() if tuple_: # Both allowed classes have (id, access_hash) as parameters @@ -459,7 +459,7 @@ class Session: self._conn.execute( 'insert into sent_files values (?,?,?,?,?)', ( md5_digest, file_size, - _SentFileType.from_type(type(instance)), + _SentFileType.from_type(type(instance)).value, instance.id, instance.access_hash )) self.save() From 0e4611a593dd805c05a4420aced410166504c57c Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 18 Jan 2018 19:36:47 +0100 Subject: [PATCH 4/5] Properly implement InputPhoto/InputDocument caching Since uploading a file is done on the TelegramClient, and the InputFiles are only valid for a short period of time, it only makes sense to cache the sent media instead (which should not expire). The problem is the MD5 is only needed when uploading the file. The solution is to allow this method to check for the wanted cache, and if available, return an instance of that, so to preserve the flexibility of both options (always InputFile, or the cached InputPhoto/InputDocument) instead reuploading. --- telethon/telegram_client.py | 146 +++++++++++++++++++++--------------- telethon/tl/session.py | 2 +- 2 files changed, 85 insertions(+), 63 deletions(-) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 6e69e3f6..6e249cc4 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -66,7 +66,8 @@ from .tl.types import ( UpdateNewChannelMessage, UpdateNewMessage, UpdateShortSentMessage, PeerUser, InputPeerUser, InputPeerChat, InputPeerChannel, MessageEmpty, ChatInvite, ChatInviteAlready, PeerChannel, Photo, InputPeerSelf, - InputSingleMedia, InputMediaPhoto, InputPhoto, InputFile, InputFileBig + InputSingleMedia, InputMediaPhoto, InputPhoto, InputFile, InputFileBig, + InputDocument, InputMediaDocument ) from .tl.types.messages import DialogsSlice from .extensions import markdown @@ -875,7 +876,9 @@ class TelegramClient(TelegramBareClient): allow_cache (:obj:`bool`, optional): Whether to allow using the cached version stored in the - database or not. Defaults to ``True`` to avoid reuploads. + database or not. Defaults to ``True`` to avoid re-uploads. + Must be ``False`` if you wish to use different attributes + or thumb than those that were used when the file was cached. Kwargs: If "is_voice_note" in kwargs, despite its value, and the file is @@ -892,8 +895,7 @@ class TelegramClient(TelegramBareClient): if all(utils.is_image(x) for x in file): return self._send_album( entity, file, caption=caption, - progress_callback=progress_callback, reply_to=reply_to, - allow_cache=allow_cache + progress_callback=progress_callback, reply_to=reply_to ) # Not all are images, so send all the files one by one return [ @@ -905,10 +907,20 @@ class TelegramClient(TelegramBareClient): ) for x in file ] + as_image = utils.is_image(file) and not force_document + use_cache = InputPhoto if as_image else InputDocument file_handle = self.upload_file( - file, progress_callback=progress_callback, allow_cache=allow_cache) + file, progress_callback=progress_callback, + use_cache=use_cache if allow_cache else None + ) - if utils.is_image(file) and not force_document: + if isinstance(file_handle, use_cache): + # File was cached, so an instance of use_cache was returned + if as_image: + media = InputMediaPhoto(file_handle, caption) + else: + media = InputMediaDocument(file_handle, caption) + elif as_image: media = InputMediaUploadedPhoto(file_handle, caption) else: mime_type = None @@ -964,19 +976,19 @@ class TelegramClient(TelegramBareClient): media=media, reply_to_msg_id=self._get_reply_to(reply_to) ) - try: - return self._get_response_message(request, self(request)) - except FilePartMissingError: - # After a while, cached files are invalidated and this - # error is raised. The file needs to be uploaded again. - if not allow_cache: - raise - return self.send_file( - entity, file, allow_cache=False, - caption=caption, force_document=force_document, - progress_callback=progress_callback, reply_to=reply_to, - attributes=attributes, thumb=thumb, **kwargs - ) + msg = self._get_response_message(request, self(request)) + if msg and isinstance(file_handle, InputFile): + # There was a response message and we didn't use cached + # version, so cache whatever we just sent to the database. + # Note that the InputFile was modified to have md5/size. + md5, size = file_handle.md5, file_handle.size + if as_image: + to_cache = utils.get_input_photo(msg.media.photo) + else: + to_cache = utils.get_input_document(msg.media.document) + self.session.cache_file(md5, size, to_cache) + + return msg def send_voice_note(self, entity, file, caption='', progress_callback=None, reply_to=None): @@ -987,48 +999,44 @@ class TelegramClient(TelegramBareClient): is_voice_note=()) # empty tuple is enough def _send_album(self, entity, files, caption='', - progress_callback=None, reply_to=None, - allow_cache=True): + progress_callback=None, reply_to=None): """Specialized version of .send_file for albums""" + # We don't care if the user wants to avoid cache, we will use it + # anyway. Why? The cached version will be exactly the same thing + # we need to produce right now to send albums (uploadMedia), and + # cache only makes a difference for documents where the user may + # want the attributes used on them to change. Caption's ignored. entity = self.get_input_entity(entity) reply_to = self._get_reply_to(reply_to) - try: - # Need to upload the media first - media = [ - self(UploadMediaRequest(entity, InputMediaUploadedPhoto( - self.upload_file(file, allow_cache=allow_cache), - caption=caption - ))) - for file in files - ] - # Now we can construct the multi-media request - result = self(SendMultiMediaRequest( - entity, reply_to_msg_id=reply_to, multi_media=[ - InputSingleMedia(InputMediaPhoto( - InputPhoto(m.photo.id, m.photo.access_hash), - caption=caption - )) - for m in media - ] - )) - return [ - self._get_response_message(update.id, result) - for update in result.updates - if isinstance(update, UpdateMessageID) - ] - except FilePartMissingError: - if not allow_cache: - raise - return self._send_album( - entity, files, allow_cache=False, caption=caption, - progress_callback=progress_callback, reply_to=reply_to - ) + + # Need to upload the media first, but only if they're not cached yet + media = [] + for file in files: + # fh will either be InputPhoto or a modified InputFile + fh = self.upload_file(file, use_cache=InputPhoto) + if not isinstance(fh, InputPhoto): + input_photo = utils.get_input_photo(self(UploadMediaRequest( + entity, media=InputMediaUploadedPhoto(fh, caption) + )).photo) + self.session.cache_file(fh.md5, fh.size, input_photo) + fh = input_photo + media.append(InputSingleMedia(InputMediaPhoto(fh, caption))) + + # Now we can construct the multi-media request + result = self(SendMultiMediaRequest( + entity, reply_to_msg_id=reply_to, multi_media=media + )) + return [ + self._get_response_message(update.id, result) + for update in result.updates + if isinstance(update, UpdateMessageID) + ] def upload_file(self, file, part_size_kb=None, file_name=None, - allow_cache=True, + use_cache=None, progress_callback=None): """ Uploads the specified file and returns a handle (an instance of @@ -1058,15 +1066,20 @@ class TelegramClient(TelegramBareClient): If not specified, the name will be taken from the ``file`` and if this is not a ``str``, it will be ``"unnamed"``. - allow_cache (:obj:`bool`, optional): - Whether to allow reusing the file from cache or not. Unused. + use_cache (:obj:`type`, optional): + The type of cache to use (currently either ``InputDocument`` + or ``InputPhoto``). If present and the file is small enough + to need the MD5, it will be checked against the database, + and if a match is found, the upload won't be made. Instead, + an instance of type ``use_cache`` will be returned. progress_callback (:obj:`callable`, optional): A callback function accepting two parameters: ``(sent bytes, total)``. Returns: - The InputFile (or InputFileBig if >10MB). + The InputFile (or InputFileBig if >10MB) with two extra + attributes: ``.md5`` (its ``.digest()``) and ``size``. """ if isinstance(file, (InputFile, InputFileBig)): return file # Already uploaded @@ -1102,6 +1115,7 @@ class TelegramClient(TelegramBareClient): # Determine whether the file is too big (over 10MB) or not # Telegram does make a distinction between smaller or larger files is_large = file_size > 10 * 1024 * 1024 + hash_md5 = hashlib.md5() if not is_large: # Calculate the MD5 hash before anything else. # As this needs to be done always for small files, @@ -1110,9 +1124,13 @@ class TelegramClient(TelegramBareClient): if isinstance(file, str): with open(file, 'rb') as stream: file = stream.read() - hash_md5 = hashlib.md5(file) - else: - hash_md5 = None + hash_md5.update(file) + if use_cache: + cached = self.session.get_file( + hash_md5.digest(), file_size, cls=use_cache + ) + if cached: + return cached part_count = (file_size + part_size - 1) // part_size __log__.info('Uploading file of %d bytes in %d chunks of %d', @@ -1143,10 +1161,14 @@ class TelegramClient(TelegramBareClient): 'Failed to upload file part {}.'.format(part_index)) if is_large: - return InputFileBig(file_id, part_count, file_name) + result = InputFileBig(file_id, part_count, file_name) else: - return InputFile(file_id, part_count, file_name, - md5_checksum=hash_md5.hexdigest()) + result = InputFile(file_id, part_count, file_name, + md5_checksum=hash_md5.hexdigest()) + + result.md5 = hash_md5.digest() + result.size = file_size + return result # endregion diff --git a/telethon/tl/session.py b/telethon/tl/session.py index e2c653d4..bfed1a79 100644 --- a/telethon/tl/session.py +++ b/telethon/tl/session.py @@ -457,7 +457,7 @@ class Session: with self._db_lock: self._conn.execute( - 'insert into sent_files values (?,?,?,?,?)', ( + 'insert or replace into sent_files values (?,?,?,?,?)', ( md5_digest, file_size, _SentFileType.from_type(type(instance)).value, instance.id, instance.access_hash From b546c022109429d061dd6482fde928721252944e Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Thu, 18 Jan 2018 20:08:05 +0100 Subject: [PATCH 5/5] Return a custom class for sized InputFile instead extra attrs --- telethon/telegram_client.py | 32 +++++++++++--------------- telethon/tl/custom/__init__.py | 1 + telethon/tl/custom/input_sized_file.py | 9 ++++++++ 3 files changed, 24 insertions(+), 18 deletions(-) create mode 100644 telethon/tl/custom/input_sized_file.py diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 6e249cc4..5c4493f3 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -6,15 +6,15 @@ import sys import time from collections import OrderedDict, UserList from datetime import datetime, timedelta +from io import BytesIO from mimetypes import guess_type -from io import BytesIO - -from telethon.crypto import CdnDecrypter -from telethon.tl.functions.upload import ( - SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest +from .crypto import CdnDecrypter +from .tl.custom import InputSizedFile +from .tl.functions.upload import ( + SaveBigFilePartRequest, SaveFilePartRequest, GetFileRequest ) -from telethon.tl.types.upload import FileCdnRedirect +from .tl.types.upload import FileCdnRedirect try: import socks @@ -26,7 +26,7 @@ from . import helpers, utils from .errors import ( RPCError, UnauthorizedError, PhoneCodeEmptyError, PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError, LocationInvalidError, - SessionPasswordNeededError, FilePartMissingError, FileMigrateError + SessionPasswordNeededError, FileMigrateError ) from .network import ConnectionMode from .tl import TLObject @@ -977,10 +977,9 @@ class TelegramClient(TelegramBareClient): reply_to_msg_id=self._get_reply_to(reply_to) ) msg = self._get_response_message(request, self(request)) - if msg and isinstance(file_handle, InputFile): + if msg and isinstance(file_handle, InputSizedFile): # There was a response message and we didn't use cached # version, so cache whatever we just sent to the database. - # Note that the InputFile was modified to have md5/size. md5, size = file_handle.md5, file_handle.size if as_image: to_cache = utils.get_input_photo(msg.media.photo) @@ -1078,8 +1077,8 @@ class TelegramClient(TelegramBareClient): ``(sent bytes, total)``. Returns: - The InputFile (or InputFileBig if >10MB) with two extra - attributes: ``.md5`` (its ``.digest()``) and ``size``. + ``InputFileBig`` if the file size is larger than 10MB, + ``InputSizedFile`` (subclass of ``InputFile``) otherwise. """ if isinstance(file, (InputFile, InputFileBig)): return file # Already uploaded @@ -1161,14 +1160,11 @@ class TelegramClient(TelegramBareClient): 'Failed to upload file part {}.'.format(part_index)) if is_large: - result = InputFileBig(file_id, part_count, file_name) + return InputFileBig(file_id, part_count, file_name) else: - result = InputFile(file_id, part_count, file_name, - md5_checksum=hash_md5.hexdigest()) - - result.md5 = hash_md5.digest() - result.size = file_size - return result + return InputSizedFile( + file_id, part_count, file_name, md5=hash_md5, size=file_size + ) # endregion diff --git a/telethon/tl/custom/__init__.py b/telethon/tl/custom/__init__.py index 5b6bf44d..f74189f6 100644 --- a/telethon/tl/custom/__init__.py +++ b/telethon/tl/custom/__init__.py @@ -1,2 +1,3 @@ from .draft import Draft from .dialog import Dialog +from .input_sized_file import InputSizedFile diff --git a/telethon/tl/custom/input_sized_file.py b/telethon/tl/custom/input_sized_file.py new file mode 100644 index 00000000..fcb743f6 --- /dev/null +++ b/telethon/tl/custom/input_sized_file.py @@ -0,0 +1,9 @@ +from ..types import InputFile + + +class InputSizedFile(InputFile): + """InputFile class with two extra parameters: md5 (digest) and size""" + def __init__(self, id_, parts, name, md5, size): + super().__init__(id_, parts, name, md5.hexdigest()) + self.md5 = md5.digest() + self.size = size