Create a new connection when called from a different thread

This allows to invoke several requests in parallel while not
waiting for other requests to be written to the network.
This commit is contained in:
Lonami Exo 2017-09-29 19:55:14 +02:00
parent ee08232473
commit 0a693c705a
2 changed files with 44 additions and 9 deletions

View File

@ -290,7 +290,7 @@ class TelegramBareClient:
# region Invoking Telegram requests # region Invoking Telegram requests
def invoke(self, *requests, call_receive=True, retries=5): def invoke(self, *requests, call_receive=True, retries=5, sender=None):
"""Invokes (sends) a MTProtoRequest and returns (receives) its result. """Invokes (sends) a MTProtoRequest and returns (receives) its result.
If 'updates' is not None, all read update object will be put If 'updates' is not None, all read update object will be put
@ -307,13 +307,16 @@ class TelegramBareClient:
if retries <= 0: if retries <= 0:
raise ValueError('Number of retries reached 0.') raise ValueError('Number of retries reached 0.')
if sender is None:
sender = self._sender
try: try:
# Ensure that we start with no previous errors (i.e. resending) # Ensure that we start with no previous errors (i.e. resending)
for x in requests: for x in requests:
x.confirm_received.clear() x.confirm_received.clear()
x.rpc_error = None x.rpc_error = None
self._sender.send(*requests) sender.send(*requests)
if not call_receive: if not call_receive:
# TODO This will be slightly troublesome if we allow # TODO This will be slightly troublesome if we allow
# switching between constant read or not on the fly. # switching between constant read or not on the fly.
@ -321,11 +324,11 @@ class TelegramBareClient:
# in which case a Lock would be required for .receive(). # in which case a Lock would be required for .receive().
for x in requests: for x in requests:
x.confirm_received.wait( x.confirm_received.wait(
self._sender.connection.get_timeout() sender.connection.get_timeout()
) )
else: else:
while not all(x.confirm_received.is_set() for x in requests): while not all(x.confirm_received.is_set() for x in requests):
self._sender.receive(update_state=self.updates) sender.receive(update_state=self.updates)
except TimeoutError: except TimeoutError:
pass # We will just retry pass # We will just retry
@ -333,9 +336,13 @@ class TelegramBareClient:
except ConnectionResetError: except ConnectionResetError:
self._logger.debug('Server disconnected us. Reconnecting and ' self._logger.debug('Server disconnected us. Reconnecting and '
'resending request...') 'resending request...')
self._reconnect() if sender != self._sender:
sender.connect()
else:
self._reconnect()
except FloodWaitError: except FloodWaitError:
sender.disconnect()
self.disconnect() self.disconnect()
raise raise

View File

@ -18,7 +18,7 @@ from .errors import (
PhoneMigrateError, NetworkMigrateError, UserMigrateError, PhoneMigrateError, NetworkMigrateError, UserMigrateError,
PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError
) )
from .network import ConnectionMode from .network import Connection, ConnectionMode, MtProtoSender
from .tl import Session, TLObject from .tl import Session, TLObject
from .tl.functions import PingRequest from .tl.functions import PingRequest
from .tl.functions.account import ( from .tl.functions.account import (
@ -146,6 +146,11 @@ class TelegramClient(TelegramBareClient):
# Constantly read for results and updates from within the main client # Constantly read for results and updates from within the main client
self._recv_thread = None self._recv_thread = None
# Identifier of the main thread (the one that called .connect()).
# This will be used to create new connections from any other thread,
# so that requests can be sent in parallel.
self._main_thread_ident = None
# Default PingRequest delay # Default PingRequest delay
self._last_ping = datetime.now() self._last_ping = datetime.now()
self._ping_delay = timedelta(minutes=1) self._ping_delay = timedelta(minutes=1)
@ -162,6 +167,8 @@ class TelegramClient(TelegramBareClient):
exported_auth is meant for internal purposes and can be ignored. exported_auth is meant for internal purposes and can be ignored.
""" """
self._main_thread_ident = threading.get_ident()
if socks and self._recv_thread: if socks and self._recv_thread:
# Treat proxy errors specially since they're not related to # Treat proxy errors specially since they're not related to
# Telegram itself, but rather to the proxy. If any happens on # Telegram itself, but rather to the proxy. If any happens on
@ -246,23 +253,40 @@ class TelegramClient(TelegramBareClient):
""" """
# This is only valid when the read thread is reconnecting, # This is only valid when the read thread is reconnecting,
# that is, the connection lock is locked. # that is, the connection lock is locked.
if self._on_read_thread() and not self._connect_lock.locked(): on_read_thread = self._on_read_thread()
if on_read_thread and not self._connect_lock.locked():
return # Just ignore, we would be raising and crashing the thread return # Just ignore, we would be raising and crashing the thread
self.updates.check_error() self.updates.check_error()
# Determine the sender to be used (main or a new connection)
# TODO Polish this so it's nicer
on_main_thread = threading.get_ident() == self._main_thread_ident
if on_main_thread or on_read_thread:
sender = self._sender
else:
conn = Connection(
self.session.server_address, self.session.port,
mode=self._sender.connection._mode,
proxy=self._sender.connection.conn.proxy,
timeout=self._sender.connection.get_timeout()
)
sender = MtProtoSender(self.session, conn)
sender.connect()
try: try:
# We should call receive from this thread if there's no background # We should call receive from this thread if there's no background
# thread reading or if the server disconnected us and we're trying # thread reading or if the server disconnected us and we're trying
# to reconnect. This is because the read thread may either be # to reconnect. This is because the read thread may either be
# locked also trying to reconnect or we may be said thread already. # locked also trying to reconnect or we may be said thread already.
call_receive = \ call_receive = not on_main_thread or \
self._recv_thread is None or self._connect_lock.locked() self._recv_thread is None or self._connect_lock.locked()
return super().invoke( return super().invoke(
*requests, *requests,
call_receive=call_receive, call_receive=call_receive,
retries=kwargs.get('retries', 5) retries=kwargs.get('retries', 5),
sender=sender
) )
except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
@ -284,6 +308,10 @@ class TelegramClient(TelegramBareClient):
while self._user_connected and not self._reconnect(): while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever until we can send the request sleep(0.1) # Retry forever until we can send the request
finally:
if sender != self._sender:
sender.disconnect()
# Let people use client(SomeRequest()) instead client.invoke(...) # Let people use client(SomeRequest()) instead client.invoke(...)
__call__ = invoke __call__ = invoke