Add step event

This commit is contained in:
Jahongir Qurbonov 2024-10-17 19:52:14 +05:00
parent 3264d07e48
commit f5838961dd

View File

@ -173,6 +173,7 @@ class Sender:
_request_event: Event _request_event: Event
_read_buffer: bytearray _read_buffer: bytearray
_write_drain_pending: bool _write_drain_pending: bool
_step_event: Event
_recv_task: Optional[Task[bytes]] = None _recv_task: Optional[Task[bytes]] = None
_send_task: Optional[Task[None]] = None _send_task: Optional[Task[None]] = None
@ -204,6 +205,7 @@ class Sender:
_request_event=Event(), _request_event=Event(),
_read_buffer=bytearray(), _read_buffer=bytearray(),
_write_drain_pending=False, _write_drain_pending=False,
_step_event=Event(),
) )
sender._recv_task = asyncio.create_task(sender._do_recv()) sender._recv_task = asyncio.create_task(sender._do_recv())
@ -265,6 +267,7 @@ class Sender:
if self._recv_task is None or self._send_task is None: if self._recv_task is None or self._send_task is None:
return return
self._step_event.clear()
self._try_fill_write() self._try_fill_write()
await asyncio.wait( await asyncio.wait(
@ -285,16 +288,24 @@ class Sender:
self._on_net_write() self._on_net_write()
self._send_task = asyncio.create_task(self._do_send()) self._send_task = asyncio.create_task(self._do_send())
await self._step_event.wait()
async def _do_recv(self) -> bytes: async def _do_recv(self) -> bytes:
async with asyncio.timeout(PING_DELAY): try:
return await self._reader.read(MAXIMUM_DATA) async with asyncio.timeout(PING_DELAY):
return await self._reader.read(MAXIMUM_DATA)
finally:
self._step_event.set()
async def _do_send(self) -> None: async def _do_send(self) -> None:
if self._write_drain_pending: try:
await self._writer.drain() if self._write_drain_pending:
self._write_drain_pending = False await self._writer.drain()
else: self._write_drain_pending = False
await self._request_event.wait() else:
await self._request_event.wait()
finally:
self._step_event.set()
def _try_fill_write(self) -> None: def _try_fill_write(self) -> None:
if self._write_drain_pending: if self._write_drain_pending: