Add UI and utility functions for SQL injection testing

- Implemented a new UI module (sql_cli/ui.py) for displaying banners and scan results using the Rich library.
- Created utility functions in sql_cli/utils.py for generating log filenames and saving logs.
- Refactored sqlmapcli.py to utilize the new UI and utility functions, enhancing the interactive mode and scan processes.
- Added support for custom headers and POST data in the interactive mode.
- Introduced a test endpoints JSON file (test_endpoints.json) for batch testing.
This commit is contained in:
Wilbert Chandra 2026-01-07 12:49:14 +00:00
parent c45102fc1e
commit 2270c8981b
7 changed files with 645 additions and 688 deletions

View File

@ -4,9 +4,16 @@
}, },
{ {
"url": "https://demo.owasp-juice.shop/rest/user/login", "url": "https://demo.owasp-juice.shop/rest/user/login",
"data": "{\"email\":\"test@example.com\",\"password\":\"password123\"}" "data": {
"email": "test@example.com",
"password": "password123"
}
}, },
{ {
"url": "https://demo.owasp-juice.shop/api/Users/1" "url": "https://demo.owasp-juice.shop/api/Users/1",
"headers": [
"Authorization: Bearer my_secret_token",
"X-Custom-Header: value"
]
} }
] ]

18
sql_cli/models.py Normal file
View File

@ -0,0 +1,18 @@
from typing import List, Dict, Optional, TypedDict
from datetime import datetime
SQL_TECHNIQUES = {
"B": "Boolean-based blind",
"E": "Error-based",
"U": "Union query-based",
"S": "Stacked queries",
"T": "Time-based blind",
"Q": "Inline queries",
}
class ScanResult(TypedDict):
total_tests: int
vulnerabilities: List[Dict[str, str]]
start_time: Optional[datetime]
end_time: Optional[datetime]
target: Optional[str]

344
sql_cli/scanner.py Normal file
View File

@ -0,0 +1,344 @@
from rich.table import Table
import sys
import subprocess
import json
from datetime import datetime
from typing import List, Dict, Tuple, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TimeElapsedColumn,
)
from rich.panel import Panel
from rich import box
from .models import ScanResult
from .utils import SQLMAP_PATH, get_log_filename, save_log
from .ui import display_summary, display_batch_results
console = Console()
class SQLMapScanner:
def __init__(self, enable_logging: bool = True):
self.enable_logging = enable_logging
self.results: ScanResult = {
'total_tests': 0,
'vulnerabilities': [],
'start_time': None,
'end_time': None,
'target': None
}
def run_sqlmap_test(
self,
url: str,
level: int,
risk: int,
technique: str = "BEUSTQ",
batch: bool = True,
data: Optional[str] = None,
headers: Optional[str] = None,
verbose: int = 1,
extra_args: Optional[List[str]] = None,
) -> Tuple[bool, str]:
"""Run sqlmap with specified parameters"""
cmd = [
sys.executable,
str(SQLMAP_PATH),
"-u",
url,
f"--level={level}",
f"--risk={risk}",
f"--technique={technique}",
"-v",
str(verbose),
]
if batch:
cmd.append("--batch")
if data:
cmd.extend(["--data", data, "--method", "POST"])
if headers:
cmd.extend(["--headers", headers])
if extra_args:
cmd.extend(extra_args)
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True
)
return result.returncode == 0, result.stdout + result.stderr
except subprocess.TimeoutExpired:
return False, "Test timed out after 10 minutes"
except Exception as e:
return False, str(e)
def parse_results(self, output: str) -> Dict[str, Any]:
"""Parse sqlmap output for vulnerabilities"""
vulns = []
# Look for vulnerability indicators
if "sqlmap identified the following injection point" in output:
lines = output.split("\n")
current_param = "Unknown"
for i, line in enumerate(lines):
if "Parameter:" in line:
current_param = line.split("Parameter:")[1].strip()
elif "Type:" in line:
vuln_type = line.split("Type:")[1].strip()
if i + 1 < len(lines) and "Title:" in lines[i + 1]:
title = lines[i + 1].split("Title:")[1].strip()
vulns.append(
{
"parameter": current_param,
"type": vuln_type,
"title": title,
}
)
backend_dbms = None
if "back-end DBMS:" in output.lower():
for line in output.split("\n"):
if "back-end DBMS:" in line.lower():
backend_dbms = line.split(":", 1)[1].strip()
break
return {
"vulnerabilities": vulns,
"backend_dbms": backend_dbms,
"is_vulnerable": len(vulns) > 0 or "vulnerable" in output.lower(),
}
def comprehensive_scan(
self,
url: str,
max_level: int = 5,
max_risk: int = 3,
techniques: str = "BEUSTQ",
data: Optional[str] = None,
headers: Optional[str] = None,
verbose: int = 1,
):
"""Run comprehensive scan with all levels and risks"""
self.results["target"] = url
self.results["start_time"] = datetime.now()
results_table = Table(title="Scan Results", box=box.ROUNDED)
results_table.add_column("Level", style="cyan", justify="center")
results_table.add_column("Risk", style="yellow", justify="center")
results_table.add_column("Status", justify="center")
results_table.add_column("Findings", style="magenta")
total_tests = max_level * max_risk
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
console=console,
) as progress:
overall_task = progress.add_task(
f"[cyan]Scanning {url}...", total=total_tests
)
for level in range(1, max_level + 1):
for risk in range(1, max_risk + 1):
progress.update(
overall_task,
description=f"[cyan]Testing Level {level}, Risk {risk}...",
)
success, output = self.run_sqlmap_test(
url, level, risk, techniques, data=data, headers=headers, verbose=verbose
)
parsed = self.parse_results(output)
status = "" if success else ""
status_style = "green" if success else "red"
findings = (
"No vulnerabilities"
if not parsed["is_vulnerable"]
else f"{len(parsed['vulnerabilities'])} found!"
)
findings_style = (
"green" if not parsed["is_vulnerable"] else "bold red"
)
if parsed["is_vulnerable"]:
self.results["vulnerabilities"].extend(
parsed["vulnerabilities"]
)
results_table.add_row(
str(level),
str(risk),
f"[{status_style}]{status}[/{status_style}]",
f"[{findings_style}]{findings}[/{findings_style}]",
)
progress.update(overall_task, advance=1)
self.results["total_tests"] += 1
self.results["end_time"] = datetime.now()
console.print()
console.print(results_table)
display_summary(self.results)
def quick_scan(
self,
url: str,
level: int = 1,
risk: int = 1,
data: Optional[str] = None,
headers: Optional[str] = None,
raw: bool = False,
verbose: int = 1,
):
"""Run a quick scan with default settings"""
self.results["target"] = url
self.results["start_time"] = datetime.now()
if not raw:
scan_info = f"[cyan]Running quick scan on:[/cyan]\n[yellow]{url}[/yellow]\n[dim]Level: {level}, Risk: {risk}[/dim]"
if data:
scan_info += f"\n[dim]POST Data: {data}[/dim]"
if headers:
scan_info += f"\n[dim]Headers: {headers}[/dim]"
console.print(Panel(scan_info, border_style="cyan", box=box.ROUNDED))
if raw:
console.print("[cyan]Running sqlmap...[/cyan]\n")
success, output = self.run_sqlmap_test(
url, level, risk, data=data, headers=headers, verbose=verbose
)
console.print(output)
if self.enable_logging:
log_file = get_log_filename(url)
save_log(log_file, output)
return
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task(
"[cyan]Scanning for vulnerabilities...", total=None
)
success, output = self.run_sqlmap_test(
url, level, risk, data=data, headers=headers, verbose=verbose
)
progress.update(task, completed=True)
if self.enable_logging:
log_file = get_log_filename(url)
save_log(log_file, output)
parsed = self.parse_results(output)
self.results["vulnerabilities"] = parsed["vulnerabilities"]
self.results["total_tests"] = 1
self.results["end_time"] = datetime.now()
display_summary(self.results)
def process_single_endpoint(self, endpoint: Dict, level: int, risk: int, verbose: int) -> Dict:
"""Process a single endpoint for batch mode"""
url = str(endpoint.get('url')) if endpoint.get('url') else ''
data = endpoint.get('data')
if data is not None and not isinstance(data, str):
data = json.dumps(data)
headers = endpoint.get('headers')
if headers is not None and isinstance(headers, list):
headers = "\\n".join(headers)
try:
success, output = self.run_sqlmap_test(url, level, risk, data=data, headers=headers, verbose=verbose)
if self.enable_logging:
log_file = get_log_filename(url)
save_log(log_file, output)
parsed = self.parse_results(output)
return {
'url': url,
'data': data,
'success': success,
'vulnerabilities': parsed['vulnerabilities'],
'is_vulnerable': parsed['is_vulnerable']
}
except Exception as e:
return {
'url': url,
'data': data,
'success': False,
'error': str(e),
'vulnerabilities': [],
'is_vulnerable': False
}
def batch_scan(self, endpoints: List[Dict], level: int = 1, risk: int = 1,
concurrency: int = 5, verbose: int = 1):
"""Run batch scan on multiple endpoints with concurrency"""
console.print(
Panel(
f"[cyan]Batch Scan Mode[/cyan]\n"
f"[dim]Testing {len(endpoints)} endpoint(s) with concurrency={concurrency}[/dim]\n"
f"[dim]Level: {level}, Risk: {risk}[/dim]",
border_style="cyan",
box=box.ROUNDED
)
)
results = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("({task.completed}/{task.total})"),
TimeElapsedColumn(),
console=console
) as progress:
task = progress.add_task("[cyan]Processing endpoints...", total=len(endpoints))
with ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_endpoint = {
executor.submit(self.process_single_endpoint, endpoint, level, risk, verbose): endpoint
for endpoint in endpoints
}
for future in as_completed(future_to_endpoint):
endpoint = future_to_endpoint[future]
try:
results.append(future.result())
except Exception as e:
results.append({
'url': endpoint.get('url'),
'data': endpoint.get('data'),
'success': False,
'error': str(e),
'vulnerabilities': [],
'is_vulnerable': False
})
progress.update(task, advance=1)
display_batch_results(results)
return results

