Move the "constant read" thread to the main TelegramClient

This commit is contained in:
Lonami Exo 2017-09-03 09:56:10 +02:00
parent 69d182815f
commit 4de4026bb3
3 changed files with 58 additions and 51 deletions

View File

@ -17,17 +17,9 @@ class MtProtoSender:
(https://core.telegram.org/mtproto/description) (https://core.telegram.org/mtproto/description)
""" """
def __init__(self, connection, session, constant_read): def __init__(self, connection, session):
"""Creates a new MtProtoSender configured to send messages through """Creates a new MtProtoSender configured to send messages through
'connection' and using the parameters from 'session'. 'connection' and using the parameters from 'session'.
If 'constant_read' is set to True, another thread will be
created and started upon connection to constantly read
from the other end. Otherwise, manual calls to .receive()
must be performed. The MtProtoSender cannot be connected,
or an error will be thrown.
This way, sending and receiving will be completely independent.
""" """
self.connection = connection self.connection = connection
self.session = session self.session = session
@ -43,10 +35,6 @@ class MtProtoSender:
# TODO There might be a better way to handle msgs_ack requests # TODO There might be a better way to handle msgs_ack requests
self.logging_out = False self.logging_out = False
# Will create a new _recv_thread when connecting if set
self._constant_read = constant_read
self._recv_thread = None
# Every unhandled result gets passed to these callbacks, which # Every unhandled result gets passed to these callbacks, which
# should be functions accepting a single parameter: a TLObject. # should be functions accepting a single parameter: a TLObject.
# This should only be Update(s), although it can actually be any type. # This should only be Update(s), although it can actually be any type.
@ -59,29 +47,14 @@ class MtProtoSender:
def connect(self): def connect(self):
"""Connects to the server""" """Connects to the server"""
if not self.is_connected():
self.connection.connect() self.connection.connect()
if self._constant_read:
self._recv_thread = Thread(
name='ReadThread', daemon=True,
target=self._recv_thread_impl
)
self._recv_thread.start()
def is_connected(self): def is_connected(self):
return self.connection.is_connected() return self.connection.is_connected()
def disconnect(self): def disconnect(self):
"""Disconnects from the server""" """Disconnects from the server"""
if self.is_connected():
self.connection.close() self.connection.close()
if self._constant_read:
# The existing thread will close eventually, since it's
# only running while the MtProtoSender.is_connected()
self._recv_thread = None
def is_constant_read(self):
return self._constant_read
# region Send and receive # region Send and receive
@ -117,14 +90,6 @@ class MtProtoSender:
del self._need_confirmation[:] del self._need_confirmation[:]
def _recv_thread_impl(self):
while self.is_connected():
try:
self.receive()
except TimeoutError:
# No problem.
pass
def receive(self): def receive(self):
"""Receives a single message from the connected endpoint. """Receives a single message from the connected endpoint.

View File

@ -97,8 +97,7 @@ class TelegramBareClient:
# region Connecting # region Connecting
def connect(self, exported_auth=None, initial_query=None, def connect(self, exported_auth=None, initial_query=None):
constant_read=False):
"""Connects to the Telegram servers, executing authentication if """Connects to the Telegram servers, executing authentication if
required. Note that authenticating to the Telegram servers is required. Note that authenticating to the Telegram servers is
not the same as authenticating the desired user itself, which not the same as authenticating the desired user itself, which
@ -110,9 +109,6 @@ class TelegramBareClient:
If 'initial_query' is not None, it will override the default If 'initial_query' is not None, it will override the default
'GetConfigRequest()', and its result will be returned ONLY 'GetConfigRequest()', and its result will be returned ONLY
if the client wasn't connected already. if the client wasn't connected already.
The 'constant_read' parameter will be used when creating
the MtProtoSender. Refer to it for more information.
""" """
if self._sender and self._sender.is_connected(): if self._sender and self._sender.is_connected():
# Try sending a ping to make sure we're connected already # Try sending a ping to make sure we're connected already
@ -139,9 +135,7 @@ class TelegramBareClient:
self.session.save() self.session.save()
self._sender = MtProtoSender( self._sender = MtProtoSender(connection, self.session)
connection, self.session, constant_read=constant_read
)
self._sender.unhandled_callbacks = self._update_callbacks self._sender.unhandled_callbacks = self._update_callbacks
self._sender.connect() self._sender.connect()
@ -293,11 +287,15 @@ class TelegramBareClient:
# region Invoking Telegram requests # region Invoking Telegram requests
def invoke(self, request, updates=None): def invoke(self, request, updates=None, call_receive=True):
"""Invokes (sends) a MTProtoRequest and returns (receives) its result. """Invokes (sends) a MTProtoRequest and returns (receives) its result.
If 'updates' is not None, all read update object will be put If 'updates' is not None, all read update object will be put
in such list. Otherwise, update objects will be ignored. in such list. Otherwise, update objects will be ignored.
If 'call_receive' is set to False, then there should be another
thread calling to 'self._sender.receive()' running or this method
will lock forever.
""" """
if not isinstance(request, TLObject) and not request.content_related: if not isinstance(request, TLObject) and not request.content_related:
raise ValueError('You can only invoke requests, not types!') raise ValueError('You can only invoke requests, not types!')
@ -307,12 +305,12 @@ class TelegramBareClient:
try: try:
self._sender.send(request) self._sender.send(request)
if self._sender.is_constant_read(): if not call_receive:
# TODO This will be slightly troublesome if we allow # TODO This will be slightly troublesome if we allow
# switching between constant read or not on the fly. # switching between constant read or not on the fly.
# Must also watch out for calling .read() from two places, # Must also watch out for calling .read() from two places,
# in which case a Lock would be required for .receive(). # in which case a Lock would be required for .receive().
request.confirm_received.wait() # TODO Optional timeout here? request.confirm_received.wait() # TODO Socket's timeout here?
else: else:
while not request.confirm_received.is_set(): while not request.confirm_received.is_set():
self._sender.receive() self._sender.receive()

View File

@ -111,6 +111,10 @@ class TelegramClient(TelegramBareClient):
# Uploaded files cache so subsequent calls are instant # Uploaded files cache so subsequent calls are instant
self._upload_cache = {} self._upload_cache = {}
# Constantly read for results and updates from within the main client
self._recv_thread = None
# endregion # endregion
# region Connecting # region Connecting
@ -123,6 +127,10 @@ class TelegramClient(TelegramBareClient):
*args will be ignored. *args will be ignored.
""" """
if self._sender.is_connected():
return
ok = super().connect()
# The main TelegramClient is the only one that will have # The main TelegramClient is the only one that will have
# constant_read, since it's also the only one who receives # constant_read, since it's also the only one who receives
# updates and need to be processed as soon as they occur. # updates and need to be processed as soon as they occur.
@ -132,13 +140,27 @@ class TelegramClient(TelegramBareClient):
# read constantly or not for updates needs to be known before hand, # read constantly or not for updates needs to be known before hand,
# and further updates won't be able to be added unless allowing to # and further updates won't be able to be added unless allowing to
# switch the mode on the fly. # switch the mode on the fly.
return super().connect(constant_read=True) if ok:
self._recv_thread = Thread(
name='ReadThread', daemon=True,
target=self._recv_thread_impl
)
self._recv_thread.start()
return ok
def disconnect(self): def disconnect(self):
"""Disconnects from the Telegram server """Disconnects from the Telegram server
and stops all the spawned threads""" and stops all the spawned threads"""
if not self._sender.is_connected():
return
super().disconnect() super().disconnect()
# The existing thread will close eventually, since it's
# only running while the MtProtoSender.is_connected()
self._recv_thread = None
# Also disconnect all the cached senders # Also disconnect all the cached senders
for sender in self._cached_clients.values(): for sender in self._cached_clients.values():
sender.disconnect() sender.disconnect()
@ -185,7 +207,9 @@ class TelegramClient(TelegramBareClient):
self._lock.acquire() self._lock.acquire()
# TODO Retry if 'result' is None? # TODO Retry if 'result' is None?
return super().invoke(request) return super().invoke(
request, call_receive=self._recv_thread is None
)
except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
self._logger.debug('DC error when invoking request, ' self._logger.debug('DC error when invoking request, '
@ -876,3 +900,23 @@ class TelegramClient(TelegramBareClient):
return self._update_callbacks[:] return self._update_callbacks[:]
# endregion # endregion
# Constant read
# By using this approach, another thread will be
# created and started upon connection to constantly read
# from the other end. Otherwise, manual calls to .receive()
# must be performed. The MtProtoSender cannot be connected,
# or an error will be thrown.
#
# This way, sending and receiving will be completely independent.
def _recv_thread_impl(self):
while self._sender.is_connected():
try:
self._sender.receive()
print('got one')
except TimeoutError:
# No problem.
pass
# endregion