mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-08-05 12:40:22 +03:00
Merge branch 'master' into sync
This commit is contained in:
commit
25b220b4bd
|
@ -202,6 +202,7 @@ class DownloadMethods(UserMethods):
|
||||||
|
|
||||||
# The used sender will change if ``FileMigrateError`` occurs
|
# The used sender will change if ``FileMigrateError`` occurs
|
||||||
sender = self._sender
|
sender = self._sender
|
||||||
|
exported = False
|
||||||
input_location = utils.get_input_location(input_location)
|
input_location = utils.get_input_location(input_location)
|
||||||
|
|
||||||
__log__.info('Downloading file in chunks of %d bytes', part_size)
|
__log__.info('Downloading file in chunks of %d bytes', part_size)
|
||||||
|
@ -217,7 +218,8 @@ class DownloadMethods(UserMethods):
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
except errors.FileMigrateError as e:
|
except errors.FileMigrateError as e:
|
||||||
__log__.info('File lives in another DC')
|
__log__.info('File lives in another DC')
|
||||||
sender = self._get_exported_sender(e.new_dc)
|
sender = self._borrow_exported_sender(e.new_dc)
|
||||||
|
exported = True
|
||||||
continue
|
continue
|
||||||
|
|
||||||
offset += part_size
|
offset += part_size
|
||||||
|
@ -233,7 +235,9 @@ class DownloadMethods(UserMethods):
|
||||||
if progress_callback:
|
if progress_callback:
|
||||||
progress_callback(f.tell(), file_size)
|
progress_callback(f.tell(), file_size)
|
||||||
finally:
|
finally:
|
||||||
if sender != self._sender:
|
if exported:
|
||||||
|
self._return_exported_sender(sender)
|
||||||
|
elif sender != self._sender:
|
||||||
sender.disconnect()
|
sender.disconnect()
|
||||||
if isinstance(file, str) or in_memory:
|
if isinstance(file, str) or in_memory:
|
||||||
f.close()
|
f.close()
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import asyncio
|
import asyncio
|
||||||
import itertools
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
import warnings
|
import time
|
||||||
from collections import UserList
|
from collections import UserList
|
||||||
|
|
||||||
from async_generator import async_generator, yield_
|
from async_generator import async_generator, yield_
|
||||||
|
@ -188,7 +188,7 @@ class MessageMethods(UploadMethods, MessageParseMethods):
|
||||||
last_id = float('inf')
|
last_id = float('inf')
|
||||||
batch_size = min(max(batch_size, 1), 100)
|
batch_size = min(max(batch_size, 1), 100)
|
||||||
while have < limit:
|
while have < limit:
|
||||||
start = asyncio.get_event_loop().time()
|
start = time.time()
|
||||||
# Telegram has a hard limit of 100
|
# Telegram has a hard limit of 100
|
||||||
request.limit = min(limit - have, batch_size)
|
request.limit = min(limit - have, batch_size)
|
||||||
r = self(request)
|
r = self(request)
|
||||||
|
@ -242,9 +242,8 @@ class MessageMethods(UploadMethods, MessageParseMethods):
|
||||||
else:
|
else:
|
||||||
request.max_date = last_message.date
|
request.max_date = last_message.date
|
||||||
|
|
||||||
now = asyncio.get_event_loop().time()
|
|
||||||
asyncio.sleep(
|
asyncio.sleep(
|
||||||
max(wait_time - (now - start), 0), loop=self._loop)
|
max(wait_time - (time.time() - start), 0), loop=self._loop)
|
||||||
|
|
||||||
def get_messages(self, *args, **kwargs):
|
def get_messages(self, *args, **kwargs):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -67,9 +67,7 @@ class TelegramBaseClient(abc.ABC):
|
||||||
|
|
||||||
timeout (`int` | `float` | `timedelta`, optional):
|
timeout (`int` | `float` | `timedelta`, optional):
|
||||||
The timeout to be used when connecting, sending and receiving
|
The timeout to be used when connecting, sending and receiving
|
||||||
responses from the network. This is **not** the timeout to
|
responses from the network.
|
||||||
be used when ``await``'ing for invoked requests, and you
|
|
||||||
should use ``asyncio.wait`` or ``asyncio.wait_for`` for that.
|
|
||||||
|
|
||||||
request_retries (`int`, optional):
|
request_retries (`int`, optional):
|
||||||
How many times a request should be retried. Request are retried
|
How many times a request should be retried. Request are retried
|
||||||
|
@ -211,9 +209,11 @@ class TelegramBaseClient(abc.ABC):
|
||||||
auto_reconnect_callback=self._handle_auto_reconnect
|
auto_reconnect_callback=self._handle_auto_reconnect
|
||||||
)
|
)
|
||||||
|
|
||||||
# Cache :tl:`ExportedAuthorization` as ``dc_id: MTProtoState``
|
# Cache ``{dc_id: (n, MTProtoSender)}`` for all borrowed senders,
|
||||||
# to easily import them when getting an exported sender.
|
# being ``n`` the amount of borrows a given sender has; once ``n``
|
||||||
self._exported_auths = {}
|
# reaches ``0`` it should be disconnected and removed.
|
||||||
|
self._borrowed_senders = {}
|
||||||
|
self._borrow_sender_lock = threading.Lock()
|
||||||
|
|
||||||
# Save whether the user is authorized here (a.k.a. logged in)
|
# Save whether the user is authorized here (a.k.a. logged in)
|
||||||
self._authorized = None # None = We don't know yet
|
self._authorized = None # None = We don't know yet
|
||||||
|
@ -355,36 +355,65 @@ class TelegramBaseClient(abc.ABC):
|
||||||
and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn
|
and bool(dc.ipv6) == self._use_ipv6 and bool(dc.cdn) == cdn
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_exported_sender(self, dc_id):
|
def _create_exported_sender(self, dc_id):
|
||||||
"""
|
"""
|
||||||
Returns a cached `MTProtoSender` for the given `dc_id`, or creates
|
Creates a new exported `MTProtoSender` for the given `dc_id` and
|
||||||
a new one if it doesn't exist yet, and imports a freshly exported
|
returns it. This method should be used by `_borrow_exported_sender`.
|
||||||
authorization key for it to be usable.
|
|
||||||
"""
|
"""
|
||||||
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
|
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
|
||||||
# for clearly showing how to export the authorization
|
# for clearly showing how to export the authorization
|
||||||
auth = self._exported_auths.get(dc_id)
|
|
||||||
dc = self._get_dc(dc_id)
|
dc = self._get_dc(dc_id)
|
||||||
state = MTProtoState(auth)
|
state = MTProtoState(None)
|
||||||
# Can't reuse self._sender._connection as it has its own seqno.
|
# Can't reuse self._sender._connection as it has its own seqno.
|
||||||
#
|
#
|
||||||
# If one were to do that, Telegram would reset the connection
|
# If one were to do that, Telegram would reset the connection
|
||||||
# with no further clues.
|
# with no further clues.
|
||||||
sender = MTProtoSender(state, self._connection.clone())
|
sender = MTProtoSender(state, self._connection.clone())
|
||||||
sender.connect(dc.ip_address, dc.port)
|
sender.connect(dc.ip_address, dc.port)
|
||||||
if not auth:
|
__log__.info('Exporting authorization for data center %s', dc)
|
||||||
__log__.info('Exporting authorization for data center %s', dc)
|
auth = self(functions.auth.ExportAuthorizationRequest(dc_id))
|
||||||
auth = self(functions.auth.ExportAuthorizationRequest(dc_id))
|
req = self._init_with(functions.auth.ImportAuthorizationRequest(
|
||||||
req = self._init_with(functions.auth.ImportAuthorizationRequest(
|
id=auth.id, bytes=auth.bytes
|
||||||
id=auth.id, bytes=auth.bytes
|
))
|
||||||
))
|
sender.send(req)
|
||||||
sender.send(req)
|
return sender
|
||||||
self._exported_auths[dc_id] = sender.state.auth_key
|
|
||||||
|
def _borrow_exported_sender(self, dc_id):
|
||||||
|
"""
|
||||||
|
Borrows a connected `MTProtoSender` for the given `dc_id`.
|
||||||
|
If it's not cached, creates a new one if it doesn't exist yet,
|
||||||
|
and imports a freshly exported authorization key for it to be usable.
|
||||||
|
|
||||||
|
Once its job is over it should be `_return_exported_sender`.
|
||||||
|
"""
|
||||||
|
with self._borrow_sender_lock:
|
||||||
|
n, sender = self._borrowed_senders.get(dc_id, (0, None))
|
||||||
|
if not sender:
|
||||||
|
sender = self._create_exported_sender(dc_id)
|
||||||
|
sender.dc_id = dc_id
|
||||||
|
|
||||||
|
self._borrowed_senders[dc_id] = (n + 1, sender)
|
||||||
|
|
||||||
return sender
|
return sender
|
||||||
|
|
||||||
|
def _return_exported_sender(self, sender):
|
||||||
|
"""
|
||||||
|
Returns a borrowed exported sender. If all borrows have
|
||||||
|
been returned, the sender is cleanly disconnected.
|
||||||
|
"""
|
||||||
|
with self._borrow_sender_lock:
|
||||||
|
dc_id = sender.dc_id
|
||||||
|
n, _ = self._borrowed_senders[dc_id]
|
||||||
|
n -= 1
|
||||||
|
if n > 0:
|
||||||
|
self._borrowed_senders[dc_id] = (n, sender)
|
||||||
|
else:
|
||||||
|
__log__.info('Disconnecting borrowed sender for DC %d', dc_id)
|
||||||
|
sender.disconnect()
|
||||||
|
del self._borrowed_senders[dc_id]
|
||||||
|
|
||||||
def _get_cdn_client(self, cdn_redirect):
|
def _get_cdn_client(self, cdn_redirect):
|
||||||
"""Similar to ._get_exported_client, but for CDNs"""
|
"""Similar to ._borrow_exported_client, but for CDNs"""
|
||||||
# TODO Implement
|
# TODO Implement
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
session = self._exported_sessions.get(cdn_redirect.dc_id)
|
session = self._exported_sessions.get(cdn_redirect.dc_id)
|
||||||
|
|
|
@ -39,10 +39,15 @@ class MTProtoPlainSender:
|
||||||
raise BrokenAuthKeyError()
|
raise BrokenAuthKeyError()
|
||||||
|
|
||||||
with BinaryReader(body) as reader:
|
with BinaryReader(body) as reader:
|
||||||
assert reader.read_long() == 0 # auth_key_id
|
assert reader.read_long() == 0, 'Bad auth_key_id' # auth_key_id
|
||||||
assert reader.read_long() > msg_id # msg_id
|
|
||||||
assert reader.read_int() # length
|
|
||||||
|
|
||||||
|
assert reader.read_long() != 0, 'Bad msg_id' # msg_id
|
||||||
|
# ^ We should make sure that the read ``msg_id`` is greater
|
||||||
|
# than our own ``msg_id``. However, under some circumstances
|
||||||
|
# (bad system clock/working behind proxies) this seems to not
|
||||||
|
# be the case, which would cause endless assertion errors.
|
||||||
|
|
||||||
|
assert reader.read_int() > 0, 'Bad length' # length
|
||||||
# We could read length bytes and use those in a new reader to read
|
# We could read length bytes and use those in a new reader to read
|
||||||
# the next TLObject without including the padding, but since the
|
# the next TLObject without including the padding, but since the
|
||||||
# reader isn't used for anything else after this, it's unnecessary.
|
# reader isn't used for anything else after this, it's unnecessary.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user