Make certain methods and members private

This commit is contained in:
Lonami Exo 2017-05-29 17:06:48 +02:00
parent 63b1881c83
commit 452532cce7
3 changed files with 126 additions and 126 deletions

View File

@ -23,7 +23,7 @@ class MtProtoPlainSender:
"""Sends a plain packet (auth_key_id = 0) containing the given message body (data)""" """Sends a plain packet (auth_key_id = 0) containing the given message body (data)"""
with BinaryWriter() as writer: with BinaryWriter() as writer:
writer.write_long(0) writer.write_long(0)
writer.write_long(self.get_new_msg_id()) writer.write_long(self._get_new_msg_id())
writer.write_int(len(data)) writer.write_int(len(data))
writer.write(data) writer.write(data)
@ -41,7 +41,7 @@ class MtProtoPlainSender:
response = reader.read(message_length) response = reader.read(message_length)
return response return response
def get_new_msg_id(self): def _get_new_msg_id(self):
"""Generates a new message ID based on the current time (in ms) since epoch""" """Generates a new message ID based on the current time (in ms) since epoch"""
# See https://core.telegram.org/mtproto/description#message-identifier-msg-id # See https://core.telegram.org/mtproto/description#message-identifier-msg-id
ms_time = int(time.time() * 1000) ms_time = int(time.time() * 1000)

View File

