Merge branch 'master' into sync

This commit is contained in:
Lonami Exo 2018-06-29 11:09:28 +02:00
commit e94ad7ad77
7 changed files with 125 additions and 46 deletions

View File

@ -193,7 +193,7 @@ class AuthMethods(MessageParseMethods, UserMethods):
return self
def sign_in(
self, phone=None, *, code=None, password=None,
self, phone=None, code=None, *, password=None,
bot_token=None, phone_code_hash=None):
"""
Starts or completes the sign in process with the given phone number

View File

@ -22,7 +22,7 @@ class MessageMethods(UploadMethods, MessageParseMethods):
self, entity, limit=None, *, offset_date=None, offset_id=0,
max_id=0, min_id=0, add_offset=0, search=None, filter=None,
from_user=None, batch_size=100, wait_time=None, ids=None,
_total=None):
reverse=False, _total=None):
"""
Iterator over the message history for the specified entity.
@ -91,6 +91,15 @@ class MessageMethods(UploadMethods, MessageParseMethods):
will appear in its place, so that zipping the list of IDs
with the messages can match one-to-one.
reverse (`bool`, optional):
If set to ``True``, the messages will be returned in reverse
order (from oldest to newest, instead of the default newest
to oldest). This also means that the meaning of `offset_id`
and `offset_date` parameters is reversed, although they will
still be exclusive. `min_id` becomes equivalent to `offset_id`
instead of being `max_id` as well since messages are returned
in ascending order.
_total (`list`, optional):
A single-item list to pass the total parameter by reference.
@ -112,6 +121,8 @@ class MessageMethods(UploadMethods, MessageParseMethods):
if ids:
if not utils.is_list_like(ids):
ids = (ids,)
if reverse:
ids = list(reversed(ids))
for x in self._iter_ids(entity, ids, total=_total):
yield (x)
return
@ -121,10 +132,26 @@ class MessageMethods(UploadMethods, MessageParseMethods):
#
# We can emulate their behaviour locally by setting offset = max_id
# and simply stopping once we hit a message with ID <= min_id.
offset_id = max(offset_id, max_id)
if offset_id and min_id:
if offset_id - min_id <= 1:
return
if reverse:
offset_id = max(offset_id, min_id)
if offset_id and max_id:
if max_id - offset_id <= 1:
print('suck lol')
return
if not max_id:
max_id = float('inf')
else:
offset_id = max(offset_id, max_id)
if offset_id and min_id:
if offset_id - min_id <= 1:
return
if reverse:
if offset_id:
offset_id += 1
else:
offset_id = 1
from_id = None
limit = float('inf') if limit is None else int(limit)
@ -139,7 +166,7 @@ class MessageMethods(UploadMethods, MessageParseMethods):
max_date=offset_date,
offset_id=offset_id,
add_offset=add_offset,
limit=1,
limit=0, # Search actually returns 0 items if we ask it to
max_id=0,
min_id=0,
hash=0,
@ -182,12 +209,24 @@ class MessageMethods(UploadMethods, MessageParseMethods):
wait_time = 1 if limit > 3000 else 0
have = 0
last_id = float('inf')
batch_size = min(max(batch_size, 1), 100)
last_id = 0 if reverse else float('inf')
# Telegram has a hard limit of 100.
# We don't need to fetch 100 if the limit is less.
batch_size = min(max(batch_size, 1), min(100, limit))
# Use a negative offset to work around reversing the results
if reverse:
request.add_offset -= batch_size
while have < limit:
start = time.time()
# Telegram has a hard limit of 100
request.limit = min(limit - have, batch_size)
if reverse and request.limit != batch_size:
# Last batch needs special care if we're on reverse
request.add_offset += batch_size - request.limit + 1
r = self(request)
if _total:
_total[0] = getattr(r, 'count', len(r.messages))
@ -195,19 +234,23 @@ class MessageMethods(UploadMethods, MessageParseMethods):
entities = {utils.get_peer_id(x): x
for x in itertools.chain(r.users, r.chats)}
for message in r.messages:
if message.id <= min_id:
return
messages = reversed(r.messages) if reverse else r.messages
for message in messages:
if (isinstance(message, types.MessageEmpty)
or message.id >= last_id
or (from_id and message.from_id != from_id)):
or from_id and message.from_id != from_id):
continue
if reverse:
if message.id <= last_id or message.id >= max_id:
return
else:
if message.id >= last_id or message.id <= min_id:
return
# There has been reports that on bad connections this method
# was returning duplicated IDs sometimes. Using ``last_id``
# is an attempt to avoid these duplicates, since the message
# IDs are returned in descending order.
# IDs are returned in descending order (or asc if reverse).
last_id = message.id
yield (custom.Message(self, message, entities, entity))
@ -216,11 +259,11 @@ class MessageMethods(UploadMethods, MessageParseMethods):
if len(r.messages) < request.limit:
break
request.offset_id = r.messages[-1].id
# Find the first message that's not empty (in some rare cases
# it can happen that the last message is :tl:`MessageEmpty`)
last_message = None
for m in reversed(r.messages):
messages = r.messages if reverse else reversed(r.messages)
for m in messages:
if not isinstance(m, types.MessageEmpty):
last_message = m
break
@ -234,11 +277,16 @@ class MessageMethods(UploadMethods, MessageParseMethods):
# should just give up since there won't be any new Message.
break
else:
request.offset_id = last_message.id
if isinstance(request, functions.messages.GetHistoryRequest):
request.offset_date = last_message.date
else:
request.max_date = last_message.date
if reverse:
# We want to skip the one we already have
request.add_offset -= 1
time.sleep(
max(wait_time - (time.time() - start), 0))

