Stop using asyncio.get_event_loop()

It is deprecated in newer Python versions.
Closes #4013.
This commit is contained in:
Lonami Exo 2023-01-11 21:02:29 +01:00
parent fb97a8aa87
commit 83bafa25e3
14 changed files with 82 additions and 99 deletions

View File

@ -40,22 +40,22 @@ because tasks are smaller than threads, which are smaller than processes.
What are asyncio basics? What are asyncio basics?
======================== ========================
The code samples below assume that you have Python 3.7 or greater installed.
.. code-block:: python .. code-block:: python
# First we need the asyncio library # First we need the asyncio library
import asyncio import asyncio
# Then we need a loop to work with
loop = asyncio.get_event_loop()
# We also need something to run # We also need something to run
async def main(): async def main():
for char in 'Hello, world!\n': for char in 'Hello, world!\n':
print(char, end='', flush=True) print(char, end='', flush=True)
await asyncio.sleep(0.2) await asyncio.sleep(0.2)
# Then, we need to run the loop with a task # Then, we can create a new asyncio loop and use it to run our coroutine.
loop.run_until_complete(main()) # The creation and tear-down of the loop is hidden away from us.
asyncio.run(main())
What does telethon.sync do? What does telethon.sync do?
@ -101,7 +101,7 @@ Instead of this:
# or, using asyncio's default loop (it's the same) # or, using asyncio's default loop (it's the same)
import asyncio import asyncio
loop = asyncio.get_event_loop() # == client.loop loop = asyncio.get_running_loop() # == client.loop
me = loop.run_until_complete(client.get_me()) me = loop.run_until_complete(client.get_me())
print(me.username) print(me.username)
@ -158,13 +158,10 @@ loops or use ``async with``:
print(message.sender.username) print(message.sender.username)
loop = asyncio.get_event_loop() asyncio.run(main())
# ^ this assigns the default event loop from the main thread to a variable # ^ this will create a new asyncio loop behind the scenes and tear it down
# once the function returns. It will run the loop untiil main finishes.
loop.run_until_complete(main()) # You should only use this function if there is no other loop running.
# ^ this runs the *entire* loop until the main() function finishes.
# While the main() function does not finish, the loop will be running.
# While the loop is running, you can't run it again.
The ``await`` keyword blocks the *current* task, and the loop can run The ``await`` keyword blocks the *current* task, and the loop can run
@ -184,14 +181,14 @@ concurrently:
await asyncio.sleep(delay) # await tells the loop this task is "busy" await asyncio.sleep(delay) # await tells the loop this task is "busy"
print('world') # eventually the loop finishes all tasks print('world') # eventually the loop finishes all tasks
loop = asyncio.get_event_loop() # get the default loop for the main thread async def main():
loop.create_task(world(2)) # create the world task, passing 2 as delay asyncio.create_task(world(2)) # create the world task, passing 2 as delay
loop.create_task(hello(delay=1)) # another task, but with delay 1 asyncio.create_task(hello(delay=1)) # another task, but with delay 1
await asyncio.sleep(3) # wait for three seconds before exiting
try: try:
# run the event loop forever; ctrl+c to stop it # create a new temporary asyncio loop and use it to run main
# we could also run the loop for three seconds: asyncio.run(main())
# loop.run_until_complete(asyncio.sleep(3))
loop.run_forever()
except KeyboardInterrupt: except KeyboardInterrupt:
pass pass
@ -209,10 +206,15 @@ The same example, but without the comment noise:
await asyncio.sleep(delay) await asyncio.sleep(delay)
print('world') print('world')
loop = asyncio.get_event_loop() async def main():
loop.create_task(world(2)) asyncio.create_task(world(2))
loop.create_task(hello(1)) asyncio.create_task(hello(delay=1))
loop.run_until_complete(asyncio.sleep(3)) await asyncio.sleep(3)
try:
asyncio.run(main())
except KeyboardInterrupt:
pass
Can I use threads? Can I use threads?
@ -250,9 +252,9 @@ You may have seen this error:
RuntimeError: There is no current event loop in thread 'Thread-1'. RuntimeError: There is no current event loop in thread 'Thread-1'.
It just means you didn't create a loop for that thread, and if you don't It just means you didn't create a loop for that thread. Please refer to
pass a loop when creating the client, it uses ``asyncio.get_event_loop()``, the ``asyncio`` documentation to correctly learn how to set the event loop
which only works in the main thread. for non-main threads.
client.run_until_disconnected() blocks! client.run_until_disconnected() blocks!

