Move core functionality to the TelegramBareClient

Rationale: the intended behaviour of the TelegramClient will now
be to focus on abstracting the users from manually importing
requests and types to work with Telegram's API. Thus, all the
core functionality has been moved to the TelegramBareClient,
which will now be responsible of spawning new threads or
connections and even handling updates.

This way there is a clear distinction between the two clients,
TelegramClient is the one meant to be exposed to the end user,
since it provides all the mentioned abstractions, while the
TelegramBareClient is the "basic" client needed to work with
the API in a comfortable way.

There is still a need for an MtProtoSender, which still even
lower level, and knows as little as possible of what requests
are. This handles parsing the messages received from the
server so that their result can be understood.
This commit is contained in:
Lonami Exo 2017-09-29 20:50:27 +02:00
parent b61deb5cfb
commit 479afddf50
2 changed files with 232 additions and 287 deletions

View File

@ -1,21 +1,24 @@
import logging import logging
from datetime import timedelta import os
import threading
from datetime import timedelta, datetime
from hashlib import md5 from hashlib import md5
from io import BytesIO from io import BytesIO
from os import path
from threading import Lock from threading import Lock
from time import sleep
from . import helpers as utils from . import helpers as utils
from .crypto import rsa, CdnDecrypter from .crypto import rsa, CdnDecrypter
from .errors import ( from .errors import (
RPCError, BrokenAuthKeyError, RPCError, BrokenAuthKeyError,
FloodWaitError, FileMigrateError, TypeNotFoundError FloodWaitError, FileMigrateError, TypeNotFoundError,
UnauthorizedError, PhoneMigrateError, NetworkMigrateError, UserMigrateError
) )
from .network import authenticator, MtProtoSender, Connection, ConnectionMode from .network import authenticator, MtProtoSender, Connection, ConnectionMode
from .tl import TLObject, Session from .tl import TLObject, Session
from .tl.all_tlobjects import LAYER from .tl.all_tlobjects import LAYER
from .tl.functions import ( from .tl.functions import (
InitConnectionRequest, InvokeWithLayerRequest InitConnectionRequest, InvokeWithLayerRequest, PingRequest
) )
from .tl.functions.auth import ( from .tl.functions.auth import (
ImportAuthorizationRequest, ExportAuthorizationRequest ImportAuthorizationRequest, ExportAuthorizationRequest
@ -23,6 +26,7 @@ from .tl.functions.auth import (
from .tl.functions.help import ( from .tl.functions.help import (
GetCdnConfigRequest, GetConfigRequest GetCdnConfigRequest, GetConfigRequest
) )
from .tl.functions.updates import GetStateRequest
from .tl.functions.upload import ( from .tl.functions.upload import (
GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest
) )
@ -64,11 +68,22 @@ class TelegramBareClient:
connection_mode=ConnectionMode.TCP_FULL, connection_mode=ConnectionMode.TCP_FULL,
proxy=None, proxy=None,
process_updates=False, process_updates=False,
timeout=timedelta(seconds=5)): timeout=timedelta(seconds=5),
"""Initializes the Telegram client with the specified API ID and Hash. **kwargs):
Session must always be a Session instance, and an optional proxy """Refer to TelegramClient.__init__ for docs on this method"""
can also be specified to be used on the connection. if not api_id or not api_hash:
""" raise PermissionError(
"Your API ID or Hash cannot be empty or None. "
"Refer to Telethon's README.rst for more information.")
# Determine what session object we have
if isinstance(session, str) or session is None:
session = Session.try_load_or_create_new(session)
elif not isinstance(session, Session):
raise ValueError(
'The given session must be a str or a Session instance.'
)
self.session = session self.session = session
self.api_id = int(api_id) self.api_id = int(api_id)
self.api_hash = api_hash self.api_hash = api_hash
@ -95,6 +110,39 @@ class TelegramBareClient:
# One may change self.updates.enabled at any later point. # One may change self.updates.enabled at any later point.
self.updates = UpdateState(process_updates) self.updates = UpdateState(process_updates)
# Used on connection - the user may modify these and reconnect
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
for name, value in kwargs.items():
if hasattr(self.session, name):
setattr(self.session, name, value)
# Despite the state of the real connection, keep track of whether
# the user has explicitly called .connect() or .disconnect() here.
# This information is required by the read thread, who will be the
# one attempting to reconnect on the background *while* the user
# doesn't explicitly call .disconnect(), thus telling it to stop
# retrying. The main thread, knowing there is a background thread
# attempting reconnection as soon as it happens, will just sleep.
self._user_connected = False
# Save whether the user is authorized here (a.k.a. logged in)
self._authorized = False
# Uploaded files cache so subsequent calls are instant
self._upload_cache = {}
# Constantly read for results and updates from within the main client
self._recv_thread = None
# Identifier of the main thread (the one that called .connect()).
# This will be used to create new connections from any other thread,
# so that requests can be sent in parallel.
self._main_thread_ident = None
# Default PingRequest delay
self._last_ping = datetime.now()
self._ping_delay = timedelta(minutes=1)
# endregion # endregion
# region Connecting # region Connecting
@ -108,6 +156,8 @@ class TelegramBareClient:
If 'exported_auth' is not None, it will be used instead to If 'exported_auth' is not None, it will be used instead to
determine the authorization key for the current session. determine the authorization key for the current session.
""" """
self._main_thread_ident = threading.get_ident()
try: try:
self._sender.connect() self._sender.connect()
if not self.session.auth_key: if not self.session.auth_key:
@ -143,6 +193,15 @@ class TelegramBareClient:
TelegramBareClient._dc_options = \ TelegramBareClient._dc_options = \
self(GetConfigRequest()).dc_options self(GetConfigRequest()).dc_options
# Connection was successful! Try syncing the update state
# to also assert whether the user is logged in or not.
self._user_connected = True
try:
self.sync_updates()
self._set_connected_and_authorized()
except UnauthorizedError:
self._authorized = False
return True return True
except TypeNotFoundError as e: except TypeNotFoundError as e:
@ -178,9 +237,23 @@ class TelegramBareClient:
return result return result
def disconnect(self): def disconnect(self):
"""Disconnects from the Telegram server""" """Disconnects from the Telegram server
and stops all the spawned threads"""
self._user_connected = False
self._recv_thread = None
# This will trigger a "ConnectionResetError", for subsequent calls
# to read or send (from another thread) and usually, the background
# thread would try restarting the connection but since the
# ._recv_thread = None, it knows it doesn't have to.
self._sender.disconnect() self._sender.disconnect()
# Also disconnect all the cached senders
for sender in self._cached_clients.values():
sender.disconnect()
self._cached_clients.clear()
def _reconnect(self, new_dc=None): def _reconnect(self, new_dc=None):
"""If 'new_dc' is not set, only a call to .connect() will be made """If 'new_dc' is not set, only a call to .connect() will be made
since it's assumed that the connection has been lost and the since it's assumed that the connection has been lost and the
@ -210,7 +283,11 @@ class TelegramBareClient:
# endregion # endregion
# region Working with different Data Centers # region Working with different connections/Data Centers
def _on_read_thread(self):
return self._recv_thread is not None and \
threading.get_ident() == self._recv_thread.ident
def _get_dc(self, dc_id, ipv6=False, cdn=False): def _get_dc(self, dc_id, ipv6=False, cdn=False):
"""Gets the Data Center (DC) associated to 'dc_id'""" """Gets the Data Center (DC) associated to 'dc_id'"""
@ -290,16 +367,21 @@ class TelegramBareClient:
# region Invoking Telegram requests # region Invoking Telegram requests
def invoke(self, *requests, call_receive=True, retries=5, sender=None): def invoke(self, *requests, call_receive=True, retries=5):
"""Invokes (sends) a MTProtoRequest and returns (receives) its result. """Invokes (sends) a MTProtoRequest and returns (receives) its result.
If 'updates' is not None, all read update object will be put The invoke will be retried up to 'retries' times before raising
in such list. Otherwise, update objects will be ignored. ValueError().
If 'call_receive' is set to False, then there should be another
thread calling to 'self._sender.receive()' running or this method
will lock forever.
""" """
# This is only valid when the read thread is reconnecting,
# that is, the connection lock is locked.
on_read_thread = self._on_read_thread()
if on_read_thread and not self._connect_lock.locked():
return # Just ignore, we would be raising and crashing the thread
# Any error from a background thread will be "posted" and checked here
self.updates.check_error()
if not all(isinstance(x, TLObject) and if not all(isinstance(x, TLObject) and
x.content_related for x in requests): x.content_related for x in requests):
raise ValueError('You can only invoke requests, not types!') raise ValueError('You can only invoke requests, not types!')
@ -307,8 +389,20 @@ class TelegramBareClient:
if retries <= 0: if retries <= 0:
raise ValueError('Number of retries reached 0.') raise ValueError('Number of retries reached 0.')
if sender is None: # Determine the sender to be used (main or a new connection)
# TODO Polish this so it's nicer
on_main_thread = threading.get_ident() == self._main_thread_ident
if on_main_thread or on_read_thread:
sender = self._sender sender = self._sender
else:
conn = Connection(
self.session.server_address, self.session.port,
mode=self._sender.connection._mode,
proxy=self._sender.connection.conn.proxy,
timeout=self._sender.connection.get_timeout()
)
sender = MtProtoSender(self.session, conn)
sender.connect()
try: try:
# Ensure that we start with no previous errors (i.e. resending) # Ensure that we start with no previous errors (i.e. resending)
@ -317,6 +411,14 @@ class TelegramBareClient:
x.rpc_error = None x.rpc_error = None
sender.send(*requests) sender.send(*requests)
# We should call receive from this thread if there's no background
# thread reading or if the server disconnected us and we're trying
# to reconnect. This is because the read thread may either be
# locked also trying to reconnect or we may be said thread already.
call_receive = not on_main_thread or \
self._recv_thread is None or self._connect_lock.locked()
if not call_receive: if not call_receive:
# TODO This will be slightly troublesome if we allow # TODO This will be slightly troublesome if we allow
# switching between constant read or not on the fly. # switching between constant read or not on the fly.
@ -330,22 +432,49 @@ class TelegramBareClient:
while not all(x.confirm_received.is_set() for x in requests): while not all(x.confirm_received.is_set() for x in requests):
sender.receive(update_state=self.updates) sender.receive(update_state=self.updates)
except (PhoneMigrateError, NetworkMigrateError,
UserMigrateError) as e:
self._logger.debug(
'DC error when invoking request, '
'attempting to reconnect at DC {}'.format(e.new_dc)
)
# TODO What happens with the background thread here?
# For normal use cases, this won't happen, because this will only
# be on the very first connection (not authorized, not running),
# but may be an issue for people who actually travel?
self._reconnect(new_dc=e.new_dc)
return self.invoke(
*requests, call_receive=call_receive, retries=(retries - 1)
)
except TimeoutError: except TimeoutError:
pass # We will just retry pass # We will just retry
except ConnectionResetError: except ConnectionResetError:
if self._connect_lock.locked():
# We are connecting and we don't want to reconnect there...
raise
self._logger.debug('Server disconnected us. Reconnecting and ' self._logger.debug('Server disconnected us. Reconnecting and '
'resending request...') 'resending request...')
if sender != self._sender: if sender != self._sender:
# TODO Try reconnecting forever too?
sender.connect() sender.connect()
else: else:
self._reconnect() while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever until we can send the request
except FloodWaitError: except FloodWaitError:
sender.disconnect() sender.disconnect()
self.disconnect() self.disconnect()
raise raise
finally:
if sender != self._sender:
sender.disconnect()
try: try:
raise next(x.rpc_error for x in requests if x.rpc_error) raise next(x.rpc_error for x in requests if x.rpc_error)
except StopIteration: except StopIteration:
@ -363,6 +492,13 @@ class TelegramBareClient:
# Let people use client(SomeRequest()) instead client.invoke(...) # Let people use client(SomeRequest()) instead client.invoke(...)
__call__ = invoke __call__ = invoke
# Some really basic functionality
def is_user_authorized(self):
"""Has the user been authorized yet
(code request sent and confirmed)?"""
return self._authorized
# endregion # endregion
# region Uploading media # region Uploading media
@ -388,10 +524,10 @@ class TelegramBareClient:
Default values for the optional parameters if left as None are: Default values for the optional parameters if left as None are:
part_size_kb = get_appropriated_part_size(file_size) part_size_kb = get_appropriated_part_size(file_size)
file_name = path.basename(file_path) file_name = os.path.basename(file_path)
""" """
if isinstance(file, str): if isinstance(file, str):
file_size = path.getsize(file) file_size = os.path.getsize(file)
elif isinstance(file, bytes): elif isinstance(file, bytes):
file_size = len(file) file_size = len(file)
else: else:
@ -447,7 +583,7 @@ class TelegramBareClient:
# Set a default file name if None was specified # Set a default file name if None was specified
if not file_name: if not file_name:
if isinstance(file, str): if isinstance(file, str):
file_name = path.basename(file) file_name = os.path.basename(file)
else: else:
file_name = str(file_id) file_name = str(file_id)
@ -544,3 +680,73 @@ class TelegramBareClient:
f.close() f.close()
# endregion # endregion
# region Updates handling
def sync_updates(self):
"""Synchronizes self.updates to their initial state. Will be
called automatically on connection if self.updates.enabled = True,
otherwise it should be called manually after enabling updates.
"""
self.updates.process(self(GetStateRequest()))
def add_update_handler(self, handler):
"""Adds an update handler (a function which takes a TLObject,
an update, as its parameter) and listens for updates"""
sync = not self.updates.handlers
self.updates.handlers.append(handler)
if sync:
self.sync_updates()
def remove_update_handler(self, handler):
self.updates.handlers.remove(handler)
def list_update_handlers(self):
return self.updates.handlers[:]
# endregion
# Constant read
def _set_connected_and_authorized(self):
self._authorized = True
if self._recv_thread is None:
self._recv_thread = threading.Thread(
name='ReadThread', daemon=True,
target=self._recv_thread_impl
)
self._recv_thread.start()
# By using this approach, another thread will be
# created and started upon connection to constantly read
# from the other end. Otherwise, manual calls to .receive()
# must be performed. The MtProtoSender cannot be connected,
# or an error will be thrown.
#
# This way, sending and receiving will be completely independent.
def _recv_thread_impl(self):
while self._user_connected:
try:
if datetime.now() > self._last_ping + self._ping_delay:
self._sender.send(PingRequest(
int.from_bytes(os.urandom(8), 'big', signed=True)
))
self._last_ping = datetime.now()
self._sender.receive(update_state=self.updates)
except TimeoutError:
# No problem.
pass
except ConnectionResetError:
self._logger.debug('Server disconnected us. Reconnecting...')
while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever, this is instant messaging
except Exception as e:
# Unknown exception, pass it to the main thread
self.updates.set_error(e)
break
self._recv_thread = None
# endregion

View File

@ -1,10 +1,7 @@
import os import os
import threading
from datetime import datetime, timedelta from datetime import datetime, timedelta
from functools import lru_cache from functools import lru_cache
from mimetypes import guess_type from mimetypes import guess_type
from threading import Thread
from time import sleep
try: try:
import socks import socks
@ -15,12 +12,10 @@ from . import TelegramBareClient
from . import helpers as utils from . import helpers as utils
from .errors import ( from .errors import (
RPCError, UnauthorizedError, InvalidParameterError, PhoneCodeEmptyError, RPCError, UnauthorizedError, InvalidParameterError, PhoneCodeEmptyError,
PhoneMigrateError, NetworkMigrateError, UserMigrateError,
PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError
) )
from .network import Connection, ConnectionMode, MtProtoSender from .network import ConnectionMode
from .tl import Session, TLObject from .tl import TLObject
from .tl.functions import PingRequest
from .tl.functions.account import ( from .tl.functions.account import (
GetPasswordRequest GetPasswordRequest
) )
@ -35,9 +30,6 @@ from .tl.functions.messages import (
GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest, GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest,
SendMessageRequest SendMessageRequest
) )
from .tl.functions.updates import (
GetStateRequest
)
from .tl.functions.users import ( from .tl.functions.users import (
GetUsersRequest GetUsersRequest
) )
@ -98,18 +90,6 @@ class TelegramClient(TelegramBareClient):
system_lang_code = lang_code system_lang_code = lang_code
report_errors = True report_errors = True
""" """
if not api_id or not api_hash:
raise PermissionError(
"Your API ID or Hash cannot be empty or None. "
"Refer to Telethon's README.rst for more information.")
# Determine what session object we have
if isinstance(session, str) or session is None:
session = Session.try_load_or_create_new(session)
elif not isinstance(session, Session):
raise ValueError(
'The given session must be a str or a Session instance.')
super().__init__( super().__init__(
session, api_id, api_hash, session, api_id, api_hash,
connection_mode=connection_mode, connection_mode=connection_mode,
@ -118,187 +98,16 @@ class TelegramClient(TelegramBareClient):
timeout=timeout timeout=timeout
) )
# Used on connection - the user may modify these and reconnect # Some fields to easy signing in
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
for name, value in kwargs.items():
if hasattr(self.session, name):
setattr(self.session, name, value)
self._updates_thread = None
self._phone_code_hash = None self._phone_code_hash = None
self._phone = None self._phone = None
# Despite the state of the real connection, keep track of whether
# the user has explicitly called .connect() or .disconnect() here.
# This information is required by the read thread, who will be the
# one attempting to reconnect on the background *while* the user
# doesn't explicitly call .disconnect(), thus telling it to stop
# retrying. The main thread, knowing there is a background thread
# attempting reconnection as soon as it happens, will just sleep.
self._user_connected = False
# Save whether the user is authorized here (a.k.a. logged in)
self._authorized = False
# Uploaded files cache so subsequent calls are instant
self._upload_cache = {}
# Constantly read for results and updates from within the main client
self._recv_thread = None
# Identifier of the main thread (the one that called .connect()).
# This will be used to create new connections from any other thread,
# so that requests can be sent in parallel.
self._main_thread_ident = None
# Default PingRequest delay
self._last_ping = datetime.now()
self._ping_delay = timedelta(minutes=1)
# endregion
# region Connecting
def connect(self, exported_auth=None):
"""Connects to the Telegram servers, executing authentication if
required. Note that authenticating to the Telegram servers is
not the same as authenticating the desired user itself, which
may require a call (or several) to 'sign_in' for the first time.
exported_auth is meant for internal purposes and can be ignored.
"""
self._main_thread_ident = threading.get_ident()
if socks and self._recv_thread:
# Treat proxy errors specially since they're not related to
# Telegram itself, but rather to the proxy. If any happens on
# the read thread forward it to the main thread.
try:
ok = super().connect(exported_auth=exported_auth)
except socks.ProxyConnectionError as e:
ok = False
# Report the exception to the main thread
self.updates.set_error(e)
else:
ok = super().connect(exported_auth=exported_auth)
if not ok:
return False
self._user_connected = True
try:
self.sync_updates()
self._set_connected_and_authorized()
except UnauthorizedError:
self._authorized = False
return True
def disconnect(self):
"""Disconnects from the Telegram server
and stops all the spawned threads"""
self._user_connected = False
self._recv_thread = None
# This will trigger a "ConnectionResetError", usually, the background
# thread would try restarting the connection but since the
# ._recv_thread = None, it knows it doesn't have to.
super().disconnect()
# Also disconnect all the cached senders
for sender in self._cached_clients.values():
sender.disconnect()
self._cached_clients.clear()
# endregion
# region Working with different connections
def _on_read_thread(self):
return self._recv_thread is not None and \
threading.get_ident() == self._recv_thread.ident
# endregion # endregion
# region Telegram requests functions # region Telegram requests functions
def invoke(self, *requests, **kwargs):
"""Invokes (sends) one or several MTProtoRequest and returns
(receives) their result. An optional named 'retries' parameter
can be used, indicating how many times it should retry.
"""
# This is only valid when the read thread is reconnecting,
# that is, the connection lock is locked.
on_read_thread = self._on_read_thread()
if on_read_thread and not self._connect_lock.locked():
return # Just ignore, we would be raising and crashing the thread
self.updates.check_error()
# Determine the sender to be used (main or a new connection)
# TODO Polish this so it's nicer
on_main_thread = threading.get_ident() == self._main_thread_ident
if on_main_thread or on_read_thread:
sender = self._sender
else:
conn = Connection(
self.session.server_address, self.session.port,
mode=self._sender.connection._mode,
proxy=self._sender.connection.conn.proxy,
timeout=self._sender.connection.get_timeout()
)
sender = MtProtoSender(self.session, conn)
sender.connect()
try:
# We should call receive from this thread if there's no background
# thread reading or if the server disconnected us and we're trying
# to reconnect. This is because the read thread may either be
# locked also trying to reconnect or we may be said thread already.
call_receive = not on_main_thread or \
self._recv_thread is None or self._connect_lock.locked()
return super().invoke(
*requests,
call_receive=call_receive,
retries=kwargs.get('retries', 5),
sender=sender
)
except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
self._logger.debug('DC error when invoking request, '
'attempting to reconnect at DC {}'
.format(e.new_dc))
# TODO What happens with the background thread here?
# For normal use cases, this won't happen, because this will only
# be on the very first connection (not authorized, not running),
# but may be an issue for people who actually travel?
self._reconnect(new_dc=e.new_dc)
return self.invoke(*requests)
except ConnectionResetError as e:
if self._connect_lock.locked():
# We are connecting and we don't want to reconnect there...
raise
while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever until we can send the request
finally:
if sender != self._sender:
sender.disconnect()
# Let people use client(SomeRequest()) instead client.invoke(...)
__call__ = invoke
# region Authorization requests # region Authorization requests
def is_user_authorized(self):
"""Has the user been authorized yet
(code request sent and confirmed)?"""
return self._authorized
def send_code_request(self, phone): def send_code_request(self, phone):
"""Sends a code request to the specified phone number""" """Sends a code request to the specified phone number"""
if isinstance(phone, int): if isinstance(phone, int):
@ -992,73 +801,3 @@ class TelegramClient(TelegramBareClient):
) )
# endregion # endregion
# region Updates handling
def sync_updates(self):
"""Synchronizes self.updates to their initial state. Will be
called automatically on connection if self.updates.enabled = True,
otherwise it should be called manually after enabling updates.
"""
self.updates.process(self(GetStateRequest()))
def add_update_handler(self, handler):
"""Adds an update handler (a function which takes a TLObject,
an update, as its parameter) and listens for updates"""
sync = not self.updates.handlers
self.updates.handlers.append(handler)
if sync:
self.sync_updates()
def remove_update_handler(self, handler):
self.updates.handlers.remove(handler)
def list_update_handlers(self):
return self.updates.handlers[:]
# endregion
# Constant read
def _set_connected_and_authorized(self):
self._authorized = True
if self._recv_thread is None:
self._recv_thread = Thread(
name='ReadThread', daemon=True,
target=self._recv_thread_impl
)
self._recv_thread.start()
# By using this approach, another thread will be
# created and started upon connection to constantly read
# from the other end. Otherwise, manual calls to .receive()
# must be performed. The MtProtoSender cannot be connected,
# or an error will be thrown.
#
# This way, sending and receiving will be completely independent.
def _recv_thread_impl(self):
while self._user_connected:
try:
if datetime.now() > self._last_ping + self._ping_delay:
self._sender.send(PingRequest(
int.from_bytes(os.urandom(8), 'big', signed=True)
))
self._last_ping = datetime.now()
self._sender.receive(update_state=self.updates)
except TimeoutError:
# No problem.
pass
except ConnectionResetError:
self._logger.debug('Server disconnected us. Reconnecting...')
while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever, this is instant messaging
except Exception as e:
# Unknown exception, pass it to the main thread
self.updates.set_error(e)
break
self._recv_thread = None
# endregion