mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-02-03 13:14:31 +03:00
Debug level should always be used for logging since it's a library
This commit is contained in:
parent
eab44af4c0
commit
1f7ac71187
94
telethon/extensions/threaded_tcp_client.py
Normal file
94
telethon/extensions/threaded_tcp_client.py
Normal file
|
@ -0,0 +1,94 @@
|
||||||
|
import socket
|
||||||
|
import time
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from io import BytesIO, BufferedWriter
|
||||||
|
from threading import Event, Lock, Thread, Condition
|
||||||
|
|
||||||
|
from ..errors import ReadCancelledError
|
||||||
|
|
||||||
|
|
||||||
|
class ThreadedTcpClient:
|
||||||
|
"""The main difference with the TcpClient class is that this one
|
||||||
|
will spawn a secondary thread that will be constantly reading
|
||||||
|
from the network and putting everything on another buffer.
|
||||||
|
"""
|
||||||
|
def __init__(self, proxy=None):
|
||||||
|
self.connected = False
|
||||||
|
self._proxy = proxy
|
||||||
|
self._recreate_socket()
|
||||||
|
|
||||||
|
# Support for multi-threading advantages and safety
|
||||||
|
self.cancelled = Event() # Has the read operation been cancelled?
|
||||||
|
self.delay = 0.1 # Read delay when there was no data available
|
||||||
|
self._lock = Lock()
|
||||||
|
|
||||||
|
self._buffer = []
|
||||||
|
self._read_thread = Thread(target=self._reading_thread, daemon=True)
|
||||||
|
self._cv = Condition() # Condition Variable
|
||||||
|
|
||||||
|
def _recreate_socket(self):
|
||||||
|
if self._proxy is None:
|
||||||
|
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
else:
|
||||||
|
import socks
|
||||||
|
self._socket = socks.socksocket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
if type(self._proxy) is dict:
|
||||||
|
self._socket.set_proxy(**self._proxy)
|
||||||
|
else: # tuple, list, etc.
|
||||||
|
self._socket.set_proxy(*self._proxy)
|
||||||
|
|
||||||
|
def connect(self, ip, port, timeout):
|
||||||
|
"""Connects to the specified IP and port number.
|
||||||
|
'timeout' must be given in seconds
|
||||||
|
"""
|
||||||
|
if not self.connected:
|
||||||
|
self._socket.settimeout(timeout)
|
||||||
|
self._socket.connect((ip, port))
|
||||||
|
self._socket.setblocking(False)
|
||||||
|
self.connected = True
|
||||||
|
|
||||||
|
def close(self):
|
||||||
|
"""Closes the connection"""
|
||||||
|
if self.connected:
|
||||||
|
self._socket.shutdown(socket.SHUT_RDWR)
|
||||||
|
self._socket.close()
|
||||||
|
self.connected = False
|
||||||
|
self._recreate_socket()
|
||||||
|
|
||||||
|
def write(self, data):
|
||||||
|
"""Writes (sends) the specified bytes to the connected peer"""
|
||||||
|
self._socket.sendall(data)
|
||||||
|
|
||||||
|
def read(self, size, timeout=timedelta(seconds=5)):
|
||||||
|
"""Reads (receives) a whole block of 'size bytes
|
||||||
|
from the connected peer.
|
||||||
|
|
||||||
|
A timeout can be specified, which will cancel the operation if
|
||||||
|
no data has been read in the specified time. If data was read
|
||||||
|
and it's waiting for more, the timeout will NOT cancel the
|
||||||
|
operation. Set to None for no timeout
|
||||||
|
"""
|
||||||
|
with self._cv:
|
||||||
|
print('wait for...')
|
||||||
|
self._cv.wait_for(lambda: len(self._buffer) >= size, timeout=timeout.seconds)
|
||||||
|
print('got', size)
|
||||||
|
result, self._buffer = self._buffer[:size], self._buffer[size:]
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _reading_thread(self):
|
||||||
|
while True:
|
||||||
|
partial = self._socket.recv(4096)
|
||||||
|
if len(partial) == 0:
|
||||||
|
self.connected = False
|
||||||
|
raise ConnectionResetError(
|
||||||
|
'The server has closed the connection.')
|
||||||
|
|
||||||
|
with self._cv:
|
||||||
|
print('extended', len(partial))
|
||||||
|
self._buffer.extend(partial)
|
||||||
|
self._cv.notify()
|
||||||
|
|
||||||
|
def cancel_read(self):
|
||||||
|
"""Cancels the read operation IF it hasn't yet
|
||||||
|
started, raising a ReadCancelledError"""
|
||||||
|
self.cancelled.set()
|
|
@ -100,7 +100,7 @@ class MtProtoSender:
|
||||||
# or, if there is no request, until we read an update
|
# or, if there is no request, until we read an update
|
||||||
while (request and not request.confirm_received) or \
|
while (request and not request.confirm_received) or \
|
||||||
(not request and not updates):
|
(not request and not updates):
|
||||||
self._logger.info('Trying to .receive() the request result...')
|
self._logger.debug('Trying to .receive() the request result...')
|
||||||
seq, body = self.transport.receive(**kwargs)
|
seq, body = self.transport.receive(**kwargs)
|
||||||
message, remote_msg_id, remote_seq = self._decode_msg(body)
|
message, remote_msg_id, remote_seq = self._decode_msg(body)
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ class MtProtoSender:
|
||||||
self._pending_receive.remove(request)
|
self._pending_receive.remove(request)
|
||||||
except ValueError: pass
|
except ValueError: pass
|
||||||
|
|
||||||
self._logger.info('Request result received')
|
self._logger.debug('Request result received')
|
||||||
self._logger.debug('receive() released the lock')
|
self._logger.debug('receive() released the lock')
|
||||||
|
|
||||||
def receive_updates(self, **kwargs):
|
def receive_updates(self, **kwargs):
|
||||||
|
@ -226,10 +226,10 @@ class MtProtoSender:
|
||||||
ack = reader.tgread_object()
|
ack = reader.tgread_object()
|
||||||
for r in self._pending_receive:
|
for r in self._pending_receive:
|
||||||
if r.request_msg_id in ack.msg_ids:
|
if r.request_msg_id in ack.msg_ids:
|
||||||
self._logger.warning('Ack found for the a request')
|
self._logger.debug('Ack found for the a request')
|
||||||
|
|
||||||
if self.logging_out:
|
if self.logging_out:
|
||||||
self._logger.info('Message ack confirmed a request')
|
self._logger.debug('Message ack confirmed a request')
|
||||||
r.confirm_received = True
|
r.confirm_received = True
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
@ -247,7 +247,7 @@ class MtProtoSender:
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
self._logger.warning('Unknown message: {}'.format(hex(code)))
|
self._logger.debug('Unknown message: {}'.format(hex(code)))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# endregion
|
# endregion
|
||||||
|
@ -263,7 +263,7 @@ class MtProtoSender:
|
||||||
request = next(r for r in self._pending_receive
|
request = next(r for r in self._pending_receive
|
||||||
if r.request_msg_id == received_msg_id)
|
if r.request_msg_id == received_msg_id)
|
||||||
|
|
||||||
self._logger.warning('Pong confirmed a request')
|
self._logger.debug('Pong confirmed a request')
|
||||||
request.confirm_received = True
|
request.confirm_received = True
|
||||||
except StopIteration: pass
|
except StopIteration: pass
|
||||||
|
|
||||||
|
@ -318,8 +318,8 @@ class MtProtoSender:
|
||||||
# Use the current msg_id to determine the right time offset.
|
# Use the current msg_id to determine the right time offset.
|
||||||
self.session.update_time_offset(correct_msg_id=msg_id)
|
self.session.update_time_offset(correct_msg_id=msg_id)
|
||||||
self.session.save()
|
self.session.save()
|
||||||
self._logger.warning('Read Bad Message error: ' + str(error))
|
self._logger.debug('Read Bad Message error: ' + str(error))
|
||||||
self._logger.info('Attempting to use the correct time offset.')
|
self._logger.debug('Attempting to use the correct time offset.')
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
raise error
|
raise error
|
||||||
|
@ -346,7 +346,7 @@ class MtProtoSender:
|
||||||
self._need_confirmation.append(request_id)
|
self._need_confirmation.append(request_id)
|
||||||
self._send_acknowledges()
|
self._send_acknowledges()
|
||||||
|
|
||||||
self._logger.warning('Read RPC error: %s', str(error))
|
self._logger.debug('Read RPC error: %s', str(error))
|
||||||
if isinstance(error, InvalidDCError):
|
if isinstance(error, InvalidDCError):
|
||||||
# Must resend this request, if any
|
# Must resend this request, if any
|
||||||
if request:
|
if request:
|
||||||
|
@ -368,7 +368,7 @@ class MtProtoSender:
|
||||||
else:
|
else:
|
||||||
# If it's really a result for RPC from previous connection
|
# If it's really a result for RPC from previous connection
|
||||||
# session, it will be skipped by the handle_container()
|
# session, it will be skipped by the handle_container()
|
||||||
self._logger.warning('Lost request will be skipped.')
|
self._logger.debug('Lost request will be skipped.')
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def _handle_gzip_packed(self, msg_id, sequence, reader, updates):
|
def _handle_gzip_packed(self, msg_id, sequence, reader, updates):
|
||||||
|
|
|
@ -90,7 +90,7 @@ class TelegramBareClient:
|
||||||
determine the authorization key for the current session.
|
determine the authorization key for the current session.
|
||||||
"""
|
"""
|
||||||
if self._sender and self._sender.is_connected():
|
if self._sender and self._sender.is_connected():
|
||||||
self._logger.warning(
|
self._logger.debug(
|
||||||
'Attempted to connect when the client was already connected.'
|
'Attempted to connect when the client was already connected.'
|
||||||
)
|
)
|
||||||
return
|
return
|
||||||
|
@ -143,7 +143,7 @@ class TelegramBareClient:
|
||||||
except (RPCError, ConnectionError) as error:
|
except (RPCError, ConnectionError) as error:
|
||||||
# Probably errors from the previous session, ignore them
|
# Probably errors from the previous session, ignore them
|
||||||
self.disconnect()
|
self.disconnect()
|
||||||
self._logger.warning('Could not stabilise initial connection: {}'
|
self._logger.debug('Could not stabilise initial connection: {}'
|
||||||
.format(error))
|
.format(error))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
@ -277,7 +277,7 @@ class TelegramBareClient:
|
||||||
return request.result
|
return request.result
|
||||||
|
|
||||||
except ConnectionResetError:
|
except ConnectionResetError:
|
||||||
self._logger.info('Server disconnected us. Reconnecting and '
|
self._logger.debug('Server disconnected us. Reconnecting and '
|
||||||
'resending request...')
|
'resending request...')
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
return self.invoke(request)
|
return self.invoke(request)
|
||||||
|
|
|
@ -214,7 +214,7 @@ class TelegramClient(TelegramBareClient):
|
||||||
return result
|
return result
|
||||||
|
|
||||||
except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
|
except (PhoneMigrateError, NetworkMigrateError, UserMigrateError) as e:
|
||||||
self._logger.info('DC error when invoking request, '
|
self._logger.debug('DC error when invoking request, '
|
||||||
'attempting to reconnect at DC {}'
|
'attempting to reconnect at DC {}'
|
||||||
.format(e.new_dc))
|
.format(e.new_dc))
|
||||||
|
|
||||||
|
@ -698,7 +698,7 @@ class TelegramClient(TelegramBareClient):
|
||||||
return
|
return
|
||||||
|
|
||||||
# Different state, update the saved value and behave as required
|
# Different state, update the saved value and behave as required
|
||||||
self._logger.info('Changing updates thread running status to %s', running)
|
self._logger.debug('Changing updates thread running status to %s', running)
|
||||||
if running:
|
if running:
|
||||||
self._updates_thread_running.set()
|
self._updates_thread_running.set()
|
||||||
if not self._updates_thread:
|
if not self._updates_thread:
|
||||||
|
@ -739,7 +739,7 @@ class TelegramClient(TelegramBareClient):
|
||||||
updates = self._sender.receive_updates(timeout=timeout)
|
updates = self._sender.receive_updates(timeout=timeout)
|
||||||
|
|
||||||
self._updates_thread_receiving.clear()
|
self._updates_thread_receiving.clear()
|
||||||
self._logger.info(
|
self._logger.debug(
|
||||||
'Received {} update(s) from the updates thread'
|
'Received {} update(s) from the updates thread'
|
||||||
.format(len(updates))
|
.format(len(updates))
|
||||||
)
|
)
|
||||||
|
@ -748,25 +748,25 @@ class TelegramClient(TelegramBareClient):
|
||||||
handler(update)
|
handler(update)
|
||||||
|
|
||||||
except ConnectionResetError:
|
except ConnectionResetError:
|
||||||
self._logger.info('Server disconnected us. Reconnecting...')
|
self._logger.debug('Server disconnected us. Reconnecting...')
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
|
|
||||||
except TimeoutError:
|
except TimeoutError:
|
||||||
self._logger.debug('Receiving updates timed out')
|
self._logger.debug('Receiving updates timed out')
|
||||||
|
|
||||||
except ReadCancelledError:
|
except ReadCancelledError:
|
||||||
self._logger.info('Receiving updates cancelled')
|
self._logger.debug('Receiving updates cancelled')
|
||||||
|
|
||||||
except BrokenPipeError:
|
except BrokenPipeError:
|
||||||
self._logger.info('Tcp session is broken. Reconnecting...')
|
self._logger.debug('Tcp session is broken. Reconnecting...')
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
|
|
||||||
except InvalidChecksumError:
|
except InvalidChecksumError:
|
||||||
self._logger.info('MTProto session is broken. Reconnecting...')
|
self._logger.debug('MTProto session is broken. Reconnecting...')
|
||||||
self.reconnect()
|
self.reconnect()
|
||||||
|
|
||||||
except OSError:
|
except OSError:
|
||||||
self._logger.warning('OSError on updates thread, %s logging out',
|
self._logger.debug('OSError on updates thread, %s logging out',
|
||||||
'was' if self._sender.logging_out else 'was not')
|
'was' if self._sender.logging_out else 'was not')
|
||||||
|
|
||||||
if self._sender.logging_out:
|
if self._sender.logging_out:
|
||||||
|
|
Loading…
Reference in New Issue
Block a user