Merge pull request #1 from GilbertKrantz/copilot/create-cli-app-for-sql-injection

Add Rich-based CLI wrapper for automated SQL injection testing with POST support, batch processing, and automatic logging
This commit is contained in:
Wilbert Chandra 2026-01-07 20:35:30 +07:00 committed by GitHub
commit 6c527afe62
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 1346 additions and 1 deletions

3
.gitignore vendored
View File

@ -5,4 +5,5 @@ __pycache__/
traffic.txt
*~
req*.txt
.idea/
.idea/
logs/

230
EXAMPLES.md Normal file
View File

@ -0,0 +1,230 @@
# SQLMap CLI - Examples
## Installation
```bash
# Install dependencies
pip install -r requirements.txt
```
## Basic Usage
### 1. Quick Scan (Default: Level 1, Risk 1)
Test a single URL with minimal risk:
```bash
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test"
```
### 2. Comprehensive Scan
Test all combinations of risk (1-3) and levels (1-5) automatically:
```bash
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive
```
This runs **15 tests total** (5 levels × 3 risks) and provides a complete vulnerability assessment.
### 3. Custom Level and Risk
Run a specific test configuration:
```bash
# Medium level, medium risk
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --level 3 --risk 2
# High level, high risk
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --level 5 --risk 3
```
### 4. Interactive Mode
Get guided prompts for easy testing:
```bash
python sqlmapcli.py --interactive
```
This will ask you:
- Target URL
- Whether the request requires POST data/body
- POST data/body (if needed) - supports JSON or form data
- Scan type (quick or comprehensive)
- Custom level and risk settings
### 5. Custom Comprehensive Scan
Limit the comprehensive scan to specific max values:
```bash
# Test only up to level 3 and risk 2
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive --max-level 3 --max-risk 2
```
### 6. Raw Output Mode
Get the exact same output as running sqlmap directly:
```bash
# Show raw sqlmap output without formatting
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"pass123"}' --level 2 --risk 2 --raw
# Increase verbosity for more details
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"pass123"}' --verbose 3 --raw
```
**Note**: The `--raw` flag ensures the CLI output matches sqlmap exactly, bypassing all formatting and parsing.
### 7. Batch Mode - Test Multiple Endpoints
Test multiple endpoints with concurrency:
```bash
# Test multiple endpoints from a JSON file with auto-scaled concurrency (default, typically 2x CPU cores)
python sqlmapcli.py -b endpoints.json --level 2 --risk 2
# Test with specific concurrency (10 concurrent scans)
python sqlmapcli.py -b endpoints.json --level 2 --risk 2 --concurrency 10
# Test with custom settings
python sqlmapcli.py -b endpoints.json --level 3 --risk 2 --concurrency 5
```
**Batch File Format** (`endpoints.json`):
```json
[
{
"url": "https://demo.owasp-juice.shop/rest/products/search?q=test"
},
{
"url": "https://demo.owasp-juice.shop/rest/user/login",
"data": "{\"email\":\"test@example.com\",\"password\":\"password123\"}"
},
{
"url": "https://demo.owasp-juice.shop/api/Users/1"
}
]
```
**Features**:
- Tests N endpoints with M concurrency
- Automatically saves logs for each endpoint
- Displays progress and summary table
- Supports both GET and POST requests
### 8. Log Management
Logs are automatically saved to the `logs/` folder:
```bash
# Run scan with logging (default behavior)
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test"
# Log saved to: logs/sqlmap_https___demo_owasp_juice_shop_rest_produ_20260107_123456.log
# Disable logging if needed
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --no-logs
```
**Log Features**:
- Automatic log folder creation
- Timestamped log files
- Sanitized filenames based on URL
- Complete sqlmap output saved
## Real-World Testing Example
**Using OWASP Juice Shop Demo** (a legitimate vulnerable application for security testing):
```bash
# Quick scan on OWASP Juice Shop REST API with GET parameter
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --level 2 --risk 2
# Test login endpoint with POST data (JSON)
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"password123"}' --level 2 --risk 2
# Comprehensive scan on login endpoint
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"password123"}' --comprehensive
```
This is a real, legitimate target designed for security testing and learning.
## Understanding Levels and Risks
### Levels (1-5)
- **Level 1**: Default, tests GET and POST parameters
- **Level 2**: Adds HTTP Cookie header testing
- **Level 3**: Adds HTTP User-Agent/Referer headers testing
- **Level 4**: Deeper tests with more payloads
- **Level 5**: Maximum depth, most comprehensive
### Risks (1-3)
- **Risk 1**: Safe for all databases, minimal intrusion
- **Risk 2**: May include time-based tests (slight delay)
- **Risk 3**: Aggressive tests (may cause OR attacks on UPDATE/INSERT)
## Output Examples
### Successful Scan (No Vulnerabilities)
```
╔════════════════════════════════════════════════════ Scan Summary ════════════════════════════════════════════════════╗
║ Target: http://example.com/page?id=1 ║
║ Total Tests: 1 ║
║ Duration: 12.45 seconds ║
║ Vulnerabilities Found: 0 ║
╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝
✓ No SQL injection vulnerabilities detected.
```
### Vulnerable Target Found
```
⚠️ Vulnerabilities Detected
┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃ Parameter ┃ Type ┃ Title ┃
┣━━━━━━━━━━━╋━━━━━━━━━━━━━━━━━━━━━╋━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫
┃ id ┃ boolean-based blind ┃ AND boolean-based blind - WHERE or HAVING clause ┃
┃ id ┃ time-based blind ┃ MySQL >= 5.0.12 AND time-based blind (query SLEEP) ┃
┗━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛
⚠️ SQL injection vulnerabilities detected! Take immediate action.
```
## Features Showcase
✨ **Beautiful UI with Rich**
- Colored output for easy reading
- Progress bars showing scan status
- Tables for organized results
- Panels for important information
⚡ **One-Line Testing**
- Run all risk/level combinations with `--comprehensive`
- No need to manually iterate through tests
- Automatic result aggregation
📊 **Clear Summaries**
- See exactly what was tested
- Color-coded findings (red = vulnerable, green = safe)
- Detailed vulnerability tables
- Duration tracking
🎯 **User-Friendly**
- Interactive mode for beginners
- Flexible command-line options for experts
- Clear help messages
## Tips
1. **Start with quick scan**: Always start with a quick scan to see if the target is vulnerable
2. **Use comprehensive for thorough testing**: If vulnerabilities are found, use comprehensive mode
3. **Adjust timeout if needed**: Some tests may take longer on slow networks
4. **Legal use only**: Only test targets you have explicit permission to test
## Testing Resources
**⚠️ IMPORTANT**: Only test websites you own or have explicit written permission to test.
For learning and practice, you can use legitimate SQL injection testing websites designed for security education:
- **DVWA** (Damn Vulnerable Web Application) - Set up locally
- **WebGoat** - OWASP's deliberately insecure application
- **bWAPP** - Buggy Web Application for practicing
- **OWASP Juice Shop** - Modern vulnerable web application
- **Local test environments** - Set up your own vulnerable applications
Always ensure you have permission before testing any website. Unauthorized testing is illegal.

