Telethon/telethon/full_sync.py

185 lines
5.9 KiB
Python

"""
This magical module will rewrite all public methods in the public interface of
the library so they can delegate the call to an asyncio event loop in another
thread and wait for the result. This rewrite may not be desirable if the end
user always uses the methods the way they should be run, but it's incredibly
useful for quick scripts and legacy code.
"""
import asyncio
import functools
import inspect
import threading
from concurrent.futures import Future, ThreadPoolExecutor
from async_generator import isasyncgenfunction
from . import events
from .client.telegramclient import TelegramClient
from .tl.custom import (
Draft, Dialog, MessageButton, Forward, Message, InlineResult, Conversation
)
from .tl.custom.chatgetter import ChatGetter
from .tl.custom.sendergetter import SenderGetter
async def _proxy_future(af, cf):
    """
    Await ``af`` and mirror its outcome onto the concurrent future ``cf``.

    :param af: the awaitable to wait for inside the event loop.
    :param cf: a ``concurrent.futures.Future`` that another thread may be
        blocked on; it receives the result or the exception of ``af``.

    If awaiting ``af`` is cancelled, ``cf`` is cancelled as well and the
    cancellation keeps propagating out of this coroutine.
    """
    try:
        res = await af
    except asyncio.CancelledError:
        # Since Python 3.8 ``CancelledError`` derives from ``BaseException``,
        # so the generic handler below would not see it and ``cf`` would stay
        # pending forever, hanging whoever waits on ``cf.result()``. Cancel
        # ``cf`` to wake the waiter, then let the cancellation continue.
        cf.cancel()
        raise
    except Exception as e:
        cf.set_exception(e)
    else:
        # Kept out of the ``try`` so that a failure while publishing the
        # result is not mistaken for a failure of ``af`` itself.
        cf.set_result(res)
def _sync_result(loop, x):
    """
    Run the awaitable ``x`` on ``loop`` and block until it has finished.

    The awaitable is wrapped with `_proxy_future` and scheduled through
    ``loop.call_soon_threadsafe``. The calling thread then waits on a
    ``concurrent.futures.Future``, so it must be a thread other than the
    one running ``loop``.

    Returns the result of ``x``, or raises whatever was set on the future.
    """
    outcome = Future()
    bridge = _proxy_future(x, outcome)
    loop.call_soon_threadsafe(asyncio.ensure_future, bridge)
    return outcome.result()
class _SyncGen:
    """
    Blocking iterator over an asynchronous generator.

    Every ``next()`` call runs one ``__anext__()`` step of ``gen`` on
    ``loop`` through `_sync_result` and returns its value.
    """

    def __init__(self, loop, gen):
        self.loop, self.gen = loop, gen

    def __iter__(self):
        # The object is its own iterator.
        return self

    def __next__(self):
        try:
            step = self.gen.__anext__()
            return _sync_result(self.loop, step)
        except StopAsyncIteration:
            # Translate the end of the async iteration into its synchronous
            # counterpart, hiding the original exception context.
            raise StopIteration from None
def _syncify_wrap(t, method_name, loop, thread_ident,
                  syncifier=_sync_result, rename=None):
    """
    Replace an attribute of ``t`` with a thread-aware wrapper.

    The wrapper calls the original ``t.<method_name>``. When invoked from
    the thread whose identifier is ``thread_ident`` it returns that call's
    value untouched; from any other thread it returns
    ``syncifier(loop, value)`` instead.

    :param t: the object (normally a class) whose attribute is patched.
    :param method_name: name of the attribute to wrap.
    :param loop: passed as first argument to ``syncifier``.
    :param thread_ident: identifier of the thread that gets the raw value.
    :param syncifier: callable taking ``(loop, value)``.
    :param rename: if truthy, the wrapper is stored under this name rather
        than under ``method_name``.
    """
    original = getattr(t, method_name)

    @functools.wraps(original)
    def syncified(*args, **kwargs):
        outcome = original(*args, **kwargs)
        if threading.get_ident() == thread_ident:
            return outcome
        return syncifier(loop, outcome)

    target_name = rename if rename else method_name
    setattr(t, target_name, syncified)
def _syncify(*types, loop, thread_ident):
    """
    Patch every given type so its async methods can be called synchronously.

    For each type, all attributes that either do not start with an
    underscore or are ``__call__`` are inspected: coroutine functions are
    wrapped with `_syncify_wrap` using its default syncifier, and async
    generator functions are wrapped using `_SyncGen`.
    """
    context_pairs = (('__aenter__', '__enter__'), ('__aexit__', '__exit__'))

    for cls in types:
        # __enter__ and __exit__ need special care (VERY dirty hack).
        #
        # Normally we want them to raise if the loop is running because
        # the user can't await there, and they need the async with variant.
        #
        # However they check if the loop is running to raise, which it is
        # with full_sync enabled, so we patch them with the async variant.
        if hasattr(cls, '__aenter__'):
            for async_name, sync_name in context_pairs:
                _syncify_wrap(
                    cls, async_name, loop, thread_ident, rename=sync_name)

        for attr in dir(cls):
            if attr.startswith('_') and attr != '__call__':
                continue

            target = getattr(cls, attr)
            # Prefer the object stored under '__tl.sync', when present,
            # for deciding what kind of function this is.
            target = getattr(target, '__tl.sync', target)
            if inspect.iscoroutinefunction(target):
                _syncify_wrap(cls, attr, loop, thread_ident)
            elif isasyncgenfunction(target):
                _syncify_wrap(cls, attr, loop, thread_ident, _SyncGen)
