diff --git a/main.py b/main.py index 0a18496b..1346c661 100755 --- a/main.py +++ b/main.py @@ -42,7 +42,13 @@ if __name__ == '__main__': print('{}. {}'.format(i, display)) # Let the user decide who they want to talk to - i = int(input('Who do you want to send messages to (0 to exit)?: ')) - 1 + i = None + while i is None: + try: + i = int(input('Who do you want to send messages to (0 to exit)?: ')) - 1 + except ValueError: + pass + if i == -1: break diff --git a/network/mtproto_sender.py b/network/mtproto_sender.py index cf9f663d..a9b0d9c3 100755 --- a/network/mtproto_sender.py +++ b/network/mtproto_sender.py @@ -14,7 +14,7 @@ from tl.all_tlobjects import tlobjects class MtProtoSender: """MTProto Mobile Protocol sender (https://core.telegram.org/mtproto/description)""" - def __init__(self, transport, session, check_updates=True): + def __init__(self, transport, session): self.transport = transport self.session = session @@ -24,17 +24,13 @@ class MtProtoSender: # Store a Lock instance to make this class safely multi-threaded self.lock = Lock() - if check_updates: - self.updates_thread = Thread(target=self.updates_thread_method, name='Updates thread') - self.updates_thread_running = True - self.updates_thread_receiving = False - - self.updates_thread.start() + self.updates_thread = Thread(target=self.updates_thread_method, name='Updates thread') + self.updates_thread_running = False + self.updates_thread_receiving = False def disconnect(self): """Disconnects and **stops all the running threads** if any""" - self.updates_thread_running = False - self.transport.cancel_receive() + self.set_listen_for_updates(enabled=False) self.transport.close() def add_update_handler(self, handler): @@ -54,7 +50,7 @@ class MtProtoSender: # region Send and receive - def send(self, request): + def send(self, request, resend=False): """Sends the specified MTProtoRequest, previously sending any message which needed confirmation. This also pauses the updates thread""" @@ -64,8 +60,9 @@ class MtProtoSender: if self.updates_thread_receiving: self.transport.cancel_receive() - # Now only us can be using this method - self.lock.acquire() + # Now only us can be using this method if we're not resending + if not resend: + self.lock.acquire() # If any message needs confirmation send an AckRequest first if self.need_confirmation: @@ -161,9 +158,8 @@ class MtProtoSender: return message, remote_msg_id, remote_sequence - def process_msg(self, msg_id, sequence, reader, request, only_updates=False): - """Processes and handles a Telegram message. Optionally, this - function will only check for updates (hence the request can be None)""" + def process_msg(self, msg_id, sequence, reader, request=None): + """Processes and handles a Telegram message""" # TODO Check salt, session_id and sequence_number self.need_confirmation.append(msg_id) @@ -171,18 +167,18 @@ class MtProtoSender: code = reader.read_int(signed=False) reader.seek(-4) - # The following codes are "parsed manually" (and do not refer to an update) - if not only_updates: - if code == 0xf35c6d01: # rpc_result - return self.handle_rpc_result(msg_id, sequence, reader, request) - if code == 0x73f1f8dc: # msg_container - return self.handle_container(msg_id, sequence, reader, request) - if code == 0x3072cfa1: # gzip_packed - return self.handle_gzip_packed(msg_id, sequence, reader, request) - if code == 0xedab447b: # bad_server_salt - return self.handle_bad_server_salt(msg_id, sequence, reader, request) - if code == 0xa7eff811: # bad_msg_notification - return self.handle_bad_msg_notification(msg_id, sequence, reader) + # The following codes are "parsed manually" + if code == 0xf35c6d01: # rpc_result, (response of an RPC call, i.e., we sent a request) + return self.handle_rpc_result(msg_id, sequence, reader, request) + + if code == 0x73f1f8dc: # msg_container + return self.handle_container(msg_id, sequence, reader, request) + if code == 0x3072cfa1: # gzip_packed + return self.handle_gzip_packed(msg_id, sequence, reader, request) + if code == 0xedab447b: # bad_server_salt + return self.handle_bad_server_salt(msg_id, sequence, reader, request) + if code == 0xa7eff811: # bad_msg_notification + return self.handle_bad_msg_notification(msg_id, sequence, reader) # If the code is not parsed manually, then it was parsed by the code generator! # In this case, we will simply treat the incoming TLObject as an Update, @@ -218,7 +214,7 @@ class MtProtoSender: return False - def handle_bad_server_salt(self, msg_id, sequence, reader, mtproto_request): + def handle_bad_server_salt(self, msg_id, sequence, reader, request): code = reader.read_int(signed=False) bad_msg_id = reader.read_long(signed=False) bad_msg_seq_no = reader.read_int() @@ -227,8 +223,11 @@ class MtProtoSender: self.session.salt = new_salt + if request is None: + raise ValueError('Tried to handle a bad server salt with no request specified') + # Resend - self.send(mtproto_request) + self.send(request, resend=True) return True @@ -241,14 +240,13 @@ class MtProtoSender: raise BadMessageError(error_code) def handle_rpc_result(self, msg_id, sequence, reader, request): + if not request: + raise ValueError('RPC results should only happen after a request was sent') + code = reader.read_int(signed=False) request_id = reader.read_long(signed=False) inner_code = reader.read_int(signed=False) - if not request: - raise ValueError('Cannot handle RPC results if no request was specified. ' - 'This should only happen when the updates thread does not work properly.') - if request_id == request.msg_id: request.confirm_received = True @@ -266,27 +264,37 @@ class MtProtoSender: else: raise error - else: if inner_code == 0x3072cfa1: # GZip packed unpacked_data = gzip.decompress(reader.tgread_bytes()) with BinaryReader(unpacked_data) as compressed_reader: request.on_response(compressed_reader) - else: reader.seek(-4) request.on_response(reader) - def handle_gzip_packed(self, msg_id, sequence, reader, mtproto_request): + def handle_gzip_packed(self, msg_id, sequence, reader, request): code = reader.read_int(signed=False) packed_data = reader.tgread_bytes() unpacked_data = gzip.decompress(packed_data) with BinaryReader(unpacked_data) as compressed_reader: - self.process_msg(msg_id, sequence, compressed_reader, mtproto_request) + return self.process_msg(msg_id, sequence, compressed_reader, request) # endregion + def set_listen_for_updates(self, enabled): + if enabled: + if not self.updates_thread_running: + self.updates_thread_running = True + self.updates_thread_receiving = False + + self.updates_thread.start() + else: + self.updates_thread_running = False + if self.updates_thread_receiving: + self.transport.cancel_receive() + def updates_thread_method(self): """This method will run until specified and listen for incoming updates""" while self.updates_thread_running: @@ -297,7 +305,7 @@ class MtProtoSender: message, remote_msg_id, remote_sequence = self.decode_msg(body) with BinaryReader(message) as reader: - self.process_msg(remote_msg_id, remote_sequence, reader, request=None, only_updates=True) + self.process_msg(remote_msg_id, remote_sequence, reader) except ReadCancelledError: pass diff --git a/tl/telegram_client.py b/tl/telegram_client.py index d06bc5d3..9088b544 100644 --- a/tl/telegram_client.py +++ b/tl/telegram_client.py @@ -10,6 +10,7 @@ from network import MtProtoSender, TcpTransport from parser.markdown_parser import parse_message_entities # For sending and receiving requests +from tl import MTProtoRequest from tl import Session from tl.types import PeerUser, PeerChat, PeerChannel, InputPeerUser, InputPeerChat, InputPeerChannel, InputPeerEmpty from tl.functions import InvokeWithLayerRequest, InitConnectionRequest @@ -62,18 +63,22 @@ class TelegramClient: # Now it's time to send an InitConnectionRequest # This must always be invoked with the layer we'll be using - request = InvokeWithLayerRequest(layer=self.layer, - query=InitConnectionRequest(api_id=self.api_id, - device_model=platform.node(), - system_version=platform.system(), - app_version='0.2', - lang_code='en', - query=GetConfigRequest())) + query = InitConnectionRequest(api_id=self.api_id, + device_model=platform.node(), + system_version=platform.system(), + app_version='0.3', + lang_code='en', + query=GetConfigRequest()) - self.sender.send(request) - self.sender.receive(request) + result = self.invoke(InvokeWithLayerRequest(layer=self.layer, query=query)) - self.dc_options = request.result.dc_options + # Only listen for updates if we're authorized + if self.is_user_authorized(): + self.sender.set_listen_for_updates(True) + + # We're only interested in the DC options, + # although many other options are available! + self.dc_options = result.dc_options return True except RPCError as error: print('Could not stabilise initial connection: {}'.format(error)) @@ -114,11 +119,9 @@ class TelegramClient: completed = False while not completed: try: - self.sender.send(request) - self.sender.receive(request) + result = self.invoke(request) + self.phone_code_hashes[phone_number] = result.phone_code_hash completed = True - if request.result: - self.phone_code_hashes[phone_number] = request.result.phone_code_hash except InvalidDCError as error: self.reconnect_to_dc(error.new_dc) @@ -137,19 +140,19 @@ class TelegramClient: self.session.user = request.result.user self.session.save() + # Now that we're authorized, we can listen for incoming updates + self.sender.set_listen_for_updates(True) + return self.session.user def get_dialogs(self, count=10, offset_date=None, offset_id=0, offset_peer=InputPeerEmpty()): """Returns a tuple of lists ([dialogs], [displays], [input_peers]) with 'count' items each""" - request = GetDialogsRequest(offset_date=TelegramClient.get_tg_date(offset_date), - offset_id=offset_id, - offset_peer=offset_peer, - limit=count) - self.sender.send(request) - self.sender.receive(request) + r = self.invoke(GetDialogsRequest(offset_date=TelegramClient.get_tg_date(offset_date), + offset_id=offset_id, + offset_peer=offset_peer, + limit=count)) - r = request.result return (r.dialogs, [self.find_display_name(d.peer, r.users, r.chats) for d in r.dialogs], [self.find_input_peer(d.peer, r.users, r.chats) for d in r.dialogs]) @@ -161,14 +164,11 @@ class TelegramClient: else: msg, entities = message, [] - request = SendMessageRequest(peer=input_peer, - message=msg, - random_id=utils.generate_random_long(), - entities=entities, - no_webpage=no_web_page) - - self.sender.send(request) - self.sender.receive(request) + self.invoke(SendMessageRequest(peer=input_peer, + message=msg, + random_id=utils.generate_random_long(), + entities=entities, + no_webpage=no_web_page)) def get_message_history(self, input_peer, limit=20, offset_date=None, offset_id=0, max_id=0, min_id=0, add_offset=0): @@ -186,23 +186,19 @@ class TelegramClient: :return: A tuple containing total message count and two more lists ([messages], [senders]). Note that the sender can be null if it was not found! """ - request = GetHistoryRequest(input_peer, - limit=limit, - offset_date=self.get_tg_date(offset_date), - offset_id=offset_id, - max_id=max_id, - min_id=min_id, - add_offset=add_offset) + result = self.invoke(GetHistoryRequest(input_peer, + limit=limit, + offset_date=self.get_tg_date(offset_date), + offset_id=offset_id, + max_id=max_id, + min_id=min_id, + add_offset=add_offset)) - self.sender.send(request) - self.sender.receive(request) - - result = request.result # The result may be a messages slice (not all messages were retrieved) or # simply a messages TLObject. In the later case, no "count" attribute is specified: # the total messages count is retrieved by counting all the retrieved messages total_messages = getattr(result, 'count', len(result.messages)) - + return (total_messages, result.messages, [usr # Create a list with the users... @@ -210,6 +206,16 @@ class TelegramClient: for msg in result.messages # ...from all the messages... for usr in result.users]) # ...from all of the available users + def invoke(self, request): + """Invokes an MTProtoRequest and returns its results""" + if not issubclass(type(request), MTProtoRequest): + raise ValueError('You can only invoke MtProtoRequests') + + self.sender.send(request) + self.sender.receive(request) + + return request.result + # endregion # region Utilities