View File

@ -25,6 +25,77 @@ sqlmap works out of the box with [Python](https://www.python.org/download/) vers
Usage
----
### SQLMap CLI - Beautiful Automated Testing 🎨
**NEW**: We now have a beautiful CLI wrapper that automates comprehensive SQL injection testing in a single command!
#### Quick Start
Install dependencies:
```bash
pip install rich
```
#### Examples
**Quick scan** (default settings):
```bash
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test"
```
**Comprehensive scan** (tests all risk and level combinations):
```bash
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive
```
**Custom level and risk**:
```bash
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --level 3 --risk 2
```
**Interactive mode**:
```bash
python sqlmapcli.py --interactive
```
*Interactive mode now prompts for POST data/body, supporting both JSON and form data.*
#### Features
**Beautiful output** with Rich library - panels, tables, progress bars
**One-line comprehensive testing** - test all risk/level combinations automatically
📊 **Clear result summaries** - vulnerability tables with color-coded findings
🎯 **Interactive mode** - guided prompts for easy testing, including POST data support
⏱️ **Progress tracking** - see exactly what's being tested in real-time
🔄 **Batch processing** - test multiple endpoints with configurable concurrency
📝 **Automatic logging** - saves all scan results to logs/ folder
#### CLI Options
```
-u, --url Target URL
-b, --batch-file JSON file with multiple endpoints
-c, --concurrency Concurrent scans for batch mode (default: 0 for auto-scale based on CPU count)
--comprehensive Run all risk/level combinations (1-3 risk, 1-5 levels)
--level {1-5} Test level (default: 1)
--risk {1-3} Test risk (default: 1)
--max-level {1-5} Maximum level for comprehensive scan
--max-risk {1-3} Maximum risk for comprehensive scan
--technique SQL injection techniques (default: BEUSTQ)
--data POST data string (JSON or form data)
--raw Show raw sqlmap output (bypasses formatting)
--verbose {0-6} Sqlmap verbosity level (default: 1)
--no-logs Disable automatic log saving
-i, --interactive Interactive mode
```
**Note**: Use `--raw` flag to see the exact same output as running sqlmap directly. This ensures you get all details that sqlmap provides without any formatting or parsing.
**Batch Mode**: Test multiple endpoints from a JSON file with concurrent scanning. See `endpoints.json.example` for format.
---
### Original SQLMap Usage
To get a list of basic options and switches use:
python sqlmap.py -h

19
endpoints.json.example Normal file
View File

@ -0,0 +1,19 @@
[
{
"url": "https://demo.owasp-juice.shop/rest/products/search?q=test"
},
{
"url": "https://demo.owasp-juice.shop/rest/user/login",
"data": {
"email": "test@example.com",
"password": "password123"
}
},
{
"url": "https://demo.owasp-juice.shop/api/Users/1",
"headers": [
"Authorization: Bearer my_secret_token",
"X-Custom-Header: value"
]
}
]

6
sql_cli/__init__.py Normal file
View File

@ -0,0 +1,6 @@
"""
SQLMap CLI Package
A beautiful CLI wrapper for sqlmap with automated testing capabilities
"""
__version__ = "1.0.0"

10
sql_cli/models.py Normal file
View File

@ -0,0 +1,10 @@
from typing import List, Dict, Optional, TypedDict
from datetime import datetime
class ScanResult(TypedDict):
total_tests: int
vulnerabilities: List[Dict[str, str]]
start_time: Optional[datetime]
end_time: Optional[datetime]
target: Optional[str]

523
sql_cli/scanner.py Normal file
View File

@ -0,0 +1,523 @@
from rich.table import Table
import sys
import subprocess
import json
import os
import tempfile
import shutil
from datetime import datetime
from typing import List, Dict, Tuple, Optional, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from rich.console import Console
from rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TimeElapsedColumn,
)
from rich.panel import Panel
from rich import box
from .models import ScanResult
from .utils import SQLMAP_PATH, get_log_filename, save_log
from .ui import display_summary, display_batch_results
console = Console()
class SQLMapScanner:
def __init__(self, enable_logging: bool = True):
self.enable_logging = enable_logging
self.results: ScanResult = {
"total_tests": 0,
"vulnerabilities": [],
"start_time": None,
"end_time": None,
"target": None,
}
def run_sqlmap_test(
self,
url: str,
level: int,
risk: int,
technique: str = "BEUSTQ",
batch: bool = True,
data: Optional[str] = None,
headers: Optional[str] = None,
verbose: int = 1,
extra_args: Optional[List[str]] = None,
progress: Optional[Progress] = None,
task_id: Any = None,
) -> Tuple[bool, str]:
"""Run sqlmap with specified parameters and optional real-time progress"""
cmd = [
sys.executable,
str(SQLMAP_PATH),
"-u",
url,
f"--level={level}",
f"--risk={risk}",
f"--technique={technique}",
"-v",
str(verbose),
]
if batch:
cmd.append("--batch")
if data:
cmd.extend(["--data", data, "--method", "POST"])
if headers:
cmd.extend(["--headers", headers])
if extra_args:
cmd.extend(extra_args)
# Create a unique temporary directory for this run to avoid session database locks
# which are the primary cause of concurrency bottlenecks in sqlmap
tmp_output_dir = tempfile.mkdtemp(prefix="sqlmap_scan_")
cmd.extend(["--output-dir", tmp_output_dir])
try:
if progress and task_id:
# Run with real-time output parsing
process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1,
)
output_lines = []
if process.stdout is None:
return False, "Failed to capture sqlmap output"
for line in process.stdout:
output_lines.append(line)
# Update progress description based on sqlmap status
clean_line = line.strip()
if "[INFO]" in clean_line:
status = clean_line.split("[INFO]", 1)[1].strip()
# Clean up status message
if "testing" in status.lower():
progress.update(
task_id, description=f"[cyan]{status[:50]}[/cyan]"
)
elif "detecting" in status.lower():
progress.update(
task_id, description=f"[yellow]{status[:50]}[/yellow]"
)
elif "identified" in status.lower():
progress.update(
task_id, description=f"[green]{status[:50]}[/green]"
)
process.wait()
full_output = "".join(output_lines)
# Cleanup temporary output directory
try:
shutil.rmtree(tmp_output_dir)
except Exception as cleanup_error:
console.log(
f"Failed to remove temporary sqlmap output directory {tmp_output_dir!r}: {cleanup_error}"
)
return process.returncode == 0, full_output
else:
# Run without progress (non-interactive)
result = subprocess.run(
cmd, capture_output=True, text=True, timeout=600
)
# Cleanup temporary output directory
try:
shutil.rmtree(tmp_output_dir)
except Exception as cleanup_error:
console.log(
f"Failed to remove temporary sqlmap output directory {tmp_output_dir!r}: {cleanup_error}"
)
return result.returncode == 0, result.stdout + result.stderr
except subprocess.TimeoutExpired:
# Cleanup on timeout
try:
shutil.rmtree(tmp_output_dir)
except Exception as cleanup_error:
console.log(
f"Failed to remove temporary sqlmap output directory {tmp_output_dir!r} after timeout: {cleanup_error}"
)
return False, "Test timed out after 10 minutes"
except Exception as e:
# Cleanup on error
try:
shutil.rmtree(tmp_output_dir)
except Exception as cleanup_error:
console.log(
f"Failed to remove temporary sqlmap output directory {tmp_output_dir!r} after error: {cleanup_error}"
)
return False, str(e)
def parse_results(self, output: str) -> Dict[str, Any]:
"""Parse sqlmap output for vulnerabilities"""
vulns = []
# Look for vulnerability indicators
if "sqlmap identified the following injection point" in output:
lines = output.split("\n")
current_param = "Unknown"
for i, line in enumerate(lines):
if "Parameter:" in line:
current_param = line.split("Parameter:")[1].strip()
elif "Type:" in line:
vuln_type = line.split("Type:")[1].strip()
if i + 1 < len(lines) and "Title:" in lines[i + 1]:
title = lines[i + 1].split("Title:")[1].strip()
vulns.append(
{
"parameter": current_param,
"type": vuln_type,
"title": title,
}
)
backend_dbms = None
if "back-end DBMS:" in output.lower():
for line in output.split("\n"):
if "back-end DBMS:" in line.lower():
backend_dbms = line.split(":", 1)[1].strip()
break
return {
"vulnerabilities": vulns,
"backend_dbms": backend_dbms,
"is_vulnerable": len(vulns) > 0 or "vulnerable" in output.lower(),
}
def comprehensive_scan(
self,
url: str,
max_level: int = 5,
max_risk: int = 3,
techniques: str = "BEUSTQ",
data: Optional[str] = None,
headers: Optional[str] = None,
verbose: int = 1,
):
"""Run comprehensive scan with all levels and risks"""
self.results["target"] = url
self.results["start_time"] = datetime.now()
results_table = Table(title="Scan Results", box=box.ROUNDED)
results_table.add_column("Level", style="cyan", justify="center")
results_table.add_column("Risk", style="yellow", justify="center")
results_table.add_column("Status", justify="center")
results_table.add_column("Findings", style="magenta")
total_tests = max_level * max_risk
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TimeElapsedColumn(),
console=console,
) as progress:
overall_task = progress.add_task(
f"[cyan]Scanning {url}...", total=total_tests
)
for level in range(1, max_level + 1):
for risk in range(1, max_risk + 1):
progress.update(
overall_task,
description=f"[cyan]Testing Level {level}, Risk {risk}...[/cyan]",
)
success, output = self.run_sqlmap_test(
url,
level,
risk,
techniques,
data=data,
headers=headers,
verbose=max(verbose, 3),
progress=progress,
task_id=overall_task,
)
parsed = self.parse_results(output)
status = "" if success else ""
status_style = "green" if success else "red"
findings = (
"No vulnerabilities"
if not parsed["is_vulnerable"]
else f"{len(parsed['vulnerabilities'])} found!"
)
findings_style = (
"green" if not parsed["is_vulnerable"] else "bold red"
)
if parsed["is_vulnerable"]:
# Deduplicate vulnerabilities across different level/risk combinations
existing_keys = set()
for v in self.results["vulnerabilities"]:
if isinstance(v, dict):
param = v.get("parameter")
vtype = v.get("type")
title = v.get("title")
else:
param = getattr(v, "parameter", None)
vtype = getattr(v, "type", None)
title = getattr(v, "title", None)
existing_keys.add((param, vtype, title))
for v in parsed["vulnerabilities"]:
if isinstance(v, dict):
param = v.get("parameter")
vtype = v.get("type")
title = v.get("title")
else:
param = getattr(v, "parameter", None)
vtype = getattr(v, "type", None)
title = getattr(v, "title", None)
key = (param, vtype, title)
if key not in existing_keys:
self.results["vulnerabilities"].append(v)
existing_keys.add(key)
results_table.add_row(
str(level),
str(risk),
f"[{status_style}]{status}[/{status_style}]",
f"[{findings_style}]{findings}[/{findings_style}]",
)
progress.update(overall_task, advance=1)
self.results["total_tests"] += 1
self.results["end_time"] = datetime.now()
console.print()
console.print(results_table)
display_summary(self.results)
def quick_scan(
self,
url: str,
level: int = 1,
risk: int = 1,
data: Optional[str] = None,
headers: Optional[str] = None,
raw: bool = False,
verbose: int = 1,
):
"""Run a quick scan with default settings"""
self.results["target"] = url
self.results["start_time"] = datetime.now()
if not raw:
scan_info = f"[cyan]Running quick scan on:[/cyan]\n[yellow]{url}[/yellow]\n[dim]Level: {level}, Risk: {risk}[/dim]"
if data:
scan_info += f"\n[dim]POST Data: {data}[/dim]"
if headers:
scan_info += f"\n[dim]Headers: {headers}[/dim]"
console.print(Panel(scan_info, border_style="cyan", box=box.ROUNDED))
if raw:
console.print("[cyan]Running sqlmap...[/cyan]\n")
success, output = self.run_sqlmap_test(
url, level, risk, data=data, headers=headers, verbose=verbose
)
console.print(output)
if self.enable_logging:
log_file = get_log_filename(url)
save_log(log_file, output)
return
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
TimeElapsedColumn(),
console=console,
) as progress:
task = progress.add_task(f"[cyan]Scanning {url[:40]}...", total=None)
success, output = self.run_sqlmap_test(
url,
level,
risk,
data=data,
headers=headers,
verbose=max(verbose, 3),
progress=progress,
task_id=task,
)
progress.update(
task, completed=True, description="[green]✓ Scan Complete[/green]"
)
if self.enable_logging:
log_file = get_log_filename(url)
save_log(log_file, output)
parsed = self.parse_results(output)
self.results["vulnerabilities"] = parsed["vulnerabilities"]
self.results["total_tests"] = 1
self.results["end_time"] = datetime.now()
display_summary(self.results)
def process_single_endpoint(
self,
endpoint: Dict,
level: int,
risk: int,
verbose: int,
progress: Optional[Progress] = None,
task_id: Any = None,
) -> Dict:
"""Process a single endpoint for batch mode"""
url = str(endpoint.get("url")) if endpoint.get("url") else ""
data = endpoint.get("data")
if data is not None and not isinstance(data, str):
data = json.dumps(data)
headers = endpoint.get("headers")
if headers is not None and isinstance(headers, list):
headers = "\n".join(headers)
try:
# Force verbosity 3 for better progress tracking if in batch mode
# unless a specific high verbosity is already requested
exec_verbose = max(verbose, 3) if progress else verbose
success, output = self.run_sqlmap_test(
url,
level,
risk,
data=data,
headers=headers,
verbose=exec_verbose,
progress=progress,
task_id=task_id,
)
if self.enable_logging:
log_file = get_log_filename(url)
save_log(log_file, output)
parsed = self.parse_results(output)
if progress and task_id:
status_color = "red" if parsed["is_vulnerable"] else "green"
status_text = "Vulnerable" if parsed["is_vulnerable"] else "Clean"
progress.update(
task_id,
description=f"[{status_color}]✓ {status_text}[/{status_color}] - {url[:30]}...",
completed=100,
)
return {
"url": url,
"data": data,
"success": success,
"vulnerabilities": parsed["vulnerabilities"],
"is_vulnerable": parsed["is_vulnerable"],
}
except Exception as e:
if progress and task_id:
progress.update(
task_id,
description=f"[bold red]✗ Error: {str(e)[:30]}[/bold red]",
completed=100,
)
return {
"url": url,
"data": data,
"success": False,
"error": str(e),
"vulnerabilities": [],
"is_vulnerable": False,
}
def batch_scan(
self,
endpoints: List[Dict],
level: int = 1,
risk: int = 1,
concurrency: int = 5,
verbose: int = 1,
):
"""Run batch scan on multiple endpoints with concurrency"""
# Determine actual concurrency
if concurrency <= 0:
# For I/O bound tasks like scanning, we can use 2x CPU count
concurrency = (os.cpu_count() or 2) * 2
console.print(
Panel(
f"[cyan]Batch Scan Mode[/cyan]\n"
f"[dim]Testing {len(endpoints)} endpoint(s) with concurrency={concurrency}[/dim]\n"
f"[dim]Level: {level}, Risk: {risk}[/dim]",
border_style="cyan",
box=box.ROUNDED,
)
)
results = []
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TimeElapsedColumn(),
console=console,
expand=True,
) as progress:
with ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_endpoint = {}
for endpoint in endpoints:
url = endpoint.get("url", "Unknown")
task_id = progress.add_task(
f"[dim]Waiting...[/dim] {url[:40]}...", total=100
)
future = executor.submit(
self.process_single_endpoint,
endpoint,
level,
risk,
verbose,
progress,
task_id,
)
future_to_endpoint[future] = endpoint
for future in as_completed(future_to_endpoint):
endpoint = future_to_endpoint[future]
try:
results.append(future.result())
except Exception as e:
results.append(
{
"url": endpoint.get("url"),
"data": endpoint.get("data"),
"success": False,
"error": str(e),
"vulnerabilities": [],
"is_vulnerable": False,
}
)
display_batch_results(results)
return results

