mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-10-24 12:41:02 +03:00
Actually use the new connection class
This commit is contained in:
parent
daf94e416b
commit
5daad2aaab
|
@ -1,6 +1,5 @@
|
||||||
import abc
|
import abc
|
||||||
import asyncio
|
import asyncio
|
||||||
import collections
|
|
||||||
import inspect
|
import inspect
|
||||||
import logging
|
import logging
|
||||||
import platform
|
import platform
|
||||||
|
@ -54,7 +53,7 @@ class TelegramBaseClient(abc.ABC):
|
||||||
|
|
||||||
connection (`telethon.network.connection.common.Connection`, optional):
|
connection (`telethon.network.connection.common.Connection`, optional):
|
||||||
The connection instance to be used when creating a new connection
|
The connection instance to be used when creating a new connection
|
||||||
to the servers. If it's a type, the `proxy` argument will be used.
|
to the servers. It **must** be a type.
|
||||||
|
|
||||||
Defaults to `telethon.network.connection.tcpfull.ConnectionTcpFull`.
|
Defaults to `telethon.network.connection.tcpfull.ConnectionTcpFull`.
|
||||||
|
|
||||||
|
@ -206,9 +205,8 @@ class TelegramBaseClient(abc.ABC):
|
||||||
self._connection_retries = connection_retries or sys.maxsize
|
self._connection_retries = connection_retries or sys.maxsize
|
||||||
self._auto_reconnect = auto_reconnect
|
self._auto_reconnect = auto_reconnect
|
||||||
|
|
||||||
if isinstance(connection, type):
|
assert isinstance(connection, type)
|
||||||
connection = connection(
|
self._connection = connection
|
||||||
proxy=proxy, timeout=timeout, loop=self._loop)
|
|
||||||
|
|
||||||
# Used on connection. Capture the variables in a lambda since
|
# Used on connection. Capture the variables in a lambda since
|
||||||
# exporting clients need to create this InvokeWithLayerRequest.
|
# exporting clients need to create this InvokeWithLayerRequest.
|
||||||
|
@ -229,7 +227,7 @@ class TelegramBaseClient(abc.ABC):
|
||||||
state = MTProtoState(self.session.auth_key)
|
state = MTProtoState(self.session.auth_key)
|
||||||
self._connection = connection
|
self._connection = connection
|
||||||
self._sender = MTProtoSender(
|
self._sender = MTProtoSender(
|
||||||
state, connection, self._loop,
|
state, self._loop,
|
||||||
retries=self._connection_retries,
|
retries=self._connection_retries,
|
||||||
auto_reconnect=self._auto_reconnect,
|
auto_reconnect=self._auto_reconnect,
|
||||||
update_callback=self._handle_update,
|
update_callback=self._handle_update,
|
||||||
|
@ -308,8 +306,8 @@ class TelegramBaseClient(abc.ABC):
|
||||||
"""
|
"""
|
||||||
Connects to Telegram.
|
Connects to Telegram.
|
||||||
"""
|
"""
|
||||||
await self._sender.connect(
|
await self._sender.connect(self._connection(
|
||||||
self.session.server_address, self.session.port)
|
self.session.server_address, self.session.port, loop=self._loop))
|
||||||
|
|
||||||
await self._sender.send(self._init_with(
|
await self._sender.send(self._init_with(
|
||||||
functions.help.GetConfigRequest()))
|
functions.help.GetConfigRequest()))
|
||||||
|
@ -420,8 +418,9 @@ class TelegramBaseClient(abc.ABC):
|
||||||
#
|
#
|
||||||
# If one were to do that, Telegram would reset the connection
|
# If one were to do that, Telegram would reset the connection
|
||||||
# with no further clues.
|
# with no further clues.
|
||||||
sender = MTProtoSender(state, self._connection.clone(), self._loop)
|
sender = MTProtoSender(state, self._loop)
|
||||||
await sender.connect(dc.ip_address, dc.port)
|
await sender.connect(self._connection(
|
||||||
|
dc.ip_address, dc.port, loop=self._loop))
|
||||||
__log__.info('Exporting authorization for data center %s', dc)
|
__log__.info('Exporting authorization for data center %s', dc)
|
||||||
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
|
auth = await self(functions.auth.ExportAuthorizationRequest(dc_id))
|
||||||
req = self._init_with(functions.auth.ImportAuthorizationRequest(
|
req = self._init_with(functions.auth.ImportAuthorizationRequest(
|
||||||
|
|
|
@ -40,14 +40,12 @@ class MTProtoSender:
|
||||||
A new authorization key will be generated on connection if no other
|
A new authorization key will be generated on connection if no other
|
||||||
key exists yet.
|
key exists yet.
|
||||||
"""
|
"""
|
||||||
def __init__(self, state, connection, loop, *,
|
def __init__(self, state, loop, *,
|
||||||
retries=5, auto_reconnect=True, update_callback=None,
|
retries=5, auto_reconnect=True, update_callback=None,
|
||||||
auth_key_callback=None, auto_reconnect_callback=None):
|
auth_key_callback=None, auto_reconnect_callback=None):
|
||||||
self.state = state
|
self.state = state
|
||||||
self._connection = connection
|
self._connection = None
|
||||||
self._loop = loop
|
self._loop = loop
|
||||||
self._ip = None
|
|
||||||
self._port = None
|
|
||||||
self._retries = retries
|
self._retries = retries
|
||||||
self._auto_reconnect = auto_reconnect
|
self._auto_reconnect = auto_reconnect
|
||||||
self._update_callback = update_callback
|
self._update_callback = update_callback
|
||||||
|
@ -110,7 +108,7 @@ class MTProtoSender:
|
||||||
|
|
||||||
# Public API
|
# Public API
|
||||||
|
|
||||||
async def connect(self, ip, port):
|
async def connect(self, connection):
|
||||||
"""
|
"""
|
||||||
Connects to the specified ``ip:port``, and generates a new
|
Connects to the specified ``ip:port``, and generates a new
|
||||||
authorization key for the `MTProtoSender.session` if it does
|
authorization key for the `MTProtoSender.session` if it does
|
||||||
|
@ -120,8 +118,7 @@ class MTProtoSender:
|
||||||
__log__.info('User is already connected!')
|
__log__.info('User is already connected!')
|
||||||
return
|
return
|
||||||
|
|
||||||
self._ip = ip
|
self._connection = connection
|
||||||
self._port = port
|
|
||||||
self._user_connected = True
|
self._user_connected = True
|
||||||
await self._connect()
|
await self._connect()
|
||||||
|
|
||||||
|
@ -140,11 +137,11 @@ class MTProtoSender:
|
||||||
await self._disconnect()
|
await self._disconnect()
|
||||||
|
|
||||||
async def _disconnect(self, error=None):
|
async def _disconnect(self, error=None):
|
||||||
__log__.info('Disconnecting from {}...'.format(self._ip))
|
__log__.info('Disconnecting from %s...', self._connection._ip)
|
||||||
self._user_connected = False
|
self._user_connected = False
|
||||||
try:
|
try:
|
||||||
__log__.debug('Closing current connection...')
|
__log__.debug('Closing current connection...')
|
||||||
await self._connection.close()
|
self._connection.disconnect()
|
||||||
finally:
|
finally:
|
||||||
__log__.debug('Cancelling {} pending message(s)...'
|
__log__.debug('Cancelling {} pending message(s)...'
|
||||||
.format(len(self._pending_messages)))
|
.format(len(self._pending_messages)))
|
||||||
|
@ -166,7 +163,7 @@ class MTProtoSender:
|
||||||
__log__.debug('Cancelling the receive loop...')
|
__log__.debug('Cancelling the receive loop...')
|
||||||
self._recv_loop_handle.cancel()
|
self._recv_loop_handle.cancel()
|
||||||
|
|
||||||
__log__.info('Disconnection from {} complete!'.format(self._ip))
|
__log__.info('Disconnection from %s complete!', self._connection._ip)
|
||||||
if self._disconnected and not self._disconnected.done():
|
if self._disconnected and not self._disconnected.done():
|
||||||
if error:
|
if error:
|
||||||
self._disconnected.set_exception(error)
|
self._disconnected.set_exception(error)
|
||||||
|
@ -238,11 +235,12 @@ class MTProtoSender:
|
||||||
authorization key if necessary, and starting the send and
|
authorization key if necessary, and starting the send and
|
||||||
receive loops.
|
receive loops.
|
||||||
"""
|
"""
|
||||||
__log__.info('Connecting to {}:{}...'.format(self._ip, self._port))
|
__log__.info('Connecting to %s:%d...',
|
||||||
|
self._connection._ip, self._connection._port)
|
||||||
for retry in range(1, self._retries + 1):
|
for retry in range(1, self._retries + 1):
|
||||||
try:
|
try:
|
||||||
__log__.debug('Connection attempt {}...'.format(retry))
|
__log__.debug('Connection attempt {}...'.format(retry))
|
||||||
await self._connection.connect(self._ip, self._port)
|
await self._connection.connect()
|
||||||
except (asyncio.TimeoutError, OSError) as e:
|
except (asyncio.TimeoutError, OSError) as e:
|
||||||
__log__.warning('Attempt {} at connecting failed: {}: {}'
|
__log__.warning('Attempt {} at connecting failed: {}: {}'
|
||||||
.format(retry, type(e).__name__, e))
|
.format(retry, type(e).__name__, e))
|
||||||
|
@ -283,7 +281,7 @@ class MTProtoSender:
|
||||||
# First connection or manual reconnection after a failure
|
# First connection or manual reconnection after a failure
|
||||||
if self._disconnected is None or self._disconnected.done():
|
if self._disconnected is None or self._disconnected.done():
|
||||||
self._disconnected = self._loop.create_future()
|
self._disconnected = self._loop.create_future()
|
||||||
__log__.info('Connection to {} complete!'.format(self._ip))
|
__log__.info('Connection to %s complete!', self._connection._ip)
|
||||||
|
|
||||||
async def _reconnect(self):
|
async def _reconnect(self):
|
||||||
"""
|
"""
|
||||||
|
@ -299,7 +297,7 @@ class MTProtoSender:
|
||||||
await self._recv_loop_handle
|
await self._recv_loop_handle
|
||||||
|
|
||||||
__log__.debug('Closing current connection...')
|
__log__.debug('Closing current connection...')
|
||||||
await self._connection.close()
|
self._connection.disconnect()
|
||||||
|
|
||||||
self._reconnecting = False
|
self._reconnecting = False
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user