mirror of
https://github.com/LonamiWebs/Telethon.git
synced 2025-08-03 11:40:11 +03:00
Compare commits
112 Commits
v1
...
asyncio-st
Author | SHA1 | Date | |
---|---|---|---|
|
85103bcf6d | ||
|
43a0226b33 | ||
|
097acd874b | ||
|
f5a7a8da45 | ||
|
cb75092ba1 | ||
|
85089353f2 | ||
|
aba478789c | ||
|
aa80f92807 | ||
|
02fcaaa78c | ||
|
1aaafc9a43 | ||
|
98449bb32f | ||
|
3df90307a7 | ||
|
ed8c123b9a | ||
|
6130b8918d | ||
|
95eac6c151 | ||
|
b09d91b9ef | ||
|
3171efafcb | ||
|
1b76c1fc7b | ||
|
d20dc01afa | ||
|
c0fa2ae620 | ||
|
2cc0e17c7e | ||
|
fbea963230 | ||
|
fe299cc6cc | ||
|
97f3dd809b | ||
|
01a594ca5d | ||
|
cd410d7fd7 | ||
|
c0b61f3a63 | ||
|
dffbd2d689 | ||
|
2691872503 | ||
|
41f0e0c0a8 | ||
|
ae5a265ca1 | ||
|
52042d4a1b | ||
|
9e7cbb0b09 | ||
|
04a68f12cc | ||
|
4735392cf9 | ||
|
096b2a6f7a | ||
|
8260a13824 | ||
|
b7ae612246 | ||
|
aae6a26654 | ||
|
d980e947cf | ||
|
946dd69211 | ||
|
597433075e | ||
|
06af73ed65 | ||
|
b8030959b6 | ||
|
abcd09e7d0 | ||
|
1eb418e1ab | ||
|
ddf36c9cb0 | ||
|
2ee5201229 | ||
|
8bf140ca74 | ||
|
5cb3a9af36 | ||
|
8b0580901a | ||
|
d4e1f13e41 | ||
|
236fccea7f | ||
|
e505fc1711 | ||
|
bcd5f8d4a1 | ||
|
1047e9c3d5 | ||
|
fd602dfd81 | ||
|
a828e9d155 | ||
|
48ac6daef5 | ||
|
69970b5b20 | ||
|
cde314fc21 | ||
|
d5c5c3cff1 | ||
|
563d731c95 | ||
|
4432a2d14e | ||
|
784c2e9ed1 | ||
|
731a2956df | ||
|
c2fba26ad9 | ||
|
9054a12c11 | ||
|
7998fd59f7 | ||
|
6e854325a8 | ||
|
2e953dab50 | ||
|
a6c6bc73eb | ||
|
7da092894b | ||
|
d8376ee50d | ||
|
50515aa528 | ||
|
6cfb829e58 | ||
|
91e5ef852a | ||
|
2b9c06f0e6 | ||
|
a1d497a2c0 | ||
|
2f1d5e277e | ||
|
7c0af2c080 | ||
|
984f483b98 | ||
|
e71831050f | ||
|
e0802d1a2d | ||
|
c67f78eab7 | ||
|
de803a0ace | ||
|
004c92edbe | ||
|
32bca4f1b8 | ||
|
653dd21259 | ||
|
8a287c2860 | ||
|
7f5126c341 | ||
|
2efcfbd416 | ||
|
3111153822 | ||
|
69b3f64d19 | ||
|
25af22f1e7 | ||
|
5e172053da | ||
|
23b5a9d1f6 | ||
|
cb2d943139 | ||
|
6dc0ee9d6c | ||
|
8bd578711c | ||
|
3a7fa249a4 | ||
|
48ec0319d2 | ||
|
ffaa3ac064 | ||
|
30ac6789ce | ||
|
1a0d5e75bf | ||
|
780e0ceddf | ||
|
335bc6a789 | ||
|
e4bcab336b | ||
|
917665852d | ||
|
ef43e2e336 | ||
|
77c99db066 | ||
|
9716d1d543 |
37
README.rst
37
README.rst
|
@ -2,7 +2,8 @@ Telethon
|
|||
========
|
||||
.. epigraph::
|
||||
|
||||
⭐️ Thanks **everyone** who has starred the project, it means a lot!
|
||||
This is the ``asyncio`` version of the library. If you don't know how
|
||||
to work with it, `see here https://pypi.python.org/pypi/Telethon`__.
|
||||
|
||||
**Telethon** is Telegram client implementation in **Python 3** which uses
|
||||
the latest available API of Telegram.
|
||||
|
@ -22,7 +23,12 @@ Installing
|
|||
|
||||
.. code:: sh
|
||||
|
||||
pip3 install telethon
|
||||
pip3 install telethon-aio
|
||||
|
||||
.. warning::
|
||||
|
||||
Be careful **not** to install ``telethon-asyncio`` or other
|
||||
variants, someone else name-squatted those and are unofficial!
|
||||
|
||||
|
||||
Creating a client
|
||||
|
@ -30,6 +36,7 @@ Creating a client
|
|||
|
||||
.. code:: python
|
||||
|
||||
import asyncio
|
||||
from telethon import TelegramClient
|
||||
|
||||
# These example values won't work. You must get your own api_id and
|
||||
|
@ -38,22 +45,28 @@ Creating a client
|
|||
api_hash = '0123456789abcdef0123456789abcdef'
|
||||
|
||||
client = TelegramClient('session_name', api_id, api_hash)
|
||||
client.start()
|
||||
async def main():
|
||||
await client.start()
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(main())
|
||||
|
||||
Doing stuff
|
||||
-----------
|
||||
|
||||
Note that this assumes you're inside an "async def" method. Check out the
|
||||
`Python documentation <https://docs.python.org/3/library/asyncio-dev.html>`_
|
||||
if you're new with ``asyncio``.
|
||||
|
||||
.. code:: python
|
||||
|
||||
print(client.get_me().stringify())
|
||||
print((await client.get_me()).stringify())
|
||||
|
||||
client.send_message('username', 'Hello! Talking to you from Telethon')
|
||||
client.send_file('username', '/home/myself/Pictures/holidays.jpg')
|
||||
await client.send_message('username', 'Hello! Talking to you from Telethon')
|
||||
await client.send_file('username', '/home/myself/Pictures/holidays.jpg')
|
||||
|
||||
client.download_profile_photo('me')
|
||||
messages = client.get_messages('username')
|
||||
client.download_media(messages[0])
|
||||
await client.download_profile_photo('me')
|
||||
messages = await client.get_messages('username')
|
||||
await client.download_media(messages[0])
|
||||
|
||||
|
||||
Next steps
|
||||
|
@ -61,5 +74,7 @@ Next steps
|
|||
|
||||
Do you like how Telethon looks? Check out
|
||||
`Read The Docs <http://telethon.rtfd.io/>`_
|
||||
for a more in-depth explanation, with examples,
|
||||
troubleshooting issues, and more useful information.
|
||||
for a more in-depth explanation, with examples, troubleshooting issues,
|
||||
and more useful information. Note that the examples there are written for
|
||||
the threaded version, not the one using asyncio. However, you just need to
|
||||
await every remote call.
|
||||
|
|
|
@ -67,7 +67,7 @@ Or we call `telethon.telegram_client.TelegramClient.get_input_entity()`:
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
peer = client.get_input_entity('someone')
|
||||
peer = await client.get_input_entity('someone')
|
||||
|
||||
When you're going to invoke an API method, most require you to pass an
|
||||
:tl:`InputUser`, :tl:`InputChat`, or so on, this is why using
|
||||
|
@ -78,7 +78,7 @@ If you also need to have information about the whole user, use
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
entity = client.get_entity('someone')
|
||||
entity = await client.get_entity('someone')
|
||||
|
||||
In the later case, when you use the entity, the library will cast it to
|
||||
its "input" version for you. If you already have the complete user and
|
||||
|
@ -104,7 +104,7 @@ request we do:
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
result = client(SendMessageRequest(peer, 'Hello there!'))
|
||||
result = await client(SendMessageRequest(peer, 'Hello there!'))
|
||||
# __call__ is an alias for client.invoke(request). Both will work
|
||||
|
||||
Message sent! Of course, this is only an example. There are nearly 250
|
||||
|
@ -113,7 +113,8 @@ as you wish. Remember to use the right types! To sum up:
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
result = client(SendMessageRequest(
|
||||
async def method():
|
||||
result = await client(SendMessageRequest(
|
||||
client.get_input_entity('username'), 'Hello there!'
|
||||
))
|
||||
|
||||
|
@ -122,9 +123,9 @@ This can further be simplified to:
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
result = client(SendMessageRequest('username', 'Hello there!'))
|
||||
result = await client(SendMessageRequest('username', 'Hello there!'))
|
||||
# Or even
|
||||
result = client(SendMessageRequest(PeerChannel(id), 'Hello there!'))
|
||||
result = await client(SendMessageRequest(PeerChannel(id), 'Hello there!'))
|
||||
|
||||
|
||||
.. note::
|
||||
|
|
|
@ -4,41 +4,23 @@
|
|||
Update Modes
|
||||
============
|
||||
|
||||
Using ``asyncio`` simplifies the way you can work with updates. The library
|
||||
will always ensure the future of a loop that will poll updates for you, so
|
||||
you can do other things in the mean time.
|
||||
|
||||
The library can run in four distinguishable modes:
|
||||
|
||||
- With no extra threads at all.
|
||||
- With an extra thread that receives everything as soon as possible (default).
|
||||
- With several worker threads that run your update handlers.
|
||||
- A mix of the above.
|
||||
|
||||
Since this section is about updates, we'll describe the simplest way to
|
||||
work with them.
|
||||
|
||||
|
||||
Using multiple workers
|
||||
**********************
|
||||
|
||||
When you create your client, simply pass a number to the
|
||||
``update_workers`` parameter:
|
||||
|
||||
``client = TelegramClient('session', api_id, api_hash, update_workers=2)``
|
||||
|
||||
You can set any amount of workers you want. The more you put, the more
|
||||
update handlers that can be called "at the same time". One or two should
|
||||
suffice most of the time, since setting more will not make things run
|
||||
faster most of the times (actually, it could slow things down).
|
||||
|
||||
The next thing you want to do is to add a method that will be called when
|
||||
an `Update`__ arrives:
|
||||
Once you have your client ready, the next thing you want to do is to add a
|
||||
method that will be called when an `Update`__ arrives:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
def callback(update):
|
||||
import asyncio
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
async def callback(update):
|
||||
print('I received', update)
|
||||
|
||||
client.add_event_handler(callback)
|
||||
# do more work here, or simply sleep!
|
||||
loop.run_until_complete(client.add_event_handler(callback))
|
||||
loop.run_forever() # this blocks forever, don't let the script end!
|
||||
|
||||
That's it! This is the old way to listen for raw updates, with no further
|
||||
processing. If this feels annoying for you, remember that you can always
|
||||
|
@ -51,94 +33,18 @@ let's reply to them with the same text reversed:
|
|||
|
||||
from telethon.tl.types import UpdateShortMessage, PeerUser
|
||||
|
||||
def replier(update):
|
||||
async def replier(update):
|
||||
if isinstance(update, UpdateShortMessage) and not update.out:
|
||||
client.send_message(PeerUser(update.user_id), update.message[::-1])
|
||||
await client.send_message(PeerUser(update.user_id), update.message[::-1])
|
||||
|
||||
|
||||
client.add_event_handler(replier)
|
||||
input('Press enter to stop this!')
|
||||
client.disconnect()
|
||||
loop.run_until_complete(client.add_event_handler(replier))
|
||||
loop.run_forever()
|
||||
|
||||
We only ask you one thing: don't keep this running for too long, or your
|
||||
contacts will go mad.
|
||||
|
||||
|
||||
Spawning no worker at all
|
||||
*************************
|
||||
|
||||
All the workers do is loop forever and poll updates from a queue that is
|
||||
filled from the ``ReadThread``, responsible for reading every item off
|
||||
the network. If you only need a worker and the ``MainThread`` would be
|
||||
doing no other job, this is the preferred way. You can easily do the same
|
||||
as the workers like so:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
while True:
|
||||
try:
|
||||
update = client.updates.poll()
|
||||
if not update:
|
||||
continue
|
||||
|
||||
print('I received', update)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
client.disconnect()
|
||||
|
||||
Note that ``poll`` accepts a ``timeout=`` parameter, and it will return
|
||||
``None`` if other thread got the update before you could or if the timeout
|
||||
expired, so it's important to check ``if not update``.
|
||||
|
||||
This can coexist with the rest of ``N`` workers, or you can set it to ``0``
|
||||
additional workers:
|
||||
|
||||
``client = TelegramClient('session', api_id, api_hash, update_workers=0)``
|
||||
|
||||
You **must** set it to ``0`` (or higher), as it defaults to ``None`` and that
|
||||
has a different meaning. ``None`` workers means updates won't be processed
|
||||
*at all*, so you must set it to some integer value if you want
|
||||
``client.updates.poll()`` to work.
|
||||
|
||||
|
||||
Using the main thread instead the ``ReadThread``
|
||||
************************************************
|
||||
|
||||
If you have no work to do on the ``MainThread`` and you were planning to have
|
||||
a ``while True: sleep(1)``, don't do that. Instead, don't spawn the secondary
|
||||
``ReadThread`` at all like so:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
client = TelegramClient(
|
||||
...
|
||||
spawn_read_thread=False
|
||||
)
|
||||
|
||||
And then ``.idle()`` from the ``MainThread``:
|
||||
|
||||
``client.idle()``
|
||||
|
||||
You can stop it with :kbd:`Control+C`, and you can configure the signals
|
||||
to be used in a similar fashion to `Python Telegram Bot`__.
|
||||
|
||||
As a complete example:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
def callback(update):
|
||||
print('I received', update)
|
||||
|
||||
client = TelegramClient('session', api_id, api_hash,
|
||||
update_workers=1, spawn_read_thread=False)
|
||||
|
||||
client.connect()
|
||||
client.add_event_handler(callback)
|
||||
client.idle() # ends with Ctrl+C
|
||||
|
||||
|
||||
This is the preferred way to use if you're simply going to listen for updates.
|
||||
|
||||
__ https://lonamiwebs.github.io/Telethon/types/update.html
|
||||
__ https://github.com/python-telegram-bot/python-telegram-bot/blob/4b3315db6feebafb94edcaa803df52bb49999ced/telegram/ext/updater.py#L460
|
||||
|
|
|
@ -49,7 +49,7 @@ your disk. This is by default a database file using Python's ``sqlite3``.
|
|||
Before using the client, you must be connected to Telegram.
|
||||
Doing so is very easy:
|
||||
|
||||
``client.connect() # Must return True, otherwise, try again``
|
||||
``await client.connect() # Must return True, otherwise, try again``
|
||||
|
||||
You may or may not be authorized yet. You must be authorized
|
||||
before you're able to send any request:
|
||||
|
@ -61,8 +61,8 @@ If you're not authorized, you need to ``.sign_in()``:
|
|||
.. code-block:: python
|
||||
|
||||
phone_number = '+34600000000'
|
||||
client.send_code_request(phone_number)
|
||||
myself = client.sign_in(phone_number, input('Enter code: '))
|
||||
await client.send_code_request(phone_number)
|
||||
myself = await client.sign_in(phone_number, input('Enter code: '))
|
||||
# If .sign_in raises PhoneNumberUnoccupiedError, use .sign_up instead
|
||||
# If .sign_in raises SessionPasswordNeeded error, call .sign_in(password=...)
|
||||
# You can import both exceptions from telethon.errors.
|
||||
|
@ -84,19 +84,21 @@ As a full example:
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
async def main():
|
||||
client = TelegramClient('anon', api_id, api_hash)
|
||||
assert client.connect()
|
||||
assert await client.connect()
|
||||
if not client.is_user_authorized():
|
||||
client.send_code_request(phone_number)
|
||||
me = client.sign_in(phone_number, input('Enter code: '))
|
||||
await client.send_code_request(phone_number)
|
||||
me = await client.sign_in(phone_number, input('Enter code: '))
|
||||
|
||||
|
||||
All of this, however, can be done through a call to ``.start()``:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
async def main():
|
||||
client = TelegramClient('anon', api_id, api_hash)
|
||||
client.start()
|
||||
await client.start()
|
||||
|
||||
|
||||
The code shown is just what ``.start()`` will be doing behind the scenes
|
||||
|
@ -110,6 +112,19 @@ is just a matter of taste, and how much control you need.
|
|||
|
||||
Remember that you can get yourself at any time with ``client.get_me()``.
|
||||
|
||||
Assuming you've written all of this in a ``async def main():``, you can
|
||||
run it with:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import asyncio
|
||||
|
||||
async def main():
|
||||
...
|
||||
|
||||
asyncio.get_event_loop().run_until_complete(main())
|
||||
|
||||
|
||||
.. warning::
|
||||
Please note that if you fail to login around 5 times (or change the first
|
||||
parameter of the ``TelegramClient``, which is the session name) you will
|
||||
|
@ -146,11 +161,11 @@ account, calling :meth:`telethon.TelegramClient.sign_in` will raise a
|
|||
import getpass
|
||||
from telethon.errors import SessionPasswordNeededError
|
||||
|
||||
client.sign_in(phone)
|
||||
await client.sign_in(phone)
|
||||
try:
|
||||
client.sign_in(code=input('Enter code: '))
|
||||
await client.sign_in(code=input('Enter code: '))
|
||||
except SessionPasswordNeededError:
|
||||
client.sign_in(password=getpass.getpass())
|
||||
await client.sign_in(password=getpass.getpass())
|
||||
|
||||
|
||||
The mentioned ``.start()`` method will handle this for you as well, but
|
||||
|
@ -170,18 +185,18 @@ See the examples below:
|
|||
from telethon.errors import EmailUnconfirmedError
|
||||
|
||||
# Sets 2FA password for first time:
|
||||
client.edit_2fa(new_password='supersecurepassword')
|
||||
await client.edit_2fa(new_password='supersecurepassword')
|
||||
|
||||
# Changes password:
|
||||
client.edit_2fa(current_password='supersecurepassword',
|
||||
await client.edit_2fa(current_password='supersecurepassword',
|
||||
new_password='changedmymind')
|
||||
|
||||
# Clears current password (i.e. removes 2FA):
|
||||
client.edit_2fa(current_password='changedmymind', new_password=None)
|
||||
await client.edit_2fa(current_password='changedmymind', new_password=None)
|
||||
|
||||
# Sets new password with recovery email:
|
||||
try:
|
||||
client.edit_2fa(new_password='memes and dreams',
|
||||
await client.edit_2fa(new_password='memes and dreams',
|
||||
email='JohnSmith@example.com')
|
||||
# Raises error (you need to check your email to complete 2FA setup.)
|
||||
except EmailUnconfirmedError:
|
||||
|
@ -192,7 +207,7 @@ See the examples below:
|
|||
# give email parameter again it will keep the last used setting
|
||||
|
||||
# Set hint after already setting password:
|
||||
client.edit_2fa(current_password='memes and dreams',
|
||||
await client.edit_2fa(current_password='memes and dreams',
|
||||
new_password='memes and dreams',
|
||||
hint='It keeps you alive')
|
||||
|
||||
|
|
|
@ -38,35 +38,34 @@ Getting entities
|
|||
|
||||
Through the use of the :ref:`sessions`, the library will automatically
|
||||
remember the ID and hash pair, along with some extra information, so
|
||||
you're able to just do this:
|
||||
you're able to just do this (inside an ``async def``):
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Dialogs are the "conversations you have open".
|
||||
# This method returns a list of Dialog, which
|
||||
# has the .entity attribute and other information.
|
||||
dialogs = client.get_dialogs()
|
||||
dialogs = await client.get_dialogs()
|
||||
|
||||
# All of these work and do the same.
|
||||
lonami = client.get_entity('lonami')
|
||||
lonami = client.get_entity('t.me/lonami')
|
||||
lonami = client.get_entity('https://telegram.dog/lonami')
|
||||
lonami = await client.get_entity('lonami')
|
||||
lonami = await client.get_entity('t.me/lonami')
|
||||
lonami = await client.get_entity('https://telegram.dog/lonami')
|
||||
|
||||
# Other kind of entities.
|
||||
channel = client.get_entity('telegram.me/joinchat/AAAAAEkk2WdoDrB4-Q8-gg')
|
||||
contact = client.get_entity('+34xxxxxxxxx')
|
||||
friend = client.get_entity(friend_id)
|
||||
channel = await client.get_entity('telegram.me/joinchat/AAAAAEkk2WdoDrB4-Q8-gg')
|
||||
contact = await client.get_entity('+34xxxxxxxxx')
|
||||
friend = await client.get_entity(friend_id)
|
||||
|
||||
# Getting entities through their ID (User, Chat or Channel)
|
||||
entity = client.get_entity(some_id)
|
||||
entity = await client.get_entity(some_id)
|
||||
|
||||
# You can be more explicit about the type for said ID by wrapping
|
||||
# it inside a Peer instance. This is recommended but not necessary.
|
||||
from telethon.tl.types import PeerUser, PeerChat, PeerChannel
|
||||
|
||||
my_user = client.get_entity(PeerUser(some_id))
|
||||
my_chat = client.get_entity(PeerChat(some_id))
|
||||
my_channel = client.get_entity(PeerChannel(some_id))
|
||||
my_user = await client.get_entity(PeerUser(some_id))
|
||||
my_chat = await client.get_entity(PeerChat(some_id))
|
||||
my_channel = await client.get_entity(PeerChannel(some_id))
|
||||
|
||||
|
||||
All methods in the :ref:`telegram-client` call ``.get_input_entity()`` prior
|
||||
|
@ -126,7 +125,7 @@ library, the raw requests you make to the API are also able to call
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
client(SendMessageRequest('username', 'hello'))
|
||||
await client(SendMessageRequest('username', 'hello'))
|
||||
|
||||
The library will call the ``.resolve()`` method of the request, which will
|
||||
resolve ``'username'`` with the appropriated :tl:`InputPeer`. Don't worry if
|
||||
|
|
|
@ -19,15 +19,19 @@ Creating a client
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
import asyncio
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
from telethon import TelegramClient
|
||||
|
||||
|
||||
# These example values won't work. You must get your own api_id and
|
||||
# api_hash from https://my.telegram.org, under API Development.
|
||||
api_id = 12345
|
||||
api_hash = '0123456789abcdef0123456789abcdef'
|
||||
|
||||
client = TelegramClient('session_name', api_id, api_hash)
|
||||
client.start()
|
||||
loop.run_until_complete(client.start())
|
||||
|
||||
**More details**: :ref:`creating-a-client`
|
||||
|
||||
|
@ -37,31 +41,33 @@ Basic Usage
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
# You should write all this inside of an async def.
|
||||
#
|
||||
# Getting information about yourself
|
||||
print(client.get_me().stringify())
|
||||
print((await client.get_me()).stringify())
|
||||
|
||||
# Sending a message (you can use 'me' or 'self' to message yourself)
|
||||
client.send_message('username', 'Hello World from Telethon!')
|
||||
await client.send_message('username', 'Hello World from Telethon!')
|
||||
|
||||
# Sending a file
|
||||
client.send_file('username', '/home/myself/Pictures/holidays.jpg')
|
||||
await client.send_file('username', '/home/myself/Pictures/holidays.jpg')
|
||||
|
||||
# Retrieving messages from a chat
|
||||
from telethon import utils
|
||||
for message in client.iter_messages('username', limit=10):
|
||||
async for message in client.iter_messages('username', limit=10):
|
||||
print(utils.get_display_name(message.sender), message.message)
|
||||
|
||||
# Listing all the dialogs (conversations you have open)
|
||||
for dialog in client.get_dialogs(limit=10):
|
||||
async for dialog in client.get_dialogs(limit=10):
|
||||
print(utils.get_display_name(dialog.entity), dialog.draft.message)
|
||||
|
||||
# Downloading profile photos (default path is the working directory)
|
||||
client.download_profile_photo('username')
|
||||
await client.download_profile_photo('username')
|
||||
|
||||
# Once you have a message with .media (if message.media)
|
||||
# you can download it using client.download_media():
|
||||
messages = client.get_messages('username')
|
||||
client.download_media(messages[0])
|
||||
messages = await client.get_messages('username')
|
||||
await client.download_media(messages[0])
|
||||
|
||||
**More details**: :ref:`telegram-client`
|
||||
|
||||
|
@ -73,17 +79,15 @@ Handling Updates
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
import asyncio
|
||||
from telethon import events
|
||||
|
||||
# We need to have some worker running
|
||||
client.updates.workers = 1
|
||||
|
||||
@client.on(events.NewMessage(incoming=True, pattern='(?i)hi'))
|
||||
def handler(event):
|
||||
event.reply('Hello!')
|
||||
async def handler(event):
|
||||
await event.reply('Hello!')
|
||||
|
||||
# If you want to handle updates you can't let the script end.
|
||||
input('Press enter to exit.')
|
||||
asyncio.get_event_loop().run_forever()
|
||||
|
||||
**More details**: :ref:`working-with-updates`
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ need of manually importing the requests you need.
|
|||
|
||||
For instance, retrieving your own user can be done in a single line:
|
||||
|
||||
``myself = client.get_me()``
|
||||
``myself = await client.get_me()``
|
||||
|
||||
Internally, this method has sent a request to Telegram, who replied with
|
||||
the information about your own user, and then the desired information
|
||||
|
@ -47,37 +47,38 @@ how the library refers to either of these:
|
|||
# The method will infer that you've passed an username
|
||||
# It also accepts phone numbers, and will get the user
|
||||
# from your contact list.
|
||||
lonami = client.get_entity('lonami')
|
||||
lonami = await client.get_entity('lonami')
|
||||
|
||||
The so called "entities" are another important whole concept on its own,
|
||||
but for now you don't need to worry about it. Simply know that they are
|
||||
a good way to get information about an user, chat or channel.
|
||||
|
||||
Many other common methods for quick scripts are also available:
|
||||
Many other common methods for quick scripts are also available.
|
||||
Note that you should be writing this inside of an ``async def``:
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
# Note that you can use 'me' or 'self' to message yourself
|
||||
client.send_message('username', 'Hello World from Telethon!')
|
||||
await client.send_message('username', 'Hello World from Telethon!')
|
||||
|
||||
# .send_message's parse mode defaults to markdown, so you
|
||||
# can use **bold**, __italics__, [links](https://example.com), `code`,
|
||||
# and even [mentions](@username)/[mentions](tg://user?id=123456789)
|
||||
client.send_message('username', '**Using** __markdown__ `too`!')
|
||||
await client.send_message('username', '**Using** __markdown__ `too`!')
|
||||
|
||||
client.send_file('username', '/home/myself/Pictures/holidays.jpg')
|
||||
await client.send_file('username', '/home/myself/Pictures/holidays.jpg')
|
||||
|
||||
# The utils package has some goodies, like .get_display_name()
|
||||
from telethon import utils
|
||||
for message in client.iter_messages('username', limit=10):
|
||||
async for message in client.iter_messages('username', limit=10):
|
||||
print(utils.get_display_name(message.sender), message.message)
|
||||
|
||||
# Dialogs are the conversations you have open
|
||||
for dialog in client.get_dialogs(limit=10):
|
||||
async for dialog in client.get_dialogs(limit=10):
|
||||
print(utils.get_display_name(dialog.entity), dialog.draft.message)
|
||||
|
||||
# Default path is the working directory
|
||||
client.download_profile_photo('username')
|
||||
await client.download_profile_photo('username')
|
||||
|
||||
# Call .disconnect() when you're done
|
||||
client.disconnect()
|
||||
|
|
|
@ -30,27 +30,33 @@ Getting Started
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
import asyncio
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
from telethon import TelegramClient, events
|
||||
|
||||
client = TelegramClient(..., update_workers=1, spawn_read_thread=False)
|
||||
client.start()
|
||||
client = TelegramClient(...)
|
||||
loop.run_until_complete(client.start())
|
||||
|
||||
@client.on(events.NewMessage)
|
||||
def my_event_handler(event):
|
||||
async def my_event_handler(event):
|
||||
if 'hello' in event.raw_text:
|
||||
event.reply('hi!')
|
||||
await event.reply('hi!')
|
||||
|
||||
client.idle()
|
||||
loop.run_forever()
|
||||
|
||||
|
||||
Not much, but there might be some things unclear. What does this code do?
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
import asyncio
|
||||
loop = asyncio.get_event_loop()
|
||||
|
||||
from telethon import TelegramClient, events
|
||||
|
||||
client = TelegramClient(..., update_workers=1, spawn_read_thread=False)
|
||||
client.start()
|
||||
client = TelegramClient(...)
|
||||
loop.run_until_complete(client.start())
|
||||
|
||||
|
||||
This is normal initialization (of course, pass session name, API ID and hash).
|
||||
|
@ -67,9 +73,9 @@ the callback function you're about to define will be called:
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
def my_event_handler(event):
|
||||
async def my_event_handler(event):
|
||||
if 'hello' in event.raw_text:
|
||||
event.reply('hi!')
|
||||
await event.reply('hi!')
|
||||
|
||||
|
||||
If a ``NewMessage`` event occurs, and ``'hello'`` is in the text of the
|
||||
|
@ -77,10 +83,10 @@ message, we ``reply`` to the event with a ``'hi!'`` message.
|
|||
|
||||
.. code-block:: python
|
||||
|
||||
client.idle()
|
||||
loop.run_forever()
|
||||
|
||||
|
||||
Finally, this tells the client that we're done with our code, and want
|
||||
Finally, this tells the script that we're done with our code, and want
|
||||
to listen for all these events to occur. Of course, you might want to
|
||||
do other things instead idling. For this refer to :ref:`update-modes`.
|
||||
|
||||
|
@ -119,17 +125,17 @@ for example:
|
|||
# Either a single item or a list of them will work for the chats.
|
||||
# You can also use the IDs, Peers, or even User/Chat/Channel objects.
|
||||
@client.on(events.NewMessage(chats=('TelethonChat', 'TelethonOffTopic')))
|
||||
def normal_handler(event):
|
||||
async def normal_handler(event):
|
||||
if 'roll' in event.raw_text:
|
||||
event.reply(str(random.randint(1, 6)))
|
||||
await event.reply(str(random.randint(1, 6)))
|
||||
|
||||
|
||||
# Similarly, you can use incoming=True for messages that you receive
|
||||
@client.on(events.NewMessage(chats='TelethonOffTopic', outgoing=True))
|
||||
def admin_handler(event):
|
||||
async def admin_handler(event):
|
||||
if event.raw_text.startswith('eval'):
|
||||
expression = event.raw_text.replace('eval', '').strip()
|
||||
event.reply(str(ast.literal_eval(expression)))
|
||||
await event.reply(str(ast.literal_eval(expression)))
|
||||
|
||||
|
||||
You can pass one or more chats to the ``chats`` parameter (as a list or tuple),
|
||||
|
@ -167,15 +173,15 @@ propagation of the update through your handlers to stop:
|
|||
from telethon.events import StopPropagation
|
||||
|
||||
@client.on(events.NewMessage)
|
||||
def _(event):
|
||||
async def _(event):
|
||||
# ... some conditions
|
||||
event.delete()
|
||||
await event.delete()
|
||||
|
||||
# Other handlers won't have an event to work with
|
||||
raise StopPropagation
|
||||
|
||||
@client.on(events.NewMessage)
|
||||
def _(event):
|
||||
async def _(event):
|
||||
# Will never be reached, because it is the second handler
|
||||
# in the chain.
|
||||
pass
|
||||
|
|
|
@ -32,4 +32,4 @@ times, in this case, ``22222`` so we can hardcode that:
|
|||
|
||||
client = TelegramClient(None, api_id, api_hash)
|
||||
client.session.set_dc(2, '149.154.167.40', 80)
|
||||
client.start(phone='9996621234', code_callback=lambda: '22222')
|
||||
await client.start(phone='9996621234', code_callback=lambda: '22222')
|
||||
|
|
|
@ -19,7 +19,7 @@ not *interact* with a voting message), by making use of the
|
|||
|
||||
from telethon.tl.functions.messages import GetInlineBotResultsRequest
|
||||
|
||||
bot_results = client(GetInlineBotResultsRequest(
|
||||
bot_results = await client(GetInlineBotResultsRequest(
|
||||
bot, user_or_chat, 'query', ''
|
||||
))
|
||||
|
||||
|
@ -30,7 +30,7 @@ And you can select any of their results by using
|
|||
|
||||
from telethon.tl.functions.messages import SendInlineBotResultRequest
|
||||
|
||||
client(SendInlineBotResultRequest(
|
||||
await client(SendInlineBotResultRequest(
|
||||
get_input_peer(user_or_chat),
|
||||
obtained_query_id,
|
||||
obtained_str_id
|
||||
|
@ -47,7 +47,7 @@ To interact with a message that has a special reply markup, such as
|
|||
|
||||
from telethon.tl.functions.messages import GetBotCallbackAnswerRequest
|
||||
|
||||
client(GetBotCallbackAnswerRequest(
|
||||
await client(GetBotCallbackAnswerRequest(
|
||||
user_or_chat,
|
||||
msg.id,
|
||||
data=msg.reply_markup.rows[wanted_row].buttons[wanted_button].data
|
||||
|
|
|
@ -25,11 +25,11 @@ to, you can make use of the :tl:`JoinChannelRequest` to join such channel:
|
|||
.. code-block:: python
|
||||
|
||||
from telethon.tl.functions.channels import JoinChannelRequest
|
||||
client(JoinChannelRequest(channel))
|
||||
await client(JoinChannelRequest(channel))
|
||||
|
||||
# In the same way, you can also leave such channel
|
||||
from telethon.tl.functions.channels import LeaveChannelRequest
|
||||
client(LeaveChannelRequest(input_channel))
|
||||
await client(LeaveChannelRequest(input_channel))
|
||||
|
||||
|
||||
For more on channels, check the `channels namespace`__.
|
||||
|
@ -51,7 +51,7 @@ example, is the ``hash`` of the chat or channel. Now you can use
|
|||
.. code-block:: python
|
||||
|
||||
from telethon.tl.functions.messages import ImportChatInviteRequest
|
||||
updates = client(ImportChatInviteRequest('AAAAAEHbEkejzxUjAUCfYg'))
|
||||
updates = await client(ImportChatInviteRequest('AAAAAEHbEkejzxUjAUCfYg'))
|
||||
|
||||
|
||||
Adding someone else to such chat or channel
|
||||
|
@ -68,7 +68,7 @@ use is very straightforward, or :tl:`InviteToChannelRequest` for channels:
|
|||
|
||||
# Note that ``user_to_add`` is NOT the name of the parameter.
|
||||
# It's the user you want to add (``user_id=user_to_add``).
|
||||
client(AddChatUserRequest(
|
||||
await client(AddChatUserRequest(
|
||||
chat_id,
|
||||
user_to_add,
|
||||
fwd_limit=10 # Allow the user to see the 10 last messages
|
||||
|
@ -77,7 +77,7 @@ use is very straightforward, or :tl:`InviteToChannelRequest` for channels:
|
|||
# For channels (which includes megagroups)
|
||||
from telethon.tl.functions.channels import InviteToChannelRequest
|
||||
|
||||
client(InviteToChannelRequest(
|
||||
await client(InviteToChannelRequest(
|
||||
channel,
|
||||
[users_to_add]
|
||||
))
|
||||
|
@ -123,7 +123,7 @@ a fixed limit:
|
|||
all_participants = []
|
||||
|
||||
while True:
|
||||
participants = client(GetParticipantsRequest(
|
||||
participants = await client(GetParticipantsRequest(
|
||||
channel, ChannelParticipantsSearch(''), offset, limit,
|
||||
hash=0
|
||||
))
|
||||
|
@ -193,7 +193,7 @@ Giving or revoking admin permissions can be done with the :tl:`EditAdminRequest`
|
|||
# )
|
||||
|
||||
# Once you have a ChannelAdminRights, invoke it
|
||||
client(EditAdminRequest(channel, user, rights))
|
||||
await client(EditAdminRequest(channel, user, rights))
|
||||
|
||||
# User will now be able to change group info, delete other people's
|
||||
# messages and pin messages.
|
||||
|
@ -252,7 +252,7 @@ banned rights of an user through :tl:`EditAdminRequest` and its parameter
|
|||
embed_links=True
|
||||
)
|
||||
|
||||
client(EditBannedRequest(channel, user, rights))
|
||||
await client(EditBannedRequest(channel, user, rights))
|
||||
|
||||
|
||||
Kicking a member
|
||||
|
@ -267,7 +267,7 @@ is enough:
|
|||
from telethon.tl.functions.channels import EditBannedRequest
|
||||
from telethon.tl.types import ChannelBannedRights
|
||||
|
||||
client(EditBannedRequest(channel, user, ChannelBannedRights(
|
||||
await client(EditBannedRequest(channel, user, ChannelBannedRights(
|
||||
until_date=None,
|
||||
view_messages=True
|
||||
)))
|
||||
|
@ -291,7 +291,7 @@ use :tl:`GetMessagesViewsRequest`, setting ``increment=True``:
|
|||
# Obtain `channel' through dialogs or through client.get_entity() or anyhow.
|
||||
# Obtain `msg_ids' through `.get_messages()` or anyhow. Must be a list.
|
||||
|
||||
client(GetMessagesViewsRequest(
|
||||
await client(GetMessagesViewsRequest(
|
||||
peer=channel,
|
||||
id=msg_ids,
|
||||
increment=True
|
||||
|
|
|
@ -19,9 +19,9 @@ you should use :tl:`GetFullUser`:
|
|||
|
||||
from telethon.tl.functions.users import GetFullUserRequest
|
||||
|
||||
full = client(GetFullUserRequest(user))
|
||||
full = await client(GetFullUserRequest(user))
|
||||
# or even
|
||||
full = client(GetFullUserRequest('username'))
|
||||
full = await client(GetFullUserRequest('username'))
|
||||
|
||||
bio = full.about
|
||||
|
||||
|
@ -39,7 +39,7 @@ request. Omitted fields won't change after invoking :tl:`UpdateProfile`:
|
|||
|
||||
from telethon.tl.functions.account import UpdateProfileRequest
|
||||
|
||||
client(UpdateProfileRequest(about='This is a test from Telethon'))
|
||||
await client(UpdateProfileRequest(about='This is a test from Telethon'))
|
||||
|
||||
|
||||
Updating your username
|
||||
|
@ -51,7 +51,7 @@ You need to use :tl:`account.UpdateUsername`:
|
|||
|
||||
from telethon.tl.functions.account import UpdateUsernameRequest
|
||||
|
||||
client(UpdateUsernameRequest('new_username'))
|
||||
await client(UpdateUsernameRequest('new_username'))
|
||||
|
||||
|
||||
Updating your profile photo
|
||||
|
@ -65,6 +65,6 @@ through :tl:`UploadProfilePhoto`:
|
|||
|
||||
from telethon.tl.functions.photos import UploadProfilePhotoRequest
|
||||
|
||||
client(UploadProfilePhotoRequest(
|
||||
await client(UploadProfilePhotoRequest(
|
||||
client.upload_file('/path/to/some/file')
|
||||
))
|
||||
|
|
|
@ -21,14 +21,14 @@ Forwarding messages
|
|||
.. code-block:: python
|
||||
|
||||
# If you only have the message IDs
|
||||
client.forward_messages(
|
||||
await client.forward_messages(
|
||||
entity, # to which entity you are forwarding the messages
|
||||
message_ids, # the IDs of the messages (or message) to forward
|
||||
from_entity # who sent the messages?
|
||||
)
|
||||
|
||||
# If you have ``Message`` objects
|
||||
client.forward_messages(
|
||||
await client.forward_messages(
|
||||
entity, # to which entity you are forwarding the messages
|
||||
messages # the messages (or message) to forward
|
||||
)
|
||||
|
@ -40,7 +40,7 @@ Forwarding messages
|
|||
from_entity = bar()
|
||||
to_entity = baz()
|
||||
|
||||
client(ForwardMessagesRequest(
|
||||
await client(ForwardMessagesRequest(
|
||||
from_peer=from_entity, # who sent these messages?
|
||||
id=[msg.id for msg in messages], # which are the messages?
|
||||
to_peer=to_entity # who are we forwarding them to?
|
||||
|
@ -71,7 +71,7 @@ into issues_. A valid example would be:
|
|||
from telethon.tl.types import InputMessagesFilterEmpty
|
||||
|
||||
filter = InputMessagesFilterEmpty()
|
||||
result = client(SearchRequest(
|
||||
result = await client(SearchRequest(
|
||||
peer=peer, # On which chat/conversation
|
||||
q='query', # What to search for
|
||||
filter=filter, # Filter to use (maybe filter for media)
|
||||
|
@ -116,20 +116,20 @@ send yourself the very first sticker you have:
|
|||
.. code-block:: python
|
||||
|
||||
# Get all the sticker sets this user has
|
||||
sticker_sets = client(GetAllStickersRequest(0))
|
||||
sticker_sets = await client(GetAllStickersRequest(0))
|
||||
|
||||
# Choose a sticker set
|
||||
sticker_set = sticker_sets.sets[0]
|
||||
|
||||
# Get the stickers for this sticker set
|
||||
stickers = client(GetStickerSetRequest(
|
||||
stickers = await client(GetStickerSetRequest(
|
||||
stickerset=InputStickerSetID(
|
||||
id=sticker_set.id, access_hash=sticker_set.access_hash
|
||||
)
|
||||
))
|
||||
|
||||
# Stickers are nothing more than files, so send that
|
||||
client(SendMediaRequest(
|
||||
await client(SendMediaRequest(
|
||||
peer=client.get_me(),
|
||||
media=InputMediaDocument(
|
||||
id=InputDocument(
|
||||
|
|
|
@ -22,6 +22,18 @@ when you upgrade!
|
|||
contains the friendly methods that **you should use** most of the time.
|
||||
|
||||
|
||||
.. note::
|
||||
|
||||
We assume that you have some experience working with ``asyncio``,
|
||||
if you don't you should probably use the threaded version of the
|
||||
library, or either learn how to use ``asyncio``. All the code
|
||||
here assumes you're writing the code inside an ``async def`` so
|
||||
we can use ``await`` across the examples.
|
||||
|
||||
Then you can ``import asyncio`` and run
|
||||
``asyncio.get_event_loop().run_until_complete(my_method())``
|
||||
|
||||
|
||||
What is this?
|
||||
*************
|
||||
|
||||
|
|
14
setup.py
14
setup.py
|
@ -131,12 +131,12 @@ def main():
|
|||
from subprocess import run
|
||||
from shutil import rmtree
|
||||
|
||||
for x in ('build', 'dist', 'Telethon.egg-info'):
|
||||
for x in ('build', 'dist', 'Telethon_aio.egg-info'):
|
||||
rmtree(x, ignore_errors=True)
|
||||
run('python3 setup.py sdist', shell=True)
|
||||
run('python3 setup.py bdist_wheel', shell=True)
|
||||
run('twine upload dist/*', shell=True)
|
||||
for x in ('build', 'dist', 'Telethon.egg-info'):
|
||||
for x in ('build', 'dist', 'Telethon_aio.egg-info'):
|
||||
rmtree(x, ignore_errors=True)
|
||||
|
||||
else:
|
||||
|
@ -152,12 +152,13 @@ def main():
|
|||
version = re.search(r"^__version__\s*=\s*'(.*)'.*$",
|
||||
f.read(), flags=re.MULTILINE).group(1)
|
||||
setup(
|
||||
name='Telethon',
|
||||
name='Telethon-aio',
|
||||
version=version,
|
||||
description="Full-featured Telegram client library for Python 3",
|
||||
description="Full-featured Telegram client library for Python 3, "
|
||||
"modified to work under Python's asyncio module.",
|
||||
long_description=long_description,
|
||||
|
||||
url='https://github.com/LonamiWebs/Telethon',
|
||||
url='https://github.com/LonamiWebs/Telethon/tree/asyncio',
|
||||
download_url='https://github.com/LonamiWebs/Telethon/releases',
|
||||
|
||||
author='Lonami Exo',
|
||||
|
@ -196,7 +197,8 @@ def main():
|
|||
'telethon_generator/parser/tl_object.py',
|
||||
'telethon_generator/parser/tl_parser.py',
|
||||
]),
|
||||
install_requires=['pyaes', 'rsa',
|
||||
# We must be careful not to miss any comma here... v
|
||||
install_requires=['pyaes', 'rsa', 'async_generator',
|
||||
'typing' if version_info < (3, 5, 2) else ""],
|
||||
extras_require={
|
||||
'cryptg': ['cryptg']
|
||||
|
|
|
@ -30,7 +30,7 @@ class CdnDecrypter:
|
|||
self.cdn_file_hashes = cdn_file_hashes
|
||||
|
||||
@staticmethod
|
||||
def prepare_decrypter(client, cdn_client, cdn_redirect):
|
||||
async def prepare_decrypter(client, cdn_client, cdn_redirect):
|
||||
"""
|
||||
Prepares a new CDN decrypter.
|
||||
|
||||
|
@ -52,14 +52,14 @@ class CdnDecrypter:
|
|||
cdn_aes, cdn_redirect.cdn_file_hashes
|
||||
)
|
||||
|
||||
cdn_file = cdn_client(GetCdnFileRequest(
|
||||
cdn_file = await cdn_client(GetCdnFileRequest(
|
||||
file_token=cdn_redirect.file_token,
|
||||
offset=cdn_redirect.cdn_file_hashes[0].offset,
|
||||
limit=cdn_redirect.cdn_file_hashes[0].limit
|
||||
))
|
||||
if isinstance(cdn_file, CdnFileReuploadNeeded):
|
||||
# We need to use the original client here
|
||||
client(ReuploadCdnFileRequest(
|
||||
await client(ReuploadCdnFileRequest(
|
||||
file_token=cdn_redirect.file_token,
|
||||
request_token=cdn_file.request_token
|
||||
))
|
||||
|
@ -73,7 +73,7 @@ class CdnDecrypter:
|
|||
|
||||
return decrypter, cdn_file
|
||||
|
||||
def get_file(self):
|
||||
async def get_file(self):
|
||||
"""
|
||||
Calls GetCdnFileRequest and decrypts its bytes.
|
||||
Also ensures that the file hasn't been tampered.
|
||||
|
@ -82,7 +82,7 @@ class CdnDecrypter:
|
|||
"""
|
||||
if self.cdn_file_hashes:
|
||||
cdn_hash = self.cdn_file_hashes.pop(0)
|
||||
cdn_file = self.client(GetCdnFileRequest(
|
||||
cdn_file = await self.client(GetCdnFileRequest(
|
||||
self.file_token, cdn_hash.offset, cdn_hash.limit
|
||||
))
|
||||
cdn_file.bytes = self.cdn_aes.encrypt(cdn_file.bytes)
|
||||
|
|
|
@ -158,15 +158,15 @@ class ChatAction(EventBuilder):
|
|||
self.new_title = new_title
|
||||
self.unpin = unpin
|
||||
|
||||
def respond(self, *args, **kwargs):
|
||||
async def respond(self, *args, **kwargs):
|
||||
"""
|
||||
Responds to the chat action message (not as a reply). Shorthand for
|
||||
`telethon.telegram_client.TelegramClient.send_message` with
|
||||
``entity`` already set.
|
||||
"""
|
||||
return self._client.send_message(self.input_chat, *args, **kwargs)
|
||||
return await self._client.send_message(await self.input_chat, *args, **kwargs)
|
||||
|
||||
def reply(self, *args, **kwargs):
|
||||
async def reply(self, *args, **kwargs):
|
||||
"""
|
||||
Replies to the chat action message (as a reply). Shorthand for
|
||||
`telethon.telegram_client.TelegramClient.send_message` with
|
||||
|
@ -178,9 +178,9 @@ class ChatAction(EventBuilder):
|
|||
return self.respond(*args, **kwargs)
|
||||
|
||||
kwargs['reply_to'] = self.action_message.id
|
||||
return self._client.send_message(self.input_chat, *args, **kwargs)
|
||||
return await self._client.send_message(await self.input_chat, *args, **kwargs)
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
async def delete(self, *args, **kwargs):
|
||||
"""
|
||||
Deletes the chat action message. You're responsible for checking
|
||||
whether you have the permission to do so, or to except the error
|
||||
|
@ -191,12 +191,12 @@ class ChatAction(EventBuilder):
|
|||
Does nothing if no message action triggered this event.
|
||||
"""
|
||||
if self.action_message:
|
||||
return self._client.delete_messages(self.input_chat,
|
||||
return await self._client.delete_messages(await self.input_chat,
|
||||
[self.action_message],
|
||||
*args, **kwargs)
|
||||
|
||||
@property
|
||||
def pinned_message(self):
|
||||
async def pinned_message(self):
|
||||
"""
|
||||
If ``new_pin`` is ``True``, this returns the (:tl:`Message`)
|
||||
object that was pinned.
|
||||
|
@ -204,8 +204,8 @@ class ChatAction(EventBuilder):
|
|||
if self._pinned_message == 0:
|
||||
return None
|
||||
|
||||
if isinstance(self._pinned_message, int) and self.input_chat:
|
||||
r = self._client(functions.channels.GetMessagesRequest(
|
||||
if isinstance(self._pinned_message, int) and await self.input_chat:
|
||||
r = await self._client(functions.channels.GetMessagesRequest(
|
||||
self._input_chat, [self._pinned_message]
|
||||
))
|
||||
try:
|
||||
|
@ -221,7 +221,7 @@ class ChatAction(EventBuilder):
|
|||
return self._pinned_message
|
||||
|
||||
@property
|
||||
def added_by(self):
|
||||
async def added_by(self):
|
||||
"""
|
||||
The user who added ``users``, if applicable (``None`` otherwise).
|
||||
"""
|
||||
|
@ -230,12 +230,12 @@ class ChatAction(EventBuilder):
|
|||
self._entities.get(utils.get_peer_id(self._added_by))
|
||||
|
||||
if not self._added_by:
|
||||
self._added_by = self._client.get_entity(self._added_by)
|
||||
self._added_by = await self._client.get_entity(self._added_by)
|
||||
|
||||
return self._added_by
|
||||
|
||||
@property
|
||||
def kicked_by(self):
|
||||
async def kicked_by(self):
|
||||
"""
|
||||
The user who kicked ``users``, if applicable (``None`` otherwise).
|
||||
"""
|
||||
|
@ -244,27 +244,27 @@ class ChatAction(EventBuilder):
|
|||
self._entities.get(utils.get_peer_id(self._kicked_by))
|
||||
|
||||
if not self._kicked_by:
|
||||
self._kicked_by = self._client.get_entity(self._kicked_by)
|
||||
self._kicked_by = await self._client.get_entity(self._kicked_by)
|
||||
|
||||
return self._kicked_by
|
||||
|
||||
@property
|
||||
def user(self):
|
||||
async def user(self):
|
||||
"""
|
||||
The first user that takes part in this action (e.g. joined).
|
||||
|
||||
Might be ``None`` if the information can't be retrieved or
|
||||
there is no user taking part.
|
||||
"""
|
||||
if self.users:
|
||||
if await self.users:
|
||||
return self._users[0]
|
||||
|
||||
@property
|
||||
def input_user(self):
|
||||
async def input_user(self):
|
||||
"""
|
||||
Input version of the ``self.user`` property.
|
||||
"""
|
||||
if self.input_users:
|
||||
if await self.input_users:
|
||||
return self._input_users[0]
|
||||
|
||||
@property
|
||||
|
@ -276,7 +276,7 @@ class ChatAction(EventBuilder):
|
|||
return utils.get_peer_id(self._user_peers[0])
|
||||
|
||||
@property
|
||||
def users(self):
|
||||
async def users(self):
|
||||
"""
|
||||
A list of users that take part in this action (e.g. joined).
|
||||
|
||||
|
@ -296,7 +296,7 @@ class ChatAction(EventBuilder):
|
|||
missing.append(peer)
|
||||
|
||||
try:
|
||||
missing = self._client.get_entity(missing)
|
||||
missing = await self._client.get_entity(missing)
|
||||
except (TypeError, ValueError):
|
||||
missing = []
|
||||
|
||||
|
@ -305,7 +305,7 @@ class ChatAction(EventBuilder):
|
|||
return self._users
|
||||
|
||||
@property
|
||||
def input_users(self):
|
||||
async def input_users(self):
|
||||
"""
|
||||
Input version of the ``self.users`` property.
|
||||
"""
|
||||
|
@ -313,7 +313,7 @@ class ChatAction(EventBuilder):
|
|||
self._input_users = []
|
||||
for peer in self._user_peers:
|
||||
try:
|
||||
self._input_users.append(self._client.get_input_entity(
|
||||
self._input_users.append(await self._client.get_input_entity(
|
||||
peer
|
||||
))
|
||||
except (TypeError, ValueError):
|
||||
|
|
|
@ -7,7 +7,7 @@ from ..errors import RPCError
|
|||
from ..tl import TLObject, types, functions
|
||||
|
||||
|
||||
def _into_id_set(client, chats):
|
||||
async def _into_id_set(client, chats):
|
||||
"""Helper util to turn the input chat or chats into a set of IDs."""
|
||||
if chats is None:
|
||||
return None
|
||||
|
@ -30,9 +30,9 @@ def _into_id_set(client, chats):
|
|||
# 0x2d45687 == crc32(b'Peer')
|
||||
result.add(utils.get_peer_id(chat))
|
||||
else:
|
||||
chat = client.get_input_entity(chat)
|
||||
chat = await client.get_input_entity(chat)
|
||||
if isinstance(chat, types.InputPeerSelf):
|
||||
chat = client.get_me(input_peer=True)
|
||||
chat = await client.get_me(input_peer=True)
|
||||
result.add(utils.get_peer_id(chat))
|
||||
|
||||
return result
|
||||
|
@ -62,10 +62,10 @@ class EventBuilder(abc.ABC):
|
|||
def build(self, update):
|
||||
"""Builds an event for the given update if possible, or returns None"""
|
||||
|
||||
def resolve(self, client):
|
||||
async def resolve(self, client):
|
||||
"""Helper method to allow event builders to be resolved before usage"""
|
||||
self.chats = _into_id_set(client, self.chats)
|
||||
self._self_id = client.get_me(input_peer=True).user_id
|
||||
self.chats = await _into_id_set(client, self.chats)
|
||||
self._self_id = (await client.get_me(input_peer=True)).user_id
|
||||
|
||||
def _filter_event(self, event):
|
||||
"""
|
||||
|
@ -103,7 +103,7 @@ class EventCommon(abc.ABC):
|
|||
)
|
||||
self.is_channel = isinstance(chat_peer, types.PeerChannel)
|
||||
|
||||
def _get_entity(self, msg_id, entity_id, chat=None):
|
||||
async def _get_entity(self, msg_id, entity_id, chat=None):
|
||||
"""
|
||||
Helper function to call :tl:`GetMessages` on the give msg_id and
|
||||
return the input entity whose ID is the given entity ID.
|
||||
|
@ -115,11 +115,11 @@ class EventCommon(abc.ABC):
|
|||
"""
|
||||
try:
|
||||
if isinstance(chat, types.InputPeerChannel):
|
||||
result = self._client(
|
||||
result = await self._client(
|
||||
functions.channels.GetMessagesRequest(chat, [msg_id])
|
||||
)
|
||||
else:
|
||||
result = self._client(
|
||||
result = await self._client(
|
||||
functions.messages.GetMessagesRequest([msg_id])
|
||||
)
|
||||
except RPCError:
|
||||
|
@ -136,7 +136,7 @@ class EventCommon(abc.ABC):
|
|||
return None, None
|
||||
|
||||
@property
|
||||
def input_chat(self):
|
||||
async def input_chat(self):
|
||||
"""
|
||||
The (:tl:`InputPeer`) (group, megagroup or channel) on which
|
||||
the event occurred. This doesn't have the title or anything,
|
||||
|
@ -148,7 +148,7 @@ class EventCommon(abc.ABC):
|
|||
|
||||
if self._input_chat is None and self._chat_peer is not None:
|
||||
try:
|
||||
self._input_chat = self._client.get_input_entity(
|
||||
self._input_chat = await self._client.get_input_entity(
|
||||
self._chat_peer
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
|
@ -157,7 +157,7 @@ class EventCommon(abc.ABC):
|
|||
# TODO For channels, getDifference? Maybe looking
|
||||
# in the dialogs (which is already done) is enough.
|
||||
if self._message_id is not None:
|
||||
self._chat, self._input_chat = self._get_entity(
|
||||
self._chat, self._input_chat = await self._get_entity(
|
||||
self._message_id,
|
||||
utils.get_peer_id(self._chat_peer)
|
||||
)
|
||||
|
@ -168,21 +168,21 @@ class EventCommon(abc.ABC):
|
|||
return self._client
|
||||
|
||||
@property
|
||||
def chat(self):
|
||||
async def chat(self):
|
||||
"""
|
||||
The (:tl:`User` | :tl:`Chat` | :tl:`Channel`, optional) on which
|
||||
the event occurred. This property may make an API call the first time
|
||||
to get the most up to date version of the chat (mostly when the event
|
||||
doesn't belong to a channel), so keep that in mind.
|
||||
"""
|
||||
if not self.input_chat:
|
||||
if not await self.input_chat:
|
||||
return None
|
||||
|
||||
if self._chat is None:
|
||||
self._chat = self._entities.get(utils.get_peer_id(self._input_chat))
|
||||
|
||||
if self._chat is None:
|
||||
self._chat = self._client.get_entity(self._input_chat)
|
||||
self._chat = await self._client.get_entity(self._input_chat)
|
||||
|
||||
return self._chat
|
||||
|
||||
|
|
|
@ -89,7 +89,7 @@ class MessageRead(EventBuilder):
|
|||
return self._message_ids
|
||||
|
||||
@property
|
||||
def messages(self):
|
||||
async def messages(self):
|
||||
"""
|
||||
The list of :tl:`Message` **which contents'** were read.
|
||||
|
||||
|
@ -97,17 +97,17 @@ class MessageRead(EventBuilder):
|
|||
was read instead checking if it's in here.
|
||||
"""
|
||||
if self._messages is None:
|
||||
chat = self.input_chat
|
||||
chat = await self.input_chat
|
||||
if not chat:
|
||||
self._messages = []
|
||||
elif isinstance(chat, types.InputPeerChannel):
|
||||
self._messages =\
|
||||
self._client(functions.channels.GetMessagesRequest(
|
||||
await self._client(functions.channels.GetMessagesRequest(
|
||||
chat, self._message_ids
|
||||
)).messages
|
||||
else:
|
||||
self._messages = \
|
||||
self._client(functions.messages.GetMessagesRequest(
|
||||
await self._client(functions.messages.GetMessagesRequest(
|
||||
self._message_ids
|
||||
)).messages
|
||||
|
||||
|
|
|
@ -154,34 +154,34 @@ class NewMessage(EventBuilder):
|
|||
self.is_reply = bool(message.reply_to_msg_id)
|
||||
self._reply_message = None
|
||||
|
||||
def respond(self, *args, **kwargs):
|
||||
async def respond(self, *args, **kwargs):
|
||||
"""
|
||||
Responds to the message (not as a reply). Shorthand for
|
||||
`telethon.telegram_client.TelegramClient.send_message` with
|
||||
``entity`` already set.
|
||||
"""
|
||||
return self._client.send_message(self.input_chat, *args, **kwargs)
|
||||
return await self._client.send_message(await self.input_chat, *args, **kwargs)
|
||||
|
||||
def reply(self, *args, **kwargs):
|
||||
async def reply(self, *args, **kwargs):
|
||||
"""
|
||||
Replies to the message (as a reply). Shorthand for
|
||||
`telethon.telegram_client.TelegramClient.send_message` with
|
||||
both ``entity`` and ``reply_to`` already set.
|
||||
"""
|
||||
kwargs['reply_to'] = self.message.id
|
||||
return self._client.send_message(self.input_chat, *args, **kwargs)
|
||||
return await self._client.send_message(await self.input_chat, *args, **kwargs)
|
||||
|
||||
def forward_to(self, *args, **kwargs):
|
||||
async def forward_to(self, *args, **kwargs):
|
||||
"""
|
||||
Forwards the message. Shorthand for
|
||||
`telethon.telegram_client.TelegramClient.forward_messages` with
|
||||
both ``messages`` and ``from_peer`` already set.
|
||||
"""
|
||||
kwargs['messages'] = self.message.id
|
||||
kwargs['from_peer'] = self.input_chat
|
||||
return self._client.forward_messages(*args, **kwargs)
|
||||
kwargs['from_peer'] = await self.input_chat
|
||||
return await self._client.forward_messages(*args, **kwargs)
|
||||
|
||||
def edit(self, *args, **kwargs):
|
||||
async def edit(self, *args, **kwargs):
|
||||
"""
|
||||
Edits the message iff it's outgoing. Shorthand for
|
||||
`telethon.telegram_client.TelegramClient.edit_message` with
|
||||
|
@ -195,15 +195,15 @@ class NewMessage(EventBuilder):
|
|||
if not self.message.out:
|
||||
if not isinstance(self.message.to_id, types.PeerUser):
|
||||
return None
|
||||
me = self._client.get_me(input_peer=True)
|
||||
me = await self._client.get_me(input_peer=True)
|
||||
if self.message.to_id.user_id != me.user_id:
|
||||
return None
|
||||
|
||||
return self._client.edit_message(self.input_chat,
|
||||
return await self._client.edit_message(await self.input_chat,
|
||||
self.message,
|
||||
*args, **kwargs)
|
||||
|
||||
def delete(self, *args, **kwargs):
|
||||
async def delete(self, *args, **kwargs):
|
||||
"""
|
||||
Deletes the message. You're responsible for checking whether you
|
||||
have the permission to do so, or to except the error otherwise.
|
||||
|
@ -211,12 +211,12 @@ class NewMessage(EventBuilder):
|
|||
`telethon.telegram_client.TelegramClient.delete_messages` with
|
||||
``entity`` and ``message_ids`` already set.
|
||||
"""
|
||||
return self._client.delete_messages(self.input_chat,
|
||||
return await self._client.delete_messages(await self.input_chat,
|
||||
[self.message],
|
||||
*args, **kwargs)
|
||||
|
||||
@property
|
||||
def input_sender(self):
|
||||
async def input_sender(self):
|
||||
"""
|
||||
This (:tl:`InputPeer`) is the input version of the user who
|
||||
sent the message. Similarly to ``input_chat``, this doesn't have
|
||||
|
@ -230,21 +230,21 @@ class NewMessage(EventBuilder):
|
|||
return None
|
||||
|
||||
try:
|
||||
self._input_sender = self._client.get_input_entity(
|
||||
self._input_sender = await self._client.get_input_entity(
|
||||
self.message.from_id
|
||||
)
|
||||
except (ValueError, TypeError):
|
||||
# We can rely on self.input_chat for this
|
||||
self._sender, self._input_sender = self._get_entity(
|
||||
self._sender, self._input_sender = await self._get_entity(
|
||||
self.message.id,
|
||||
self.message.from_id,
|
||||
chat=self.input_chat
|
||||
chat=await self.input_chat
|
||||
)
|
||||
|
||||
return self._input_sender
|
||||
|
||||
@property
|
||||
def sender(self):
|
||||
async def sender(self):
|
||||
"""
|
||||
This (:tl:`User`) may make an API call the first time to get
|
||||
the most up to date version of the sender (mostly when the event
|
||||
|
@ -252,7 +252,7 @@ class NewMessage(EventBuilder):
|
|||
|
||||
``input_sender`` needs to be available (often the case).
|
||||
"""
|
||||
if not self.input_sender:
|
||||
if not await self.input_sender:
|
||||
return None
|
||||
|
||||
if self._sender is None:
|
||||
|
@ -260,7 +260,7 @@ class NewMessage(EventBuilder):
|
|||
self._entities.get(utils.get_peer_id(self._input_sender))
|
||||
|
||||
if self._sender is None:
|
||||
self._sender = self._client.get_entity(self._input_sender)
|
||||
self._sender = await self._client.get_entity(self._input_sender)
|
||||
|
||||
return self._sender
|
||||
|
||||
|
@ -291,7 +291,7 @@ class NewMessage(EventBuilder):
|
|||
return self.message.message
|
||||
|
||||
@property
|
||||
def reply_message(self):
|
||||
async def reply_message(self):
|
||||
"""
|
||||
This optional :tl:`Message` will make an API call the first
|
||||
time to get the full :tl:`Message` object that one was replying to,
|
||||
|
@ -301,12 +301,12 @@ class NewMessage(EventBuilder):
|
|||
return None
|
||||
|
||||
if self._reply_message is None:
|
||||
if isinstance(self.input_chat, types.InputPeerChannel):
|
||||
r = self._client(functions.channels.GetMessagesRequest(
|
||||
self.input_chat, [self.message.reply_to_msg_id]
|
||||
if isinstance(await self.input_chat, types.InputPeerChannel):
|
||||
r = await self._client(functions.channels.GetMessagesRequest(
|
||||
await self.input_chat, [self.message.reply_to_msg_id]
|
||||
))
|
||||
else:
|
||||
r = self._client(functions.messages.GetMessagesRequest(
|
||||
r = await self._client(functions.messages.GetMessagesRequest(
|
||||
[self.message.reply_to_msg_id]
|
||||
))
|
||||
if not isinstance(r, types.messages.MessagesNotModified):
|
||||
|
|
|
@ -22,7 +22,7 @@ class Raw(EventBuilder):
|
|||
assert all(isinstance(x, type) for x in types)
|
||||
self.types = tuple(types)
|
||||
|
||||
def resolve(self, client):
|
||||
async def resolve(self, client):
|
||||
pass
|
||||
|
||||
def build(self, update):
|
||||
|
|
|
@ -148,14 +148,14 @@ class UserUpdate(EventBuilder):
|
|||
self.uploading = self.video = True
|
||||
|
||||
@property
|
||||
def user(self):
|
||||
async def user(self):
|
||||
"""Alias around the chat (conversation)."""
|
||||
return self.chat
|
||||
return await self.chat
|
||||
|
||||
@property
|
||||
def input_user(self):
|
||||
async def input_user(self):
|
||||
"""Alias around the input chat."""
|
||||
return self.input_chat
|
||||
return await self.input_chat
|
||||
|
||||
@property
|
||||
def user_id(self):
|
||||
|
|
|
@ -194,9 +194,9 @@ def get_inner_text(text, entity):
|
|||
"""
|
||||
if isinstance(entity, TLObject):
|
||||
entity = (entity,)
|
||||
multiple = True
|
||||
else:
|
||||
multiple = False
|
||||
else:
|
||||
multiple = True
|
||||
|
||||
text = _add_surrogate(text)
|
||||
result = []
|
||||
|
|
|
@ -1,31 +1,39 @@
|
|||
"""
|
||||
This module holds a rough implementation of the C# TCP client.
|
||||
"""
|
||||
# Python rough implementation of a C# TCP client
|
||||
import asyncio
|
||||
import errno
|
||||
import logging
|
||||
import socket
|
||||
import time
|
||||
from datetime import timedelta
|
||||
from io import BytesIO, BufferedWriter
|
||||
from threading import Lock
|
||||
|
||||
CONN_RESET_ERRNOS = {
|
||||
errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH,
|
||||
errno.EINVAL, errno.ENOTCONN, errno.EHOSTUNREACH,
|
||||
errno.ECONNREFUSED, errno.ECONNRESET, errno.ECONNABORTED,
|
||||
errno.ENETDOWN, errno.ENETRESET, errno.ECONNABORTED,
|
||||
errno.EHOSTDOWN, errno.EPIPE, errno.ESHUTDOWN
|
||||
}
|
||||
# catched: EHOSTUNREACH, ECONNREFUSED, ECONNRESET, ENETUNREACH
|
||||
# ConnectionError: EPIPE, ESHUTDOWN, ECONNABORTED, ECONNREFUSED, ECONNRESET
|
||||
|
||||
try:
|
||||
import socks
|
||||
except ImportError:
|
||||
socks = None
|
||||
|
||||
MAX_TIMEOUT = 15 # in seconds
|
||||
CONN_RESET_ERRNOS = {
|
||||
errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH,
|
||||
errno.EINVAL, errno.ENOTCONN
|
||||
}
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class TcpClient:
|
||||
"""A simple TCP client to ease the work with sockets and proxies."""
|
||||
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
|
||||
|
||||
class SocketClosed(ConnectionError):
|
||||
pass
|
||||
|
||||
def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None):
|
||||
"""
|
||||
Initializes the TCP client.
|
||||
|
||||
|
@ -34,7 +42,9 @@ class TcpClient:
|
|||
"""
|
||||
self.proxy = proxy
|
||||
self._socket = None
|
||||
self._closing_lock = Lock()
|
||||
self._loop = loop if loop else asyncio.get_event_loop()
|
||||
self._closed = asyncio.Event(loop=self._loop)
|
||||
self._closed.set()
|
||||
|
||||
if isinstance(timeout, timedelta):
|
||||
self.timeout = timeout.seconds
|
||||
|
@ -54,9 +64,9 @@ class TcpClient:
|
|||
else: # tuple, list, etc.
|
||||
self._socket.set_proxy(*self.proxy)
|
||||
|
||||
self._socket.settimeout(self.timeout)
|
||||
self._socket.setblocking(False)
|
||||
|
||||
def connect(self, ip, port):
|
||||
async def connect(self, ip, port):
|
||||
"""
|
||||
Tries connecting forever to IP:port unless an OSError is raised.
|
||||
|
||||
|
@ -69,75 +79,71 @@ class TcpClient:
|
|||
else:
|
||||
mode, address = socket.AF_INET, (ip, port)
|
||||
|
||||
timeout = 1
|
||||
while True:
|
||||
try:
|
||||
while not self._socket:
|
||||
if not self._socket:
|
||||
self._recreate_socket(mode)
|
||||
|
||||
self._socket.connect(address)
|
||||
break # Successful connection, stop retrying to connect
|
||||
await asyncio.wait_for(
|
||||
self._loop.sock_connect(self._socket, address),
|
||||
timeout=self.timeout,
|
||||
loop=self._loop
|
||||
)
|
||||
|
||||
self._closed.clear()
|
||||
except asyncio.TimeoutError as e:
|
||||
raise TimeoutError() from e
|
||||
except OSError as e:
|
||||
__log__.info('OSError "%s" raised while connecting', e)
|
||||
# Stop retrying to connect if proxy connection error occurred
|
||||
if socks and isinstance(e, socks.ProxyConnectionError):
|
||||
raise
|
||||
# There are some errors that we know how to handle, and
|
||||
# the loop will allow us to retry
|
||||
if e.errno in (errno.EBADF, errno.ENOTSOCK, errno.EINVAL,
|
||||
errno.ECONNREFUSED, # Windows-specific follow
|
||||
getattr(errno, 'WSAEACCES', None)):
|
||||
# Bad file descriptor, i.e. socket was closed, set it
|
||||
# to none to recreate it on the next iteration
|
||||
self._socket = None
|
||||
time.sleep(timeout)
|
||||
timeout *= 2
|
||||
if timeout > MAX_TIMEOUT:
|
||||
raise
|
||||
if e.errno in CONN_RESET_ERRNOS:
|
||||
self._raise_connection_reset(e)
|
||||
else:
|
||||
raise
|
||||
|
||||
def _get_connected(self):
|
||||
"""Determines whether the client is connected or not."""
|
||||
return self._socket is not None and self._socket.fileno() >= 0
|
||||
return not self._closed.is_set()
|
||||
|
||||
connected = property(fget=_get_connected)
|
||||
|
||||
def close(self):
|
||||
"""Closes the connection."""
|
||||
if self._closing_lock.locked():
|
||||
# Already closing, no need to close again (avoid None.close())
|
||||
return
|
||||
|
||||
with self._closing_lock:
|
||||
try:
|
||||
if self._socket is not None:
|
||||
if self.connected:
|
||||
self._socket.shutdown(socket.SHUT_RDWR)
|
||||
self._socket.close()
|
||||
except OSError:
|
||||
pass # Ignore ENOTCONN, EBADF, and any other error when closing
|
||||
finally:
|
||||
self._socket = None
|
||||
self._closed.set()
|
||||
|
||||
def write(self, data):
|
||||
async def _wait_close(self, coro):
|
||||
done, running = await asyncio.wait(
|
||||
[coro, self._closed.wait()],
|
||||
timeout=self.timeout,
|
||||
return_when=asyncio.FIRST_COMPLETED,
|
||||
loop=self._loop
|
||||
)
|
||||
for r in running:
|
||||
r.cancel()
|
||||
if not self.connected:
|
||||
raise self.SocketClosed()
|
||||
if not done:
|
||||
raise TimeoutError()
|
||||
return done.pop().result()
|
||||
|
||||
async def write(self, data):
|
||||
"""
|
||||
Writes (sends) the specified bytes to the connected peer.
|
||||
|
||||
:param data: the data to send.
|
||||
"""
|
||||
if self._socket is None:
|
||||
self._raise_connection_reset(None)
|
||||
|
||||
# TODO Timeout may be an issue when sending the data, Changed in v3.5:
|
||||
# The socket timeout is now the maximum total duration to send all data.
|
||||
if not self.connected:
|
||||
raise ConnectionResetError('No connection')
|
||||
try:
|
||||
self._socket.sendall(data)
|
||||
except socket.timeout as e:
|
||||
__log__.debug('socket.timeout "%s" while writing data', e)
|
||||
raise TimeoutError() from e
|
||||
except ConnectionError as e:
|
||||
__log__.info('ConnectionError "%s" while writing data', e)
|
||||
self._raise_connection_reset(e)
|
||||
await self._wait_close(self.sock_sendall(data))
|
||||
except self.SocketClosed:
|
||||
raise ConnectionResetError('Socket has closed')
|
||||
except OSError as e:
|
||||
__log__.info('OSError "%s" while writing data', e)
|
||||
if e.errno in CONN_RESET_ERRNOS:
|
||||
|
@ -145,22 +151,22 @@ class TcpClient:
|
|||
else:
|
||||
raise
|
||||
|
||||
def read(self, size):
|
||||
async def read(self, size):
|
||||
"""
|
||||
Reads (receives) a whole block of size bytes from the connected peer.
|
||||
|
||||
:param size: the size of the block to be read.
|
||||
:return: the read data with len(data) == size.
|
||||
"""
|
||||
if self._socket is None:
|
||||
self._raise_connection_reset(None)
|
||||
|
||||
with BufferedWriter(BytesIO(), buffer_size=size) as buffer:
|
||||
bytes_left = size
|
||||
partial = b''
|
||||
while bytes_left != 0:
|
||||
if not self.connected:
|
||||
raise ConnectionResetError('No connection')
|
||||
try:
|
||||
partial = self._socket.recv(bytes_left)
|
||||
except socket.timeout as e:
|
||||
partial = await self._wait_close(self.sock_recv(bytes_left))
|
||||
except TimeoutError as e:
|
||||
# These are somewhat common if the server has nothing
|
||||
# to send to us, so use a lower logging priority.
|
||||
if bytes_left < size:
|
||||
|
@ -173,12 +179,11 @@ class TcpClient:
|
|||
'socket.timeout "%s" while reading data', e
|
||||
)
|
||||
|
||||
raise TimeoutError() from e
|
||||
except ConnectionError as e:
|
||||
__log__.info('ConnectionError "%s" while reading data', e)
|
||||
self._raise_connection_reset(e)
|
||||
raise
|
||||
except self.SocketClosed:
|
||||
raise ConnectionResetError('Socket has closed while reading data')
|
||||
except OSError as e:
|
||||
if e.errno != errno.EBADF and self._closing_lock.locked():
|
||||
if e.errno != errno.EBADF:
|
||||
# Ignore bad file descriptor while closing
|
||||
__log__.info('OSError "%s" while reading data', e)
|
||||
|
||||
|
@ -188,7 +193,7 @@ class TcpClient:
|
|||
raise
|
||||
|
||||
if len(partial) == 0:
|
||||
self._raise_connection_reset(None)
|
||||
self._raise_connection_reset('No data on read')
|
||||
|
||||
buffer.write(partial)
|
||||
bytes_left -= len(partial)
|
||||
|
@ -197,8 +202,61 @@ class TcpClient:
|
|||
buffer.flush()
|
||||
return buffer.raw.getvalue()
|
||||
|
||||
def _raise_connection_reset(self, original):
|
||||
"""Disconnects the client and raises ConnectionResetError."""
|
||||
def _raise_connection_reset(self, error):
|
||||
description = error if isinstance(error, str) else str(error)
|
||||
if isinstance(error, str):
|
||||
error = Exception(error)
|
||||
self.close() # Connection reset -> flag as socket closed
|
||||
raise ConnectionResetError('The server has closed the connection.')\
|
||||
from original
|
||||
raise ConnectionResetError(description) from error
|
||||
|
||||
# due to new https://github.com/python/cpython/pull/4386
|
||||
def sock_recv(self, n):
|
||||
fut = self._loop.create_future()
|
||||
self._sock_recv(fut, None, n)
|
||||
return fut
|
||||
|
||||
def _sock_recv(self, fut, registered_fd, n):
|
||||
if registered_fd is not None:
|
||||
self._loop.remove_reader(registered_fd)
|
||||
if fut.cancelled() or self._socket is None:
|
||||
return
|
||||
|
||||
try:
|
||||
data = self._socket.recv(n)
|
||||
except (BlockingIOError, InterruptedError):
|
||||
fd = self._socket.fileno()
|
||||
self._loop.add_reader(fd, self._sock_recv, fut, fd, n)
|
||||
except Exception as exc:
|
||||
fut.set_exception(exc)
|
||||
else:
|
||||
fut.set_result(data)
|
||||
|
||||
def sock_sendall(self, data):
|
||||
fut = self._loop.create_future()
|
||||
if data:
|
||||
self._sock_sendall(fut, None, data)
|
||||
else:
|
||||
fut.set_result(None)
|
||||
return fut
|
||||
|
||||
def _sock_sendall(self, fut, registered_fd, data):
|
||||
if registered_fd:
|
||||
self._loop.remove_writer(registered_fd)
|
||||
if fut.cancelled() or self._socket is None:
|
||||
return
|
||||
|
||||
try:
|
||||
n = self._socket.send(data)
|
||||
except (BlockingIOError, InterruptedError):
|
||||
n = 0
|
||||
except Exception as exc:
|
||||
fut.set_exception(exc)
|
||||
return
|
||||
|
||||
if n == len(data):
|
||||
fut.set_result(None)
|
||||
else:
|
||||
if n:
|
||||
data = data[n:]
|
||||
fd = self._socket.fileno()
|
||||
self._loop.add_writer(fd, self._sock_sendall, fut, fd, data)
|
||||
|
|
|
@ -3,9 +3,9 @@ import os
|
|||
import struct
|
||||
from hashlib import sha1, sha256
|
||||
|
||||
from telethon.crypto import AES
|
||||
from telethon.errors import SecurityError
|
||||
from telethon.extensions import BinaryReader
|
||||
from .crypto import AES
|
||||
from .errors import SecurityError
|
||||
from .extensions import BinaryReader
|
||||
|
||||
|
||||
# region Multiple utilities
|
||||
|
|
|
@ -20,7 +20,7 @@ from ..tl.functions import (
|
|||
)
|
||||
|
||||
|
||||
def do_authentication(connection, retries=5):
|
||||
async def do_authentication(connection, retries=5):
|
||||
"""
|
||||
Performs the authentication steps on the given connection.
|
||||
Raises an error if all attempts fail.
|
||||
|
@ -35,14 +35,14 @@ def do_authentication(connection, retries=5):
|
|||
last_error = None
|
||||
while retries:
|
||||
try:
|
||||
return _do_authentication(connection)
|
||||
return await _do_authentication(connection)
|
||||
except (SecurityError, AssertionError, NotImplementedError) as e:
|
||||
last_error = e
|
||||
retries -= 1
|
||||
raise last_error
|
||||
|
||||
|
||||
def _do_authentication(connection):
|
||||
async def _do_authentication(connection):
|
||||
"""
|
||||
Executes the authentication process with the Telegram servers.
|
||||
|
||||
|
@ -55,8 +55,8 @@ def _do_authentication(connection):
|
|||
req_pq_request = ReqPqMultiRequest(
|
||||
nonce=int.from_bytes(os.urandom(16), 'big', signed=True)
|
||||
)
|
||||
sender.send(bytes(req_pq_request))
|
||||
with BinaryReader(sender.receive()) as reader:
|
||||
await sender.send(bytes(req_pq_request))
|
||||
with BinaryReader(await sender.receive()) as reader:
|
||||
req_pq_request.on_response(reader)
|
||||
|
||||
res_pq = req_pq_request.result
|
||||
|
@ -103,10 +103,10 @@ def _do_authentication(connection):
|
|||
public_key_fingerprint=target_fingerprint,
|
||||
encrypted_data=cipher_text
|
||||
)
|
||||
sender.send(bytes(req_dh_params))
|
||||
await sender.send(bytes(req_dh_params))
|
||||
|
||||
# Step 2 response: DH Exchange
|
||||
with BinaryReader(sender.receive()) as reader:
|
||||
with BinaryReader(await sender.receive()) as reader:
|
||||
req_dh_params.on_response(reader)
|
||||
|
||||
server_dh_params = req_dh_params.result
|
||||
|
@ -173,10 +173,10 @@ def _do_authentication(connection):
|
|||
server_nonce=res_pq.server_nonce,
|
||||
encrypted_data=client_dh_encrypted,
|
||||
)
|
||||
sender.send(bytes(set_client_dh))
|
||||
await sender.send(bytes(set_client_dh))
|
||||
|
||||
# Step 3 response: Complete DH Exchange
|
||||
with BinaryReader(sender.receive()) as reader:
|
||||
with BinaryReader(await sender.receive()) as reader:
|
||||
set_client_dh.on_response(reader)
|
||||
|
||||
dh_gen = set_client_dh.result
|
||||
|
|
|
@ -2,6 +2,7 @@
|
|||
This module holds the abstract `Connection` class.
|
||||
"""
|
||||
import abc
|
||||
import asyncio
|
||||
from datetime import timedelta
|
||||
|
||||
|
||||
|
@ -12,15 +13,17 @@ class Connection(abc.ABC):
|
|||
Subclasses should implement the actual protocol
|
||||
being used when encoding/decoding messages.
|
||||
"""
|
||||
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
|
||||
def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None):
|
||||
"""
|
||||
Initializes a new connection.
|
||||
|
||||
:param proxy: whether to use a proxy or not.
|
||||
:param timeout: timeout to be used for all operations.
|
||||
:param loop: event loop to be used, or ``asyncio.get_event_loop()``.
|
||||
"""
|
||||
self._proxy = proxy
|
||||
self._timeout = timeout
|
||||
self._loop = loop or asyncio.get_event_loop()
|
||||
|
||||
@abc.abstractmethod
|
||||
def connect(self, ip, port):
|
||||
|
@ -32,7 +35,7 @@ class Connection(abc.ABC):
|
|||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def is_connected(self):
|
||||
async def is_connected(self):
|
||||
"""
|
||||
Determines whether the connection is alive or not.
|
||||
|
||||
|
@ -51,11 +54,11 @@ class Connection(abc.ABC):
|
|||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def recv(self):
|
||||
async def recv(self):
|
||||
"""Receives and unpacks a message"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def send(self, message):
|
||||
async def send(self, message):
|
||||
"""Encapsulates and sends the given message"""
|
||||
raise NotImplementedError
|
||||
|
|
|
@ -9,26 +9,26 @@ class ConnectionTcpAbridged(ConnectionTcpFull):
|
|||
only require 1 byte if the packet length is less than
|
||||
508 bytes (127 << 2, which is very common).
|
||||
"""
|
||||
def connect(self, ip, port):
|
||||
result = super().connect(ip, port)
|
||||
self.conn.write(b'\xef')
|
||||
async def connect(self, ip, port):
|
||||
result = await super().connect(ip, port)
|
||||
await self.conn.write(b'\xef')
|
||||
return result
|
||||
|
||||
def clone(self):
|
||||
return ConnectionTcpAbridged(self._proxy, self._timeout)
|
||||
|
||||
def recv(self):
|
||||
length = struct.unpack('<B', self.read(1))[0]
|
||||
async def recv(self):
|
||||
length = struct.unpack('<B', await self.read(1))[0]
|
||||
if length >= 127:
|
||||
length = struct.unpack('<i', self.read(3) + b'\0')[0]
|
||||
length = struct.unpack('<i', await self.read(3) + b'\0')[0]
|
||||
|
||||
return self.read(length << 2)
|
||||
return await self.read(length << 2)
|
||||
|
||||
def send(self, message):
|
||||
async def send(self, message):
|
||||
length = len(message) >> 2
|
||||
if length < 127:
|
||||
length = struct.pack('B', length)
|
||||
else:
|
||||
length = b'\x7f' + int.to_bytes(length, 3, 'little')
|
||||
|
||||
self.write(length + message)
|
||||
await self.write(length + message)
|
||||
|
|
|
@ -13,16 +13,16 @@ class ConnectionTcpFull(Connection):
|
|||
Default Telegram mode. Sends 12 additional bytes and
|
||||
needs to calculate the CRC value of the packet itself.
|
||||
"""
|
||||
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
|
||||
super().__init__(proxy, timeout)
|
||||
def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None):
|
||||
super().__init__(proxy, timeout, loop)
|
||||
self._send_counter = 0
|
||||
self.conn = TcpClient(proxy=self._proxy, timeout=self._timeout)
|
||||
self.read = self.conn.read
|
||||
self.write = self.conn.write
|
||||
|
||||
def connect(self, ip, port):
|
||||
async def connect(self, ip, port):
|
||||
try:
|
||||
self.conn.connect(ip, port)
|
||||
await self.conn.connect(ip, port)
|
||||
except OSError as e:
|
||||
if e.errno == errno.EISCONN:
|
||||
return # Already connected, no need to re-set everything up
|
||||
|
@ -43,11 +43,11 @@ class ConnectionTcpFull(Connection):
|
|||
def clone(self):
|
||||
return ConnectionTcpFull(self._proxy, self._timeout)
|
||||
|
||||
def recv(self):
|
||||
packet_len_seq = self.read(8) # 4 and 4
|
||||
async def recv(self):
|
||||
packet_len_seq = await self.read(8) # 4 and 4
|
||||
packet_len, seq = struct.unpack('<ii', packet_len_seq)
|
||||
body = self.read(packet_len - 12)
|
||||
checksum = struct.unpack('<I', self.read(4))[0]
|
||||
body = await self.read(packet_len - 12)
|
||||
checksum = struct.unpack('<I', await self.read(4))[0]
|
||||
|
||||
valid_checksum = crc32(packet_len_seq + body)
|
||||
if checksum != valid_checksum:
|
||||
|
@ -55,11 +55,11 @@ class ConnectionTcpFull(Connection):
|
|||
|
||||
return body
|
||||
|
||||
def send(self, message):
|
||||
async def send(self, message):
|
||||
# https://core.telegram.org/mtproto#tcp-transport
|
||||
# total length, sequence number, packet and checksum (CRC32)
|
||||
length = len(message) + 12
|
||||
data = struct.pack('<ii', length, self._send_counter) + message
|
||||
crc = struct.pack('<I', crc32(data))
|
||||
self._send_counter += 1
|
||||
self.write(data + crc)
|
||||
await self.write(data + crc)
|
||||
|
|
|
@ -8,16 +8,16 @@ class ConnectionTcpIntermediate(ConnectionTcpFull):
|
|||
Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`.
|
||||
Always sends 4 extra bytes for the packet length.
|
||||
"""
|
||||
def connect(self, ip, port):
|
||||
result = super().connect(ip, port)
|
||||
self.conn.write(b'\xee\xee\xee\xee')
|
||||
async def connect(self, ip, port):
|
||||
result = await super().connect(ip, port)
|
||||
await self.conn.write(b'\xee\xee\xee\xee')
|
||||
return result
|
||||
|
||||
def clone(self):
|
||||
return ConnectionTcpIntermediate(self._proxy, self._timeout)
|
||||
|
||||
def recv(self):
|
||||
return self.read(struct.unpack('<i', self.read(4))[0])
|
||||
async def recv(self):
|
||||
return await self.read(struct.unpack('<i', await self.read(4))[0])
|
||||
|
||||
def send(self, message):
|
||||
self.write(struct.pack('<i', len(message)) + message)
|
||||
async def send(self, message):
|
||||
await self.write(struct.pack('<i', len(message)) + message)
|
||||
|
|
|
@ -12,14 +12,21 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged):
|
|||
every message with a randomly generated key using the
|
||||
AES-CTR mode so the packets are harder to discern.
|
||||
"""
|
||||
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
|
||||
super().__init__(proxy, timeout)
|
||||
def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None):
|
||||
super().__init__(proxy, timeout, loop)
|
||||
self._aes_encrypt, self._aes_decrypt = None, None
|
||||
self.read = lambda s: self._aes_decrypt.encrypt(self.conn.read(s))
|
||||
self.write = lambda d: self.conn.write(self._aes_encrypt.encrypt(d))
|
||||
|
||||
def connect(self, ip, port):
|
||||
result = ConnectionTcpFull.connect(self, ip, port)
|
||||
async def read(size):
|
||||
return self._aes_decrypt.encrypt(await self.conn.read(size))
|
||||
|
||||
async def write(data):
|
||||
return await self.conn.write(self._aes_encrypt.encrypt(data))
|
||||
|
||||
self.read = read
|
||||
self.write = write
|
||||
|
||||
async def connect(self, ip, port):
|
||||
result = await ConnectionTcpFull.connect(self, ip, port)
|
||||
# Obfuscated messages secrets cannot start with any of these
|
||||
keywords = (b'PVrG', b'GET ', b'POST', b'\xee' * 4)
|
||||
while True:
|
||||
|
@ -43,7 +50,7 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged):
|
|||
self._aes_decrypt = AESModeCTR(decrypt_key, decrypt_iv)
|
||||
|
||||
random[56:64] = self._aes_encrypt.encrypt(bytes(random))[56:64]
|
||||
self.conn.write(bytes(random))
|
||||
await self.conn.write(bytes(random))
|
||||
return result
|
||||
|
||||
def clone(self):
|
||||
|
|
|
@ -26,32 +26,32 @@ class MtProtoPlainSender:
|
|||
self._last_msg_id = 0
|
||||
self._connection = connection
|
||||
|
||||
def connect(self):
|
||||
async def connect(self):
|
||||
"""Connects to Telegram's servers."""
|
||||
self._connection.connect()
|
||||
await self._connection.connect()
|
||||
|
||||
def disconnect(self):
|
||||
"""Disconnects from Telegram's servers."""
|
||||
self._connection.close()
|
||||
|
||||
def send(self, data):
|
||||
async def send(self, data):
|
||||
"""
|
||||
Sends a plain packet (auth_key_id = 0) containing the
|
||||
given message body (data).
|
||||
|
||||
:param data: the data to be sent.
|
||||
"""
|
||||
self._connection.send(
|
||||
await self._connection.send(
|
||||
struct.pack('<QQi', 0, self._get_new_msg_id(), len(data)) + data
|
||||
)
|
||||
|
||||
def receive(self):
|
||||
async def receive(self):
|
||||
"""
|
||||
Receives a plain packet from the network.
|
||||
|
||||
:return: the response body.
|
||||
"""
|
||||
body = self._connection.recv()
|
||||
body = await self._connection.recv()
|
||||
if body == b'l\xfe\xff\xff': # -404 little endian signed
|
||||
# Broken authorization, must reset the auth key
|
||||
raise BrokenAuthKeyError()
|
||||
|
|
|
@ -2,8 +2,9 @@
|
|||
This module contains the class used to communicate with Telegram's servers
|
||||
encrypting every packet, and relies on a valid AuthKey in the used Session.
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
from threading import Lock
|
||||
from asyncio import Event
|
||||
|
||||
from .. import helpers, utils
|
||||
from ..errors import (
|
||||
|
@ -13,11 +14,11 @@ from ..errors import (
|
|||
from ..extensions import BinaryReader
|
||||
from ..tl import TLMessage, MessageContainer, GzipPacked
|
||||
from ..tl.all_tlobjects import tlobjects
|
||||
from ..tl.functions import InvokeAfterMsgRequest
|
||||
from ..tl.functions.auth import LogOutRequest
|
||||
from ..tl.types import (
|
||||
MsgsAck, Pong, BadServerSalt, BadMsgNotification, FutureSalts,
|
||||
MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo
|
||||
MsgNewDetailedInfo, MsgDetailedInfo, MsgsStateReq, MsgResendReq,
|
||||
MsgsAllInfo, MsgsStateInfo, RpcError
|
||||
)
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
@ -35,7 +36,7 @@ class MtProtoSender:
|
|||
in parallel, so thread-safety (hence locking) isn't needed.
|
||||
"""
|
||||
|
||||
def __init__(self, session, connection):
|
||||
def __init__(self, session, connection, loop=None):
|
||||
"""
|
||||
Initializes a new MTProto sender.
|
||||
|
||||
|
@ -44,28 +45,26 @@ class MtProtoSender:
|
|||
port of the server, salt, ID, and AuthKey,
|
||||
:param connection:
|
||||
the Connection to be used.
|
||||
:param loop:
|
||||
the asyncio loop to be used, or the default one.
|
||||
"""
|
||||
self.session = session
|
||||
self.connection = connection
|
||||
|
||||
# Message IDs that need confirmation
|
||||
self._need_confirmation = set()
|
||||
|
||||
# Requests (as msg_id: Message) sent waiting to be received
|
||||
self._pending_receive = {}
|
||||
|
||||
# Multithreading
|
||||
self._send_lock = Lock()
|
||||
self._loop = loop if loop else asyncio.get_event_loop()
|
||||
|
||||
# If we're invoking something from an update thread but we're also
|
||||
# receiving other request from the main thread (e.g. an update arrives
|
||||
# and we need to process it) we must ensure that only one is calling
|
||||
# receive at a given moment, since the receive step is fragile.
|
||||
self._recv_lock = Lock()
|
||||
self._read_lock = asyncio.Lock(loop=self._loop)
|
||||
self._write_lock = asyncio.Lock(loop=self._loop)
|
||||
|
||||
def connect(self):
|
||||
# Requests (as msg_id: Message) sent waiting to be received
|
||||
self._pending_receive = {}
|
||||
|
||||
async def connect(self):
|
||||
"""Connects to the server."""
|
||||
self.connection.connect(self.session.server_address, self.session.port)
|
||||
await self.connection.connect(self.session.server_address, self.session.port)
|
||||
|
||||
def is_connected(self):
|
||||
"""
|
||||
|
@ -75,15 +74,16 @@ class MtProtoSender:
|
|||
"""
|
||||
return self.connection.is_connected()
|
||||
|
||||
def disconnect(self):
|
||||
def disconnect(self, clear_pendings=True):
|
||||
"""Disconnects from the server."""
|
||||
__log__.info('Disconnecting MtProtoSender...')
|
||||
self.connection.close()
|
||||
if clear_pendings:
|
||||
self._clear_all_pending()
|
||||
|
||||
# region Send and receive
|
||||
|
||||
def send(self, requests, ordered=False):
|
||||
async def send(self, requests, ordered=False):
|
||||
"""
|
||||
Sends the specified TLObject(s) (which must be requests),
|
||||
and acknowledging any message which needed confirmation.
|
||||
|
@ -92,10 +92,18 @@ class MtProtoSender:
|
|||
:param ordered: whether the requests should be invoked in the
|
||||
order in which they appear or they can be executed
|
||||
in arbitrary order in the server.
|
||||
:return: a list of msg_ids which are correspond to sent requests.
|
||||
"""
|
||||
if not utils.is_list_like(requests):
|
||||
requests = (requests,)
|
||||
|
||||
# Prepare the event of every request
|
||||
for r in requests:
|
||||
if r.confirm_received is None:
|
||||
r.confirm_received = Event(loop=self._loop)
|
||||
else:
|
||||
r.confirm_received.clear()
|
||||
|
||||
if ordered:
|
||||
requests = iter(requests)
|
||||
messages = [TLMessage(self.session, next(requests))]
|
||||
|
@ -106,19 +114,13 @@ class MtProtoSender:
|
|||
messages = [TLMessage(self.session, r) for r in requests]
|
||||
|
||||
self._pending_receive.update({m.msg_id: m for m in messages})
|
||||
msg_ids = [m.msg_id for m in messages]
|
||||
|
||||
__log__.debug('Sending requests with IDs: %s', ', '.join(
|
||||
'{}: {}'.format(m.request.__class__.__name__, m.msg_id)
|
||||
for m in messages
|
||||
))
|
||||
|
||||
# Pack everything in the same container if we need to send AckRequests
|
||||
if self._need_confirmation:
|
||||
messages.append(
|
||||
TLMessage(self.session, MsgsAck(list(self._need_confirmation)))
|
||||
)
|
||||
self._need_confirmation.clear()
|
||||
|
||||
if len(messages) == 1:
|
||||
message = messages[0]
|
||||
else:
|
||||
|
@ -129,13 +131,19 @@ class MtProtoSender:
|
|||
for m in messages:
|
||||
m.container_msg_id = message.msg_id
|
||||
|
||||
self._send_message(message)
|
||||
await self._send_message(message)
|
||||
return msg_ids
|
||||
|
||||
def _send_acknowledge(self, msg_id):
|
||||
def forget_pendings(self, msg_ids):
|
||||
for msg_id in msg_ids:
|
||||
if msg_id in self._pending_receive:
|
||||
del self._pending_receive[msg_id]
|
||||
|
||||
async def _send_acknowledge(self, msg_id):
|
||||
"""Sends a message acknowledge for the given msg_id."""
|
||||
self._send_message(TLMessage(self.session, MsgsAck([msg_id])))
|
||||
await self._send_message(TLMessage(self.session, MsgsAck([msg_id])))
|
||||
|
||||
def receive(self, update_state):
|
||||
async def receive(self, updates_handler):
|
||||
"""
|
||||
Receives a single message from the connected endpoint.
|
||||
|
||||
|
@ -146,21 +154,13 @@ class MtProtoSender:
|
|||
Any unhandled object (likely updates) will be passed to
|
||||
update_state.process(TLObject).
|
||||
|
||||
:param update_state:
|
||||
the UpdateState that will process all the received
|
||||
:param updates_handler:
|
||||
the handler that will process all the received
|
||||
Update and Updates objects.
|
||||
"""
|
||||
if self._recv_lock.locked():
|
||||
with self._recv_lock:
|
||||
# Don't busy wait, acquire it but return because there's
|
||||
# already a receive running and we don't want another one.
|
||||
# It would lock until Telegram sent another update even if
|
||||
# the current receive already received the expected response.
|
||||
return
|
||||
|
||||
await self._read_lock.acquire()
|
||||
try:
|
||||
with self._recv_lock:
|
||||
body = self.connection.recv()
|
||||
body = await self.connection.recv()
|
||||
except (BufferError, InvalidChecksumError):
|
||||
# TODO BufferError, we should spot the cause...
|
||||
# "No more bytes left"; something wrong happened, clear
|
||||
|
@ -174,23 +174,28 @@ class MtProtoSender:
|
|||
len(self._pending_receive))
|
||||
self._clear_all_pending()
|
||||
return
|
||||
finally:
|
||||
self._read_lock.release()
|
||||
|
||||
message, remote_msg_id, remote_seq = self._decode_msg(body)
|
||||
with BinaryReader(message) as reader:
|
||||
self._process_msg(remote_msg_id, remote_seq, reader, update_state)
|
||||
await self._process_msg(remote_msg_id, remote_seq, reader, updates_handler)
|
||||
|
||||
# endregion
|
||||
|
||||
# region Low level processing
|
||||
|
||||
def _send_message(self, message):
|
||||
async def _send_message(self, message):
|
||||
"""
|
||||
Sends the given encrypted through the network.
|
||||
|
||||
:param message: the TLMessage to be sent.
|
||||
"""
|
||||
with self._send_lock:
|
||||
self.connection.send(helpers.pack_message(self.session, message))
|
||||
await self._write_lock.acquire()
|
||||
try:
|
||||
await self.connection.send(helpers.pack_message(self.session, message))
|
||||
finally:
|
||||
self._write_lock.release()
|
||||
|
||||
def _decode_msg(self, body):
|
||||
"""
|
||||
|
@ -208,18 +213,17 @@ class MtProtoSender:
|
|||
with BinaryReader(body) as reader:
|
||||
return helpers.unpack_message(self.session, reader)
|
||||
|
||||
def _process_msg(self, msg_id, sequence, reader, state):
|
||||
async def _process_msg(self, msg_id, sequence, reader, updates_handler):
|
||||
"""
|
||||
Processes the message read from the network inside reader.
|
||||
|
||||
:param msg_id: the ID of the message.
|
||||
:param sequence: the sequence of the message.
|
||||
:param reader: the BinaryReader that contains the message.
|
||||
:param state: the current UpdateState.
|
||||
:param updates_handler: the handler to process Update and Updates objects.
|
||||
:return: true if the message was handled correctly, false otherwise.
|
||||
"""
|
||||
# TODO Check salt, session_id and sequence_number
|
||||
self._need_confirmation.add(msg_id)
|
||||
|
||||
code = reader.read_int(signed=False)
|
||||
reader.seek(-4)
|
||||
|
@ -227,15 +231,16 @@ class MtProtoSender:
|
|||
# These are a bit of special case, not yet generated by the code gen
|
||||
if code == 0xf35c6d01: # rpc_result, (response of an RPC call)
|
||||
__log__.debug('Processing Remote Procedure Call result')
|
||||
return self._handle_rpc_result(msg_id, sequence, reader)
|
||||
await self._send_acknowledge(msg_id)
|
||||
return await self._handle_rpc_result(msg_id, sequence, reader)
|
||||
|
||||
if code == MessageContainer.CONSTRUCTOR_ID:
|
||||
__log__.debug('Processing container result')
|
||||
return self._handle_container(msg_id, sequence, reader, state)
|
||||
return await self._handle_container(msg_id, sequence, reader, updates_handler)
|
||||
|
||||
if code == GzipPacked.CONSTRUCTOR_ID:
|
||||
__log__.debug('Processing gzipped result')
|
||||
return self._handle_gzip_packed(msg_id, sequence, reader, state)
|
||||
return await self._handle_gzip_packed(msg_id, sequence, reader, updates_handler)
|
||||
|
||||
if code not in tlobjects:
|
||||
__log__.warning(
|
||||
|
@ -248,25 +253,30 @@ class MtProtoSender:
|
|||
__log__.debug('Processing %s result', type(obj).__name__)
|
||||
|
||||
if isinstance(obj, Pong):
|
||||
return self._handle_pong(msg_id, sequence, obj)
|
||||
return await self._handle_pong(msg_id, sequence, obj)
|
||||
|
||||
if isinstance(obj, BadServerSalt):
|
||||
return self._handle_bad_server_salt(msg_id, sequence, obj)
|
||||
return await self._handle_bad_server_salt(msg_id, sequence, obj)
|
||||
|
||||
if isinstance(obj, (MsgsStateReq, MsgResendReq)):
|
||||
# just answer we don't know anything
|
||||
return await self._handle_msgs_state_forgotten(msg_id, sequence, obj)
|
||||
|
||||
if isinstance(obj, MsgsAllInfo):
|
||||
# not interesting now
|
||||
return True
|
||||
|
||||
if isinstance(obj, BadMsgNotification):
|
||||
return self._handle_bad_msg_notification(msg_id, sequence, obj)
|
||||
return await self._handle_bad_msg_notification(msg_id, sequence, obj)
|
||||
|
||||
if isinstance(obj, MsgDetailedInfo):
|
||||
return self._handle_msg_detailed_info(msg_id, sequence, obj)
|
||||
return await self._handle_msg_detailed_info(msg_id, sequence, obj)
|
||||
|
||||
if isinstance(obj, MsgNewDetailedInfo):
|
||||
return self._handle_msg_new_detailed_info(msg_id, sequence, obj)
|
||||
|
||||
if isinstance(obj, NewSessionCreated):
|
||||
return self._handle_new_session_created(msg_id, sequence, obj)
|
||||
return await self._handle_msg_new_detailed_info(msg_id, sequence, obj)
|
||||
|
||||
if isinstance(obj, MsgsAck): # may handle the request we wanted
|
||||
# Ignore every ack request *unless* when logging out, when it's
|
||||
# Ignore every ack request *unless* when logging out,
|
||||
# when it seems to only make sense. We also need to set a non-None
|
||||
# result since Telegram doesn't send the response for these.
|
||||
for msg_id in obj.msg_ids:
|
||||
|
@ -287,8 +297,9 @@ class MtProtoSender:
|
|||
|
||||
# If the object isn't any of the above, then it should be an Update.
|
||||
self.session.process_entities(obj)
|
||||
if state:
|
||||
state.process(obj)
|
||||
await self._send_acknowledge(msg_id)
|
||||
if updates_handler:
|
||||
updates_handler(obj)
|
||||
|
||||
return True
|
||||
|
||||
|
@ -343,7 +354,7 @@ class MtProtoSender:
|
|||
__log__.info('Abruptly confirming %s', type(r).__name__)
|
||||
self._pending_receive.clear()
|
||||
|
||||
def _resend_request(self, msg_id):
|
||||
async def _resend_request(self, msg_id):
|
||||
"""
|
||||
Re-sends the request that belongs to a certain msg_id. This may
|
||||
also be the msg_id of a container if they were sent in one.
|
||||
|
@ -352,12 +363,13 @@ class MtProtoSender:
|
|||
"""
|
||||
request = self._pop_request(msg_id)
|
||||
if request:
|
||||
return self.send(request)
|
||||
await self.send(request)
|
||||
return
|
||||
requests = self._pop_requests_of_container(msg_id)
|
||||
if requests:
|
||||
return self.send(*requests)
|
||||
await self.send(*requests)
|
||||
|
||||
def _handle_pong(self, msg_id, sequence, pong):
|
||||
async def _handle_pong(self, msg_id, sequence, pong):
|
||||
"""
|
||||
Handles a Pong response.
|
||||
|
||||
|
@ -374,22 +386,24 @@ class MtProtoSender:
|
|||
|
||||
return True
|
||||
|
||||
def _handle_container(self, msg_id, sequence, reader, state):
|
||||
async def _handle_container(self, msg_id, sequence, reader, updates_handler):
|
||||
"""
|
||||
Handles a MessageContainer response.
|
||||
|
||||
:param msg_id: the ID of the message.
|
||||
:param sequence: the sequence of the message.
|
||||
:param reader: the reader containing the MessageContainer.
|
||||
:param updates_handler: handler to handle Update and Updates objects.
|
||||
:return: true, as it always succeeds.
|
||||
"""
|
||||
__log__.debug('Handling container')
|
||||
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
|
||||
begin_position = reader.tell_position()
|
||||
|
||||
# Note that this code is IMPORTANT for skipping RPC results of
|
||||
# lost requests (i.e., ones from the previous connection session)
|
||||
try:
|
||||
if not self._process_msg(inner_msg_id, sequence, reader, state):
|
||||
if not await self._process_msg(inner_msg_id, sequence, reader, updates_handler):
|
||||
reader.set_position(begin_position + inner_len)
|
||||
except:
|
||||
# If any error is raised, something went wrong; skip the packet
|
||||
|
@ -398,7 +412,7 @@ class MtProtoSender:
|
|||
|
||||
return True
|
||||
|
||||
def _handle_bad_server_salt(self, msg_id, sequence, bad_salt):
|
||||
async def _handle_bad_server_salt(self, msg_id, sequence, bad_salt):
|
||||
"""
|
||||
Handles a BadServerSalt response.
|
||||
|
||||
|
@ -408,14 +422,18 @@ class MtProtoSender:
|
|||
:return: true, as it always succeeds.
|
||||
"""
|
||||
self.session.salt = bad_salt.new_server_salt
|
||||
self.session.save()
|
||||
|
||||
# "the bad_server_salt response is received with the
|
||||
# correct salt, and the message is to be re-sent with it"
|
||||
self._resend_request(bad_salt.bad_msg_id)
|
||||
await self._resend_request(bad_salt.bad_msg_id)
|
||||
|
||||
return True
|
||||
|
||||
def _handle_bad_msg_notification(self, msg_id, sequence, bad_msg):
|
||||
async def _handle_msgs_state_forgotten(self, msg_id, sequence, req):
|
||||
await self._send_message(TLMessage(self.session, MsgsStateInfo(msg_id, chr(1) * len(req.msg_ids))))
|
||||
return True
|
||||
|
||||
async def _handle_bad_msg_notification(self, msg_id, sequence, bad_msg):
|
||||
"""
|
||||
Handles a BadMessageError response.
|
||||
|
||||
|
@ -431,25 +449,25 @@ class MtProtoSender:
|
|||
# Use the current msg_id to determine the right time offset.
|
||||
self.session.update_time_offset(correct_msg_id=msg_id)
|
||||
__log__.info('Attempting to use the correct time offset')
|
||||
self._resend_request(bad_msg.bad_msg_id)
|
||||
await self._resend_request(bad_msg.bad_msg_id)
|
||||
return True
|
||||
elif bad_msg.error_code == 32:
|
||||
# msg_seqno too low, so just pump it up by some "large" amount
|
||||
# TODO A better fix would be to start with a new fresh session ID
|
||||
self.session.sequence += 64
|
||||
__log__.info('Attempting to set the right higher sequence')
|
||||
self._resend_request(bad_msg.bad_msg_id)
|
||||
await self._resend_request(bad_msg.bad_msg_id)
|
||||
return True
|
||||
elif bad_msg.error_code == 33:
|
||||
# msg_seqno too high never seems to happen but just in case
|
||||
self.session.sequence -= 16
|
||||
__log__.info('Attempting to set the right lower sequence')
|
||||
self._resend_request(bad_msg.bad_msg_id)
|
||||
await self._resend_request(bad_msg.bad_msg_id)
|
||||
return True
|
||||
else:
|
||||
raise error
|
||||
|
||||
def _handle_msg_detailed_info(self, msg_id, sequence, msg_new):
|
||||
async def _handle_msg_detailed_info(self, msg_id, sequence, msg_new):
|
||||
"""
|
||||
Handles a MsgDetailedInfo response.
|
||||
|
||||
|
@ -460,10 +478,10 @@ class MtProtoSender:
|
|||
"""
|
||||
# TODO For now, simply ack msg_new.answer_msg_id
|
||||
# Relevant tdesktop source code: https://goo.gl/VvpCC6
|
||||
self._send_acknowledge(msg_new.answer_msg_id)
|
||||
await self._send_acknowledge(msg_new.answer_msg_id)
|
||||
return True
|
||||
|
||||
def _handle_msg_new_detailed_info(self, msg_id, sequence, msg_new):
|
||||
async def _handle_msg_new_detailed_info(self, msg_id, sequence, msg_new):
|
||||
"""
|
||||
Handles a MsgNewDetailedInfo response.
|
||||
|
||||
|
@ -474,23 +492,10 @@ class MtProtoSender:
|
|||
"""
|
||||
# TODO For now, simply ack msg_new.answer_msg_id
|
||||
# Relevant tdesktop source code: https://goo.gl/G7DPsR
|
||||
self._send_acknowledge(msg_new.answer_msg_id)
|
||||
await self._send_acknowledge(msg_new.answer_msg_id)
|
||||
return True
|
||||
|
||||
def _handle_new_session_created(self, msg_id, sequence, new_session):
|
||||
"""
|
||||
Handles a NewSessionCreated response.
|
||||
|
||||
:param msg_id: the ID of the message.
|
||||
:param sequence: the sequence of the message.
|
||||
:param reader: the reader containing the NewSessionCreated.
|
||||
:return: true, as it always succeeds.
|
||||
"""
|
||||
self.session.salt = new_session.server_salt
|
||||
# TODO https://goo.gl/LMyN7A
|
||||
return True
|
||||
|
||||
def _handle_rpc_result(self, msg_id, sequence, reader):
|
||||
async def _handle_rpc_result(self, msg_id, sequence, reader):
|
||||
"""
|
||||
Handles a RPCResult response.
|
||||
|
||||
|
@ -508,7 +513,7 @@ class MtProtoSender:
|
|||
__log__.debug('Received response for request with ID %d', request_id)
|
||||
request = self._pop_request(request_id)
|
||||
|
||||
if inner_code == 0x2144ca19: # RPC Error
|
||||
if inner_code == RpcError.CONSTRUCTOR_ID: # RPC Error
|
||||
reader.seek(4)
|
||||
if self.session.report_errors and request:
|
||||
error = rpc_message_to_error(
|
||||
|
@ -520,9 +525,6 @@ class MtProtoSender:
|
|||
reader.read_int(), reader.tgread_string()
|
||||
)
|
||||
|
||||
# Acknowledge that we received the error
|
||||
self._send_acknowledge(request_id)
|
||||
|
||||
if request:
|
||||
request.rpc_error = error
|
||||
request.confirm_received.set()
|
||||
|
@ -534,6 +536,7 @@ class MtProtoSender:
|
|||
return True # All contents were read okay
|
||||
|
||||
elif request:
|
||||
__log__.debug('Reading request response')
|
||||
if inner_code == GzipPacked.CONSTRUCTOR_ID:
|
||||
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
|
||||
request.on_response(compressed_reader)
|
||||
|
@ -570,21 +573,18 @@ class MtProtoSender:
|
|||
)
|
||||
return False
|
||||
|
||||
def _handle_gzip_packed(self, msg_id, sequence, reader, state):
|
||||
async def _handle_gzip_packed(self, msg_id, sequence, reader, updates_handler):
|
||||
"""
|
||||
Handles a GzipPacked response.
|
||||
|
||||
:param msg_id: the ID of the message.
|
||||
:param sequence: the sequence of the message.
|
||||
:param reader: the reader containing the GzipPacked.
|
||||
:param updates_handler: the handler to process Update and Updates objects.
|
||||
:return: the result of processing the packed message.
|
||||
"""
|
||||
__log__.debug('Handling gzip packed data')
|
||||
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
|
||||
# We are reentering process_msg, which seemingly the same msg_id
|
||||
# to the self._need_confirmation set. Remove it from there first
|
||||
# to avoid any future conflicts (i.e. if we "ignore" messages
|
||||
# that we are already aware of, see 1a91c02 and old 63dfb1e)
|
||||
self._need_confirmation -= {msg_id}
|
||||
return self._process_msg(msg_id, sequence, compressed_reader, state)
|
||||
return await self._process_msg(msg_id, sequence, compressed_reader, updates_handler)
|
||||
|
||||
# endregion
|
||||
|
|
|
@ -67,6 +67,22 @@ class Session(ABC):
|
|||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def user_id(self):
|
||||
"""
|
||||
Returns an ``user_id`` which the session related to.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@user_id.setter
|
||||
@abstractmethod
|
||||
def user_id(self, value):
|
||||
"""
|
||||
Sets the ``user_id`` which the session related to.
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_update_state(self, entity_id):
|
||||
"""
|
||||
|
@ -94,7 +110,7 @@ class Session(ABC):
|
|||
"""
|
||||
|
||||
@abstractmethod
|
||||
def save(self):
|
||||
async def save(self):
|
||||
"""
|
||||
Called whenever important properties change. It should
|
||||
make persist the relevant session information to disk.
|
||||
|
@ -102,7 +118,7 @@ class Session(ABC):
|
|||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def delete(self):
|
||||
async def delete(self):
|
||||
"""
|
||||
Called upon client.log_out(). Should delete the stored
|
||||
information from disk since it's not valid anymore.
|
||||
|
@ -125,7 +141,7 @@ class Session(ABC):
|
|||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_input_entity(self, key):
|
||||
async def get_input_entity(self, key):
|
||||
"""
|
||||
Turns the given key into an ``InputPeer`` (e.g. ``InputPeerUser``).
|
||||
The library uses this method whenever an ``InputPeer`` is needed
|
||||
|
@ -135,7 +151,7 @@ class Session(ABC):
|
|||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def cache_file(self, md5_digest, file_size, instance):
|
||||
async def cache_file(self, md5_digest, file_size, instance):
|
||||
"""
|
||||
Caches the given file information persistently, so that it
|
||||
doesn't need to be re-uploaded in case the file is used again.
|
||||
|
@ -146,7 +162,7 @@ class Session(ABC):
|
|||
raise NotImplementedError
|
||||
|
||||
@abstractmethod
|
||||
def get_file(self, md5_digest, file_size, cls):
|
||||
async def get_file(self, md5_digest, file_size, cls):
|
||||
"""
|
||||
Returns an instance of ``cls`` if the ``md5_digest`` and ``file_size``
|
||||
match an existing saved record. The class will either be an
|
||||
|
|
|
@ -32,6 +32,7 @@ class MemorySession(Session):
|
|||
self._server_address = None
|
||||
self._port = None
|
||||
self._auth_key = None
|
||||
self._user_id = None
|
||||
|
||||
self._files = {}
|
||||
self._entities = set()
|
||||
|
@ -58,6 +59,14 @@ class MemorySession(Session):
|
|||
def auth_key(self, value):
|
||||
self._auth_key = value
|
||||
|
||||
@property
|
||||
def user_id(self):
|
||||
return self._user_id
|
||||
|
||||
@user_id.setter
|
||||
def user_id(self, value):
|
||||
self._user_id = value
|
||||
|
||||
def get_update_state(self, entity_id):
|
||||
return self._update_states.get(entity_id, None)
|
||||
|
||||
|
@ -67,10 +76,10 @@ class MemorySession(Session):
|
|||
def close(self):
|
||||
pass
|
||||
|
||||
def save(self):
|
||||
async def save(self):
|
||||
pass
|
||||
|
||||
def delete(self):
|
||||
async def delete(self):
|
||||
pass
|
||||
|
||||
def _entity_values_to_row(self, id, hash, username, phone, name):
|
||||
|
@ -170,7 +179,7 @@ class MemorySession(Session):
|
|||
except StopIteration:
|
||||
pass
|
||||
|
||||
def get_input_entity(self, key):
|
||||
async def get_input_entity(self, key):
|
||||
try:
|
||||
if key.SUBCLASS_OF_ID in (0xc91c90b6, 0xe669bf46, 0x40f202fd):
|
||||
# hex(crc32(b'InputPeer', b'InputUser' and b'InputChannel'))
|
||||
|
@ -215,14 +224,14 @@ class MemorySession(Session):
|
|||
else:
|
||||
raise ValueError('Could not find input entity with key ', key)
|
||||
|
||||
def cache_file(self, md5_digest, file_size, instance):
|
||||
async def cache_file(self, md5_digest, file_size, instance):
|
||||
if not isinstance(instance, (InputDocument, InputPhoto)):
|
||||
raise TypeError('Cannot cache %s instance' % type(instance))
|
||||
key = (md5_digest, file_size, _SentFileType.from_type(instance))
|
||||
value = (instance.id, instance.access_hash)
|
||||
self._files[key] = value
|
||||
|
||||
def get_file(self, md5_digest, file_size, cls):
|
||||
async def get_file(self, md5_digest, file_size, cls):
|
||||
key = (md5_digest, file_size, _SentFileType.from_type(cls))
|
||||
try:
|
||||
return cls(self._files[key])
|
||||
|
|
|
@ -4,7 +4,6 @@ import os
|
|||
import sqlite3
|
||||
from base64 import b64decode
|
||||
from os.path import isfile as file_exists
|
||||
from threading import Lock, RLock
|
||||
|
||||
from telethon.tl import types
|
||||
from .memory import MemorySession, _SentFileType
|
||||
|
@ -42,11 +41,6 @@ class SQLiteSession(MemorySession):
|
|||
if not self.filename.endswith(EXTENSION):
|
||||
self.filename += EXTENSION
|
||||
|
||||
# Cross-thread safety
|
||||
self._seq_no_lock = Lock()
|
||||
self._msg_id_lock = Lock()
|
||||
self._db_lock = RLock()
|
||||
|
||||
# Migrating from .json -> SQL
|
||||
entities = self._check_migrate_json()
|
||||
|
||||
|
@ -204,7 +198,6 @@ class SQLiteSession(MemorySession):
|
|||
self._update_session_table()
|
||||
|
||||
def _update_session_table(self):
|
||||
with self._db_lock:
|
||||
c = self._cursor()
|
||||
# While we can save multiple rows into the sessions table
|
||||
# currently we only want to keep ONE as the tables don't
|
||||
|
@ -220,7 +213,7 @@ class SQLiteSession(MemorySession):
|
|||
))
|
||||
c.close()
|
||||
|
||||
def get_update_state(self, entity_id):
|
||||
async def get_update_state(self, entity_id):
|
||||
c = self._cursor()
|
||||
row = c.execute('select pts, qts, date, seq from update_state '
|
||||
'where id = ?', (entity_id,)).fetchone()
|
||||
|
@ -230,8 +223,7 @@ class SQLiteSession(MemorySession):
|
|||
date = datetime.datetime.utcfromtimestamp(date)
|
||||
return types.updates.State(pts, qts, date, seq, unread_count=0)
|
||||
|
||||
def set_update_state(self, entity_id, state):
|
||||
with self._db_lock:
|
||||
async def set_update_state(self, entity_id, state):
|
||||
c = self._cursor()
|
||||
c.execute('insert or replace into update_state values (?,?,?,?,?)',
|
||||
(entity_id, state.pts, state.qts,
|
||||
|
@ -239,28 +231,24 @@ class SQLiteSession(MemorySession):
|
|||
c.close()
|
||||
self.save()
|
||||
|
||||
def save(self):
|
||||
async def save(self):
|
||||
"""Saves the current session object as session_user_id.session"""
|
||||
with self._db_lock:
|
||||
self._conn.commit()
|
||||
|
||||
def _cursor(self):
|
||||
"""Asserts that the connection is open and returns a cursor"""
|
||||
with self._db_lock:
|
||||
if self._conn is None:
|
||||
self._conn = sqlite3.connect(self.filename,
|
||||
check_same_thread=False)
|
||||
self._conn = sqlite3.connect(self.filename)
|
||||
return self._conn.cursor()
|
||||
|
||||
def close(self):
|
||||
"""Closes the connection unless we're working in-memory"""
|
||||
if self.filename != ':memory:':
|
||||
with self._db_lock:
|
||||
if self._conn is not None:
|
||||
self._conn.close()
|
||||
self._conn = None
|
||||
|
||||
def delete(self):
|
||||
async def delete(self):
|
||||
"""Deletes the current session file"""
|
||||
if self.filename == ':memory:':
|
||||
return True
|
||||
|
@ -293,7 +281,6 @@ class SQLiteSession(MemorySession):
|
|||
if not rows:
|
||||
return
|
||||
|
||||
with self._db_lock:
|
||||
self._cursor().executemany(
|
||||
'insert or replace into entities values (?,?,?,?,?)', rows
|
||||
)
|
||||
|
@ -332,7 +319,7 @@ class SQLiteSession(MemorySession):
|
|||
|
||||
# File processing
|
||||
|
||||
def get_file(self, md5_digest, file_size, cls):
|
||||
async def get_file(self, md5_digest, file_size, cls):
|
||||
tuple_ = self._cursor().execute(
|
||||
'select id, hash from sent_files '
|
||||
'where md5_digest = ? and file_size = ? and type = ?',
|
||||
|
@ -342,11 +329,10 @@ class SQLiteSession(MemorySession):
|
|||
# Both allowed classes have (id, access_hash) as parameters
|
||||
return cls(tuple_[0], tuple_[1])
|
||||
|
||||
def cache_file(self, md5_digest, file_size, instance):
|
||||
async def cache_file(self, md5_digest, file_size, instance):
|
||||
if not isinstance(instance, (InputDocument, InputPhoto)):
|
||||
raise TypeError('Cannot cache %s instance' % type(instance))
|
||||
|
||||
with self._db_lock:
|
||||
self._cursor().execute(
|
||||
'insert or replace into sent_files values (?,?,?,?,?)', (
|
||||
md5_digest, file_size,
|
||||
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -68,12 +68,12 @@ class Dialog:
|
|||
|
||||
self.draft = Draft(client, dialog.peer, dialog.draft)
|
||||
|
||||
def send_message(self, *args, **kwargs):
|
||||
async def send_message(self, *args, **kwargs):
|
||||
"""
|
||||
Sends a message to this dialog. This is just a wrapper around
|
||||
``client.send_message(dialog.input_entity, *args, **kwargs)``.
|
||||
"""
|
||||
return self._client.send_message(self.input_entity, *args, **kwargs)
|
||||
return await self._client.send_message(self.input_entity, *args, **kwargs)
|
||||
|
||||
def to_dict(self):
|
||||
return {
|
||||
|
|
|
@ -26,7 +26,7 @@ class Draft:
|
|||
def __init__(self, client, peer, draft):
|
||||
self._client = client
|
||||
self._peer = peer
|
||||
if not draft:
|
||||
if not draft or not isinstance(draft, DraftMessage):
|
||||
draft = DraftMessage('', None, None, None, None)
|
||||
|
||||
self._text = markdown.unparse(draft.message, draft.entities)
|
||||
|
@ -46,18 +46,18 @@ class Draft:
|
|||
return cls(client=client, peer=update.peer, draft=update.draft)
|
||||
|
||||
@property
|
||||
def entity(self):
|
||||
async def entity(self):
|
||||
"""
|
||||
The entity that belongs to this dialog (user, chat or channel).
|
||||
"""
|
||||
return self._client.get_entity(self._peer)
|
||||
return await self._client.get_entity(self._peer)
|
||||
|
||||
@property
|
||||
def input_entity(self):
|
||||
async def input_entity(self):
|
||||
"""
|
||||
Input version of the entity.
|
||||
"""
|
||||
return self._client.get_input_entity(self._peer)
|
||||
return await self._client.get_input_entity(self._peer)
|
||||
|
||||
@property
|
||||
def text(self):
|
||||
|
@ -82,7 +82,7 @@ class Draft:
|
|||
"""
|
||||
return not self._text
|
||||
|
||||
def set_message(self, text=None, reply_to=0, parse_mode='md',
|
||||
async def set_message(self, text=None, reply_to=0, parse_mode='md',
|
||||
link_preview=None):
|
||||
"""
|
||||
Changes the draft message on the Telegram servers. The changes are
|
||||
|
@ -109,8 +109,9 @@ class Draft:
|
|||
if link_preview is None:
|
||||
link_preview = self.link_preview
|
||||
|
||||
raw_text, entities = self._client._parse_message_text(text, parse_mode)
|
||||
result = self._client(SaveDraftRequest(
|
||||
raw_text, entities = await self._client._parse_message_text(text,
|
||||
parse_mode)
|
||||
result = await self._client(SaveDraftRequest(
|
||||
peer=self._peer,
|
||||
message=raw_text,
|
||||
no_webpage=not link_preview,
|
||||
|
@ -127,22 +128,22 @@ class Draft:
|
|||
|
||||
return result
|
||||
|
||||
def send(self, clear=True, parse_mode='md'):
|
||||
async def send(self, clear=True, parse_mode='md'):
|
||||
"""
|
||||
Sends the contents of this draft to the dialog. This is just a
|
||||
wrapper around ``send_message(dialog.input_entity, *args, **kwargs)``.
|
||||
"""
|
||||
self._client.send_message(self._peer, self.text,
|
||||
await self._client.send_message(self._peer, self.text,
|
||||
reply_to=self.reply_to_msg_id,
|
||||
link_preview=self.link_preview,
|
||||
parse_mode=parse_mode,
|
||||
clear_draft=clear)
|
||||
|
||||
def delete(self):
|
||||
async def delete(self):
|
||||
"""
|
||||
Deletes this draft, and returns ``True`` on success.
|
||||
"""
|
||||
return self.set_message(text='')
|
||||
return await self.set_message(text='')
|
||||
|
||||
def to_dict(self):
|
||||
try:
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
import struct
|
||||
from asyncio import Event
|
||||
from datetime import datetime, date
|
||||
from threading import Event
|
||||
|
||||
|
||||
class TLObject:
|
||||
|
@ -180,7 +180,7 @@ class TLObject:
|
|||
return TLObject.pretty_format(self, indent=0)
|
||||
|
||||
# These should be overrode
|
||||
def resolve(self, client, utils):
|
||||
async def resolve(self, client, utils):
|
||||
pass
|
||||
|
||||
def to_dict(self):
|
||||
|
@ -192,3 +192,6 @@ class TLObject:
|
|||
@classmethod
|
||||
def from_reader(cls, reader):
|
||||
return TLObject()
|
||||
|
||||
def __repr__(self):
|
||||
return self.__str__()
|
||||
|
|
|
@ -1,152 +0,0 @@
|
|||
import itertools
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from queue import Queue, Empty
|
||||
from threading import RLock, Thread
|
||||
|
||||
from . import utils
|
||||
from .tl import types as tl
|
||||
|
||||
__log__ = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class UpdateState:
|
||||
"""
|
||||
Used to hold the current state of processed updates.
|
||||
To retrieve an update, :meth:`poll` should be called.
|
||||
"""
|
||||
WORKER_POLL_TIMEOUT = 5.0 # Avoid waiting forever on the workers
|
||||
|
||||
def __init__(self, workers=None):
|
||||
"""
|
||||
:param workers: This integer parameter has three possible cases:
|
||||
workers is None: Updates will *not* be stored on self.
|
||||
workers = 0: Another thread is responsible for calling self.poll()
|
||||
workers > 0: 'workers' background threads will be spawned, any
|
||||
any of them will invoke the self.handler.
|
||||
"""
|
||||
self._workers = workers
|
||||
self._worker_threads = []
|
||||
|
||||
self.handler = None
|
||||
self._updates_lock = RLock()
|
||||
self._updates = Queue()
|
||||
|
||||
# https://core.telegram.org/api/updates
|
||||
self._state = tl.updates.State(0, 0, datetime.now(), 0, 0)
|
||||
|
||||
def can_poll(self):
|
||||
"""Returns True if a call to .poll() won't lock"""
|
||||
return not self._updates.empty()
|
||||
|
||||
def poll(self, timeout=None):
|
||||
"""
|
||||
Polls an update or blocks until an update object is available.
|
||||
If 'timeout is not None', it should be a floating point value,
|
||||
and the method will 'return None' if waiting times out.
|
||||
"""
|
||||
try:
|
||||
return self._updates.get(timeout=timeout)
|
||||
except Empty:
|
||||
return None
|
||||
|
||||
def get_workers(self):
|
||||
return self._workers
|
||||
|
||||
def set_workers(self, n):
|
||||
"""Changes the number of workers running.
|
||||
If 'n is None', clears all pending updates from memory.
|
||||
"""
|
||||
if n is None:
|
||||
self.stop_workers()
|
||||
else:
|
||||
self._workers = n
|
||||
self.setup_workers()
|
||||
|
||||
workers = property(fget=get_workers, fset=set_workers)
|
||||
|
||||
def stop_workers(self):
|
||||
"""
|
||||
Waits for all the worker threads to stop.
|
||||
"""
|
||||
# Put dummy ``None`` objects so that they don't need to timeout.
|
||||
n = self._workers
|
||||
self._workers = None
|
||||
if n:
|
||||
with self._updates_lock:
|
||||
for _ in range(n):
|
||||
self._updates.put(None)
|
||||
|
||||
for t in self._worker_threads:
|
||||
t.join()
|
||||
|
||||
self._worker_threads.clear()
|
||||
self._workers = n
|
||||
|
||||
def setup_workers(self):
|
||||
if self._worker_threads or not self._workers:
|
||||
# There already are workers, or workers is None or 0. Do nothing.
|
||||
return
|
||||
|
||||
for i in range(self._workers):
|
||||
thread = Thread(
|
||||
target=UpdateState._worker_loop,
|
||||
name='UpdateWorker{}'.format(i),
|
||||
daemon=True,
|
||||
args=(self, i)
|
||||
)
|
||||
self._worker_threads.append(thread)
|
||||
thread.start()
|
||||
|
||||
def _worker_loop(self, wid):
|
||||
while self._workers is not None:
|
||||
try:
|
||||
update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT)
|
||||
if update and self.handler:
|
||||
self.handler(update)
|
||||
except StopIteration:
|
||||
break
|
||||
except:
|
||||
# We don't want to crash a worker thread due to any reason
|
||||
__log__.exception('Unhandled exception on worker %d', wid)
|
||||
|
||||
def get_update_state(self, entity_id):
|
||||
"""Gets the updates.State corresponding to the given entity or 0."""
|
||||
return self._state
|
||||
|
||||
def process(self, update):
|
||||
"""Processes an update object. This method is normally called by
|
||||
the library itself.
|
||||
"""
|
||||
if self._workers is None:
|
||||
return # No processing needs to be done if nobody's working
|
||||
|
||||
with self._updates_lock:
|
||||
if isinstance(update, tl.updates.State):
|
||||
__log__.debug('Saved new updates state')
|
||||
self._state = update
|
||||
return # Nothing else to be done
|
||||
|
||||
if hasattr(update, 'pts'):
|
||||
self._state.pts = update.pts
|
||||
|
||||
# After running the script for over an hour and receiving over
|
||||
# 1000 updates, the only duplicates received were users going
|
||||
# online or offline. We can trust the server until new reports.
|
||||
# This should only be used as read-only.
|
||||
if isinstance(update, tl.UpdateShort):
|
||||
update.update._entities = {}
|
||||
self._updates.put(update.update)
|
||||
# Expand "Updates" into "Update", and pass these to callbacks.
|
||||
# Since .users and .chats have already been processed, we
|
||||
# don't need to care about those either.
|
||||
elif isinstance(update, (tl.Updates, tl.UpdatesCombined)):
|
||||
entities = {utils.get_peer_id(x): x for x in
|
||||
itertools.chain(update.users, update.chats)}
|
||||
for u in update.updates:
|
||||
u._entities = entities
|
||||
self._updates.put(u)
|
||||
# TODO Handle "tl.UpdatesTooLong"
|
||||
else:
|
||||
update._entities = {}
|
||||
self._updates.put(update)
|
|
@ -14,10 +14,10 @@ AUTO_GEN_NOTICE = \
|
|||
|
||||
|
||||
AUTO_CASTS = {
|
||||
'InputPeer': 'utils.get_input_peer(client.get_input_entity({}))',
|
||||
'InputChannel': 'utils.get_input_channel(client.get_input_entity({}))',
|
||||
'InputUser': 'utils.get_input_user(client.get_input_entity({}))',
|
||||
'InputDialogPeer': 'utils.get_input_dialog(client.get_input_entity({}))',
|
||||
'InputPeer': 'utils.get_input_peer(await client.get_input_entity({}))',
|
||||
'InputChannel': 'utils.get_input_channel(await client.get_input_entity({}))',
|
||||
'InputUser': 'utils.get_input_user(await client.get_input_entity({}))',
|
||||
'InputDialogPeer': 'utils.get_input_dialog(await client.get_input_entity({}))',
|
||||
'InputMedia': 'utils.get_input_media({})',
|
||||
'InputPhoto': 'utils.get_input_photo({})',
|
||||
'InputMessage': 'utils.get_input_message({})'
|
||||
|
@ -234,19 +234,30 @@ def _write_class_init(tlobject, type_constructors, builder):
|
|||
|
||||
def _write_resolve(tlobject, builder):
|
||||
if any(arg.type in AUTO_CASTS for arg in tlobject.real_args):
|
||||
builder.writeln('def resolve(self, client, utils):')
|
||||
builder.writeln('async def resolve(self, client, utils):')
|
||||
for arg in tlobject.real_args:
|
||||
ac = AUTO_CASTS.get(arg.type, None)
|
||||
if not ac:
|
||||
continue
|
||||
if arg.is_vector:
|
||||
builder.write('self.{0} = [{1} for _x in self.{0}]',
|
||||
arg.name, ac.format('_x'))
|
||||
else:
|
||||
builder.write('self.{} = {}', arg.name,
|
||||
|
||||
if arg.is_flag:
|
||||
builder.writeln('if self.{}:', arg.name)
|
||||
|
||||
if not arg.is_vector:
|
||||
builder.writeln('self.{} = {}', arg.name,
|
||||
ac.format('self.' + arg.name))
|
||||
builder.writeln(' if self.{} else None'.format(arg.name)
|
||||
if arg.is_flag else '')
|
||||
else:
|
||||
# Since the auto-cast might have await, we can't use that in
|
||||
# Python 3.5's list comprehensions. Build the list manually.
|
||||
builder.writeln('_tmp = []')
|
||||
builder.writeln('for _x in self.{}:', arg.name)
|
||||
builder.writeln('_tmp.append({})', ac.format('_x'))
|
||||
builder.end_block()
|
||||
builder.writeln('self.{} = _tmp', arg.name)
|
||||
|
||||
if arg.is_flag:
|
||||
builder.end_block()
|
||||
|
||||
builder.end_block()
|
||||
|
||||
|
||||
|
|
|
@ -17,33 +17,33 @@ class HigherLevelTests(unittest.TestCase):
|
|||
raise ValueError('Please fill in both your api_id and api_hash.')
|
||||
|
||||
@unittest.skip("you can't seriously trash random mobile numbers like that :)")
|
||||
def test_cdn_download(self):
|
||||
async def test_cdn_download(self):
|
||||
client = TelegramClient(None, api_id, api_hash)
|
||||
client.session.set_dc(0, '149.154.167.40', 80)
|
||||
self.assertTrue(client.connect())
|
||||
self.assertTrue(await client.connect())
|
||||
|
||||
try:
|
||||
phone = '+999662' + str(randint(0, 9999)).zfill(4)
|
||||
client.send_code_request(phone)
|
||||
client.sign_up('22222', 'Test', 'DC')
|
||||
await client.send_code_request(phone)
|
||||
await client.sign_up('22222', 'Test', 'DC')
|
||||
|
||||
me = client.get_me()
|
||||
me = await client.get_me()
|
||||
data = os.urandom(2 ** 17)
|
||||
client.send_file(
|
||||
await client.send_file(
|
||||
me, data,
|
||||
progress_callback=lambda c, t:
|
||||
print('test_cdn_download:uploading {:.2%}...'.format(c/t))
|
||||
)
|
||||
msg = client.get_messages(me)[1][0]
|
||||
msg = (await client.get_messages(me))[0]
|
||||
|
||||
out = BytesIO()
|
||||
client.download_media(msg, out)
|
||||
await client.download_media(msg, out)
|
||||
self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest())
|
||||
|
||||
out = BytesIO()
|
||||
client.download_media(msg, out) # Won't redirect
|
||||
await client.download_media(msg, out) # Won't redirect
|
||||
self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest())
|
||||
|
||||
client.log_out()
|
||||
await client.log_out()
|
||||
finally:
|
||||
client.disconnect()
|
||||
|
|
|
@ -25,20 +25,20 @@ def run_server_echo_thread(port):
|
|||
class NetworkTests(unittest.TestCase):
|
||||
|
||||
@unittest.skip("test_tcp_client needs fix")
|
||||
def test_tcp_client(self):
|
||||
async def test_tcp_client(self):
|
||||
port = random.randint(50000, 60000) # Arbitrary non-privileged port
|
||||
run_server_echo_thread(port)
|
||||
|
||||
msg = b'Unit testing...'
|
||||
client = TcpClient()
|
||||
client.connect('localhost', port)
|
||||
client.write(msg)
|
||||
self.assertEqual(msg, client.read(15),
|
||||
await client.connect('localhost', port)
|
||||
await client.write(msg)
|
||||
self.assertEqual(msg, await client.read(15),
|
||||
msg='Read message does not equal sent message')
|
||||
client.close()
|
||||
|
||||
@unittest.skip("Some parameters changed, so IP doesn't go there anymore.")
|
||||
def test_authenticator(self):
|
||||
async def test_authenticator(self):
|
||||
transport = Connection('149.154.167.91', 443)
|
||||
self.assertTrue(authenticator.do_authentication(transport))
|
||||
self.assertTrue(await authenticator.do_authentication(transport))
|
||||
transport.close()
|
||||
|
|
Loading…
Reference in New Issue
Block a user