148
sql_cli/ui.py Normal file
View File

@ -0,0 +1,148 @@
from rich.console import Console
from rich.panel import Panel
from rich.table import Table
from rich import box
from typing import List, Dict
from .models import ScanResult
console = Console()
def print_banner():
"""Display a beautiful banner"""
banner = """
CLI - Automated SQL Injection Testing
"""
console.print(banner, style="bold cyan")
console.print(
Panel(
"[yellow]⚠️ Legal Disclaimer: Only use on targets you have permission to test[/yellow]",
border_style="yellow",
box=box.ROUNDED,
)
)
console.print()
def display_summary(results: ScanResult):
"""Display a comprehensive summary of results"""
console.print()
# Calculate duration
duration = 0.0
if results["end_time"] and results["start_time"]:
duration = (results["end_time"] - results["start_time"]).total_seconds()
# Create summary panel
summary_text = f"""
[cyan]Target:[/cyan] {results["target"] or "N/A"}
[cyan]Total Tests:[/cyan] {results["total_tests"]}
[cyan]Duration:[/cyan] {duration:.2f} seconds
[cyan]Vulnerabilities Found:[/cyan] {len(results["vulnerabilities"])}
"""
console.print(
Panel(
summary_text.strip(),
title="[bold]Scan Summary[/bold]",
border_style="green" if len(results["vulnerabilities"]) == 0 else "red",
box=box.DOUBLE,
)
)
# Display vulnerabilities if found
if results["vulnerabilities"]:
console.print()
vuln_table = Table(title="⚠️ Vulnerabilities Detected", box=box.HEAVY)
vuln_table.add_column("Parameter", style="cyan")
vuln_table.add_column("Type", style="yellow")
vuln_table.add_column("Title", style="red")
for vuln in results["vulnerabilities"]:
vuln_table.add_row(
vuln.get("parameter", "N/A"),
vuln.get("type", "N/A"),
vuln.get("title", "N/A"),
)
console.print(vuln_table)
console.print()
console.print(
"[bold red]⚠️ SQL injection vulnerabilities detected! Take immediate action.[/bold red]"
)
else:
console.print()
console.print(
"[bold green]✓ No SQL injection vulnerabilities detected.[/bold green]"
)
console.print()
def display_batch_results(results: List[Dict]):
"""Display batch scan results in a table"""
console.print()
# Create results table
results_table = Table(title="Batch Scan Results", box=box.ROUNDED)
results_table.add_column("URL", style="cyan", no_wrap=False)
results_table.add_column("Status", justify="center")
results_table.add_column("Vulnerabilities", style="magenta")
vulnerable_count = 0
successful_count = 0
for result in results:
url = result["url"][:60] + "..." if len(result["url"]) > 60 else result["url"]
if result.get("error"):
status = "[red]✗ Error[/red]"
vulns = f"[red]{result['error'][:40]}[/red]"
elif result["success"]:
successful_count += 1
if result["is_vulnerable"]:
vulnerable_count += 1
status = "[red]✓ Vulnerable[/red]"
vulns = f"[red]{len(result['vulnerabilities'])} found[/red]"
else:
status = "[green]✓ Clean[/green]"
vulns = "[green]None[/green]"
else:
status = "[yellow]✗ Failed[/yellow]"
vulns = "[yellow]N/A[/yellow]"
results_table.add_row(url, status, vulns)
console.print(results_table)
# Summary
console.print()
summary = f"""
[cyan]Batch Summary:[/cyan]
Total Endpoints: {len(results)}
Successful Scans: {successful_count}
Vulnerable: [red]{vulnerable_count}[/red]
Clean: [green]{successful_count - vulnerable_count}[/green]
"""
border_color = "red" if vulnerable_count > 0 else "green"
console.print(
Panel(
summary.strip(),
title="[bold]Summary[/bold]",
border_style=border_color,
box=box.DOUBLE,
)
)
console.print()