View File

@ -191,8 +191,7 @@ so the code above and the following are equivalent:
async def main(): async def main():
await client.disconnected await client.disconnected
loop = asyncio.get_event_loop() asyncio.run(main())
loop.run_until_complete(main())
You could also run `client.disconnected You could also run `client.disconnected

View File

@ -2088,7 +2088,7 @@ the scenes! This means you're now able to do both of the following:
async def main(): async def main():
await client.send_message('me', 'Hello!') await client.send_message('me', 'Hello!')
asyncio.get_event_loop().run_until_complete(main()) asyncio.run(main())
# ...can be rewritten as: # ...can be rewritten as:

View File

@ -161,19 +161,17 @@ just get rid of ``telethon.sync`` and work inside an ``async def``:
await client.run_until_disconnected() await client.run_until_disconnected()
loop = asyncio.get_event_loop() asyncio.run(main())
loop.run_until_complete(main())
The ``telethon.sync`` magic module simply wraps every method behind: The ``telethon.sync`` magic module essentially wraps every method behind:
.. code-block:: python .. code-block:: python
loop = asyncio.get_event_loop() asyncio.run(main())
loop.run_until_complete(main())
So that you don't have to write it yourself every time. That's the With some other tricks, so that you don't have to write it yourself every time.
overhead you pay if you import it, and what you save if you don't. That's the overhead you pay if you import it, and what you save if you don't.
Learning Learning
======== ========

View File

@ -192,7 +192,7 @@ class TelegramBaseClient(abc.ABC):
Defaults to `lang_code`. Defaults to `lang_code`.
loop (`asyncio.AbstractEventLoop`, optional): loop (`asyncio.AbstractEventLoop`, optional):
Asyncio event loop to use. Defaults to `asyncio.get_event_loop()`. Asyncio event loop to use. Defaults to `asyncio.get_running_loop()`.
This argument is ignored. This argument is ignored.
base_logger (`str` | `logging.Logger`, optional): base_logger (`str` | `logging.Logger`, optional):
@ -470,7 +470,7 @@ class TelegramBaseClient(abc.ABC):
# Join the task (wait for it to complete) # Join the task (wait for it to complete)
await task await task
""" """
return asyncio.get_event_loop() return helpers.get_running_loop()
@property @property
def disconnected(self: 'TelegramClient') -> asyncio.Future: def disconnected(self: 'TelegramClient') -> asyncio.Future:

View File

@ -4,7 +4,7 @@ import re
import asyncio import asyncio
from .common import EventBuilder, EventCommon, name_inner_event from .common import EventBuilder, EventCommon, name_inner_event
from .. import utils from .. import utils, helpers
from ..tl import types, functions, custom from ..tl import types, functions, custom
from ..tl.custom.sendergetter import SenderGetter from ..tl.custom.sendergetter import SenderGetter
@ -242,6 +242,6 @@ class InlineQuery(EventBuilder):
if inspect.isawaitable(obj): if inspect.isawaitable(obj):
return asyncio.ensure_future(obj) return asyncio.ensure_future(obj)
f = asyncio.get_event_loop().create_future() f = helpers.get_running_loop().create_future()
f.set_result(obj) f.set_result(obj)
return f return f

View File

@ -426,7 +426,10 @@ class _FileStream(io.IOBase):
# endregion # endregion
def get_running_loop(): def get_running_loop():
if sys.version_info[:2] <= (3, 6): if sys.version_info >= (3, 7):
return asyncio._get_running_loop() try:
return asyncio.get_running_loop()
return asyncio.get_running_loop() except RuntimeError:
return asyncio.get_event_loop_policy().get_event_loop()
else:
return asyncio.get_event_loop()

