Remove async next

This commit is contained in:
Jahongir Qurbonov 2024-10-16 20:43:46 +05:00
parent 2931eee561
commit 106e7bd8bb

View File

@ -1,18 +1,15 @@
import asyncio import asyncio
import logging import logging
import struct import struct
import sys
import time import time
from abc import ABC from abc import ABC
from asyncio import FIRST_COMPLETED, Event, Future, Lock, Task from asyncio import Event, Future, Lock
from collections.abc import AsyncGenerator, Iterator from collections.abc import Iterator
from dataclasses import dataclass from dataclasses import dataclass
from typing import ( from typing import (
Generic, Generic,
Literal,
Optional, Optional,
Protocol, Protocol,
Set,
Type, Type,
TypeVar, TypeVar,
) )
@ -40,14 +37,6 @@ from ..tl.mtproto.functions import ping_delay_disconnect
from ..tl.types import UpdateDeleteMessages, UpdateShort from ..tl.types import UpdateDeleteMessages, UpdateShort
from ..tl.types.messages import AffectedFoundMessages, AffectedHistory, AffectedMessages from ..tl.types.messages import AffectedFoundMessages, AffectedHistory, AffectedMessages
if sys.version_info < (3, 10):
Y = TypeVar("Y")
S = TypeVar("S")
async def anext(it: AsyncGenerator[Y, S]) -> Y:
return await it.__anext__()
MAXIMUM_DATA = (1024 * 1024) + (8 * 1024) MAXIMUM_DATA = (1024 * 1024) + (8 * 1024)
PING_DELAY = 60 PING_DELAY = 60
@ -189,9 +178,9 @@ class Sender:
_requests: list[Request[object]] _requests: list[Request[object]]
_request_event: Event _request_event: Event
_read_buffer: bytearray _read_buffer: bytearray
_step_lock: Lock _recv_lock: Lock
_send_lock: Lock
_step_event: Event _step_event: Event
_step_generator: AsyncGenerator[None, None] | None = None
@classmethod @classmethod
async def connect( async def connect(
@ -220,12 +209,12 @@ class Sender:
_requests=[], _requests=[],
_request_event=Event(), _request_event=Event(),
_read_buffer=bytearray(), _read_buffer=bytearray(),
_step_lock=Lock(), _recv_lock=Lock(),
_send_lock=Lock(),
_step_event=Event(), _step_event=Event(),
) )
async def disconnect(self) -> None: async def disconnect(self) -> None:
await self.step.aclose()
self._writer.close() self._writer.close()
await self._writer.wait_closed() await self._writer.wait_closed()
@ -249,65 +238,39 @@ class Sender:
async def _step_until_receive(self, rx: Future[bytes]) -> bytes: async def _step_until_receive(self, rx: Future[bytes]) -> bytes:
while True: while True:
await self.do_step() await self.step()
if rx.done(): if rx.done():
return rx.result() return rx.result()
async def get_updates(self) -> list[Updates]: async def get_updates(self) -> list[Updates]:
await self.do_step() await self.step()
result = self._updates.copy() result = self._updates.copy()
self._updates.clear() self._updates.clear()
return result return result
async def do_step(self) -> None: async def step(self) -> None:
if self._step_lock.locked(): self._step_event.clear()
await self._step_event.wait()
return
await self._do_step() if not self._recv_lock.locked():
asyncio.create_task(self.step_recv())
if not self._send_lock.locked():
asyncio.create_task(self.step_send())
async def _do_step(self) -> None: await self._step_event.wait()
async with self._step_lock:
async def step_recv(self) -> None:
async with self._recv_lock:
try: try:
self._step_event.clear() await self._step_recv()
await anext(self.step)
except StopAsyncIteration:
pass
finally: finally:
self._step_event.set() self._step_event.set()
@property async def step_send(self) -> None:
def step(self) -> AsyncGenerator[None, None]: async with self._send_lock:
if self._step_generator is None: try:
self._step_generator = self._step() await self._step_send()
return self._step_generator finally:
self._step_event.set()
async def _step(self) -> AsyncGenerator[None, None]:
recv_data = asyncio.create_task(self._step_recv())
send_data = asyncio.create_task(self._step_send())
pending: Set[Task[Literal[True] | None]] = set()
try:
while True:
_, pending = await asyncio.wait(
(recv_data, send_data), return_when=FIRST_COMPLETED
) # pyright: ignore [reportAssignmentType]
yield
if recv_data.done():
recv_data = asyncio.create_task(self._step_recv())
if send_data.done():
send_data = asyncio.create_task(self._step_send())
finally:
await self._try_cancel_tasks(pending)
async def _try_cancel_tasks(self, pending: set[Task]) -> None:
if pending:
for task in pending:
task.cancel()
await asyncio.wait(pending)
async def _step_recv(self) -> None: async def _step_recv(self) -> None:
try: try: