From cb2d943139bb0de42fd7014d887866eb7ab4dea2 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Sun, 29 Oct 2017 15:33:03 +0300 Subject: [PATCH 1/2] Remove forgotten points --- telethon/network/mtproto_sender.py | 4 ++-- telethon/telegram_bare_client.py | 2 +- telethon/tl/tl_message.py | 2 -- 3 files changed, 3 insertions(+), 5 deletions(-) diff --git a/telethon/network/mtproto_sender.py b/telethon/network/mtproto_sender.py index e84cb6f0..26e5302a 100644 --- a/telethon/network/mtproto_sender.py +++ b/telethon/network/mtproto_sender.py @@ -275,12 +275,10 @@ class MtProtoSender: async def _resend_request(self, msg_id): request = self._pop_request(msg_id) if request: - self._logger.debug('requests is about to resend') await self.send(request) return requests = self._pop_requests_of_container(msg_id) if requests: - self._logger.debug('container of requests is about to resend') await self.send(*requests) async def _handle_pong(self, msg_id, sequence, reader): @@ -362,6 +360,7 @@ class MtProtoSender: # TODO For now, simply ack msg_new.answer_msg_id # Relevant tdesktop source code: https://goo.gl/VvpCC6 + await self._send_acknowledge(msg_new.answer_msg_id) return True async def _handle_msg_new_detailed_info(self, msg_id, sequence, reader): @@ -370,6 +369,7 @@ class MtProtoSender: # TODO For now, simply ack msg_new.answer_msg_id # Relevant tdesktop source code: https://goo.gl/G7DPsR + await self._send_acknowledge(msg_new.answer_msg_id) return True async def _handle_new_session_created(self, msg_id, sequence, reader): diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index 82823105..acf37ba5 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -693,7 +693,7 @@ class TelegramBareClient: """ self.updates.process(await self(GetStateRequest())) - async def add_update_handler(self, handler): + def add_update_handler(self, handler): """Adds an update handler (a function which takes a TLObject, an update, as its parameter) and listens for updates""" self.updates.handlers.append(handler) diff --git a/telethon/tl/tl_message.py b/telethon/tl/tl_message.py index bb522325..bcb48279 100644 --- a/telethon/tl/tl_message.py +++ b/telethon/tl/tl_message.py @@ -1,5 +1,4 @@ import struct -import logging from . import TLObject, GzipPacked @@ -13,7 +12,6 @@ class TLMessage(TLObject): self.seq_no = session.generate_sequence(request.content_related) self.request = request self.container_msg_id = None - logging.getLogger(__name__).debug(self) def to_dict(self, recursive=True): return { From 25af22f1e7b7c7c372d7da240b8aee6d33a26652 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Tue, 14 Nov 2017 13:52:33 +0300 Subject: [PATCH 2/2] Bugfix in reconnection --- telethon/extensions/tcp_client.py | 75 +++++++++++++++++++++++++++---- telethon/telegram_bare_client.py | 2 +- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index eda4109f..5ad68c20 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -72,7 +72,7 @@ class TcpClient: raise def _get_connected(self): - return self._socket is not None + return self._socket is not None and self._socket.fileno() >= 0 connected = property(fget=_get_connected) @@ -90,11 +90,14 @@ class TcpClient: async def write(self, data): """Writes (sends) the specified bytes to the connected peer""" if self._socket is None: - raise ConnectionResetError() + self._raise_connection_reset() try: - await asyncio.wait_for(self._loop.sock_sendall(self._socket, data), - timeout=self.timeout, loop=self._loop) + await asyncio.wait_for( + self.sock_sendall(data), + timeout=self.timeout, + loop=self._loop + ) except asyncio.TimeoutError as e: raise TimeoutError() from e except BrokenPipeError: @@ -109,16 +112,18 @@ class TcpClient: """Reads (receives) a whole block of 'size bytes from the connected peer. """ - if self._socket is None: - raise ConnectionResetError() - # TODO Remove the timeout from this method, always use previous one with BufferedWriter(BytesIO(), buffer_size=size) as buffer: bytes_left = size while bytes_left != 0: try: - partial = await asyncio.wait_for(self._loop.sock_recv(self._socket, bytes_left), - timeout=self.timeout, loop=self._loop) + if self._socket is None: + self._raise_connection_reset() + partial = await asyncio.wait_for( + self.sock_recv(bytes_left), + timeout=self.timeout, + loop=self._loop + ) except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: @@ -140,3 +145,55 @@ class TcpClient: def _raise_connection_reset(self): self.close() # Connection reset -> flag as socket closed raise ConnectionResetError('The server has closed the connection.') + + # due to new https://github.com/python/cpython/pull/4386 + def sock_recv(self, n): + fut = self._loop.create_future() + self._sock_recv(fut, None, n) + return fut + + def _sock_recv(self, fut, registered_fd, n): + if registered_fd is not None: + self._loop.remove_reader(registered_fd) + if fut.cancelled(): + return + + try: + data = self._socket.recv(n) + except (BlockingIOError, InterruptedError): + fd = self._socket.fileno() + self._loop.add_reader(fd, self._sock_recv, fut, fd, n) + except Exception as exc: + fut.set_exception(exc) + else: + fut.set_result(data) + + def sock_sendall(self, data): + fut = self._loop.create_future() + if data: + self._sock_sendall(fut, None, data) + else: + fut.set_result(None) + return fut + + def _sock_sendall(self, fut, registered_fd, data): + if registered_fd: + self._loop.remove_writer(registered_fd) + if fut.cancelled(): + return + + try: + n = self._socket.send(data) + except (BlockingIOError, InterruptedError): + n = 0 + except Exception as exc: + fut.set_exception(exc) + return + + if n == len(data): + fut.set_result(None) + else: + if n: + data = data[n:] + fd = self._socket.fileno() + self._loop.add_writer(fd, self._sock_sendall, fut, fd, data) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index fd42fbc9..a26d71b1 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -444,7 +444,7 @@ class TelegramBareClient: self._logger.debug('Server disconnected us. Reconnecting and ' 'resending request... (%d)' % retry) await self._reconnect() - if not self._sender.is_connected(): + if not self.is_connected(): await asyncio.sleep(retry + 1, loop=self._loop) return None