Use RLocks properly on MtProtoSender (only needed on net IO)

This commit is contained in:
Lonami Exo 2017-09-07 21:32:46 +02:00
parent 16a5ab3070
commit 25bbb20b0c
2 changed files with 19 additions and 38 deletions

View File

@ -28,8 +28,10 @@ class MtProtoSender:
self._need_confirmation = [] # Message IDs that need confirmation
self._pending_receive = [] # Requests sent waiting to be received
# Store an RLock instance to make this class safely multi-threaded
self._lock = RLock()
# Sending and receiving are independent, but two threads cannot
# send or receive at the same time no matter what.
self._send_lock = RLock()
self._recv_lock = RLock()
# Used when logging out, the only request that seems to use 'ack'
# TODO There might be a better way to handle msgs_ack requests
@ -52,23 +54,17 @@ class MtProtoSender:
"""Sends the specified MTProtoRequest, previously sending any message
which needed confirmation."""
# Now only us can be using this method
with self._lock:
self._logger.debug('send() acquired the lock')
# If any message needs confirmation send an AckRequest first
self._send_acknowledges()
# If any message needs confirmation send an AckRequest first
self._send_acknowledges()
# Finally send our packed request
with BinaryWriter() as writer:
request.on_send(writer)
self._send_packet(writer.get_bytes(), request)
self._pending_receive.append(request)
# Finally send our packed request
with BinaryWriter() as writer:
request.on_send(writer)
self._send_packet(writer.get_bytes(), request)
self._pending_receive.append(request)
# And update the saved session
self.session.save()
self._logger.debug('send() released the lock')
# And update the saved session
self.session.save()
def _send_acknowledges(self):
"""Sends a messages acknowledge for all those who _need_confirmation"""
@ -90,16 +86,13 @@ class MtProtoSender:
Any unhandled object (likely updates) will be passed to
update_state.process(TLObject).
"""
# TODO Don't ignore updates
self._logger.debug('Receiving a message...')
body = self.connection.recv()
message, remote_msg_id, remote_seq = self._decode_msg(body)
with self._recv_lock:
body = self.connection.recv()
message, remote_msg_id, remote_seq = self._decode_msg(body)
with BinaryReader(message) as reader:
self._process_msg(remote_msg_id, remote_seq, reader, update_state)
self._logger.debug('Received message.')
# endregion
# region Low level processing
@ -107,8 +100,6 @@ class MtProtoSender:
def _send_packet(self, packet, request):
"""Sends the given packet bytes with the additional
information of the original request.
This does NOT lock the threads!
"""
request.request_msg_id = self.session.get_new_msg_id()
@ -134,7 +125,8 @@ class MtProtoSender:
self.session.auth_key.key_id, signed=False)
cipher_writer.write(msg_key)
cipher_writer.write(cipher_text)
self.connection.send(cipher_writer.get_bytes())
with self._send_lock:
self.connection.send(cipher_writer.get_bytes())
def _decode_msg(self, body):
"""Decodes an received encrypted message body bytes"""

View File

@ -2,7 +2,7 @@ import os
import threading
from datetime import datetime, timedelta
from mimetypes import guess_type
from threading import RLock, Thread
from threading import Thread
from . import TelegramBareClient
from . import helpers as utils
@ -50,9 +50,6 @@ class TelegramClient(TelegramBareClient):
As opposed to the TelegramBareClient, this one features downloading
media from different data centers, starting a second thread to
handle updates, and some very common functionality.
This should be used when the (slight) overhead of having locks,
threads, and possibly multiple connections is not an issue.
"""
# region Initialization
@ -118,9 +115,6 @@ class TelegramClient(TelegramBareClient):
timeout=timeout
)
# Safety across multiple threads (for the updates thread)
self._lock = RLock()
# Used on connection - the user may modify these and reconnect
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
for name, value in kwargs.items():
@ -239,8 +233,6 @@ class TelegramClient(TelegramBareClient):
raise AssertionError('Cannot invoke requests from the ReadThread')
try:
self._lock.acquire()
# Users may call this method from within some update handler.
# If this is the case, then the thread invoking the request
# will be the one which should be reading (but is invoking the
@ -259,9 +251,6 @@ class TelegramClient(TelegramBareClient):
self.reconnect(new_dc=e.new_dc)
return self.invoke(request)
finally:
self._lock.release()
# Let people use client(SomeRequest()) instead client.invoke(...)
__call__ = invoke