View File

@ -1,6 +1,7 @@
import abc
import logging
import platform
import queue
import sys
import threading
import time
@ -91,6 +92,17 @@ class TelegramBaseClient(abc.ABC):
Whether reconnection should be retried `connection_retries`
times automatically if Telegram disconnects us or not.
sequential_updates (`bool`, optional):
By default every incoming update will create a new task, so
you can handle several updates in parallel. Some scripts need
the order in which updates are processed to be sequential, and
this setting allows them to do so.
If set to ``True``, incoming updates will be put in a queue
and processed sequentially. This means your event handlers
should *not* perform long-running operations since new
updates are put inside of an unbounded queue.
flood_sleep_threshold (`int` | `float`, optional):
The threshold below which the library should automatically
sleep on flood wait errors (inclusive). For instance, if a
@ -138,6 +150,7 @@ class TelegramBaseClient(abc.ABC):
request_retries=5,
connection_retries=5,
auto_reconnect=True,
sequential_updates=False,
flood_sleep_threshold=60,
device_model=None,
system_version=None,
@ -226,6 +239,13 @@ class TelegramBaseClient(abc.ABC):
self._last_request = time.time()
self._channel_pts = {}
if sequential_updates:
self._updates_queue = queue.Queue()
self._dispatching_updates_queue = threading.Event()
else:
self._updates_queue = None
self._dispatching_updates_queue = None
# Start with invalid state (-1) so we can have somewhere to store
# the state, but also be able to determine if we are authorized.
self._state = types.updates.State(-1, 0, datetime.now(), 0, -1)
@ -279,7 +299,8 @@ class TelegramBaseClient(abc.ABC):
"""
Returns ``True`` if the user has connected.
"""
return self._sender.is_connected()
sender = getattr(self, '_sender', None)
return sender and sender.is_connected()
def disconnect(self):
"""

View File

@ -168,7 +168,13 @@ class UpdateMethods(UserMethods):
self._handle_update(update.update)
else:
update._entities = getattr(update, '_entities', {})
syncio.create_task(self._dispatch_update, update)
if self._updates_queue is None:
syncio.create_task(self._dispatch_update, update)
else:
self._updates_queue.put_nowait(update)
if not self._dispatching_updates_queue.is_set():
self._dispatching_updates_queue.set()
syncio.create_task(self._dispatch_queue_updates)
need_diff = False
if hasattr(update, 'pts'):
@ -217,6 +223,12 @@ class UpdateMethods(UserMethods):
self(functions.updates.GetStateRequest())
def _dispatch_queue_updates(self):
    """Drain the sequential-updates queue in arrival order.

    Dispatches every queued update one at a time, then clears the
    "dispatching" event so a later update can restart the drain loop.
    """
    pending = self._updates_queue
    # NOTE(review): empty()/get_nowait() is not atomic — assumes a single
    # drainer task at a time (guarded by the event); confirm at call site.
    while not pending.empty():
        item = pending.get_nowait()
        self._dispatch_update(item)
    # Mark the dispatcher as idle again only after the queue is drained.
    self._dispatching_updates_queue.clear()
def _dispatch_update(self, update):
if self._events_pending_resolve:
if self._event_resolve_lock.locked():

View File

@ -5,15 +5,12 @@ since they seem to count as two characters and it's a bit strange.
"""
import re
from ..helpers import add_surrogate, del_surrogate
from ..tl import TLObject
from ..tl.types import (
MessageEntityBold, MessageEntityItalic, MessageEntityCode,
MessageEntityPre, MessageEntityTextUrl
)
from ..utils import (
add_surrogate as _add_surrogate,
del_surrogate as _del_surrogate
)
DEFAULT_DELIMITERS = {
'**': MessageEntityBold,
@ -57,7 +54,7 @@ def parse(message, delimiters=None, url_re=None):
# Work on byte level with the utf-16le encoding to get the offsets right.
# The offset will just be half the index we're at.
message = _add_surrogate(message)
message = add_surrogate(message)
while i < len(message):
if url_re and current is None:
# If we're not inside a previous match since Telegram doesn't allow
@ -73,7 +70,7 @@ def parse(message, delimiters=None, url_re=None):
result.append(MessageEntityTextUrl(
offset=url_match.start(), length=len(url_match.group(1)),
url=_del_surrogate(url_match.group(2))
url=del_surrogate(url_match.group(2))
))
i += len(url_match.group(1))
# Next loop iteration, don't check delimiters, since
@ -128,7 +125,7 @@ def parse(message, delimiters=None, url_re=None):
+ message[current.offset:]
)
return _del_surrogate(message), result
return del_surrogate(message), result
def unparse(text, entities, delimiters=None, url_fmt=None):
@ -156,7 +153,7 @@ def unparse(text, entities, delimiters=None, url_fmt=None):
else:
entities = tuple(sorted(entities, key=lambda e: e.offset, reverse=True))
text = _add_surrogate(text)
text = add_surrogate(text)
delimiters = {v: k for k, v in delimiters.items()}
for entity in entities:
s = entity.offset
@ -167,8 +164,8 @@ def unparse(text, entities, delimiters=None, url_fmt=None):
elif isinstance(entity, MessageEntityTextUrl) and url_fmt:
text = (
text[:s] +
_add_surrogate(url_fmt.format(text[s:e], entity.url)) +
add_surrogate(url_fmt.format(text[s:e], entity.url)) +
text[e:]
)
return _del_surrogate(text)
return del_surrogate(text)

