Merge pull request #277 from LonamiWebs/many-connections

This commit is contained in:
Lonami 2017-09-30 18:00:48 +02:00 committed by GitHub
commit 4685cda751
6 changed files with 461 additions and 435 deletions

View File

@ -10,7 +10,7 @@ from ..errors import CdnFileTamperedError
class CdnDecrypter:
"""Used when downloading a file results in a 'FileCdnRedirect' to
both prepare the redirect, decrypt the file as it downloads, and
ensure the file hasn't been tampered.
ensure the file hasn't been tampered. https://core.telegram.org/cdn
"""
def __init__(self, cdn_client, file_token, cdn_aes, cdn_file_hashes):
self.client = cdn_client
@ -19,46 +19,26 @@ class CdnDecrypter:
self.cdn_file_hashes = cdn_file_hashes
@staticmethod
def prepare_decrypter(client, client_cls, cdn_redirect):
def prepare_decrypter(client, cdn_client, cdn_redirect):
"""Prepares a CDN decrypter, returning (decrypter, file data).
'client' should be the original TelegramBareClient that
tried to download the file.
'client_cls' should be the class of the TelegramBareClient.
'client' should be an existing client not connected to a CDN.
'cdn_client' should be an already-connected TelegramBareClient
with the auth key already created.
"""
# TODO Avoid the need for 'client_cls=TelegramBareClient'
# https://core.telegram.org/cdn
cdn_aes = AESModeCTR(
key=cdn_redirect.encryption_key,
# 12 first bytes of the IV..4 bytes of the offset (0, big endian)
iv=cdn_redirect.encryption_iv[:12] + bytes(4)
)
# Create a new client on said CDN
dc = client._get_dc(cdn_redirect.dc_id, cdn=True)
session = Session(client.session)
session.server_address = dc.ip_address
session.port = dc.port
cdn_client = client_cls( # Avoid importing TelegramBareClient
session, client.api_id, client.api_hash,
timeout=client._sender.connection.get_timeout()
)
# This will make use of the new RSA keys for this specific CDN.
#
# We assume that cdn_redirect.cdn_file_hashes are ordered by offset,
# and that there will be enough of these to retrieve the whole file.
#
# This relies on the fact that TelegramBareClient._dc_options is
# static and it won't be called from this DC (it would fail).
cdn_client.connect()
# CDN client is ready, create the resulting CdnDecrypter
decrypter = CdnDecrypter(
cdn_client, cdn_redirect.file_token,
cdn_aes, cdn_redirect.cdn_file_hashes
)
cdn_file = client(GetCdnFileRequest(
cdn_file = cdn_client(GetCdnFileRequest(
file_token=cdn_redirect.file_token,
offset=cdn_redirect.cdn_file_hashes[0].offset,
limit=cdn_redirect.cdn_file_hashes[0].limit

View File

@ -130,6 +130,13 @@ class Connection:
def close(self):
self.conn.close()
def clone(self):
"""Creates a copy of this Connection"""
return Connection(self.ip, self.port,
mode=self._mode,
proxy=self.conn.proxy,
timeout=self.conn.timeout)
# region Receive message implementations
def recv(self):

View File

@ -1,7 +1,6 @@
import gzip
import logging
import struct
from threading import RLock
from .. import helpers as utils
from ..crypto import AES
@ -20,7 +19,12 @@ logging.getLogger(__name__).addHandler(logging.NullHandler())
class MtProtoSender:
"""MTProto Mobile Protocol sender
(https://core.telegram.org/mtproto/description)
(https://core.telegram.org/mtproto/description).
Note that this class is not thread-safe, and calling send/receive
from two or more threads at the same time is undefined behaviour.
Rationale: a new connection should be spawned to send/receive requests
in parallel, so thread-safety (hence locking) isn't needed.
"""
def __init__(self, session, connection):
@ -37,11 +41,6 @@ class MtProtoSender:
# Requests (as msg_id: Message) sent waiting to be received
self._pending_receive = {}
# Sending and receiving are independent, but two threads cannot
# send or receive at the same time no matter what.
self._send_lock = RLock()
self._recv_lock = RLock()
def connect(self):
"""Connects to the server"""
self.connection.connect()
@ -55,6 +54,10 @@ class MtProtoSender:
self._need_confirmation.clear()
self._clear_all_pending()
def clone(self):
"""Creates a copy of this MtProtoSender as a new connection"""
return MtProtoSender(self.session, self.connection.clone())
# region Send and receive
def send(self, *requests):
@ -93,19 +96,18 @@ class MtProtoSender:
Any unhandled object (likely updates) will be passed to
update_state.process(TLObject).
"""
with self._recv_lock:
try:
body = self.connection.recv()
except (BufferError, InvalidChecksumError):
# TODO BufferError, we should spot the cause...
# "No more bytes left"; something wrong happened, clear
# everything to be on the safe side, or:
#
# "This packet should be skipped"; since this may have
# been a result for a request, invalidate every request
# and just re-invoke them to avoid problems
self._clear_all_pending()
return
try:
body = self.connection.recv()
except (BufferError, InvalidChecksumError):
# TODO BufferError, we should spot the cause...
# "No more bytes left"; something wrong happened, clear
# everything to be on the safe side, or:
#
# "This packet should be skipped"; since this may have
# been a result for a request, invalidate every request
# and just re-invoke them to avoid problems
self._clear_all_pending()
return
message, remote_msg_id, remote_seq = self._decode_msg(body)
with BinaryReader(message) as reader:
@ -128,8 +130,7 @@ class MtProtoSender:
cipher_text = AES.encrypt_ige(plain_text, key, iv)
result = key_id + msg_key + cipher_text
with self._send_lock:
self.connection.send(result)
self.connection.send(result)
def _decode_msg(self, body):
"""Decodes an received encrypted message body bytes"""

View File

@ -1,21 +1,24 @@
import logging
from datetime import timedelta
import os
import threading
from datetime import timedelta, datetime
from hashlib import md5
from io import BytesIO
from os import path
from threading import Lock
from time import sleep
from . import helpers as utils
from .crypto import rsa, CdnDecrypter
from .errors import (
RPCError, BrokenAuthKeyError,
FloodWaitError, FileMigrateError, TypeNotFoundError
FloodWaitError, FileMigrateError, TypeNotFoundError,
UnauthorizedError, PhoneMigrateError, NetworkMigrateError, UserMigrateError
)
from .network import authenticator, MtProtoSender, Connection, ConnectionMode
from .tl import TLObject, Session
from .tl.all_tlobjects import LAYER
from .tl.functions import (
InitConnectionRequest, InvokeWithLayerRequest
InitConnectionRequest, InvokeWithLayerRequest, PingRequest
)
from .tl.functions.auth import (
ImportAuthorizationRequest, ExportAuthorizationRequest
@ -23,6 +26,7 @@ from .tl.functions.auth import (
from .tl.functions.help import (
GetCdnConfigRequest, GetConfigRequest
)
from .tl.functions.updates import GetStateRequest
from .tl.functions.upload import (
GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest
)
@ -63,18 +67,34 @@ class TelegramBareClient:
def __init__(self, session, api_id, api_hash,
connection_mode=ConnectionMode.TCP_FULL,
proxy=None,
process_updates=False,
timeout=timedelta(seconds=5)):
"""Initializes the Telegram client with the specified API ID and Hash.
Session must always be a Session instance, and an optional proxy
can also be specified to be used on the connection.
"""
update_workers=None,
spawn_read_thread=False,
timeout=timedelta(seconds=5),
**kwargs):
"""Refer to TelegramClient.__init__ for docs on this method"""
if not api_id or not api_hash:
raise PermissionError(
"Your API ID or Hash cannot be empty or None. "
"Refer to Telethon's README.rst for more information.")
# Determine what session object we have
if isinstance(session, str) or session is None:
session = Session.try_load_or_create_new(session)
elif not isinstance(session, Session):
raise ValueError(
'The given session must be a str or a Session instance.'
)
self.session = session
self.api_id = int(api_id)
self.api_hash = api_hash
if self.api_id < 20: # official apps must use obfuscated
connection_mode = ConnectionMode.TCP_OBFUSCATED
# This is the main sender, which will be used from the thread
# that calls .connect(). Every other thread will spawn a new
# temporary connection. The connection on this one is always
# kept open so Telegram can send us updates.
self._sender = MtProtoSender(self.session, Connection(
self.session.server_address, self.session.port,
mode=connection_mode, proxy=proxy, timeout=timeout
@ -86,28 +106,76 @@ class TelegramBareClient:
# we only want one to actually perform the reconnection.
self._connect_lock = Lock()
# Cache "exported" senders 'dc_id: TelegramBareClient' and
# their corresponding sessions not to recreate them all
# the time since it's a (somewhat expensive) process.
self._cached_clients = {}
# Cache "exported" sessions as 'dc_id: Session' not to recreate
# them all the time since generating a new key is a relatively
# expensive operation.
self._exported_sessions = {}
# This member will process updates if enabled.
# One may change self.updates.enabled at any later point.
self.updates = UpdateState(process_updates)
self.updates = UpdateState(workers=update_workers)
# Used on connection - the user may modify these and reconnect
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
for name, value in kwargs.items():
if hasattr(self.session, name):
setattr(self.session, name, value)
# Despite the state of the real connection, keep track of whether
# the user has explicitly called .connect() or .disconnect() here.
# This information is required by the read thread, who will be the
# one attempting to reconnect on the background *while* the user
# doesn't explicitly call .disconnect(), thus telling it to stop
# retrying. The main thread, knowing there is a background thread
# attempting reconnection as soon as it happens, will just sleep.
self._user_connected = False
# Save whether the user is authorized here (a.k.a. logged in)
self._authorized = False
# Uploaded files cache so subsequent calls are instant
self._upload_cache = {}
# Constantly read for results and updates from within the main client,
# if the user has left enabled such option.
self._spawn_read_thread = spawn_read_thread
self._recv_thread = None
# Identifier of the main thread (the one that called .connect()).
# This will be used to create new connections from any other thread,
# so that requests can be sent in parallel.
self._main_thread_ident = None
# Default PingRequest delay
self._last_ping = datetime.now()
self._ping_delay = timedelta(minutes=1)
# endregion
# region Connecting
def connect(self, exported_auth=None):
def connect(self, _exported_auth=None, _sync_updates=True, _cdn=False):
"""Connects to the Telegram servers, executing authentication if
required. Note that authenticating to the Telegram servers is
not the same as authenticating the desired user itself, which
may require a call (or several) to 'sign_in' for the first time.
If 'exported_auth' is not None, it will be used instead to
Note that the optional parameters are meant for internal use.
If '_exported_auth' is not None, it will be used instead to
determine the authorization key for the current session.
If '_sync_updates', sync_updates() will be called and a
second thread will be started if necessary. Note that this
will FAIL if the client is not connected to the user's
native data center, raising a "UserMigrateError", and
calling .disconnect() in the process.
If '_cdn' is False, methods that are not allowed on such data
centers won't be invoked.
"""
self._main_thread_ident = threading.get_ident()
try:
self._sender.connect()
if not self.session.auth_key:
@ -126,30 +194,46 @@ class TelegramBareClient:
init_connection = self.session.layer != LAYER
if init_connection:
if exported_auth is not None:
if _exported_auth is not None:
self._init_connection(ImportAuthorizationRequest(
exported_auth.id, exported_auth.bytes
_exported_auth.id, _exported_auth.bytes
))
else:
elif not _cdn:
TelegramBareClient._dc_options = \
self._init_connection(GetConfigRequest()).dc_options
elif exported_auth is not None:
elif _exported_auth is not None:
self(ImportAuthorizationRequest(
exported_auth.id, exported_auth.bytes
_exported_auth.id, _exported_auth.bytes
))
if TelegramBareClient._dc_options is None:
if TelegramBareClient._dc_options is None and not _cdn:
TelegramBareClient._dc_options = \
self(GetConfigRequest()).dc_options
# Connection was successful! Try syncing the update state
# UNLESS '_sync_updates' is False (we probably are in
# another data center and this would raise UserMigrateError)
# to also assert whether the user is logged in or not.
self._user_connected = True
if _sync_updates and not _cdn:
try:
self.sync_updates()
self._set_connected_and_authorized()
except UnauthorizedError:
self._authorized = False
return True
except TypeNotFoundError as e:
# This is fine, probably layer migration
self._logger.debug('Found invalid item, probably migrating', e)
self.disconnect()
return self.connect(exported_auth=exported_auth)
return self.connect(
_exported_auth=_exported_auth,
_sync_updates=_sync_updates,
_cdn=_cdn
)
except (RPCError, ConnectionError) as error:
# Probably errors from the previous session, ignore them
@ -178,9 +262,20 @@ class TelegramBareClient:
return result
def disconnect(self):
"""Disconnects from the Telegram server"""
"""Disconnects from the Telegram server
and stops all the spawned threads"""
self._user_connected = False
self._recv_thread = None
# This will trigger a "ConnectionResetError", for subsequent calls
# to read or send (from another thread) and usually, the background
# thread would try restarting the connection but since the
# ._recv_thread = None, it knows it doesn't have to.
self._sender.disconnect()
# TODO Shall we clear the _exported_sessions, or may be reused?
pass
def _reconnect(self, new_dc=None):
"""If 'new_dc' is not set, only a call to .connect() will be made
since it's assumed that the connection has been lost and the
@ -210,7 +305,11 @@ class TelegramBareClient:
# endregion
# region Working with different Data Centers
# region Working with different connections/Data Centers
def _on_read_thread(self):
return self._recv_thread is not None and \
threading.get_ident() == self._recv_thread.ident
def _get_dc(self, dc_id, ipv6=False, cdn=False):
"""Gets the Data Center (DC) associated to 'dc_id'"""
@ -237,30 +336,23 @@ class TelegramBareClient:
TelegramBareClient._dc_options = self(GetConfigRequest()).dc_options
return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn)
def _get_exported_client(self, dc_id,
init_connection=False,
bypass_cache=False):
"""Gets a cached exported TelegramBareClient for the desired DC.
def _get_exported_client(self, dc_id):
"""Creates and connects a new TelegramBareClient for the desired DC.
If it's the first time retrieving the TelegramBareClient, the
current authorization is exported to the new DC so that
it can be used there, and the connection is initialized.
If after using the sender a ConnectionResetError is raised,
this method should be called again with init_connection=True
in order to perform the reconnection.
If bypass_cache is True, a new client will be exported and
it will not be cached.
If it's the first time calling the method with a given dc_id,
a new session will be first created, and its auth key generated.
Exporting/Importing the authorization will also be done so that
the auth is bound with the key.
"""
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
# for clearly showing how to export the authorization! ^^
client = self._cached_clients.get(dc_id)
if client and not bypass_cache:
if init_connection:
client.reconnect()
return client
session = self._exported_sessions.get(dc_id)
if session:
export_auth = None # Already bound with the auth key
else:
# TODO Add a lock, don't allow two threads to create an auth key
# (when calling .connect() if there wasn't a previous session).
# for the same data center.
dc = self._get_dc(dc_id)
# Export the current authorization to the new DC.
@ -274,46 +366,90 @@ class TelegramBareClient:
session = Session(self.session)
session.server_address = dc.ip_address
session.port = dc.port
client = TelegramBareClient(
session, self.api_id, self.api_hash,
proxy=self._sender.connection.conn.proxy,
timeout=self._sender.connection.get_timeout()
)
client.connect(exported_auth=export_auth)
self._exported_sessions[dc_id] = session
if not bypass_cache:
# Don't go through this expensive process every time.
self._cached_clients[dc_id] = client
return client
client = TelegramBareClient(
session, self.api_id, self.api_hash,
proxy=self._sender.connection.conn.proxy,
timeout=self._sender.connection.get_timeout()
)
client.connect(_exported_auth=export_auth, _sync_updates=False)
return client
def _get_cdn_client(self, cdn_redirect):
"""Similar to ._get_exported_client, but for CDNs"""
session = self._exported_sessions.get(cdn_redirect.dc_id)
if not session:
dc = self._get_dc(cdn_redirect.dc_id, cdn=True)
session = Session(self.session)
session.server_address = dc.ip_address
session.port = dc.port
self._exported_sessions[cdn_redirect.dc_id] = session
client = TelegramBareClient(
session, self.api_id, self.api_hash,
proxy=self._sender.connection.conn.proxy,
timeout=self._sender.connection.get_timeout()
)
# This will make use of the new RSA keys for this specific CDN.
#
# This relies on the fact that TelegramBareClient._dc_options is
# static and it won't be called from this DC (it would fail).
client.connect(cdn=True) # Avoid invoking non-CDN specific methods
return client
# endregion
# region Invoking Telegram requests
def invoke(self, *requests, call_receive=True, retries=5):
def invoke(self, *requests, retries=5):
"""Invokes (sends) a MTProtoRequest and returns (receives) its result.
If 'updates' is not None, all read update object will be put
in such list. Otherwise, update objects will be ignored.
If 'call_receive' is set to False, then there should be another
thread calling to 'self._sender.receive()' running or this method
will lock forever.
The invoke will be retried up to 'retries' times before raising
ValueError().
"""
# Any error from a background thread will be "posted" and checked here
self.updates.check_error()
if not all(isinstance(x, TLObject) and
x.content_related for x in requests):
raise ValueError('You can only invoke requests, not types!')
if retries <= 0:
raise ValueError('Number of retries reached 0.')
# Determine the sender to be used (main or a new connection)
on_main_thread = threading.get_ident() == self._main_thread_ident
if on_main_thread or self._on_read_thread():
sender = self._sender
else:
sender = self._sender.clone()
sender.connect()
# We should call receive from this thread if there's no background
# thread reading or if the server disconnected us and we're trying
# to reconnect. This is because the read thread may either be
# locked also trying to reconnect or we may be said thread already.
call_receive = not on_main_thread or \
self._recv_thread is None or self._connect_lock.locked()
try:
for _ in range(retries):
result = self._invoke(sender, call_receive, *requests)
if result:
return result
raise ValueError('Number of retries reached 0.')
finally:
if sender != self._sender:
sender.disconnect() # Close temporary connections
def _invoke(self, sender, call_receive, *requests):
try:
# Ensure that we start with no previous errors (i.e. resending)
for x in requests:
x.confirm_received.clear()
x.rpc_error = None
self._sender.send(*requests)
sender.send(*requests)
if not call_receive:
# TODO This will be slightly troublesome if we allow
# switching between constant read or not on the fly.
@ -321,33 +457,60 @@ class TelegramBareClient:
# in which case a Lock would be required for .receive().
for x in requests:
x.confirm_received.wait(
self._sender.connection.get_timeout()
sender.connection.get_timeout()
)
else:
while not all(x.confirm_received.is_set() for x in requests):
self._sender.receive(update_state=self.updates)
sender.receive(update_state=self.updates)
except (PhoneMigrateError, NetworkMigrateError,
UserMigrateError) as e:
self._logger.debug(
'DC error when invoking request, '
'attempting to reconnect at DC {}'.format(e.new_dc)
)
# TODO What happens with the background thread here?
# For normal use cases, this won't happen, because this will only
# be on the very first connection (not authorized, not running),
# but may be an issue for people who actually travel?
self._reconnect(new_dc=e.new_dc)
return self._invoke(sender, call_receive, *requests)
except TimeoutError:
pass # We will just retry
except ConnectionResetError:
if self._connect_lock.locked():
# We are connecting and we don't want to reconnect there...
raise
self._logger.debug('Server disconnected us. Reconnecting and '
'resending request...')
self._reconnect()
if sender != self._sender:
# TODO Try reconnecting forever too?
sender.connect()
else:
while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever until we can send the request
except FloodWaitError:
sender.disconnect()
self.disconnect()
raise
finally:
if sender != self._sender:
sender.disconnect()
try:
raise next(x.rpc_error for x in requests if x.rpc_error)
except StopIteration:
if any(x.result is None for x in requests):
# "A container may only be accepted or
# rejected by the other party as a whole."
return self.invoke(
*requests, call_receive=call_receive, retries=(retries - 1)
)
# rejected by the other party as a whole."
return None
elif len(requests) == 1:
return requests[0].result
else:
@ -356,6 +519,13 @@ class TelegramBareClient:
# Let people use client(SomeRequest()) instead client.invoke(...)
__call__ = invoke
# Some really basic functionality
def is_user_authorized(self):
"""Has the user been authorized yet
(code request sent and confirmed)?"""
return self._authorized
# endregion
# region Uploading media
@ -381,10 +551,10 @@ class TelegramBareClient:
Default values for the optional parameters if left as None are:
part_size_kb = get_appropriated_part_size(file_size)
file_name = path.basename(file_path)
file_name = os.path.basename(file_path)
"""
if isinstance(file, str):
file_size = path.getsize(file)
file_size = os.path.getsize(file)
elif isinstance(file, bytes):
file_size = len(file)
else:
@ -440,7 +610,7 @@ class TelegramBareClient:
# Set a default file name if None was specified
if not file_name:
if isinstance(file, str):
file_name = path.basename(file)
file_name = os.path.basename(file)
else:
file_name = str(file_id)
@ -509,7 +679,7 @@ class TelegramBareClient:
if isinstance(result, FileCdnRedirect):
cdn_decrypter, result = \
CdnDecrypter.prepare_decrypter(
client, TelegramBareClient, result
client, self._get_cdn_client(result), result
)
except FileMigrateError as e:
@ -528,6 +698,9 @@ class TelegramBareClient:
if progress_callback:
progress_callback(f.tell(), file_size)
finally:
if client != self:
client.disconnect()
if cdn_decrypter:
try:
cdn_decrypter.client.disconnect()
@ -537,3 +710,73 @@ class TelegramBareClient:
f.close()
# endregion
# region Updates handling
def sync_updates(self):
"""Synchronizes self.updates to their initial state. Will be
called automatically on connection if self.updates.enabled = True,
otherwise it should be called manually after enabling updates.
"""
self.updates.process(self(GetStateRequest()))
def add_update_handler(self, handler):
"""Adds an update handler (a function which takes a TLObject,
an update, as its parameter) and listens for updates"""
sync = not self.updates.handlers
self.updates.handlers.append(handler)
if sync:
self.sync_updates()
def remove_update_handler(self, handler):
self.updates.handlers.remove(handler)
def list_update_handlers(self):
return self.updates.handlers[:]
# endregion
# Constant read
def _set_connected_and_authorized(self):
self._authorized = True
if self._spawn_read_thread and self._recv_thread is None:
self._recv_thread = threading.Thread(
name='ReadThread', daemon=True,
target=self._recv_thread_impl
)
self._recv_thread.start()
# By using this approach, another thread will be
# created and started upon connection to constantly read
# from the other end. Otherwise, manual calls to .receive()
# must be performed. The MtProtoSender cannot be connected,
# or an error will be thrown.
#
# This way, sending and receiving will be completely independent.
def _recv_thread_impl(self):
while self._user_connected:
try:
if datetime.now() > self._last_ping + self._ping_delay:
self._sender.send(PingRequest(
int.from_bytes(os.urandom(8), 'big', signed=True)
))
self._last_ping = datetime.now()
self._sender.receive(update_state=self.updates)
except TimeoutError:
# No problem.
pass
except ConnectionResetError:
self._logger.debug('Server disconnected us. Reconnecting...')
while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever, this is instant messaging
except Exception as e:
# Unknown exception, pass it to the main thread
self.updates.set_error(e)
break
self._recv_thread = None
# endregion

View File

@ -1,10 +1,7 @@
import os
import threading
from datetime import datetime, timedelta
from functools import lru_cache
from mimetypes import guess_type
from threading import Thread
from time import sleep
try:
import socks
@ -15,12 +12,10 @@ from . import TelegramBareClient
from . import helpers as utils
from .errors import (
RPCError, UnauthorizedError, InvalidParameterError, PhoneCodeEmptyError,
PhoneMigrateError, NetworkMigrateError, UserMigrateError,
PhoneCodeExpiredError, PhoneCodeHashEmptyError, PhoneCodeInvalidError
)
from .network import ConnectionMode
from .tl import Session, TLObject
from .tl.functions import PingRequest
from .tl import TLObject
from .tl.functions.account import (
GetPasswordRequest
)
@ -35,9 +30,6 @@ from .tl.functions.messages import (
GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest,
SendMessageRequest
)
from .tl.functions.updates import (
GetStateRequest
)
from .tl.functions.users import (
GetUsersRequest
)
@ -65,8 +57,9 @@ class TelegramClient(TelegramBareClient):
def __init__(self, session, api_id, api_hash,
connection_mode=ConnectionMode.TCP_FULL,
proxy=None,
process_updates=False,
update_workers=None,
timeout=timedelta(seconds=5),
spawn_read_thread=True,
**kwargs):
"""Initializes the Telegram client with the specified API ID and Hash.
@ -79,15 +72,21 @@ class TelegramClient(TelegramBareClient):
This will only affect how messages are sent over the network
and how much processing is required before sending them.
If 'process_updates' is set to True, incoming updates will be
processed and you must manually call 'self.updates.poll()' from
another thread to retrieve the saved update objects, or your
memory will fill with these. You may modify the value of
'self.updates.polling' at any later point.
The integer 'update_workers' represents depending on its value:
is None: Updates will *not* be stored in memory.
= 0: Another thread is responsible for calling self.updates.poll()
> 0: 'update_workers' background threads will be spawned, any
any of them will invoke all the self.updates.handlers.
Despite the value of 'process_updates', if you later call
'.add_update_handler(...)', updates will also be processed
and the update objects will be passed to the handlers you added.
If 'spawn_read_thread', a background thread will be started once
an authorized user has been logged in to Telegram to read items
(such as updates and responses) from the network as soon as they
occur, which will speed things up.
If you don't want to spawn any additional threads, pending updates
will be read and processed accordingly after invoking a request
and not immediately. This is useful if you don't care about updates
at all and have set 'update_workers=None'.
If more named arguments are provided as **kwargs, they will be
used to update the Session instance. Most common settings are:
@ -98,221 +97,25 @@ class TelegramClient(TelegramBareClient):
system_lang_code = lang_code
report_errors = True
"""
if not api_id or not api_hash:
raise PermissionError(
"Your API ID or Hash cannot be empty or None. "
"Refer to Telethon's README.rst for more information.")
# Determine what session object we have
if isinstance(session, str) or session is None:
session = Session.try_load_or_create_new(session)
elif not isinstance(session, Session):
raise ValueError(
'The given session must be a str or a Session instance.')
super().__init__(
session, api_id, api_hash,
connection_mode=connection_mode,
proxy=proxy,
process_updates=process_updates,
update_workers=update_workers,
spawn_read_thread=spawn_read_thread,
timeout=timeout
)
# Used on connection - the user may modify these and reconnect
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
for name, value in kwargs.items():
if hasattr(self.session, name):
setattr(self.session, name, value)
self._updates_thread = None
# Some fields to easy signing in
self._phone_code_hash = None
self._phone = None
# Despite the state of the real connection, keep track of whether
# the user has explicitly called .connect() or .disconnect() here.
# This information is required by the read thread, who will be the
# one attempting to reconnect on the background *while* the user
# doesn't explicitly call .disconnect(), thus telling it to stop
# retrying. The main thread, knowing there is a background thread
# attempting reconnection as soon as it happens, will just sleep.
self._user_connected = False
# Save whether the user is authorized here (a.k.a. logged in)
self._authorized = False
# Uploaded files cache so subsequent calls are instant
self._upload_cache = {}
# Constantly read for results and updates from within the main client
self._recv_thread = None
# Default PingRequest delay
self._last_ping = datetime.now()
self._ping_delay = timedelta(minutes=1)
# endregion
# region Connecting
def connect(self, exported_auth=None):
"""Connects to the Telegram servers, executing authentication if
required. Note that authenticating to the Telegram servers is
not the same as authenticating the desired user itself, which
may require a call (or several) to 'sign_in' for the first time.
exported_auth is meant for internal purposes and can be ignored.
"""
if socks and self._recv_thread:
# Treat proxy errors specially since they're not related to
# Telegram itself, but rather to the proxy. If any happens on
# the read thread forward it to the main thread.
try:
ok = super().connect(exported_auth=exported_auth)
except socks.ProxyConnectionError as e:
ok = False
# Report the exception to the main thread
self.updates.set_error(e)
else:
ok = super().connect(exported_auth=exported_auth)
if not ok:
return False
self._user_connected = True
try:
self.sync_updates()
self._set_connected_and_authorized()
except UnauthorizedError:
self._authorized = False
return True
def disconnect(self):
"""Disconnects from the Telegram server
and stops all the spawned threads"""
self._user_connected = False
self._recv_thread = None
# This will trigger a "ConnectionResetError", usually, the background
# thread would try restarting the connection but since the
# ._recv_thread = None, it knows it doesn't have to.
super().disconnect()
# Also disconnect all the cached senders
for sender in self._cached_clients.values():
sender.disconnect()
self._cached_clients.clear()
# endregion
# region Working with different connections
def _on_read_thread(self):
return self._recv_thread is not None and \
threading.get_ident() == self._recv_thread.ident
def create_new_connection(self, on_dc=None, timeout=timedelta(seconds=5)):
"""Creates a new connection which can be used in parallel
with the original TelegramClient. A TelegramBareClient
will be returned already connected, and the caller is
responsible to disconnect it.
If 'on_dc' is None, the new client will run on the same
data center as the current client (most common case).
If the client is meant to be used on a different data
center, the data center ID should be specified instead.
"""
if on_dc is None:
client = TelegramBareClient(
self.session, self.api_id, self.api_hash,
proxy=self._sender.connection.conn.proxy, timeout=timeout
)
client.connect()
else:
client = self._get_exported_client(on_dc, bypass_cache=True)
return client
# endregion
# region Telegram requests functions
def invoke(self, *requests, **kwargs):
"""Invokes (sends) one or several MTProtoRequest and returns
(receives) their result. An optional named 'retries' parameter
can be used, indicating how many times it should retry.
"""
# This is only valid when the read thread is reconnecting,
# that is, the connection lock is locked.
if self._on_read_thread() and not self._connect_lock.locked():
return # Just ignore, we would be raising and crashing the thread
self.updates.check_error()
try:
# We should call receive from this thread if there's no background
# thread reading or if the server disconnected us and we're trying
# to reconnect. This is because the read thread may either be
# locked also trying to reconnect or we may be said thread already.
call_receive = \
self._recv_thread is None or self._connect_lock.locked()
return super().invoke(
*requests,
call_receive=call_receive,
retries=kwargs.get('retries', 5)
)
except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
self._logger.debug('DC error when invoking request, '
'attempting to reconnect at DC {}'
.format(e.new_dc))
# TODO What happens with the background thread here?
# For normal use cases, this won't happen, because this will only
# be on the very first connection (not authorized, not running),
# but may be an issue for people who actually travel?
self._reconnect(new_dc=e.new_dc)
return self.invoke(*requests)
except ConnectionResetError as e:
if self._connect_lock.locked():
# We are connecting and we don't want to reconnect there...
raise
while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever until we can send the request
# Let people use client(SomeRequest()) instead client.invoke(...)
__call__ = invoke
def invoke_on_dc(self, request, dc_id, reconnect=False):
"""Invokes the given request on a different DC
by making use of the exported MtProtoSenders.
If 'reconnect=True', then the a reconnection will be performed and
ConnectionResetError will be raised if it occurs a second time.
"""
try:
client = self._get_exported_client(
dc_id, init_connection=reconnect)
return client.invoke(request)
except ConnectionResetError:
if reconnect:
raise
else:
return self.invoke_on_dc(request, dc_id, reconnect=True)
# region Authorization requests
def is_user_authorized(self):
"""Has the user been authorized yet
(code request sent and confirmed)?"""
return self._authorized
def send_code_request(self, phone):
"""Sends a code request to the specified phone number"""
if isinstance(phone, int):
@ -1006,73 +809,3 @@ class TelegramClient(TelegramBareClient):
)
# endregion
# region Updates handling
def sync_updates(self):
"""Synchronizes self.updates to their initial state. Will be
called automatically on connection if self.updates.enabled = True,
otherwise it should be called manually after enabling updates.
"""
self.updates.process(self(GetStateRequest()))
def add_update_handler(self, handler):
"""Adds an update handler (a function which takes a TLObject,
an update, as its parameter) and listens for updates"""
sync = not self.updates.handlers
self.updates.handlers.append(handler)
if sync:
self.sync_updates()
def remove_update_handler(self, handler):
self.updates.handlers.remove(handler)
def list_update_handlers(self):
return self.updates.handlers[:]
# endregion
# Constant read
def _set_connected_and_authorized(self):
self._authorized = True
if self._recv_thread is None:
self._recv_thread = Thread(
name='ReadThread', daemon=True,
target=self._recv_thread_impl
)
self._recv_thread.start()
# By using this approach, another thread will be
# created and started upon connection to constantly read
# from the other end. Otherwise, manual calls to .receive()
# must be performed. The MtProtoSender cannot be connected,
# or an error will be thrown.
#
# This way, sending and receiving will be completely independent.
def _recv_thread_impl(self):
while self._user_connected:
try:
if datetime.now() > self._last_ping + self._ping_delay:
self._sender.send(PingRequest(
int.from_bytes(os.urandom(8), 'big', signed=True)
))
self._last_ping = datetime.now()
self._sender.receive(update_state=self.updates)
except TimeoutError:
# No problem.
pass
except ConnectionResetError:
self._logger.debug('Server disconnected us. Reconnecting...')
while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever, this is instant messaging
except Exception as e:
# Unknown exception, pass it to the main thread
self.updates.set_error(e)
break
self._recv_thread = None
# endregion

View File

@ -1,6 +1,7 @@
import logging
from collections import deque
from datetime import datetime
from threading import RLock, Event
from threading import RLock, Event, Thread
from .tl import types as tl
@ -9,27 +10,46 @@ class UpdateState:
"""Used to hold the current state of processed updates.
To retrieve an update, .poll() should be called.
"""
def __init__(self, polling):
self._polling = polling
WORKER_POLL_TIMEOUT = 5.0 # Avoid waiting forever on the workers
def __init__(self, workers=None):
"""
:param workers: This integer parameter has three possible cases:
workers is None: Updates will *not* be stored on self.
workers = 0: Another thread is responsible for calling self.poll()
workers > 0: 'workers' background threads will be spawned, any
any of them will invoke all the self.handlers.
"""
self._workers = workers
self._worker_threads = []
self.handlers = []
self._updates_lock = RLock()
self._updates_available = Event()
self._updates = deque()
self._logger = logging.getLogger(__name__)
# https://core.telegram.org/api/updates
self._state = tl.updates.State(0, 0, datetime.now(), 0, 0)
self._setup_workers()
def can_poll(self):
"""Returns True if a call to .poll() won't lock"""
return self._updates_available.is_set()
def poll(self):
"""Polls an update or blocks until an update object is available"""
if not self._polling:
raise ValueError('Updates are not being polled hence not saved.')
def poll(self, timeout=None):
"""Polls an update or blocks until an update object is available.
If 'timeout is not None', it should be a floating point value,
and the method will 'return None' if waiting times out.
"""
if not self._updates_available.wait(timeout=timeout):
return
self._updates_available.wait()
with self._updates_lock:
if not self._updates_available.is_set():
return
update = self._updates.popleft()
if not self._updates:
self._updates_available.clear()
@ -39,16 +59,62 @@ class UpdateState:
return update
def get_polling(self):
return self._polling
def get_workers(self):
return self._workers
def set_polling(self, polling):
self._polling = polling
if not polling:
with self._updates_lock:
self._updates.clear()
def set_workers(self, n):
"""Changes the number of workers running.
If 'n is None', clears all pending updates from memory.
"""
self._stop_workers()
self._workers = n
if n is None:
self._updates.clear()
else:
self._setup_workers()
polling = property(fget=get_polling, fset=set_polling)
workers = property(fget=get_workers, fset=set_workers)
def _stop_workers(self):
"""Raises "StopIterationException" on the worker threads to stop them,
and also clears all of them off the list
"""
self.set_error(StopIteration())
for t in self._worker_threads:
t.join()
self._worker_threads.clear()
def _setup_workers(self):
if self._worker_threads or not self._workers:
# There already are workers, or workers is None or 0. Do nothing.
return
for i in range(self._workers):
thread = Thread(
target=UpdateState._worker_loop,
name='UpdateWorker{}'.format(i),
daemon=True,
args=(self, i)
)
self._worker_threads.append(thread)
thread.start()
def _worker_loop(self, wid):
while True:
try:
update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT)
# TODO Maybe people can add different handlers per update type
if update:
for handler in self.handlers:
handler(update)
except StopIteration:
break
except Exception as e:
# We don't want to crash a worker thread due to any reason
self._logger.debug(
'[ERROR] Unhandled exception on worker {}'.format(wid), e
)
def set_error(self, error):
"""Sets an error, so that the next call to .poll() will raise it.
@ -69,8 +135,8 @@ class UpdateState:
"""Processes an update object. This method is normally called by
the library itself.
"""
if not self._polling and not self.handlers:
return
if self._workers is None:
return # No processing needs to be done if nobody's working
with self._updates_lock:
if isinstance(update, tl.updates.State):
@ -82,9 +148,5 @@ class UpdateState:
return # We already handled this update
self._state.pts = pts
if self._polling:
self._updates.append(update)
self._updates_available.set()
for handler in self.handlers:
handler(update)
self._updates.append(update)
self._updates_available.set()