33
sql_cli/utils.py Normal file
View File

@ -0,0 +1,33 @@
import re
import hashlib
import random
from pathlib import Path
from datetime import datetime
from rich.console import Console
console = Console()
SQLMAP_PATH = Path(__file__).parent.parent / "sqlmap.py"
LOGS_DIR = Path(__file__).parent.parent / "logs"
def get_log_filename(url: str) -> Path:
"""Generate a log filename based on URL and timestamp with hash for uniqueness"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Create a hash of the URL to ensure uniqueness
url_hash = hashlib.md5(url.encode()).hexdigest()[:8]
# Add random component for additional uniqueness in batch scenarios
random_component = random.randint(1000, 9999)
# Sanitize URL for filename (keep it readable but short)
safe_url = re.sub(r'[^\w\-_\.]', '_', url)[:30]
return LOGS_DIR / f"sqlmap_{safe_url}_{url_hash}_{timestamp}_{random_component}.log"
def save_log(log_file: Path, content: str):
"""Save content to log file"""
try:
if not LOGS_DIR.exists():
LOGS_DIR.mkdir(exist_ok=True)
with open(log_file, 'w', encoding='utf-8') as f:
f.write(content)
console.print(f"[dim]Log saved to: {log_file}[/dim]")
except Exception as e:
console.print(f"[yellow]Warning: Could not save log: {e}[/yellow]")

285
sqlmapcli.py Executable file
View File

@ -0,0 +1,285 @@
#!/usr/bin/env python3
"""
SQLMap CLI - A beautiful CLI wrapper for sqlmap
Automates comprehensive SQL injection testing with a single command
"""
import sys
import argparse
import json
from pathlib import Path
# Add the current directory to path so we can import from sql_cli
sys.path.append(str(Path(__file__).parent))
try:
from rich.console import Console
from rich.panel import Panel
from rich.prompt import Prompt, Confirm
except ImportError:
print("Error: 'rich' library is required. Install it with: pip install rich")
sys.exit(1)
from sql_cli.scanner import SQLMapScanner
from sql_cli.utils import SQLMAP_PATH
from sql_cli.ui import print_banner
console = Console()
def interactive_mode(scanner: SQLMapScanner):
"""Interactive mode for user input"""
console.print()
console.print(
Panel(
"[cyan]Interactive Mode[/cyan]\n[dim]Enter target details for SQL injection testing[/dim]",
border_style="cyan",
)
)
url = Prompt.ask("\n[cyan]Enter target URL[/cyan]")
# Ask if this is a POST request
has_data = Confirm.ask(
"[cyan]Does this request require POST data/body?[/cyan]", default=False
)
data = None
if has_data:
console.print("\n[dim]Examples:[/dim]")
console.print(
'[dim] JSON: {"email":"test@example.com","password":"pass123"}[/dim]'
)
console.print("[dim] Form: username=admin&password=secret[/dim]")
data = Prompt.ask("\n[cyan]Enter POST data/body[/cyan]")
# Ask for custom headers
has_headers = Confirm.ask(
"[cyan]Do you need to add custom headers (Auth, etc.)?[/cyan]", default=False
)
headers = None
if has_headers:
console.print("\n[dim]Example:[/dim]")
console.print(
'[dim] "Authorization: Bearer token; Cookie: PHPSESSID=..."[/dim]'
)
headers = Prompt.ask("\n[cyan]Enter headers[/cyan]")
scan_type = Prompt.ask(
"\n[cyan]Select scan type[/cyan]",
choices=["quick", "comprehensive"],
default="quick",
)
if scan_type == "quick":
# Input validation for level and risk
while True:
try:
level_str = Prompt.ask("[cyan]Test level (1-5)[/cyan]", default="1")
level = int(level_str)
if 1 <= level <= 5:
break
console.print("[red]Level must be between 1 and 5[/red]")
except ValueError:
console.print("[red]Please enter a valid number[/red]")
while True:
try:
risk_str = Prompt.ask("[cyan]Test risk (1-3)[/cyan]", default="1")
risk = int(risk_str)
if 1 <= risk <= 3:
break
console.print("[red]Risk must be between 1 and 3[/red]")
except ValueError:
console.print("[red]Please enter a valid number[/red]")
scanner.quick_scan(url, level, risk, data=data, headers=headers)
else:
# Input validation for max_level and max_risk
while True:
try:
max_level_str = Prompt.ask("[cyan]Maximum test level (1-5)[/cyan]", default="5")
max_level = int(max_level_str)
if 1 <= max_level <= 5:
break
console.print("[red]Level must be between 1 and 5[/red]")
except ValueError:
console.print("[red]Please enter a valid number[/red]")
while True:
try:
max_risk_str = Prompt.ask("[cyan]Maximum test risk (1-3)[/cyan]", default="3")
max_risk = int(max_risk_str)
if 1 <= max_risk <= 3:
break
console.print("[red]Risk must be between 1 and 3[/red]")
except ValueError:
console.print("[red]Please enter a valid number[/red]")
scanner.comprehensive_scan(url, max_level, max_risk, data=data, headers=headers)
def main():
parser = argparse.ArgumentParser(
description="SQLMap CLI - Beautiful automated SQL injection testing",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
# Quick scan with default settings (GET parameter)
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test"
# Test with POST data (JSON)
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"pass123"}'
# Comprehensive scan (all risk and level combinations)
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive
# Batch mode - test multiple endpoints from JSON file
python sqlmapcli.py -b endpoints.json --level 2 --risk 2
# Interactive mode
python sqlmapcli.py --interactive
""",
)
parser.add_argument(
"-u", "--url", help='Target URL (e.g., "http://example.com/page?id=1")'
)
parser.add_argument(
"--comprehensive", action="store_true", help="Run comprehensive scan"
)
parser.add_argument(
"--level",
type=int,
default=1,
choices=[1, 2, 3, 4, 5],
help="Level (1-5, default: 1)",
)
parser.add_argument(
"--risk", type=int, default=1, choices=[1, 2, 3], help="Risk (1-3, default: 1)"
)
parser.add_argument(
"--max-level",
type=int,
default=5,
choices=[1, 2, 3, 4, 5],
help="Max level for comprehensive",
)
parser.add_argument(
"--max-risk",
type=int,
default=3,
choices=[1, 2, 3],
help="Max risk for comprehensive",
)
parser.add_argument(
"--technique",
type=str,
default="BEUSTQ",
help="SQL techniques (default: BEUSTQ)",
)
parser.add_argument("--data", type=str, help="POST data")
parser.add_argument("--headers", type=str, help="Extra headers")
parser.add_argument("--raw", action="store_true", help="Show raw sqlmap output")
parser.add_argument(
"--verbose", type=int, choices=[0, 1, 2, 3, 4, 5, 6], help="Verbosity (0-6)"
)
parser.add_argument(
"-i", "--interactive", action="store_true", help="Interactive mode"
)
parser.add_argument("-b", "--batch-file", type=str, help="Path to batch JSON")
parser.add_argument(
"-c",
"--concurrency",
type=int,
default=0,
help="Number of concurrent scans (default: 0 for auto-scale)",
)
parser.add_argument("--no-logs", action="store_true", help="Disable logs")
args = parser.parse_args()
scanner = SQLMapScanner(enable_logging=not args.no_logs)
print_banner()
if not SQLMAP_PATH.exists():
console.print(
f"[bold red]Error: sqlmap.py not found at {SQLMAP_PATH}[/bold red]"
)
sys.exit(1)
if args.interactive:
interactive_mode(scanner)
return
if args.batch_file:
try:
with open(args.batch_file, "r") as f:
endpoints = json.load(f)
if not isinstance(endpoints, list):
console.print(
"[bold red]Error: Batch file must contain a JSON array[/bold red]"
)
sys.exit(1)
verbose_level = args.verbose if args.verbose is not None else 1
scanner.batch_scan(
endpoints,
level=args.level,
risk=args.risk,
concurrency=args.concurrency,
verbose=verbose_level,
)
return
except FileNotFoundError:
console.print(
f"[bold red]Error: Batch file not found: {args.batch_file}[/bold red]"
)
sys.exit(1)
except json.JSONDecodeError as e:
console.print(
f"[bold red]Error: Invalid JSON in batch file '{args.batch_file}': {e}[/bold red]"
)
sys.exit(1)
except PermissionError:
console.print(
f"[bold red]Error: Permission denied when reading batch file: {args.batch_file}[/bold red]"
)
sys.exit(1)
except Exception as e:
console.print(f"[bold red]Error loading batch file: {e}[/bold red]")
sys.exit(1)
if not args.url:
console.print("[bold red]Error: URL is required[/bold red]")
parser.print_help()
sys.exit(1)
verbose_level = args.verbose if args.verbose is not None else 1
if args.comprehensive:
scanner.comprehensive_scan(
args.url,
max_level=args.max_level,
max_risk=args.max_risk,
techniques=args.technique,
data=args.data,
headers=args.headers,
verbose=verbose_level,
)
else:
scanner.quick_scan(
args.url,
level=args.level,
risk=args.risk,
data=args.data,
headers=args.headers,
raw=args.raw,
verbose=verbose_level,
)
if __name__ == "__main__":
main()

View File

@ -0,0 +1,19 @@
[
{
"url": "https://httpbin.org/get?id=1",
"headers": [
"X-Test-Header: value1",
"X-Auth-Token: secret123"
]
},
{
"url": "https://httpbin.org/post",
"data": {
"username": "admin",
"id": 123
},
"headers": [
"Content-Type: application/json"
]
}
]