speedup requests to backend using asyncio.as_completed

This commit is contained in:
kiriharu 2021-01-07 01:24:23 +03:00
parent e66afe0757
commit d2b6978b05
2 changed files with 41 additions and 17 deletions

View File

@ -6,9 +6,10 @@ from httpx import Response
from aiogram.bot import Bot from aiogram.bot import Bot
from datetime import datetime from datetime import datetime
from core.coretypes import APINodeInfo from core.coretypes import APINodeInfo
from .helpers import send_api_requests, check_int, validate_local from .helpers import send_api_requests, check_int, validate_local, timing
from loguru import logger from loguru import logger
from uuid import uuid4 from uuid import uuid4
from time import time
header = "Отчет о проверке хоста:" \ header = "Отчет о проверке хоста:" \
"\n\n— Хост: {target_fq}"\ "\n\n— Хост: {target_fq}"\
@ -64,6 +65,7 @@ class CheckerBaseHandler:
async def check(self, chat_id: int, bot: Bot, data: dict): async def check(self, chat_id: int, bot: Bot, data: dict):
# TODO: start check and end check metrics with ident, chat_id and api_endpoint # TODO: start check and end check metrics with ident, chat_id and api_endpoint
ts = time()
ident = uuid4().hex ident = uuid4().hex
logger.info(f"User {chat_id} started check {ident}") logger.info(f"User {chat_id} started check {ident}")
rsp_msg = await bot.send_message(chat_id, header.format(**data)) rsp_msg = await bot.send_message(chat_id, header.format(**data))
@ -79,6 +81,8 @@ class CheckerBaseHandler:
iter_keys = iter_keys + 1 iter_keys = iter_keys + 1
logger.info(f"User {chat_id} ended check {ident}") logger.info(f"User {chat_id} ended check {ident}")
await rsp_msg.edit_text(rsp_msg.text + f"\n\nПроверка завершена❗") await rsp_msg.edit_text(rsp_msg.text + f"\n\nПроверка завершена❗")
te = time()
logger.info(f"func {__name__} took {te - ts} sec")
async def validate_target(self, target: str): async def validate_target(self, target: str):
if validate_local(target): if validate_local(target):

View File

@ -1,5 +1,5 @@
from httpx import AsyncClient, Timeout, Response, ConnectError, ReadTimeout from httpx import AsyncClient, Timeout, Response
from typing import List from typing import List, Callable
from core.coretypes import APINode from core.coretypes import APINode
from ipaddress import ip_address from ipaddress import ip_address
from contextlib import suppress from contextlib import suppress
@ -8,6 +8,21 @@ from aiogram.bot import Bot
from tgbot.handlers.metrics import push_api_request_status from tgbot.handlers.metrics import push_api_request_status
from tgbot.config import NOTIFICATION_BOT_TOKEN, NOTIFICATION_USERS from tgbot.config import NOTIFICATION_BOT_TOKEN, NOTIFICATION_USERS
from traceback import format_exc from traceback import format_exc
from functools import wraps
from time import time
import asyncio
def timing(f):
@wraps(f)
def wrap(*args, **kw):
ts = time()
result = f(*args, **kw)
te = time()
logger.info(f"func {f.__name__} took {te - ts} sec")
return result
return wrap
def check_int(value) -> bool: def check_int(value) -> bool:
@ -40,24 +55,29 @@ def validate_local(target: str) -> bool:
return False return False
async def send_api_requests(endpoint: str, data: dict, nodes: List[APINode]): async def send_api_request(client: AsyncClient, endpoint: str, data: dict, node: APINode):
for node in nodes: try:
data.update(dict(token=node.token)) data['token'] = node.token
try: result = await client.get(
async with AsyncClient(timeout=Timeout(timeout=100.0)) as client: f"{node.address}/{endpoint}", params=data
result = await client.get( )
f"{node.address}/{endpoint}", params=data except Exception as e:
) logger.error(f"Node {node.address} got Error. Full exception: {e}")
except Exception as e: result = Response(500)
logger.error(f"Node {node.address} got Error. Full exception: {e}") await send_message_to_admins(f"Node {node.address} got error {e}. Full exception: ```{format_exc()}```")
# We yield 500 response when get error
result = Response(500)
await send_message_to_admins(f"Node {node.address} got error {e}. Full exception: ```{format_exc()}```")
await push_api_request_status( await push_api_request_status(
result.status_code, result.status_code,
endpoint endpoint
) )
yield result return result
async def send_api_requests(endpoint: str, data: dict, nodes: List[APINode]):
async with AsyncClient(timeout=Timeout(timeout=100.0)) as client:
tasks = [send_api_request(client, endpoint, data, node) for node in nodes]
for completed in asyncio.as_completed(tasks):
res = await completed
yield res
async def send_message_to_admins(message: str): async def send_message_to_admins(message: str):