Remove all Thread's except from UpdateState

This commit is contained in:
Lonami Exo 2017-10-06 19:30:14 +02:00
parent a17def8026
commit 9716d1d543
5 changed files with 51 additions and 230 deletions

View File

@ -3,14 +3,12 @@ import errno
import socket
from datetime import timedelta
from io import BytesIO, BufferedWriter
from threading import Lock
class TcpClient:
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
self.proxy = proxy
self._socket = None
self._closing_lock = Lock()
if isinstance(timeout, timedelta):
self.timeout = timeout.seconds
@ -65,11 +63,6 @@ class TcpClient:
def close(self):
"""Closes the connection"""
if self._closing_lock.locked():
# Already closing, no need to close again (avoid None.close())
return
with self._closing_lock:
try:
if self._socket is not None:
self._socket.shutdown(socket.SHUT_RDWR)

View File

@ -1,10 +1,8 @@
import logging
import os
import threading
from datetime import timedelta, datetime
from hashlib import md5
from io import BytesIO
from threading import Lock
from time import sleep
from . import helpers as utils
@ -18,7 +16,7 @@ from .network import authenticator, MtProtoSender, Connection, ConnectionMode
from .tl import TLObject, Session
from .tl.all_tlobjects import LAYER
from .tl.functions import (
InitConnectionRequest, InvokeWithLayerRequest, PingRequest
InitConnectionRequest, InvokeWithLayerRequest
)
from .tl.functions.auth import (
ImportAuthorizationRequest, ExportAuthorizationRequest
@ -67,8 +65,6 @@ class TelegramBareClient:
def __init__(self, session, api_id, api_hash,
connection_mode=ConnectionMode.TCP_FULL,
proxy=None,
update_workers=None,
spawn_read_thread=False,
timeout=timedelta(seconds=5),
**kwargs):
"""Refer to TelegramClient.__init__ for docs on this method"""
@ -101,10 +97,6 @@ class TelegramBareClient:
self._logger = logging.getLogger(__name__)
# Two threads may be calling reconnect() when the connection is lost,
# we only want one to actually perform the reconnection.
self._reconnect_lock = Lock()
# Cache "exported" sessions as 'dc_id: Session' not to recreate
# them all the time since generating a new key is a relatively
# expensive operation.
@ -112,7 +104,7 @@ class TelegramBareClient:
# This member will process updates if enabled.
# One may change self.updates.enabled at any later point.
self.updates = UpdateState(workers=update_workers)
self.updates = UpdateState(workers=None)
# Used on connection - the user may modify these and reconnect
kwargs['app_version'] = kwargs.get('app_version', self.__version__)
@ -136,24 +128,10 @@ class TelegramBareClient:
# Uploaded files cache so subsequent calls are instant
self._upload_cache = {}
# Constantly read for results and updates from within the main client,
# if the user has left enabled such option.
self._spawn_read_thread = spawn_read_thread
self._recv_thread = None
# Identifier of the main thread (the one that called .connect()).
# This will be used to create new connections from any other thread,
# so that requests can be sent in parallel.
self._main_thread_ident = None
# Default PingRequest delay
self._last_ping = datetime.now()
self._ping_delay = timedelta(minutes=1)
# Some errors are known but there's nothing we can do from the
# background thread. If any of these happens, call .disconnect(),
# and raise them next time .invoke() is tried to be called.
self._background_error = None
# endregion
@ -179,9 +157,6 @@ class TelegramBareClient:
If '_cdn' is False, methods that are not allowed on such data
centers won't be invoked.
"""
self._main_thread_ident = threading.get_ident()
self._background_error = None # Clear previous errors
try:
self._sender.connect()
if not self.session.auth_key:
@ -268,20 +243,9 @@ class TelegramBareClient:
return result
def disconnect(self):
"""Disconnects from the Telegram server
and stops all the spawned threads"""
"""Disconnects from the Telegram server"""
self._user_connected = False
self._recv_thread = None
# Stop the workers from the background thread
self.updates.stop_workers()
# This will trigger a "ConnectionResetError", for subsequent calls
# to read or send (from another thread) and usually, the background
# thread would try restarting the connection but since the
# ._recv_thread = None, it knows it doesn't have to.
self._sender.disconnect()
# TODO Shall we clear the _exported_sessions, or may be reused?
pass
@ -296,12 +260,7 @@ class TelegramBareClient:
"""
if new_dc is None:
# Assume we are disconnected due to some error, so connect again
with self._reconnect_lock:
# Another thread may have connected again, so check that first
if not self.is_connected():
return self.connect()
else:
return True
else:
self.disconnect()
self.session.auth_key = None # Force creating new auth_key
@ -316,10 +275,6 @@ class TelegramBareClient:
# region Working with different connections/Data Centers
def _on_read_thread(self):
return self._recv_thread is not None and \
threading.get_ident() == self._recv_thread.ident
def _get_dc(self, dc_id, ipv6=False, cdn=False):
"""Gets the Data Center (DC) associated to 'dc_id'"""
if TelegramBareClient._dc_options is None:
@ -424,26 +379,12 @@ class TelegramBareClient:
x.content_related for x in requests):
raise ValueError('You can only invoke requests, not types!')
# Determine the sender to be used (main or a new connection)
on_main_thread = threading.get_ident() == self._main_thread_ident
if on_main_thread or self._on_read_thread():
sender = self._sender
else:
sender = self._sender.clone()
sender.connect()
# TODO Determine the sender to be used (main or a new connection)
sender = self._sender # .clone(), .connect()
# We should call receive from this thread if there's no background
# thread reading or if the server disconnected us and we're trying
# to reconnect. This is because the read thread may either be
# locked also trying to reconnect or we may be said thread already.
call_receive = not on_main_thread or self._recv_thread is None \
or self._reconnect_lock.locked()
try:
for _ in range(retries):
if self._background_error and on_main_thread:
raise self._background_error
result = self._invoke(sender, call_receive, *requests)
result = self._invoke(sender, *requests)
if result:
return result
@ -455,7 +396,7 @@ class TelegramBareClient:
# Let people use client.invoke(SomeRequest()) instead client(...)
invoke = __call__
def _invoke(self, sender, call_receive, *requests):
def _invoke(self, sender, *requests):
try:
# Ensure that we start with no previous errors (i.e. resending)
for x in requests:
@ -463,17 +404,6 @@ class TelegramBareClient:
x.rpc_error = None
sender.send(*requests)
if not call_receive:
# TODO This will be slightly troublesome if we allow
# switching between constant read or not on the fly.
# Must also watch out for calling .read() from two places,
# in which case a Lock would be required for .receive().
for x in requests:
x.confirm_received.wait(
sender.connection.get_timeout()
)
else:
while not all(x.confirm_received.is_set() for x in requests):
sender.receive(update_state=self.updates)
@ -481,9 +411,8 @@ class TelegramBareClient:
pass # We will just retry
except ConnectionResetError:
if not self._authorized or self._reconnect_lock.locked():
# Only attempt reconnecting if we're authorized and not
# reconnecting already.
if not self._authorized:
# Only attempt reconnecting if we're authorized
raise
self._logger.debug('Server disconnected us. Reconnecting and '
@ -520,12 +449,8 @@ class TelegramBareClient:
'attempting to reconnect at DC {}'.format(e.new_dc)
)
# TODO What happens with the background thread here?
# For normal use cases, this won't happen, because this will only
# be on the very first connection (not authorized, not running),
# but may be an issue for people who actually travel?
self._reconnect(new_dc=e.new_dc)
return self._invoke(sender, call_receive, *requests)
return self._invoke(sender, *requests)
except ServerError as e:
# Telegram is having some issues, just retry
@ -759,64 +684,6 @@ class TelegramBareClient:
def _set_connected_and_authorized(self):
self._authorized = True
self.updates.setup_workers()
if self._spawn_read_thread and self._recv_thread is None:
self._recv_thread = threading.Thread(
name='ReadThread', daemon=True,
target=self._recv_thread_impl
)
self._recv_thread.start()
# By using this approach, another thread will be
# created and started upon connection to constantly read
# from the other end. Otherwise, manual calls to .receive()
# must be performed. The MtProtoSender cannot be connected,
# or an error will be thrown.
#
# This way, sending and receiving will be completely independent.
def _recv_thread_impl(self):
while self._user_connected:
try:
if datetime.now() > self._last_ping + self._ping_delay:
self._sender.send(PingRequest(
int.from_bytes(os.urandom(8), 'big', signed=True)
))
self._last_ping = datetime.now()
self._sender.receive(update_state=self.updates)
except TimeoutError:
# No problem.
pass
except ConnectionResetError:
self._logger.debug('Server disconnected us. Reconnecting...')
while self._user_connected and not self._reconnect():
sleep(0.1) # Retry forever, this is instant messaging
except Exception as error:
# Unknown exception, pass it to the main thread
self._logger.debug(
'[ERROR] Unknown error on the read thread, please report',
error
)
try:
import socks
if isinstance(error, socks.GeneralProxyError):
# This is a known error, and it's not related to
# Telegram but rather to the proxy. Disconnect and
# hand it over to the main thread.
self._background_error = error
self.disconnect()
break
except ImportError:
"Not using PySocks, so it can't be a socket error"
# If something strange happens we don't want to enter an
# infinite loop where all we do is raise an exception, so
# add a little sleep to avoid the CPU usage going mad.
sleep(0.1)
break
self._recv_thread = None
# TODO self.updates.setup_workers()
# endregion

