From 25af22f1e7b7c7c372d7da240b8aee6d33a26652 Mon Sep 17 00:00:00 2001 From: Andrey Egorov Date: Tue, 14 Nov 2017 13:52:33 +0300 Subject: [PATCH] Bugfix in reconnection --- telethon/extensions/tcp_client.py | 75 +++++++++++++++++++++++++++---- telethon/telegram_bare_client.py | 2 +- 2 files changed, 67 insertions(+), 10 deletions(-) diff --git a/telethon/extensions/tcp_client.py b/telethon/extensions/tcp_client.py index eda4109f..5ad68c20 100644 --- a/telethon/extensions/tcp_client.py +++ b/telethon/extensions/tcp_client.py @@ -72,7 +72,7 @@ class TcpClient: raise def _get_connected(self): - return self._socket is not None + return self._socket is not None and self._socket.fileno() >= 0 connected = property(fget=_get_connected) @@ -90,11 +90,14 @@ class TcpClient: async def write(self, data): """Writes (sends) the specified bytes to the connected peer""" if self._socket is None: - raise ConnectionResetError() + self._raise_connection_reset() try: - await asyncio.wait_for(self._loop.sock_sendall(self._socket, data), - timeout=self.timeout, loop=self._loop) + await asyncio.wait_for( + self.sock_sendall(data), + timeout=self.timeout, + loop=self._loop + ) except asyncio.TimeoutError as e: raise TimeoutError() from e except BrokenPipeError: @@ -109,16 +112,18 @@ class TcpClient: """Reads (receives) a whole block of 'size bytes from the connected peer. """ - if self._socket is None: - raise ConnectionResetError() - # TODO Remove the timeout from this method, always use previous one with BufferedWriter(BytesIO(), buffer_size=size) as buffer: bytes_left = size while bytes_left != 0: try: - partial = await asyncio.wait_for(self._loop.sock_recv(self._socket, bytes_left), - timeout=self.timeout, loop=self._loop) + if self._socket is None: + self._raise_connection_reset() + partial = await asyncio.wait_for( + self.sock_recv(bytes_left), + timeout=self.timeout, + loop=self._loop + ) except asyncio.TimeoutError as e: raise TimeoutError() from e except OSError as e: @@ -140,3 +145,55 @@ class TcpClient: def _raise_connection_reset(self): self.close() # Connection reset -> flag as socket closed raise ConnectionResetError('The server has closed the connection.') + + # due to new https://github.com/python/cpython/pull/4386 + def sock_recv(self, n): + fut = self._loop.create_future() + self._sock_recv(fut, None, n) + return fut + + def _sock_recv(self, fut, registered_fd, n): + if registered_fd is not None: + self._loop.remove_reader(registered_fd) + if fut.cancelled(): + return + + try: + data = self._socket.recv(n) + except (BlockingIOError, InterruptedError): + fd = self._socket.fileno() + self._loop.add_reader(fd, self._sock_recv, fut, fd, n) + except Exception as exc: + fut.set_exception(exc) + else: + fut.set_result(data) + + def sock_sendall(self, data): + fut = self._loop.create_future() + if data: + self._sock_sendall(fut, None, data) + else: + fut.set_result(None) + return fut + + def _sock_sendall(self, fut, registered_fd, data): + if registered_fd: + self._loop.remove_writer(registered_fd) + if fut.cancelled(): + return + + try: + n = self._socket.send(data) + except (BlockingIOError, InterruptedError): + n = 0 + except Exception as exc: + fut.set_exception(exc) + return + + if n == len(data): + fut.set_result(None) + else: + if n: + data = data[n:] + fd = self._socket.fileno() + self._loop.add_writer(fd, self._sock_sendall, fut, fd, data) diff --git a/telethon/telegram_bare_client.py b/telethon/telegram_bare_client.py index fd42fbc9..a26d71b1 100644 --- a/telethon/telegram_bare_client.py +++ b/telethon/telegram_bare_client.py @@ -444,7 +444,7 @@ class TelegramBareClient: self._logger.debug('Server disconnected us. Reconnecting and ' 'resending request... (%d)' % retry) await self._reconnect() - if not self._sender.is_connected(): + if not self.is_connected(): await asyncio.sleep(retry + 1, loop=self._loop) return None