View File

@ -1,5 +1,6 @@
"""Various helpers not related to the Telegram API itself"""
import os
import struct
from hashlib import sha1, sha256
@ -17,6 +18,20 @@ def ensure_parent_dir_exists(file_path):
if parent:
os.makedirs(parent, exist_ok=True)
def add_surrogate(text):
    """Return *text* with every astral-plane (SMP) character expanded
    into its UTF-16 surrogate pair.

    Telegram computes entity offsets over UTF-16 code units, so such
    characters must count as two; see
    https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview.
    """
    pieces = []
    for ch in text:
        if 0x10000 <= ord(ch) <= 0x10FFFF:
            # Split the code point into its high/low surrogate halves.
            hi, lo = struct.unpack('<HH', ch.encode('utf-16le'))
            pieces.append(chr(hi))
            pieces.append(chr(lo))
        else:
            pieces.append(ch)
    return ''.join(pieces)
def del_surrogate(text):
    """Inverse of ``add_surrogate``: collapse UTF-16 surrogate pairs in
    *text* back into their original astral-plane characters."""
    # 'surrogatepass' lets the lone surrogate code points round-trip
    # through UTF-16 so the decoder can rejoin them.
    encoded = text.encode('utf-16', errors='surrogatepass')
    return encoded.decode('utf-16')
# endregion
# region Cryptographic related utils

View File

@ -7,13 +7,12 @@ import math
import mimetypes
import os
import re
import struct
import types
from collections import UserList
from mimetypes import guess_extension
from .extensions import markdown, html
from .tl import TLObject
from .helpers import add_surrogate, del_surrogate
from .tl.types import (
Channel, ChannelForbidden, Chat, ChatEmpty, ChatForbidden, ChatFull,
ChatPhoto, InputPeerChannel, InputPeerChat, InputPeerUser, InputPeerEmpty,
@ -586,19 +585,6 @@ def _fix_peer_id(peer_id):
return int(peer_id)
def add_surrogate(text):
    """Expand each SMP (astral-plane) character of *text* into the two
    UTF-16 surrogate code units Telegram uses when measuring offsets.
    See https://en.wikipedia.org/wiki/Plane_(Unicode)#Overview.
    """
    def _expand(ch):
        # BMP characters pass through unchanged; ord() of a Python char
        # never exceeds 0x10FFFF, so only the lower bound matters.
        if ord(ch) < 0x10000:
            return ch
        pair = struct.unpack('<HH', ch.encode('utf-16le'))
        return chr(pair[0]) + chr(pair[1])

    return ''.join(_expand(ch) for ch in text)
def del_surrogate(text):
    """Undo ``add_surrogate``: merge UTF-16 surrogate pairs in *text*
    back into single astral-plane characters."""
    # Round-trip through UTF-16 with 'surrogatepass' so lone surrogates
    # survive encoding and are rejoined on decode.
    return str(text.encode('utf-16', 'surrogatepass'), 'utf-16')
def get_inner_text(text, entities):
"""
Gets the inner text that's surrounded by the given entities.