Telethon/telethon/client/telegrambaseclient.py

496 lines
18 KiB
Python
Raw Normal View History

import abc
2018-06-14 20:35:12 +03:00
import asyncio
import inspect
import logging
2018-03-02 22:05:09 +03:00
import platform
import sys
2018-06-18 14:22:25 +03:00
import time
from datetime import timedelta, datetime
2018-06-09 23:14:51 +03:00
from .. import version
from ..crypto import rsa
from ..extensions import markdown
from ..network import MTProtoSender, ConnectionTcpFull
from ..network.mtprotostate import MTProtoState
from ..sessions import Session, SQLiteSession
2018-06-20 20:48:00 +03:00
from ..tl import TLObject, functions, types
2018-06-18 22:02:42 +03:00
from ..tl.alltlobjects import LAYER
# Fallback data center used by ``TelegramBaseClient.__init__`` when the
# session has no server address yet, or when the address family stored
# in the session (IPv4 vs. IPv6) does not match the ``use_ipv6`` setting.
DEFAULT_DC_ID = 4
DEFAULT_IPV4_IP = '149.154.167.51'
DEFAULT_IPV6_IP = '[2001:67c:4e8:f002::a]'
DEFAULT_PORT = 443
# Module-level logger, named after this module.
__log__ = logging.getLogger(__name__)
class TelegramBaseClient(abc.ABC):
    """
    This is the abstract base class for the client. It defines some
    basic stuff like connecting, switching data center, etc, and
    leaves the `__call__` unimplemented.

    Args:
        session (`str` | `telethon.sessions.abstract.Session`, `None`):
            The file name of the session file to be used if a string is
            given (it may be a full path), or the Session instance to be
            used otherwise. If it's ``None``, the session will not be saved,
            and you should call :meth:`.log_out()` when you're done.

            Note that if you pass a string it will be a file in the current
            working directory, although you can also pass absolute paths.

            The session file contains enough information for you to login
            without re-sending the code, so if you have to enter the code
            more than once, maybe you're changing the working directory,
            renaming or removing the file, or using random names.

        api_id (`int` | `str`):
            The API ID you obtained from https://my.telegram.org.

        api_hash (`str`):
            The API hash you obtained from https://my.telegram.org.

        connection (`telethon.network.connection.common.Connection`, optional):
            The connection instance to be used when creating a new connection
            to the servers. If it's a type, the `proxy` argument will be used.

            Defaults to `telethon.network.connection.tcpfull.ConnectionTcpFull`.

        use_ipv6 (`bool`, optional):
            Whether to connect to the servers through IPv6 or not.
            By default this is ``False`` as IPv6 support is not
            too widespread yet.

        proxy (`tuple` | `dict`, optional):
            A tuple consisting of ``(socks.SOCKS5, 'host', port)``.
            See https://github.com/Anorov/PySocks#usage-1 for more.

        timeout (`int` | `float` | `timedelta`, optional):
            The timeout to be used when connecting, sending and receiving
            responses from the network. This is **not** the timeout to
            be used when ``await``'ing for invoked requests, and you
            should use ``asyncio.wait`` or ``asyncio.wait_for`` for that.

        request_retries (`int`, optional):
            How many times a request should be retried. Requests are retried
            when Telegram is having internal issues (due to either
            ``errors.ServerError`` or ``errors.RpcCallFailError``),
            when there is a ``errors.FloodWaitError`` less than
            `flood_sleep_threshold`, or when there's a migrate error.

            May set to a false-y value (``0`` or ``None``) for infinite
            retries, but this is not recommended, since some requests can
            always trigger a call fail (such as searching for messages).

        connection_retries (`int`, optional):
            How many times the reconnection should retry, either on the
            initial connection or when Telegram disconnects us. May be
            set to a false-y value (``0`` or ``None``) for infinite
            retries, but this is not recommended, since the program can
            get stuck in an infinite loop.

        auto_reconnect (`bool`, optional):
            Whether reconnection should be retried `connection_retries`
            times automatically if Telegram disconnects us or not.

        flood_sleep_threshold (`int` | `float`, optional):
            The threshold below which the library should automatically
            sleep on flood wait errors (inclusive). For instance, if a
            ``FloodWaitError`` for 17s occurs and `flood_sleep_threshold`
            is 20s, the library will ``sleep`` automatically. If the error
            was for 21s, it would ``raise FloodWaitError`` instead. Values
            larger than a day (like ``float('inf')``) are meant to be
            changed to a day; this base class does not clamp the value
            itself (NOTE(review): confirm where the clamping is applied).

        device_model (`str`, optional):
            "Device model" to be sent when creating the initial connection.
            Defaults to ``platform.uname().system``, or ``'Unknown'``
            if that is empty.

        system_version (`str`, optional):
            "System version" to be sent when creating the initial connection.
            Defaults to ``platform.uname().release``, or ``'1.0'``
            if that is empty.

        app_version (`str`, optional):
            "App version" to be sent when creating the initial connection.
            Defaults to `telethon.version.__version__`.

        lang_code (`str`, optional):
            "Language code" to be sent when creating the initial connection.
            Defaults to ``'en'``.

        system_lang_code (`str`, optional):
            "System lang code" to be sent when creating the initial connection.
            Defaults to ``'en'``.

        loop (`asyncio.AbstractEventLoop`, optional):
            The event loop to be used by the client. Defaults to the
            loop returned by ``asyncio.get_event_loop()``.
    """

    # Current TelegramClient version
    __version__ = version.__version__

    # Cached server configuration (with .dc_options), can be "global"
    _config = None
    _cdn_config = None

    # region Initialization

    def __init__(self, session, api_id, api_hash,
                 *,
                 connection=ConnectionTcpFull,
                 use_ipv6=False,
                 proxy=None,
                 timeout=timedelta(seconds=10),
                 request_retries=5,
                 connection_retries=5,
                 auto_reconnect=True,
                 flood_sleep_threshold=60,
                 device_model=None,
                 system_version=None,
                 app_version=None,
                 lang_code='en',
                 system_lang_code='en',
                 loop=None):
        if not api_id or not api_hash:
            raise ValueError(
                "Your API ID or Hash cannot be empty or None. "
                "Refer to telethon.rtfd.io for more information.")

        self._use_ipv6 = use_ipv6
        self._loop = loop or asyncio.get_event_loop()

        # Determine what session object we have
        if isinstance(session, str) or session is None:
            session = SQLiteSession(session)
        elif not isinstance(session, Session):
            raise TypeError(
                'The given session must be a str or a Session instance.'
            )

        # ':' in session.server_address is True if it's an IPv6 address
        if (not session.server_address or
                (':' in session.server_address) != use_ipv6):
            session.set_dc(
                DEFAULT_DC_ID,
                DEFAULT_IPV6_IP if self._use_ipv6 else DEFAULT_IPV4_IP,
                DEFAULT_PORT
            )

        self.flood_sleep_threshold = flood_sleep_threshold
        self.session = session
        self.api_id = int(api_id)
        self.api_hash = api_hash

        # A false-y amount of retries means "retry forever"
        self._request_retries = request_retries or sys.maxsize
        self._connection_retries = connection_retries or sys.maxsize
        self._auto_reconnect = auto_reconnect

        # A connection *type* is instantiated here; an already created
        # connection instance is used as given.
        if isinstance(connection, type):
            connection = connection(
                proxy=proxy, timeout=timeout, loop=self._loop)

        # Used on connection. Capture the variables in a lambda since
        # exporting clients need to create this InvokeWithLayerRequest.
        system = platform.uname()
        self._init_with = lambda x: functions.InvokeWithLayerRequest(
            LAYER, functions.InitConnectionRequest(
                api_id=self.api_id,
                device_model=device_model or system.system or 'Unknown',
                system_version=system_version or system.release or '1.0',
                app_version=app_version or self.__version__,
                lang_code=lang_code,
                system_lang_code=system_lang_code,
                lang_pack='',  # "langPacks are for official apps only"
                query=x
            )
        )

        state = MTProtoState(self.session.auth_key)
        self._connection = connection
        self._sender = MTProtoSender(
            state, connection, self._loop,
            retries=self._connection_retries,
            auto_reconnect=self._auto_reconnect,
            update_callback=self._handle_update,
            auth_key_callback=self._auth_key_callback,
            auto_reconnect_callback=self._handle_auto_reconnect
        )

        # Cache ``{dc_id: (n, MTProtoSender)}`` for all borrowed senders,
        # being ``n`` the amount of borrows a given sender has; once ``n``
        # reaches ``0`` it should be disconnected and removed.
        self._borrowed_senders = {}
        self._borrow_sender_lock = asyncio.Lock()

        # Save whether the user is authorized here (a.k.a. logged in)
        self._authorized = None  # None = We don't know yet

        # Default PingRequest delay
        self._last_ping = datetime.now()
        self._ping_delay = timedelta(minutes=1)

        self._updates_handle = None
        self._last_request = time.time()
        self._channel_pts = {}

        # Start with invalid state (-1) so we can have somewhere to store
        # the state, but also be able to determine if we are authorized.
        self._state = types.updates.State(-1, 0, datetime.now(), 0, -1)

        # Some further state for subclasses
        self._event_builders = []
        self._events_pending_resolve = []
        self._event_resolve_lock = asyncio.Lock()

        # Default parse mode
        self._parse_mode = markdown

        # Some fields to ease signing in. Let {phone: hash} be
        # a dictionary because the user may change their mind.
        self._phone_code_hash = {}
        self._phone = None
        self._tos = None

        # Sometimes we need to know who we are, cache the self peer
        self._self_input_peer = None

    # endregion

    # region Properties

    @property
    def loop(self):
        """The event loop this client was created with."""
        return self._loop

    @property
    def disconnected(self):
        """
        Future that resolves when the connection to Telegram
        ends, either by user action or in the background.
        """
        return self._sender.disconnected

    # endregion

    # region Connecting

    async def connect(self):
        """
        Connects to Telegram.

        The sender is connected to the address stored in the session,
        the initial connection request (wrapping a ``GetConfigRequest``)
        is sent, and the update loop is scheduled as a task on the loop.
        """
        await self._sender.connect(
            self.session.server_address, self.session.port)

        await self._sender.send(self._init_with(
            functions.help.GetConfigRequest()))

        self._updates_handle = self._loop.create_task(self._update_loop())

    def is_connected(self):
        """
        Returns ``True`` if the user has connected.
        """
        return self._sender.is_connected()

    async def disconnect(self):
        """
        Disconnects from Telegram.
        """
        await self._disconnect()
        if getattr(self, 'session', None):
            self.session.close()

    async def _disconnect(self):
        """
        Disconnect only, without closing the session. Used in reconnections
        to different data centers, where we don't want to close the session
        file; user disconnects however should close it since it means that
        their job with the client is complete and we should clean it up all.
        """
        # All properties may be ``None`` if `__init__` fails, and this
        # method will be called from `__del__` which would crash then.
        if getattr(self, '_sender', None):
            await self._sender.disconnect()
        if getattr(self, '_updates_handle', None):
            await self._updates_handle

    def __del__(self):
        # `__init__` may have raised before these attributes were set
        # (for instance on an empty API ID). There is nothing to clean
        # up then, and touching them would raise ``AttributeError``
        # while the object is being finalized.
        if (getattr(self, '_sender', None) is None
                or getattr(self, '_loop', None) is None):
            return

        if not self.is_connected() or self.loop.is_closed():
            return

        # Python 3.5.2's ``asyncio`` mod seems to have a bug where it's not
        # able to close the pending tasks properly, and letting the script
        # complete without calling disconnect causes the script to trigger
        # 100% CPU load. Call disconnect to make sure it doesn't happen.
        if not inspect.iscoroutinefunction(self.disconnect):
            # `disconnect` was replaced by a plain function; call it directly
            self.disconnect()
        elif self._loop.is_running():
            self._loop.create_task(self.disconnect())
        else:
            self._loop.run_until_complete(self.disconnect())

    async def _switch_dc(self, new_dc):
        """
        Permanently switches the current connection to the new data center.
        """
        __log__.info('Reconnecting to new data center %s', new_dc)
        dc = await self._get_dc(new_dc)

        self.session.set_dc(dc.id, dc.ip_address, dc.port)
        # auth_key's are associated with a server, which has now changed
        # so it's not valid anymore. Set to None to force recreating it.
        self.session.auth_key = self._sender.state.auth_key = None
        self.session.save()
        await self._disconnect()
        return await self.connect()

    def _auth_key_callback(self, auth_key):
        """
        Callback from the sender whenever it needed to generate a
        new authorization key. This means we are not authorized.
        """
        self._authorized = None
        self.session.auth_key = auth_key
        self.session.save()

    # endregion

    # region Working with different connections/Data Centers

    async def _get_dc(self, dc_id, cdn=False):
        """
        Gets the Data Center (DC) associated to 'dc_id'.

        The server configuration (and the CDN configuration, if `cdn`)
        is requested on first use and cached on the class. The first
        option matching `dc_id`, the IPv6 setting and `cdn` is returned.

        Raises:
            ``RuntimeError`` if no known DC option matches.
        """
        cls = self.__class__
        if not cls._config:
            cls._config = await self(functions.help.GetConfigRequest())

        if cdn and not cls._cdn_config:
            cls._cdn_config = await self(functions.help.GetCdnConfigRequest())
            for pk in cls._cdn_config.public_keys:
                rsa.add_key(pk.public_key)

        # A bare ``next()`` would raise ``StopIteration`` when nothing
        # matches, which a coroutine turns into an opaque ``RuntimeError``.
        # Keep the exception type but make the message useful instead.
        dc = next((
            option for option in cls._config.dc_options
            if option.id == dc_id
            and bool(option.ipv6) == self._use_ipv6
            and bool(option.cdn) == cdn
        ), None)
        if dc is None:
            raise RuntimeError(
                'No data center option found for DC {} '
                '(cdn = {}, use_ipv6 = {})'
                .format(dc_id, cdn, self._use_ipv6)
            )

        return dc

    async def _create_exported_sender(self, dc_id):
        """
        Creates a new exported `MTProtoSender` for the given `dc_id` and
        returns it. This method should be used by `_borrow_exported_sender`.
        """
        # Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
        # for clearly showing how to export the authorization
        dc = await self._get_dc(dc_id)
        state = MTProtoState(None)
        # Can't reuse self._sender._connection as it has its own seqno.
        #
        # If one were to do that, Telegram would reset the connection
        # with no further clues.
        sender = MTProtoSender(state, self._connection.clone(), self._loop)
        await sender.connect(dc.ip_address, dc.port)
        try:
            __log__.info('Exporting authorization for data center %s', dc)
            auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
            req = self._init_with(functions.auth.ImportAuthorizationRequest(
                id=auth.id, bytes=auth.bytes
            ))
            await sender.send(req)
        except Exception:
            # Nobody else holds a reference to this sender yet, so it
            # would stay connected forever if we didn't clean it up here.
            await sender.disconnect()
            raise

        return sender

    async def _borrow_exported_sender(self, dc_id):
        """
        Borrows a connected `MTProtoSender` for the given `dc_id`.
        If it's not cached, creates a new one if it doesn't exist yet,
        and imports a freshly exported authorization key for it to be usable.

        Once its job is over it should be `_return_exported_sender`.
        """
        async with self._borrow_sender_lock:
            n, sender = self._borrowed_senders.get(dc_id, (0, None))
            if not sender:
                sender = await self._create_exported_sender(dc_id)
                # Remembered so `_return_exported_sender` can find the entry
                sender.dc_id = dc_id

            self._borrowed_senders[dc_id] = (n + 1, sender)

        return sender

    async def _return_exported_sender(self, sender):
        """
        Returns a borrowed exported sender. If all borrows have
        been returned, the sender is cleanly disconnected.
        """
        async with self._borrow_sender_lock:
            dc_id = sender.dc_id
            n, _ = self._borrowed_senders[dc_id]
            n -= 1
            if n > 0:
                self._borrowed_senders[dc_id] = (n, sender)
            else:
                __log__.info('Disconnecting borrowed sender for DC %d', dc_id)
                await sender.disconnect()
                del self._borrowed_senders[dc_id]

    async def _get_cdn_client(self, cdn_redirect):
        """
        Similar to ._borrow_exported_sender, but for CDNs.

        Not implemented yet; always raises ``NotImplementedError``.
        """
        # TODO Implement. An implementation should look up the CDN data
        # center through ``self._get_dc(cdn_redirect.dc_id, cdn=True)``
        # (which also registers the CDN RSA keys) and avoid invoking
        # non-CDN methods on the resulting connection.
        raise NotImplementedError

    # endregion

    # region Invoking Telegram requests

    @abc.abstractmethod
    def __call__(self, request, ordered=False):
        """
        Invokes (sends) one or more MTProtoRequests and returns (receives)
        their result.

        Args:
            request (`TLObject` | `list`):
                The request or requests to be invoked.

            ordered (`bool`, optional):
                Whether the requests (if more than one was given) should be
                executed sequentially on the server. They run in arbitrary
                order by default.

        Returns:
            The result of the request (often a `TLObject`) or a list of
            results if more than one request was given.
        """
        raise NotImplementedError

    @abc.abstractmethod
    def _handle_update(self, update):
        """Given to the sender as its ``update_callback``."""
        raise NotImplementedError

    @abc.abstractmethod
    def _update_loop(self):
        """Scheduled as a task on the event loop by `connect`."""
        raise NotImplementedError

    @abc.abstractmethod
    async def _handle_auto_reconnect(self):
        """Given to the sender as its ``auto_reconnect_callback``."""
        raise NotImplementedError

    # endregion