View File

@ -155,7 +155,7 @@ class Connection(abc.ABC):
# Actual TCP connection is performed here. # Actual TCP connection is performed here.
await asyncio.wait_for( await asyncio.wait_for(
asyncio.get_event_loop().sock_connect(sock=sock, address=address), helpers.get_running_loop().sock_connect(sock=sock, address=address),
timeout=timeout timeout=timeout
) )
@ -190,7 +190,7 @@ class Connection(abc.ABC):
# Actual TCP connection and negotiation performed here. # Actual TCP connection and negotiation performed here.
await asyncio.wait_for( await asyncio.wait_for(
asyncio.get_event_loop().sock_connect(sock=sock, address=address), helpers.get_running_loop().sock_connect(sock=sock, address=address),
timeout=timeout timeout=timeout
) )
@ -244,7 +244,7 @@ class Connection(abc.ABC):
await self._connect(timeout=timeout, ssl=ssl) await self._connect(timeout=timeout, ssl=ssl)
self._connected = True self._connected = True
loop = asyncio.get_event_loop() loop = helpers.get_running_loop()
self._send_task = loop.create_task(self._send_loop()) self._send_task = loop.create_task(self._send_loop())
self._recv_task = loop.create_task(self._recv_loop()) self._recv_task = loop.create_task(self._recv_loop())

View File

@ -68,7 +68,7 @@ class MTProtoSender:
# pending futures should be cancelled. # pending futures should be cancelled.
self._user_connected = False self._user_connected = False
self._reconnecting = False self._reconnecting = False
self._disconnected = asyncio.get_event_loop().create_future() self._disconnected = helpers.get_running_loop().create_future()
self._disconnected.set_result(None) self._disconnected.set_result(None)
# We need to join the loops upon disconnection # We need to join the loops upon disconnection
@ -261,7 +261,7 @@ class MTProtoSender:
await self._disconnect(error=e) await self._disconnect(error=e)
raise e raise e
loop = asyncio.get_event_loop() loop = helpers.get_running_loop()
self._log.debug('Starting send loop') self._log.debug('Starting send loop')
self._send_loop_handle = loop.create_task(self._send_loop()) self._send_loop_handle = loop.create_task(self._send_loop())
@ -400,7 +400,7 @@ class MTProtoSender:
self._pending_state.clear() self._pending_state.clear()
if self._auto_reconnect_callback: if self._auto_reconnect_callback:
asyncio.get_event_loop().create_task(self._auto_reconnect_callback()) helpers.get_running_loop().create_task(self._auto_reconnect_callback())
break break
else: else:
@ -425,7 +425,7 @@ class MTProtoSender:
# gets stuck. # gets stuck.
# TODO It still gets stuck? Investigate where and why. # TODO It still gets stuck? Investigate where and why.
self._reconnecting = True self._reconnecting = True
asyncio.get_event_loop().create_task(self._reconnect(error)) helpers.get_running_loop().create_task(self._reconnect(error))
def _keepalive_ping(self, rnd_id): def _keepalive_ping(self, rnd_id):
""" """

View File

@ -14,7 +14,7 @@ import asyncio
import functools import functools
import inspect import inspect
from . import events, errors, utils, connection from . import events, errors, utils, connection, helpers
from .client.account import _TakeoutClient from .client.account import _TakeoutClient
from .client.telegramclient import TelegramClient from .client.telegramclient import TelegramClient
from .tl import types, functions, custom from .tl import types, functions, custom
@ -32,7 +32,7 @@ def _syncify_wrap(t, method_name):
@functools.wraps(method) @functools.wraps(method)
def syncified(*args, **kwargs): def syncified(*args, **kwargs):
coro = method(*args, **kwargs) coro = method(*args, **kwargs)
loop = asyncio.get_event_loop() loop = helpers.get_running_loop()
if loop.is_running(): if loop.is_running():
return coro return coro
else: else:

