Compare commits

...

112 Commits

Author SHA1 Message Date
Andrey Egorov
85103bcf6d Merge branch 'asyncio' of https://github.com/LonamiWebs/Telethon into asyncio-upstream
* 'asyncio' of https://github.com/LonamiWebs/Telethon:
  Duplicate commit from master to handle empty draft msg
2018-06-14 15:34:21 +03:00
Andrey Egorov
43a0226b33 Merge branch 'asyncio' into asyncio-upstream
* asyncio:
  Not need to save (salt is out of DB)
  Very rare exception in the case of reconnect
  updates_handler is out from MtProtoSender to gc works properly; unauth_handler log format fix
  Memory leaks fix
  Pretty format of TLObject's
  More accurate clear pendings
  Another attempt to prevent duplicates
  Handle updates and other refactoring
  SocketClosed exception
  Refactoring of TcpClient
  Socket OSError logging
  More aggressive catching network errors
  No route to host catched + other errno to reconnect

# Conflicts (resolved):
#	telethon/extensions/tcp_client.py
#	telethon/network/mtproto_sender.py
#	telethon/telegram_bare_client.py
#	telethon/tl/session.py
2018-06-14 15:21:50 +03:00
Lonami Exo
097acd874b Duplicate commit from master to handle empty draft msg
Telegram seems to have changed something in their servers
and now this is sent.
2018-06-13 20:57:41 +02:00
Andrey Egorov
f5a7a8da45 Not need to save (salt is out of DB) 2018-06-08 01:33:11 +03:00
Lonami Exo
cb75092ba1 Merge branch 'master' into asyncio 2018-05-30 19:02:55 +02:00
Lonami Exo
85089353f2 Fix asyncio docs 2018-05-30 18:36:37 +02:00
Lonami Exo
aba478789c Merge branch 'master' into asyncio 2018-05-19 14:44:50 +02:00
Lonami Exo
aa80f92807 Merge branch 'master' into asyncio 2018-05-14 17:42:24 +02:00
Lonami Exo
02fcaaa78c Merge branch 'master' into asyncio 2018-05-13 17:15:53 +02:00
Lonami Exo
1aaafc9a43 Merge branch 'master' into asyncio 2018-05-13 16:47:35 +02:00
Lonami Exo
98449bb32f Add missing comma in the dependency list 2018-05-13 09:53:05 +02:00
Lonami Exo
3df90307a7 Merge branch 'master' into asyncio 2018-05-10 14:45:04 +02:00
Lonami Exo
ed8c123b9a Fix yield_ must be awaited 2018-05-10 10:11:30 +02:00
Lonami Exo
6130b8918d Fix invalid merge 2018-05-10 10:07:54 +02:00
Lonami Exo
95eac6c151 Merge branch 'master' into asyncio 2018-05-10 09:55:05 +02:00
Lonami Exo
b09d91b9ef Add missing await 2018-05-09 09:46:58 +02:00
Lonami Exo
3171efafcb Remove the last await in comprehension for Py3.5 2018-05-09 09:08:52 +02:00
Lonami Exo
1b76c1fc7b Attempt at supporting Python 3.5 (cc @tulir)
Thanks in part to
https://github.com/tulir/telethon-asyncio/compare/asyncio...tulir:asyncio-3.5
2018-05-08 20:50:51 +02:00
Lonami Exo
d20dc01afa Add missing await 2018-05-08 20:30:14 +02:00
Lonami Exo
c0fa2ae620 Update to v0.19 2018-05-08 20:28:50 +02:00
Lonami Exo
2cc0e17c7e Add voice/video note parameters to send_file 2018-05-08 20:28:50 +02:00
Lonami Exo
fbea963230 First attempt at updates catch_up for private chats/groups 2018-05-08 20:26:46 +02:00
Lonami Exo
fe299cc6cc Remove broken packet length check 2018-05-08 20:26:46 +02:00
Lonami Exo
97f3dd809b Fix race condition causing broken responses 2018-05-08 20:26:19 +02:00
Lonami Exo
01a594ca5d Fix still broken log for broken packets 2018-05-08 20:24:28 +02:00
Lonami Exo
cd410d7fd7 Call disconnect on ConnectionResetError hoping a reconnection
Maybe self._reconnect() had no effect unless a clean disconnect
was done, and so retrying would be mostly useless. Just a guess.
2018-05-08 20:23:07 +02:00
Lonami Exo
c0b61f3a63 Fix online documentation showing duplicated errors 2018-05-08 20:23:07 +02:00
Lonami
dffbd2d689
Remove lock usage from master merge 2018-05-06 22:15:06 +02:00
Lonami
2691872503
Add missing time import 2018-05-06 22:12:47 +02:00
Lonami Exo
41f0e0c0a8 Merge branch 'master' into asyncio 2018-05-06 13:09:17 +02:00
Lonami Exo
ae5a265ca1 Fix unsupported async list comprehension in py3.5 2018-05-02 20:36:11 +02:00
Lonami Exo
52042d4a1b Merge branch 'master' into asyncio 2018-05-02 20:26:05 +02:00
Lonami Exo
9e7cbb0b09 Merge branch 'master' into asyncio 2018-04-24 09:48:29 +02:00
Lonami Exo
04a68f12cc Merge branch 'master' into asyncio 2018-04-23 15:53:28 +02:00
Lonami Exo
4735392cf9 Add missing await keywords 2018-04-23 13:54:06 +02:00
Terrance
096b2a6f7a Cancel background tasks on disconnect (#783)
Fixes #782.
2018-04-23 09:49:30 +02:00
Lonami Exo
8260a13824 Fix-up merge missing async keyword 2018-04-15 16:12:08 +02:00
Lonami Exo
b7ae612246 Merge branch 'master' into asyncio 2018-04-15 16:09:25 +02:00
Lonami Exo
aae6a26654 Add even more missing await keywords 2018-04-08 16:02:22 +02:00
Lonami Exo
d980e947cf Add missing parenthesis around await keyword 2018-04-08 15:57:40 +02:00
Lonami Exo
946dd69211 Merge branch 'master' into asyncio 2018-04-08 15:57:06 +02:00
Lonami Exo
597433075e Add missing await 2018-04-08 15:50:15 +02:00
Lonami Exo
06af73ed65 Merge branch 'master' into asyncio 2018-04-08 15:47:17 +02:00
Dan Elkouby
b8030959b6 Fix adding events while the aio loop is stopped (#748) 2018-04-08 00:05:16 +02:00
Lonami Exo
abcd09e7d0 Merge branch 'master' into asyncio 2018-04-04 13:47:43 +02:00
Lonami Exo
1eb418e1ab Merge branch 'master' into asyncio 2018-04-01 17:08:50 +02:00
Lonami Exo
ddf36c9cb0 Fix invalid async for when getting entities 2018-03-30 12:02:25 +02:00
Lonami Exo
2ee5201229 Merge branch 'master' into asyncio 2018-03-24 18:45:11 +01:00
Lonami Exo
8bf140ca74 Document asyncio better (#456) 2018-03-24 14:04:25 +01:00
Lonami Exo
5cb3a9af36 Fix client.on decorator not running 2018-03-24 13:57:11 +01:00
Lonami Exo
8b0580901a Merge branch 'master' into asyncio 2018-03-24 13:08:51 +01:00
Marius Räsener
d4e1f13e41 Add missing await keyword (#710) 2018-03-19 17:27:51 +01:00
Andrey Egorov
236fccea7f Very rare exception in the case of reconnect 2018-03-18 20:41:00 +03:00
Lonami Exo
e505fc1711 Rename telethon_aio back to telethon 2018-03-18 10:26:53 +01:00
Lonami Exo
bcd5f8d4a1 Merge branch 'master' into asyncio 2018-03-18 10:24:57 +01:00
Lonami Exo
1047e9c3d5 Merge branch 'master' into asyncio 2018-03-18 10:23:48 +01:00
Lonami Exo
fd602dfd81 Merge branch 'master' into asyncio 2018-03-11 20:14:16 +01:00
Lonami Exo
a828e9d155 Update .gitignore to reflect the package name change 2018-03-04 12:25:41 +01:00
Lonami Exo
48ac6daef5 Fix non-relative import on helpers 2018-03-04 12:25:08 +01:00
Lonami Exo
69970b5b20 Merge branch 'master' into asyncio 2018-03-04 12:14:20 +01:00
Lonami Exo
cde314fc21 Rename package to telethon_aio and prepare for PyPi 2018-03-04 11:22:52 +01:00
Lonami Exo
d5c5c3cff1 Add missing await keyword for ._check_events_pending_resolve 2018-03-03 22:54:38 +01:00
Lonami Exo
563d731c95 Merge remote-tracking branch 'tulir/asyncio' into asyncio 2018-03-03 17:03:27 +01:00
Tulir Asokan
4432a2d14e Merge branch 'master' into asyncio 2018-03-03 13:02:13 +02:00
Tanuj
784c2e9ed1 Fix get_participants missing async keywords (#662) 2018-03-03 09:19:33 +01:00
Tulir Asokan
731a2956df Merge branch 'master' into asyncio 2018-02-25 12:05:45 +02:00
Tulir Asokan
c2fba26ad9 Merge branch 'master' into asyncio 2018-02-25 01:54:41 +02:00
Lonami Exo
9054a12c11 Fix tiny bug regarding .get_me(input_peer=True) crashing events 2018-02-24 18:30:09 +01:00
Tulir Asokan
7998fd59f7 Add missing await to mention generation in _parse_message_text (#634) 2018-02-22 20:57:40 +01:00
Lonami Exo
6e854325a8 Merge branch 'master' into asyncio 2018-02-16 18:42:09 +01:00
Lonami Exo
2e953dab50 Add missing async and await keywords on TelegramClient.on 2018-02-14 14:15:00 +01:00
Andrey Egorov
a6c6bc73eb updates_handler is out from MtProtoSender to gc works properly; unauth_handler log format fix 2018-02-13 16:08:24 +03:00
Lonami Exo
7da092894b Acquire reconnect lock outside the reconnect function 2018-02-10 12:56:54 +01:00
Lonami Exo
d8376ee50d Add a lock around connection.recv() 2018-02-10 12:44:09 +01:00
Lonami Exo
50515aa528 Merge branch 'master' into asyncio 2018-02-09 19:22:26 +01:00
Andrey Egorov
6cfb829e58 Memory leaks fix 2018-01-21 18:57:53 +03:00
Andrey Egorov
91e5ef852a Pretty format of TLObject's 2018-01-17 15:41:13 +03:00
Lonami Exo
2b9c06f0e6 Remove invalid self._logger calls since merge 2018-01-05 18:32:54 +01:00
Lonami Exo
a1d497a2c0 Merge branch 'master' into asyncio 2018-01-05 17:59:36 +01:00
Andrey Egorov
2f1d5e277e More accurate clear pendings 2017-12-12 21:22:42 +03:00
Andrey Egorov
7c0af2c080 Another attempt to prevent duplicates 2017-12-09 21:24:13 +03:00
Andrey Egorov
984f483b98 Handle updates and other refactoring 2017-12-03 02:31:43 +03:00
Lonami Exo
e71831050f Fix README.rst never actually running the example 2017-11-25 18:49:10 +01:00
Lonami Exo
e0802d1a2d Update README.rst to show asyncio code (#456) 2017-11-25 18:47:41 +01:00
Lonami Exo
c67f78eab7 Add unparse markdown method 2017-11-25 16:17:00 +01:00
Lonami Exo
de803a0ace Merge branch 'master' into asyncio 2017-11-24 19:26:00 +01:00
Andrey Egorov
004c92edbe SocketClosed exception 2017-11-19 13:04:40 +03:00
Andrey Egorov
32bca4f1b8 Refactoring of TcpClient 2017-11-19 01:55:40 +03:00
Andrey Egorov
653dd21259 Socket OSError logging 2017-11-16 17:31:39 +03:00
Lonami Exo
8a287c2860 Fix-up removing required error variable while merging 2017-11-16 13:21:24 +01:00
Lonami Exo
7f5126c341 Merge branch 'master' into asyncio 2017-11-16 13:20:20 +01:00
Andrey Egorov
2efcfbd416 More aggressive catching network errors 2017-11-16 02:56:57 +03:00
Andrey Egorov
3111153822 No route to host catched + other errno to reconnect 2017-11-15 14:19:01 +03:00
Lonami
69b3f64d19
Merge pull request #434 from andr-04/asyncio
Reconnection fix in read/write and restore some acks
2017-11-14 12:16:07 +01:00
Andrey Egorov
25af22f1e7 Bugfix in reconnection 2017-11-14 14:07:40 +03:00
Andrey Egorov
5e172053da Merge remote-tracking branch 'upstream/asyncio' into asyncio 2017-10-30 00:29:46 +03:00
Lonami Exo
23b5a9d1f6 Merge branch 'master' into asyncio 2017-10-29 20:08:32 +01:00
Andrey Egorov
cb2d943139 Remove forgotten points 2017-10-29 15:33:03 +03:00
Lonami
6dc0ee9d6c
Merge pull request #370 from andr-04/asyncio
Made update system for asyncio functional
2017-10-28 11:07:41 +02:00
Lonami Exo
8bd578711c Revert "no more retries" exception 2017-10-23 10:05:15 +02:00
Lonami Exo
3a7fa249a4 Revert None result checks on the TelegramClient 2017-10-22 20:30:55 +02:00
Lonami Exo
48ec0319d2 Merge branch 'asyncio' of LonamiWebs/Telethon into asyncio 2017-10-22 20:10:25 +02:00
Lonami Exo
ffaa3ac064 Remove unused timeout variable from the TelegramClient 2017-10-22 19:47:24 +02:00
Lonami Exo
30ac6789ce Change _set_connected_and_authorized condition 2017-10-22 19:27:49 +02:00
Lonami Exo
1a0d5e75bf Make use of more constants in the TcpClient 2017-10-22 19:13:45 +02:00
Andrey Egorov
780e0ceddf Update handlers works; it also seems stable 2017-10-22 15:06:36 +03:00
Lonami Exo
335bc6a789 Merge branch 'master' into asyncio 2017-10-21 15:45:56 +02:00
Lonami Exo
e4bcab336b Fix some missing await calls 2017-10-20 16:05:49 +02:00
Lonami Exo
917665852d Merge branch 'master' into asyncio 2017-10-16 10:03:01 +02:00
Lonami Exo
ef43e2e336 Fix CdnDecrypter not being async 2017-10-07 09:50:23 +02:00
Lonami Exo
77c99db066 Use async def everywhere 2017-10-06 21:53:51 +02:00
Lonami Exo
9716d1d543 Remove all Thread's except from UpdateState 2017-10-06 21:52:47 +02:00
45 changed files with 1395 additions and 1462 deletions

View File

@ -2,7 +2,8 @@ Telethon
========
.. epigraph::
⭐️ Thanks **everyone** who has starred the project, it means a lot!
This is the ``asyncio`` version of the library. If you don't know how
to work with it, `see here https://pypi.python.org/pypi/Telethon`__.
**Telethon** is Telegram client implementation in **Python 3** which uses
the latest available API of Telegram.
@ -22,7 +23,12 @@ Installing
.. code:: sh
pip3 install telethon
pip3 install telethon-aio
.. warning::
Be careful **not** to install ``telethon-asyncio`` or other
variants, someone else name-squatted those and are unofficial!
Creating a client
@ -30,6 +36,7 @@ Creating a client
.. code:: python
import asyncio
from telethon import TelegramClient
# These example values won't work. You must get your own api_id and
@ -38,22 +45,28 @@ Creating a client
api_hash = '0123456789abcdef0123456789abcdef'
client = TelegramClient('session_name', api_id, api_hash)
client.start()
async def main():
await client.start()
asyncio.get_event_loop().run_until_complete(main())
Doing stuff
-----------
Note that this assumes you're inside an "async def" method. Check out the
`Python documentation <https://docs.python.org/3/library/asyncio-dev.html>`_
if you're new with ``asyncio``.
.. code:: python
print(client.get_me().stringify())
print((await client.get_me()).stringify())
client.send_message('username', 'Hello! Talking to you from Telethon')
client.send_file('username', '/home/myself/Pictures/holidays.jpg')
await client.send_message('username', 'Hello! Talking to you from Telethon')
await client.send_file('username', '/home/myself/Pictures/holidays.jpg')
client.download_profile_photo('me')
messages = client.get_messages('username')
client.download_media(messages[0])
await client.download_profile_photo('me')
messages = await client.get_messages('username')
await client.download_media(messages[0])
Next steps
@ -61,5 +74,7 @@ Next steps
Do you like how Telethon looks? Check out
`Read The Docs <http://telethon.rtfd.io/>`_
for a more in-depth explanation, with examples,
troubleshooting issues, and more useful information.
for a more in-depth explanation, with examples, troubleshooting issues,
and more useful information. Note that the examples there are written for
the threaded version, not the one using asyncio. However, you just need to
await every remote call.

View File

@ -67,7 +67,7 @@ Or we call `telethon.telegram_client.TelegramClient.get_input_entity()`:
.. code-block:: python
peer = client.get_input_entity('someone')
peer = await client.get_input_entity('someone')
When you're going to invoke an API method, most require you to pass an
:tl:`InputUser`, :tl:`InputChat`, or so on, this is why using
@ -78,7 +78,7 @@ If you also need to have information about the whole user, use
.. code-block:: python
entity = client.get_entity('someone')
entity = await client.get_entity('someone')
In the later case, when you use the entity, the library will cast it to
its "input" version for you. If you already have the complete user and
@ -104,7 +104,7 @@ request we do:
.. code-block:: python
result = client(SendMessageRequest(peer, 'Hello there!'))
result = await client(SendMessageRequest(peer, 'Hello there!'))
# __call__ is an alias for client.invoke(request). Both will work
Message sent! Of course, this is only an example. There are nearly 250
@ -113,18 +113,19 @@ as you wish. Remember to use the right types! To sum up:
.. code-block:: python
result = client(SendMessageRequest(
client.get_input_entity('username'), 'Hello there!'
))
async def method():
result = await client(SendMessageRequest(
client.get_input_entity('username'), 'Hello there!'
))
This can further be simplified to:
.. code-block:: python
result = client(SendMessageRequest('username', 'Hello there!'))
result = await client(SendMessageRequest('username', 'Hello there!'))
# Or even
result = client(SendMessageRequest(PeerChannel(id), 'Hello there!'))
result = await client(SendMessageRequest(PeerChannel(id), 'Hello there!'))
.. note::

View File

@ -4,41 +4,23 @@
Update Modes
============
Using ``asyncio`` simplifies the way you can work with updates. The library
will always ensure the future of a loop that will poll updates for you, so
you can do other things in the mean time.
The library can run in four distinguishable modes:
- With no extra threads at all.
- With an extra thread that receives everything as soon as possible (default).
- With several worker threads that run your update handlers.
- A mix of the above.
Since this section is about updates, we'll describe the simplest way to
work with them.
Using multiple workers
**********************
When you create your client, simply pass a number to the
``update_workers`` parameter:
``client = TelegramClient('session', api_id, api_hash, update_workers=2)``
You can set any amount of workers you want. The more you put, the more
update handlers that can be called "at the same time". One or two should
suffice most of the time, since setting more will not make things run
faster most of the times (actually, it could slow things down).
The next thing you want to do is to add a method that will be called when
an `Update`__ arrives:
Once you have your client ready, the next thing you want to do is to add a
method that will be called when an `Update`__ arrives:
.. code-block:: python
def callback(update):
import asyncio
loop = asyncio.get_event_loop()
async def callback(update):
print('I received', update)
client.add_event_handler(callback)
# do more work here, or simply sleep!
loop.run_until_complete(client.add_event_handler(callback))
loop.run_forever() # this blocks forever, don't let the script end!
That's it! This is the old way to listen for raw updates, with no further
processing. If this feels annoying for you, remember that you can always
@ -51,94 +33,18 @@ let's reply to them with the same text reversed:
from telethon.tl.types import UpdateShortMessage, PeerUser
def replier(update):
async def replier(update):
if isinstance(update, UpdateShortMessage) and not update.out:
client.send_message(PeerUser(update.user_id), update.message[::-1])
await client.send_message(PeerUser(update.user_id), update.message[::-1])
client.add_event_handler(replier)
input('Press enter to stop this!')
client.disconnect()
loop.run_until_complete(client.add_event_handler(replier))
loop.run_forever()
We only ask you one thing: don't keep this running for too long, or your
contacts will go mad.
Spawning no worker at all
*************************
All the workers do is loop forever and poll updates from a queue that is
filled from the ``ReadThread``, responsible for reading every item off
the network. If you only need a worker and the ``MainThread`` would be
doing no other job, this is the preferred way. You can easily do the same
as the workers like so:
.. code-block:: python
while True:
try:
update = client.updates.poll()
if not update:
continue
print('I received', update)
except KeyboardInterrupt:
break
client.disconnect()
Note that ``poll`` accepts a ``timeout=`` parameter, and it will return
``None`` if other thread got the update before you could or if the timeout
expired, so it's important to check ``if not update``.
This can coexist with the rest of ``N`` workers, or you can set it to ``0``
additional workers:
``client = TelegramClient('session', api_id, api_hash, update_workers=0)``
You **must** set it to ``0`` (or higher), as it defaults to ``None`` and that
has a different meaning. ``None`` workers means updates won't be processed
*at all*, so you must set it to some integer value if you want
``client.updates.poll()`` to work.
Using the main thread instead the ``ReadThread``
************************************************
If you have no work to do on the ``MainThread`` and you were planning to have
a ``while True: sleep(1)``, don't do that. Instead, don't spawn the secondary
``ReadThread`` at all like so:
.. code-block:: python
client = TelegramClient(
...
spawn_read_thread=False
)
And then ``.idle()`` from the ``MainThread``:
``client.idle()``
You can stop it with :kbd:`Control+C`, and you can configure the signals
to be used in a similar fashion to `Python Telegram Bot`__.
As a complete example:
.. code-block:: python
def callback(update):
print('I received', update)
client = TelegramClient('session', api_id, api_hash,
update_workers=1, spawn_read_thread=False)
client.connect()
client.add_event_handler(callback)
client.idle() # ends with Ctrl+C
This is the preferred way to use if you're simply going to listen for updates.
__ https://lonamiwebs.github.io/Telethon/types/update.html
__ https://github.com/python-telegram-bot/python-telegram-bot/blob/4b3315db6feebafb94edcaa803df52bb49999ced/telegram/ext/updater.py#L460

View File

@ -49,7 +49,7 @@ your disk. This is by default a database file using Python's ``sqlite3``.
Before using the client, you must be connected to Telegram.
Doing so is very easy:
``client.connect() # Must return True, otherwise, try again``
``await client.connect() # Must return True, otherwise, try again``
You may or may not be authorized yet. You must be authorized
before you're able to send any request:
@ -61,8 +61,8 @@ If you're not authorized, you need to ``.sign_in()``:
.. code-block:: python
phone_number = '+34600000000'
client.send_code_request(phone_number)
myself = client.sign_in(phone_number, input('Enter code: '))
await client.send_code_request(phone_number)
myself = await client.sign_in(phone_number, input('Enter code: '))
# If .sign_in raises PhoneNumberUnoccupiedError, use .sign_up instead
# If .sign_in raises SessionPasswordNeeded error, call .sign_in(password=...)
# You can import both exceptions from telethon.errors.
@ -84,19 +84,21 @@ As a full example:
.. code-block:: python
client = TelegramClient('anon', api_id, api_hash)
assert client.connect()
if not client.is_user_authorized():
client.send_code_request(phone_number)
me = client.sign_in(phone_number, input('Enter code: '))
async def main():
client = TelegramClient('anon', api_id, api_hash)
assert await client.connect()
if not client.is_user_authorized():
await client.send_code_request(phone_number)
me = await client.sign_in(phone_number, input('Enter code: '))
All of this, however, can be done through a call to ``.start()``:
.. code-block:: python
client = TelegramClient('anon', api_id, api_hash)
client.start()
async def main():
client = TelegramClient('anon', api_id, api_hash)
await client.start()
The code shown is just what ``.start()`` will be doing behind the scenes
@ -110,6 +112,19 @@ is just a matter of taste, and how much control you need.
Remember that you can get yourself at any time with ``client.get_me()``.
Assuming you've written all of this in a ``async def main():``, you can
run it with:
.. code-block:: python
import asyncio
async def main():
...
asyncio.get_event_loop().run_until_complete(main())
.. warning::
Please note that if you fail to login around 5 times (or change the first
parameter of the ``TelegramClient``, which is the session name) you will
@ -146,11 +161,11 @@ account, calling :meth:`telethon.TelegramClient.sign_in` will raise a
import getpass
from telethon.errors import SessionPasswordNeededError
client.sign_in(phone)
await client.sign_in(phone)
try:
client.sign_in(code=input('Enter code: '))
await client.sign_in(code=input('Enter code: '))
except SessionPasswordNeededError:
client.sign_in(password=getpass.getpass())
await client.sign_in(password=getpass.getpass())
The mentioned ``.start()`` method will handle this for you as well, but
@ -170,19 +185,19 @@ See the examples below:
from telethon.errors import EmailUnconfirmedError
# Sets 2FA password for first time:
client.edit_2fa(new_password='supersecurepassword')
await client.edit_2fa(new_password='supersecurepassword')
# Changes password:
client.edit_2fa(current_password='supersecurepassword',
new_password='changedmymind')
await client.edit_2fa(current_password='supersecurepassword',
new_password='changedmymind')
# Clears current password (i.e. removes 2FA):
client.edit_2fa(current_password='changedmymind', new_password=None)
await client.edit_2fa(current_password='changedmymind', new_password=None)
# Sets new password with recovery email:
try:
client.edit_2fa(new_password='memes and dreams',
email='JohnSmith@example.com')
await client.edit_2fa(new_password='memes and dreams',
email='JohnSmith@example.com')
# Raises error (you need to check your email to complete 2FA setup.)
except EmailUnconfirmedError:
# You can put email checking code here if desired.
@ -192,9 +207,9 @@ See the examples below:
# give email parameter again it will keep the last used setting
# Set hint after already setting password:
client.edit_2fa(current_password='memes and dreams',
new_password='memes and dreams',
hint='It keeps you alive')
await client.edit_2fa(current_password='memes and dreams',
new_password='memes and dreams',
hint='It keeps you alive')
__ https://github.com/Anorov/PySocks#installation
__ https://github.com/Anorov/PySocks#usage-1

View File

@ -38,35 +38,34 @@ Getting entities
Through the use of the :ref:`sessions`, the library will automatically
remember the ID and hash pair, along with some extra information, so
you're able to just do this:
you're able to just do this (inside an ``async def``):
.. code-block:: python
# Dialogs are the "conversations you have open".
# This method returns a list of Dialog, which
# has the .entity attribute and other information.
dialogs = client.get_dialogs()
dialogs = await client.get_dialogs()
# All of these work and do the same.
lonami = client.get_entity('lonami')
lonami = client.get_entity('t.me/lonami')
lonami = client.get_entity('https://telegram.dog/lonami')
lonami = await client.get_entity('lonami')
lonami = await client.get_entity('t.me/lonami')
lonami = await client.get_entity('https://telegram.dog/lonami')
# Other kind of entities.
channel = client.get_entity('telegram.me/joinchat/AAAAAEkk2WdoDrB4-Q8-gg')
contact = client.get_entity('+34xxxxxxxxx')
friend = client.get_entity(friend_id)
channel = await client.get_entity('telegram.me/joinchat/AAAAAEkk2WdoDrB4-Q8-gg')
contact = await client.get_entity('+34xxxxxxxxx')
friend = await client.get_entity(friend_id)
# Getting entities through their ID (User, Chat or Channel)
entity = client.get_entity(some_id)
entity = await client.get_entity(some_id)
# You can be more explicit about the type for said ID by wrapping
# it inside a Peer instance. This is recommended but not necessary.
from telethon.tl.types import PeerUser, PeerChat, PeerChannel
my_user = client.get_entity(PeerUser(some_id))
my_chat = client.get_entity(PeerChat(some_id))
my_channel = client.get_entity(PeerChannel(some_id))
my_user = await client.get_entity(PeerUser(some_id))
my_chat = await client.get_entity(PeerChat(some_id))
my_channel = await client.get_entity(PeerChannel(some_id))
All methods in the :ref:`telegram-client` call ``.get_input_entity()`` prior
@ -126,7 +125,7 @@ library, the raw requests you make to the API are also able to call
.. code-block:: python
client(SendMessageRequest('username', 'hello'))
await client(SendMessageRequest('username', 'hello'))
The library will call the ``.resolve()`` method of the request, which will
resolve ``'username'`` with the appropriated :tl:`InputPeer`. Don't worry if

View File

@ -19,15 +19,19 @@ Creating a client
.. code-block:: python
import asyncio
loop = asyncio.get_event_loop()
from telethon import TelegramClient
# These example values won't work. You must get your own api_id and
# api_hash from https://my.telegram.org, under API Development.
api_id = 12345
api_hash = '0123456789abcdef0123456789abcdef'
client = TelegramClient('session_name', api_id, api_hash)
client.start()
loop.run_until_complete(client.start())
**More details**: :ref:`creating-a-client`
@ -37,31 +41,33 @@ Basic Usage
.. code-block:: python
# You should write all this inside of an async def.
#
# Getting information about yourself
print(client.get_me().stringify())
print((await client.get_me()).stringify())
# Sending a message (you can use 'me' or 'self' to message yourself)
client.send_message('username', 'Hello World from Telethon!')
await client.send_message('username', 'Hello World from Telethon!')
# Sending a file
client.send_file('username', '/home/myself/Pictures/holidays.jpg')
await client.send_file('username', '/home/myself/Pictures/holidays.jpg')
# Retrieving messages from a chat
from telethon import utils
for message in client.iter_messages('username', limit=10):
async for message in client.iter_messages('username', limit=10):
print(utils.get_display_name(message.sender), message.message)
# Listing all the dialogs (conversations you have open)
for dialog in client.get_dialogs(limit=10):
async for dialog in client.get_dialogs(limit=10):
print(utils.get_display_name(dialog.entity), dialog.draft.message)
# Downloading profile photos (default path is the working directory)
client.download_profile_photo('username')
await client.download_profile_photo('username')
# Once you have a message with .media (if message.media)
# you can download it using client.download_media():
messages = client.get_messages('username')
client.download_media(messages[0])
messages = await client.get_messages('username')
await client.download_media(messages[0])
**More details**: :ref:`telegram-client`
@ -73,17 +79,15 @@ Handling Updates
.. code-block:: python
import asyncio
from telethon import events
# We need to have some worker running
client.updates.workers = 1
@client.on(events.NewMessage(incoming=True, pattern='(?i)hi'))
def handler(event):
event.reply('Hello!')
async def handler(event):
await event.reply('Hello!')
# If you want to handle updates you can't let the script end.
input('Press enter to exit.')
asyncio.get_event_loop().run_forever()
**More details**: :ref:`working-with-updates`

View File

@ -32,7 +32,7 @@ need of manually importing the requests you need.
For instance, retrieving your own user can be done in a single line:
``myself = client.get_me()``
``myself = await client.get_me()``
Internally, this method has sent a request to Telegram, who replied with
the information about your own user, and then the desired information
@ -47,37 +47,38 @@ how the library refers to either of these:
# The method will infer that you've passed an username
# It also accepts phone numbers, and will get the user
# from your contact list.
lonami = client.get_entity('lonami')
lonami = await client.get_entity('lonami')
The so called "entities" are another important whole concept on its own,
but for now you don't need to worry about it. Simply know that they are
a good way to get information about an user, chat or channel.
Many other common methods for quick scripts are also available:
Many other common methods for quick scripts are also available.
Note that you should be writing this inside of an ``async def``:
.. code-block:: python
# Note that you can use 'me' or 'self' to message yourself
client.send_message('username', 'Hello World from Telethon!')
await client.send_message('username', 'Hello World from Telethon!')
# .send_message's parse mode defaults to markdown, so you
# can use **bold**, __italics__, [links](https://example.com), `code`,
# and even [mentions](@username)/[mentions](tg://user?id=123456789)
client.send_message('username', '**Using** __markdown__ `too`!')
await client.send_message('username', '**Using** __markdown__ `too`!')
client.send_file('username', '/home/myself/Pictures/holidays.jpg')
await client.send_file('username', '/home/myself/Pictures/holidays.jpg')
# The utils package has some goodies, like .get_display_name()
from telethon import utils
for message in client.iter_messages('username', limit=10):
async for message in client.iter_messages('username', limit=10):
print(utils.get_display_name(message.sender), message.message)
# Dialogs are the conversations you have open
for dialog in client.get_dialogs(limit=10):
async for dialog in client.get_dialogs(limit=10):
print(utils.get_display_name(dialog.entity), dialog.draft.message)
# Default path is the working directory
client.download_profile_photo('username')
await client.download_profile_photo('username')
# Call .disconnect() when you're done
client.disconnect()

View File

@ -30,27 +30,33 @@ Getting Started
.. code-block:: python
import asyncio
loop = asyncio.get_event_loop()
from telethon import TelegramClient, events
client = TelegramClient(..., update_workers=1, spawn_read_thread=False)
client.start()
client = TelegramClient(...)
loop.run_until_complete(client.start())
@client.on(events.NewMessage)
def my_event_handler(event):
async def my_event_handler(event):
if 'hello' in event.raw_text:
event.reply('hi!')
await event.reply('hi!')
client.idle()
loop.run_forever()
Not much, but there might be some things unclear. What does this code do?
.. code-block:: python
import asyncio
loop = asyncio.get_event_loop()
from telethon import TelegramClient, events
client = TelegramClient(..., update_workers=1, spawn_read_thread=False)
client.start()
client = TelegramClient(...)
loop.run_until_complete(client.start())
This is normal initialization (of course, pass session name, API ID and hash).
@ -67,9 +73,9 @@ the callback function you're about to define will be called:
.. code-block:: python
def my_event_handler(event):
async def my_event_handler(event):
if 'hello' in event.raw_text:
event.reply('hi!')
await event.reply('hi!')
If a ``NewMessage`` event occurs, and ``'hello'`` is in the text of the
@ -77,10 +83,10 @@ message, we ``reply`` to the event with a ``'hi!'`` message.
.. code-block:: python
client.idle()
loop.run_forever()
Finally, this tells the client that we're done with our code, and want
Finally, this tells the script that we're done with our code, and want
to listen for all these events to occur. Of course, you might want to
do other things instead idling. For this refer to :ref:`update-modes`.
@ -119,17 +125,17 @@ for example:
# Either a single item or a list of them will work for the chats.
# You can also use the IDs, Peers, or even User/Chat/Channel objects.
@client.on(events.NewMessage(chats=('TelethonChat', 'TelethonOffTopic')))
def normal_handler(event):
async def normal_handler(event):
if 'roll' in event.raw_text:
event.reply(str(random.randint(1, 6)))
await event.reply(str(random.randint(1, 6)))
# Similarly, you can use incoming=True for messages that you receive
@client.on(events.NewMessage(chats='TelethonOffTopic', outgoing=True))
def admin_handler(event):
async def admin_handler(event):
if event.raw_text.startswith('eval'):
expression = event.raw_text.replace('eval', '').strip()
event.reply(str(ast.literal_eval(expression)))
await event.reply(str(ast.literal_eval(expression)))
You can pass one or more chats to the ``chats`` parameter (as a list or tuple),
@ -167,15 +173,15 @@ propagation of the update through your handlers to stop:
from telethon.events import StopPropagation
@client.on(events.NewMessage)
def _(event):
async def _(event):
# ... some conditions
event.delete()
await event.delete()
# Other handlers won't have an event to work with
raise StopPropagation
@client.on(events.NewMessage)
def _(event):
async def _(event):
# Will never be reached, because it is the second handler
# in the chain.
pass

View File

@ -32,4 +32,4 @@ times, in this case, ``22222`` so we can hardcode that:
client = TelegramClient(None, api_id, api_hash)
client.session.set_dc(2, '149.154.167.40', 80)
client.start(phone='9996621234', code_callback=lambda: '22222')
await client.start(phone='9996621234', code_callback=lambda: '22222')

View File

@ -19,7 +19,7 @@ not *interact* with a voting message), by making use of the
from telethon.tl.functions.messages import GetInlineBotResultsRequest
bot_results = client(GetInlineBotResultsRequest(
bot_results = await client(GetInlineBotResultsRequest(
bot, user_or_chat, 'query', ''
))
@ -30,7 +30,7 @@ And you can select any of their results by using
from telethon.tl.functions.messages import SendInlineBotResultRequest
client(SendInlineBotResultRequest(
await client(SendInlineBotResultRequest(
get_input_peer(user_or_chat),
obtained_query_id,
obtained_str_id
@ -47,7 +47,7 @@ To interact with a message that has a special reply markup, such as
from telethon.tl.functions.messages import GetBotCallbackAnswerRequest
client(GetBotCallbackAnswerRequest(
await client(GetBotCallbackAnswerRequest(
user_or_chat,
msg.id,
data=msg.reply_markup.rows[wanted_row].buttons[wanted_button].data

View File

@ -25,11 +25,11 @@ to, you can make use of the :tl:`JoinChannelRequest` to join such channel:
.. code-block:: python
from telethon.tl.functions.channels import JoinChannelRequest
client(JoinChannelRequest(channel))
await client(JoinChannelRequest(channel))
# In the same way, you can also leave such channel
from telethon.tl.functions.channels import LeaveChannelRequest
client(LeaveChannelRequest(input_channel))
await client(LeaveChannelRequest(input_channel))
For more on channels, check the `channels namespace`__.
@ -51,7 +51,7 @@ example, is the ``hash`` of the chat or channel. Now you can use
.. code-block:: python
from telethon.tl.functions.messages import ImportChatInviteRequest
updates = client(ImportChatInviteRequest('AAAAAEHbEkejzxUjAUCfYg'))
updates = await client(ImportChatInviteRequest('AAAAAEHbEkejzxUjAUCfYg'))
Adding someone else to such chat or channel
@ -68,7 +68,7 @@ use is very straightforward, or :tl:`InviteToChannelRequest` for channels:
# Note that ``user_to_add`` is NOT the name of the parameter.
# It's the user you want to add (``user_id=user_to_add``).
client(AddChatUserRequest(
await client(AddChatUserRequest(
chat_id,
user_to_add,
fwd_limit=10 # Allow the user to see the 10 last messages
@ -77,7 +77,7 @@ use is very straightforward, or :tl:`InviteToChannelRequest` for channels:
# For channels (which includes megagroups)
from telethon.tl.functions.channels import InviteToChannelRequest
client(InviteToChannelRequest(
await client(InviteToChannelRequest(
channel,
[users_to_add]
))
@ -123,7 +123,7 @@ a fixed limit:
all_participants = []
while True:
participants = client(GetParticipantsRequest(
participants = await client(GetParticipantsRequest(
channel, ChannelParticipantsSearch(''), offset, limit,
hash=0
))
@ -193,7 +193,7 @@ Giving or revoking admin permissions can be done with the :tl:`EditAdminRequest`
# )
# Once you have a ChannelAdminRights, invoke it
client(EditAdminRequest(channel, user, rights))
await client(EditAdminRequest(channel, user, rights))
# User will now be able to change group info, delete other people's
# messages and pin messages.
@ -252,7 +252,7 @@ banned rights of an user through :tl:`EditAdminRequest` and its parameter
embed_links=True
)
client(EditBannedRequest(channel, user, rights))
await client(EditBannedRequest(channel, user, rights))
Kicking a member
@ -267,7 +267,7 @@ is enough:
from telethon.tl.functions.channels import EditBannedRequest
from telethon.tl.types import ChannelBannedRights
client(EditBannedRequest(channel, user, ChannelBannedRights(
await client(EditBannedRequest(channel, user, ChannelBannedRights(
until_date=None,
view_messages=True
)))
@ -291,7 +291,7 @@ use :tl:`GetMessagesViewsRequest`, setting ``increment=True``:
# Obtain `channel' through dialogs or through client.get_entity() or anyhow.
# Obtain `msg_ids' through `.get_messages()` or anyhow. Must be a list.
client(GetMessagesViewsRequest(
await client(GetMessagesViewsRequest(
peer=channel,
id=msg_ids,
increment=True

View File

@ -19,9 +19,9 @@ you should use :tl:`GetFullUser`:
from telethon.tl.functions.users import GetFullUserRequest
full = client(GetFullUserRequest(user))
full = await client(GetFullUserRequest(user))
# or even
full = client(GetFullUserRequest('username'))
full = await client(GetFullUserRequest('username'))
bio = full.about
@ -39,7 +39,7 @@ request. Omitted fields won't change after invoking :tl:`UpdateProfile`:
from telethon.tl.functions.account import UpdateProfileRequest
client(UpdateProfileRequest(about='This is a test from Telethon'))
await client(UpdateProfileRequest(about='This is a test from Telethon'))
Updating your username
@ -51,7 +51,7 @@ You need to use :tl:`account.UpdateUsername`:
from telethon.tl.functions.account import UpdateUsernameRequest
client(UpdateUsernameRequest('new_username'))
await client(UpdateUsernameRequest('new_username'))
Updating your profile photo
@ -65,6 +65,6 @@ through :tl:`UploadProfilePhoto`:
from telethon.tl.functions.photos import UploadProfilePhotoRequest
client(UploadProfilePhotoRequest(
await client(UploadProfilePhotoRequest(
client.upload_file('/path/to/some/file')
))

View File

@ -21,14 +21,14 @@ Forwarding messages
.. code-block:: python
# If you only have the message IDs
client.forward_messages(
await client.forward_messages(
entity, # to which entity you are forwarding the messages
message_ids, # the IDs of the messages (or message) to forward
from_entity # who sent the messages?
)
# If you have ``Message`` objects
client.forward_messages(
await client.forward_messages(
entity, # to which entity you are forwarding the messages
messages # the messages (or message) to forward
)
@ -40,7 +40,7 @@ Forwarding messages
from_entity = bar()
to_entity = baz()
client(ForwardMessagesRequest(
await client(ForwardMessagesRequest(
from_peer=from_entity, # who sent these messages?
id=[msg.id for msg in messages], # which are the messages?
to_peer=to_entity # who are we forwarding them to?
@ -71,7 +71,7 @@ into issues_. A valid example would be:
from telethon.tl.types import InputMessagesFilterEmpty
filter = InputMessagesFilterEmpty()
result = client(SearchRequest(
result = await client(SearchRequest(
peer=peer, # On which chat/conversation
q='query', # What to search for
filter=filter, # Filter to use (maybe filter for media)
@ -116,20 +116,20 @@ send yourself the very first sticker you have:
.. code-block:: python
# Get all the sticker sets this user has
sticker_sets = client(GetAllStickersRequest(0))
sticker_sets = await client(GetAllStickersRequest(0))
# Choose a sticker set
sticker_set = sticker_sets.sets[0]
# Get the stickers for this sticker set
stickers = client(GetStickerSetRequest(
stickers = await client(GetStickerSetRequest(
stickerset=InputStickerSetID(
id=sticker_set.id, access_hash=sticker_set.access_hash
)
))
# Stickers are nothing more than files, so send that
client(SendMediaRequest(
await client(SendMediaRequest(
peer=client.get_me(),
media=InputMediaDocument(
id=InputDocument(

View File

@ -22,6 +22,18 @@ when you upgrade!
contains the friendly methods that **you should use** most of the time.
.. note::
We assume that you have some experience working with ``asyncio``,
if you don't you should probably use the threaded version of the
library, or either learn how to use ``asyncio``. All the code
here assumes you're writing the code inside an ``async def`` so
we can use ``await`` across the examples.
Then you can ``import asyncio`` and run
``asyncio.get_event_loop().run_until_complete(my_method())``
What is this?
*************

View File

@ -131,12 +131,12 @@ def main():
from subprocess import run
from shutil import rmtree
for x in ('build', 'dist', 'Telethon.egg-info'):
for x in ('build', 'dist', 'Telethon_aio.egg-info'):
rmtree(x, ignore_errors=True)
run('python3 setup.py sdist', shell=True)
run('python3 setup.py bdist_wheel', shell=True)
run('twine upload dist/*', shell=True)
for x in ('build', 'dist', 'Telethon.egg-info'):
for x in ('build', 'dist', 'Telethon_aio.egg-info'):
rmtree(x, ignore_errors=True)
else:
@ -152,12 +152,13 @@ def main():
version = re.search(r"^__version__\s*=\s*'(.*)'.*$",
f.read(), flags=re.MULTILINE).group(1)
setup(
name='Telethon',
name='Telethon-aio',
version=version,
description="Full-featured Telegram client library for Python 3",
description="Full-featured Telegram client library for Python 3, "
"modified to work under Python's asyncio module.",
long_description=long_description,
url='https://github.com/LonamiWebs/Telethon',
url='https://github.com/LonamiWebs/Telethon/tree/asyncio',
download_url='https://github.com/LonamiWebs/Telethon/releases',
author='Lonami Exo',
@ -196,7 +197,8 @@ def main():
'telethon_generator/parser/tl_object.py',
'telethon_generator/parser/tl_parser.py',
]),
install_requires=['pyaes', 'rsa',
# We must be careful not to miss any comma here... v
install_requires=['pyaes', 'rsa', 'async_generator',
'typing' if version_info < (3, 5, 2) else ""],
extras_require={
'cryptg': ['cryptg']

View File

@ -30,7 +30,7 @@ class CdnDecrypter:
self.cdn_file_hashes = cdn_file_hashes
@staticmethod
def prepare_decrypter(client, cdn_client, cdn_redirect):
async def prepare_decrypter(client, cdn_client, cdn_redirect):
"""
Prepares a new CDN decrypter.
@ -52,14 +52,14 @@ class CdnDecrypter:
cdn_aes, cdn_redirect.cdn_file_hashes
)
cdn_file = cdn_client(GetCdnFileRequest(
cdn_file = await cdn_client(GetCdnFileRequest(
file_token=cdn_redirect.file_token,
offset=cdn_redirect.cdn_file_hashes[0].offset,
limit=cdn_redirect.cdn_file_hashes[0].limit
))
if isinstance(cdn_file, CdnFileReuploadNeeded):
# We need to use the original client here
client(ReuploadCdnFileRequest(
await client(ReuploadCdnFileRequest(
file_token=cdn_redirect.file_token,
request_token=cdn_file.request_token
))
@ -73,7 +73,7 @@ class CdnDecrypter:
return decrypter, cdn_file
def get_file(self):
async def get_file(self):
"""
Calls GetCdnFileRequest and decrypts its bytes.
Also ensures that the file hasn't been tampered.
@ -82,7 +82,7 @@ class CdnDecrypter:
"""
if self.cdn_file_hashes:
cdn_hash = self.cdn_file_hashes.pop(0)
cdn_file = self.client(GetCdnFileRequest(
cdn_file = await self.client(GetCdnFileRequest(
self.file_token, cdn_hash.offset, cdn_hash.limit
))
cdn_file.bytes = self.cdn_aes.encrypt(cdn_file.bytes)

View File

@ -158,15 +158,15 @@ class ChatAction(EventBuilder):
self.new_title = new_title
self.unpin = unpin
def respond(self, *args, **kwargs):
async def respond(self, *args, **kwargs):
"""
Responds to the chat action message (not as a reply). Shorthand for
`telethon.telegram_client.TelegramClient.send_message` with
``entity`` already set.
"""
return self._client.send_message(self.input_chat, *args, **kwargs)
return await self._client.send_message(await self.input_chat, *args, **kwargs)
def reply(self, *args, **kwargs):
async def reply(self, *args, **kwargs):
"""
Replies to the chat action message (as a reply). Shorthand for
`telethon.telegram_client.TelegramClient.send_message` with
@ -178,9 +178,9 @@ class ChatAction(EventBuilder):
return self.respond(*args, **kwargs)
kwargs['reply_to'] = self.action_message.id
return self._client.send_message(self.input_chat, *args, **kwargs)
return await self._client.send_message(await self.input_chat, *args, **kwargs)
def delete(self, *args, **kwargs):
async def delete(self, *args, **kwargs):
"""
Deletes the chat action message. You're responsible for checking
whether you have the permission to do so, or to except the error
@ -191,12 +191,12 @@ class ChatAction(EventBuilder):
Does nothing if no message action triggered this event.
"""
if self.action_message:
return self._client.delete_messages(self.input_chat,
[self.action_message],
*args, **kwargs)
return await self._client.delete_messages(await self.input_chat,
[self.action_message],
*args, **kwargs)
@property
def pinned_message(self):
async def pinned_message(self):
"""
If ``new_pin`` is ``True``, this returns the (:tl:`Message`)
object that was pinned.
@ -204,8 +204,8 @@ class ChatAction(EventBuilder):
if self._pinned_message == 0:
return None
if isinstance(self._pinned_message, int) and self.input_chat:
r = self._client(functions.channels.GetMessagesRequest(
if isinstance(self._pinned_message, int) and await self.input_chat:
r = await self._client(functions.channels.GetMessagesRequest(
self._input_chat, [self._pinned_message]
))
try:
@ -221,7 +221,7 @@ class ChatAction(EventBuilder):
return self._pinned_message
@property
def added_by(self):
async def added_by(self):
"""
The user who added ``users``, if applicable (``None`` otherwise).
"""
@ -230,12 +230,12 @@ class ChatAction(EventBuilder):
self._entities.get(utils.get_peer_id(self._added_by))
if not self._added_by:
self._added_by = self._client.get_entity(self._added_by)
self._added_by = await self._client.get_entity(self._added_by)
return self._added_by
@property
def kicked_by(self):
async def kicked_by(self):
"""
The user who kicked ``users``, if applicable (``None`` otherwise).
"""
@ -244,27 +244,27 @@ class ChatAction(EventBuilder):
self._entities.get(utils.get_peer_id(self._kicked_by))
if not self._kicked_by:
self._kicked_by = self._client.get_entity(self._kicked_by)
self._kicked_by = await self._client.get_entity(self._kicked_by)
return self._kicked_by
@property
def user(self):
async def user(self):
"""
The first user that takes part in this action (e.g. joined).
Might be ``None`` if the information can't be retrieved or
there is no user taking part.
"""
if self.users:
if await self.users:
return self._users[0]
@property
def input_user(self):
async def input_user(self):
"""
Input version of the ``self.user`` property.
"""
if self.input_users:
if await self.input_users:
return self._input_users[0]
@property
@ -276,7 +276,7 @@ class ChatAction(EventBuilder):
return utils.get_peer_id(self._user_peers[0])
@property
def users(self):
async def users(self):
"""
A list of users that take part in this action (e.g. joined).
@ -296,7 +296,7 @@ class ChatAction(EventBuilder):
missing.append(peer)
try:
missing = self._client.get_entity(missing)
missing = await self._client.get_entity(missing)
except (TypeError, ValueError):
missing = []
@ -305,7 +305,7 @@ class ChatAction(EventBuilder):
return self._users
@property
def input_users(self):
async def input_users(self):
"""
Input version of the ``self.users`` property.
"""
@ -313,7 +313,7 @@ class ChatAction(EventBuilder):
self._input_users = []
for peer in self._user_peers:
try:
self._input_users.append(self._client.get_input_entity(
self._input_users.append(await self._client.get_input_entity(
peer
))
except (TypeError, ValueError):

View File

@ -7,7 +7,7 @@ from ..errors import RPCError
from ..tl import TLObject, types, functions
def _into_id_set(client, chats):
async def _into_id_set(client, chats):
"""Helper util to turn the input chat or chats into a set of IDs."""
if chats is None:
return None
@ -30,9 +30,9 @@ def _into_id_set(client, chats):
# 0x2d45687 == crc32(b'Peer')
result.add(utils.get_peer_id(chat))
else:
chat = client.get_input_entity(chat)
chat = await client.get_input_entity(chat)
if isinstance(chat, types.InputPeerSelf):
chat = client.get_me(input_peer=True)
chat = await client.get_me(input_peer=True)
result.add(utils.get_peer_id(chat))
return result
@ -62,10 +62,10 @@ class EventBuilder(abc.ABC):
def build(self, update):
"""Builds an event for the given update if possible, or returns None"""
def resolve(self, client):
async def resolve(self, client):
"""Helper method to allow event builders to be resolved before usage"""
self.chats = _into_id_set(client, self.chats)
self._self_id = client.get_me(input_peer=True).user_id
self.chats = await _into_id_set(client, self.chats)
self._self_id = (await client.get_me(input_peer=True)).user_id
def _filter_event(self, event):
"""
@ -103,7 +103,7 @@ class EventCommon(abc.ABC):
)
self.is_channel = isinstance(chat_peer, types.PeerChannel)
def _get_entity(self, msg_id, entity_id, chat=None):
async def _get_entity(self, msg_id, entity_id, chat=None):
"""
Helper function to call :tl:`GetMessages` on the give msg_id and
return the input entity whose ID is the given entity ID.
@ -115,11 +115,11 @@ class EventCommon(abc.ABC):
"""
try:
if isinstance(chat, types.InputPeerChannel):
result = self._client(
result = await self._client(
functions.channels.GetMessagesRequest(chat, [msg_id])
)
else:
result = self._client(
result = await self._client(
functions.messages.GetMessagesRequest([msg_id])
)
except RPCError:
@ -136,7 +136,7 @@ class EventCommon(abc.ABC):
return None, None
@property
def input_chat(self):
async def input_chat(self):
"""
The (:tl:`InputPeer`) (group, megagroup or channel) on which
the event occurred. This doesn't have the title or anything,
@ -148,7 +148,7 @@ class EventCommon(abc.ABC):
if self._input_chat is None and self._chat_peer is not None:
try:
self._input_chat = self._client.get_input_entity(
self._input_chat = await self._client.get_input_entity(
self._chat_peer
)
except (ValueError, TypeError):
@ -157,7 +157,7 @@ class EventCommon(abc.ABC):
# TODO For channels, getDifference? Maybe looking
# in the dialogs (which is already done) is enough.
if self._message_id is not None:
self._chat, self._input_chat = self._get_entity(
self._chat, self._input_chat = await self._get_entity(
self._message_id,
utils.get_peer_id(self._chat_peer)
)
@ -168,21 +168,21 @@ class EventCommon(abc.ABC):
return self._client
@property
def chat(self):
async def chat(self):
"""
The (:tl:`User` | :tl:`Chat` | :tl:`Channel`, optional) on which
the event occurred. This property may make an API call the first time
to get the most up to date version of the chat (mostly when the event
doesn't belong to a channel), so keep that in mind.
"""
if not self.input_chat:
if not await self.input_chat:
return None
if self._chat is None:
self._chat = self._entities.get(utils.get_peer_id(self._input_chat))
if self._chat is None:
self._chat = self._client.get_entity(self._input_chat)
self._chat = await self._client.get_entity(self._input_chat)
return self._chat

View File

@ -89,7 +89,7 @@ class MessageRead(EventBuilder):
return self._message_ids
@property
def messages(self):
async def messages(self):
"""
The list of :tl:`Message` **which contents'** were read.
@ -97,17 +97,17 @@ class MessageRead(EventBuilder):
was read instead checking if it's in here.
"""
if self._messages is None:
chat = self.input_chat
chat = await self.input_chat
if not chat:
self._messages = []
elif isinstance(chat, types.InputPeerChannel):
self._messages =\
self._client(functions.channels.GetMessagesRequest(
await self._client(functions.channels.GetMessagesRequest(
chat, self._message_ids
)).messages
else:
self._messages =\
self._client(functions.messages.GetMessagesRequest(
self._messages = \
await self._client(functions.messages.GetMessagesRequest(
self._message_ids
)).messages

View File

@ -154,34 +154,34 @@ class NewMessage(EventBuilder):
self.is_reply = bool(message.reply_to_msg_id)
self._reply_message = None
def respond(self, *args, **kwargs):
async def respond(self, *args, **kwargs):
"""
Responds to the message (not as a reply). Shorthand for
`telethon.telegram_client.TelegramClient.send_message` with
``entity`` already set.
"""
return self._client.send_message(self.input_chat, *args, **kwargs)
return await self._client.send_message(await self.input_chat, *args, **kwargs)
def reply(self, *args, **kwargs):
async def reply(self, *args, **kwargs):
"""
Replies to the message (as a reply). Shorthand for
`telethon.telegram_client.TelegramClient.send_message` with
both ``entity`` and ``reply_to`` already set.
"""
kwargs['reply_to'] = self.message.id
return self._client.send_message(self.input_chat, *args, **kwargs)
return await self._client.send_message(await self.input_chat, *args, **kwargs)
def forward_to(self, *args, **kwargs):
async def forward_to(self, *args, **kwargs):
"""
Forwards the message. Shorthand for
`telethon.telegram_client.TelegramClient.forward_messages` with
both ``messages`` and ``from_peer`` already set.
"""
kwargs['messages'] = self.message.id
kwargs['from_peer'] = self.input_chat
return self._client.forward_messages(*args, **kwargs)
kwargs['from_peer'] = await self.input_chat
return await self._client.forward_messages(*args, **kwargs)
def edit(self, *args, **kwargs):
async def edit(self, *args, **kwargs):
"""
Edits the message iff it's outgoing. Shorthand for
`telethon.telegram_client.TelegramClient.edit_message` with
@ -195,15 +195,15 @@ class NewMessage(EventBuilder):
if not self.message.out:
if not isinstance(self.message.to_id, types.PeerUser):
return None
me = self._client.get_me(input_peer=True)
me = await self._client.get_me(input_peer=True)
if self.message.to_id.user_id != me.user_id:
return None
return self._client.edit_message(self.input_chat,
self.message,
*args, **kwargs)
return await self._client.edit_message(await self.input_chat,
self.message,
*args, **kwargs)
def delete(self, *args, **kwargs):
async def delete(self, *args, **kwargs):
"""
Deletes the message. You're responsible for checking whether you
have the permission to do so, or to except the error otherwise.
@ -211,12 +211,12 @@ class NewMessage(EventBuilder):
`telethon.telegram_client.TelegramClient.delete_messages` with
``entity`` and ``message_ids`` already set.
"""
return self._client.delete_messages(self.input_chat,
[self.message],
*args, **kwargs)
return await self._client.delete_messages(await self.input_chat,
[self.message],
*args, **kwargs)
@property
def input_sender(self):
async def input_sender(self):
"""
This (:tl:`InputPeer`) is the input version of the user who
sent the message. Similarly to ``input_chat``, this doesn't have
@ -230,21 +230,21 @@ class NewMessage(EventBuilder):
return None
try:
self._input_sender = self._client.get_input_entity(
self._input_sender = await self._client.get_input_entity(
self.message.from_id
)
except (ValueError, TypeError):
# We can rely on self.input_chat for this
self._sender, self._input_sender = self._get_entity(
self._sender, self._input_sender = await self._get_entity(
self.message.id,
self.message.from_id,
chat=self.input_chat
chat=await self.input_chat
)
return self._input_sender
@property
def sender(self):
async def sender(self):
"""
This (:tl:`User`) may make an API call the first time to get
the most up to date version of the sender (mostly when the event
@ -252,7 +252,7 @@ class NewMessage(EventBuilder):
``input_sender`` needs to be available (often the case).
"""
if not self.input_sender:
if not await self.input_sender:
return None
if self._sender is None:
@ -260,7 +260,7 @@ class NewMessage(EventBuilder):
self._entities.get(utils.get_peer_id(self._input_sender))
if self._sender is None:
self._sender = self._client.get_entity(self._input_sender)
self._sender = await self._client.get_entity(self._input_sender)
return self._sender
@ -291,7 +291,7 @@ class NewMessage(EventBuilder):
return self.message.message
@property
def reply_message(self):
async def reply_message(self):
"""
This optional :tl:`Message` will make an API call the first
time to get the full :tl:`Message` object that one was replying to,
@ -301,12 +301,12 @@ class NewMessage(EventBuilder):
return None
if self._reply_message is None:
if isinstance(self.input_chat, types.InputPeerChannel):
r = self._client(functions.channels.GetMessagesRequest(
self.input_chat, [self.message.reply_to_msg_id]
if isinstance(await self.input_chat, types.InputPeerChannel):
r = await self._client(functions.channels.GetMessagesRequest(
await self.input_chat, [self.message.reply_to_msg_id]
))
else:
r = self._client(functions.messages.GetMessagesRequest(
r = await self._client(functions.messages.GetMessagesRequest(
[self.message.reply_to_msg_id]
))
if not isinstance(r, types.messages.MessagesNotModified):

View File

@ -22,7 +22,7 @@ class Raw(EventBuilder):
assert all(isinstance(x, type) for x in types)
self.types = tuple(types)
def resolve(self, client):
async def resolve(self, client):
pass
def build(self, update):

View File

@ -148,14 +148,14 @@ class UserUpdate(EventBuilder):
self.uploading = self.video = True
@property
def user(self):
async def user(self):
"""Alias around the chat (conversation)."""
return self.chat
return await self.chat
@property
def input_user(self):
async def input_user(self):
"""Alias around the input chat."""
return self.input_chat
return await self.input_chat
@property
def user_id(self):

View File

@ -194,9 +194,9 @@ def get_inner_text(text, entity):
"""
if isinstance(entity, TLObject):
entity = (entity,)
multiple = True
else:
multiple = False
else:
multiple = True
text = _add_surrogate(text)
result = []

View File

@ -1,31 +1,39 @@
"""
This module holds a rough implementation of the C# TCP client.
"""
# Python rough implementation of a C# TCP client
import asyncio
import errno
import logging
import socket
import time
from datetime import timedelta
from io import BytesIO, BufferedWriter
from threading import Lock
CONN_RESET_ERRNOS = {
errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH,
errno.EINVAL, errno.ENOTCONN, errno.EHOSTUNREACH,
errno.ECONNREFUSED, errno.ECONNRESET, errno.ECONNABORTED,
errno.ENETDOWN, errno.ENETRESET, errno.ECONNABORTED,
errno.EHOSTDOWN, errno.EPIPE, errno.ESHUTDOWN
}
# catched: EHOSTUNREACH, ECONNREFUSED, ECONNRESET, ENETUNREACH
# ConnectionError: EPIPE, ESHUTDOWN, ECONNABORTED, ECONNREFUSED, ECONNRESET
try:
import socks
except ImportError:
socks = None
MAX_TIMEOUT = 15 # in seconds
CONN_RESET_ERRNOS = {
errno.EBADF, errno.ENOTSOCK, errno.ENETUNREACH,
errno.EINVAL, errno.ENOTCONN
}
__log__ = logging.getLogger(__name__)
class TcpClient:
"""A simple TCP client to ease the work with sockets and proxies."""
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
class SocketClosed(ConnectionError):
pass
def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None):
"""
Initializes the TCP client.
@ -34,7 +42,9 @@ class TcpClient:
"""
self.proxy = proxy
self._socket = None
self._closing_lock = Lock()
self._loop = loop if loop else asyncio.get_event_loop()
self._closed = asyncio.Event(loop=self._loop)
self._closed.set()
if isinstance(timeout, timedelta):
self.timeout = timeout.seconds
@ -54,9 +64,9 @@ class TcpClient:
else: # tuple, list, etc.
self._socket.set_proxy(*self.proxy)
self._socket.settimeout(self.timeout)
self._socket.setblocking(False)
def connect(self, ip, port):
async def connect(self, ip, port):
"""
Tries connecting forever to IP:port unless an OSError is raised.
@ -69,75 +79,71 @@ class TcpClient:
else:
mode, address = socket.AF_INET, (ip, port)
timeout = 1
while True:
try:
while not self._socket:
self._recreate_socket(mode)
try:
if not self._socket:
self._recreate_socket(mode)
self._socket.connect(address)
break # Successful connection, stop retrying to connect
except OSError as e:
__log__.info('OSError "%s" raised while connecting', e)
# Stop retrying to connect if proxy connection error occurred
if socks and isinstance(e, socks.ProxyConnectionError):
raise
# There are some errors that we know how to handle, and
# the loop will allow us to retry
if e.errno in (errno.EBADF, errno.ENOTSOCK, errno.EINVAL,
errno.ECONNREFUSED, # Windows-specific follow
getattr(errno, 'WSAEACCES', None)):
# Bad file descriptor, i.e. socket was closed, set it
# to none to recreate it on the next iteration
self._socket = None
time.sleep(timeout)
timeout *= 2
if timeout > MAX_TIMEOUT:
raise
else:
raise
await asyncio.wait_for(
self._loop.sock_connect(self._socket, address),
timeout=self.timeout,
loop=self._loop
)
self._closed.clear()
except asyncio.TimeoutError as e:
raise TimeoutError() from e
except OSError as e:
if e.errno in CONN_RESET_ERRNOS:
self._raise_connection_reset(e)
else:
raise
def _get_connected(self):
"""Determines whether the client is connected or not."""
return self._socket is not None and self._socket.fileno() >= 0
return not self._closed.is_set()
connected = property(fget=_get_connected)
def close(self):
"""Closes the connection."""
if self._closing_lock.locked():
# Already closing, no need to close again (avoid None.close())
return
with self._closing_lock:
try:
if self._socket is not None:
try:
if self._socket is not None:
if self.connected:
self._socket.shutdown(socket.SHUT_RDWR)
self._socket.close()
except OSError:
pass # Ignore ENOTCONN, EBADF, and any other error when closing
finally:
self._socket = None
self._socket.close()
except OSError:
pass # Ignore ENOTCONN, EBADF, and any other error when closing
finally:
self._socket = None
self._closed.set()
def write(self, data):
async def _wait_close(self, coro):
done, running = await asyncio.wait(
[coro, self._closed.wait()],
timeout=self.timeout,
return_when=asyncio.FIRST_COMPLETED,
loop=self._loop
)
for r in running:
r.cancel()
if not self.connected:
raise self.SocketClosed()
if not done:
raise TimeoutError()
return done.pop().result()
async def write(self, data):
"""
Writes (sends) the specified bytes to the connected peer.
:param data: the data to send.
"""
if self._socket is None:
self._raise_connection_reset(None)
# TODO Timeout may be an issue when sending the data, Changed in v3.5:
# The socket timeout is now the maximum total duration to send all data.
if not self.connected:
raise ConnectionResetError('No connection')
try:
self._socket.sendall(data)
except socket.timeout as e:
__log__.debug('socket.timeout "%s" while writing data', e)
raise TimeoutError() from e
except ConnectionError as e:
__log__.info('ConnectionError "%s" while writing data', e)
self._raise_connection_reset(e)
await self._wait_close(self.sock_sendall(data))
except self.SocketClosed:
raise ConnectionResetError('Socket has closed')
except OSError as e:
__log__.info('OSError "%s" while writing data', e)
if e.errno in CONN_RESET_ERRNOS:
@ -145,22 +151,22 @@ class TcpClient:
else:
raise
def read(self, size):
async def read(self, size):
"""
Reads (receives) a whole block of size bytes from the connected peer.
:param size: the size of the block to be read.
:return: the read data with len(data) == size.
"""
if self._socket is None:
self._raise_connection_reset(None)
with BufferedWriter(BytesIO(), buffer_size=size) as buffer:
bytes_left = size
partial = b''
while bytes_left != 0:
if not self.connected:
raise ConnectionResetError('No connection')
try:
partial = self._socket.recv(bytes_left)
except socket.timeout as e:
partial = await self._wait_close(self.sock_recv(bytes_left))
except TimeoutError as e:
# These are somewhat common if the server has nothing
# to send to us, so use a lower logging priority.
if bytes_left < size:
@ -173,12 +179,11 @@ class TcpClient:
'socket.timeout "%s" while reading data', e
)
raise TimeoutError() from e
except ConnectionError as e:
__log__.info('ConnectionError "%s" while reading data', e)
self._raise_connection_reset(e)
raise
except self.SocketClosed:
raise ConnectionResetError('Socket has closed while reading data')
except OSError as e:
if e.errno != errno.EBADF and self._closing_lock.locked():
if e.errno != errno.EBADF:
# Ignore bad file descriptor while closing
__log__.info('OSError "%s" while reading data', e)
@ -188,7 +193,7 @@ class TcpClient:
raise
if len(partial) == 0:
self._raise_connection_reset(None)
self._raise_connection_reset('No data on read')
buffer.write(partial)
bytes_left -= len(partial)
@ -197,8 +202,61 @@ class TcpClient:
buffer.flush()
return buffer.raw.getvalue()
def _raise_connection_reset(self, original):
"""Disconnects the client and raises ConnectionResetError."""
def _raise_connection_reset(self, error):
description = error if isinstance(error, str) else str(error)
if isinstance(error, str):
error = Exception(error)
self.close() # Connection reset -> flag as socket closed
raise ConnectionResetError('The server has closed the connection.')\
from original
raise ConnectionResetError(description) from error
# due to new https://github.com/python/cpython/pull/4386
def sock_recv(self, n):
fut = self._loop.create_future()
self._sock_recv(fut, None, n)
return fut
def _sock_recv(self, fut, registered_fd, n):
if registered_fd is not None:
self._loop.remove_reader(registered_fd)
if fut.cancelled() or self._socket is None:
return
try:
data = self._socket.recv(n)
except (BlockingIOError, InterruptedError):
fd = self._socket.fileno()
self._loop.add_reader(fd, self._sock_recv, fut, fd, n)
except Exception as exc:
fut.set_exception(exc)
else:
fut.set_result(data)
def sock_sendall(self, data):
fut = self._loop.create_future()
if data:
self._sock_sendall(fut, None, data)
else:
fut.set_result(None)
return fut
def _sock_sendall(self, fut, registered_fd, data):
if registered_fd:
self._loop.remove_writer(registered_fd)
if fut.cancelled() or self._socket is None:
return
try:
n = self._socket.send(data)
except (BlockingIOError, InterruptedError):
n = 0
except Exception as exc:
fut.set_exception(exc)
return
if n == len(data):
fut.set_result(None)
else:
if n:
data = data[n:]
fd = self._socket.fileno()
self._loop.add_writer(fd, self._sock_sendall, fut, fd, data)

View File

@ -3,9 +3,9 @@ import os
import struct
from hashlib import sha1, sha256
from telethon.crypto import AES
from telethon.errors import SecurityError
from telethon.extensions import BinaryReader
from .crypto import AES
from .errors import SecurityError
from .extensions import BinaryReader
# region Multiple utilities

View File

@ -20,7 +20,7 @@ from ..tl.functions import (
)
def do_authentication(connection, retries=5):
async def do_authentication(connection, retries=5):
"""
Performs the authentication steps on the given connection.
Raises an error if all attempts fail.
@ -35,14 +35,14 @@ def do_authentication(connection, retries=5):
last_error = None
while retries:
try:
return _do_authentication(connection)
return await _do_authentication(connection)
except (SecurityError, AssertionError, NotImplementedError) as e:
last_error = e
retries -= 1
raise last_error
def _do_authentication(connection):
async def _do_authentication(connection):
"""
Executes the authentication process with the Telegram servers.
@ -55,8 +55,8 @@ def _do_authentication(connection):
req_pq_request = ReqPqMultiRequest(
nonce=int.from_bytes(os.urandom(16), 'big', signed=True)
)
sender.send(bytes(req_pq_request))
with BinaryReader(sender.receive()) as reader:
await sender.send(bytes(req_pq_request))
with BinaryReader(await sender.receive()) as reader:
req_pq_request.on_response(reader)
res_pq = req_pq_request.result
@ -103,10 +103,10 @@ def _do_authentication(connection):
public_key_fingerprint=target_fingerprint,
encrypted_data=cipher_text
)
sender.send(bytes(req_dh_params))
await sender.send(bytes(req_dh_params))
# Step 2 response: DH Exchange
with BinaryReader(sender.receive()) as reader:
with BinaryReader(await sender.receive()) as reader:
req_dh_params.on_response(reader)
server_dh_params = req_dh_params.result
@ -173,10 +173,10 @@ def _do_authentication(connection):
server_nonce=res_pq.server_nonce,
encrypted_data=client_dh_encrypted,
)
sender.send(bytes(set_client_dh))
await sender.send(bytes(set_client_dh))
# Step 3 response: Complete DH Exchange
with BinaryReader(sender.receive()) as reader:
with BinaryReader(await sender.receive()) as reader:
set_client_dh.on_response(reader)
dh_gen = set_client_dh.result

View File

@ -2,6 +2,7 @@
This module holds the abstract `Connection` class.
"""
import abc
import asyncio
from datetime import timedelta
@ -12,15 +13,17 @@ class Connection(abc.ABC):
Subclasses should implement the actual protocol
being used when encoding/decoding messages.
"""
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None):
"""
Initializes a new connection.
:param proxy: whether to use a proxy or not.
:param timeout: timeout to be used for all operations.
:param loop: event loop to be used, or ``asyncio.get_event_loop()``.
"""
self._proxy = proxy
self._timeout = timeout
self._loop = loop or asyncio.get_event_loop()
@abc.abstractmethod
def connect(self, ip, port):
@ -32,7 +35,7 @@ class Connection(abc.ABC):
raise NotImplementedError
@abc.abstractmethod
def is_connected(self):
async def is_connected(self):
"""
Determines whether the connection is alive or not.
@ -51,11 +54,11 @@ class Connection(abc.ABC):
raise NotImplementedError
@abc.abstractmethod
def recv(self):
async def recv(self):
"""Receives and unpacks a message"""
raise NotImplementedError
@abc.abstractmethod
def send(self, message):
async def send(self, message):
"""Encapsulates and sends the given message"""
raise NotImplementedError

View File

@ -9,26 +9,26 @@ class ConnectionTcpAbridged(ConnectionTcpFull):
only require 1 byte if the packet length is less than
508 bytes (127 << 2, which is very common).
"""
def connect(self, ip, port):
result = super().connect(ip, port)
self.conn.write(b'\xef')
async def connect(self, ip, port):
result = await super().connect(ip, port)
await self.conn.write(b'\xef')
return result
def clone(self):
return ConnectionTcpAbridged(self._proxy, self._timeout)
def recv(self):
length = struct.unpack('<B', self.read(1))[0]
async def recv(self):
length = struct.unpack('<B', await self.read(1))[0]
if length >= 127:
length = struct.unpack('<i', self.read(3) + b'\0')[0]
length = struct.unpack('<i', await self.read(3) + b'\0')[0]
return self.read(length << 2)
return await self.read(length << 2)
def send(self, message):
async def send(self, message):
length = len(message) >> 2
if length < 127:
length = struct.pack('B', length)
else:
length = b'\x7f' + int.to_bytes(length, 3, 'little')
self.write(length + message)
await self.write(length + message)

View File

@ -13,16 +13,16 @@ class ConnectionTcpFull(Connection):
Default Telegram mode. Sends 12 additional bytes and
needs to calculate the CRC value of the packet itself.
"""
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
super().__init__(proxy, timeout)
def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None):
super().__init__(proxy, timeout, loop)
self._send_counter = 0
self.conn = TcpClient(proxy=self._proxy, timeout=self._timeout)
self.read = self.conn.read
self.write = self.conn.write
def connect(self, ip, port):
async def connect(self, ip, port):
try:
self.conn.connect(ip, port)
await self.conn.connect(ip, port)
except OSError as e:
if e.errno == errno.EISCONN:
return # Already connected, no need to re-set everything up
@ -43,11 +43,11 @@ class ConnectionTcpFull(Connection):
def clone(self):
return ConnectionTcpFull(self._proxy, self._timeout)
def recv(self):
packet_len_seq = self.read(8) # 4 and 4
async def recv(self):
packet_len_seq = await self.read(8) # 4 and 4
packet_len, seq = struct.unpack('<ii', packet_len_seq)
body = self.read(packet_len - 12)
checksum = struct.unpack('<I', self.read(4))[0]
body = await self.read(packet_len - 12)
checksum = struct.unpack('<I', await self.read(4))[0]
valid_checksum = crc32(packet_len_seq + body)
if checksum != valid_checksum:
@ -55,11 +55,11 @@ class ConnectionTcpFull(Connection):
return body
def send(self, message):
async def send(self, message):
# https://core.telegram.org/mtproto#tcp-transport
# total length, sequence number, packet and checksum (CRC32)
length = len(message) + 12
data = struct.pack('<ii', length, self._send_counter) + message
crc = struct.pack('<I', crc32(data))
self._send_counter += 1
self.write(data + crc)
await self.write(data + crc)

View File

@ -8,16 +8,16 @@ class ConnectionTcpIntermediate(ConnectionTcpFull):
Intermediate mode between `ConnectionTcpFull` and `ConnectionTcpAbridged`.
Always sends 4 extra bytes for the packet length.
"""
def connect(self, ip, port):
result = super().connect(ip, port)
self.conn.write(b'\xee\xee\xee\xee')
async def connect(self, ip, port):
result = await super().connect(ip, port)
await self.conn.write(b'\xee\xee\xee\xee')
return result
def clone(self):
return ConnectionTcpIntermediate(self._proxy, self._timeout)
def recv(self):
return self.read(struct.unpack('<i', self.read(4))[0])
async def recv(self):
return await self.read(struct.unpack('<i', await self.read(4))[0])
def send(self, message):
self.write(struct.pack('<i', len(message)) + message)
async def send(self, message):
await self.write(struct.pack('<i', len(message)) + message)

View File

@ -12,14 +12,21 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged):
every message with a randomly generated key using the
AES-CTR mode so the packets are harder to discern.
"""
def __init__(self, proxy=None, timeout=timedelta(seconds=5)):
super().__init__(proxy, timeout)
def __init__(self, proxy=None, timeout=timedelta(seconds=5), loop=None):
super().__init__(proxy, timeout, loop)
self._aes_encrypt, self._aes_decrypt = None, None
self.read = lambda s: self._aes_decrypt.encrypt(self.conn.read(s))
self.write = lambda d: self.conn.write(self._aes_encrypt.encrypt(d))
def connect(self, ip, port):
result = ConnectionTcpFull.connect(self, ip, port)
async def read(size):
return self._aes_decrypt.encrypt(await self.conn.read(size))
async def write(data):
return await self.conn.write(self._aes_encrypt.encrypt(data))
self.read = read
self.write = write
async def connect(self, ip, port):
result = await ConnectionTcpFull.connect(self, ip, port)
# Obfuscated messages secrets cannot start with any of these
keywords = (b'PVrG', b'GET ', b'POST', b'\xee' * 4)
while True:
@ -43,7 +50,7 @@ class ConnectionTcpObfuscated(ConnectionTcpAbridged):
self._aes_decrypt = AESModeCTR(decrypt_key, decrypt_iv)
random[56:64] = self._aes_encrypt.encrypt(bytes(random))[56:64]
self.conn.write(bytes(random))
await self.conn.write(bytes(random))
return result
def clone(self):

View File

@ -26,32 +26,32 @@ class MtProtoPlainSender:
self._last_msg_id = 0
self._connection = connection
def connect(self):
async def connect(self):
"""Connects to Telegram's servers."""
self._connection.connect()
await self._connection.connect()
def disconnect(self):
"""Disconnects from Telegram's servers."""
self._connection.close()
def send(self, data):
async def send(self, data):
"""
Sends a plain packet (auth_key_id = 0) containing the
given message body (data).
:param data: the data to be sent.
"""
self._connection.send(
await self._connection.send(
struct.pack('<QQi', 0, self._get_new_msg_id(), len(data)) + data
)
def receive(self):
async def receive(self):
"""
Receives a plain packet from the network.
:return: the response body.
"""
body = self._connection.recv()
body = await self._connection.recv()
if body == b'l\xfe\xff\xff': # -404 little endian signed
# Broken authorization, must reset the auth key
raise BrokenAuthKeyError()

View File

@ -2,8 +2,9 @@
This module contains the class used to communicate with Telegram's servers
encrypting every packet, and relies on a valid AuthKey in the used Session.
"""
import asyncio
import logging
from threading import Lock
from asyncio import Event
from .. import helpers, utils
from ..errors import (
@ -13,11 +14,11 @@ from ..errors import (
from ..extensions import BinaryReader
from ..tl import TLMessage, MessageContainer, GzipPacked
from ..tl.all_tlobjects import tlobjects
from ..tl.functions import InvokeAfterMsgRequest
from ..tl.functions.auth import LogOutRequest
from ..tl.types import (
MsgsAck, Pong, BadServerSalt, BadMsgNotification, FutureSalts,
MsgNewDetailedInfo, NewSessionCreated, MsgDetailedInfo
MsgNewDetailedInfo, MsgDetailedInfo, MsgsStateReq, MsgResendReq,
MsgsAllInfo, MsgsStateInfo, RpcError
)
__log__ = logging.getLogger(__name__)
@ -35,7 +36,7 @@ class MtProtoSender:
in parallel, so thread-safety (hence locking) isn't needed.
"""
def __init__(self, session, connection):
def __init__(self, session, connection, loop=None):
"""
Initializes a new MTProto sender.
@ -44,28 +45,26 @@ class MtProtoSender:
port of the server, salt, ID, and AuthKey,
:param connection:
the Connection to be used.
:param loop:
the asyncio loop to be used, or the default one.
"""
self.session = session
self.connection = connection
# Message IDs that need confirmation
self._need_confirmation = set()
# Requests (as msg_id: Message) sent waiting to be received
self._pending_receive = {}
# Multithreading
self._send_lock = Lock()
self._loop = loop if loop else asyncio.get_event_loop()
# If we're invoking something from an update thread but we're also
# receiving other request from the main thread (e.g. an update arrives
# and we need to process it) we must ensure that only one is calling
# receive at a given moment, since the receive step is fragile.
self._recv_lock = Lock()
self._read_lock = asyncio.Lock(loop=self._loop)
self._write_lock = asyncio.Lock(loop=self._loop)
def connect(self):
# Requests (as msg_id: Message) sent waiting to be received
self._pending_receive = {}
async def connect(self):
"""Connects to the server."""
self.connection.connect(self.session.server_address, self.session.port)
await self.connection.connect(self.session.server_address, self.session.port)
def is_connected(self):
"""
@ -75,15 +74,16 @@ class MtProtoSender:
"""
return self.connection.is_connected()
def disconnect(self):
def disconnect(self, clear_pendings=True):
"""Disconnects from the server."""
__log__.info('Disconnecting MtProtoSender...')
self.connection.close()
self._clear_all_pending()
if clear_pendings:
self._clear_all_pending()
# region Send and receive
def send(self, requests, ordered=False):
async def send(self, requests, ordered=False):
"""
Sends the specified TLObject(s) (which must be requests),
and acknowledging any message which needed confirmation.
@ -92,10 +92,18 @@ class MtProtoSender:
:param ordered: whether the requests should be invoked in the
order in which they appear or they can be executed
in arbitrary order in the server.
:return: a list of msg_ids which are correspond to sent requests.
"""
if not utils.is_list_like(requests):
requests = (requests,)
# Prepare the event of every request
for r in requests:
if r.confirm_received is None:
r.confirm_received = Event(loop=self._loop)
else:
r.confirm_received.clear()
if ordered:
requests = iter(requests)
messages = [TLMessage(self.session, next(requests))]
@ -106,19 +114,13 @@ class MtProtoSender:
messages = [TLMessage(self.session, r) for r in requests]
self._pending_receive.update({m.msg_id: m for m in messages})
msg_ids = [m.msg_id for m in messages]
__log__.debug('Sending requests with IDs: %s', ', '.join(
'{}: {}'.format(m.request.__class__.__name__, m.msg_id)
for m in messages
))
# Pack everything in the same container if we need to send AckRequests
if self._need_confirmation:
messages.append(
TLMessage(self.session, MsgsAck(list(self._need_confirmation)))
)
self._need_confirmation.clear()
if len(messages) == 1:
message = messages[0]
else:
@ -129,13 +131,19 @@ class MtProtoSender:
for m in messages:
m.container_msg_id = message.msg_id
self._send_message(message)
await self._send_message(message)
return msg_ids
def _send_acknowledge(self, msg_id):
def forget_pendings(self, msg_ids):
for msg_id in msg_ids:
if msg_id in self._pending_receive:
del self._pending_receive[msg_id]
async def _send_acknowledge(self, msg_id):
"""Sends a message acknowledge for the given msg_id."""
self._send_message(TLMessage(self.session, MsgsAck([msg_id])))
await self._send_message(TLMessage(self.session, MsgsAck([msg_id])))
def receive(self, update_state):
async def receive(self, updates_handler):
"""
Receives a single message from the connected endpoint.
@ -146,21 +154,13 @@ class MtProtoSender:
Any unhandled object (likely updates) will be passed to
update_state.process(TLObject).
:param update_state:
the UpdateState that will process all the received
:param updates_handler:
the handler that will process all the received
Update and Updates objects.
"""
if self._recv_lock.locked():
with self._recv_lock:
# Don't busy wait, acquire it but return because there's
# already a receive running and we don't want another one.
# It would lock until Telegram sent another update even if
# the current receive already received the expected response.
return
await self._read_lock.acquire()
try:
with self._recv_lock:
body = self.connection.recv()
body = await self.connection.recv()
except (BufferError, InvalidChecksumError):
# TODO BufferError, we should spot the cause...
# "No more bytes left"; something wrong happened, clear
@ -174,23 +174,28 @@ class MtProtoSender:
len(self._pending_receive))
self._clear_all_pending()
return
finally:
self._read_lock.release()
message, remote_msg_id, remote_seq = self._decode_msg(body)
with BinaryReader(message) as reader:
self._process_msg(remote_msg_id, remote_seq, reader, update_state)
await self._process_msg(remote_msg_id, remote_seq, reader, updates_handler)
# endregion
# region Low level processing
def _send_message(self, message):
async def _send_message(self, message):
"""
Sends the given encrypted through the network.
:param message: the TLMessage to be sent.
"""
with self._send_lock:
self.connection.send(helpers.pack_message(self.session, message))
await self._write_lock.acquire()
try:
await self.connection.send(helpers.pack_message(self.session, message))
finally:
self._write_lock.release()
def _decode_msg(self, body):
"""
@ -208,18 +213,17 @@ class MtProtoSender:
with BinaryReader(body) as reader:
return helpers.unpack_message(self.session, reader)
def _process_msg(self, msg_id, sequence, reader, state):
async def _process_msg(self, msg_id, sequence, reader, updates_handler):
"""
Processes the message read from the network inside reader.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the BinaryReader that contains the message.
:param state: the current UpdateState.
:param updates_handler: the handler to process Update and Updates objects.
:return: true if the message was handled correctly, false otherwise.
"""
# TODO Check salt, session_id and sequence_number
self._need_confirmation.add(msg_id)
code = reader.read_int(signed=False)
reader.seek(-4)
@ -227,15 +231,16 @@ class MtProtoSender:
# These are a bit of special case, not yet generated by the code gen
if code == 0xf35c6d01: # rpc_result, (response of an RPC call)
__log__.debug('Processing Remote Procedure Call result')
return self._handle_rpc_result(msg_id, sequence, reader)
await self._send_acknowledge(msg_id)
return await self._handle_rpc_result(msg_id, sequence, reader)
if code == MessageContainer.CONSTRUCTOR_ID:
__log__.debug('Processing container result')
return self._handle_container(msg_id, sequence, reader, state)
return await self._handle_container(msg_id, sequence, reader, updates_handler)
if code == GzipPacked.CONSTRUCTOR_ID:
__log__.debug('Processing gzipped result')
return self._handle_gzip_packed(msg_id, sequence, reader, state)
return await self._handle_gzip_packed(msg_id, sequence, reader, updates_handler)
if code not in tlobjects:
__log__.warning(
@ -248,25 +253,30 @@ class MtProtoSender:
__log__.debug('Processing %s result', type(obj).__name__)
if isinstance(obj, Pong):
return self._handle_pong(msg_id, sequence, obj)
return await self._handle_pong(msg_id, sequence, obj)
if isinstance(obj, BadServerSalt):
return self._handle_bad_server_salt(msg_id, sequence, obj)
return await self._handle_bad_server_salt(msg_id, sequence, obj)
if isinstance(obj, (MsgsStateReq, MsgResendReq)):
# just answer we don't know anything
return await self._handle_msgs_state_forgotten(msg_id, sequence, obj)
if isinstance(obj, MsgsAllInfo):
# not interesting now
return True
if isinstance(obj, BadMsgNotification):
return self._handle_bad_msg_notification(msg_id, sequence, obj)
return await self._handle_bad_msg_notification(msg_id, sequence, obj)
if isinstance(obj, MsgDetailedInfo):
return self._handle_msg_detailed_info(msg_id, sequence, obj)
return await self._handle_msg_detailed_info(msg_id, sequence, obj)
if isinstance(obj, MsgNewDetailedInfo):
return self._handle_msg_new_detailed_info(msg_id, sequence, obj)
if isinstance(obj, NewSessionCreated):
return self._handle_new_session_created(msg_id, sequence, obj)
return await self._handle_msg_new_detailed_info(msg_id, sequence, obj)
if isinstance(obj, MsgsAck): # may handle the request we wanted
# Ignore every ack request *unless* when logging out, when it's
# Ignore every ack request *unless* when logging out,
# when it seems to only make sense. We also need to set a non-None
# result since Telegram doesn't send the response for these.
for msg_id in obj.msg_ids:
@ -287,8 +297,9 @@ class MtProtoSender:
# If the object isn't any of the above, then it should be an Update.
self.session.process_entities(obj)
if state:
state.process(obj)
await self._send_acknowledge(msg_id)
if updates_handler:
updates_handler(obj)
return True
@ -343,7 +354,7 @@ class MtProtoSender:
__log__.info('Abruptly confirming %s', type(r).__name__)
self._pending_receive.clear()
def _resend_request(self, msg_id):
async def _resend_request(self, msg_id):
"""
Re-sends the request that belongs to a certain msg_id. This may
also be the msg_id of a container if they were sent in one.
@ -352,12 +363,13 @@ class MtProtoSender:
"""
request = self._pop_request(msg_id)
if request:
return self.send(request)
await self.send(request)
return
requests = self._pop_requests_of_container(msg_id)
if requests:
return self.send(*requests)
await self.send(*requests)
def _handle_pong(self, msg_id, sequence, pong):
async def _handle_pong(self, msg_id, sequence, pong):
"""
Handles a Pong response.
@ -374,22 +386,24 @@ class MtProtoSender:
return True
def _handle_container(self, msg_id, sequence, reader, state):
async def _handle_container(self, msg_id, sequence, reader, updates_handler):
"""
Handles a MessageContainer response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the MessageContainer.
:param updates_handler: handler to handle Update and Updates objects.
:return: true, as it always succeeds.
"""
__log__.debug('Handling container')
for inner_msg_id, _, inner_len in MessageContainer.iter_read(reader):
begin_position = reader.tell_position()
# Note that this code is IMPORTANT for skipping RPC results of
# lost requests (i.e., ones from the previous connection session)
try:
if not self._process_msg(inner_msg_id, sequence, reader, state):
if not await self._process_msg(inner_msg_id, sequence, reader, updates_handler):
reader.set_position(begin_position + inner_len)
except:
# If any error is raised, something went wrong; skip the packet
@ -398,7 +412,7 @@ class MtProtoSender:
return True
def _handle_bad_server_salt(self, msg_id, sequence, bad_salt):
async def _handle_bad_server_salt(self, msg_id, sequence, bad_salt):
"""
Handles a BadServerSalt response.
@ -408,14 +422,18 @@ class MtProtoSender:
:return: true, as it always succeeds.
"""
self.session.salt = bad_salt.new_server_salt
self.session.save()
# "the bad_server_salt response is received with the
# correct salt, and the message is to be re-sent with it"
self._resend_request(bad_salt.bad_msg_id)
await self._resend_request(bad_salt.bad_msg_id)
return True
def _handle_bad_msg_notification(self, msg_id, sequence, bad_msg):
async def _handle_msgs_state_forgotten(self, msg_id, sequence, req):
await self._send_message(TLMessage(self.session, MsgsStateInfo(msg_id, chr(1) * len(req.msg_ids))))
return True
async def _handle_bad_msg_notification(self, msg_id, sequence, bad_msg):
"""
Handles a BadMessageError response.
@ -431,25 +449,25 @@ class MtProtoSender:
# Use the current msg_id to determine the right time offset.
self.session.update_time_offset(correct_msg_id=msg_id)
__log__.info('Attempting to use the correct time offset')
self._resend_request(bad_msg.bad_msg_id)
await self._resend_request(bad_msg.bad_msg_id)
return True
elif bad_msg.error_code == 32:
# msg_seqno too low, so just pump it up by some "large" amount
# TODO A better fix would be to start with a new fresh session ID
self.session.sequence += 64
__log__.info('Attempting to set the right higher sequence')
self._resend_request(bad_msg.bad_msg_id)
await self._resend_request(bad_msg.bad_msg_id)
return True
elif bad_msg.error_code == 33:
# msg_seqno too high never seems to happen but just in case
self.session.sequence -= 16
__log__.info('Attempting to set the right lower sequence')
self._resend_request(bad_msg.bad_msg_id)
await self._resend_request(bad_msg.bad_msg_id)
return True
else:
raise error
def _handle_msg_detailed_info(self, msg_id, sequence, msg_new):
async def _handle_msg_detailed_info(self, msg_id, sequence, msg_new):
"""
Handles a MsgDetailedInfo response.
@ -460,10 +478,10 @@ class MtProtoSender:
"""
# TODO For now, simply ack msg_new.answer_msg_id
# Relevant tdesktop source code: https://goo.gl/VvpCC6
self._send_acknowledge(msg_new.answer_msg_id)
await self._send_acknowledge(msg_new.answer_msg_id)
return True
def _handle_msg_new_detailed_info(self, msg_id, sequence, msg_new):
async def _handle_msg_new_detailed_info(self, msg_id, sequence, msg_new):
"""
Handles a MsgNewDetailedInfo response.
@ -474,23 +492,10 @@ class MtProtoSender:
"""
# TODO For now, simply ack msg_new.answer_msg_id
# Relevant tdesktop source code: https://goo.gl/G7DPsR
self._send_acknowledge(msg_new.answer_msg_id)
await self._send_acknowledge(msg_new.answer_msg_id)
return True
def _handle_new_session_created(self, msg_id, sequence, new_session):
"""
Handles a NewSessionCreated response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the NewSessionCreated.
:return: true, as it always succeeds.
"""
self.session.salt = new_session.server_salt
# TODO https://goo.gl/LMyN7A
return True
def _handle_rpc_result(self, msg_id, sequence, reader):
async def _handle_rpc_result(self, msg_id, sequence, reader):
"""
Handles a RPCResult response.
@ -508,7 +513,7 @@ class MtProtoSender:
__log__.debug('Received response for request with ID %d', request_id)
request = self._pop_request(request_id)
if inner_code == 0x2144ca19: # RPC Error
if inner_code == RpcError.CONSTRUCTOR_ID: # RPC Error
reader.seek(4)
if self.session.report_errors and request:
error = rpc_message_to_error(
@ -520,9 +525,6 @@ class MtProtoSender:
reader.read_int(), reader.tgread_string()
)
# Acknowledge that we received the error
self._send_acknowledge(request_id)
if request:
request.rpc_error = error
request.confirm_received.set()
@ -534,6 +536,7 @@ class MtProtoSender:
return True # All contents were read okay
elif request:
__log__.debug('Reading request response')
if inner_code == GzipPacked.CONSTRUCTOR_ID:
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
request.on_response(compressed_reader)
@ -570,21 +573,18 @@ class MtProtoSender:
)
return False
def _handle_gzip_packed(self, msg_id, sequence, reader, state):
async def _handle_gzip_packed(self, msg_id, sequence, reader, updates_handler):
"""
Handles a GzipPacked response.
:param msg_id: the ID of the message.
:param sequence: the sequence of the message.
:param reader: the reader containing the GzipPacked.
:param updates_handler: the handler to process Update and Updates objects.
:return: the result of processing the packed message.
"""
__log__.debug('Handling gzip packed data')
with BinaryReader(GzipPacked.read(reader)) as compressed_reader:
# We are reentering process_msg, which seemingly the same msg_id
# to the self._need_confirmation set. Remove it from there first
# to avoid any future conflicts (i.e. if we "ignore" messages
# that we are already aware of, see 1a91c02 and old 63dfb1e)
self._need_confirmation -= {msg_id}
return self._process_msg(msg_id, sequence, compressed_reader, state)
return await self._process_msg(msg_id, sequence, compressed_reader, updates_handler)
# endregion

View File

@ -67,6 +67,22 @@ class Session(ABC):
"""
raise NotImplementedError
@property
@abstractmethod
def user_id(self):
"""
Returns an ``user_id`` which the session related to.
"""
raise NotImplementedError
@user_id.setter
@abstractmethod
def user_id(self, value):
"""
Sets the ``user_id`` which the session related to.
"""
raise NotImplementedError
@abstractmethod
def get_update_state(self, entity_id):
"""
@ -94,7 +110,7 @@ class Session(ABC):
"""
@abstractmethod
def save(self):
async def save(self):
"""
Called whenever important properties change. It should
make persist the relevant session information to disk.
@ -102,7 +118,7 @@ class Session(ABC):
raise NotImplementedError
@abstractmethod
def delete(self):
async def delete(self):
"""
Called upon client.log_out(). Should delete the stored
information from disk since it's not valid anymore.
@ -125,7 +141,7 @@ class Session(ABC):
raise NotImplementedError
@abstractmethod
def get_input_entity(self, key):
async def get_input_entity(self, key):
"""
Turns the given key into an ``InputPeer`` (e.g. ``InputPeerUser``).
The library uses this method whenever an ``InputPeer`` is needed
@ -135,7 +151,7 @@ class Session(ABC):
raise NotImplementedError
@abstractmethod
def cache_file(self, md5_digest, file_size, instance):
async def cache_file(self, md5_digest, file_size, instance):
"""
Caches the given file information persistently, so that it
doesn't need to be re-uploaded in case the file is used again.
@ -146,7 +162,7 @@ class Session(ABC):
raise NotImplementedError
@abstractmethod
def get_file(self, md5_digest, file_size, cls):
async def get_file(self, md5_digest, file_size, cls):
"""
Returns an instance of ``cls`` if the ``md5_digest`` and ``file_size``
match an existing saved record. The class will either be an

View File

@ -32,6 +32,7 @@ class MemorySession(Session):
self._server_address = None
self._port = None
self._auth_key = None
self._user_id = None
self._files = {}
self._entities = set()
@ -58,6 +59,14 @@ class MemorySession(Session):
def auth_key(self, value):
self._auth_key = value
@property
def user_id(self):
return self._user_id
@user_id.setter
def user_id(self, value):
self._user_id = value
def get_update_state(self, entity_id):
return self._update_states.get(entity_id, None)
@ -67,10 +76,10 @@ class MemorySession(Session):
def close(self):
pass
def save(self):
async def save(self):
pass
def delete(self):
async def delete(self):
pass
def _entity_values_to_row(self, id, hash, username, phone, name):
@ -170,7 +179,7 @@ class MemorySession(Session):
except StopIteration:
pass
def get_input_entity(self, key):
async def get_input_entity(self, key):
try:
if key.SUBCLASS_OF_ID in (0xc91c90b6, 0xe669bf46, 0x40f202fd):
# hex(crc32(b'InputPeer', b'InputUser' and b'InputChannel'))
@ -215,14 +224,14 @@ class MemorySession(Session):
else:
raise ValueError('Could not find input entity with key ', key)
def cache_file(self, md5_digest, file_size, instance):
async def cache_file(self, md5_digest, file_size, instance):
if not isinstance(instance, (InputDocument, InputPhoto)):
raise TypeError('Cannot cache %s instance' % type(instance))
key = (md5_digest, file_size, _SentFileType.from_type(instance))
value = (instance.id, instance.access_hash)
self._files[key] = value
def get_file(self, md5_digest, file_size, cls):
async def get_file(self, md5_digest, file_size, cls):
key = (md5_digest, file_size, _SentFileType.from_type(cls))
try:
return cls(self._files[key])

View File

@ -4,7 +4,6 @@ import os
import sqlite3
from base64 import b64decode
from os.path import isfile as file_exists
from threading import Lock, RLock
from telethon.tl import types
from .memory import MemorySession, _SentFileType
@ -42,11 +41,6 @@ class SQLiteSession(MemorySession):
if not self.filename.endswith(EXTENSION):
self.filename += EXTENSION
# Cross-thread safety
self._seq_no_lock = Lock()
self._msg_id_lock = Lock()
self._db_lock = RLock()
# Migrating from .json -> SQL
entities = self._check_migrate_json()
@ -204,23 +198,22 @@ class SQLiteSession(MemorySession):
self._update_session_table()
def _update_session_table(self):
with self._db_lock:
c = self._cursor()
# While we can save multiple rows into the sessions table
# currently we only want to keep ONE as the tables don't
# tell us which auth_key's are usable and will work. Needs
# some more work before being able to save auth_key's for
# multiple DCs. Probably done differently.
c.execute('delete from sessions')
c.execute('insert or replace into sessions values (?,?,?,?)', (
self._dc_id,
self._server_address,
self._port,
self._auth_key.key if self._auth_key else b''
))
c.close()
c = self._cursor()
# While we can save multiple rows into the sessions table
# currently we only want to keep ONE as the tables don't
# tell us which auth_key's are usable and will work. Needs
# some more work before being able to save auth_key's for
# multiple DCs. Probably done differently.
c.execute('delete from sessions')
c.execute('insert or replace into sessions values (?,?,?,?)', (
self._dc_id,
self._server_address,
self._port,
self._auth_key.key if self._auth_key else b''
))
c.close()
def get_update_state(self, entity_id):
async def get_update_state(self, entity_id):
c = self._cursor()
row = c.execute('select pts, qts, date, seq from update_state '
'where id = ?', (entity_id,)).fetchone()
@ -230,37 +223,32 @@ class SQLiteSession(MemorySession):
date = datetime.datetime.utcfromtimestamp(date)
return types.updates.State(pts, qts, date, seq, unread_count=0)
def set_update_state(self, entity_id, state):
with self._db_lock:
c = self._cursor()
c.execute('insert or replace into update_state values (?,?,?,?,?)',
(entity_id, state.pts, state.qts,
state.date.timestamp(), state.seq))
c.close()
self.save()
async def set_update_state(self, entity_id, state):
c = self._cursor()
c.execute('insert or replace into update_state values (?,?,?,?,?)',
(entity_id, state.pts, state.qts,
state.date.timestamp(), state.seq))
c.close()
self.save()
def save(self):
async def save(self):
"""Saves the current session object as session_user_id.session"""
with self._db_lock:
self._conn.commit()
self._conn.commit()
def _cursor(self):
"""Asserts that the connection is open and returns a cursor"""
with self._db_lock:
if self._conn is None:
self._conn = sqlite3.connect(self.filename,
check_same_thread=False)
return self._conn.cursor()
if self._conn is None:
self._conn = sqlite3.connect(self.filename)
return self._conn.cursor()
def close(self):
"""Closes the connection unless we're working in-memory"""
if self.filename != ':memory:':
with self._db_lock:
if self._conn is not None:
self._conn.close()
self._conn = None
if self._conn is not None:
self._conn.close()
self._conn = None
def delete(self):
async def delete(self):
"""Deletes the current session file"""
if self.filename == ':memory:':
return True
@ -293,11 +281,10 @@ class SQLiteSession(MemorySession):
if not rows:
return
with self._db_lock:
self._cursor().executemany(
'insert or replace into entities values (?,?,?,?,?)', rows
)
self.save()
self._cursor().executemany(
'insert or replace into entities values (?,?,?,?,?)', rows
)
self.save()
def _fetchone_entity(self, query, args):
c = self._cursor()
@ -332,7 +319,7 @@ class SQLiteSession(MemorySession):
# File processing
def get_file(self, md5_digest, file_size, cls):
async def get_file(self, md5_digest, file_size, cls):
tuple_ = self._cursor().execute(
'select id, hash from sent_files '
'where md5_digest = ? and file_size = ? and type = ?',
@ -342,15 +329,14 @@ class SQLiteSession(MemorySession):
# Both allowed classes have (id, access_hash) as parameters
return cls(tuple_[0], tuple_[1])
def cache_file(self, md5_digest, file_size, instance):
async def cache_file(self, md5_digest, file_size, instance):
if not isinstance(instance, (InputDocument, InputPhoto)):
raise TypeError('Cannot cache %s instance' % type(instance))
with self._db_lock:
self._cursor().execute(
'insert or replace into sent_files values (?,?,?,?,?)', (
md5_digest, file_size,
_SentFileType.from_type(type(instance)).value,
instance.id, instance.access_hash
))
self.save()
self._cursor().execute(
'insert or replace into sent_files values (?,?,?,?,?)', (
md5_digest, file_size,
_SentFileType.from_type(type(instance)).value,
instance.id, instance.access_hash
))
self.save()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -68,12 +68,12 @@ class Dialog:
self.draft = Draft(client, dialog.peer, dialog.draft)
def send_message(self, *args, **kwargs):
async def send_message(self, *args, **kwargs):
"""
Sends a message to this dialog. This is just a wrapper around
``client.send_message(dialog.input_entity, *args, **kwargs)``.
"""
return self._client.send_message(self.input_entity, *args, **kwargs)
return await self._client.send_message(self.input_entity, *args, **kwargs)
def to_dict(self):
return {

View File

@ -26,7 +26,7 @@ class Draft:
def __init__(self, client, peer, draft):
self._client = client
self._peer = peer
if not draft:
if not draft or not isinstance(draft, DraftMessage):
draft = DraftMessage('', None, None, None, None)
self._text = markdown.unparse(draft.message, draft.entities)
@ -46,18 +46,18 @@ class Draft:
return cls(client=client, peer=update.peer, draft=update.draft)
@property
def entity(self):
async def entity(self):
"""
The entity that belongs to this dialog (user, chat or channel).
"""
return self._client.get_entity(self._peer)
return await self._client.get_entity(self._peer)
@property
def input_entity(self):
async def input_entity(self):
"""
Input version of the entity.
"""
return self._client.get_input_entity(self._peer)
return await self._client.get_input_entity(self._peer)
@property
def text(self):
@ -82,8 +82,8 @@ class Draft:
"""
return not self._text
def set_message(self, text=None, reply_to=0, parse_mode='md',
link_preview=None):
async def set_message(self, text=None, reply_to=0, parse_mode='md',
link_preview=None):
"""
Changes the draft message on the Telegram servers. The changes are
reflected in this object.
@ -109,8 +109,9 @@ class Draft:
if link_preview is None:
link_preview = self.link_preview
raw_text, entities = self._client._parse_message_text(text, parse_mode)
result = self._client(SaveDraftRequest(
raw_text, entities = await self._client._parse_message_text(text,
parse_mode)
result = await self._client(SaveDraftRequest(
peer=self._peer,
message=raw_text,
no_webpage=not link_preview,
@ -127,22 +128,22 @@ class Draft:
return result
def send(self, clear=True, parse_mode='md'):
async def send(self, clear=True, parse_mode='md'):
"""
Sends the contents of this draft to the dialog. This is just a
wrapper around ``send_message(dialog.input_entity, *args, **kwargs)``.
"""
self._client.send_message(self._peer, self.text,
reply_to=self.reply_to_msg_id,
link_preview=self.link_preview,
parse_mode=parse_mode,
clear_draft=clear)
await self._client.send_message(self._peer, self.text,
reply_to=self.reply_to_msg_id,
link_preview=self.link_preview,
parse_mode=parse_mode,
clear_draft=clear)
def delete(self):
async def delete(self):
"""
Deletes this draft, and returns ``True`` on success.
"""
return self.set_message(text='')
return await self.set_message(text='')
def to_dict(self):
try:

View File

@ -1,6 +1,6 @@
import struct
from asyncio import Event
from datetime import datetime, date
from threading import Event
class TLObject:
@ -180,7 +180,7 @@ class TLObject:
return TLObject.pretty_format(self, indent=0)
# These should be overrode
def resolve(self, client, utils):
async def resolve(self, client, utils):
pass
def to_dict(self):
@ -192,3 +192,6 @@ class TLObject:
@classmethod
def from_reader(cls, reader):
return TLObject()
def __repr__(self):
return self.__str__()

View File

@ -1,152 +0,0 @@
import itertools
import logging
from datetime import datetime
from queue import Queue, Empty
from threading import RLock, Thread
from . import utils
from .tl import types as tl
__log__ = logging.getLogger(__name__)
class UpdateState:
"""
Used to hold the current state of processed updates.
To retrieve an update, :meth:`poll` should be called.
"""
WORKER_POLL_TIMEOUT = 5.0 # Avoid waiting forever on the workers
def __init__(self, workers=None):
"""
:param workers: This integer parameter has three possible cases:
workers is None: Updates will *not* be stored on self.
workers = 0: Another thread is responsible for calling self.poll()
workers > 0: 'workers' background threads will be spawned, any
any of them will invoke the self.handler.
"""
self._workers = workers
self._worker_threads = []
self.handler = None
self._updates_lock = RLock()
self._updates = Queue()
# https://core.telegram.org/api/updates
self._state = tl.updates.State(0, 0, datetime.now(), 0, 0)
def can_poll(self):
"""Returns True if a call to .poll() won't lock"""
return not self._updates.empty()
def poll(self, timeout=None):
"""
Polls an update or blocks until an update object is available.
If 'timeout is not None', it should be a floating point value,
and the method will 'return None' if waiting times out.
"""
try:
return self._updates.get(timeout=timeout)
except Empty:
return None
def get_workers(self):
return self._workers
def set_workers(self, n):
"""Changes the number of workers running.
If 'n is None', clears all pending updates from memory.
"""
if n is None:
self.stop_workers()
else:
self._workers = n
self.setup_workers()
workers = property(fget=get_workers, fset=set_workers)
def stop_workers(self):
"""
Waits for all the worker threads to stop.
"""
# Put dummy ``None`` objects so that they don't need to timeout.
n = self._workers
self._workers = None
if n:
with self._updates_lock:
for _ in range(n):
self._updates.put(None)
for t in self._worker_threads:
t.join()
self._worker_threads.clear()
self._workers = n
def setup_workers(self):
if self._worker_threads or not self._workers:
# There already are workers, or workers is None or 0. Do nothing.
return
for i in range(self._workers):
thread = Thread(
target=UpdateState._worker_loop,
name='UpdateWorker{}'.format(i),
daemon=True,
args=(self, i)
)
self._worker_threads.append(thread)
thread.start()
def _worker_loop(self, wid):
while self._workers is not None:
try:
update = self.poll(timeout=UpdateState.WORKER_POLL_TIMEOUT)
if update and self.handler:
self.handler(update)
except StopIteration:
break
except:
# We don't want to crash a worker thread due to any reason
__log__.exception('Unhandled exception on worker %d', wid)
def get_update_state(self, entity_id):
"""Gets the updates.State corresponding to the given entity or 0."""
return self._state
def process(self, update):
"""Processes an update object. This method is normally called by
the library itself.
"""
if self._workers is None:
return # No processing needs to be done if nobody's working
with self._updates_lock:
if isinstance(update, tl.updates.State):
__log__.debug('Saved new updates state')
self._state = update
return # Nothing else to be done
if hasattr(update, 'pts'):
self._state.pts = update.pts
# After running the script for over an hour and receiving over
# 1000 updates, the only duplicates received were users going
# online or offline. We can trust the server until new reports.
# This should only be used as read-only.
if isinstance(update, tl.UpdateShort):
update.update._entities = {}
self._updates.put(update.update)
# Expand "Updates" into "Update", and pass these to callbacks.
# Since .users and .chats have already been processed, we
# don't need to care about those either.
elif isinstance(update, (tl.Updates, tl.UpdatesCombined)):
entities = {utils.get_peer_id(x): x for x in
itertools.chain(update.users, update.chats)}
for u in update.updates:
u._entities = entities
self._updates.put(u)
# TODO Handle "tl.UpdatesTooLong"
else:
update._entities = {}
self._updates.put(update)

View File

@ -14,10 +14,10 @@ AUTO_GEN_NOTICE = \
AUTO_CASTS = {
'InputPeer': 'utils.get_input_peer(client.get_input_entity({}))',
'InputChannel': 'utils.get_input_channel(client.get_input_entity({}))',
'InputUser': 'utils.get_input_user(client.get_input_entity({}))',
'InputDialogPeer': 'utils.get_input_dialog(client.get_input_entity({}))',
'InputPeer': 'utils.get_input_peer(await client.get_input_entity({}))',
'InputChannel': 'utils.get_input_channel(await client.get_input_entity({}))',
'InputUser': 'utils.get_input_user(await client.get_input_entity({}))',
'InputDialogPeer': 'utils.get_input_dialog(await client.get_input_entity({}))',
'InputMedia': 'utils.get_input_media({})',
'InputPhoto': 'utils.get_input_photo({})',
'InputMessage': 'utils.get_input_message({})'
@ -234,19 +234,30 @@ def _write_class_init(tlobject, type_constructors, builder):
def _write_resolve(tlobject, builder):
if any(arg.type in AUTO_CASTS for arg in tlobject.real_args):
builder.writeln('def resolve(self, client, utils):')
builder.writeln('async def resolve(self, client, utils):')
for arg in tlobject.real_args:
ac = AUTO_CASTS.get(arg.type, None)
if not ac:
continue
if arg.is_vector:
builder.write('self.{0} = [{1} for _x in self.{0}]',
arg.name, ac.format('_x'))
else:
builder.write('self.{} = {}', arg.name,
if arg.is_flag:
builder.writeln('if self.{}:', arg.name)
if not arg.is_vector:
builder.writeln('self.{} = {}', arg.name,
ac.format('self.' + arg.name))
builder.writeln(' if self.{} else None'.format(arg.name)
if arg.is_flag else '')
else:
# Since the auto-cast might have await, we can't use that in
# Python 3.5's list comprehensions. Build the list manually.
builder.writeln('_tmp = []')
builder.writeln('for _x in self.{}:', arg.name)
builder.writeln('_tmp.append({})', ac.format('_x'))
builder.end_block()
builder.writeln('self.{} = _tmp', arg.name)
if arg.is_flag:
builder.end_block()
builder.end_block()

View File

@ -17,33 +17,33 @@ class HigherLevelTests(unittest.TestCase):
raise ValueError('Please fill in both your api_id and api_hash.')
@unittest.skip("you can't seriously trash random mobile numbers like that :)")
def test_cdn_download(self):
async def test_cdn_download(self):
client = TelegramClient(None, api_id, api_hash)
client.session.set_dc(0, '149.154.167.40', 80)
self.assertTrue(client.connect())
self.assertTrue(await client.connect())
try:
phone = '+999662' + str(randint(0, 9999)).zfill(4)
client.send_code_request(phone)
client.sign_up('22222', 'Test', 'DC')
await client.send_code_request(phone)
await client.sign_up('22222', 'Test', 'DC')
me = client.get_me()
me = await client.get_me()
data = os.urandom(2 ** 17)
client.send_file(
await client.send_file(
me, data,
progress_callback=lambda c, t:
print('test_cdn_download:uploading {:.2%}...'.format(c/t))
)
msg = client.get_messages(me)[1][0]
msg = (await client.get_messages(me))[0]
out = BytesIO()
client.download_media(msg, out)
await client.download_media(msg, out)
self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest())
out = BytesIO()
client.download_media(msg, out) # Won't redirect
await client.download_media(msg, out) # Won't redirect
self.assertEqual(sha256(data).digest(), sha256(out.getvalue()).digest())
client.log_out()
await client.log_out()
finally:
client.disconnect()

View File

@ -25,20 +25,20 @@ def run_server_echo_thread(port):
class NetworkTests(unittest.TestCase):
@unittest.skip("test_tcp_client needs fix")
def test_tcp_client(self):
async def test_tcp_client(self):
port = random.randint(50000, 60000) # Arbitrary non-privileged port
run_server_echo_thread(port)
msg = b'Unit testing...'
client = TcpClient()
client.connect('localhost', port)
client.write(msg)
self.assertEqual(msg, client.read(15),
await client.connect('localhost', port)
await client.write(msg)
self.assertEqual(msg, await client.read(15),
msg='Read message does not equal sent message')
client.close()
@unittest.skip("Some parameters changed, so IP doesn't go there anymore.")
def test_authenticator(self):
async def test_authenticator(self):
transport = Connection('149.154.167.91', 443)
self.assertTrue(authenticator.do_authentication(transport))
self.assertTrue(await authenticator.do_authentication(transport))
transport.close()