Remove temporary connections and use a lock again

These seem to be the reason for missing some updates (#237)
This commit is contained in:
Lonami Exo 2018-01-06 23:43:40 +01:00
parent 7745b8e7ee
commit d81dd055e6
2 changed files with 31 additions and 68 deletions

View File

@ -5,6 +5,7 @@ encrypting every packet, and relies on a valid AuthKey in the used Session.
import gzip import gzip
import logging import logging
import struct import struct
from threading import Lock
from .. import helpers as utils from .. import helpers as utils
from ..crypto import AES from ..crypto import AES
@ -53,6 +54,9 @@ class MtProtoSender:
# Requests (as msg_id: Message) sent waiting to be received # Requests (as msg_id: Message) sent waiting to be received
self._pending_receive = {} self._pending_receive = {}
# Multithreading
self._send_lock = Lock()
def connect(self): def connect(self):
"""Connects to the server.""" """Connects to the server."""
self.connection.connect(self.session.server_address, self.session.port) self.connection.connect(self.session.server_address, self.session.port)
@ -71,10 +75,6 @@ class MtProtoSender:
self._need_confirmation.clear() self._need_confirmation.clear()
self._clear_all_pending() self._clear_all_pending()
def clone(self):
"""Creates a copy of this MtProtoSender as a new connection."""
return MtProtoSender(self.session, self.connection.clone())
# region Send and receive # region Send and receive
def send(self, *requests): def send(self, *requests):
@ -156,7 +156,8 @@ class MtProtoSender:
:param message: the TLMessage to be sent. :param message: the TLMessage to be sent.
""" """
self.connection.send(utils.pack_message(self.session, message)) with self._send_lock:
self.connection.send(utils.pack_message(self.session, message))
def _decode_msg(self, body): def _decode_msg(self, body):
""" """

View File

@ -163,11 +163,6 @@ class TelegramBareClient:
self._spawn_read_thread = spawn_read_thread self._spawn_read_thread = spawn_read_thread
self._recv_thread = None self._recv_thread = None
# Identifier of the main thread (the one that called .connect()).
# This will be used to create new connections from any other thread,
# so that requests can be sent in parallel.
self._main_thread_ident = None
# Default PingRequest delay # Default PingRequest delay
self._last_ping = datetime.now() self._last_ping = datetime.now()
self._ping_delay = timedelta(minutes=1) self._ping_delay = timedelta(minutes=1)
@ -198,7 +193,6 @@ class TelegramBareClient:
__log__.info('Connecting to %s:%d...', __log__.info('Connecting to %s:%d...',
self.session.server_address, self.session.port) self.session.server_address, self.session.port)
self._main_thread_ident = threading.get_ident()
self._background_error = None # Clear previous errors self._background_error = None # Clear previous errors
try: try:
@ -431,6 +425,9 @@ class TelegramBareClient:
x.content_related for x in requests): x.content_related for x in requests):
raise TypeError('You can only invoke requests, not types!') raise TypeError('You can only invoke requests, not types!')
if self._background_error:
raise self._background_error
# For logging purposes # For logging purposes
if len(requests) == 1: if len(requests) == 1:
which = type(requests[0]).__name__ which = type(requests[0]).__name__
@ -439,66 +436,31 @@ class TelegramBareClient:
len(requests), [type(x).__name__ for x in requests]) len(requests), [type(x).__name__ for x in requests])
# Determine the sender to be used (main or a new connection) # Determine the sender to be used (main or a new connection)
on_main_thread = threading.get_ident() == self._main_thread_ident __log__.debug('Invoking %s', which)
if on_main_thread or self._on_read_thread():
__log__.debug('Invoking %s from main thread', which)
sender = self._sender
update_state = self.updates
else:
__log__.debug('Invoking %s from background thread. '
'Creating temporary connection', which)
sender = self._sender.clone() call_receive = self._recv_thread is None or self._reconnect_lock.locked()
sender.connect() for retry in range(retries):
# We're on another connection, Telegram will resend all the result = self._invoke(call_receive, *requests)
# updates that we haven't acknowledged (potentially entering if result is not None:
# an infinite loop if we're calling this in response to an return result
# update event, as it would be received again and again). So
# to avoid this we will simply not process updates on these
# new temporary connections, as they will be sent and later
# acknowledged over the main connection.
update_state = None
# We should call receive from this thread if there's no background __log__.warning('Invoking %s failed %d times, '
# thread reading or if the server disconnected us and we're trying 'reconnecting and retrying',
# to reconnect. This is because the read thread may either be [str(x) for x in requests], retry + 1)
# locked also trying to reconnect or we may be said thread already. sleep(1)
call_receive = not on_main_thread or self._recv_thread is None \ # The ReadThread has priority when attempting reconnection,
or self._reconnect_lock.locked() # since this thread is constantly running while __call__ is
try: # only done sometimes. Here try connecting only once/retry.
for attempt in range(retries): if not self._reconnect_lock.locked():
if self._background_error and on_main_thread: with self._reconnect_lock:
raise self._background_error self._reconnect()
result = self._invoke( raise RuntimeError('Number of retries reached 0.')
sender, call_receive, update_state, *requests
)
if result is not None:
return result
__log__.warning('Invoking %s failed %d times, '
'reconnecting and retrying',
[str(x) for x in requests], attempt + 1)
sleep(1)
# The ReadThread has priority when attempting reconnection,
# since this thread is constantly running while __call__ is
# only done sometimes. Here try connecting only once/retry.
if sender == self._sender:
if not self._reconnect_lock.locked():
with self._reconnect_lock:
self._reconnect()
else:
sender.connect()
raise RuntimeError('Number of retries reached 0.')
finally:
if sender != self._sender:
sender.disconnect() # Close temporary connections
# Let people use client.invoke(SomeRequest()) instead client(...) # Let people use client.invoke(SomeRequest()) instead client(...)
invoke = __call__ invoke = __call__
def _invoke(self, sender, call_receive, update_state, *requests): def _invoke(self, call_receive, *requests):
try: try:
# Ensure that we start with no previous errors (i.e. resending) # Ensure that we start with no previous errors (i.e. resending)
for x in requests: for x in requests:
@ -523,7 +485,7 @@ class TelegramBareClient:
self._wrap_init_connection(GetConfigRequest()) self._wrap_init_connection(GetConfigRequest())
) )
sender.send(*requests) self._sender.send(*requests)
if not call_receive: if not call_receive:
# TODO This will be slightly troublesome if we allow # TODO This will be slightly troublesome if we allow
@ -532,11 +494,11 @@ class TelegramBareClient:
# in which case a Lock would be required for .receive(). # in which case a Lock would be required for .receive().
for x in requests: for x in requests:
x.confirm_received.wait( x.confirm_received.wait(
sender.connection.get_timeout() self._sender.connection.get_timeout()
) )
else: else:
while not all(x.confirm_received.is_set() for x in requests): while not all(x.confirm_received.is_set() for x in requests):
sender.receive(update_state=update_state) self._sender.receive(update_state=self.updates)
except BrokenAuthKeyError: except BrokenAuthKeyError:
__log__.error('Authorization key seems broken and was invalid!') __log__.error('Authorization key seems broken and was invalid!')
@ -578,7 +540,7 @@ class TelegramBareClient:
# be on the very first connection (not authorized, not running), # be on the very first connection (not authorized, not running),
# but may be an issue for people who actually travel? # but may be an issue for people who actually travel?
self._reconnect(new_dc=e.new_dc) self._reconnect(new_dc=e.new_dc)
return self._invoke(sender, call_receive, update_state, *requests) return self._invoke(call_receive, *requests)
except ServerError as e: except ServerError as e:
# Telegram is having some issues, just retry # Telegram is having some issues, just retry