2017-06-08 14:12:57 +03:00
|
|
|
import logging
|
2017-09-29 21:50:27 +03:00
|
|
|
import os
|
|
|
|
import threading
|
2017-10-07 18:55:37 +03:00
|
|
|
import warnings
|
2017-09-29 21:50:27 +03:00
|
|
|
from datetime import timedelta, datetime
|
2017-06-08 14:12:57 +03:00
|
|
|
from hashlib import md5
|
2017-08-23 02:43:08 +03:00
|
|
|
from io import BytesIO
|
2017-10-22 14:15:52 +03:00
|
|
|
from signal import signal, SIGINT, SIGTERM, SIGABRT
|
2017-09-22 13:44:09 +03:00
|
|
|
from threading import Lock
|
2017-09-29 21:50:27 +03:00
|
|
|
from time import sleep
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
from . import helpers as utils
|
2017-09-04 18:18:33 +03:00
|
|
|
from .crypto import rsa, CdnDecrypter
|
2017-07-26 17:10:45 +03:00
|
|
|
from .errors import (
|
2017-09-30 19:51:25 +03:00
|
|
|
RPCError, BrokenAuthKeyError, ServerError,
|
2017-09-29 21:50:27 +03:00
|
|
|
FloodWaitError, FileMigrateError, TypeNotFoundError,
|
|
|
|
UnauthorizedError, PhoneMigrateError, NetworkMigrateError, UserMigrateError
|
2017-07-26 17:10:45 +03:00
|
|
|
)
|
2017-09-04 12:24:10 +03:00
|
|
|
from .network import authenticator, MtProtoSender, Connection, ConnectionMode
|
2017-08-29 16:59:08 +03:00
|
|
|
from .tl import TLObject, Session
|
2017-09-17 17:17:55 +03:00
|
|
|
from .tl.all_tlobjects import LAYER
|
2017-08-24 14:02:48 +03:00
|
|
|
from .tl.functions import (
|
2017-09-29 21:50:27 +03:00
|
|
|
InitConnectionRequest, InvokeWithLayerRequest, PingRequest
|
2017-08-24 14:02:48 +03:00
|
|
|
)
|
2017-07-04 11:21:15 +03:00
|
|
|
from .tl.functions.auth import (
|
|
|
|
ImportAuthorizationRequest, ExportAuthorizationRequest
|
|
|
|
)
|
2017-09-04 18:18:33 +03:00
|
|
|
from .tl.functions.help import (
|
|
|
|
GetCdnConfigRequest, GetConfigRequest
|
|
|
|
)
|
2017-09-29 21:50:27 +03:00
|
|
|
from .tl.functions.updates import GetStateRequest
|
2017-06-08 14:12:57 +03:00
|
|
|
from .tl.functions.upload import (
|
2017-09-04 18:18:33 +03:00
|
|
|
GetFileRequest, SaveBigFilePartRequest, SaveFilePartRequest
|
2017-07-04 11:21:15 +03:00
|
|
|
)
|
2017-06-08 14:12:57 +03:00
|
|
|
from .tl.types import InputFile, InputFileBig
|
2017-10-24 16:40:51 +03:00
|
|
|
from .tl.types.auth import ExportedAuthorization
|
2017-09-04 18:18:33 +03:00
|
|
|
from .tl.types.upload import FileCdnRedirect
|
2017-09-07 19:49:08 +03:00
|
|
|
from .update_state import UpdateState
|
2017-09-04 18:18:33 +03:00
|
|
|
from .utils import get_appropriated_part_size
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
|
|
|
|
class TelegramBareClient:
|
|
|
|
"""Bare Telegram Client with just the minimum -
|
|
|
|
|
|
|
|
The reason to distinguish between a MtProtoSender and a
|
|
|
|
TelegramClient itself is because the sender is just that,
|
|
|
|
a sender, which should know nothing about Telegram but
|
|
|
|
rather how to handle this specific connection.
|
|
|
|
|
|
|
|
The TelegramClient itself should know how to initialize
|
|
|
|
a proper connection to the servers, as well as other basic
|
|
|
|
methods such as disconnection and reconnection.
|
|
|
|
|
|
|
|
This distinction between a bare client and a full client
|
|
|
|
makes it possible to create clones of the bare version
|
|
|
|
(by using the same session, IP address and port) to be
|
|
|
|
able to execute queries on either, without the additional
|
|
|
|
cost that would involve having the methods for signing in,
|
|
|
|
logging out, and such.
|
|
|
|
"""
|
|
|
|
|
|
|
|
# Current TelegramClient version
|
2017-10-21 00:33:08 +03:00
|
|
|
__version__ = '0.15.3'
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-09-17 15:30:23 +03:00
|
|
|
# TODO Make this thread-safe, all connections share the same DC
|
2017-10-24 16:40:51 +03:00
|
|
|
_config = None # Server configuration (with .dc_options)
|
2017-09-17 15:30:23 +03:00
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
# region Initialization
|
|
|
|
|
2017-06-22 12:43:42 +03:00
|
|
|
def __init__(self, session, api_id, api_hash,
|
2017-09-04 12:24:10 +03:00
|
|
|
connection_mode=ConnectionMode.TCP_FULL,
|
2017-09-07 19:49:08 +03:00
|
|
|
proxy=None,
|
2017-09-30 12:17:31 +03:00
|
|
|
update_workers=None,
|
2017-09-30 16:53:47 +03:00
|
|
|
spawn_read_thread=False,
|
2017-09-29 21:50:27 +03:00
|
|
|
timeout=timedelta(seconds=5),
|
|
|
|
**kwargs):
|
|
|
|
"""Refer to TelegramClient.__init__ for docs on this method"""
|
|
|
|
if not api_id or not api_hash:
|
|
|
|
raise PermissionError(
|
|
|
|
"Your API ID or Hash cannot be empty or None. "
|
|
|
|
"Refer to Telethon's README.rst for more information.")
|
|
|
|
|
|
|
|
# Determine what session object we have
|
|
|
|
if isinstance(session, str) or session is None:
|
|
|
|
session = Session.try_load_or_create_new(session)
|
|
|
|
elif not isinstance(session, Session):
|
|
|
|
raise ValueError(
|
|
|
|
'The given session must be a str or a Session instance.'
|
|
|
|
)
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
self.session = session
|
2017-06-11 23:42:04 +03:00
|
|
|
self.api_id = int(api_id)
|
2017-06-08 14:12:57 +03:00
|
|
|
self.api_hash = api_hash
|
2017-09-15 12:49:39 +03:00
|
|
|
if self.api_id < 20: # official apps must use obfuscated
|
2017-09-21 14:43:33 +03:00
|
|
|
connection_mode = ConnectionMode.TCP_OBFUSCATED
|
|
|
|
|
2017-09-30 12:45:35 +03:00
|
|
|
# This is the main sender, which will be used from the thread
|
|
|
|
# that calls .connect(). Every other thread will spawn a new
|
|
|
|
# temporary connection. The connection on this one is always
|
|
|
|
# kept open so Telegram can send us updates.
|
2017-09-21 14:43:33 +03:00
|
|
|
self._sender = MtProtoSender(self.session, Connection(
|
|
|
|
mode=connection_mode, proxy=proxy, timeout=timeout
|
|
|
|
))
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
self._logger = logging.getLogger(__name__)
|
|
|
|
|
2017-09-22 13:20:38 +03:00
|
|
|
# Two threads may be calling reconnect() when the connection is lost,
|
|
|
|
# we only want one to actually perform the reconnection.
|
2017-09-30 19:25:09 +03:00
|
|
|
self._reconnect_lock = Lock()
|
2017-09-22 13:20:38 +03:00
|
|
|
|
2017-09-30 17:32:10 +03:00
|
|
|
# Cache "exported" sessions as 'dc_id: Session' not to recreate
|
|
|
|
# them all the time since generating a new key is a relatively
|
|
|
|
# expensive operation.
|
|
|
|
self._exported_sessions = {}
|
2017-07-04 11:21:15 +03:00
|
|
|
|
2017-09-07 19:49:08 +03:00
|
|
|
# This member will process updates if enabled.
|
|
|
|
# One may change self.updates.enabled at any later point.
|
2017-09-30 12:17:31 +03:00
|
|
|
self.updates = UpdateState(workers=update_workers)
|
2017-09-02 22:45:27 +03:00
|
|
|
|
2017-09-29 21:50:27 +03:00
|
|
|
# Used on connection - the user may modify these and reconnect
|
|
|
|
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
|
|
|
|
for name, value in kwargs.items():
|
2017-09-30 19:45:56 +03:00
|
|
|
if not hasattr(self.session, name):
|
|
|
|
raise ValueError('Unknown named parameter', name)
|
|
|
|
setattr(self.session, name, value)
|
2017-09-29 21:50:27 +03:00
|
|
|
|
|
|
|
# Despite the state of the real connection, keep track of whether
|
|
|
|
# the user has explicitly called .connect() or .disconnect() here.
|
|
|
|
# This information is required by the read thread, who will be the
|
|
|
|
# one attempting to reconnect on the background *while* the user
|
|
|
|
# doesn't explicitly call .disconnect(), thus telling it to stop
|
|
|
|
# retrying. The main thread, knowing there is a background thread
|
|
|
|
# attempting reconnection as soon as it happens, will just sleep.
|
|
|
|
self._user_connected = False
|
|
|
|
|
|
|
|
# Save whether the user is authorized here (a.k.a. logged in)
|
2017-10-18 13:17:13 +03:00
|
|
|
self._authorized = None # None = We don't know yet
|
2017-09-29 21:50:27 +03:00
|
|
|
|
|
|
|
# Uploaded files cache so subsequent calls are instant
|
|
|
|
self._upload_cache = {}
|
|
|
|
|
2017-09-30 12:28:15 +03:00
|
|
|
# Constantly read for results and updates from within the main client,
|
|
|
|
# if the user has left enabled such option.
|
|
|
|
self._spawn_read_thread = spawn_read_thread
|
2017-09-29 21:50:27 +03:00
|
|
|
self._recv_thread = None
|
|
|
|
|
|
|
|
# Identifier of the main thread (the one that called .connect()).
|
|
|
|
# This will be used to create new connections from any other thread,
|
|
|
|
# so that requests can be sent in parallel.
|
|
|
|
self._main_thread_ident = None
|
|
|
|
|
|
|
|
# Default PingRequest delay
|
|
|
|
self._last_ping = datetime.now()
|
|
|
|
self._ping_delay = timedelta(minutes=1)
|
|
|
|
|
2017-10-04 15:09:46 +03:00
|
|
|
# Some errors are known but there's nothing we can do from the
|
|
|
|
# background thread. If any of these happens, call .disconnect(),
|
|
|
|
# and raise them next time .invoke() is tried to be called.
|
|
|
|
self._background_error = None
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
# endregion
|
|
|
|
|
|
|
|
# region Connecting
|
|
|
|
|
2017-10-24 16:40:51 +03:00
|
|
|
def connect(self, _sync_updates=True):
|
2017-06-08 14:12:57 +03:00
|
|
|
"""Connects to the Telegram servers, executing authentication if
|
|
|
|
required. Note that authenticating to the Telegram servers is
|
|
|
|
not the same as authenticating the desired user itself, which
|
|
|
|
may require a call (or several) to 'sign_in' for the first time.
|
2017-06-09 11:35:19 +03:00
|
|
|
|
2017-09-30 17:32:10 +03:00
|
|
|
Note that the optional parameters are meant for internal use.
|
|
|
|
|
|
|
|
If '_sync_updates', sync_updates() will be called and a
|
|
|
|
second thread will be started if necessary. Note that this
|
|
|
|
will FAIL if the client is not connected to the user's
|
|
|
|
native data center, raising a "UserMigrateError", and
|
|
|
|
calling .disconnect() in the process.
|
2017-06-08 14:12:57 +03:00
|
|
|
"""
|
2017-09-29 21:50:27 +03:00
|
|
|
self._main_thread_ident = threading.get_ident()
|
2017-10-04 15:09:46 +03:00
|
|
|
self._background_error = None # Clear previous errors
|
2017-09-29 21:50:27 +03:00
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
try:
|
2017-09-21 14:43:33 +03:00
|
|
|
self._sender.connect()
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-09-29 21:50:27 +03:00
|
|
|
# Connection was successful! Try syncing the update state
|
2017-09-30 17:32:10 +03:00
|
|
|
# UNLESS '_sync_updates' is False (we probably are in
|
|
|
|
# another data center and this would raise UserMigrateError)
|
2017-09-29 21:50:27 +03:00
|
|
|
# to also assert whether the user is logged in or not.
|
|
|
|
self._user_connected = True
|
2017-10-24 16:40:51 +03:00
|
|
|
if self._authorized is None and _sync_updates:
|
2017-09-30 17:11:16 +03:00
|
|
|
try:
|
|
|
|
self.sync_updates()
|
|
|
|
self._set_connected_and_authorized()
|
|
|
|
except UnauthorizedError:
|
|
|
|
self._authorized = False
|
2017-10-18 15:45:08 +03:00
|
|
|
elif self._authorized:
|
|
|
|
self._set_connected_and_authorized()
|
2017-09-29 21:50:27 +03:00
|
|
|
|
2017-09-17 15:25:53 +03:00
|
|
|
return True
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-07-26 17:10:45 +03:00
|
|
|
except TypeNotFoundError as e:
|
|
|
|
# This is fine, probably layer migration
|
|
|
|
self._logger.debug('Found invalid item, probably migrating', e)
|
|
|
|
self.disconnect()
|
2017-10-24 16:40:51 +03:00
|
|
|
return self.connect(_sync_updates=_sync_updates)
|
2017-07-26 17:10:45 +03:00
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
except (RPCError, ConnectionError) as error:
|
|
|
|
# Probably errors from the previous session, ignore them
|
|
|
|
self.disconnect()
|
2017-09-04 18:10:04 +03:00
|
|
|
self._logger.debug(
|
|
|
|
'Could not stabilise initial connection: {}'.format(error)
|
|
|
|
)
|
2017-09-17 17:20:04 +03:00
|
|
|
return False
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-09-17 17:39:29 +03:00
|
|
|
def is_connected(self):
|
2017-09-21 14:43:33 +03:00
|
|
|
return self._sender.is_connected()
|
2017-09-17 17:39:29 +03:00
|
|
|
|
2017-10-24 16:40:51 +03:00
|
|
|
def _wrap_init_connection(self, query):
|
|
|
|
"""Wraps query around InvokeWithLayerRequest(InitConnectionRequest())"""
|
|
|
|
return InvokeWithLayerRequest(LAYER, InitConnectionRequest(
|
2017-09-17 15:25:53 +03:00
|
|
|
api_id=self.api_id,
|
|
|
|
device_model=self.session.device_model,
|
|
|
|
system_version=self.session.system_version,
|
|
|
|
app_version=self.session.app_version,
|
|
|
|
lang_code=self.session.lang_code,
|
|
|
|
system_lang_code=self.session.system_lang_code,
|
|
|
|
lang_pack='', # "langPacks are for official apps only"
|
|
|
|
query=query
|
2017-10-24 16:40:51 +03:00
|
|
|
))
|
2017-09-17 15:25:53 +03:00
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
def disconnect(self):
|
2017-09-29 21:50:27 +03:00
|
|
|
"""Disconnects from the Telegram server
|
|
|
|
and stops all the spawned threads"""
|
2017-10-09 12:47:10 +03:00
|
|
|
self._user_connected = False # This will stop recv_thread's loop
|
2017-10-01 20:56:24 +03:00
|
|
|
self.updates.stop_workers()
|
|
|
|
|
2017-10-09 12:47:10 +03:00
|
|
|
# This will trigger a "ConnectionResetError" on the recv_thread,
|
|
|
|
# which won't attempt reconnecting as ._user_connected is False.
|
2017-09-21 14:43:33 +03:00
|
|
|
self._sender.disconnect()
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-10-09 12:47:10 +03:00
|
|
|
if self._recv_thread:
|
|
|
|
self._recv_thread.join()
|
|
|
|
|
2017-09-30 17:32:10 +03:00
|
|
|
# TODO Shall we clear the _exported_sessions, or may be reused?
|
|
|
|
pass
|
2017-09-29 21:50:27 +03:00
|
|
|
|
2017-09-22 13:31:41 +03:00
|
|
|
def _reconnect(self, new_dc=None):
|
2017-09-22 13:20:38 +03:00
|
|
|
"""If 'new_dc' is not set, only a call to .connect() will be made
|
|
|
|
since it's assumed that the connection has been lost and the
|
|
|
|
library is reconnecting.
|
2017-06-08 17:51:20 +03:00
|
|
|
|
2017-09-22 13:20:38 +03:00
|
|
|
If 'new_dc' is set, the client is first disconnected from the
|
|
|
|
current data center, clears the auth key for the old DC, and
|
|
|
|
connects to the new data center.
|
2017-06-08 17:51:20 +03:00
|
|
|
"""
|
2017-09-22 13:20:38 +03:00
|
|
|
if new_dc is None:
|
|
|
|
# Assume we are disconnected due to some error, so connect again
|
2017-09-30 19:25:09 +03:00
|
|
|
with self._reconnect_lock:
|
2017-09-22 13:20:38 +03:00
|
|
|
# Another thread may have connected again, so check that first
|
2017-10-09 14:23:39 +03:00
|
|
|
if self.is_connected():
|
2017-09-22 13:45:14 +03:00
|
|
|
return True
|
2017-10-09 14:23:39 +03:00
|
|
|
|
|
|
|
try:
|
|
|
|
return self.connect()
|
|
|
|
except ConnectionResetError:
|
|
|
|
return False
|
2017-09-22 13:20:38 +03:00
|
|
|
else:
|
2017-10-24 16:40:51 +03:00
|
|
|
# Since we're reconnecting possibly due to a UserMigrateError,
|
|
|
|
# we need to first know the Data Centers we can connect to. Do
|
|
|
|
# that before disconnecting.
|
2017-06-08 17:51:20 +03:00
|
|
|
dc = self._get_dc(new_dc)
|
2017-10-24 16:40:51 +03:00
|
|
|
|
|
|
|
self.session.server_address = dc.ip_address
|
2017-10-01 20:26:20 +03:00
|
|
|
self.session.port = dc.port
|
2017-10-24 16:40:51 +03:00
|
|
|
# auth_key's are associated with a server, which has now changed
|
|
|
|
# so it's not valid anymore. Set to None to force recreating it.
|
|
|
|
self.session.auth_key = None
|
2017-06-08 17:51:20 +03:00
|
|
|
self.session.save()
|
2017-10-24 16:40:51 +03:00
|
|
|
self.disconnect()
|
2017-09-22 13:45:14 +03:00
|
|
|
return self.connect()
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
# endregion
|
|
|
|
|
2017-09-29 21:50:27 +03:00
|
|
|
# region Working with different connections/Data Centers
|
|
|
|
|
|
|
|
def _on_read_thread(self):
|
|
|
|
return self._recv_thread is not None and \
|
|
|
|
threading.get_ident() == self._recv_thread.ident
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-08-29 16:59:08 +03:00
|
|
|
def _get_dc(self, dc_id, ipv6=False, cdn=False):
|
2017-06-08 14:12:57 +03:00
|
|
|
"""Gets the Data Center (DC) associated to 'dc_id'"""
|
2017-10-24 16:40:51 +03:00
|
|
|
if not TelegramBareClient._config:
|
|
|
|
TelegramBareClient._config = self(GetConfigRequest())
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-08-24 14:02:48 +03:00
|
|
|
try:
|
2017-09-05 17:11:02 +03:00
|
|
|
if cdn:
|
|
|
|
# Ensure we have the latest keys for the CDNs
|
|
|
|
for pk in self(GetCdnConfigRequest()).public_keys:
|
|
|
|
rsa.add_key(pk.public_key)
|
|
|
|
|
2017-08-29 14:49:41 +03:00
|
|
|
return next(
|
2017-10-24 16:40:51 +03:00
|
|
|
dc for dc in TelegramBareClient._config.dc_options
|
|
|
|
if dc.id == dc_id and bool(dc.ipv6) == ipv6 and bool(dc.cdn) == cdn
|
2017-08-29 14:49:41 +03:00
|
|
|
)
|
2017-08-24 14:02:48 +03:00
|
|
|
except StopIteration:
|
|
|
|
if not cdn:
|
|
|
|
raise
|
|
|
|
|
2017-09-05 17:11:02 +03:00
|
|
|
# New configuration, perhaps a new CDN was added?
|
2017-10-24 16:40:51 +03:00
|
|
|
TelegramBareClient._config = self(GetConfigRequest())
|
2017-08-29 14:49:41 +03:00
|
|
|
return self._get_dc(dc_id, ipv6=ipv6, cdn=cdn)
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-09-30 17:32:10 +03:00
|
|
|
def _get_exported_client(self, dc_id):
|
|
|
|
"""Creates and connects a new TelegramBareClient for the desired DC.
|
2017-07-04 11:21:15 +03:00
|
|
|
|
2017-09-30 17:32:10 +03:00
|
|
|
If it's the first time calling the method with a given dc_id,
|
|
|
|
a new session will be first created, and its auth key generated.
|
|
|
|
Exporting/Importing the authorization will also be done so that
|
|
|
|
the auth is bound with the key.
|
2017-07-04 11:21:15 +03:00
|
|
|
"""
|
|
|
|
# Thanks badoualy/kotlogram on /telegram/api/DefaultTelegramClient.kt
|
|
|
|
# for clearly showing how to export the authorization! ^^
|
2017-09-30 17:32:10 +03:00
|
|
|
session = self._exported_sessions.get(dc_id)
|
|
|
|
if session:
|
|
|
|
export_auth = None # Already bound with the auth key
|
2017-07-04 11:21:15 +03:00
|
|
|
else:
|
2017-09-30 17:32:10 +03:00
|
|
|
# TODO Add a lock, don't allow two threads to create an auth key
|
2017-09-30 18:51:07 +03:00
|
|
|
# (when calling .connect() if there wasn't a previous session).
|
2017-09-30 17:32:10 +03:00
|
|
|
# for the same data center.
|
2017-07-04 11:21:15 +03:00
|
|
|
dc = self._get_dc(dc_id)
|
|
|
|
|
|
|
|
# Export the current authorization to the new DC.
|
|
|
|
export_auth = self(ExportAuthorizationRequest(dc_id))
|
|
|
|
|
|
|
|
# Create a temporary session for this IP address, which needs
|
|
|
|
# to be different because each auth_key is unique per DC.
|
|
|
|
#
|
|
|
|
# Construct this session with the connection parameters
|
|
|
|
# (system version, device model...) from the current one.
|
2017-08-29 16:59:08 +03:00
|
|
|
session = Session(self.session)
|
2017-07-04 11:21:15 +03:00
|
|
|
session.server_address = dc.ip_address
|
|
|
|
session.port = dc.port
|
2017-09-30 17:32:10 +03:00
|
|
|
self._exported_sessions[dc_id] = session
|
2017-07-04 11:21:15 +03:00
|
|
|
|
2017-09-30 17:32:10 +03:00
|
|
|
client = TelegramBareClient(
|
|
|
|
session, self.api_id, self.api_hash,
|
|
|
|
proxy=self._sender.connection.conn.proxy,
|
|
|
|
timeout=self._sender.connection.get_timeout()
|
|
|
|
)
|
2017-10-24 16:40:51 +03:00
|
|
|
client.connect(_sync_updates=False)
|
|
|
|
if isinstance(export_auth, ExportedAuthorization):
|
|
|
|
client(ImportAuthorizationRequest(
|
|
|
|
id=export_auth.id, bytes=export_auth.bytes
|
|
|
|
))
|
|
|
|
elif export_auth is not None:
|
|
|
|
self._logger.warning('Unknown return export_auth type', export_auth)
|
|
|
|
|
2017-09-30 19:33:34 +03:00
|
|
|
client._authorized = True # We exported the auth, so we got auth
|
2017-09-30 17:32:10 +03:00
|
|
|
return client
|
2017-07-04 11:21:15 +03:00
|
|
|
|
2017-09-30 18:51:07 +03:00
|
|
|
def _get_cdn_client(self, cdn_redirect):
|
|
|
|
"""Similar to ._get_exported_client, but for CDNs"""
|
|
|
|
session = self._exported_sessions.get(cdn_redirect.dc_id)
|
|
|
|
if not session:
|
|
|
|
dc = self._get_dc(cdn_redirect.dc_id, cdn=True)
|
|
|
|
session = Session(self.session)
|
|
|
|
session.server_address = dc.ip_address
|
|
|
|
session.port = dc.port
|
|
|
|
self._exported_sessions[cdn_redirect.dc_id] = session
|
|
|
|
|
|
|
|
client = TelegramBareClient(
|
|
|
|
session, self.api_id, self.api_hash,
|
|
|
|
proxy=self._sender.connection.conn.proxy,
|
|
|
|
timeout=self._sender.connection.get_timeout()
|
|
|
|
)
|
|
|
|
|
|
|
|
# This will make use of the new RSA keys for this specific CDN.
|
|
|
|
#
|
2017-10-24 16:40:51 +03:00
|
|
|
# We won't be calling GetConfigRequest because it's only called
|
|
|
|
# when needed by ._get_dc, and also it's static so it's likely
|
|
|
|
# set already. Avoid invoking non-CDN methods by not syncing updates.
|
|
|
|
client.connect(_sync_updates=False)
|
2017-09-30 19:33:34 +03:00
|
|
|
client._authorized = self._authorized
|
2017-09-30 18:51:07 +03:00
|
|
|
return client
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
# endregion
|
|
|
|
|
|
|
|
# region Invoking Telegram requests
|
|
|
|
|
2017-10-15 12:05:56 +03:00
|
|
|
def __call__(self, *requests, retries=5):
|
2017-06-08 14:12:57 +03:00
|
|
|
"""Invokes (sends) a MTProtoRequest and returns (receives) its result.
|
|
|
|
|
2017-09-29 21:50:27 +03:00
|
|
|
The invoke will be retried up to 'retries' times before raising
|
|
|
|
ValueError().
|
2017-06-08 14:12:57 +03:00
|
|
|
"""
|
2017-09-25 21:52:27 +03:00
|
|
|
if not all(isinstance(x, TLObject) and
|
|
|
|
x.content_related for x in requests):
|
2017-07-24 17:54:48 +03:00
|
|
|
raise ValueError('You can only invoke requests, not types!')
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-09-29 21:50:27 +03:00
|
|
|
# Determine the sender to be used (main or a new connection)
|
|
|
|
on_main_thread = threading.get_ident() == self._main_thread_ident
|
2017-10-15 12:05:56 +03:00
|
|
|
if on_main_thread or self._on_read_thread():
|
|
|
|
sender = self._sender
|
2017-10-21 00:30:02 +03:00
|
|
|
update_state = self.updates
|
2017-10-15 12:05:56 +03:00
|
|
|
else:
|
|
|
|
sender = self._sender.clone()
|
|
|
|
sender.connect()
|
2017-10-21 00:30:02 +03:00
|
|
|
# We're on another connection, Telegram will resend all the
|
|
|
|
# updates that we haven't acknowledged (potentially entering
|
|
|
|
# an infinite loop if we're calling this in response to an
|
|
|
|
# update event, as it would be received again and again). So
|
|
|
|
# to avoid this we will simply not process updates on these
|
|
|
|
# new temporary connections, as they will be sent and later
|
|
|
|
# acknowledged over the main connection.
|
|
|
|
update_state = None
|
2017-09-29 20:55:14 +03:00
|
|
|
|
2017-09-30 12:45:35 +03:00
|
|
|
# We should call receive from this thread if there's no background
|
|
|
|
# thread reading or if the server disconnected us and we're trying
|
|
|
|
# to reconnect. This is because the read thread may either be
|
|
|
|
# locked also trying to reconnect or we may be said thread already.
|
2017-09-30 19:25:09 +03:00
|
|
|
call_receive = not on_main_thread or self._recv_thread is None \
|
|
|
|
or self._reconnect_lock.locked()
|
2017-09-30 12:45:35 +03:00
|
|
|
try:
|
|
|
|
for _ in range(retries):
|
2017-10-04 15:09:46 +03:00
|
|
|
if self._background_error and on_main_thread:
|
|
|
|
raise self._background_error
|
|
|
|
|
2017-10-21 00:30:02 +03:00
|
|
|
result = self._invoke(
|
|
|
|
sender, call_receive, update_state, *requests
|
|
|
|
)
|
2017-10-07 19:52:27 +03:00
|
|
|
if result is not None:
|
2017-09-30 12:45:35 +03:00
|
|
|
return result
|
|
|
|
|
2017-09-30 17:18:16 +03:00
|
|
|
raise ValueError('Number of retries reached 0.')
|
2017-09-30 12:45:35 +03:00
|
|
|
finally:
|
|
|
|
if sender != self._sender:
|
|
|
|
sender.disconnect() # Close temporary connections
|
|
|
|
|
2017-10-01 17:04:14 +03:00
|
|
|
# Let people use client.invoke(SomeRequest()) instead client(...)
|
|
|
|
invoke = __call__
|
|
|
|
|
2017-10-21 00:30:02 +03:00
|
|
|
def _invoke(self, sender, call_receive, update_state, *requests):
|
2017-10-24 16:40:51 +03:00
|
|
|
# We need to specify the new layer (by initializing a new
|
|
|
|
# connection) if it has changed from the latest known one.
|
|
|
|
init_connection = self.session.layer != LAYER
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
try:
|
2017-09-03 14:45:13 +03:00
|
|
|
# Ensure that we start with no previous errors (i.e. resending)
|
2017-09-25 21:52:27 +03:00
|
|
|
for x in requests:
|
|
|
|
x.confirm_received.clear()
|
|
|
|
x.rpc_error = None
|
2017-09-03 14:45:13 +03:00
|
|
|
|
2017-10-24 16:40:51 +03:00
|
|
|
if not self.session.auth_key:
|
|
|
|
# New key, we need to tell the server we're going to use
|
|
|
|
# the latest layer and initialize the connection doing so.
|
|
|
|
self.session.auth_key, self.session.time_offset = \
|
|
|
|
authenticator.do_authentication(self._sender.connection)
|
|
|
|
init_connection = True
|
|
|
|
|
|
|
|
if init_connection:
|
|
|
|
if len(requests) == 1:
|
|
|
|
requests = [self._wrap_init_connection(requests[0])]
|
|
|
|
else:
|
|
|
|
# We need a SINGLE request (like GetConfig) to init conn.
|
|
|
|
# Once that's done, the N original requests will be
|
|
|
|
# invoked.
|
|
|
|
TelegramBareClient._config = self(
|
|
|
|
self._wrap_init_connection(GetConfigRequest())
|
|
|
|
)
|
|
|
|
|
2017-09-29 20:55:14 +03:00
|
|
|
sender.send(*requests)
|
2017-09-29 21:50:27 +03:00
|
|
|
|
2017-09-03 10:56:10 +03:00
|
|
|
if not call_receive:
|
2017-09-02 22:27:11 +03:00
|
|
|
# TODO This will be slightly troublesome if we allow
|
|
|
|
# switching between constant read or not on the fly.
|
|
|
|
# Must also watch out for calling .read() from two places,
|
|
|
|
# in which case a Lock would be required for .receive().
|
2017-09-25 21:52:27 +03:00
|
|
|
for x in requests:
|
|
|
|
x.confirm_received.wait(
|
2017-09-29 20:55:14 +03:00
|
|
|
sender.connection.get_timeout()
|
2017-09-25 21:52:27 +03:00
|
|
|
)
|
2017-09-02 22:27:11 +03:00
|
|
|
else:
|
2017-09-25 21:52:27 +03:00
|
|
|
while not all(x.confirm_received.is_set() for x in requests):
|
2017-10-21 00:30:02 +03:00
|
|
|
sender.receive(update_state=update_state)
|
2017-09-02 22:27:11 +03:00
|
|
|
|
2017-10-24 16:40:51 +03:00
|
|
|
except BrokenAuthKeyError:
|
|
|
|
self._logger.error('Broken auth key, a new one will be generated')
|
|
|
|
self.session.auth_key = None
|
|
|
|
|
2017-09-17 17:32:51 +03:00
|
|
|
except TimeoutError:
|
|
|
|
pass # We will just retry
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
except ConnectionResetError:
|
2017-10-08 11:52:56 +03:00
|
|
|
if not self._user_connected or self._reconnect_lock.locked():
|
|
|
|
# Only attempt reconnecting if the user called connect and not
|
2017-09-30 19:33:34 +03:00
|
|
|
# reconnecting already.
|
2017-09-29 21:50:27 +03:00
|
|
|
raise
|
|
|
|
|
2017-07-10 16:21:20 +03:00
|
|
|
self._logger.debug('Server disconnected us. Reconnecting and '
|
2017-08-24 14:02:48 +03:00
|
|
|
'resending request...')
|
2017-09-29 21:50:27 +03:00
|
|
|
|
2017-09-29 20:55:14 +03:00
|
|
|
if sender != self._sender:
|
2017-09-29 21:50:27 +03:00
|
|
|
# TODO Try reconnecting forever too?
|
2017-09-29 20:55:14 +03:00
|
|
|
sender.connect()
|
|
|
|
else:
|
2017-09-29 21:50:27 +03:00
|
|
|
while self._user_connected and not self._reconnect():
|
|
|
|
sleep(0.1) # Retry forever until we can send the request
|
2017-10-19 11:51:34 +03:00
|
|
|
return None
|
2017-09-29 21:50:27 +03:00
|
|
|
|
2017-10-24 16:40:51 +03:00
|
|
|
if init_connection:
|
|
|
|
# We initialized the connection successfully, even if
|
|
|
|
# a request had an RPC error we have invoked it fine.
|
|
|
|
self.session.layer = LAYER
|
|
|
|
self.session.save()
|
|
|
|
|
2017-09-25 21:52:27 +03:00
|
|
|
try:
|
|
|
|
raise next(x.rpc_error for x in requests if x.rpc_error)
|
|
|
|
except StopIteration:
|
|
|
|
if any(x.result is None for x in requests):
|
|
|
|
# "A container may only be accepted or
|
2017-09-30 12:45:35 +03:00
|
|
|
# rejected by the other party as a whole."
|
|
|
|
return None
|
2017-10-01 14:24:04 +03:00
|
|
|
|
2017-10-01 17:02:29 +03:00
|
|
|
if len(requests) == 1:
|
|
|
|
return requests[0].result
|
|
|
|
else:
|
|
|
|
return [x.result for x in requests]
|
2017-09-11 11:52:36 +03:00
|
|
|
|
2017-10-01 12:25:56 +03:00
|
|
|
except (PhoneMigrateError, NetworkMigrateError,
|
|
|
|
UserMigrateError) as e:
|
|
|
|
self._logger.debug(
|
|
|
|
'DC error when invoking request, '
|
|
|
|
'attempting to reconnect at DC {}'.format(e.new_dc)
|
|
|
|
)
|
|
|
|
|
|
|
|
# TODO What happens with the background thread here?
|
|
|
|
# For normal use cases, this won't happen, because this will only
|
|
|
|
# be on the very first connection (not authorized, not running),
|
|
|
|
# but may be an issue for people who actually travel?
|
|
|
|
self._reconnect(new_dc=e.new_dc)
|
2017-10-21 00:30:02 +03:00
|
|
|
return self._invoke(sender, call_receive, update_state, *requests)
|
2017-10-01 12:25:56 +03:00
|
|
|
|
|
|
|
except ServerError as e:
|
|
|
|
# Telegram is having some issues, just retry
|
|
|
|
self._logger.debug(
|
|
|
|
'[ERROR] Telegram is having some internal issues', e
|
|
|
|
)
|
|
|
|
|
2017-10-08 17:15:30 +03:00
|
|
|
except FloodWaitError as e:
|
|
|
|
if e.seconds > self.session.flood_sleep_threshold | 0:
|
|
|
|
raise
|
|
|
|
|
|
|
|
self._logger.debug(
|
|
|
|
'Sleep of %d seconds below threshold, sleeping' % e.seconds
|
|
|
|
)
|
|
|
|
sleep(e.seconds)
|
2017-10-01 12:25:56 +03:00
|
|
|
|
2017-09-29 21:50:27 +03:00
|
|
|
# Some really basic functionality
|
|
|
|
|
|
|
|
def is_user_authorized(self):
|
|
|
|
"""Has the user been authorized yet
|
|
|
|
(code request sent and confirmed)?"""
|
|
|
|
return self._authorized
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
# endregion
|
|
|
|
|
|
|
|
# region Uploading media
|
|
|
|
|
|
|
|
def upload_file(self,
|
2017-08-23 02:43:08 +03:00
|
|
|
file,
|
2017-06-08 14:12:57 +03:00
|
|
|
part_size_kb=None,
|
|
|
|
file_name=None,
|
|
|
|
progress_callback=None):
|
2017-08-23 02:43:08 +03:00
|
|
|
"""Uploads the specified file and returns a handle (an instance
|
2017-06-08 14:12:57 +03:00
|
|
|
of InputFile or InputFileBig, as required) which can be later used.
|
|
|
|
|
2017-08-23 01:27:33 +03:00
|
|
|
Uploading a file will simply return a "handle" to the file stored
|
|
|
|
remotely in the Telegram servers, which can be later used on. This
|
|
|
|
will NOT upload the file to your own chat.
|
|
|
|
|
2017-08-23 02:43:08 +03:00
|
|
|
'file' may be either a file path, a byte array, or a stream.
|
|
|
|
Note that if the file is a stream it will need to be read
|
|
|
|
entirely into memory to tell its size first.
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
If 'progress_callback' is not None, it should be a function that
|
|
|
|
takes two parameters, (bytes_uploaded, total_bytes).
|
|
|
|
|
|
|
|
Default values for the optional parameters if left as None are:
|
|
|
|
part_size_kb = get_appropriated_part_size(file_size)
|
2017-09-29 21:50:27 +03:00
|
|
|
file_name = os.path.basename(file_path)
|
2017-06-08 14:12:57 +03:00
|
|
|
"""
|
2017-08-23 02:43:08 +03:00
|
|
|
if isinstance(file, str):
|
2017-09-29 21:50:27 +03:00
|
|
|
file_size = os.path.getsize(file)
|
2017-08-23 02:43:08 +03:00
|
|
|
elif isinstance(file, bytes):
|
|
|
|
file_size = len(file)
|
|
|
|
else:
|
|
|
|
file = file.read()
|
|
|
|
file_size = len(file)
|
|
|
|
|
2017-06-08 14:12:57 +03:00
|
|
|
if not part_size_kb:
|
|
|
|
part_size_kb = get_appropriated_part_size(file_size)
|
|
|
|
|
|
|
|
if part_size_kb > 512:
|
|
|
|
raise ValueError('The part size must be less or equal to 512KB')
|
|
|
|
|
|
|
|
part_size = int(part_size_kb * 1024)
|
|
|
|
if part_size % 1024 != 0:
|
|
|
|
raise ValueError('The part size must be evenly divisible by 1024')
|
|
|
|
|
|
|
|
# Determine whether the file is too big (over 10MB) or not
|
|
|
|
# Telegram does make a distinction between smaller or larger files
|
|
|
|
is_large = file_size > 10 * 1024 * 1024
|
|
|
|
part_count = (file_size + part_size - 1) // part_size
|
|
|
|
|
|
|
|
file_id = utils.generate_random_long()
|
|
|
|
hash_md5 = md5()
|
|
|
|
|
2017-08-23 02:43:08 +03:00
|
|
|
stream = open(file, 'rb') if isinstance(file, str) else BytesIO(file)
|
|
|
|
try:
|
2017-06-08 14:12:57 +03:00
|
|
|
for part_index in range(part_count):
|
|
|
|
# Read the file by in chunks of size part_size
|
2017-08-23 02:43:08 +03:00
|
|
|
part = stream.read(part_size)
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
# The SavePartRequest is different depending on whether
|
|
|
|
# the file is too large or not (over or less than 10MB)
|
|
|
|
if is_large:
|
|
|
|
request = SaveBigFilePartRequest(file_id, part_index,
|
|
|
|
part_count, part)
|
|
|
|
else:
|
|
|
|
request = SaveFilePartRequest(file_id, part_index, part)
|
|
|
|
|
2017-07-02 12:56:40 +03:00
|
|
|
result = self(request)
|
2017-06-08 14:12:57 +03:00
|
|
|
if result:
|
|
|
|
if not is_large:
|
|
|
|
# No need to update the hash if it's a large file
|
|
|
|
hash_md5.update(part)
|
|
|
|
|
|
|
|
if progress_callback:
|
2017-08-23 02:43:08 +03:00
|
|
|
progress_callback(stream.tell(), file_size)
|
2017-06-08 14:12:57 +03:00
|
|
|
else:
|
|
|
|
raise ValueError('Failed to upload file part {}.'
|
|
|
|
.format(part_index))
|
2017-08-23 02:43:08 +03:00
|
|
|
finally:
|
|
|
|
stream.close()
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
# Set a default file name if None was specified
|
|
|
|
if not file_name:
|
2017-08-23 02:43:08 +03:00
|
|
|
if isinstance(file, str):
|
2017-09-29 21:50:27 +03:00
|
|
|
file_name = os.path.basename(file)
|
2017-08-23 02:43:08 +03:00
|
|
|
else:
|
|
|
|
file_name = str(file_id)
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
if is_large:
|
|
|
|
return InputFileBig(file_id, part_count, file_name)
|
|
|
|
else:
|
|
|
|
return InputFile(file_id, part_count, file_name,
|
|
|
|
md5_checksum=hash_md5.hexdigest())
|
|
|
|
|
|
|
|
# endregion
|
|
|
|
|
|
|
|
# region Downloading media
|
|
|
|
|
|
|
|
def download_file(self,
|
2017-06-09 11:35:19 +03:00
|
|
|
input_location,
|
2017-06-09 12:12:56 +03:00
|
|
|
file,
|
2017-06-09 11:35:19 +03:00
|
|
|
part_size_kb=None,
|
|
|
|
file_size=None,
|
|
|
|
progress_callback=None):
|
2017-06-09 12:12:56 +03:00
|
|
|
"""Downloads the given InputFileLocation to file (a stream or str).
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
If 'progress_callback' is not None, it should be a function that
|
|
|
|
takes two parameters, (bytes_downloaded, total_bytes). Note that
|
2017-06-09 11:35:19 +03:00
|
|
|
'total_bytes' simply equals 'file_size', and may be None.
|
|
|
|
"""
|
2017-06-08 14:12:57 +03:00
|
|
|
if not part_size_kb:
|
|
|
|
if not file_size:
|
|
|
|
part_size_kb = 64 # Reasonable default
|
|
|
|
else:
|
|
|
|
part_size_kb = get_appropriated_part_size(file_size)
|
|
|
|
|
|
|
|
part_size = int(part_size_kb * 1024)
|
2017-08-24 14:15:33 +03:00
|
|
|
# https://core.telegram.org/api/files says:
|
|
|
|
# > part_size % 1024 = 0 (divisible by 1KB)
|
|
|
|
#
|
|
|
|
# But https://core.telegram.org/cdn (more recent) says:
|
|
|
|
# > limit must be divisible by 4096 bytes
|
|
|
|
# So we just stick to the 4096 limit.
|
|
|
|
if part_size % 4096 != 0:
|
|
|
|
raise ValueError('The part size must be evenly divisible by 4096.')
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-06-09 12:12:56 +03:00
|
|
|
if isinstance(file, str):
|
|
|
|
# Ensure that we'll be able to download the media
|
|
|
|
utils.ensure_parent_dir_exists(file)
|
|
|
|
f = open(file, 'wb')
|
|
|
|
else:
|
|
|
|
f = file
|
2017-06-08 14:12:57 +03:00
|
|
|
|
2017-07-04 11:21:15 +03:00
|
|
|
# The used client will change if FileMigrateError occurs
|
|
|
|
client = self
|
2017-08-28 17:33:23 +03:00
|
|
|
cdn_decrypter = None
|
2017-07-04 11:21:15 +03:00
|
|
|
|
2017-06-09 12:12:56 +03:00
|
|
|
try:
|
2017-10-09 13:00:14 +03:00
|
|
|
offset = 0
|
2017-06-08 14:12:57 +03:00
|
|
|
while True:
|
2017-07-04 11:21:15 +03:00
|
|
|
try:
|
2017-08-28 17:25:10 +03:00
|
|
|
if cdn_decrypter:
|
2017-09-05 17:43:53 +03:00
|
|
|
result = cdn_decrypter.get_file()
|
2017-08-24 14:02:48 +03:00
|
|
|
else:
|
|
|
|
result = client(GetFileRequest(
|
|
|
|
input_location, offset, part_size
|
2017-10-15 12:05:56 +03:00
|
|
|
))
|
2017-08-24 14:02:48 +03:00
|
|
|
|
2017-08-24 19:00:27 +03:00
|
|
|
if isinstance(result, FileCdnRedirect):
|
2017-08-28 17:25:10 +03:00
|
|
|
cdn_decrypter, result = \
|
|
|
|
CdnDecrypter.prepare_decrypter(
|
2017-09-30 18:51:07 +03:00
|
|
|
client, self._get_cdn_client(result), result
|
2017-09-04 18:10:04 +03:00
|
|
|
)
|
2017-08-24 14:02:48 +03:00
|
|
|
|
2017-07-04 11:21:15 +03:00
|
|
|
except FileMigrateError as e:
|
|
|
|
client = self._get_exported_client(e.new_dc)
|
|
|
|
continue
|
|
|
|
|
2017-10-09 13:00:14 +03:00
|
|
|
offset += part_size
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
# If we have received no data (0 bytes), the file is over
|
|
|
|
# So there is nothing left to download and write
|
|
|
|
if not result.bytes:
|
2017-08-28 17:25:10 +03:00
|
|
|
# Return some extra information, unless it's a CDN file
|
|
|
|
return getattr(result, 'type', '')
|
2017-08-28 15:17:31 +03:00
|
|
|
|
|
|
|
f.write(result.bytes)
|
2017-06-08 14:12:57 +03:00
|
|
|
if progress_callback:
|
2017-06-09 12:12:56 +03:00
|
|
|
progress_callback(f.tell(), file_size)
|
|
|
|
finally:
|
2017-09-30 17:32:10 +03:00
|
|
|
if client != self:
|
|
|
|
client.disconnect()
|
|
|
|
|
2017-08-28 17:33:23 +03:00
|
|
|
if cdn_decrypter:
|
2017-09-04 18:10:04 +03:00
|
|
|
try:
|
|
|
|
cdn_decrypter.client.disconnect()
|
|
|
|
except:
|
|
|
|
pass
|
2017-06-09 12:12:56 +03:00
|
|
|
if isinstance(file, str):
|
|
|
|
f.close()
|
2017-06-08 14:12:57 +03:00
|
|
|
|
|
|
|
# endregion
|
2017-09-29 21:50:27 +03:00
|
|
|
|
|
|
|
# region Updates handling
|
|
|
|
|
|
|
|
def sync_updates(self):
|
|
|
|
"""Synchronizes self.updates to their initial state. Will be
|
|
|
|
called automatically on connection if self.updates.enabled = True,
|
|
|
|
otherwise it should be called manually after enabling updates.
|
|
|
|
"""
|
|
|
|
self.updates.process(self(GetStateRequest()))
|
|
|
|
|
|
|
|
def add_update_handler(self, handler):
|
|
|
|
"""Adds an update handler (a function which takes a TLObject,
|
|
|
|
an update, as its parameter) and listens for updates"""
|
2017-10-21 14:47:54 +03:00
|
|
|
if self.updates.workers is None:
|
|
|
|
warnings.warn(
|
|
|
|
"You have not setup any workers, so you won't receive updates."
|
|
|
|
" Pass update_workers=4 when creating the TelegramClient,"
|
|
|
|
" or set client.self.updates.workers = 4"
|
|
|
|
)
|
2017-10-07 18:55:37 +03:00
|
|
|
|
2017-09-29 21:50:27 +03:00
|
|
|
self.updates.handlers.append(handler)
|
|
|
|
|
|
|
|
def remove_update_handler(self, handler):
|
|
|
|
self.updates.handlers.remove(handler)
|
|
|
|
|
|
|
|
def list_update_handlers(self):
|
|
|
|
return self.updates.handlers[:]
|
|
|
|
|
|
|
|
# endregion
|
|
|
|
|
|
|
|
# Constant read
|
|
|
|
|
|
|
|
def _set_connected_and_authorized(self):
|
|
|
|
self._authorized = True
|
2017-10-01 20:56:24 +03:00
|
|
|
self.updates.setup_workers()
|
2017-09-30 12:28:15 +03:00
|
|
|
if self._spawn_read_thread and self._recv_thread is None:
|
2017-09-29 21:50:27 +03:00
|
|
|
self._recv_thread = threading.Thread(
|
|
|
|
name='ReadThread', daemon=True,
|
|
|
|
target=self._recv_thread_impl
|
|
|
|
)
|
|
|
|
self._recv_thread.start()
|
|
|
|
|
2017-10-22 14:15:52 +03:00
|
|
|
def _signal_handler(self, signum, frame):
|
|
|
|
if self._user_connected:
|
|
|
|
self.disconnect()
|
|
|
|
else:
|
|
|
|
self._logger.debug('Forcing exit...')
|
|
|
|
os._exit(1)
|
|
|
|
|
|
|
|
def idle(self, stop_signals=(SIGINT, SIGTERM, SIGABRT)):
|
|
|
|
"""
|
|
|
|
Idles the program by looping forever and listening for updates
|
|
|
|
until one of the signals are received, which breaks the loop.
|
|
|
|
|
|
|
|
:param stop_signals:
|
|
|
|
Iterable containing signals from the signal module that will
|
|
|
|
be subscribed to TelegramClient.disconnect() (effectively
|
|
|
|
stopping the idle loop), which will be called on receiving one
|
|
|
|
of those signals.
|
|
|
|
:return:
|
|
|
|
"""
|
|
|
|
if self._spawn_read_thread and not self._on_read_thread():
|
|
|
|
raise ValueError('Can only idle if spawn_read_thread=False')
|
|
|
|
|
|
|
|
for sig in stop_signals:
|
|
|
|
signal(sig, self._signal_handler)
|
|
|
|
|
2017-09-29 21:50:27 +03:00
|
|
|
while self._user_connected:
|
|
|
|
try:
|
|
|
|
if datetime.now() > self._last_ping + self._ping_delay:
|
|
|
|
self._sender.send(PingRequest(
|
|
|
|
int.from_bytes(os.urandom(8), 'big', signed=True)
|
|
|
|
))
|
|
|
|
self._last_ping = datetime.now()
|
|
|
|
|
|
|
|
self._sender.receive(update_state=self.updates)
|
|
|
|
except TimeoutError:
|
|
|
|
# No problem.
|
|
|
|
pass
|
|
|
|
except ConnectionResetError:
|
|
|
|
self._logger.debug('Server disconnected us. Reconnecting...')
|
|
|
|
while self._user_connected and not self._reconnect():
|
|
|
|
sleep(0.1) # Retry forever, this is instant messaging
|
|
|
|
|
2017-10-22 14:15:52 +03:00
|
|
|
# By using this approach, another thread will be
|
|
|
|
# created and started upon connection to constantly read
|
|
|
|
# from the other end. Otherwise, manual calls to .receive()
|
|
|
|
# must be performed. The MtProtoSender cannot be connected,
|
|
|
|
# or an error will be thrown.
|
|
|
|
#
|
|
|
|
# This way, sending and receiving will be completely independent.
|
|
|
|
def _recv_thread_impl(self):
|
|
|
|
# This thread is "idle" (only listening for updates), but also
|
|
|
|
# excepts everything unlike the manual idle because it should
|
|
|
|
# not crash.
|
|
|
|
while self._user_connected:
|
|
|
|
try:
|
|
|
|
self.idle(stop_signals=tuple())
|
2017-09-30 19:39:31 +03:00
|
|
|
except Exception as error:
|
2017-09-29 21:50:27 +03:00
|
|
|
# Unknown exception, pass it to the main thread
|
2017-10-22 14:15:52 +03:00
|
|
|
self._logger.error(
|
|
|
|
'Unknown error on the read thread, please report',
|
2017-09-30 19:39:31 +03:00
|
|
|
error
|
|
|
|
)
|
2017-10-04 15:09:46 +03:00
|
|
|
|
|
|
|
try:
|
|
|
|
import socks
|
2017-10-13 12:38:12 +03:00
|
|
|
if isinstance(error, (
|
|
|
|
socks.GeneralProxyError, socks.ProxyConnectionError
|
|
|
|
)):
|
2017-10-04 15:09:46 +03:00
|
|
|
# This is a known error, and it's not related to
|
|
|
|
# Telegram but rather to the proxy. Disconnect and
|
|
|
|
# hand it over to the main thread.
|
|
|
|
self._background_error = error
|
|
|
|
self.disconnect()
|
|
|
|
break
|
|
|
|
except ImportError:
|
|
|
|
"Not using PySocks, so it can't be a socket error"
|
|
|
|
|
2017-09-30 19:39:31 +03:00
|
|
|
# If something strange happens we don't want to enter an
|
|
|
|
# infinite loop where all we do is raise an exception, so
|
|
|
|
# add a little sleep to avoid the CPU usage going mad.
|
|
|
|
sleep(0.1)
|
2017-09-29 21:50:27 +03:00
|
|
|
|
|
|
|
self._recv_thread = None
|
|
|
|
|
|
|
|
# endregion
|