View File

@ -53,7 +53,7 @@ def callback(func):
def wrapped(*args, **kwargs): def wrapped(*args, **kwargs):
result = func(*args, **kwargs) result = func(*args, **kwargs)
if inspect.iscoroutine(result): if inspect.iscoroutine(result):
aio_loop.create_task(result) asyncio.create_task(result)
return wrapped return wrapped
@ -369,10 +369,4 @@ async def main(interval=0.05):
if __name__ == "__main__": if __name__ == "__main__":
# Some boilerplate code to set up the main method asyncio.run(main())
aio_loop = asyncio.get_event_loop()
try:
aio_loop.run_until_complete(main())
finally:
if not aio_loop.is_closed():
aio_loop.close()

View File

@ -9,9 +9,6 @@ from telethon.errors import SessionPasswordNeededError
from telethon.network import ConnectionTcpAbridged from telethon.network import ConnectionTcpAbridged
from telethon.utils import get_display_name from telethon.utils import get_display_name
# Create a global variable to hold the loop we will be using
loop = asyncio.get_event_loop()
def sprint(string, *args, **kwargs): def sprint(string, *args, **kwargs):
"""Safe Print (handle UnicodeEncodeErrors on some terminals)""" """Safe Print (handle UnicodeEncodeErrors on some terminals)"""
@ -50,7 +47,7 @@ async def async_input(prompt):
let the loop run while we wait for input. let the loop run while we wait for input.
""" """
print(prompt, end='', flush=True) print(prompt, end='', flush=True)
return (await loop.run_in_executor(None, sys.stdin.readline)).rstrip() return (await asyncio.get_running_loop().run_in_executor(None, sys.stdin.readline)).rstrip()
def get_env(name, message, cast=str): def get_env(name, message, cast=str):
@ -109,34 +106,34 @@ class InteractiveTelegramClient(TelegramClient):
# media known the message ID, for every message having media. # media known the message ID, for every message having media.
self.found_media = {} self.found_media = {}
async def init(self):
# Calling .connect() may raise a connection error False, so you need # Calling .connect() may raise a connection error False, so you need
# to except those before continuing. Otherwise you may want to retry # to except those before continuing. Otherwise you may want to retry
# as done here. # as done here.
print('Connecting to Telegram servers...') print('Connecting to Telegram servers...')
try: try:
loop.run_until_complete(self.connect()) await self.connect()
except IOError: except IOError:
# We handle IOError and not ConnectionError because # We handle IOError and not ConnectionError because
# PySocks' errors do not subclass ConnectionError # PySocks' errors do not subclass ConnectionError
# (so this will work with and without proxies). # (so this will work with and without proxies).
print('Initial connection failed. Retrying...') print('Initial connection failed. Retrying...')
loop.run_until_complete(self.connect()) await self.connect()
# If the user hasn't called .sign_in() or .sign_up() yet, they won't # If the user hasn't called .sign_in() or .sign_up() yet, they won't
# be authorized. The first thing you must do is authorize. Calling # be authorized. The first thing you must do is authorize. Calling
# .sign_in() should only be done once as the information is saved on # .sign_in() should only be done once as the information is saved on
# the *.session file so you don't need to enter the code every time. # the *.session file so you don't need to enter the code every time.
if not loop.run_until_complete(self.is_user_authorized()): if not await self.is_user_authorized():
print('First run. Sending code request...') print('First run. Sending code request...')
user_phone = input('Enter your phone: ') user_phone = input('Enter your phone: ')
loop.run_until_complete(self.sign_in(user_phone)) await self.sign_in(user_phone)
self_user = None self_user = None
while self_user is None: while self_user is None:
code = input('Enter the code you just received: ') code = input('Enter the code you just received: ')
try: try:
self_user =\ self_user = await self.sign_in(code=code)
loop.run_until_complete(self.sign_in(code=code))
# Two-step verification may be enabled, and .sign_in will # Two-step verification may be enabled, and .sign_in will
# raise this error. If that's the case ask for the password. # raise this error. If that's the case ask for the password.
@ -146,8 +143,7 @@ class InteractiveTelegramClient(TelegramClient):
pw = getpass('Two step verification is enabled. ' pw = getpass('Two step verification is enabled. '
'Please enter your password: ') 'Please enter your password: ')
self_user =\ self_user = await self.sign_in(password=pw)
loop.run_until_complete(self.sign_in(password=pw))
async def run(self): async def run(self):
"""Main loop of the TelegramClient, will wait for user action""" """Main loop of the TelegramClient, will wait for user action"""
@ -397,9 +393,14 @@ class InteractiveTelegramClient(TelegramClient):
)) ))
if __name__ == '__main__': async def main():
SESSION = os.environ.get('TG_SESSION', 'interactive') SESSION = os.environ.get('TG_SESSION', 'interactive')
API_ID = get_env('TG_API_ID', 'Enter your API ID: ', int) API_ID = get_env('TG_API_ID', 'Enter your API ID: ', int)
API_HASH = get_env('TG_API_HASH', 'Enter your API hash: ') API_HASH = get_env('TG_API_HASH', 'Enter your API hash: ')
client = InteractiveTelegramClient(SESSION, API_ID, API_HASH) client = InteractiveTelegramClient(SESSION, API_ID, API_HASH)
loop.run_until_complete(client.run()) await client.init()
await client.run()
if __name__ == '__main__':
asyncio.run()

