diff --git a/.gitignore b/.gitignore index 62bd6e89..930ca05c 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ __pycache__/ .pytest_cache/ .mypy_cache/ .ruff_cache/ +.venv/ dist/ dist-doc/ build/ diff --git a/client/pyproject.toml b/client/pyproject.toml index a61df2e3..edee4903 100644 --- a/client/pyproject.toml +++ b/client/pyproject.toml @@ -7,7 +7,7 @@ authors = [ ] readme = "README.md" license = {file = "LICENSE"} -requires-python = ">=3.8" +requires-python = ">=3.9" keywords = ["telegram", "chat", "messaging", "mtproto", "telethon"] classifiers = [ "Development Status :: 5 - Production/Stable", diff --git a/client/src/telethon/_impl/client/client/auth.py b/client/src/telethon/_impl/client/client/auth.py index 0a33d35c..220a0081 100644 --- a/client/src/telethon/_impl/client/client/auth.py +++ b/client/src/telethon/_impl/client/client/auth.py @@ -61,7 +61,7 @@ async def handle_migrate(client: Client, dc_id: Optional[int]) -> None: client._config, client._session.dcs, DataCenter(id=dc_id) ) - async with client._sender.lock: + async with client._sender._lock: old_sender = client._sender client._sender = sender await old_sender.disconnect() diff --git a/client/src/telethon/_impl/client/client/net.py b/client/src/telethon/_impl/client/client/net.py index d7f6e31b..aa8c7904 100644 --- a/client/src/telethon/_impl/client/client/net.py +++ b/client/src/telethon/_impl/client/client/net.py @@ -265,7 +265,7 @@ async def invoke_request( async def step_sender(client: Client) -> None: try: assert client._sender - updates = await client._sender.step() + updates = await client._sender.get_updates() except ConnectionError: if client.connected: raise diff --git a/client/src/telethon/_impl/mtsender/sender.py b/client/src/telethon/_impl/mtsender/sender.py index 5bbb9e4c..94850029 100644 --- a/client/src/telethon/_impl/mtsender/sender.py +++ b/client/src/telethon/_impl/mtsender/sender.py @@ -162,18 +162,20 @@ class Request(Generic[Return]): class Sender: dc_id: int addr: str - lock: Lock _logger: logging.Logger + _lock: Lock _reader: AsyncReader _writer: AsyncWriter _transport: Transport _mtp: Mtp _mtp_buffer: bytearray + _updates: list[Updates] _requests: list[Request[object]] _request_event: Event _next_ping: float _read_buffer: bytearray _write_drain_pending: bool + _step_counter: int @classmethod async def connect( @@ -192,18 +194,20 @@ class Sender: return cls( dc_id=dc_id, addr=addr, - lock=Lock(), _logger=base_logger.getChild("mtsender"), + _lock=Lock(), _reader=reader, _writer=writer, _transport=transport, _mtp=mtp, _mtp_buffer=bytearray(), + _updates=[], _requests=[], _request_event=Event(), _next_ping=asyncio.get_running_loop().time() + PING_DELAY, _read_buffer=bytearray(), _write_drain_pending=False, + _step_counter=0, ) async def disconnect(self) -> None: @@ -230,15 +234,26 @@ class Sender: async def _step_until_receive(self, rx: Future[bytes]) -> bytes: while True: - await self.step() + await self._step() if rx.done(): return rx.result() - async def step(self) -> list[Updates]: - async with self.lock: - return await self._step() + async def get_updates(self) -> list[Updates]: + await self._step() + updates, self._updates = self._updates, [] + return updates - async def _step(self) -> list[Updates]: + async def _step(self) -> None: + ticket_number = self._step_counter + + async with self._lock: + if self._step_counter == ticket_number: + # We're the one to drive IO. + self._step_counter += 1 + await self._do_step() + # else: # A different task drive IO. + + async def _do_step(self) -> list[Updates]: self._try_fill_write() recv_req = asyncio.create_task(self._request_event.wait()) @@ -268,6 +283,7 @@ class Sender: async def _do_send(self) -> None: if self._write_drain_pending: + self._on_net_write() await self._writer.drain() self._write_drain_pending = False else: @@ -310,7 +326,7 @@ class Sender: break else: del self._read_buffer[:n] - self._process_mtp_buffer(updates) + self._process_mtp_buffer() return updates @@ -330,12 +346,12 @@ class Sender: ) self._next_ping = asyncio.get_running_loop().time() + PING_DELAY - def _process_mtp_buffer(self, updates: list[Updates]) -> None: + def _process_mtp_buffer(self) -> None: results = self._mtp.deserialize(self._mtp_buffer) for result in results: if isinstance(result, Update): - self._process_update(updates, result.body) + self._process_update(result.body) elif isinstance(result, RpcResult): self._process_result(result) elif isinstance(result, RpcError): @@ -343,11 +359,9 @@ class Sender: else: self._process_bad_message(result) - def _process_update( - self, updates: list[Updates], update: bytes | bytearray | memoryview - ) -> None: + def _process_update(self, update: bytes | bytearray | memoryview) -> None: try: - updates.append(Updates.from_bytes(update)) + self._updates.append(Updates.from_bytes(update)) except ValueError: cid = struct.unpack_from("I", update)[0] alt_classes: tuple[Type[Serializable], ...] = ( @@ -367,7 +381,7 @@ class Sender: AffectedMessages, ), ) - updates.append( + self._updates.append( UpdateShort( update=UpdateDeleteMessages( messages=[], diff --git a/generator/pyproject.toml b/generator/pyproject.toml index ec85efaa..4c564b9f 100644 --- a/generator/pyproject.toml +++ b/generator/pyproject.toml @@ -7,7 +7,7 @@ authors = [ ] readme = "README.md" license = {file = "LICENSE"} -requires-python = ">=3.8" +requires-python = ">=3.9" keywords = ["telegram", "parser", "codegen", "telethon"] classifiers = [ "Development Status :: 5 - Production/Stable", diff --git a/generator/tests/parameter_test.py b/generator/tests/parameter_test.py index 6aeacae2..f09b3833 100644 --- a/generator/tests/parameter_test.py +++ b/generator/tests/parameter_test.py @@ -6,7 +6,7 @@ from telethon_generator.tl_parser import ( NormalParameter, Parameter, Type, - TypeDefNotImplemented, + TypeDefNotImplementedError, ) @@ -39,7 +39,7 @@ def test_bad_generics(param: str) -> None: def test_type_def_param() -> None: - with raises(TypeDefNotImplemented) as e: + with raises(TypeDefNotImplementedError) as e: Parameter.from_str("{a:Type}") e.match("typedef not implemented: a")