mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-01-24 08:14:14 +03:00
Make MtProtoSender not thread-safe
Rationale: a new connection should be spawned if one desires to send and receive requests in parallel, which would otherwise cause one of either threads to lock.
This commit is contained in:
parent
003e231239
commit
5da300ca84
|
@ -1,7 +1,6 @@
|
||||||
import gzip
|
import gzip
|
||||||
import logging
|
import logging
|
||||||
import struct
|
import struct
|
||||||
from threading import RLock
|
|
||||||
|
|
||||||
from .. import helpers as utils
|
from .. import helpers as utils
|
||||||
from ..crypto import AES
|
from ..crypto import AES
|
||||||
|
@ -20,7 +19,12 @@ logging.getLogger(__name__).addHandler(logging.NullHandler())
|
||||||
|
|
||||||
class MtProtoSender:
|
class MtProtoSender:
|
||||||
"""MTProto Mobile Protocol sender
|
"""MTProto Mobile Protocol sender
|
||||||
(https://core.telegram.org/mtproto/description)
|
(https://core.telegram.org/mtproto/description).
|
||||||
|
|
||||||
|
Note that this class is not thread-safe, and calling send/receive
|
||||||
|
from two or more threads at the same time is undefined behaviour.
|
||||||
|
Rationale: a new connection should be spawned to send/receive requests
|
||||||
|
in parallel, so thread-safety (hence locking) isn't needed.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, session, connection):
|
def __init__(self, session, connection):
|
||||||
|
@ -37,11 +41,6 @@ class MtProtoSender:
|
||||||
# Requests (as msg_id: Message) sent waiting to be received
|
# Requests (as msg_id: Message) sent waiting to be received
|
||||||
self._pending_receive = {}
|
self._pending_receive = {}
|
||||||
|
|
||||||
# Sending and receiving are independent, but two threads cannot
|
|
||||||
# send or receive at the same time no matter what.
|
|
||||||
self._send_lock = RLock()
|
|
||||||
self._recv_lock = RLock()
|
|
||||||
|
|
||||||
def connect(self):
|
def connect(self):
|
||||||
"""Connects to the server"""
|
"""Connects to the server"""
|
||||||
self.connection.connect()
|
self.connection.connect()
|
||||||
|
@ -93,19 +92,18 @@ class MtProtoSender:
|
||||||
Any unhandled object (likely updates) will be passed to
|
Any unhandled object (likely updates) will be passed to
|
||||||
update_state.process(TLObject).
|
update_state.process(TLObject).
|
||||||
"""
|
"""
|
||||||
with self._recv_lock:
|
try:
|
||||||
try:
|
body = self.connection.recv()
|
||||||
body = self.connection.recv()
|
except (BufferError, InvalidChecksumError):
|
||||||
except (BufferError, InvalidChecksumError):
|
# TODO BufferError, we should spot the cause...
|
||||||
# TODO BufferError, we should spot the cause...
|
# "No more bytes left"; something wrong happened, clear
|
||||||
# "No more bytes left"; something wrong happened, clear
|
# everything to be on the safe side, or:
|
||||||
# everything to be on the safe side, or:
|
#
|
||||||
#
|
# "This packet should be skipped"; since this may have
|
||||||
# "This packet should be skipped"; since this may have
|
# been a result for a request, invalidate every request
|
||||||
# been a result for a request, invalidate every request
|
# and just re-invoke them to avoid problems
|
||||||
# and just re-invoke them to avoid problems
|
self._clear_all_pending()
|
||||||
self._clear_all_pending()
|
return
|
||||||
return
|
|
||||||
|
|
||||||
message, remote_msg_id, remote_seq = self._decode_msg(body)
|
message, remote_msg_id, remote_seq = self._decode_msg(body)
|
||||||
with BinaryReader(message) as reader:
|
with BinaryReader(message) as reader:
|
||||||
|
@ -128,8 +126,7 @@ class MtProtoSender:
|
||||||
cipher_text = AES.encrypt_ige(plain_text, key, iv)
|
cipher_text = AES.encrypt_ige(plain_text, key, iv)
|
||||||
|
|
||||||
result = key_id + msg_key + cipher_text
|
result = key_id + msg_key + cipher_text
|
||||||
with self._send_lock:
|
self.connection.send(result)
|
||||||
self.connection.send(result)
|
|
||||||
|
|
||||||
def _decode_msg(self, body):
|
def _decode_msg(self, body):
|
||||||
"""Decodes an received encrypted message body bytes"""
|
"""Decodes an received encrypted message body bytes"""
|
||||||
|
|
Loading…
Reference in New Issue
Block a user