diff --git a/client/src/telethon/_impl/mtsender/sender.py b/client/src/telethon/_impl/mtsender/sender.py index d0cfde4f..20962125 100644 --- a/client/src/telethon/_impl/mtsender/sender.py +++ b/client/src/telethon/_impl/mtsender/sender.py @@ -191,6 +191,7 @@ class Sender: _next_ping: float _read_buffer: bytearray _step_lock: Lock + _step_event: Event _step_generator: AsyncGenerator[None, None] | None = None @classmethod @@ -222,6 +223,7 @@ class Sender: _next_ping=asyncio.get_running_loop().time() + PING_DELAY, _read_buffer=bytearray(), _step_lock=Lock(), + _step_event=Event(), ) async def disconnect(self) -> None: @@ -249,9 +251,7 @@ class Sender: async def _step_until_receive(self, rx: Future[bytes]) -> bytes: while True: - step = asyncio.create_task(self.do_step()) - await asyncio.wait((step, rx), return_when=FIRST_COMPLETED) - + await self.do_step() if rx.done(): return rx.result() @@ -262,8 +262,21 @@ class Sender: return result async def do_step(self) -> None: + if self._step_lock.locked(): + await self._step_event.wait() + return + + await self._do_step() + + async def _do_step(self) -> None: async with self._step_lock: - await anext(self.step) + try: + self._step_event.clear() + await anext(self.step) + except StopAsyncIteration: + pass + finally: + self._step_event.set() @property def step(self) -> AsyncGenerator[None, None]: @@ -272,7 +285,6 @@ class Sender: return self._step_generator async def _step(self) -> AsyncGenerator[None, None]: - recv_req = asyncio.create_task(self._request_event.wait()) recv_data = asyncio.create_task(self._step_recv()) send_data = asyncio.create_task(self._step_send()) @@ -288,8 +300,6 @@ class Sender: yield - if recv_req.done(): - recv_req = asyncio.create_task(self._request_event.wait()) if recv_data.done(): recv_data = asyncio.create_task(self._step_recv()) if send_data.done():