Add a thread to constantly read messages from the network

This commit is contained in:
Lonami Exo 2017-09-02 18:27:22 +02:00
parent 0197271f74
commit 43b79c3d36
3 changed files with 33 additions and 53 deletions

View File

@ -1,6 +1,6 @@
import gzip import gzip
from datetime import timedelta from datetime import timedelta
from threading import RLock from threading import RLock, Thread
from .. import helpers as utils from .. import helpers as utils
from ..crypto import AES from ..crypto import AES
@ -31,9 +31,16 @@ class MtProtoSender:
# TODO There might be a better way to handle msgs_ack requests # TODO There might be a better way to handle msgs_ack requests
self.logging_out = False self.logging_out = False
# Reading and writing shouldn't be related. Call .recv() forever here.
# TODO Maybe this could be disabled with some "constant_read=bool".
self._recv_thread = Thread(
name='ReadThread', daemon=True, target=self._recv_thread_impl
)
def connect(self): def connect(self):
"""Connects to the server""" """Connects to the server"""
self.connection.connect() self.connection.connect()
self._recv_thread.start()
def is_connected(self): def is_connected(self):
return self.connection.is_connected() return self.connection.is_connected()
@ -76,57 +83,30 @@ class MtProtoSender:
del self._need_confirmation[:] del self._need_confirmation[:]
def receive(self, request=None, updates=None, **kwargs): def _recv_thread_impl(self):
"""Receives the specified MTProtoRequest ("fills in it" while self.is_connected():
the received data). This also restores the updates thread. try:
self._receive_message()
except TimeoutError:
# No problem.
pass
def _receive_message(self, **kwargs):
"""Receives a single message from the connected endpoint.
An optional named parameter 'timeout' can be specified if An optional named parameter 'timeout' can be specified if
one desires to override 'self.connection.timeout'. one desires to override 'self.connection.timeout'.
If 'request' is None, a single item will be read into
the 'updates' list (which cannot be None).
If 'request' is not None, any update received before
reading the request's result will be put there unless
it's None, in which case updates will be ignored.
""" """
if request is None and updates is None: # TODO Don't ignore updates
raise ValueError('Both the "request" and "updates"' self._logger.debug('Receiving a message...')
'parameters cannot be None at the same time.')
with self._lock:
self._logger.debug('receive() acquired the lock')
# Don't stop trying to receive until we get the request we wanted
# or, if there is no request, until we read an update
while (request and not request.confirm_received) or \
(not request and not updates):
self._logger.debug('Trying to .receive() the request result...')
body = self.connection.recv(**kwargs) body = self.connection.recv(**kwargs)
message, remote_msg_id, remote_seq = self._decode_msg(body) message, remote_msg_id, remote_seq = self._decode_msg(body)
with BinaryReader(message) as reader: with BinaryReader(message) as reader:
self._process_msg( self._process_msg(
remote_msg_id, remote_seq, reader, updates) remote_msg_id, remote_seq, reader, updates=None)
# We're done receiving, remove the request from pending, if any self._logger.debug('Received message.')
if request:
try:
self._pending_receive.remove(request)
except ValueError: pass
self._logger.debug('Request result received')
self._logger.debug('receive() released the lock')
def receive_updates(self, **kwargs):
"""Wrapper for .receive(request=None, updates=[])"""
updates = []
self.receive(updates=updates, **kwargs)
return updates
def cancel_receive(self):
"""Cancels any pending receive operation
by raising a ReadCancelledError"""
self.connection.cancel_receive()
# endregion # endregion

View File

@ -1,5 +1,5 @@
import logging import logging
import pyaes from time import sleep
from datetime import timedelta from datetime import timedelta
from hashlib import md5 from hashlib import md5
from os import path from os import path
@ -318,7 +318,8 @@ class TelegramBareClient:
try: try:
self._sender.send(request) self._sender.send(request)
self._sender.receive(request, updates=updates) while not request.confirm_received:
sleep(0.1) # TODO Use a proper lock
return request.result return request.result
except ConnectionResetError: except ConnectionResetError:

View File

@ -184,9 +184,6 @@ class TelegramClient(TelegramBareClient):
*args will be ignored. *args will be ignored.
""" """
if self._updates_thread_receiving.is_set():
self._sender.cancel_receive()
try: try:
self._lock.acquire() self._lock.acquire()
@ -918,7 +915,8 @@ class TelegramClient(TelegramBareClient):
else: else:
self._updates_thread_running.clear() self._updates_thread_running.clear()
if self._updates_thread_receiving.is_set(): if self._updates_thread_receiving.is_set():
self._sender.cancel_receive() # self._sender.cancel_receive()
pass
def _updates_thread_method(self): def _updates_thread_method(self):
"""This method will run until specified and listen for incoming updates""" """This method will run until specified and listen for incoming updates"""
@ -944,7 +942,8 @@ class TelegramClient(TelegramBareClient):
self._next_ping_at = time() + self.ping_interval self._next_ping_at = time() + self.ping_interval
self(PingRequest(utils.generate_random_long())) self(PingRequest(utils.generate_random_long()))
updates = self._sender.receive_updates(timeout=timeout) #updates = self._sender.receive_updates(timeout=timeout)
updates = []
self._updates_thread_receiving.clear() self._updates_thread_receiving.clear()
self._logger.debug( self._logger.debug(