View File

@ -7,8 +7,6 @@ import os
import time import time
import sys import sys
loop = asyncio.get_event_loop()
""" """
Provider token can be obtained via @BotFather. more info at https://core.telegram.org/bots/payments#getting-a-token Provider token can be obtained via @BotFather. more info at https://core.telegram.org/bots/payments#getting-a-token
@ -180,4 +178,4 @@ if __name__ == '__main__':
if not provider_token: if not provider_token:
logger.error("No provider token supplied.") logger.error("No provider token supplied.")
exit(1) exit(1)
loop.run_until_complete(main()) asyncio.run(main())

View File

@ -1,7 +1,6 @@
import base64 import base64
import os import os
import hypercorn.asyncio
from quart import Quart, render_template_string, request from quart import Quart, render_template_string, request
from telethon import TelegramClient, utils from telethon import TelegramClient, utils
@ -82,6 +81,8 @@ async def format_message(message):
# Connect the client before we start serving with Quart # Connect the client before we start serving with Quart
@app.before_serving @app.before_serving
async def startup(): async def startup():
# After connecting, the client will create additional asyncio tasks that run until it's disconnected again.
# Be careful to not mix different asyncio loops during a client's lifetime, or things won't work properly!
await client.connect() await client.connect()
@ -129,24 +130,11 @@ async def root():
return await render_template_string(BASE_TEMPLATE, content=CODE_FORM) return await render_template_string(BASE_TEMPLATE, content=CODE_FORM)
async def main():
await hypercorn.asyncio.serve(app, hypercorn.Config())
# By default, `Quart.run` uses `asyncio.run()`, which creates a new asyncio # By default, `Quart.run` uses `asyncio.run()`, which creates a new asyncio
# event loop. If we create the `TelegramClient` before, `telethon` will # event loop. If we had connected the `TelegramClient` before, `telethon` will
# use `asyncio.get_event_loop()`, which is the implicit loop in the main # use `asyncio.get_running_loop()` to create some additional tasks. If these
# thread. These two loops are different, and it won't work. # loops are different, it won't work.
# #
# So, we have to manually pass the same `loop` to both applications to # To keep things simple, be sure to not create multiple asyncio loops!
# make 100% sure it works and to avoid headaches.
#
# To run Quart inside `async def`, we must use `hypercorn.asyncio.serve()`
# directly.
#
# This example creates a global client outside of Quart handlers.
# If you create the client inside the handlers (common case), you
# won't have to worry about any of this, but it's still good to be
# explicit about the event loop.
if __name__ == '__main__': if __name__ == '__main__':
client.loop.run_until_complete(main()) app.run()