From 106e7bd8bb8eaba535f72609115a0eb0c847943e Mon Sep 17 00:00:00 2001 From: Jahongir Qurbonov Date: Wed, 16 Oct 2024 20:43:46 +0500 Subject: [PATCH] Remove async next --- client/src/telethon/_impl/mtsender/sender.py | 87 ++++++-------------- 1 file changed, 25 insertions(+), 62 deletions(-) diff --git a/client/src/telethon/_impl/mtsender/sender.py b/client/src/telethon/_impl/mtsender/sender.py index fd21d376..fcf57179 100644 --- a/client/src/telethon/_impl/mtsender/sender.py +++ b/client/src/telethon/_impl/mtsender/sender.py @@ -1,18 +1,15 @@ import asyncio import logging import struct -import sys import time from abc import ABC -from asyncio import FIRST_COMPLETED, Event, Future, Lock, Task -from collections.abc import AsyncGenerator, Iterator +from asyncio import Event, Future, Lock +from collections.abc import Iterator from dataclasses import dataclass from typing import ( Generic, - Literal, Optional, Protocol, - Set, Type, TypeVar, ) @@ -40,14 +37,6 @@ from ..tl.mtproto.functions import ping_delay_disconnect from ..tl.types import UpdateDeleteMessages, UpdateShort from ..tl.types.messages import AffectedFoundMessages, AffectedHistory, AffectedMessages -if sys.version_info < (3, 10): - Y = TypeVar("Y") - S = TypeVar("S") - - async def anext(it: AsyncGenerator[Y, S]) -> Y: - return await it.__anext__() - - MAXIMUM_DATA = (1024 * 1024) + (8 * 1024) PING_DELAY = 60 @@ -189,9 +178,9 @@ class Sender: _requests: list[Request[object]] _request_event: Event _read_buffer: bytearray - _step_lock: Lock + _recv_lock: Lock + _send_lock: Lock _step_event: Event - _step_generator: AsyncGenerator[None, None] | None = None @classmethod async def connect( @@ -220,12 +209,12 @@ class Sender: _requests=[], _request_event=Event(), _read_buffer=bytearray(), - _step_lock=Lock(), + _recv_lock=Lock(), + _send_lock=Lock(), _step_event=Event(), ) async def disconnect(self) -> None: - await self.step.aclose() self._writer.close() await self._writer.wait_closed() @@ -249,65 +238,39 @@ class Sender: async def _step_until_receive(self, rx: Future[bytes]) -> bytes: while True: - await self.do_step() + await self.step() if rx.done(): return rx.result() async def get_updates(self) -> list[Updates]: - await self.do_step() + await self.step() result = self._updates.copy() self._updates.clear() return result - async def do_step(self) -> None: - if self._step_lock.locked(): - await self._step_event.wait() - return + async def step(self) -> None: + self._step_event.clear() - await self._do_step() + if not self._recv_lock.locked(): + asyncio.create_task(self.step_recv()) + if not self._send_lock.locked(): + asyncio.create_task(self.step_send()) - async def _do_step(self) -> None: - async with self._step_lock: + await self._step_event.wait() + + async def step_recv(self) -> None: + async with self._recv_lock: try: - self._step_event.clear() - await anext(self.step) - except StopAsyncIteration: - pass + await self._step_recv() finally: self._step_event.set() - @property - def step(self) -> AsyncGenerator[None, None]: - if self._step_generator is None: - self._step_generator = self._step() - return self._step_generator - - async def _step(self) -> AsyncGenerator[None, None]: - recv_data = asyncio.create_task(self._step_recv()) - send_data = asyncio.create_task(self._step_send()) - - pending: Set[Task[Literal[True] | None]] = set() - - try: - while True: - _, pending = await asyncio.wait( - (recv_data, send_data), return_when=FIRST_COMPLETED - ) # pyright: ignore [reportAssignmentType] - - yield - - if recv_data.done(): - recv_data = asyncio.create_task(self._step_recv()) - if send_data.done(): - send_data = asyncio.create_task(self._step_send()) - finally: - await self._try_cancel_tasks(pending) - - async def _try_cancel_tasks(self, pending: set[Task]) -> None: - if pending: - for task in pending: - task.cancel() - await asyncio.wait(pending) + async def step_send(self) -> None: + async with self._send_lock: + try: + await self._step_send() + finally: + self._step_event.set() async def _step_recv(self) -> None: try: