# Mirror of https://github.com/Alexander-D-Karpov/scripts.git
# Synced 2024-11-25 21:13:43 +03:00 (248 lines, 7.9 KiB, Python)
import os
|
|
import json
|
|
import yaml
|
|
|
|
from telethon import TelegramClient
|
|
from telethon.tl import functions
|
|
from telethon.tl.types import (
|
|
MessageMediaDocument,
|
|
MessageMediaPhoto,
|
|
PeerChannel,
|
|
PeerUser,
|
|
PeerChat,
|
|
)
|
|
|
|
# Telegram API credentials come from the environment; both are mandatory.
api_id = os.getenv("api_id")
if api_id is None:
    raise ValueError("please set api_id env variable")

api_hash = os.getenv("api_hash")
if api_hash is None:
    raise ValueError("please set api_hash env variable")
|
|
|
|
# Downloads and the offsets file live under ./poller; create it on first run.
os.makedirs("poller", exist_ok=True)

if not os.path.isfile("poller.yaml"):
    raise FileNotFoundError("Please create poller.yaml")

# safe_load only builds plain Python objects from the YAML document.
with open("poller.yaml", "r") as stream:
    try:
        config = yaml.safe_load(stream)
    except yaml.YAMLError as exc:
        # Only printing the error would leave `config` undefined and make the
        # script fail later with an unrelated NameError, so stop right here.
        raise ValueError(f"poller.yaml is not valid YAML: {exc}") from exc
|
|
|
|
# load offsets
# Maps a chat id to the id of the last message already processed for it, so
# that a later run can resume instead of starting from the first message.
offsets = {}
if os.path.isfile("poller/.offsets.json"):
    with open("poller/.offsets.json") as f:
        offsets = json.load(f)
else:
    # First run: create an empty offsets file. The context manager closes the
    # handle even if the write fails.
    with open("poller/.offsets.json", "x") as f:
        f.write("{}")
|
|
|
|
|
|
# parse and check config
folders = []
channels = []

# An empty poller.yaml parses to None; treat it as "nothing configured".
if config is None:
    config = {}
if not isinstance(config, dict):
    raise TypeError("poller.yaml should contain a mapping at the top level")

# An empty `folders:` key is allowed and simply means "no folders".
if config.get("folders"):
    folders = config["folders"]
    if type(folders) is not list:
        raise TypeError("Folders should be a list(start with - on a new line)")
    if not all(type(x) is str for x in folders):
        raise TypeError("Folders should be specified by name")

# An empty `channels:` key parses to None; accept it just like an empty
# `folders:` key instead of failing with "NoneType is not iterable".
channels_config = config.get("channels") or {}
if not isinstance(channels_config, dict):
    raise TypeError("Channels should contain ids and/or usernames")

if "ids" in channels_config:
    ids = channels_config["ids"]
    if type(ids) is not list:
        raise TypeError("Ids should be a list(start with - on a new line)")
    # `type(x) is int` (rather than isinstance) keeps booleans rejected.
    if not all(type(x) is int for x in ids):
        raise TypeError("Ids should be integers")
    channels += list(map(str, ids))

if "usernames" in channels_config:
    usernames = channels_config["usernames"]
    if type(usernames) is not list:
        raise TypeError("Usernames should be a list(start with - on a new line)")
    if not all(type(x) is str for x in usernames):
        raise TypeError("Usernames should be strings")
    # Usernames may be written with or without the leading "@".
    channels += [x.replace("@", "") for x in usernames]
|
|
|
|
|
|
async def aenumerate(asequence, start=0):
    """Async counterpart of ``enumerate``.

    Yields ``(index, element)`` pairs for every element produced by the
    async iterable ``asequence``; the first index is ``start``.
    """
    position = start
    async for element in asequence:
        yield position, element
        position = position + 1
|
|
|
|
|
|
async def progress_bar(
    iterable,
    total,
    prefix="",
    suffix="",
    decimals=1,
    length=100,
    fill="█",
    print_end="\r",
):
    """Yield items from an async iterable while drawing a console progress bar.

    Progress is measured by each item's ``id`` attribute relative to
    ``total``, not by the number of items seen so far.

    Args:
        iterable: async iterable whose items expose an ``id`` attribute.
        total: value of ``id`` that corresponds to 100%.
        prefix: text printed before the bar.
        suffix: text printed after the counter.
        decimals: number of decimals shown in the percentage.
        length: width of the bar in characters.
        fill: character used for the completed part of the bar.
        print_end: ``end`` argument passed to ``print`` (``"\\r"`` redraws
            the bar on the same line).

    Yields:
        The items of ``iterable``, unchanged and in order.
    """

    def print_progress_bar(iteration):
        if total > 0:
            percent_value = 100 * (iteration / float(total))
            filled_length = int(length * iteration // total)
        else:
            # Nothing to measure against: avoid ZeroDivisionError and draw
            # a complete bar.
            percent_value = 100.0
            filled_length = length
        # Clamp so ids outside 0..total cannot over- or under-draw the bar.
        filled_length = max(0, min(length, filled_length))
        percent = f"{percent_value:.{decimals}f}"
        bar = fill * filled_length + "-" * (length - filled_length)
        # `iteration` is already the current id, so it is shown as-is;
        # adding one made the counter finish at total + 1.
        print(
            f"\r{prefix} |{bar}| {percent}% ({iteration}/{total}) {suffix}",
            end=print_end,
        )

    print_progress_bar(0)
    async for item in iterable:
        yield item
        # Redraw once the consumer has finished with the item and asks for
        # the next one.
        print_progress_bar(item.id)
|
|
|
|
|
|
async def download(client, entity, title, min_id):
    """Download media newer than ``min_id`` from ``entity`` into ``poller/<title>/``.

    Photos, videos and other documents are stored in the ``photos``,
    ``videos`` and ``other`` sub-folders. After every message the id of that
    message is recorded in the module-level ``offsets`` mapping, which is
    written to ``poller/.offsets.json`` periodically and once at the end.

    Args:
        client: connected Telethon client used to iterate messages.
        entity: chat/channel/user entity to read messages from.
        title: human-readable name, used for output and as the folder name.
        min_id: only messages with an id greater than this are processed.
    """
    # Only the newest message is needed to learn the highest message id.
    max_id = 0
    async for message in client.iter_messages(entity, limit=1):
        max_id = message.id

    if max_id <= min_id:
        print(" " * 4 + f"done {title}")
        return

    # A title containing a path separator would point into a nested directory
    # that does not exist, so neutralise separators in the folder name.
    folder = str(title).replace("/", "_").replace(os.sep, "_")
    base_dir = f"poller/{folder}"
    for subfolder in ("photos", "videos", "other"):
        os.makedirs(f"{base_dir}/{subfolder}/", exist_ok=True)

    # The offsets file stores string keys (JSON) and callers look ids up as
    # strings, so the key must be a string here as well.
    offset_key = str(entity.id)

    print(" " * 4 + f"downloading {title}", end="\r")
    async for message in progress_bar(
        client.iter_messages(entity, reverse=True, min_id=min_id),
        max_id,
        " " * 4 + f"downloading {title}:",
    ):
        if message.media:
            subfolder = _media_subfolder(message.media)
            if subfolder is not None:
                await message.download_media(file=f"{base_dir}/{subfolder}/")

        offsets[offset_key] = message.id

        # Persist progress periodically so an interrupted run can resume.
        if message.id % 10 == 0:
            _save_offsets()

    print(" " * 4 + f"done {title}")
    _save_offsets()


def _media_subfolder(media):
    """Return the sub-folder name for ``media``, or None when it is skipped."""
    # NOTE(review): the nesting of the final `else` was reconstructed from a
    # flattened source; here documents without a MIME type go to "other" and
    # media that is neither a photo nor a document is skipped - confirm.
    if isinstance(media, MessageMediaPhoto):
        return "photos"
    if isinstance(media, MessageMediaDocument):
        mime_type = getattr(media.document, "mime_type", None) or ""
        if mime_type.startswith("image"):
            return "photos"
        if mime_type.startswith("video"):
            return "videos"
        return "other"
    return None


def _save_offsets():
    """Write the in-memory ``offsets`` mapping to ``poller/.offsets.json``."""
    with open("poller/.offsets.json", "w") as f:
        json.dump(offsets, f, indent=4)
|
|
|
|
|
|
async def download_channel(client, id):
    """Resolve the channel with the given id and download its new media.

    The stored offset for the channel (0 when there is none) is used as the
    starting point. If the channel cannot be resolved, a message is printed
    and nothing is downloaded.
    """
    channel_key = str(id)
    resume_from = offsets.get(channel_key, 0)
    try:
        channel = await client.get_entity(PeerChannel(int(channel_key)))
    except ValueError:
        print("channel not found, there is probably somthing broken...")
        return
    await download(client, channel, channel.title, resume_from)
|
|
|
|
|
|
async def download_user(client, id):
    """Resolve the user with the given id and download new media from the chat.

    The stored offset for the user (0 when there is none) is used as the
    starting point. If the user cannot be resolved, a message is printed and
    nothing is downloaded.

    Args:
        client: connected Telethon client.
        id: user id, as an int or a string of digits.
    """
    id = str(id)
    min_id = offsets.get(id, 0)
    try:
        entity = await client.get_entity(PeerUser(int(id)))
    except ValueError:
        print("user not found, there is probably somthing broken...")
        return
    # A user without a username would otherwise be saved under "None", so
    # all such users would share one folder; fall back to the unique id.
    title = entity.username or str(entity.id)
    await download(client, entity, title, min_id)
|
|
|
|
|
|
async def download_chat(client, id):
    """Resolve the group chat with the given id and download its new media.

    The stored offset for the chat (0 when there is none) is used as the
    starting point. If the chat cannot be resolved, a message is printed and
    nothing is downloaded.
    """
    chat_key = str(id)
    resume_from = offsets.get(chat_key, 0)
    try:
        chat = await client.get_entity(PeerChat(int(chat_key)))
    except ValueError:
        print("chat not found, there is probably somthing broken...")
        return
    await download(client, chat, chat.title, resume_from)
|
|
|
|
|
|
async def run(client):
    """Download all configured channels, then all chats of the configured folders.

    Channels come from the module-level ``channels`` list and folders from
    the module-level ``folders`` list.

    Args:
        client: connected Telethon client.

    Raises:
        ValueError: if a configured channel cannot be resolved (raised after
            all channels were tried) or a configured folder does not exist.
    """
    err = False

    if channels:
        # NOTE(review): numeric ids are stored as strings by the config
        # parser - confirm that get_entity resolves digit-only strings as
        # ids rather than as phone numbers.
        for c in channels:
            # Only the lookup is guarded, so a ValueError raised while
            # downloading is not misreported as a missing chat.
            try:
                entity = await client.get_entity(c)
            except ValueError:
                err = True
                print(f"Chat {c} not found")
                continue
            await download_channel(client, entity.id)
        if err:
            raise ValueError(
                "please check if channel's names or ids in config are correct"
            )

    if folders:
        user_folders = {}
        req = await client(functions.messages.GetDialogFiltersRequest())
        # NOTE(review): newer Telethon versions appear to wrap the list in an
        # object with a `filters` attribute - confirm for the installed one.
        dialog_filters = getattr(req, "filters", req)
        for dialog_filter in dialog_filters:
            d = dialog_filter.to_dict()
            title = d.get("title")
            # NOTE(review): the title may be a text object (dict with a
            # "text" key) instead of a plain string in newer versions.
            if isinstance(title, dict):
                title = title.get("text")
            if title not in folders:
                continue
            peers = []
            for el in d["include_peers"]:
                # The id field is named after the peer type (any key
                # containing "id"), so pick it up by substring.
                peer_id = 0
                for name, val in el.items():
                    if "id" in name:
                        peer_id = str(val)
                peers.append({"_": el["_"], "id": peer_id})
            user_folders[title] = peers

        for el in folders:
            if el not in user_folders:
                err = True
                print(f"folder {el} not found")
        if err:
            raise ValueError("please check if folder's names in config are correct")

        # Peer types without a handler are skipped.
        handlers = {
            "InputPeerUser": download_user,
            "InputPeerChannel": download_channel,
            "InputPeerChat": download_chat,
        }
        for folder, included_chats in user_folders.items():
            print(f"downloading folder: {folder}")
            for el in included_chats:
                handler = handlers.get(el["_"])
                if handler is not None:
                    await handler(client, el["id"])
|
|
|
|
|
|
# Entry point: open the "downloader" session and run the poller to completion
# on the client's own event loop.
client = TelegramClient("downloader", int(api_id), api_hash)
with client:
    client.loop.run_until_complete(run(client))
|