View File

@ -52,21 +52,14 @@ from .tl.types.messages import DialogsSlice
class TelegramClient(TelegramBareClient):
"""Full featured TelegramClient meant to extend the basic functionality -
As opposed to the TelegramBareClient, this one features downloading
media from different data centers, starting a second thread to
handle updates, and some very common functionality.
"""
"""Full featured TelegramClient meant to extend the basic functionality"""
# region Initialization
def __init__(self, session, api_id, api_hash,
connection_mode=ConnectionMode.TCP_FULL,
proxy=None,
update_workers=None,
timeout=timedelta(seconds=5),
spawn_read_thread=True,
**kwargs):
"""Initializes the Telegram client with the specified API ID and Hash.
@ -79,22 +72,6 @@ class TelegramClient(TelegramBareClient):
This will only affect how messages are sent over the network
and how much processing is required before sending them.
The integer 'update_workers' represents depending on its value:
is None: Updates will *not* be stored in memory.
= 0: Another thread is responsible for calling self.updates.poll()
> 0: 'update_workers' background threads will be spawned, any
any of them will invoke all the self.updates.handlers.
If 'spawn_read_thread', a background thread will be started once
an authorized user has been logged in to Telegram to read items
(such as updates and responses) from the network as soon as they
occur, which will speed things up.
If you don't want to spawn any additional threads, pending updates
will be read and processed accordingly after invoking a request
and not immediately. This is useful if you don't care about updates
at all and have set 'update_workers=None'.
If more named arguments are provided as **kwargs, they will be
used to update the Session instance. Most common settings are:
device_model = platform.node()
@ -108,8 +85,6 @@ class TelegramClient(TelegramBareClient):
session, api_id, api_hash,
connection_mode=connection_mode,
proxy=proxy,
update_workers=update_workers,
spawn_read_thread=spawn_read_thread,
timeout=timeout,
**kwargs
)

View File

@ -1,5 +1,3 @@
from threading import Lock
import re
from .. import utils
@ -20,8 +18,6 @@ class EntityDatabase:
"""
self.enabled = enabled
self.enabled_full = enabled_full
self._lock = Lock()
self._entities = {} # marked_id: user|chat|channel
if input_list:
@ -81,7 +77,6 @@ class EntityDatabase:
except ValueError:
pass
with self._lock:
before = len(self._input_entities)
self._input_entities.update(new_input)
for e in new:

