mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-02-10 16:40:57 +03:00
Use put_nowait and avoid double await
This commit is contained in:
parent
485ce5ca3b
commit
acd6025731
|
@ -142,7 +142,7 @@ class MTProtoSender:
|
||||||
|
|
||||||
__log__.info('Disconnection from {} complete!'.format(self._ip))
|
__log__.info('Disconnection from {} complete!'.format(self._ip))
|
||||||
|
|
||||||
async def send(self, request, ordered=False):
|
def send(self, request, ordered=False):
|
||||||
"""
|
"""
|
||||||
This method enqueues the given request to be sent.
|
This method enqueues the given request to be sent.
|
||||||
|
|
||||||
|
@ -154,7 +154,7 @@ class MTProtoSender:
|
||||||
|
|
||||||
async def method():
|
async def method():
|
||||||
# Sending (enqueued for the send loop)
|
# Sending (enqueued for the send loop)
|
||||||
future = await sender.send(request)
|
future = sender.send(request)
|
||||||
# Receiving (waits for the receive loop to read the result)
|
# Receiving (waits for the receive loop to read the result)
|
||||||
result = await future
|
result = await future
|
||||||
|
|
||||||
|
@ -167,23 +167,20 @@ class MTProtoSender:
|
||||||
Since the receiving part is "built in" the future, it's
|
Since the receiving part is "built in" the future, it's
|
||||||
impossible to await receive a result that was never sent.
|
impossible to await receive a result that was never sent.
|
||||||
"""
|
"""
|
||||||
# TODO Perhaps this method should be synchronous and just return
|
|
||||||
# a `Future` that you need to further ``await`` instead of the
|
|
||||||
# currently double ``await (await send())``?
|
|
||||||
if utils.is_list_like(request):
|
if utils.is_list_like(request):
|
||||||
result = []
|
result = []
|
||||||
after = None
|
after = None
|
||||||
for r in request:
|
for r in request:
|
||||||
message = self.state.create_message(r, after=after)
|
message = self.state.create_message(r, after=after)
|
||||||
self._pending_messages[message.msg_id] = message
|
self._pending_messages[message.msg_id] = message
|
||||||
await self._send_queue.put(message)
|
self._send_queue.put_nowait(message)
|
||||||
result.append(message.future)
|
result.append(message.future)
|
||||||
after = ordered and message
|
after = ordered and message
|
||||||
return result
|
return result
|
||||||
else:
|
else:
|
||||||
message = self.state.create_message(request)
|
message = self.state.create_message(request)
|
||||||
self._pending_messages[message.msg_id] = message
|
self._pending_messages[message.msg_id] = message
|
||||||
await self._send_queue.put(message)
|
self._send_queue.put_nowait(message)
|
||||||
return message.future
|
return message.future
|
||||||
|
|
||||||
# Private methods
|
# Private methods
|
||||||
|
@ -264,7 +261,7 @@ class MTProtoSender:
|
||||||
"""
|
"""
|
||||||
while self._user_connected and not self._reconnecting:
|
while self._user_connected and not self._reconnecting:
|
||||||
if self._pending_ack:
|
if self._pending_ack:
|
||||||
await self._send_queue.put(self.state.create_message(
|
self._send_queue.put_nowait(self.state.create_message(
|
||||||
MsgsAck(list(self._pending_ack))
|
MsgsAck(list(self._pending_ack))
|
||||||
))
|
))
|
||||||
self._pending_ack.clear()
|
self._pending_ack.clear()
|
||||||
|
@ -299,7 +296,7 @@ class MTProtoSender:
|
||||||
if m.future.cancelled():
|
if m.future.cancelled():
|
||||||
self._pending_messages.pop(m.msg_id, None)
|
self._pending_messages.pop(m.msg_id, None)
|
||||||
else:
|
else:
|
||||||
await self._send_queue.put(m)
|
self._send_queue.put_nowait(m)
|
||||||
|
|
||||||
__log__.debug('Outgoing messages sent!')
|
__log__.debug('Outgoing messages sent!')
|
||||||
|
|
||||||
|
@ -387,7 +384,7 @@ class MTProtoSender:
|
||||||
if rpc_result.error:
|
if rpc_result.error:
|
||||||
# TODO Report errors if possible/enabled
|
# TODO Report errors if possible/enabled
|
||||||
error = rpc_message_to_error(rpc_result.error)
|
error = rpc_message_to_error(rpc_result.error)
|
||||||
await self._send_queue.put(self.state.create_message(
|
self._send_queue.put_nowait(self.state.create_message(
|
||||||
MsgsAck([message.msg_id])
|
MsgsAck([message.msg_id])
|
||||||
))
|
))
|
||||||
|
|
||||||
|
@ -459,7 +456,7 @@ class MTProtoSender:
|
||||||
__log__.debug('Handling bad salt')
|
__log__.debug('Handling bad salt')
|
||||||
bad_salt = message.obj
|
bad_salt = message.obj
|
||||||
self.state.salt = bad_salt.new_server_salt
|
self.state.salt = bad_salt.new_server_salt
|
||||||
await self._send_queue.put(self._pending_messages[bad_salt.bad_msg_id])
|
self._send_queue.put_nowait(self._pending_messages[bad_salt.bad_msg_id])
|
||||||
|
|
||||||
async def _handle_bad_notification(self, message):
|
async def _handle_bad_notification(self, message):
|
||||||
"""
|
"""
|
||||||
|
@ -489,7 +486,7 @@ class MTProtoSender:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Messages are to be re-sent once we've corrected the issue
|
# Messages are to be re-sent once we've corrected the issue
|
||||||
await self._send_queue.put(self._pending_messages[bad_msg.bad_msg_id])
|
self._send_queue.put_nowait(self._pending_messages[bad_msg.bad_msg_id])
|
||||||
|
|
||||||
async def _handle_detailed_info(self, message):
|
async def _handle_detailed_info(self, message):
|
||||||
"""
|
"""
|
||||||
|
@ -603,7 +600,7 @@ class _ContainerQueue(asyncio.Queue):
|
||||||
while not self.empty():
|
while not self.empty():
|
||||||
item = self.get_nowait()
|
item = self.get_nowait()
|
||||||
if isinstance(item.obj, MessageContainer):
|
if isinstance(item.obj, MessageContainer):
|
||||||
await self.put(item)
|
self.put_nowait(item)
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
result.append(item)
|
result.append(item)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user