# Background thread that runs the asyncio event loop; stays ``None`` until
# `enable` assigns it.
__asyncthread = None
def enable(*, loop=None, executor=None, max_workers=1):
    """
    Enables the fully synchronous mode. You should enable this at
    the beginning of your script, right after importing, only once.

    **Please** make sure to call `stop` at the end of your script.

    You can define the event loop to use and executor, otherwise
    the default loop and ``ThreadPoolExecutor`` will be used, in
    which case `max_workers` will be passed to it. If you pass a
    custom executor, `max_workers` will be ignored.

    Returns the started daemon thread that runs the event loop. The
    loop and executor in use are attached to it as its ``loop`` and
    ``executor`` attributes.

    Raises ``RuntimeError`` if the mode was already enabled.
    """
    global __asyncthread
    if __asyncthread is not None:
        raise RuntimeError("full_sync can only be enabled once")

    # Compare against ``None`` explicitly so that only an omitted argument
    # triggers the default, regardless of the object's truthiness.
    if loop is None:
        loop = asyncio.get_event_loop()
    if executor is None:
        executor = ThreadPoolExecutor(max_workers=max_workers)

    def start():
        # Runs inside the new thread: make ``loop`` current there and
        # keep it spinning until `stop` asks it to halt.
        asyncio.set_event_loop(loop)
        loop.run_forever()

    __asyncthread = threading.Thread(
        target=start, name="__telethon_async_thread__", daemon=True
    )
    __asyncthread.start()
    __asyncthread.loop = loop
    __asyncthread.executor = executor

    # Every client created from now on is bound to the background loop.
    TelegramClient.__init__ = functools.partialmethod(
        TelegramClient.__init__, loop=loop
    )

    # Collect the ``Event`` attribute of every member of the events
    # module that has one.
    event_cls = filter(None, (
        getattr(getattr(events, name), 'Event', None)
        for name in dir(events)
    ))
    _syncify(TelegramClient, Draft, Dialog, MessageButton, ChatGetter,
             SenderGetter, Forward, Message, InlineResult, Conversation,
             *event_cls,
             loop=loop, thread_ident=__asyncthread.ident)
    _syncify_wrap(TelegramClient, "start", loop, __asyncthread.ident)

    old_add_event_handler = TelegramClient.add_event_handler
    old_remove_event_handler = TelegramClient.remove_event_handler
    # Maps each user callback to the single async proxy registered for it.
    proxied_event_handlers = {}

    @functools.wraps(old_add_event_handler)
    def add_proxied_event_handler(self, callback, *args, **kwargs):
        # Reuse the proxy when the same callback is registered again.
        # Creating a new one each time would overwrite the mapping and
        # leave the earlier proxy impossible to look up on removal.
        proxy = proxied_event_handlers.get(callback)
        if proxy is None:
            async def proxy(*pargs, **pkwargs):
                # Run the user's blocking callback in the executor.
                await loop.run_in_executor(
                    executor, functools.partial(callback, *pargs, **pkwargs))

            proxied_event_handlers[callback] = proxy

        return old_add_event_handler(self, proxy, *args, **kwargs)

    @functools.wraps(old_remove_event_handler)
    def remove_proxied_event_handler(self, callback, *args, **kwargs):
        # Fall back to the callback itself if it was never proxied.
        target = proxied_event_handlers.get(callback, callback)
        return old_remove_event_handler(self, target, *args, **kwargs)

    TelegramClient.add_event_handler = add_proxied_event_handler
    TelegramClient.remove_event_handler = remove_proxied_event_handler

    def run_until_disconnected(self):
        # Block the calling thread on the private coroutine variant.
        return _sync_result(loop, self._run_until_disconnected())

    TelegramClient.run_until_disconnected = run_until_disconnected

    return __asyncthread
def stop():
    """
    Stops the fully synchronous mode. You should
    call this before your script exits.

    Asks the background event loop to stop and shuts the executor down.
    Raises ``RuntimeError`` if there is no asyncio thread to stop.
    """
    global __asyncthread
    worker = __asyncthread
    if worker:
        # ``loop.stop`` has to be scheduled from this thread in a
        # thread-safe way because the loop runs in the worker thread.
        worker.loop.call_soon_threadsafe(worker.loop.stop)
        worker.executor.shutdown()
        return

    raise RuntimeError("Can't find asyncio thread")