View File

@ -4,7 +4,6 @@ import platform
import time
from base64 import b64encode, b64decode
from os.path import isfile as file_exists
from threading import Lock
from .entity_database import EntityDatabase
from .. import helpers
@ -51,11 +50,6 @@ class Session:
self.report_errors = True
self.save_entities = True
# Cross-thread safety
self._seq_no_lock = Lock()
self._msg_id_lock = Lock()
self._save_lock = Lock()
self.id = helpers.generate_random_long(signed=False)
self._sequence = 0
self.time_offset = 0
@ -71,10 +65,9 @@ class Session:
def save(self):
"""Saves the current session object as session_user_id.session"""
if not self.session_user_id or self._save_lock.locked():
if not self.session_user_id:
return
with self._save_lock:
with open('{}.session'.format(self.session_user_id), 'w') as file:
out_dict = {
'port': self.port,
@ -149,7 +142,6 @@ class Session:
Note that if confirmed=True, the sequence number
will be increased by one too
"""
with self._seq_no_lock:
if content_related:
result = self._sequence * 2 + 1
self._sequence += 1
@ -166,7 +158,6 @@ class Session:
# "message identifiers are divisible by 4"
new_msg_id = (int(now) << 32) | (nanoseconds << 2)
with self._msg_id_lock:
if self._last_msg_id >= new_msg_id:
new_msg_id = self._last_msg_id + 4