Telethon doesn't come with scheduling built in because it would be unnecessary bloat. If you don't want more dependencies, you can just use asyncio to schedule functions to run later or at a given time.
For example, if you want to run foo after 10 minutes:
    async def foo():
        print('hi')

    def foo_cb():
        client.loop.create_task(foo())

    client.loop.call_later(10 * 60, foo_cb)
loop.call_later() can only schedule synchronous functions (foo_cb), but you can use create_task to spawn a background, asynchronous task.
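The same pattern can be tried with plain asyncio, without a Telethon client. The following is only a minimal sketch: run_later is a hypothetical helper (it is not part of Telethon or asyncio), and the delay is shortened to a fraction of a second so the example finishes quickly.

```python
import asyncio

results = []

async def foo():
    # Stand-in for real work, such as sending a message.
    results.append('hi')

def run_later(loop, delay, coro_func):
    """Hypothetical helper: start coro_func() as a task after `delay` seconds."""
    def callback():
        # call_later only accepts plain callables, so the coroutine
        # is started from inside this synchronous callback.
        loop.create_task(coro_func())
    # The returned handle can be cancelled with .cancel() if needed.
    return loop.call_later(delay, callback)

async def main():
    loop = asyncio.get_running_loop()
    run_later(loop, 0.05, foo)
    # Keep the loop alive long enough for the callback and task to run.
    await asyncio.sleep(0.2)

if __name__ == '__main__':
    asyncio.run(main())
    print(results)
```

With Telethon, client.loop would take the place of the loop obtained here, and the delay would be the real one (for example 10 * 60).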
You can also run something "every N seconds". For example, to repeat foo every 10 minutes:
    async def foo():
        print('hi')

    def foo_cb():
        client.loop.create_task(foo())
        client.loop.call_later(10 * 60, foo_cb)

    foo_cb()
This schedules itself again every time it runs, so it keeps repeating: it runs, 10 minutes pass, it schedules itself again, and so on. This is similar to JavaScript's setTimeout.
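A self-rescheduling callback like this has no obvious way to stop. One possible approach, sketched below with plain asyncio, is to keep the handle returned by call_later and cancel it. The Repeater class is a hypothetical helper written for this illustration, not something Telethon provides, and the intervals are tiny so the demo ends quickly.

```python
import asyncio

class Repeater:
    """Hypothetical helper: run an async function every `interval` seconds."""

    def __init__(self, loop, interval, coro_func):
        self.loop = loop
        self.interval = interval
        self.coro_func = coro_func
        self.handle = None

    def _tick(self):
        # Start the coroutine in the background...
        self.loop.create_task(self.coro_func())
        # ...and schedule the next run, keeping the handle so it can be cancelled.
        self.handle = self.loop.call_later(self.interval, self._tick)

    def start(self):
        self._tick()

    def stop(self):
        if self.handle is not None:
            self.handle.cancel()
            self.handle = None

async def main():
    runs = []

    async def foo():
        runs.append('hi')

    repeater = Repeater(asyncio.get_running_loop(), 0.05, foo)
    repeater.start()
    await asyncio.sleep(0.18)
    repeater.stop()
    await asyncio.sleep(0.02)   # let any task that was already started finish
    count_at_stop = len(runs)
    await asyncio.sleep(0.15)   # no further runs should happen now
    return count_at_stop, len(runs)

if __name__ == '__main__':
    print(asyncio.run(main()))
```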
Important note: do not call the synchronous function when you schedule it, or it will recurse forever. loop.call_later(delay, func()) won't work, but loop.call_later(delay, func) will.
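If the callback needs arguments, they can be given to call_later as extra positional arguments instead of calling the function yourself. call_later does not forward keyword arguments, so functools.partial is the usual way to bind those. A minimal sketch with plain asyncio follows; the greet function and its parameters are made up for illustration.

```python
import asyncio
import functools

log = []

def greet(name, punctuation='.'):
    log.append(f'hi {name}{punctuation}')

async def main():
    loop = asyncio.get_running_loop()
    # Positional arguments go after the callback; greet is NOT called here.
    loop.call_later(0.01, greet, 'Alice')
    # Keyword arguments are bound with functools.partial.
    loop.call_later(0.02, functools.partial(greet, 'Bob', punctuation='!'))
    # Keep the loop alive until both callbacks have fired.
    await asyncio.sleep(0.1)

if __name__ == '__main__':
    asyncio.run(main())
    print(log)
```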
You can also use while loops:
    import asyncio

    async def foo():
        while True:
            print('hi')
            await asyncio.sleep(10 * 60)

    client.loop.create_task(foo())
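A loop like this stops for good if the body raises, and it never ends on its own. The sketch below, using plain asyncio, shows one way to keep the loop going after an error and to stop it by cancelling the task. The repeat helper and the failing job are hypothetical and exist only for this illustration.

```python
import asyncio

async def repeat(interval, coro_func):
    """Hypothetical helper: await coro_func() every `interval` seconds until cancelled."""
    while True:
        try:
            await coro_func()
        except Exception as e:
            # Log and carry on so one failure does not end the schedule.
            # Cancellation is not caught here, since CancelledError
            # derives from BaseException (Python 3.8+).
            print('job failed:', e)
        await asyncio.sleep(interval)

async def main():
    calls = []

    async def job():
        calls.append(len(calls))
        if len(calls) == 2:
            raise RuntimeError('simulated failure')

    # Keep a reference to the task so it can be cancelled later.
    task = asyncio.create_task(repeat(0.02, job))
    await asyncio.sleep(0.15)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return len(calls), task.cancelled()

if __name__ == '__main__':
    print(asyncio.run(main()))
```

Note that with this approach the period is the sleep time plus however long the job itself takes, so runs drift slightly over time.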
When you're done setting everything up, the event loop must be running or things won't work. You can do this with:
    client.run_until_disconnected()
or:
    loop.run_forever()
If you don't mind using other libraries, aiocron is a good option. See https://crontab.guru/ to learn its time syntax.
You can also use apscheduler, which is simple to use and efficient.
Here is an example of running a job (a function) every day at 6 am:
    from apscheduler.schedulers.asyncio import AsyncIOScheduler

    async def greet_me():
        print("Good morning Midhun.")
        await client.send_message('me', 'Good Morning To Myself !')

    scheduler = AsyncIOScheduler()
    scheduler.add_job(greet_me, trigger="cron", hour=6)
    scheduler.start()
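For comparison, running something "at a given time" with only the standard library comes down to computing the delay until that time and passing it to call_later. The sketch below is one way to do that. seconds_until is a hypothetical helper, and it assumes naive local time, so it ignores daylight saving shifts.

```python
import datetime

def seconds_until(hour, minute=0, now=None):
    """Seconds from `now` until the next occurrence of hour:minute (naive local time)."""
    now = now or datetime.datetime.now()
    target = now.replace(hour=hour, minute=minute, second=0, microsecond=0)
    if target <= now:
        # That time has already passed today, so aim for tomorrow.
        target += datetime.timedelta(days=1)
    return (target - now).total_seconds()

if __name__ == '__main__':
    print('seconds until the next 06:00:', seconds_until(6))
```

The result could then be used as the delay, for example client.loop.call_later(seconds_until(6), foo_cb), with the callback computing the next delay again when it reschedules itself.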
You can also run a job at fixed intervals. Here is an example:
    from apscheduler.schedulers.asyncio import AsyncIOScheduler

    async def this_will_run_at_every_one_hour():
        await client.send_message('me', 'One hour has passed.')

    scheduler = AsyncIOScheduler()
    scheduler.add_job(this_will_run_at_every_one_hour, 'interval', minutes=60)
    scheduler.start()
You can also set a custom time zone and use a database to store the scheduled jobs.
This is more advanced, and it works smoothly with Telethon.
Read more about this in the apscheduler documentation.