diff --git a/telethon/_updates/__init__.py b/telethon/_updates/__init__.py new file mode 100644 index 00000000..7da2c047 --- /dev/null +++ b/telethon/_updates/__init__.py @@ -0,0 +1,2 @@ +from .entitycache import EntityCache +from .messagebox import MessageBox, GapError diff --git a/telethon/_updates/entitycache.py b/telethon/_updates/entitycache.py new file mode 100644 index 00000000..e0e9888a --- /dev/null +++ b/telethon/_updates/entitycache.py @@ -0,0 +1,54 @@ +from .session import EntityType, Entity + + +_sentinel = object() + + +class EntityCache: + def __init__( + self, + hash_map: dict = _sentinel, + self_id: int = None, + self_bot: bool = False + ): + self.hash_map = {} if hash_map is _sentinel else hash_map + self.self_id = self_id + self.self_bot = self_bot + + def set_self_user(self, id, bot): + self.self_id = id + self.self_bot = bot + + def get(self, id): + try: + hash, ty = self.hash_map[id] + return Entity(ty, id, hash) + except KeyError: + return None + + def extend(self, users, chats): + # See https://core.telegram.org/api/min for "issues" with "min constructors". + self.hash_map.update( + (u.id, ( + u.access_hash, + EntityType.BOT if u.bot else EntityType.USER, + )) + for u in users + if getattr(u, 'access_hash', None) and not u.min + ) + self.hash_map.update( + (c.id, ( + c.access_hash, + EntityType.MEGAGROUP if c.megagroup else ( + EntityType.GIGAGROUP if getattr(c, 'gigagroup', None) else EntityType.CHANNEL + ), + )) + for c in chats + if getattr(c, 'access_hash', None) and not getattr(c, 'min', None) + ) + + def get_all_entities(self): + return [Entity(ty, id, hash) for id, (hash, ty) in self.hash_map.items()] + + def put(self, entity): + self.hash_map[entity.id] = (entity.hash, entity.ty) diff --git a/telethon/_updates/messagebox.py b/telethon/_updates/messagebox.py new file mode 100644 index 00000000..8ade5b18 --- /dev/null +++ b/telethon/_updates/messagebox.py @@ -0,0 +1,607 @@ +""" +This module deals with correct handling of updates, including gaps, and knowing when the code +should "get difference" (the set of updates that the client should know by now minus the set +of updates that it actually knows). + +Each chat has its own [`Entry`] in the [`MessageBox`] (this `struct` is the "entry point"). +At any given time, the message box may be either getting difference for them (entry is in +[`MessageBox::getting_diff_for`]) or not. If not getting difference, a possible gap may be +found for the updates (entry is in [`MessageBox::possible_gaps`]). Otherwise, the entry is +on its happy path. + +Gaps are cleared when they are either resolved on their own (by waiting for a short time) +or because we got the difference for the corresponding entry. + +While there are entries for which their difference must be fetched, +[`MessageBox::check_deadlines`] will always return [`Instant::now`], since "now" is the time +to get the difference. +""" +import asyncio +import datetime +import time +from .session import SessionState, ChannelState +from ..tl import types as tl, functions as fn + + +# Telegram sends `seq` equal to `0` when "it doesn't matter", so we use that value too. +NO_SEQ = 0 + +# See https://core.telegram.org/method/updates.getChannelDifference. +BOT_CHANNEL_DIFF_LIMIT = 100000 +USER_CHANNEL_DIFF_LIMIT = 100 + +# > It may be useful to wait up to 0.5 seconds +POSSIBLE_GAP_TIMEOUT = 0.5 + +# After how long without updates the client will "timeout". +# +# When this timeout occurs, the client will attempt to fetch updates by itself, ignoring all the +# updates that arrive in the meantime. After all updates are fetched when this happens, the +# client will resume normal operation, and the timeout will reset. +# +# Documentation recommends 15 minutes without updates (https://core.telegram.org/api/updates). +NO_UPDATES_TIMEOUT = 15 * 60 + +# Entry "enum". +# Account-wide `pts` includes private conversations (one-to-one) and small group chats. +ENTRY_ACCOUNT = object() +# Account-wide `qts` includes only "secret" one-to-one chats. +ENTRY_SECRET = object() +# Integers will be Channel-specific `pts`, and includes "megagroup", "broadcast" and "supergroup" channels. + +_sentinel = object() + +def next_updates_deadline(): + return asyncio.get_running_loop().time() + NO_UPDATES_TIMEOUT + + +class GapError(ValueError): + pass + + +# Represents the information needed to correctly handle a specific `tl::enums::Update`. +class PtsInfo: + __slots__ = ('pts', 'pts_count', 'entry') + + def __init__( + self, + pts: int, + pts_count: int, + entry: object + ): + self.pts = pts + self.pts_count = pts_count + self.entry = entry + + @classmethod + def from_update(cls, update): + pts = getattr(update, 'pts', None) + if pts: + pts_count = getattr(update, 'pts_count', None) or 0 + try: + entry = update.message.peer_id.channel_id + except AttributeError: + entry = getattr(update, 'channel_id', None) or ENTRY_ACCOUNT + return cls(pts=pts, pts_count=pts_count, entry=entry) + + qts = getattr(update, 'qts', None) + if qts: + pts_count = 1 if isinstance(update, _tl.UpdateNewEncryptedMessage) else 0 + return cls(pts=qts, pts_count=pts_count, entry=ENTRY_SECRET) + + return None + + +# The state of a particular entry in the message box. +class State: + __slots__ = ('pts', 'deadline') + + def __init__( + self, + # Current local persistent timestamp. + pts: int, + # Next instant when we would get the update difference if no updates arrived before then. + deadline: float + ): + self.pts = pts + self.deadline = deadline + + +# > ### Recovering gaps +# > […] Manually obtaining updates is also required in the following situations: +# > • Loss of sync: a gap was found in `seq` / `pts` / `qts` (as described above). +# > It may be useful to wait up to 0.5 seconds in this situation and abort the sync in case a new update +# > arrives, that fills the gap. +# +# This is really easy to trigger by spamming messages in a channel (with as little as 3 members works), because +# the updates produced by the RPC request take a while to arrive (whereas the read update comes faster alone). +class PossibleGap: + __slots__ = ('deadline', 'updates') + + def __init__( + self, + deadline: float, + # Pending updates (those with a larger PTS, producing the gap which may later be filled). + updates: list # of updates + ): + self.deadline = deadline + self.updates = updates + + +# Represents a "message box" (event `pts` for a specific entry). +# +# See https://core.telegram.org/api/updates#message-related-event-sequences. +class MessageBox: + __slots__ = ('map', 'date', 'seq', 'next_deadline', 'possible_gaps', 'getting_diff_for', 'reset_deadlines_for') + + def __init__( + self, + # Map each entry to their current state. + map: dict = _sentinel, # entry -> state + + # Additional fields beyond PTS needed by `ENTRY_ACCOUNT`. + date: datetime.datetime = datetime.datetime(*time.gmtime(0)[:6]).replace(tzinfo=datetime.timezone.utc), + seq: int = NO_SEQ, + + # Holds the entry with the closest deadline (optimization to avoid recalculating the minimum deadline). + next_deadline: object = None, # entry + + # Which entries have a gap and may soon trigger a need to get difference. + # + # If a gap is found, stores the required information to resolve it (when should it timeout and what updates + # should be held in case the gap is resolved on its own). + # + # Not stored directly in `map` as an optimization (else we would need another way of knowing which entries have + # a gap in them). + possible_gaps: dict = _sentinel, # entry -> possiblegap + + # For which entries are we currently getting difference. + getting_diff_for: set = _sentinel, # entry + + # Temporarily stores which entries should have their update deadline reset. + # Stored in the message box in order to reuse the allocation. + reset_deadlines_for: set = _sentinel # entry + ): + self.map = {} if map is _sentinel else map + self.date = date + self.seq = seq + self.next_deadline = next_deadline + self.possible_gaps = {} if possible_gaps is _sentinel else possible_gaps + self.getting_diff_for = set() if getting_diff_for is _sentinel else getting_diff_for + self.reset_deadlines_for = set() if reset_deadlines_for is _sentinel else reset_deadlines_for + + # region Creation, querying, and setting base state. + + def load(self, session_state, channel_states): + """ + Create a [`MessageBox`] from a previously known update state. + """ + deadline = next_updates_deadline() + + self.map.clear() + if session_state.pts != NO_SEQ: + self.map[ENTRY_ACCOUNT] = State(pts=session_state.pts, deadline=deadline) + if session_state.qts != NO_SEQ: + self.map[ENTRY_SECRET] = State(pts=session_state.qts, deadline=deadline) + self.map.update((s.channel_id, State(pts=s.pts, deadline=deadline)) for s in channel_states) + + self.date = datetime.datetime.fromtimestamp(session_state.date).replace(tzinfo=datetime.timezone.utc) + self.seq = session_state.seq + self.next_deadline = ENTRY_ACCOUNT + + def session_state(self): + """ + Return the current state. + + This should be used for persisting the state. + """ + return dict( + pts=self.map[ENTRY_ACCOUNT].pts if ENTRY_ACCOUNT in self.map else NO_SEQ, + qts=self.map[ENTRY_SECRET].pts if ENTRY_SECRET in self.map else NO_SEQ, + date=int(self.date.timestamp()), + seq=self.seq, + ), {id: state.pts for id, state in self.map.items() if isinstance(id, int)} + + def is_empty(self) -> bool: + """ + Return true if the message box is empty and has no state yet. + """ + return ENTRY_ACCOUNT not in self.map + + def check_deadlines(self): + """ + Return the next deadline when receiving updates should timeout. + + If a deadline expired, the corresponding entries will be marked as needing to get its difference. + While there are entries pending of getting their difference, this method returns the current instant. + """ + now = asyncio.get_running_loop().time() + + if self.getting_diff_for: + return now + + deadline = next_updates_deadline() + + # Most of the time there will be zero or one gap in flight so finding the minimum is cheap. + if self.possible_gaps: + deadline = min(deadline, *(gap.deadline for gap in self.possible_gaps.values())) + elif self.next_deadline in self.map: + deadline = min(deadline, self.map[self.next_deadline].deadline) + + if now > deadline: + # Check all expired entries and add them to the list that needs getting difference. + self.getting_diff_for.update(entry for entry, gap in self.possible_gaps.items() if now > gap.deadline) + self.getting_diff_for.update(entry for entry, state in self.map.items() if now > state.deadline) + + # When extending `getting_diff_for`, it's important to have the moral equivalent of + # `begin_get_diff` (that is, clear possible gaps if we're now getting difference). + for entry in self.getting_diff_for: + self.possible_gaps.pop(entry, None) + + return deadline + + # Reset the deadline for the periods without updates for a given entry. + # + # It also updates the next deadline time to reflect the new closest deadline. + def reset_deadline(self, entry, deadline): + if entry in self.map: + self.map[entry].deadline = deadline + # TODO figure out why not in map may happen + + if self.next_deadline == entry: + # If the updated deadline was the closest one, recalculate the new minimum. + self.next_deadline = min(self.map.items(), key=lambda entry_state: entry_state[1].deadline)[0] + elif self.next_deadline in self.map and deadline < self.map[self.next_deadline].deadline: + # If the updated deadline is smaller than the next deadline, change the next deadline to be the new one. + self.next_deadline = entry + # else an unrelated deadline was updated, so the closest one remains unchanged. + + # Convenience to reset a channel's deadline, with optional timeout. + def reset_channel_deadline(self, channel_id, timeout): + self.reset_deadline(channel_id, asyncio.get_running_loop().time() + (timeout or NO_UPDATES_TIMEOUT)) + + # Reset all the deadlines in `reset_deadlines_for` and then empty the set. + def apply_deadlines_reset(self): + next_deadline = next_updates_deadline() + + reset_deadlines_for = self.reset_deadlines_for + self.reset_deadlines_for = set() # "move" the set to avoid self.reset_deadline() from touching it during iter + + for entry in reset_deadlines_for: + self.reset_deadline(entry, next_deadline) + + reset_deadlines_for.clear() # reuse allocation, the other empty set was a temporary dummy value + self.reset_deadlines_for = reset_deadlines_for + + # Sets the update state. + # + # Should be called right after login if [`MessageBox::new`] was used, otherwise undesirable + # updates will be fetched. + def set_state(self, state): + deadline = next_updates_deadline() + + if state.pts != NO_SEQ: + self.map[ENTRY_ACCOUNT] = State(pts=state.pts, deadline=deadline) + else: + self.map.pop(ENTRY_ACCOUNT, None) + + if state.qts != NO_SEQ: + self.map[ENTRY_SECRET] = State(pts=state.qts, deadline=deadline) + else: + self.map.pop(ENTRY_SECRET, None) + + self.date = state.date + self.seq = state.seq + + # Like [`MessageBox::set_state`], but for channels. Useful when getting dialogs. + # + # The update state will only be updated if no entry was known previously. + def try_set_channel_state(self, id, pts): + if id not in self.map: + self.map[id] = State(pts=pts, deadline=next_updates_deadline()) + + # Begin getting difference for the given entry. + # + # Clears any previous gaps. + def begin_get_diff(self, entry): + self.getting_diff_for.add(entry) + self.possible_gaps.pop(entry, None) + + # Finish getting difference for the given entry. + # + # It also resets the deadline. + def end_get_diff(self, entry): + try: + self.getting_diff_for.remove(entry) + except KeyError: + pass + self.reset_deadline(entry, next_updates_deadline()) + assert entry not in self.possible_gaps, "gaps shouldn't be created while getting difference" + + # endregion Creation, querying, and setting base state. + + # region "Normal" updates flow (processing and detection of gaps). + + # Process an update and return what should be done with it. + # + # Updates corresponding to entries for which their difference is currently being fetched + # will be ignored. While according to the [updates' documentation]: + # + # > Implementations [have] to postpone updates received via the socket while + # > filling gaps in the event and `Update` sequences, as well as avoid filling + # > gaps in the same sequence. + # + # In practice, these updates should have also been retrieved through getting difference. + # + # [updates documentation] https://core.telegram.org/api/updates + def process_updates( + self, + updates, + chat_hashes, + result, # out list of updates; returns list of user, chat, or raise if gap + ): + date = getattr(updates, 'date', None) + if date is None: + # updatesTooLong is the only one with no date (we treat it as a gap) + raise GapError + + seq = getattr(updates, 'seq', None) or NO_SEQ + seq_start = getattr(updates, 'seq_start', None) or seq + users = getattr(updates, 'users', None) or [] + chats = getattr(updates, 'chats', None) or [] + updates = getattr(updates, 'updates', None) or [updates] + + # > For all the other [not `updates` or `updatesCombined`] `Updates` type constructors + # > there is no need to check `seq` or change a local state. + if seq_start != NO_SEQ: + if self.seq + 1 > seq_start: + # Skipping updates that were already handled + return (users, chats) + elif self.seq + 1 < seq_start: + # Gap detected + self.begin_get_diff(ENTRY_ACCOUNT) + raise GapError + # else apply + + self.date = date + if seq != NO_SEQ: + self.seq = seq + + result.extend(filter(None, (self.apply_pts_info(u, reset_deadline=True) for u in updates))) + + self.apply_deadlines_reset() + + def _sort_gaps(update): + pts = PtsInfo.from_update(update) + return pts.pts - pts.pts_count if pts else 0 + + if self.possible_gaps: + # For each update in possible gaps, see if the gap has been resolved already. + for key in list(self.possible_gaps.keys()): + self.possible_gaps[key].updates.sort(key=_sort_gaps) + + for _ in range(len(self.possible_gaps[key].updates)): + update = self.possible_gaps[key].updates.pop(0) + + # If this fails to apply, it will get re-inserted at the end. + # All should fail, so the order will be preserved (it would've cycled once). + update = self.apply_pts_info(update, reset_deadline=False) + if update: + result.append(update) + + # Clear now-empty gaps. + self.possible_gaps = {entry: gap for entry, gap in self.possible_gaps.items() if gap.updates} + + return (users, chats) + + # Tries to apply the input update if its `PtsInfo` follows the correct order. + # + # If the update can be applied, it is returned; otherwise, the update is stored in a + # possible gap (unless it was already handled or would be handled through getting + # difference) and `None` is returned. + def apply_pts_info( + self, + update, + *, + reset_deadline, + ): + pts = PtsInfo.from_update(update) + if not pts: + # No pts means that the update can be applied in any order. + return update + + # As soon as we receive an update of any form related to messages (has `PtsInfo`), + # the "no updates" period for that entry is reset. + # + # Build the `HashSet` to avoid calling `reset_deadline` more than once for the same entry. + if reset_deadline: + self.reset_deadlines_for.add(pts.entry) + + if pts.entry in self.getting_diff_for: + # Note: early returning here also prevents gap from being inserted (which they should + # not be while getting difference). + return None + + if pts.entry in self.map: + local_pts = self.map[pts.entry].pts + if local_pts + pts.pts_count > pts.pts: + # Ignore + return None + elif local_pts + pts.pts_count < pts.pts: + # Possible gap + # TODO store chats too? + if pts.entry not in self.possible_gaps: + self.possible_gaps[pts.entry] = PossibleGap( + deadline=asyncio.get_running_loop().time() + POSSIBLE_GAP_TIMEOUT, + updates=[] + ) + + self.possible_gaps[pts.entry].updates.append(update) + return None + else: + # Apply + pass + else: + # No previous `pts` known, and because this update has to be "right" (it's the first one) our + # `local_pts` must be one less. + local_pts = pts.pts - 1 + + # For example, when we're in a channel, we immediately receive: + # * ReadChannelInbox (pts = X) + # * NewChannelMessage (pts = X, pts_count = 1) + # + # Notice how both `pts` are the same. If we stored the one from the first, then the second one would + # be considered "already handled" and ignored, which is not desirable. Instead, advance local `pts` + # by `pts_count` (which is 0 for updates not directly related to messages, like reading inbox). + if pts.entry in self.map: + self.map[pts.entry].pts = local_pts + pts.pts_count + else: + self.map[pts.entry] = State(pts=local_pts + pts.pts_count, deadline=next_updates_deadline()) + + return update + + # endregion "Normal" updates flow (processing and detection of gaps). + + # region Getting and applying account difference. + + # Return the request that needs to be made to get the difference, if any. + def get_difference(self): + entry = ENTRY_ACCOUNT + if entry in self.getting_diff_for: + if entry in self.map: + return _tl.fn.updates.GetDifference( + pts=self.map[ENTRY_ACCOUNT].pts, + pts_total_limit=None, + date=self.date, + qts=self.map[ENTRY_SECRET].pts if ENTRY_SECRET in self.map else NO_SEQ, + ) + else: + # TODO investigate when/why/if this can happen + self.end_get_diff(entry) + + return None + + # Similar to [`MessageBox::process_updates`], but using the result from getting difference. + def apply_difference( + self, + diff, + chat_hashes, + ): + if isinstance(diff, _tl.updates.DifferenceEmpty): + self.date = diff.date + self.seq = diff.seq + self.end_get_diff(ENTRY_ACCOUNT) + return [], [], [] + elif isinstance(diff, _tl.updates.Difference): + self.end_get_diff(ENTRY_ACCOUNT) + chat_hashes.extend(diff.users, diff.chats) + return self.apply_difference_type(diff) + elif isinstance(diff, _tl.updates.DifferenceSlice): + chat_hashes.extend(diff.users, diff.chats) + return self.apply_difference_type(diff) + elif isinstance(diff, _tl.updates.DifferenceTooLong): + # TODO when are deadlines reset if we update the map?? + self.map[ENTRY_ACCOUNT].pts = diff.pts + self.end_get_diff(ENTRY_ACCOUNT) + return [], [], [] + + def apply_difference_type( + self, + diff, + ): + state = getattr(diff, 'intermediate_state', None) or diff.state + self.set_state(state) + + for u in diff.other_updates: + if isinstance(u, _tl.UpdateChannelTooLong): + self.begin_get_diff(u.channel_id) + + diff.other_updates.extend(_tl.UpdateNewMessage( + message=m, + pts=NO_SEQ, + pts_count=NO_SEQ, + ) for m in diff.new_messages) + diff.other_updates.extend(_tl.UpdateNewEncryptedMessage( + message=m, + qts=NO_SEQ, + ) for m in diff.new_encrypted_messages) + + return diff.other_updates, diff.users, diff.chats + + # endregion Getting and applying account difference. + + # region Getting and applying channel difference. + + # Return the request that needs to be made to get a channel's difference, if any. + def get_channel_difference( + self, + chat_hashes, + ): + entry = next((id for id in self.getting_diff_for if isinstance(id, int)), None) + if not entry: + return None + + packed = chat_hashes.get(entry) + if not packed: + # Cannot get channel difference as we're missing its hash + self.end_get_diff(entry) + # Remove the outdated `pts` entry from the map so that the next update can correct + # it. Otherwise, it will spam that the access hash is missing. + self.map.pop(entry, None) + return None + + state = self.map.get(entry) + if not state: + # TODO investigate when/why/if this can happen + # Cannot get channel difference as we're missing its pts + self.end_get_diff(entry) + return None + + return _tl.fn.updates.GetChannelDifference( + force=False, + channel=_tl.InputChannel(packed.id, packed.hash), + filter=_tl.ChannelMessagesFilterEmpty(), + pts=state.pts, + limit=BOT_CHANNEL_DIFF_LIMIT if chat_hashes.self_bot else USER_CHANNEL_DIFF_LIMIT + ) + + # Similar to [`MessageBox::process_updates`], but using the result from getting difference. + def apply_channel_difference( + self, + request, + diff, + chat_hashes, + ): + entry = request.channel.channel_id + self.possible_gaps.pop(entry, None) + + if isinstance(diff, _tl.updates.ChannelDifferenceEmpty): + assert diff.final + self.end_get_diff(entry) + self.map[entry].pts = diff.pts + return [], [], [] + elif isinstance(diff, _tl.updates.ChannelDifferenceTooLong): + assert diff.final + self.map[entry].pts = diff.dialog.pts + chat_hashes.extend(diff.users, diff.chats) + self.reset_channel_deadline(entry, diff.timeout) + # This `diff` has the "latest messages and corresponding chats", but it would + # be strange to give the user only partial changes of these when they would + # expect all updates to be fetched. Instead, nothing is returned. + return [], [], [] + elif isinstance(diff, _tl.updates.ChannelDifference): + if diff.final: + self.end_get_diff(entry) + + self.map[entry].pts = diff.pts + diff.other_updates.extend(_tl.UpdateNewMessage( + message=m, + pts=NO_SEQ, + pts_count=NO_SEQ, + ) for m in diff.new_messages) + chat_hashes.extend(diff.users, diff.chats) + self.reset_channel_deadline(entry, None) + + return diff.other_updates, diff.users, diff.chats + + # endregion Getting and applying channel difference. diff --git a/telethon/_updates/session.py b/telethon/_updates/session.py new file mode 100644 index 00000000..77ec30ae --- /dev/null +++ b/telethon/_updates/session.py @@ -0,0 +1,177 @@ +from typing import Optional, Tuple +from enum import IntEnum + + +class SessionState: + """ + Stores the information needed to fetch updates and about the current user. + + * user_id: 64-bit number representing the user identifier. + * dc_id: 32-bit number relating to the datacenter identifier where the user is. + * bot: is the logged-in user a bot? + * pts: 64-bit number holding the state needed to fetch updates. + * qts: alternative 64-bit number holding the state needed to fetch updates. + * date: 64-bit number holding the date needed to fetch updates. + * seq: 64-bit-number holding the sequence number needed to fetch updates. + * takeout_id: 64-bit-number holding the identifier of the current takeout session. + + Note that some of the numbers will only use 32 out of the 64 available bits. + However, for future-proofing reasons, we recommend you pretend they are 64-bit long. + """ + __slots__ = ('user_id', 'dc_id', 'bot', 'pts', 'qts', 'date', 'seq', 'takeout_id') + + def __init__( + self, + user_id: int, + dc_id: int, + bot: bool, + pts: int, + qts: int, + date: int, + seq: int, + takeout_id: Optional[int] + ): + self.user_id = user_id + self.dc_id = dc_id + self.bot = bot + self.pts = pts + self.qts = qts + self.date = date + self.seq = seq + self.takeout_id = takeout_id + + +class ChannelState: + """ + Stores the information needed to fetch updates from a channel. + + * channel_id: 64-bit number representing the channel identifier. + * pts: 64-bit number holding the state needed to fetch updates. + """ + __slots__ = ('channel_id', 'pts') + + def __init__( + self, + channel_id: int, + pts: int, + ): + self.channel_id = channel_id + self.pts = pts + + +class EntityType(IntEnum): + """ + You can rely on the type value to be equal to the ASCII character one of: + + * 'U' (85): this entity belongs to a :tl:`User` who is not a ``bot``. + * 'B' (66): this entity belongs to a :tl:`User` who is a ``bot``. + * 'G' (71): this entity belongs to a small group :tl:`Chat`. + * 'C' (67): this entity belongs to a standard broadcast :tl:`Channel`. + * 'M' (77): this entity belongs to a megagroup :tl:`Channel`. + * 'E' (69): this entity belongs to an "enormous" "gigagroup" :tl:`Channel`. + """ + USER = ord('U') + BOT = ord('B') + GROUP = ord('G') + CHANNEL = ord('C') + MEGAGROUP = ord('M') + GIGAGROUP = ord('E') + + def canonical(self): + """ + Return the canonical version of this type. + """ + return _canon_entity_types[self] + + +_canon_entity_types = { + EntityType.USER: EntityType.USER, + EntityType.BOT: EntityType.USER, + EntityType.GROUP: EntityType.GROUP, + EntityType.CHANNEL: EntityType.CHANNEL, + EntityType.MEGAGROUP: EntityType.CHANNEL, + EntityType.GIGAGROUP: EntityType.CHANNEL, +} + + +class Entity: + """ + Stores the information needed to use a certain user, chat or channel with the API. + + * ty: 8-bit number indicating the type of the entity (of type `EntityType`). + * id: 64-bit number uniquely identifying the entity among those of the same type. + * hash: 64-bit signed number needed to use this entity with the API. + + The string representation of this class is considered to be stable, for as long as + Telegram doesn't need to add more fields to the entities. It can also be converted + to bytes with ``bytes(entity)``, for a more compact representation. + """ + __slots__ = ('ty', 'id', 'hash') + + def __init__( + self, + ty: EntityType, + id: int, + hash: int + ): + self.ty = ty + self.id = id + self.hash = hash + + @property + def is_user(self): + """ + ``True`` if the entity is either a user or a bot. + """ + return self.ty in (EntityType.USER, EntityType.BOT) + + @property + def is_group(self): + """ + ``True`` if the entity is a small group chat or `megagroup`_. + + .. _megagroup: https://telegram.org/blog/supergroups5k + """ + return self.ty in (EntityType.GROUP, EntityType.MEGAGROUP) + + @property + def is_broadcast(self): + """ + ``True`` if the entity is a broadcast channel or `broadcast group`_. + + .. _broadcast group: https://telegram.org/blog/autodelete-inv2#groups-with-unlimited-members + """ + return self.ty in (EntityType.CHANNEL, EntityType.GIGAGROUP) + + @classmethod + def from_str(cls, string: str): + """ + Convert the string into an `Entity`. + """ + try: + ty, id, hash = string.split('.') + ty, id, hash = ord(ty), int(id), int(hash) + except AttributeError: + raise TypeError(f'expected str, got {string!r}') from None + except (TypeError, ValueError): + raise ValueError(f'malformed entity str (must be T.id.hash), got {string!r}') from None + + return cls(EntityType(ty), id, hash) + + @classmethod + def from_bytes(cls, blob): + """ + Convert the bytes into an `Entity`. + """ + try: + ty, id, hash = struct.unpack('