From 1f7ac7118750ed84e2165dce9c6aca2e6ea0c6a4 Mon Sep 17 00:00:00 2001 From: Lonami Exo Date: Mon, 10 Jul 2017 15:21:20 +0200 Subject: [PATCH] Debug level should always be used for logging since it's a library --- telethon/extensions/threaded_tcp_client.py | 94 ++++++++++++++++++++++ telethon/network/mtproto_sender.py | 20 ++--- telethon/telegram_bare_client.py | 6 +- telethon/telegram_client.py | 16 ++-- 4 files changed, 115 insertions(+), 21 deletions(-) create mode 100644 telethon/extensions/threaded_tcp_client.py diff --git a/telethon/extensions/threaded_tcp_client.py b/telethon/extensions/threaded_tcp_client.py new file mode 100644 index 00000000..d97ce6ee --- /dev/null +++ b/telethon/extensions/threaded_tcp_client.py @@ -0,0 +1,94 @@ +import socket +import time +from datetime import datetime, timedelta +from io import BytesIO, BufferedWriter +from threading import Event, Lock, Thread, Condition + +from ..errors import ReadCancelledError + + +class ThreadedTcpClient: + """The main difference with the TcpClient class is that this one + will spawn a secondary thread that will be constantly reading + from the network and putting everything on another buffer. + """ + def __init__(self, proxy=None): + self.connected = False + self._proxy = proxy + self._recreate_socket() + + # Support for multi-threading advantages and safety + self.cancelled = Event() # Has the read operation been cancelled? + self.delay = 0.1 # Read delay when there was no data available + self._lock = Lock() + + self._buffer = [] + self._read_thread = Thread(target=self._reading_thread, daemon=True) + self._cv = Condition() # Condition Variable + + def _recreate_socket(self): + if self._proxy is None: + self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + else: + import socks + self._socket = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM) + if type(self._proxy) is dict: + self._socket.set_proxy(**self._proxy) + else: # tuple, list, etc. + self._socket.set_proxy(*self._proxy) + + def connect(self, ip, port, timeout): + """Connects to the specified IP and port number. + 'timeout' must be given in seconds + """ + if not self.connected: + self._socket.settimeout(timeout) + self._socket.connect((ip, port)) + self._socket.setblocking(False) + self.connected = True + + def close(self): + """Closes the connection""" + if self.connected: + self._socket.shutdown(socket.SHUT_RDWR) + self._socket.close() + self.connected = False + self._recreate_socket() + + def write(self, data): + """Writes (sends) the specified bytes to the connected peer""" + self._socket.sendall(data) + + def read(self, size, timeout=timedelta(seconds=5)): + """Reads (receives) a whole block of 'size bytes + from the connected peer. + + A timeout can be specified, which will cancel the operation if + no data has been read in the specified time. If data was read + and it's waiting for more, the timeout will NOT cancel the + operation. Set to None for no timeout + """ + with self._cv: + print('wait for...') + self._cv.wait_for(lambda: len(self._buffer) >= size, timeout=timeout.seconds) + print('got', size) + result, self._buffer = self._buffer[:size], self._buffer[size:] + return result + + def _reading_thread(self): + while True: + partial = self._socket.recv(4096) + if len(partial) == 0: + self.connected = False + raise ConnectionResetError( + 'The server has closed the connection.') + + with self._cv: + print('extended', len(partial)) + self._buffer.extend(partial) + self._cv.notify() + + def cancel_read(self): + """Cancels the read operation IF it hasn't yet + started, raising a ReadCancelledError""" + self.cancelled.set() diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index 0266e7b4..32059986 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -100,7 +100,7 @@ class MtProtoSender: # or, if there is no request, until we read an update while (request and not request.confirm_received) or \ (not request and not updates): - self._logger.info('Trying to .receive() the request result...') + self._logger.debug('Trying to .receive() the request result...') seq, body = self.transport.receive(**kwargs) message, remote_msg_id, remote_seq = self._decode_msg(body) @@ -114,7 +114,7 @@ class MtProtoSender: self._pending_receive.remove(request) except ValueError: pass - self._logger.info('Request result received') + self._logger.debug('Request result received') self._logger.debug('receive() released the lock') def receive_updates(self, **kwargs): @@ -226,10 +226,10 @@ class MtProtoSender: ack = reader.tgread_object() for r in self._pending_receive: if r.request_msg_id in ack.msg_ids: - self._logger.warning('Ack found for the a request') + self._logger.debug('Ack found for the a request') if self.logging_out: - self._logger.info('Message ack confirmed a request') + self._logger.debug('Message ack confirmed a request') r.confirm_received = True return True @@ -247,7 +247,7 @@ class MtProtoSender: return True - self._logger.warning('Unknown message: {}'.format(hex(code))) + self._logger.debug('Unknown message: {}'.format(hex(code))) return False # endregion @@ -263,7 +263,7 @@ class MtProtoSender: request = next(r for r in self._pending_receive if r.request_msg_id == received_msg_id) - self._logger.warning('Pong confirmed a request') + self._logger.debug('Pong confirmed a request') request.confirm_received = True except StopIteration: pass @@ -318,8 +318,8 @@ class MtProtoSender: # Use the current msg_id to determine the right time offset. self.session.update_time_offset(correct_msg_id=msg_id) self.session.save() - self._logger.warning('Read Bad Message error: ' + str(error)) - self._logger.info('Attempting to use the correct time offset.') + self._logger.debug('Read Bad Message error: ' + str(error)) + self._logger.debug('Attempting to use the correct time offset.') return True else: raise error @@ -346,7 +346,7 @@ class MtProtoSender: self._need_confirmation.append(request_id) self._send_acknowledges() - self._logger.warning('Read RPC error: %s', str(error)) + self._logger.debug('Read RPC error: %s', str(error)) if isinstance(error, InvalidDCError): # Must resend this request, if any if request: @@ -368,7 +368,7 @@ class MtProtoSender: else: # If it's really a result for RPC from previous connection # session, it will be skipped by the handle_container() - self._logger.warning('Lost request will be skipped.') + self._logger.debug('Lost request will be skipped.') return False def _handle_gzip_packed(self, msg_id, sequence, reader, updates): diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 6076d6ca..c45498cf 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -90,7 +90,7 @@ class TelegramBareClient: determine the authorization key for the current session. """ if self._sender and self._sender.is_connected(): - self._logger.warning( + self._logger.debug( 'Attempted to connect when the client was already connected.' ) return @@ -143,7 +143,7 @@ class TelegramBareClient: except (RPCError, ConnectionError) as error: # Probably errors from the previous session, ignore them self.disconnect() - self._logger.warning('Could not stabilise initial connection: {}' + self._logger.debug('Could not stabilise initial connection: {}' .format(error)) return False @@ -277,7 +277,7 @@ class TelegramBareClient: return request.result except ConnectionResetError: - self._logger.info('Server disconnected us. Reconnecting and ' + self._logger.debug('Server disconnected us. Reconnecting and ' 'resending request...') self.reconnect() return self.invoke(request) diff --git a/telethon/telegram_client.py b/telethon/telegram_client.py index 9447e8ae..7f7ba729 100644 --- a/telethon/telegram_client.py +++ b/telethon/telegram_client.py @@ -214,7 +214,7 @@ class TelegramClient(TelegramBareClient): return result except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e: - self._logger.info('DC error when invoking request, ' + self._logger.debug('DC error when invoking request, ' 'attempting to reconnect at DC {}' .format(e.new_dc)) @@ -698,7 +698,7 @@ class TelegramClient(TelegramBareClient): return # Different state, update the saved value and behave as required - self._logger.info('Changing updates thread running status to %s', running) + self._logger.debug('Changing updates thread running status to %s', running) if running: self._updates_thread_running.set() if not self._updates_thread: @@ -739,7 +739,7 @@ class TelegramClient(TelegramBareClient): updates = self._sender.receive_updates(timeout=timeout) self._updates_thread_receiving.clear() - self._logger.info( + self._logger.debug( 'Received {} update(s) from the updates thread' .format(len(updates)) ) @@ -748,25 +748,25 @@ class TelegramClient(TelegramBareClient): handler(update) except ConnectionResetError: - self._logger.info('Server disconnected us. Reconnecting...') + self._logger.debug('Server disconnected us. Reconnecting...') self.reconnect() except TimeoutError: self._logger.debug('Receiving updates timed out') except ReadCancelledError: - self._logger.info('Receiving updates cancelled') + self._logger.debug('Receiving updates cancelled') except BrokenPipeError: - self._logger.info('Tcp session is broken. Reconnecting...') + self._logger.debug('Tcp session is broken. Reconnecting...') self.reconnect() except InvalidChecksumError: - self._logger.info('MTProto session is broken. Reconnecting...') + self._logger.debug('MTProto session is broken. Reconnecting...') self.reconnect() except OSError: - self._logger.warning('OSError on updates thread, %s logging out', + self._logger.debug('OSError on updates thread, %s logging out', 'was' if self._sender.logging_out else 'was not') if self._sender.logging_out: