Allow setting whether the MtProtoSender should use constant_read

This commit is contained in:
Lonami Exo 2017-09-02 21:27:11 +02:00
parent 863d2e8368
commit 21eaf8bd72
3 changed files with 71 additions and 17 deletions

View File

@ -13,9 +13,22 @@ logging.getLogger(__name__).addHandler(logging.NullHandler())
class MtProtoSender: class MtProtoSender:
"""MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description)""" """MTProto Mobile Protocol sender
(https://core.telegram.org/mtproto/description)
"""
def __init__(self, connection, session): def __init__(self, connection, session, constant_read):
"""Creates a new MtProtoSender configured to send messages through
'connection' and using the parameters from 'session'.
If 'constant_read' is set to True, another thread will be
created and started upon connection to constantly read
from the other end. Otherwise, manual calls to .receive()
must be performed. The MtProtoSender cannot be connected,
or an error will be thrown.
This way, sending and receiving will be completely independent.
"""
self.connection = connection self.connection = connection
self.session = session self.session = session
self._logger = logging.getLogger(__name__) self._logger = logging.getLogger(__name__)
@ -30,23 +43,35 @@ class MtProtoSender:
# TODO There might be a better way to handle msgs_ack requests # TODO There might be a better way to handle msgs_ack requests
self.logging_out = False self.logging_out = False
# Reading and writing shouldn't be related. Call .recv() forever here. # Will create a new _recv_thread when connecting if set
# TODO Maybe this could be disabled with some "constant_read=bool". self._constant_read = constant_read
self._recv_thread = Thread( self._recv_thread = None
name='ReadThread', daemon=True, target=self._recv_thread_impl
)
def connect(self): def connect(self):
"""Connects to the server""" """Connects to the server"""
self.connection.connect() if not self.is_connected():
self._recv_thread.start() self.connection.connect()
if self._constant_read:
self._recv_thread = Thread(
name='ReadThread', daemon=True,
target=self._recv_thread_impl
)
self._recv_thread.start()
def is_connected(self): def is_connected(self):
return self.connection.is_connected() return self.connection.is_connected()
def disconnect(self): def disconnect(self):
"""Disconnects from the server""" """Disconnects from the server"""
self.connection.close() if self.is_connected():
self.connection.close()
if self._constant_read:
# The existing thread will close eventually, since it's
# only running while the MtProtoSender.is_connected()
self._recv_thread = None
def is_constant_read(self):
return self._constant_read
# region Send and receive # region Send and receive
@ -85,13 +110,18 @@ class MtProtoSender:
def _recv_thread_impl(self): def _recv_thread_impl(self):
while self.is_connected(): while self.is_connected():
try: try:
self._receive_message() self.receive()
except TimeoutError: except TimeoutError:
# No problem. # No problem.
pass pass
def _receive_message(self): def receive(self):
"""Receives a single message from the connected endpoint.""" """Receives a single message from the connected endpoint.
This method returns nothing, and will only affect other parts
of the MtProtoSender such as the updates callback being fired
or a pending request being confirmed.
"""
# TODO Don't ignore updates # TODO Don't ignore updates
self._logger.debug('Receiving a message...') self._logger.debug('Receiving a message...')
body = self.connection.recv() body = self.connection.recv()

View File

@ -91,7 +91,8 @@ class TelegramBareClient:
# region Connecting # region Connecting
def connect(self, exported_auth=None, initial_query=None): def connect(self, exported_auth=None, initial_query=None,
constant_read=False):
"""Connects to the Telegram servers, executing authentication if """Connects to the Telegram servers, executing authentication if
required. Note that authenticating to the Telegram servers is required. Note that authenticating to the Telegram servers is
not the same as authenticating the desired user itself, which not the same as authenticating the desired user itself, which
@ -103,6 +104,9 @@ class TelegramBareClient:
If 'initial_query' is not None, it will override the default If 'initial_query' is not None, it will override the default
'GetConfigRequest()', and its result will be returned ONLY 'GetConfigRequest()', and its result will be returned ONLY
if the client wasn't connected already. if the client wasn't connected already.
The 'constant_read' parameter will be used when creating
the MtProtoSender. Refer to it for more information.
""" """
if self._sender and self._sender.is_connected(): if self._sender and self._sender.is_connected():
# Try sending a ping to make sure we're connected already # Try sending a ping to make sure we're connected already
@ -129,7 +133,9 @@ class TelegramBareClient:
self.session.save() self.session.save()
self._sender = MtProtoSender(connection, self.session) self._sender = MtProtoSender(
connection, self.session, constant_read=constant_read
)
self._sender.connect() self._sender.connect()
# Now it's time to send an InitConnectionRequest # Now it's time to send an InitConnectionRequest
@ -294,7 +300,16 @@ class TelegramBareClient:
try: try:
self._sender.send(request) self._sender.send(request)
request.confirm_received.wait() # TODO Optional timeout here? if self._sender.is_constant_read():
# TODO This will be slightly troublesome if we allow
# switching between constant read or not on the fly.
# Must also watch out for calling .read() from two places,
# in which case a Lock would be required for .receive().
request.confirm_received.wait() # TODO Optional timeout here?
else:
while not request.confirm_received.is_set():
self._sender.receive()
if request.rpc_error: if request.rpc_error:
raise request.rpc_error raise request.rpc_error
return request.result return request.result

View File

@ -131,7 +131,16 @@ class TelegramClient(TelegramBareClient):
*args will be ignored. *args will be ignored.
""" """
return super().connect() # The main TelegramClient is the only one that will have
# constant_read, since it's also the only one who receives
# updates and need to be processed as soon as they occur.
#
# TODO Allow to disable this to avoid the creation of a new thread
# if the user is not going to work with updates at all? Whether to
# read constantly or not for updates needs to be known before hand,
# and further updates won't be able to be added unless allowing to
# switch the mode on the fly.
return super().connect(constant_read=True)
def disconnect(self): def disconnect(self):
"""Disconnects from the Telegram server """Disconnects from the Telegram server