fix #8, small refactor

This commit is contained in:
kiriharu 2021-01-05 16:24:10 +03:00
parent d1fab3f8e5
commit d9b4423672
6 changed files with 36 additions and 34 deletions

View File

@ -1,7 +1,7 @@
from aiogram.types import Message
from httpx import Response
from core.coretypes import ErrorPayload, ICMPCheckerResponse, ResponseStatus
from ..base import CheckerBaseHandler, NotEnoughArgs
from ..base import CheckerBaseHandler, NotEnoughArgs, LocalhostForbidden
icmp_help_message = """
Производит проверку хоста по протоколу ICMP.
@ -23,6 +23,8 @@ class ICMPCheckerHandler(CheckerBaseHandler):
args = await self.process_args(message.text)
except NotEnoughArgs:
return await message.answer(icmp_help_message)
except LocalhostForbidden:
return await message.answer(self.localhost_forbidden_message, parse_mode="Markdown")
await self.check(message.chat.id, message.bot, dict(target=args[0], target_fq=args[0]))
async def process_args(self, text: str) -> list:
@ -31,6 +33,7 @@ class ICMPCheckerHandler(CheckerBaseHandler):
raise NotEnoughArgs()
if len(args) >= 2:
target = args[1]
await self.validate_target(target)
return [target]
async def prepare_message(self, res: Response):

View File

@ -24,17 +24,7 @@ class MinecraftCheckerHandler(CheckerBaseHandler):
super().__init__()
async def handler(self, message: Message):
try:
args = await self.process_args(message.text)
except NotEnoughArgs:
return await message.answer(self.help_message, parse_mode="Markdown")
except InvalidPort:
return await message.answer(invalid_port, parse_mode="Markdown")
await self.check(
message.chat.id,
message.bot,
dict(target=args[0], port=args[1], target_fq=f"{args[0]}:{args[1]}")
)
await self.target_port_handler(message)
async def process_args(self, text: str) -> list:
return process_args_for_host_port(text, 25565)

View File

@ -23,17 +23,7 @@ class TCPCheckerHandler(CheckerBaseHandler):
super().__init__()
async def handler(self, message: Message):
try:
args = await self.process_args(message.text)
except NotEnoughArgs:
return await message.answer(self.help_message, parse_mode="Markdown")
except InvalidPort:
return await message.answer(invalid_port, parse_mode="Markdown")
await self.check(
message.chat.id,
message.bot,
dict(target=args[0], port=args[1], target_fq=f"{args[0]}:{args[1]}")
)
await self.target_port_handler(message)
async def process_args(self, text: str) -> list:
port = None

View File

@ -22,17 +22,7 @@ class WebCheckerHandler(CheckerBaseHandler):
super().__init__()
async def handler(self, message: Message):
try:
args = await self.process_args(message.text)
except NotEnoughArgs:
return await message.answer(self.help_message, parse_mode="Markdown")
except InvalidPort:
return await message.answer(invalid_port, parse_mode="Markdown")
await self.check(
message.chat.id,
message.bot,
dict(target=args[0], port=args[1], target_fq=f"{args[0]}:{args[1]}")
)
await self.target_port_handler(message)
async def process_args(self, text: str) -> list:
return process_args_for_host_port(text, 80)

View File

@ -1,6 +1,8 @@
from aiogram.types import Message
import whois
from tgbot.handlers.helpers import validate_local
whois_help_message = """
Вернёт информацию о домене.
@ -13,6 +15,8 @@ no_domain_text = """
Напишите /whois чтобы посмотреть справку.
"""
localhost_exception = "❗Локальные адреса запрещены!"
def create_whois_message(domain: str) -> str:
domain_info = whois.whois(domain)
@ -67,5 +71,7 @@ async def whois_cmd(msg: Message):
return await msg.answer(no_domain_text)
if len(args) >= 2:
host = args[1]
if validate_local(args[0]):
return await msg.answer(localhost_exception, parse_mode="Markdown")
await msg.bot.send_chat_action(msg.chat.id, 'typing')
await msg.answer(create_whois_message(host), parse_mode='html')

View File

@ -1,6 +1,8 @@
from httpx import AsyncClient, Timeout, Response, ConnectError
from typing import List
from core.coretypes import APINode
from ipaddress import ip_address
from contextlib import suppress
def check_int(value) -> bool:
@ -12,6 +14,27 @@ def check_int(value) -> bool:
return True
def validate_local(target: str) -> bool:
"""
Validates ip or FQDN is localhost
:return True if localhost find
"""
if target == "localhost":
return True
with suppress(ValueError):
ip_addr = ip_address(target)
if any(
[ip_addr.is_loopback,
ip_addr.is_private,
ip_addr.is_multicast,
ip_addr.is_link_local,
ip_addr.is_unspecified]
):
return True
return False
async def send_api_requests(endpoint: str, data: dict, nodes: List[APINode]):
for node in nodes:
data.update(dict(token=node.token))