Add basic updates processing to ignore updates with lower .pts

This commit is contained in:
Lonami Exo 2017-09-07 20:17:40 +02:00
parent d237375208
commit b8e881b6b6
3 changed files with 34 additions and 9 deletions

View File

@ -1,8 +1,8 @@
import os import os
import threading
from datetime import datetime, timedelta from datetime import datetime, timedelta
from mimetypes import guess_type from mimetypes import guess_type
from threading import RLock, Thread from threading import RLock, Thread
import threading
from . import TelegramBareClient from . import TelegramBareClient
from . import helpers as utils from . import helpers as utils
@ -27,6 +27,9 @@ from .tl.functions.messages import (
GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest, GetDialogsRequest, GetHistoryRequest, ReadHistoryRequest, SendMediaRequest,
SendMessageRequest SendMessageRequest
) )
from .tl.functions.updates import (
GetStateRequest
)
from .tl.functions.users import ( from .tl.functions.users import (
GetUsersRequest GetUsersRequest
) )
@ -155,6 +158,8 @@ class TelegramClient(TelegramBareClient):
target=self._recv_thread_impl target=self._recv_thread_impl
) )
self._recv_thread.start() self._recv_thread.start()
if self.updates.enabled:
self.sync_updates()
return ok return ok
@ -904,6 +909,13 @@ class TelegramClient(TelegramBareClient):
# region Updates handling # region Updates handling
def sync_updates(self):
"""Synchronizes self.updates to their initial state. Will be
called automatically on connection if self.updates.enabled = True,
otherwise it should be called manually after enabling updates.
"""
self.updates.process(self(GetStateRequest()))
def add_update_handler(self, handler): def add_update_handler(self, handler):
"""Adds an update handler (a function which takes a TLObject, """Adds an update handler (a function which takes a TLObject,
an update, as its parameter) and listens for updates""" an update, as its parameter) and listens for updates"""

View File

@ -1,5 +1,8 @@
from threading import Lock, Event
from collections import deque from collections import deque
from datetime import datetime
from threading import RLock, Event
from .tl import types as tl
class UpdateState: class UpdateState:
@ -9,10 +12,13 @@ class UpdateState:
def __init__(self, enabled): def __init__(self, enabled):
self.enabled = enabled self.enabled = enabled
self.handlers = [] self.handlers = []
self._updates_lock = Lock() self._updates_lock = RLock()
self._updates_available = Event() self._updates_available = Event()
self._updates = deque() self._updates = deque()
# https://core.telegram.org/api/updates
self._state = tl.updates.State(0, 0, datetime.now(), 0, 0)
def has_any(self): def has_any(self):
"""Returns True if a call to .pop_update() won't lock""" """Returns True if a call to .pop_update() won't lock"""
return self._updates_available.is_set() return self._updates_available.is_set()
@ -31,11 +37,16 @@ class UpdateState:
"""Processes an update object. This method is normally called by """Processes an update object. This method is normally called by
the library itself. the library itself.
""" """
for handler in self.handlers: if not self.enabled:
handler(update) return
with self._updates_lock:
if isinstance(update, tl.updates.State):
self._state = update
elif not hasattr(update, 'pts') or update.pts > self._state.pts:
self._state.pts = getattr(update, 'pts', self._state.pts)
for handler in self.handlers:
handler(update)
if self.enabled:
with self._updates_lock:
self._updates.append(update) self._updates.append(update)
self._updates_available.set() self._updates_available.set()

View File

@ -51,7 +51,9 @@ class InteractiveTelegramClient(TelegramClient):
print('Initializing interactive example...') print('Initializing interactive example...')
super().__init__( super().__init__(
session_user_id, api_id, api_hash, session_user_id, api_id, api_hash,
connection_mode=ConnectionMode.TCP_ABRIDGED, proxy=proxy connection_mode=ConnectionMode.TCP_ABRIDGED,
enable_updates=True,
proxy=proxy
) )
# Store all the found media in memory here, # Store all the found media in memory here,