145
sql_cli/ui.py Normal file
View File

@ -0,0 +1,145 @@
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich import box
from typing import List, Dict
from .models import ScanResult
console = Console()
def print_banner():
"""Display a beautiful banner"""
banner = """
CLI - Automated SQL Injection Testing
"""
console.print(banner, style="bold cyan")
console.print(
Panel(
"[yellow]⚠️ Legal Disclaimer: Only use on targets you have permission to test[/yellow]",
border_style="yellow",
box=box.ROUNDED,
)
)
console.print()
def display_summary(results: ScanResult):
"""Display a comprehensive summary of results"""
console.print()
# Calculate duration
duration = 0.0
if results["end_time"] and results["start_time"]:
duration = (results["end_time"] - results["start_time"]).total_seconds()
# Create summary panel
summary_text = f"""
[cyan]Target:[/cyan] {results["target"] or "N/A"}
[cyan]Total Tests:[/cyan] {results["total_tests"]}
[cyan]Duration:[/cyan] {duration:.2f} seconds
[cyan]Vulnerabilities Found:[/cyan] {len(results["vulnerabilities"])}
"""
console.print(
Panel(
summary_text.strip(),
title="[bold]Scan Summary[/bold]",
border_style="green" if len(results["vulnerabilities"]) == 0 else "red",
box=box.DOUBLE,
)
)
# Display vulnerabilities if found
if results["vulnerabilities"]:
console.print()
vuln_table = Table(title="⚠️ Vulnerabilities Detected", box=box.HEAVY)
vuln_table.add_column("Parameter", style="cyan")
vuln_table.add_column("Type", style="yellow")
vuln_table.add_column("Title", style="red")
for vuln in results["vulnerabilities"]:
vuln_table.add_row(
vuln.get("parameter", "N/A"),
vuln.get("type", "N/A"),
vuln.get("title", "N/A"),
)
console.print(vuln_table)
console.print()
console.print(
"[bold red]⚠️ SQL injection vulnerabilities detected! Take immediate action.[/bold red]"
)
else:
console.print()
console.print(
"[bold green]✓ No SQL injection vulnerabilities detected.[/bold green]"
)
console.print()
def display_batch_results(results: List[Dict]):
"""Display batch scan results in a table"""
console.print()
# Create results table
results_table = Table(title="Batch Scan Results", box=box.ROUNDED)
results_table.add_column("URL", style="cyan", no_wrap=False)
results_table.add_column("Status", justify="center")
results_table.add_column("Vulnerabilities", style="magenta")
vulnerable_count = 0
successful_count = 0
for result in results:
url = result['url'][:60] + '...' if len(result['url']) > 60 else result['url']
if result.get('error'):
status = "[red]✗ Error[/red]"
vulns = f"[red]{result['error'][:40]}[/red]"
elif result['success']:
successful_count += 1
if result['is_vulnerable']:
vulnerable_count += 1
status = "[red]✓ Vulnerable[/red]"
vulns = f"[red]{len(result['vulnerabilities'])} found[/red]"
else:
status = "[green]✓ Clean[/green]"
vulns = "[green]None[/green]"
else:
status = "[yellow]✗ Failed[/yellow]"
vulns = "[yellow]N/A[/yellow]"
results_table.add_row(url, status, vulns)
console.print(results_table)
# Summary
console.print()
summary = f"""
[cyan]Batch Summary:[/cyan]
Total Endpoints: {len(results)}
Successful Scans: {successful_count}
Vulnerable: [red]{vulnerable_count}[/red]
Clean: [green]{successful_count - vulnerable_count}[/green]
"""
border_color = "red" if vulnerable_count > 0 else "green"
console.print(
Panel(
summary.strip(),
title="[bold]Summary[/bold]",
border_style=border_color,
box=box.DOUBLE
)
)
console.print()

27
sql_cli/utils.py Normal file
View File

@ -0,0 +1,27 @@
import re
from pathlib import Path
from datetime import datetime
from rich.console import Console
console = Console()
SQLMAP_PATH = Path(__file__).parent.parent / "sqlmap.py"
LOGS_DIR = Path(__file__).parent.parent / "logs"
def get_log_filename(url: str) -> Path:
"""Generate a log filename based on URL and timestamp"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Sanitize URL for filename
safe_url = re.sub(r'[^\w\-_\.]', '_', url)[:50]
return LOGS_DIR / f"sqlmap_{safe_url}_{timestamp}.log"
def save_log(log_file: Path, content: str):
"""Save content to log file"""
try:
if not LOGS_DIR.exists():
LOGS_DIR.mkdir(exist_ok=True)
with open(log_file, 'w', encoding='utf-8') as f:
f.write(content)
console.print(f"[dim]Log saved to: {log_file}[/dim]")
except Exception as e:
console.print(f"[yellow]Warning: Could not save log: {e}[/yellow]")

View File

@ -4,597 +4,85 @@ SQLMap CLI - A beautiful CLI wrapper for sqlmap
Automates comprehensive SQL injection testing with a single command Automates comprehensive SQL injection testing with a single command
""" """
import subprocess
import sys import sys
import argparse import argparse
import re
import json import json
from pathlib import Path from pathlib import Path
from typing import List, Dict, Tuple, Optional, TypedDict, Any
from datetime import datetime # Add the current directory to path so we can import from sql_cli
from concurrent.futures import ThreadPoolExecutor, as_completed sys.path.append(str(Path(__file__).parent))
try: try:
from rich.console import Console from rich.console import Console
from rich.panel import Panel from rich.panel import Panel
from rich.table import Table
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TimeElapsedColumn,
)
from rich.prompt import Prompt, Confirm from rich.prompt import Prompt, Confirm
from rich import box
except ImportError: except ImportError:
print("Error: 'rich' library is required. Install it with: pip install rich") print("Error: 'rich' library is required. Install it with: pip install rich")
sys.exit(1) sys.exit(1)
from sql_cli.scanner import SQLMapScanner
from sql_cli.utils import SQLMAP_PATH
from sql_cli.ui import print_banner
console = Console() console = Console()
SQLMAP_PATH = Path(__file__).parent / "sqlmap.py" def interactive_mode(scanner: SQLMapScanner):
LOGS_DIR = Path(__file__).parent / "logs" """Interactive mode for user input"""
console.print()
SQL_TECHNIQUES = { console.print(
"B": "Boolean-based blind", Panel(
"E": "Error-based", "[cyan]Interactive Mode[/cyan]\n[dim]Enter target details for SQL injection testing[/dim]",
"U": "Union query-based", border_style="cyan",
"S": "Stacked queries",
"T": "Time-based blind",
"Q": "Inline queries",
}
class ScanResult(TypedDict):
total_tests: int
vulnerabilities: List[Dict[str, str]]
start_time: Optional[datetime]
end_time: Optional[datetime]
target: Optional[str]
class SQLMapCLI:
def __init__(self, enable_logging: bool = True):
self.console = Console()
self.enable_logging = enable_logging
self.results: ScanResult = {
'total_tests': 0,
'vulnerabilities': [],
'start_time': None,
'end_time': None,
'target': None
}
# Create logs directory if it doesn't exist
if self.enable_logging:
LOGS_DIR.mkdir(exist_ok=True)
def get_log_filename(self, url: str) -> Path:
"""Generate a log filename based on URL and timestamp"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Sanitize URL for filename
safe_url = re.sub(r'[^\w\-_\.]', '_', url)[:50]
return LOGS_DIR / f"sqlmap_{safe_url}_{timestamp}.log"
def save_log(self, log_file: Path, content: str):
"""Save content to log file"""
try:
with open(log_file, 'w', encoding='utf-8') as f:
f.write(content)
self.console.print(f"[dim]Log saved to: {log_file}[/dim]")
except Exception as e:
self.console.print(f"[yellow]Warning: Could not save log: {e}[/yellow]")
def print_banner(self):
"""Display a beautiful banner"""
banner = """
CLI - Automated SQL Injection Testing
"""
self.console.print(banner, style="bold cyan")
self.console.print(
Panel(
"[yellow]⚠️ Legal Disclaimer: Only use on targets you have permission to test[/yellow]",
border_style="yellow",
box=box.ROUNDED,
)
) )
self.console.print() )
def run_sqlmap_test( url = Prompt.ask("\n[cyan]Enter target URL[/cyan]")
self,
url: str,
level: int,
risk: int,
technique: str = "BEUSTQ",
batch: bool = True,
data: Optional[str] = None,
verbose: int = 1,
extra_args: Optional[List[str]] = None,
) -> Tuple[bool, str]:
"""Run sqlmap with specified parameters"""
cmd = [
sys.executable,
str(SQLMAP_PATH),
"-u",
url,
f"--level={level}",
f"--risk={risk}",
f"--technique={technique}",
"-v",
str(verbose),
]
if batch: # Ask if this is a POST request
cmd.append("--batch") has_data = Confirm.ask(
"[cyan]Does this request require POST data/body?[/cyan]", default=False
)
if data: data = None
cmd.extend(["--data", data, "--method", "POST"]) if has_data:
console.print("\n[dim]Examples:[/dim]")
if extra_args: console.print(
cmd.extend(extra_args) '[dim] JSON: {"email":"test@example.com","password":"pass123"}[/dim]'
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True
)
return result.returncode == 0, result.stdout + result.stderr
except subprocess.TimeoutExpired:
return False, "Test timed out after 10 minutes"
except Exception as e:
return False, str(e)
def parse_results(self, output: str) -> Dict[str, Any]:
"""Parse sqlmap output for vulnerabilities"""
vulns = []
# Look for vulnerability indicators
if "sqlmap identified the following injection point" in output:
# Extract injection details
lines = output.split("\n")
current_param = "Unknown" # Default parameter name
for i, line in enumerate(lines):
if "Parameter:" in line:
current_param = line.split("Parameter:")[1].strip()
elif "Type:" in line:
vuln_type = line.split("Type:")[1].strip()
# Check if next line contains the title
if i + 1 < len(lines) and "Title:" in lines[i + 1]:
title = lines[i + 1].split("Title:")[1].strip()
vulns.append(
{
"parameter": current_param,
"type": vuln_type,
"title": title,
}
)
# Check for backend DBMS detection
backend_dbms = None
if "back-end DBMS:" in output.lower():
for line in output.split("\n"):
if "back-end DBMS:" in line.lower():
backend_dbms = line.split(":", 1)[1].strip()
break
return {
"vulnerabilities": vulns,
"backend_dbms": backend_dbms,
"is_vulnerable": len(vulns) > 0 or "vulnerable" in output.lower(),
}
def comprehensive_scan(
self,
url: str,
max_level: int = 5,
max_risk: int = 3,
techniques: str = "BEUSTQ",
data: Optional[str] = None,
verbose: int = 1,
):
"""Run comprehensive scan with all levels and risks"""
self.results["target"] = url
self.results["start_time"] = datetime.now()
# Create results table
results_table = Table(title="Scan Results", box=box.ROUNDED)
results_table.add_column("Level", style="cyan", justify="center")
results_table.add_column("Risk", style="yellow", justify="center")
results_table.add_column("Status", justify="center")
results_table.add_column("Findings", style="magenta")
total_tests = max_level * max_risk
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
console=self.console,
) as progress:
overall_task = progress.add_task(
f"[cyan]Scanning {url}...", total=total_tests
)
for level in range(1, max_level + 1):
for risk in range(1, max_risk + 1):
progress.update(
overall_task,
description=f"[cyan]Testing Level {level}, Risk {risk}...",
)
success, output = self.run_sqlmap_test(
url, level, risk, techniques, data=data, verbose=verbose
)
parsed = self.parse_results(output)
status = "" if success else ""
status_style = "green" if success else "red"
findings = (
"No vulnerabilities"
if not parsed["is_vulnerable"]
else f"{len(parsed['vulnerabilities'])} found!"
)
findings_style = (
"green" if not parsed["is_vulnerable"] else "bold red"
)
if parsed["is_vulnerable"]:
self.results["vulnerabilities"].extend(
parsed["vulnerabilities"]
)
results_table.add_row(
str(level),
str(risk),
f"[{status_style}]{status}[/{status_style}]",
f"[{findings_style}]{findings}[/{findings_style}]",
)
progress.update(overall_task, advance=1)
self.results["total_tests"] += 1
self.results["end_time"] = datetime.now()
# Display results
self.console.print()
self.console.print(results_table)
self.display_summary()
def quick_scan(
self,
url: str,
level: int = 1,
risk: int = 1,
data: Optional[str] = None,
raw: bool = False,
verbose: int = 1,
):
"""Run a quick scan with default settings"""
self.results["target"] = url
self.results["start_time"] = datetime.now()
if not raw:
scan_info = f"[cyan]Running quick scan on:[/cyan]\n[yellow]{url}[/yellow]\n[dim]Level: {level}, Risk: {risk}[/dim]"
if data:
scan_info += f"\n[dim]POST Data: {data}[/dim]"
self.console.print(Panel(scan_info, border_style="cyan", box=box.ROUNDED))
if raw:
# Raw mode - just show sqlmap output directly
self.console.print("[cyan]Running sqlmap...[/cyan]\n")
success, output = self.run_sqlmap_test(
url, level, risk, data=data, verbose=verbose
)
self.console.print(output)
# Save log
if self.enable_logging:
log_file = self.get_log_filename(url)
self.save_log(log_file, output)
return
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
TimeElapsedColumn(),
console=self.console,
) as progress:
task = progress.add_task(
"[cyan]Scanning for vulnerabilities...", total=None
)
success, output = self.run_sqlmap_test(
url, level, risk, data=data, verbose=verbose
)
progress.update(task, completed=True)
# Save log
if self.enable_logging:
log_file = self.get_log_filename(url)
self.save_log(log_file, output)
parsed = self.parse_results(output)
self.results["vulnerabilities"] = parsed["vulnerabilities"]
self.results["total_tests"] = 1
self.results["end_time"] = datetime.now()
self.display_summary()
def display_summary(self):
"""Display a comprehensive summary of results"""
self.console.print()
# Calculate duration
duration = 0.0
if self.results["end_time"] and self.results["start_time"]:
duration = (
self.results["end_time"] - self.results["start_time"]
).total_seconds()
# Create summary panel
summary_text = f"""
[cyan]Target:[/cyan] {self.results["target"] or "N/A"}
[cyan]Total Tests:[/cyan] {self.results["total_tests"]}
[cyan]Duration:[/cyan] {duration:.2f} seconds
[cyan]Vulnerabilities Found:[/cyan] {len(self.results["vulnerabilities"])}
"""
self.console.print(
Panel(
summary_text.strip(),
title="[bold]Scan Summary[/bold]",
border_style="green"
if len(self.results["vulnerabilities"]) == 0
else "red",
box=box.DOUBLE,
)
) )
console.print("[dim] Form: username=admin&password=secret[/dim]")
data = Prompt.ask("\n[cyan]Enter POST data/body[/cyan]")
# Display vulnerabilities if found # Ask for custom headers
if self.results["vulnerabilities"]: has_headers = Confirm.ask(
self.console.print() "[cyan]Do you need to add custom headers (Auth, etc.)?[/cyan]", default=False
vuln_table = Table(title="⚠️ Vulnerabilities Detected", box=box.HEAVY) )
vuln_table.add_column("Parameter", style="cyan")
vuln_table.add_column("Type", style="yellow")
vuln_table.add_column("Title", style="red")
for vuln in self.results["vulnerabilities"]: headers = None
vuln_table.add_row( if has_headers:
vuln.get("parameter", "N/A"), console.print("\n[dim]Example:[/dim]")
vuln.get("type", "N/A"), console.print(
vuln.get("title", "N/A"), '[dim] "Authorization: Bearer token; Cookie: PHPSESSID=..."[/dim]'
)
self.console.print(vuln_table)
self.console.print()
self.console.print(
"[bold red]⚠️ SQL injection vulnerabilities detected! Take immediate action.[/bold red]"
)
else:
self.console.print()
self.console.print(
"[bold green]✓ No SQL injection vulnerabilities detected.[/bold green]"
)
self.console.print()
def process_single_endpoint(self, endpoint: Dict, level: int, risk: int, verbose: int) -> Dict:
"""Process a single endpoint for batch mode"""
url = endpoint.get('url')
data = endpoint.get('data')
try:
success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose)
# Save log
if self.enable_logging:
log_file = self.get_log_filename(url)
self.save_log(log_file, output)
parsed = self.parse_results(output)
return {
'url': url,
'data': data,
'success': success,
'vulnerabilities': parsed['vulnerabilities'],
'is_vulnerable': parsed['is_vulnerable']
}
except Exception as e:
return {
'url': url,
'data': data,
'success': False,
'error': str(e),
'vulnerabilities': [],
'is_vulnerable': False
}
def batch_scan(self, endpoints: List[Dict], level: int = 1, risk: int = 1,
concurrency: int = 5, verbose: int = 1):
"""Run batch scan on multiple endpoints with concurrency"""
self.console.print(
Panel(
f"[cyan]Batch Scan Mode[/cyan]\n"
f"[dim]Testing {len(endpoints)} endpoint(s) with concurrency={concurrency}[/dim]\n"
f"[dim]Level: {level}, Risk: {risk}[/dim]",
border_style="cyan",
box=box.ROUNDED
)
) )
headers = Prompt.ask("\n[cyan]Enter headers[/cyan]")
results = []
completed = 0 scan_type = Prompt.ask(
"\n[cyan]Select scan type[/cyan]",
with Progress( choices=["quick", "comprehensive"],
SpinnerColumn(), default="quick",
TextColumn("[progress.description]{task.description}"), )
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), if scan_type == "quick":
TextColumn("({task.completed}/{task.total})"), level = int(Prompt.ask("[cyan]Test level (1-5)[/cyan]", default="1"))
TimeElapsedColumn(), risk = int(Prompt.ask("[cyan]Test risk (1-3)[/cyan]", default="1"))
console=self.console scanner.quick_scan(url, level, risk, data=data, headers=headers)
) as progress: else:
max_level = int(
task = progress.add_task( Prompt.ask("[cyan]Maximum test level (1-5)[/cyan]", default="5")
"[cyan]Processing endpoints...",
total=len(endpoints)
)
with ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_endpoint = {
executor.submit(
self.process_single_endpoint,
endpoint,
level,
risk,
verbose
): endpoint
for endpoint in endpoints
}
for future in as_completed(future_to_endpoint):
endpoint = future_to_endpoint[future]
try:
result = future.result()
results.append(result)
completed += 1
progress.update(task, advance=1)
except Exception as e:
results.append({
'url': endpoint.get('url'),
'data': endpoint.get('data'),
'success': False,
'error': str(e),
'vulnerabilities': [],
'is_vulnerable': False
})
completed += 1
progress.update(task, advance=1)
# Display batch results
self.display_batch_results(results)
return results
def display_batch_results(self, results: List[Dict]):
"""Display batch scan results in a table"""
self.console.print()
# Create results table
results_table = Table(title="Batch Scan Results", box=box.ROUNDED)
results_table.add_column("URL", style="cyan", no_wrap=False)
results_table.add_column("Status", justify="center")
results_table.add_column("Vulnerabilities", style="magenta")
vulnerable_count = 0
successful_count = 0
for result in results:
url = result['url'][:60] + '...' if len(result['url']) > 60 else result['url']
if result.get('error'):
status = "[red]✗ Error[/red]"
vulns = f"[red]{result['error'][:40]}[/red]"
elif result['success']:
successful_count += 1
if result['is_vulnerable']:
vulnerable_count += 1
status = "[red]✓ Vulnerable[/red]"
vulns = f"[red]{len(result['vulnerabilities'])} found[/red]"
else:
status = "[green]✓ Clean[/green]"
vulns = "[green]None[/green]"
else:
status = "[yellow]✗ Failed[/yellow]"
vulns = "[yellow]N/A[/yellow]"
results_table.add_row(url, status, vulns)
self.console.print(results_table)
# Summary
self.console.print()
summary = f"""
[cyan]Batch Summary:[/cyan]
Total Endpoints: {len(results)}
Successful Scans: {successful_count}
Vulnerable: [red]{vulnerable_count}[/red]
Clean: [green]{successful_count - vulnerable_count}[/green]
"""
border_color = "red" if vulnerable_count > 0 else "green"
self.console.print(
Panel(
summary.strip(),
title="[bold]Summary[/bold]",
border_style=border_color,
box=box.DOUBLE
)
) )
self.console.print() max_risk = int(
Prompt.ask("[cyan]Maximum test risk (1-3)[/cyan]", default="3")
def interactive_mode(self):
"""Interactive mode for user input"""
self.console.print()
self.console.print(
Panel(
"[cyan]Interactive Mode[/cyan]\n[dim]Enter target details for SQL injection testing[/dim]",
border_style="cyan",
)
) )
scanner.comprehensive_scan(url, max_level, max_risk, data=data, headers=headers)
url = Prompt.ask("\n[cyan]Enter target URL[/cyan]")
# Ask if this is a POST request
has_data = Confirm.ask(
"[cyan]Does this request require POST data/body?[/cyan]", default=False
)
data = None
if has_data:
self.console.print("\n[dim]Examples:[/dim]")
self.console.print(
'[dim] JSON: {"email":"test@example.com","password":"pass123"}[/dim]'
)
self.console.print("[dim] Form: username=admin&password=secret[/dim]")
data = Prompt.ask("\n[cyan]Enter POST data/body[/cyan]")
scan_type = Prompt.ask(
"\n[cyan]Select scan type[/cyan]",
choices=["quick", "comprehensive"],
default="quick",
)
if scan_type == "quick":
level = int(Prompt.ask("[cyan]Test level (1-5)[/cyan]", default="1"))
risk = int(Prompt.ask("[cyan]Test risk (1-3)[/cyan]", default="1"))
self.quick_scan(url, level, risk, data=data)
else:
max_level = int(
Prompt.ask("[cyan]Maximum test level (1-5)[/cyan]", default="5")
)
max_risk = int(
Prompt.ask("[cyan]Maximum test risk (1-3)[/cyan]", default="3")
)
self.comprehensive_scan(url, max_level, max_risk, data=data)
def main(): def main():
@ -615,139 +103,51 @@ Examples:
# Batch mode - test multiple endpoints from JSON file # Batch mode - test multiple endpoints from JSON file
python sqlmapcli.py -b endpoints.json --level 2 --risk 2 --concurrency 10 python sqlmapcli.py -b endpoints.json --level 2 --risk 2 --concurrency 10
# Batch mode example JSON file format:
# [
# {"url": "https://example.com/api/users?id=1"},
# {"url": "https://example.com/api/login", "data": "{\\"user\\":\\"test\\",\\"pass\\":\\"test\\"}"}
# ]
# Interactive mode # Interactive mode
python sqlmapcli.py --interactive python sqlmapcli.py --interactive
""", """,
) )
parser.add_argument( parser.add_argument("-u", "--url", help='Target URL (e.g., "http://example.com/page?id=1")')
"-u", "--url", help='Target URL (e.g., "http://example.com/page?id=1")' parser.add_argument("--comprehensive", action="store_true", help="Run comprehensive scan")
) parser.add_argument("--level", type=int, default=1, choices=[1, 2, 3, 4, 5], help="Level (1-5, default: 1)")
parser.add_argument("--risk", type=int, default=1, choices=[1, 2, 3], help="Risk (1-3, default: 1)")
parser.add_argument( parser.add_argument("--max-level", type=int, default=5, choices=[1, 2, 3, 4, 5], help="Max level for comprehensive")
"--comprehensive", parser.add_argument("--max-risk", type=int, default=3, choices=[1, 2, 3], help="Max risk for comprehensive")
action="store_true", parser.add_argument("--technique", type=str, default="BEUSTQ", help="SQL techniques (default: BEUSTQ)")
help="Run comprehensive scan with all risk/level combinations", parser.add_argument("--data", type=str, help='POST data')
) parser.add_argument("--headers", type=str, help='Extra headers')
parser.add_argument("--raw", action="store_true", help="Show raw sqlmap output")
parser.add_argument( parser.add_argument("--verbose", type=int, choices=[0, 1, 2, 3, 4, 5, 6], help="Verbosity (0-6)")
"--level", parser.add_argument("-i", "--interactive", action="store_true", help="Interactive mode")
type=int, parser.add_argument('-b', '--batch-file', type=str, help='Path to batch JSON')
default=1, parser.add_argument('-c', '--concurrency', type=int, default=5, help='Concurrency (default: 5)')
choices=[1, 2, 3, 4, 5], parser.add_argument('--no-logs', action='store_true', help='Disable logs')
help="Level of tests to perform (1-5, default: 1)",
)
parser.add_argument(
"--risk",
type=int,
default=1,
choices=[1, 2, 3],
help="Risk of tests to perform (1-3, default: 1)",
)
parser.add_argument(
"--max-level",
type=int,
default=5,
choices=[1, 2, 3, 4, 5],
help="Maximum level for comprehensive scan (default: 5)",
)
parser.add_argument(
"--max-risk",
type=int,
default=3,
choices=[1, 2, 3],
help="Maximum risk for comprehensive scan (default: 3)",
)
parser.add_argument(
"--technique",
type=str,
default="BEUSTQ",
help="SQL injection techniques to use (default: BEUSTQ)",
)
parser.add_argument(
"--data",
type=str,
help='Data string to be sent through POST (e.g., "username=test&password=test")',
)
parser.add_argument(
"--raw", action="store_true", help="Show raw sqlmap output without formatting"
)
parser.add_argument(
"--verbose",
type=int,
choices=[0, 1, 2, 3, 4, 5, 6],
help="Sqlmap verbosity level (0-6, default: 1)",
)
parser.add_argument(
"-i", "--interactive", action="store_true", help="Run in interactive mode"
)
parser.add_argument(
'-b', '--batch-file',
type=str,
help='Path to JSON file containing multiple endpoints to test'
)
parser.add_argument(
'-c', '--concurrency',
type=int,
default=5,
help='Number of concurrent scans for batch mode (default: 5)'
)
parser.add_argument(
'--no-logs',
action='store_true',
help='Disable saving logs to the logs folder'
)
args = parser.parse_args() args = parser.parse_args()
cli = SQLMapCLI(enable_logging=not args.no_logs) scanner = SQLMapScanner(enable_logging=not args.no_logs)
cli.print_banner() print_banner()
# Check if sqlmap exists
if not SQLMAP_PATH.exists(): if not SQLMAP_PATH.exists():
console.print( console.print(f"[bold red]Error: sqlmap.py not found at {SQLMAP_PATH}[/bold red]")
f"[bold red]Error: sqlmap.py not found at {SQLMAP_PATH}[/bold red]",
style="bold red",
)
console.print(
"[yellow]Make sure you're running this script from the sqlmap directory[/yellow]"
)
sys.exit(1) sys.exit(1)
# Interactive mode
if args.interactive: if args.interactive:
cli.interactive_mode() interactive_mode(scanner)
return return
# Batch mode
if args.batch_file: if args.batch_file:
try: try:
with open(args.batch_file, 'r') as f: with open(args.batch_file, 'r') as f:
endpoints = json.load(f) endpoints = json.load(f)
if not isinstance(endpoints, list): if not isinstance(endpoints, list):
console.print("[bold red]Error: Batch file must contain a JSON array of endpoints[/bold red]") console.print("[bold red]Error: Batch file must contain a JSON array[/bold red]")
sys.exit(1) sys.exit(1)
verbose_level = args.verbose if args.verbose is not None else 1 verbose_level = args.verbose if args.verbose is not None else 1
cli.batch_scan( scanner.batch_scan(
endpoints, endpoints,
level=args.level, level=args.level,
risk=args.risk, risk=args.risk,
@ -755,37 +155,34 @@ Examples:
verbose=verbose_level verbose=verbose_level
) )
return return
except FileNotFoundError: except Exception as e:
console.print(f"[bold red]Error: Batch file not found: {args.batch_file}[/bold red]") console.print(f"[bold red]Error loading batch file: {e}[/bold red]")
sys.exit(1)
except json.JSONDecodeError as e:
console.print(f"[bold red]Error: Invalid JSON in batch file: {e}[/bold red]")
sys.exit(1) sys.exit(1)
# Check if URL is provided
if not args.url: if not args.url:
console.print("[bold red]Error: URL is required (use -u, -b, or --interactive)[/bold red]") console.print("[bold red]Error: URL is required[/bold red]")
parser.print_help() parser.print_help()
sys.exit(1) sys.exit(1)
# Run appropriate scan
verbose_level = args.verbose if args.verbose is not None else 1 verbose_level = args.verbose if args.verbose is not None else 1
if args.comprehensive: if args.comprehensive:
cli.comprehensive_scan( scanner.comprehensive_scan(
args.url, args.url,
max_level=args.max_level, max_level=args.max_level,
max_risk=args.max_risk, max_risk=args.max_risk,
techniques=args.technique, techniques=args.technique,
data=args.data, data=args.data,
headers=args.headers,
verbose=verbose_level, verbose=verbose_level,
) )
else: else:
cli.quick_scan( scanner.quick_scan(
args.url, args.url,
level=args.level, level=args.level,
risk=args.risk, risk=args.risk,
data=args.data, data=args.data,
headers=args.headers,
raw=args.raw, raw=args.raw,
verbose=verbose_level, verbose=verbose_level,
) )

19
test_endpoints.json Normal file
View File

@ -0,0 +1,19 @@
[
{
"url": "https://httpbin.org/get?id=1",
"headers": [
"X-Test-Header: value1",
"X-Auth-Token: secret123"
]
},
{
"url": "https://httpbin.org/post",
"data": {
"username": "admin",
"id": 123
},
"headers": [
"Content-Type: application/json"
]
}
]