From a50fc8a375f48ec5cb51149247d25b6e59dc7444 Mon Sep 17 00:00:00 2001 From: Jahongir Qurbonov Date: Tue, 22 Oct 2024 14:43:50 +0500 Subject: [PATCH] Fix writing in sender --- client/src/telethon/_impl/mtsender/sender.py | 19 ++++++++----------- 1 file changed, 8 insertions(+), 11 deletions(-) diff --git a/client/src/telethon/_impl/mtsender/sender.py b/client/src/telethon/_impl/mtsender/sender.py index 93edac25..5978dfc0 100644 --- a/client/src/telethon/_impl/mtsender/sender.py +++ b/client/src/telethon/_impl/mtsender/sender.py @@ -204,10 +204,11 @@ class Sender: ticket_number = self._step_counter async with self.lock: + self._try_fill_write() if self._step_counter == ticket_number: # We're the one to drive IO. self._step_counter += 1 - await self._step() + await self._wait_response() # else: different task drove IO. def pop_updates(self) -> list[Updates]: @@ -215,15 +216,6 @@ class Sender: self._updates.clear() return updates - async def _step(self) -> None: - assert self._protocol - if self._protocol.is_closed(): - raise ConnectionResetError - - self._response_state.clear() - self._try_fill_write() - await self._wait_response() - def _try_fill_write(self) -> None: for request in self._requests: if isinstance(request.state, NotSerialized): @@ -242,6 +234,12 @@ class Sender: request.state = Sent(request.state.msg_id, container_msg_id) async def _wait_response(self) -> None: + assert self._protocol + if self._protocol.is_closed(): + raise ConnectionResetError + + self._response_state.clear() + try: async with asyncio.timeout(PING_DELAY): await self._response_state.wait() @@ -264,7 +262,6 @@ class Sender: self._read_buffer += bytes(n) self._read_buffer_head -= n self._process_mtp_buffer() - finally: self._response_state.set() def _on_conn_closed(self) -> None: