Minor improvement to updates handling

Now the updates thread won't start unless you add,
at least, one updates handler. Also, if the TcpClient
was receiving (i.e., from an update), it will let the
update to be received first instead of crashing
This commit is contained in:
Lonami 2016-09-11 11:50:38 +02:00
parent c11795f294
commit cdb1674a27
4 changed files with 31 additions and 34 deletions

View File

@ -36,8 +36,22 @@ class MtProtoSender:
def add_update_handler(self, handler): def add_update_handler(self, handler):
"""Adds an update handler (a method with one argument, the received """Adds an update handler (a method with one argument, the received
TLObject) that is fired when there are updates available""" TLObject) that is fired when there are updates available"""
first_handler = not self.on_update_handlers
self.on_update_handlers.append(handler) self.on_update_handlers.append(handler)
# If this is the first added handler,
# we must start the thread to receive updates
if first_handler:
self.set_listen_for_updates(enabled=True)
def remove_update_handler(self, handler):
self.on_update_handlers.remove(handler)
# If there are no more update handlers, stop the thread
if not self.on_update_handlers:
self.set_listen_for_updates(False)
def generate_sequence(self, confirmed): def generate_sequence(self, confirmed):
"""Generates the next sequence number, based on whether it """Generates the next sequence number, based on whether it
was confirmed yet or not""" was confirmed yet or not"""

View File

@ -45,7 +45,12 @@ class TcpClient:
self.cancelled = False self.cancelled = False
with BinaryWriter() as writer: with BinaryWriter() as writer:
while writer.written_count < buffer_size and not self.cancelled: while writer.written_count < buffer_size:
# Only do cancel if no data was read yet
# Otherwise, carry on reading and finish
if self.cancelled and writer.written_count == 0:
raise ReadCancelledError()
try: try:
# When receiving from the socket, we may not receive all the data at once # When receiving from the socket, we may not receive all the data at once
# This is why we need to keep checking to make sure that we receive it all # This is why we need to keep checking to make sure that we receive it all
@ -57,20 +62,10 @@ class TcpClient:
# There was no data available for us to read. Sleep a bit # There was no data available for us to read. Sleep a bit
time.sleep(self.delay) time.sleep(self.delay)
# If the operation was cancelled *but* data was read,
# this will result on data loss so raise an exception
# TODO this could be solved by using an internal FIFO buffer (first in, first out)
if self.cancelled:
if writer.written_count == 0:
raise ReadCancelledError()
else:
raise NotImplementedError('The read operation was cancelled when some data '
'was already read. This has not yet implemented '
'an internal buffer, so cannot continue.')
# If everything went fine, return the read bytes # If everything went fine, return the read bytes
return writer.get_bytes() return writer.get_bytes()
def cancel_read(self): def cancel_read(self):
"""Cancels the read operation raising a ReadCancelledError""" """Cancels the read operation IF it hasn't yet
started, raising a ReadCancelledError"""
self.cancelled = True self.cancelled = True

View File

@ -123,9 +123,8 @@ def parse_message_entities(msg):
# Second case, both inside: so*me_th*in_g. # Second case, both inside: so*me_th*in_g.
# In this case, the current entity length is decreased by one, # In this case, the current entity length is decreased by one,
# and all the subentities offset and length decrease 1 # and all the subentities offset and length decrease 1
elif (subentity.offset > entity.offset and elif (entity.offset < subentity.offset < entity.offset + entity.length and
subentity.offset < entity.offset + entity.length and subentity.offset + subentity.length > entity.offset + entity.length):
subentity.offset + subentity.length > entity.offset + entity.length):
entity.length -= 1 entity.length -= 1
subentity.offset -= 1 subentity.offset -= 1
subentity.length -= 1 subentity.length -= 1

View File

@ -18,9 +18,6 @@ from tl.functions.help import GetConfigRequest
from tl.functions.auth import SendCodeRequest, SignInRequest from tl.functions.auth import SendCodeRequest, SignInRequest
from tl.functions.messages import GetDialogsRequest, GetHistoryRequest, SendMessageRequest from tl.functions.messages import GetDialogsRequest, GetHistoryRequest, SendMessageRequest
# For working with updates
from tl.types import UpdateShortMessage
class TelegramClient: class TelegramClient:
@ -59,7 +56,6 @@ class TelegramClient:
self.session.save() self.session.save()
self.sender = MtProtoSender(self.transport, self.session) self.sender = MtProtoSender(self.transport, self.session)
self.sender.add_update_handler(self.on_update)
# Now it's time to send an InitConnectionRequest # Now it's time to send an InitConnectionRequest
# This must always be invoked with the layer we'll be using # This must always be invoked with the layer we'll be using
@ -72,10 +68,6 @@ class TelegramClient:
result = self.invoke(InvokeWithLayerRequest(layer=self.layer, query=query)) result = self.invoke(InvokeWithLayerRequest(layer=self.layer, query=query))
# Only listen for updates if we're authorized
if self.is_user_authorized():
self.sender.set_listen_for_updates(True)
# We're only interested in the DC options, # We're only interested in the DC options,
# although many other options are available! # although many other options are available!
self.dc_options = result.dc_options self.dc_options = result.dc_options
@ -260,21 +252,18 @@ class TelegramClient:
return InputPeerChannel(channel.id, channel.access_hash) return InputPeerChannel(channel.id, channel.access_hash)
except StopIteration: except StopIteration:
pass return None
return None
# endregion # endregion
# region Updates handling # region Updates handling
def on_update(self, tlobject): def add_update_handler(self, handler):
"""This method is fired when there are updates from Telegram. """Adds an update handler (a function which takes a TLObject,
Add your own implementation below, or simply override it!""" an update, as its parameter) and listens for updates"""
self.sender.add_update_handler(handler)
# Only show incoming messages def remove_update_handler(self, handler):
if type(tlobject) is UpdateShortMessage: self.sender.remove_update_handler(handler)
if not tlobject.out:
print('> User with ID {} said "{}"'.format(tlobject.user_id, tlobject.message))
# endregion # endregion