@ -21,43 +21,43 @@ class MtProtoSender:
"""MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description)""" """MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description)"""
def __init__(self, transport, session): def __init__(self, transport, session):
self.transport = transport self._transport = transport
self.session = session self.session = session
self.logger = logging.getLogger(__name__) self._logger = logging.getLogger(__name__)
self.need_confirmation = [] # Message IDs that need confirmation self._need_confirmation = [] # Message IDs that need confirmation
self.on_update_handlers = [] self._on_update_handlers = []
# Store an RLock instance to make this class safely multi-threaded # Store an RLock instance to make this class safely multi-threaded
self.lock = RLock() self._lock = RLock()
# Flag used to determine whether we've received a sent request yet or not # Flag used to determine whether we've received a sent request yet or not
# We need this to avoid using the updates thread if we're waiting to read # We need this to avoid using the updates thread if we're waiting to read
self.waiting_receive = Event() self._waiting_receive = Event()
# Used when logging out, the only request that seems to use 'ack' requests # Used when logging out, the only request that seems to use 'ack' requests
# TODO There might be a better way to handle msgs_ack requests # TODO There might be a better way to handle msgs_ack requests
self.logging_out = False self.logging_out = False
self.ping_interval = 60 self.ping_interval = 60
self.ping_time_last = time() self._ping_time_last = time()
# Flags used to determine the status of the updates thread. # Flags used to determine the status of the updates thread.
self.updates_thread_running = Event() self._updates_thread_running = Event()
self.updates_thread_receiving = Event() self._updates_thread_receiving = Event()
# Sleep amount on "must sleep" error for the updates thread to sleep too # Sleep amount on "must sleep" error for the updates thread to sleep too
self.updates_thread_sleep = None self._updates_thread_sleep = None
self.updates_thread = None # Set later self._updates_thread = None # Set later
def connect(self): def connect(self):
"""Connects to the server""" """Connects to the server"""
self.transport.connect() self._transport.connect()
def disconnect(self): def disconnect(self):
"""Disconnects and **stops all the running threads** if any""" """Disconnects and **stops all the running threads** if any"""
self.set_updates_thread(running=False) self._set_updates_thread(running=False)
self.transport.close() self._transport.close()
def reconnect(self): def reconnect(self):
"""Disconnects and connects again (effectively reconnecting)""" """Disconnects and connects again (effectively reconnecting)"""
@ -67,11 +67,11 @@ class MtProtoSender:
def setup_ping_thread(self): def setup_ping_thread(self):
"""Sets up the Ping's thread, so that a connection can be kept """Sets up the Ping's thread, so that a connection can be kept
alive for a longer time without Telegram disconnecting us""" alive for a longer time without Telegram disconnecting us"""
self.updates_thread = Thread( self._updates_thread = Thread(
name='UpdatesThread', daemon=True, name='UpdatesThread', daemon=True,
target=self.updates_thread_method) target=self._updates_thread_method)
self.set_updates_thread(running=True) self._set_updates_thread(running=True)
def add_update_handler(self, handler): def add_update_handler(self, handler):
"""Adds an update handler (a method with one argument, the received """Adds an update handler (a method with one argument, the received
@ -79,12 +79,12 @@ class MtProtoSender:
# The updates thread is already running for periodic ping requests, # The updates thread is already running for periodic ping requests,
# so there is no need to start it when adding update handlers. # so there is no need to start it when adding update handlers.
self.on_update_handlers.append(handler) self._on_update_handlers.append(handler)
def remove_update_handler(self, handler): def remove_update_handler(self, handler):
self.on_update_handlers.remove(handler) self._on_update_handlers.remove(handler)
def generate_sequence(self, confirmed): def _generate_sequence(self, confirmed):
"""Generates the next sequence number, based on whether it """Generates the next sequence number, based on whether it
was confirmed yet or not""" was confirmed yet or not"""
if confirmed: if confirmed:
@ -109,34 +109,34 @@ class MtProtoSender:
# Only cancel the receive *if* it was the # Only cancel the receive *if* it was the
# updates thread who was receiving. We do # updates thread who was receiving. We do
# not want to cancel other pending requests! # not want to cancel other pending requests!
if self.updates_thread_receiving.is_set(): if self._updates_thread_receiving.is_set():
self.logger.info('Cancelling updates receive from send()...') self._logger.info('Cancelling updates receive from send()...')
self.transport.cancel_receive() self._transport.cancel_receive()
# Now only us can be using this method # Now only us can be using this method
with self.lock: with self._lock:
self.logger.debug('send() acquired the lock') self._logger.debug('send() acquired the lock')
# Set the flag to true so the updates thread stops trying to receive # Set the flag to true so the updates thread stops trying to receive
self.waiting_receive.set() self._waiting_receive.set()
# If any message needs confirmation send an AckRequest first # If any message needs confirmation send an AckRequest first
if self.need_confirmation: if self._need_confirmation:
msgs_ack = MsgsAck(self.need_confirmation) msgs_ack = MsgsAck(self._need_confirmation)
with BinaryWriter() as writer: with BinaryWriter() as writer:
msgs_ack.on_send(writer) msgs_ack.on_send(writer)
self.send_packet(writer.get_bytes(), msgs_ack) self._send_packet(writer.get_bytes(), msgs_ack)
del self.need_confirmation[:] del self._need_confirmation[:]
# Finally send our packed request # Finally send our packed request
with BinaryWriter() as writer: with BinaryWriter() as writer:
request.on_send(writer) request.on_send(writer)
self.send_packet(writer.get_bytes(), request) self._send_packet(writer.get_bytes(), request)
# And update the saved session # And update the saved session
self.session.save() self.session.save()
self.logger.debug('send() released the lock') self._logger.debug('send() released the lock')
def receive(self, request, timeout=timedelta(seconds=5)): def receive(self, request, timeout=timedelta(seconds=5)):
"""Receives the specified MTProtoRequest ("fills in it" """Receives the specified MTProtoRequest ("fills in it"
@ -144,29 +144,29 @@ class MtProtoSender:
An optional timeout can be specified to cancel the operation An optional timeout can be specified to cancel the operation
if no data has been read after its time delta""" if no data has been read after its time delta"""
with self.lock: with self._lock:
self.logger.debug('receive() acquired the lock') self._logger.debug('receive() acquired the lock')
# Don't stop trying to receive until we get the request we wanted # Don't stop trying to receive until we get the request we wanted
while not request.confirm_received: while not request.confirm_received:
self.logger.info('Trying to .receive() the request result...') self._logger.info('Trying to .receive() the request result...')
seq, body = self.transport.receive(timeout) seq, body = self._transport.receive(timeout)
message, remote_msg_id, remote_sequence = self.decode_msg(body) message, remote_msg_id, remote_sequence = self._decode_msg(body)
with BinaryReader(message) as reader: with BinaryReader(message) as reader:
self.process_msg(remote_msg_id, remote_sequence, reader, self._process_msg(remote_msg_id, remote_sequence, reader,
request) request)
self.logger.info('Request result received') self._logger.info('Request result received')
# We can now set the flag to False thus resuming the updates thread # We can now set the flag to False thus resuming the updates thread
self.waiting_receive.clear() self._waiting_receive.clear()
self.logger.debug('receive() released the lock') self._logger.debug('receive() released the lock')
# endregion # endregion
# region Low level processing # region Low level processing
def send_packet(self, packet, request): def _send_packet(self, packet, request):
"""Sends the given packet bytes with the additional """Sends the given packet bytes with the additional
information of the original request. This does NOT lock the threads!""" information of the original request. This does NOT lock the threads!"""
request.msg_id = self.session.get_new_msg_id() request.msg_id = self.session.get_new_msg_id()
@ -176,7 +176,7 @@ class MtProtoSender:
plain_writer.write_long(self.session.salt, signed=False) plain_writer.write_long(self.session.salt, signed=False)
plain_writer.write_long(self.session.id, signed=False) plain_writer.write_long(self.session.id, signed=False)
plain_writer.write_long(request.msg_id) plain_writer.write_long(request.msg_id)
plain_writer.write_int(self.generate_sequence(request.confirmed)) plain_writer.write_int(self._generate_sequence(request.confirmed))
plain_writer.write_int(len(packet)) plain_writer.write_int(len(packet))
plain_writer.write(packet) plain_writer.write(packet)
@ -191,9 +191,9 @@ class MtProtoSender:
self.session.auth_key.key_id, signed=False) self.session.auth_key.key_id, signed=False)
cipher_writer.write(msg_key) cipher_writer.write(msg_key)
cipher_writer.write(cipher_text) cipher_writer.write(cipher_text)
self.transport.send(cipher_writer.get_bytes()) self._transport.send(cipher_writer.get_bytes())
def decode_msg(self, body): def _decode_msg(self, body):
"""Decodes an received encrypted message body bytes""" """Decodes an received encrypted message body bytes"""
message = None message = None
remote_msg_id = None remote_msg_id = None
@ -221,39 +221,39 @@ class MtProtoSender:
return message, remote_msg_id, remote_sequence return message, remote_msg_id, remote_sequence
def process_msg(self, msg_id, sequence, reader, request=None): def _process_msg(self, msg_id, sequence, reader, request=None):
"""Processes and handles a Telegram message""" """Processes and handles a Telegram message"""
# TODO Check salt, session_id and sequence_number # TODO Check salt, session_id and sequence_number
self.need_confirmation.append(msg_id) self._need_confirmation.append(msg_id)
code = reader.read_int(signed=False) code = reader.read_int(signed=False)
reader.seek(-4) reader.seek(-4)
# The following codes are "parsed manually" # The following codes are "parsed manually"
if code == 0xf35c6d01: # rpc_result, (response of an RPC call, i.e., we sent a request) if code == 0xf35c6d01: # rpc_result, (response of an RPC call, i.e., we sent a request)
return self.handle_rpc_result(msg_id, sequence, reader, request) return self._handle_rpc_result(msg_id, sequence, reader, request)
if code == 0x347773c5: # pong if code == 0x347773c5: # pong
return self.handle_pong(msg_id, sequence, reader, request) return self._handle_pong(msg_id, sequence, reader, request)
if code == 0x73f1f8dc: # msg_container if code == 0x73f1f8dc: # msg_container
return self.handle_container(msg_id, sequence, reader, request) return self._handle_container(msg_id, sequence, reader, request)
if code == 0x3072cfa1: # gzip_packed if code == 0x3072cfa1: # gzip_packed
return self.handle_gzip_packed(msg_id, sequence, reader, request) return self._handle_gzip_packed(msg_id, sequence, reader, request)
if code == 0xedab447b: # bad_server_salt if code == 0xedab447b: # bad_server_salt
return self.handle_bad_server_salt(msg_id, sequence, reader, return self._handle_bad_server_salt(msg_id, sequence, reader,
request) request)
if code == 0xa7eff811: # bad_msg_notification if code == 0xa7eff811: # bad_msg_notification
return self.handle_bad_msg_notification(msg_id, sequence, reader) return self._handle_bad_msg_notification(msg_id, sequence, reader)
# msgs_ack, it may handle the request we wanted # msgs_ack, it may handle the request we wanted
if code == 0x62d6b459: if code == 0x62d6b459:
ack = reader.tgread_object() ack = reader.tgread_object()
if request and request.msg_id in ack.msg_ids: if request and request.msg_id in ack.msg_ids:
self.logger.warning('Ack found for the current request ID') self._logger.warning('Ack found for the current request ID')
if self.logging_out: if self.logging_out:
self.logger.info('Message ack confirmed the logout request') self._logger.info('Message ack confirmed the logout request')
request.confirm_received = True request.confirm_received = True
return False return False
@ -262,7 +262,7 @@ class MtProtoSender:
# In this case, we will simply treat the incoming TLObject as an Update, # In this case, we will simply treat the incoming TLObject as an Update,
# if we can first find a matching TLObject # if we can first find a matching TLObject
if code in tlobjects.keys(): if code in tlobjects.keys():
return self.handle_update(msg_id, sequence, reader) return self._handle_update(msg_id, sequence, reader)
print('Unknown message: {}'.format(hex(code))) print('Unknown message: {}'.format(hex(code)))
return False return False
@ -271,27 +271,27 @@ class MtProtoSender:
# region Message handling # region Message handling
def handle_update(self, msg_id, sequence, reader): def _handle_update(self, msg_id, sequence, reader):
tlobject = reader.tgread_object() tlobject = reader.tgread_object()
self.logger.debug('Handling update for object %s', repr(tlobject)) self._logger.debug('Handling update for object %s', repr(tlobject))
for handler in self.on_update_handlers: for handler in self._on_update_handlers:
handler(tlobject) handler(tlobject)
return False return False
def handle_pong(self, msg_id, sequence, reader, request): def _handle_pong(self, msg_id, sequence, reader, request):
self.logger.debug('Handling pong') self._logger.debug('Handling pong')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
received_msg_id = reader.read_long(signed=False) received_msg_id = reader.read_long(signed=False)
if received_msg_id == request.msg_id: if received_msg_id == request.msg_id:
self.logger.warning('Pong confirmed a request') self._logger.warning('Pong confirmed a request')
request.confirm_received = True request.confirm_received = True
return False return False
def handle_container(self, msg_id, sequence, reader, request): def _handle_container(self, msg_id, sequence, reader, request):
self.logger.debug('Handling container') self._logger.debug('Handling container')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
size = reader.read_int() size = reader.read_int()
for _ in range(size): for _ in range(size):
@ -302,13 +302,13 @@ class MtProtoSender:
# note: this code is IMPORTANT for skipping RPC results of lost # note: this code is IMPORTANT for skipping RPC results of lost
# requests (for example, ones from the previous connection session) # requests (for example, ones from the previous connection session)
if not self.process_msg(inner_msg_id, sequence, reader, request): if not self._process_msg(inner_msg_id, sequence, reader, request):
reader.set_position(begin_position + inner_length) reader.set_position(begin_position + inner_length)
return False return False
def handle_bad_server_salt(self, msg_id, sequence, reader, request): def _handle_bad_server_salt(self, msg_id, sequence, reader, request):
self.logger.debug('Handling bad server salt') self._logger.debug('Handling bad server salt')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
reader.read_long(signed=False) # bad_msg_id reader.read_long(signed=False) # bad_msg_id
reader.read_int() # bad_msg_seq_no reader.read_int() # bad_msg_seq_no
@ -326,8 +326,8 @@ class MtProtoSender:
return True return True
def handle_bad_msg_notification(self, msg_id, sequence, reader): def _handle_bad_msg_notification(self, msg_id, sequence, reader):
self.logger.debug('Handling bad message notification') self._logger.debug('Handling bad message notification')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
reader.read_long(signed=False) # request_id reader.read_long(signed=False) # request_id
reader.read_int() # request_sequence reader.read_int() # request_sequence
@ -339,13 +339,13 @@ class MtProtoSender:
# Use the current msg_id to determine the right time offset. # Use the current msg_id to determine the right time offset.
self.session.update_time_offset(correct_msg_id=msg_id) self.session.update_time_offset(correct_msg_id=msg_id)
self.session.save() self.session.save()
self.logger.warning('Read Bad Message error: ' + str(error)) self._logger.warning('Read Bad Message error: ' + str(error))
self.logger.info('Attempting to use the correct time offset.') self._logger.info('Attempting to use the correct time offset.')
else: else:
raise error raise error
def handle_rpc_result(self, msg_id, sequence, reader, request): def _handle_rpc_result(self, msg_id, sequence, reader, request):
self.logger.debug('Handling RPC result, request is%s None', ' not' if request else '') self._logger.debug('Handling RPC result, request is%s None', ' not' if request else '')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
request_id = reader.read_long(signed=False) request_id = reader.read_long(signed=False)
inner_code = reader.read_int(signed=False) inner_code = reader.read_int(signed=False)
@ -357,7 +357,7 @@ class MtProtoSender:
error = RPCError( error = RPCError(
code=reader.read_int(), message=reader.tgread_string()) code=reader.read_int(), message=reader.tgread_string())
self.logger.warning('Read RPC error: %s', str(error)) self._logger.warning('Read RPC error: %s', str(error))
if error.must_resend: if error.must_resend:
if not request: if not request:
raise ValueError( raise ValueError(
@ -366,7 +366,7 @@ class MtProtoSender:
request.confirm_received = False request.confirm_received = False
if error.message.startswith('FLOOD_WAIT_'): if error.message.startswith('FLOOD_WAIT_'):
self.updates_thread_sleep = error.additional_data self._updates_thread_sleep = error.additional_data
raise FloodWaitError(seconds=error.additional_data) raise FloodWaitError(seconds=error.additional_data)
elif '_MIGRATE_' in error.message: elif '_MIGRATE_' in error.message:
@ -379,7 +379,7 @@ class MtProtoSender:
raise ValueError( raise ValueError(
'Cannot receive a request from inside an RPC result from the updates thread.') 'Cannot receive a request from inside an RPC result from the updates thread.')
self.logger.debug('Reading request response') self._logger.debug('Reading request response')
if inner_code == 0x3072cfa1: # GZip packed if inner_code == 0x3072cfa1: # GZip packed
unpacked_data = gzip.decompress(reader.tgread_bytes()) unpacked_data = gzip.decompress(reader.tgread_bytes())
with BinaryReader(unpacked_data) as compressed_reader: with BinaryReader(unpacked_data) as compressed_reader:
@ -391,107 +391,107 @@ class MtProtoSender:
else: else:
# note: if it's really a result for RPC from previous connection # note: if it's really a result for RPC from previous connection
# session, it will be skipped by the handle_container() # session, it will be skipped by the handle_container()
self.logger.warning('RPC result found for unknown request (maybe from previous connection session)') self._logger.warning('RPC result found for unknown request (maybe from previous connection session)')
def handle_gzip_packed(self, msg_id, sequence, reader, request): def _handle_gzip_packed(self, msg_id, sequence, reader, request):
self.logger.debug('Handling gzip packed data') self._logger.debug('Handling gzip packed data')
reader.read_int(signed=False) # code reader.read_int(signed=False) # code
packed_data = reader.tgread_bytes() packed_data = reader.tgread_bytes()
unpacked_data = gzip.decompress(packed_data) unpacked_data = gzip.decompress(packed_data)
with BinaryReader(unpacked_data) as compressed_reader: with BinaryReader(unpacked_data) as compressed_reader:
return self.process_msg(msg_id, sequence, compressed_reader, return self._process_msg(msg_id, sequence, compressed_reader,
request) request)
# endregion # endregion
def set_updates_thread(self, running): def _set_updates_thread(self, running):
"""Sets the updates thread status (running or not)""" """Sets the updates thread status (running or not)"""
if not self.updates_thread or \ if not self._updates_thread or \
running == self.updates_thread_running.is_set(): running == self._updates_thread_running.is_set():
return return
# Different state, update the saved value and behave as required # Different state, update the saved value and behave as required
self.logger.info('Changing updates thread running status to %s', running) self._logger.info('Changing updates thread running status to %s', running)
if running: if running:
self.updates_thread_running.set() self._updates_thread_running.set()
self.updates_thread.start() self._updates_thread.start()
else: else:
self.updates_thread_running.clear() self._updates_thread_running.clear()
if self.updates_thread_receiving.is_set(): if self._updates_thread_receiving.is_set():
self.transport.cancel_receive() self._transport.cancel_receive()
def updates_thread_method(self): def _updates_thread_method(self):
"""This method will run until specified and listen for incoming updates""" """This method will run until specified and listen for incoming updates"""
# Set a reasonable timeout when checking for updates # Set a reasonable timeout when checking for updates
timeout = timedelta(minutes=1) timeout = timedelta(minutes=1)
while self.updates_thread_running.is_set(): while self._updates_thread_running.is_set():
# Always sleep a bit before each iteration to relax the CPU, # Always sleep a bit before each iteration to relax the CPU,
# since it's possible to early 'continue' the loop to reach # since it's possible to early 'continue' the loop to reach
# the next iteration, but we still should to sleep. # the next iteration, but we still should to sleep.
if self.updates_thread_sleep: if self._updates_thread_sleep:
sleep(self.updates_thread_sleep) sleep(self._updates_thread_sleep)
self.updates_thread_sleep = None self._updates_thread_sleep = None
else: else:
# Longer sleep if we're not expecting updates (only pings) # Longer sleep if we're not expecting updates (only pings)
sleep(0.1 if self.on_update_handlers else 1) sleep(0.1 if self._on_update_handlers else 1)
# Only try to receive updates if we're not waiting to receive a request # Only try to receive updates if we're not waiting to receive a request
if not self.waiting_receive.is_set(): if not self._waiting_receive.is_set():
with self.lock: with self._lock:
self.logger.debug('Updates thread acquired the lock') self._logger.debug('Updates thread acquired the lock')
try: try:
now = time() now = time()
# If ping_interval seconds passed since last ping, send a new one # If ping_interval seconds passed since last ping, send a new one
if now >= self.ping_time_last + self.ping_interval: if now >= self._ping_time_last + self.ping_interval:
self.ping_time_last = now self._ping_time_last = now
self.send_ping() self.send_ping()
self.logger.debug('Ping sent from the updates thread') self._logger.debug('Ping sent from the updates thread')
# Exit the loop if we're not expecting to receive any updates # Exit the loop if we're not expecting to receive any updates
if not self.on_update_handlers: if not self._on_update_handlers:
self.logger.debug('No updates handlers found, continuing') self._logger.debug('No updates handlers found, continuing')
continue continue
self.updates_thread_receiving.set() self._updates_thread_receiving.set()
self.logger.debug('Trying to receive updates from the updates thread') self._logger.debug('Trying to receive updates from the updates thread')
seq, body = self.transport.receive(timeout) seq, body = self._transport.receive(timeout)
message, remote_msg_id, remote_sequence = self.decode_msg( message, remote_msg_id, remote_sequence = self._decode_msg(
body) body)
self.logger.info('Received update from the updates thread') self._logger.info('Received update from the updates thread')
with BinaryReader(message) as reader: with BinaryReader(message) as reader:
self.process_msg(remote_msg_id, remote_sequence, self._process_msg(remote_msg_id, remote_sequence,
reader) reader)
except TimeoutError: except TimeoutError:
self.logger.debug('Receiving updates timed out') self._logger.debug('Receiving updates timed out')
# TODO Workaround for issue #50 # TODO Workaround for issue #50
r = GetStateRequest() r = GetStateRequest()
try: try:
self.logger.debug('Sending GetStateRequest (workaround for issue #50)') self._logger.debug('Sending GetStateRequest (workaround for issue #50)')
self.send(r) self.send(r)
self.receive(r) self.receive(r)
except TimeoutError: except TimeoutError:
self.logger.warning('Timed out inside a timeout, trying to reconnect...') self._logger.warning('Timed out inside a timeout, trying to reconnect...')
self.reconnect() self.reconnect()
self.send(r) self.send(r)
self.receive(r) self.receive(r)
except ReadCancelledError: except ReadCancelledError:
self.logger.info('Receiving updates cancelled') self._logger.info('Receiving updates cancelled')
except OSError: except OSError:
self.logger.warning('OSError on updates thread, %s logging out', self._logger.warning('OSError on updates thread, %s logging out',
'was' if self.logging_out else 'was not') 'was' if self.logging_out else 'was not')
if self.logging_out: if self.logging_out:
# This error is okay when logging out, means we got disconnected # This error is okay when logging out, means we got disconnected
# TODO Not sure why this happens because we call disconnect()… # TODO Not sure why this happens because we call disconnect()…
self.set_updates_thread(running=False) self._set_updates_thread(running=False)
else: else:
raise raise
self.logger.debug('Updates thread released the lock') self._logger.debug('Updates thread released the lock')
self.updates_thread_receiving.clear() self._updates_thread_receiving.clear()

View File

@ -126,7 +126,7 @@ class TelegramClient:
print('Could not stabilise initial connection: {}'.format(error)) print('Could not stabilise initial connection: {}'.format(error))
return False return False
def reconnect_to_dc(self, dc_id): def _reconnect_to_dc(self, dc_id):
"""Reconnects to the specified DC ID. This is automatically called after an InvalidDCError is raised""" """Reconnects to the specified DC ID. This is automatically called after an InvalidDCError is raised"""
if self.dc_options is None or not self.dc_options: if self.dc_options is None or not self.dc_options:
raise ConnectionError( raise ConnectionError(
@ -178,7 +178,7 @@ class TelegramClient:
if throw_invalid_dc: if throw_invalid_dc:
raise raise
self.reconnect_to_dc(error.new_dc) self._reconnect_to_dc(error.new_dc)
return self.invoke(request, timeout=timeout, throw_invalid_dc=True) return self.invoke(request, timeout=timeout, throw_invalid_dc=True)
# region Authorization requests # region Authorization requests