diff --git a/.gitignore b/.gitignore index 1f7f94a3b..d1cbbb82b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ __pycache__/ traffic.txt *~ req*.txt -.idea/ \ No newline at end of file +.idea/ +logs/ \ No newline at end of file diff --git a/EXAMPLES.md b/EXAMPLES.md new file mode 100644 index 000000000..78bb019bf --- /dev/null +++ b/EXAMPLES.md @@ -0,0 +1,230 @@ +# SQLMap CLI - Examples + +## Installation + +```bash +# Install dependencies +pip install -r requirements.txt +``` + +## Basic Usage + +### 1. Quick Scan (Default: Level 1, Risk 1) +Test a single URL with minimal risk: + +```bash +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" +``` + +### 2. Comprehensive Scan +Test all combinations of risk (1-3) and levels (1-5) automatically: + +```bash +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive +``` + +This runs **15 tests total** (5 levels × 3 risks) and provides a complete vulnerability assessment. + +### 3. Custom Level and Risk +Run a specific test configuration: + +```bash +# Medium level, medium risk +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --level 3 --risk 2 + +# High level, high risk +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --level 5 --risk 3 +``` + +### 4. Interactive Mode +Get guided prompts for easy testing: + +```bash +python sqlmapcli.py --interactive +``` + +This will ask you: +- Target URL +- Whether the request requires POST data/body +- POST data/body (if needed) - supports JSON or form data +- Scan type (quick or comprehensive) +- Custom level and risk settings + +### 5. Custom Comprehensive Scan +Limit the comprehensive scan to specific max values: + +```bash +# Test only up to level 3 and risk 2 +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive --max-level 3 --max-risk 2 +``` + +### 6. Raw Output Mode +Get the exact same output as running sqlmap directly: + +```bash +# Show raw sqlmap output without formatting +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"pass123"}' --level 2 --risk 2 --raw + +# Increase verbosity for more details +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"pass123"}' --verbose 3 --raw +``` + +**Note**: The `--raw` flag ensures the CLI output matches sqlmap exactly, bypassing all formatting and parsing. + +### 7. Batch Mode - Test Multiple Endpoints +Test multiple endpoints with concurrency: + +```bash +# Test multiple endpoints from a JSON file with auto-scaled concurrency (default, typically 2x CPU cores) +python sqlmapcli.py -b endpoints.json --level 2 --risk 2 + +# Test with specific concurrency (10 concurrent scans) +python sqlmapcli.py -b endpoints.json --level 2 --risk 2 --concurrency 10 + +# Test with custom settings +python sqlmapcli.py -b endpoints.json --level 3 --risk 2 --concurrency 5 +``` + +**Batch File Format** (`endpoints.json`): +```json +[ + { + "url": "https://demo.owasp-juice.shop/rest/products/search?q=test" + }, + { + "url": "https://demo.owasp-juice.shop/rest/user/login", + "data": "{\"email\":\"test@example.com\",\"password\":\"password123\"}" + }, + { + "url": "https://demo.owasp-juice.shop/api/Users/1" + } +] +``` + +**Features**: +- Tests N endpoints with M concurrency +- Automatically saves logs for each endpoint +- Displays progress and summary table +- Supports both GET and POST requests + +### 8. Log Management + +Logs are automatically saved to the `logs/` folder: + +```bash +# Run scan with logging (default behavior) +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" +# Log saved to: logs/sqlmap_https___demo_owasp_juice_shop_rest_produ_20260107_123456.log + +# Disable logging if needed +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --no-logs +``` + +**Log Features**: +- Automatic log folder creation +- Timestamped log files +- Sanitized filenames based on URL +- Complete sqlmap output saved + +## Real-World Testing Example + +**Using OWASP Juice Shop Demo** (a legitimate vulnerable application for security testing): + +```bash +# Quick scan on OWASP Juice Shop REST API with GET parameter +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --level 2 --risk 2 + +# Test login endpoint with POST data (JSON) +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"password123"}' --level 2 --risk 2 + +# Comprehensive scan on login endpoint +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"password123"}' --comprehensive +``` + +This is a real, legitimate target designed for security testing and learning. + +## Understanding Levels and Risks + +### Levels (1-5) +- **Level 1**: Default, tests GET and POST parameters +- **Level 2**: Adds HTTP Cookie header testing +- **Level 3**: Adds HTTP User-Agent/Referer headers testing +- **Level 4**: Deeper tests with more payloads +- **Level 5**: Maximum depth, most comprehensive + +### Risks (1-3) +- **Risk 1**: Safe for all databases, minimal intrusion +- **Risk 2**: May include time-based tests (slight delay) +- **Risk 3**: Aggressive tests (may cause OR attacks on UPDATE/INSERT) + +## Output Examples + +### Successful Scan (No Vulnerabilities) +``` +╔════════════════════════════════════════════════════ Scan Summary ════════════════════════════════════════════════════╗ +║ Target: http://example.com/page?id=1 ║ +║ Total Tests: 1 ║ +║ Duration: 12.45 seconds ║ +║ Vulnerabilities Found: 0 ║ +╚══════════════════════════════════════════════════════════════════════════════════════════════════════════════════════╝ + +✓ No SQL injection vulnerabilities detected. +``` + +### Vulnerable Target Found +``` + ⚠️ Vulnerabilities Detected +┏━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┓ +┃ Parameter ┃ Type ┃ Title ┃ +┣━━━━━━━━━━━╋━━━━━━━━━━━━━━━━━━━━━╋━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┫ +┃ id ┃ boolean-based blind ┃ AND boolean-based blind - WHERE or HAVING clause ┃ +┃ id ┃ time-based blind ┃ MySQL >= 5.0.12 AND time-based blind (query SLEEP) ┃ +┗━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━━━┻━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┛ + +⚠️ SQL injection vulnerabilities detected! Take immediate action. +``` + +## Features Showcase + +✨ **Beautiful UI with Rich** +- Colored output for easy reading +- Progress bars showing scan status +- Tables for organized results +- Panels for important information + +⚡ **One-Line Testing** +- Run all risk/level combinations with `--comprehensive` +- No need to manually iterate through tests +- Automatic result aggregation + +📊 **Clear Summaries** +- See exactly what was tested +- Color-coded findings (red = vulnerable, green = safe) +- Detailed vulnerability tables +- Duration tracking + +🎯 **User-Friendly** +- Interactive mode for beginners +- Flexible command-line options for experts +- Clear help messages + +## Tips + +1. **Start with quick scan**: Always start with a quick scan to see if the target is vulnerable +2. **Use comprehensive for thorough testing**: If vulnerabilities are found, use comprehensive mode +3. **Adjust timeout if needed**: Some tests may take longer on slow networks +4. **Legal use only**: Only test targets you have explicit permission to test + +## Testing Resources + +**⚠️ IMPORTANT**: Only test websites you own or have explicit written permission to test. + +For learning and practice, you can use legitimate SQL injection testing websites designed for security education: + +- **DVWA** (Damn Vulnerable Web Application) - Set up locally +- **WebGoat** - OWASP's deliberately insecure application +- **bWAPP** - Buggy Web Application for practicing +- **OWASP Juice Shop** - Modern vulnerable web application +- **Local test environments** - Set up your own vulnerable applications + +Always ensure you have permission before testing any website. Unauthorized testing is illegal. diff --git a/README.md b/README.md index e85b3a043..24bb2fa99 100644 --- a/README.md +++ b/README.md @@ -25,6 +25,77 @@ sqlmap works out of the box with [Python](https://www.python.org/download/) vers Usage ---- +### SQLMap CLI - Beautiful Automated Testing 🎨 + +**NEW**: We now have a beautiful CLI wrapper that automates comprehensive SQL injection testing in a single command! + +#### Quick Start + +Install dependencies: +```bash +pip install rich +``` + +#### Examples + +**Quick scan** (default settings): +```bash +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" +``` + +**Comprehensive scan** (tests all risk and level combinations): +```bash +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive +``` + +**Custom level and risk**: +```bash +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --level 3 --risk 2 +``` + +**Interactive mode**: +```bash +python sqlmapcli.py --interactive +``` +*Interactive mode now prompts for POST data/body, supporting both JSON and form data.* + +#### Features + +✨ **Beautiful output** with Rich library - panels, tables, progress bars +⚡ **One-line comprehensive testing** - test all risk/level combinations automatically +📊 **Clear result summaries** - vulnerability tables with color-coded findings +🎯 **Interactive mode** - guided prompts for easy testing, including POST data support +⏱️ **Progress tracking** - see exactly what's being tested in real-time +🔄 **Batch processing** - test multiple endpoints with configurable concurrency +📝 **Automatic logging** - saves all scan results to logs/ folder + +#### CLI Options + +``` +-u, --url Target URL +-b, --batch-file JSON file with multiple endpoints +-c, --concurrency Concurrent scans for batch mode (default: 0 for auto-scale based on CPU count) +--comprehensive Run all risk/level combinations (1-3 risk, 1-5 levels) +--level {1-5} Test level (default: 1) +--risk {1-3} Test risk (default: 1) +--max-level {1-5} Maximum level for comprehensive scan +--max-risk {1-3} Maximum risk for comprehensive scan +--technique SQL injection techniques (default: BEUSTQ) +--data POST data string (JSON or form data) +--raw Show raw sqlmap output (bypasses formatting) +--verbose {0-6} Sqlmap verbosity level (default: 1) +--no-logs Disable automatic log saving +-i, --interactive Interactive mode +``` + +**Note**: Use `--raw` flag to see the exact same output as running sqlmap directly. This ensures you get all details that sqlmap provides without any formatting or parsing. + +**Batch Mode**: Test multiple endpoints from a JSON file with concurrent scanning. See `endpoints.json.example` for format. + +--- + +### Original SQLMap Usage + To get a list of basic options and switches use: python sqlmap.py -h diff --git a/endpoints.json.example b/endpoints.json.example new file mode 100644 index 000000000..4b4ae3be3 --- /dev/null +++ b/endpoints.json.example @@ -0,0 +1,19 @@ +[ + { + "url": "https://demo.owasp-juice.shop/rest/products/search?q=test" + }, + { + "url": "https://demo.owasp-juice.shop/rest/user/login", + "data": { + "email": "test@example.com", + "password": "password123" + } + }, + { + "url": "https://demo.owasp-juice.shop/api/Users/1", + "headers": [ + "Authorization: Bearer my_secret_token", + "X-Custom-Header: value" + ] + } +] diff --git a/sql_cli/__init__.py b/sql_cli/__init__.py new file mode 100644 index 000000000..5ab66d02c --- /dev/null +++ b/sql_cli/__init__.py @@ -0,0 +1,6 @@ +""" +SQLMap CLI Package +A beautiful CLI wrapper for sqlmap with automated testing capabilities +""" + +__version__ = "1.0.0" diff --git a/sql_cli/models.py b/sql_cli/models.py new file mode 100644 index 000000000..12d648424 --- /dev/null +++ b/sql_cli/models.py @@ -0,0 +1,10 @@ +from typing import List, Dict, Optional, TypedDict +from datetime import datetime + + +class ScanResult(TypedDict): + total_tests: int + vulnerabilities: List[Dict[str, str]] + start_time: Optional[datetime] + end_time: Optional[datetime] + target: Optional[str] diff --git a/sql_cli/scanner.py b/sql_cli/scanner.py new file mode 100644 index 000000000..cb94e8a2c --- /dev/null +++ b/sql_cli/scanner.py @@ -0,0 +1,523 @@ +from rich.table import Table +import sys +import subprocess +import json +import os +import tempfile +import shutil +from datetime import datetime +from typing import List, Dict, Tuple, Optional, Any +from concurrent.futures import ThreadPoolExecutor, as_completed +from rich.console import Console +from rich.progress import ( + Progress, + SpinnerColumn, + TextColumn, + BarColumn, + TimeElapsedColumn, +) +from rich.panel import Panel +from rich import box + +from .models import ScanResult +from .utils import SQLMAP_PATH, get_log_filename, save_log +from .ui import display_summary, display_batch_results + +console = Console() + + +class SQLMapScanner: + def __init__(self, enable_logging: bool = True): + self.enable_logging = enable_logging + self.results: ScanResult = { + "total_tests": 0, + "vulnerabilities": [], + "start_time": None, + "end_time": None, + "target": None, + } + + def run_sqlmap_test( + self, + url: str, + level: int, + risk: int, + technique: str = "BEUSTQ", + batch: bool = True, + data: Optional[str] = None, + headers: Optional[str] = None, + verbose: int = 1, + extra_args: Optional[List[str]] = None, + progress: Optional[Progress] = None, + task_id: Any = None, + ) -> Tuple[bool, str]: + """Run sqlmap with specified parameters and optional real-time progress""" + cmd = [ + sys.executable, + str(SQLMAP_PATH), + "-u", + url, + f"--level={level}", + f"--risk={risk}", + f"--technique={technique}", + "-v", + str(verbose), + ] + + if batch: + cmd.append("--batch") + + if data: + cmd.extend(["--data", data, "--method", "POST"]) + + if headers: + cmd.extend(["--headers", headers]) + + if extra_args: + cmd.extend(extra_args) + + # Create a unique temporary directory for this run to avoid session database locks + # which are the primary cause of concurrency bottlenecks in sqlmap + tmp_output_dir = tempfile.mkdtemp(prefix="sqlmap_scan_") + cmd.extend(["--output-dir", tmp_output_dir]) + + try: + if progress and task_id: + # Run with real-time output parsing + process = subprocess.Popen( + cmd, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1, + ) + + output_lines = [] + if process.stdout is None: + return False, "Failed to capture sqlmap output" + for line in process.stdout: + output_lines.append(line) + + # Update progress description based on sqlmap status + clean_line = line.strip() + if "[INFO]" in clean_line: + status = clean_line.split("[INFO]", 1)[1].strip() + # Clean up status message + if "testing" in status.lower(): + progress.update( + task_id, description=f"[cyan]{status[:50]}[/cyan]" + ) + elif "detecting" in status.lower(): + progress.update( + task_id, description=f"[yellow]{status[:50]}[/yellow]" + ) + elif "identified" in status.lower(): + progress.update( + task_id, description=f"[green]{status[:50]}[/green]" + ) + + process.wait() + full_output = "".join(output_lines) + + # Cleanup temporary output directory + try: + shutil.rmtree(tmp_output_dir) + except Exception as cleanup_error: + console.log( + f"Failed to remove temporary sqlmap output directory {tmp_output_dir!r}: {cleanup_error}" + ) + + return process.returncode == 0, full_output + else: + # Run without progress (non-interactive) + result = subprocess.run( + cmd, capture_output=True, text=True, timeout=600 + ) + + # Cleanup temporary output directory + try: + shutil.rmtree(tmp_output_dir) + except Exception as cleanup_error: + console.log( + f"Failed to remove temporary sqlmap output directory {tmp_output_dir!r}: {cleanup_error}" + ) + + return result.returncode == 0, result.stdout + result.stderr + except subprocess.TimeoutExpired: + # Cleanup on timeout + try: + shutil.rmtree(tmp_output_dir) + except Exception as cleanup_error: + console.log( + f"Failed to remove temporary sqlmap output directory {tmp_output_dir!r} after timeout: {cleanup_error}" + ) + return False, "Test timed out after 10 minutes" + except Exception as e: + # Cleanup on error + try: + shutil.rmtree(tmp_output_dir) + except Exception as cleanup_error: + console.log( + f"Failed to remove temporary sqlmap output directory {tmp_output_dir!r} after error: {cleanup_error}" + ) + return False, str(e) + + def parse_results(self, output: str) -> Dict[str, Any]: + """Parse sqlmap output for vulnerabilities""" + vulns = [] + + # Look for vulnerability indicators + if "sqlmap identified the following injection point" in output: + lines = output.split("\n") + current_param = "Unknown" + + for i, line in enumerate(lines): + if "Parameter:" in line: + current_param = line.split("Parameter:")[1].strip() + elif "Type:" in line: + vuln_type = line.split("Type:")[1].strip() + if i + 1 < len(lines) and "Title:" in lines[i + 1]: + title = lines[i + 1].split("Title:")[1].strip() + vulns.append( + { + "parameter": current_param, + "type": vuln_type, + "title": title, + } + ) + + backend_dbms = None + if "back-end DBMS:" in output.lower(): + for line in output.split("\n"): + if "back-end DBMS:" in line.lower(): + backend_dbms = line.split(":", 1)[1].strip() + break + + return { + "vulnerabilities": vulns, + "backend_dbms": backend_dbms, + "is_vulnerable": len(vulns) > 0 or "vulnerable" in output.lower(), + } + + def comprehensive_scan( + self, + url: str, + max_level: int = 5, + max_risk: int = 3, + techniques: str = "BEUSTQ", + data: Optional[str] = None, + headers: Optional[str] = None, + verbose: int = 1, + ): + """Run comprehensive scan with all levels and risks""" + self.results["target"] = url + self.results["start_time"] = datetime.now() + + results_table = Table(title="Scan Results", box=box.ROUNDED) + results_table.add_column("Level", style="cyan", justify="center") + results_table.add_column("Risk", style="yellow", justify="center") + results_table.add_column("Status", justify="center") + results_table.add_column("Findings", style="magenta") + + total_tests = max_level * max_risk + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TimeElapsedColumn(), + console=console, + ) as progress: + overall_task = progress.add_task( + f"[cyan]Scanning {url}...", total=total_tests + ) + + for level in range(1, max_level + 1): + for risk in range(1, max_risk + 1): + progress.update( + overall_task, + description=f"[cyan]Testing Level {level}, Risk {risk}...[/cyan]", + ) + + success, output = self.run_sqlmap_test( + url, + level, + risk, + techniques, + data=data, + headers=headers, + verbose=max(verbose, 3), + progress=progress, + task_id=overall_task, + ) + parsed = self.parse_results(output) + + status = "✓" if success else "✗" + status_style = "green" if success else "red" + + findings = ( + "No vulnerabilities" + if not parsed["is_vulnerable"] + else f"{len(parsed['vulnerabilities'])} found!" + ) + findings_style = ( + "green" if not parsed["is_vulnerable"] else "bold red" + ) + + if parsed["is_vulnerable"]: + # Deduplicate vulnerabilities across different level/risk combinations + existing_keys = set() + for v in self.results["vulnerabilities"]: + if isinstance(v, dict): + param = v.get("parameter") + vtype = v.get("type") + title = v.get("title") + else: + param = getattr(v, "parameter", None) + vtype = getattr(v, "type", None) + title = getattr(v, "title", None) + existing_keys.add((param, vtype, title)) + + for v in parsed["vulnerabilities"]: + if isinstance(v, dict): + param = v.get("parameter") + vtype = v.get("type") + title = v.get("title") + else: + param = getattr(v, "parameter", None) + vtype = getattr(v, "type", None) + title = getattr(v, "title", None) + + key = (param, vtype, title) + if key not in existing_keys: + self.results["vulnerabilities"].append(v) + existing_keys.add(key) + + results_table.add_row( + str(level), + str(risk), + f"[{status_style}]{status}[/{status_style}]", + f"[{findings_style}]{findings}[/{findings_style}]", + ) + + progress.update(overall_task, advance=1) + self.results["total_tests"] += 1 + + self.results["end_time"] = datetime.now() + console.print() + console.print(results_table) + display_summary(self.results) + + def quick_scan( + self, + url: str, + level: int = 1, + risk: int = 1, + data: Optional[str] = None, + headers: Optional[str] = None, + raw: bool = False, + verbose: int = 1, + ): + """Run a quick scan with default settings""" + self.results["target"] = url + self.results["start_time"] = datetime.now() + + if not raw: + scan_info = f"[cyan]Running quick scan on:[/cyan]\n[yellow]{url}[/yellow]\n[dim]Level: {level}, Risk: {risk}[/dim]" + if data: + scan_info += f"\n[dim]POST Data: {data}[/dim]" + if headers: + scan_info += f"\n[dim]Headers: {headers}[/dim]" + + console.print(Panel(scan_info, border_style="cyan", box=box.ROUNDED)) + + if raw: + console.print("[cyan]Running sqlmap...[/cyan]\n") + success, output = self.run_sqlmap_test( + url, level, risk, data=data, headers=headers, verbose=verbose + ) + console.print(output) + + if self.enable_logging: + log_file = get_log_filename(url) + save_log(log_file, output) + return + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + TimeElapsedColumn(), + console=console, + ) as progress: + task = progress.add_task(f"[cyan]Scanning {url[:40]}...", total=None) + success, output = self.run_sqlmap_test( + url, + level, + risk, + data=data, + headers=headers, + verbose=max(verbose, 3), + progress=progress, + task_id=task, + ) + progress.update( + task, completed=True, description="[green]✓ Scan Complete[/green]" + ) + + if self.enable_logging: + log_file = get_log_filename(url) + save_log(log_file, output) + + parsed = self.parse_results(output) + self.results["vulnerabilities"] = parsed["vulnerabilities"] + self.results["total_tests"] = 1 + self.results["end_time"] = datetime.now() + + display_summary(self.results) + + def process_single_endpoint( + self, + endpoint: Dict, + level: int, + risk: int, + verbose: int, + progress: Optional[Progress] = None, + task_id: Any = None, + ) -> Dict: + """Process a single endpoint for batch mode""" + url = str(endpoint.get("url")) if endpoint.get("url") else "" + + data = endpoint.get("data") + if data is not None and not isinstance(data, str): + data = json.dumps(data) + + headers = endpoint.get("headers") + if headers is not None and isinstance(headers, list): + headers = "\n".join(headers) + + try: + # Force verbosity 3 for better progress tracking if in batch mode + # unless a specific high verbosity is already requested + exec_verbose = max(verbose, 3) if progress else verbose + + success, output = self.run_sqlmap_test( + url, + level, + risk, + data=data, + headers=headers, + verbose=exec_verbose, + progress=progress, + task_id=task_id, + ) + + if self.enable_logging: + log_file = get_log_filename(url) + save_log(log_file, output) + + parsed = self.parse_results(output) + + if progress and task_id: + status_color = "red" if parsed["is_vulnerable"] else "green" + status_text = "Vulnerable" if parsed["is_vulnerable"] else "Clean" + progress.update( + task_id, + description=f"[{status_color}]✓ {status_text}[/{status_color}] - {url[:30]}...", + completed=100, + ) + + return { + "url": url, + "data": data, + "success": success, + "vulnerabilities": parsed["vulnerabilities"], + "is_vulnerable": parsed["is_vulnerable"], + } + except Exception as e: + if progress and task_id: + progress.update( + task_id, + description=f"[bold red]✗ Error: {str(e)[:30]}[/bold red]", + completed=100, + ) + return { + "url": url, + "data": data, + "success": False, + "error": str(e), + "vulnerabilities": [], + "is_vulnerable": False, + } + + def batch_scan( + self, + endpoints: List[Dict], + level: int = 1, + risk: int = 1, + concurrency: int = 5, + verbose: int = 1, + ): + """Run batch scan on multiple endpoints with concurrency""" + + # Determine actual concurrency + if concurrency <= 0: + # For I/O bound tasks like scanning, we can use 2x CPU count + concurrency = (os.cpu_count() or 2) * 2 + + console.print( + Panel( + f"[cyan]Batch Scan Mode[/cyan]\n" + f"[dim]Testing {len(endpoints)} endpoint(s) with concurrency={concurrency}[/dim]\n" + f"[dim]Level: {level}, Risk: {risk}[/dim]", + border_style="cyan", + box=box.ROUNDED, + ) + ) + + results = [] + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TimeElapsedColumn(), + console=console, + expand=True, + ) as progress: + with ThreadPoolExecutor(max_workers=concurrency) as executor: + future_to_endpoint = {} + + for endpoint in endpoints: + url = endpoint.get("url", "Unknown") + task_id = progress.add_task( + f"[dim]Waiting...[/dim] {url[:40]}...", total=100 + ) + future = executor.submit( + self.process_single_endpoint, + endpoint, + level, + risk, + verbose, + progress, + task_id, + ) + future_to_endpoint[future] = endpoint + + for future in as_completed(future_to_endpoint): + endpoint = future_to_endpoint[future] + try: + results.append(future.result()) + except Exception as e: + results.append( + { + "url": endpoint.get("url"), + "data": endpoint.get("data"), + "success": False, + "error": str(e), + "vulnerabilities": [], + "is_vulnerable": False, + } + ) + + display_batch_results(results) + return results diff --git a/sql_cli/ui.py b/sql_cli/ui.py new file mode 100644 index 000000000..171977474 --- /dev/null +++ b/sql_cli/ui.py @@ -0,0 +1,148 @@ +from rich.console import Console +from rich.panel import Panel +from rich.table import Table +from rich import box +from typing import List, Dict +from .models import ScanResult + +console = Console() + + +def print_banner(): + """Display a beautiful banner""" + banner = """ +╔═══════════════════════════════════════════════════════════════╗ +║ ║ +║ ███████╗ ██████╗ ██╗ ███╗ ███╗ █████╗ ██████╗ ║ +║ ██╔════╝██╔═══██╗██║ ████╗ ████║██╔══██╗██╔══██╗ ║ +║ ███████╗██║ ██║██║ ██╔████╔██║███████║██████╔╝ ║ +║ ╚════██║██║▄▄ ██║██║ ██║╚██╔╝██║██╔══██║██╔═══╝ ║ +║ ███████║╚██████╔╝███████╗██║ ╚═╝ ██║██║ ██║██║ ║ +║ ╚══════╝ ╚══▀▀═╝ ╚══════╝╚═╝ ╚═╝╚═╝ ╚═╝╚═╝ ║ +║ ║ +║ CLI - Automated SQL Injection Testing ║ +║ ║ +╚═══════════════════════════════════════════════════════════════╝ + """ + console.print(banner, style="bold cyan") + console.print( + Panel( + "[yellow]⚠️ Legal Disclaimer: Only use on targets you have permission to test[/yellow]", + border_style="yellow", + box=box.ROUNDED, + ) + ) + console.print() + + +def display_summary(results: ScanResult): + """Display a comprehensive summary of results""" + console.print() + + # Calculate duration + duration = 0.0 + if results["end_time"] and results["start_time"]: + duration = (results["end_time"] - results["start_time"]).total_seconds() + + # Create summary panel + summary_text = f""" +[cyan]Target:[/cyan] {results["target"] or "N/A"} +[cyan]Total Tests:[/cyan] {results["total_tests"]} +[cyan]Duration:[/cyan] {duration:.2f} seconds +[cyan]Vulnerabilities Found:[/cyan] {len(results["vulnerabilities"])} + """ + + console.print( + Panel( + summary_text.strip(), + title="[bold]Scan Summary[/bold]", + border_style="green" if len(results["vulnerabilities"]) == 0 else "red", + box=box.DOUBLE, + ) + ) + + # Display vulnerabilities if found + if results["vulnerabilities"]: + console.print() + vuln_table = Table(title="⚠️ Vulnerabilities Detected", box=box.HEAVY) + vuln_table.add_column("Parameter", style="cyan") + vuln_table.add_column("Type", style="yellow") + vuln_table.add_column("Title", style="red") + + for vuln in results["vulnerabilities"]: + vuln_table.add_row( + vuln.get("parameter", "N/A"), + vuln.get("type", "N/A"), + vuln.get("title", "N/A"), + ) + + console.print(vuln_table) + console.print() + console.print( + "[bold red]⚠️ SQL injection vulnerabilities detected! Take immediate action.[/bold red]" + ) + else: + console.print() + console.print( + "[bold green]✓ No SQL injection vulnerabilities detected.[/bold green]" + ) + + console.print() + + +def display_batch_results(results: List[Dict]): + """Display batch scan results in a table""" + console.print() + + # Create results table + results_table = Table(title="Batch Scan Results", box=box.ROUNDED) + results_table.add_column("URL", style="cyan", no_wrap=False) + results_table.add_column("Status", justify="center") + results_table.add_column("Vulnerabilities", style="magenta") + + vulnerable_count = 0 + successful_count = 0 + + for result in results: + url = result["url"][:60] + "..." if len(result["url"]) > 60 else result["url"] + + if result.get("error"): + status = "[red]✗ Error[/red]" + vulns = f"[red]{result['error'][:40]}[/red]" + elif result["success"]: + successful_count += 1 + if result["is_vulnerable"]: + vulnerable_count += 1 + status = "[red]✓ Vulnerable[/red]" + vulns = f"[red]{len(result['vulnerabilities'])} found[/red]" + else: + status = "[green]✓ Clean[/green]" + vulns = "[green]None[/green]" + else: + status = "[yellow]✗ Failed[/yellow]" + vulns = "[yellow]N/A[/yellow]" + + results_table.add_row(url, status, vulns) + + console.print(results_table) + + # Summary + console.print() + summary = f""" +[cyan]Batch Summary:[/cyan] + Total Endpoints: {len(results)} + Successful Scans: {successful_count} + Vulnerable: [red]{vulnerable_count}[/red] + Clean: [green]{successful_count - vulnerable_count}[/green] + """ + + border_color = "red" if vulnerable_count > 0 else "green" + console.print( + Panel( + summary.strip(), + title="[bold]Summary[/bold]", + border_style=border_color, + box=box.DOUBLE, + ) + ) + console.print() diff --git a/sql_cli/utils.py b/sql_cli/utils.py new file mode 100644 index 000000000..d84dfd8d6 --- /dev/null +++ b/sql_cli/utils.py @@ -0,0 +1,33 @@ +import re +import hashlib +import random +from pathlib import Path +from datetime import datetime +from rich.console import Console + +console = Console() + +SQLMAP_PATH = Path(__file__).parent.parent / "sqlmap.py" +LOGS_DIR = Path(__file__).parent.parent / "logs" + +def get_log_filename(url: str) -> Path: + """Generate a log filename based on URL and timestamp with hash for uniqueness""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + # Create a hash of the URL to ensure uniqueness + url_hash = hashlib.md5(url.encode()).hexdigest()[:8] + # Add random component for additional uniqueness in batch scenarios + random_component = random.randint(1000, 9999) + # Sanitize URL for filename (keep it readable but short) + safe_url = re.sub(r'[^\w\-_\.]', '_', url)[:30] + return LOGS_DIR / f"sqlmap_{safe_url}_{url_hash}_{timestamp}_{random_component}.log" + +def save_log(log_file: Path, content: str): + """Save content to log file""" + try: + if not LOGS_DIR.exists(): + LOGS_DIR.mkdir(exist_ok=True) + with open(log_file, 'w', encoding='utf-8') as f: + f.write(content) + console.print(f"[dim]Log saved to: {log_file}[/dim]") + except Exception as e: + console.print(f"[yellow]Warning: Could not save log: {e}[/yellow]") diff --git a/sqlmapcli.py b/sqlmapcli.py new file mode 100755 index 000000000..00d90cfd6 --- /dev/null +++ b/sqlmapcli.py @@ -0,0 +1,285 @@ +#!/usr/bin/env python3 +""" +SQLMap CLI - A beautiful CLI wrapper for sqlmap +Automates comprehensive SQL injection testing with a single command +""" + +import sys +import argparse +import json +from pathlib import Path + +# Add the current directory to path so we can import from sql_cli +sys.path.append(str(Path(__file__).parent)) + +try: + from rich.console import Console + from rich.panel import Panel + from rich.prompt import Prompt, Confirm +except ImportError: + print("Error: 'rich' library is required. Install it with: pip install rich") + sys.exit(1) + +from sql_cli.scanner import SQLMapScanner +from sql_cli.utils import SQLMAP_PATH +from sql_cli.ui import print_banner + +console = Console() + + +def interactive_mode(scanner: SQLMapScanner): + """Interactive mode for user input""" + console.print() + console.print( + Panel( + "[cyan]Interactive Mode[/cyan]\n[dim]Enter target details for SQL injection testing[/dim]", + border_style="cyan", + ) + ) + + url = Prompt.ask("\n[cyan]Enter target URL[/cyan]") + + # Ask if this is a POST request + has_data = Confirm.ask( + "[cyan]Does this request require POST data/body?[/cyan]", default=False + ) + + data = None + if has_data: + console.print("\n[dim]Examples:[/dim]") + console.print( + '[dim] JSON: {"email":"test@example.com","password":"pass123"}[/dim]' + ) + console.print("[dim] Form: username=admin&password=secret[/dim]") + data = Prompt.ask("\n[cyan]Enter POST data/body[/cyan]") + + # Ask for custom headers + has_headers = Confirm.ask( + "[cyan]Do you need to add custom headers (Auth, etc.)?[/cyan]", default=False + ) + + headers = None + if has_headers: + console.print("\n[dim]Example:[/dim]") + console.print( + '[dim] "Authorization: Bearer token; Cookie: PHPSESSID=..."[/dim]' + ) + headers = Prompt.ask("\n[cyan]Enter headers[/cyan]") + + scan_type = Prompt.ask( + "\n[cyan]Select scan type[/cyan]", + choices=["quick", "comprehensive"], + default="quick", + ) + + if scan_type == "quick": + # Input validation for level and risk + while True: + try: + level_str = Prompt.ask("[cyan]Test level (1-5)[/cyan]", default="1") + level = int(level_str) + if 1 <= level <= 5: + break + console.print("[red]Level must be between 1 and 5[/red]") + except ValueError: + console.print("[red]Please enter a valid number[/red]") + + while True: + try: + risk_str = Prompt.ask("[cyan]Test risk (1-3)[/cyan]", default="1") + risk = int(risk_str) + if 1 <= risk <= 3: + break + console.print("[red]Risk must be between 1 and 3[/red]") + except ValueError: + console.print("[red]Please enter a valid number[/red]") + + scanner.quick_scan(url, level, risk, data=data, headers=headers) + else: + # Input validation for max_level and max_risk + while True: + try: + max_level_str = Prompt.ask("[cyan]Maximum test level (1-5)[/cyan]", default="5") + max_level = int(max_level_str) + if 1 <= max_level <= 5: + break + console.print("[red]Level must be between 1 and 5[/red]") + except ValueError: + console.print("[red]Please enter a valid number[/red]") + + while True: + try: + max_risk_str = Prompt.ask("[cyan]Maximum test risk (1-3)[/cyan]", default="3") + max_risk = int(max_risk_str) + if 1 <= max_risk <= 3: + break + console.print("[red]Risk must be between 1 and 3[/red]") + except ValueError: + console.print("[red]Please enter a valid number[/red]") + + scanner.comprehensive_scan(url, max_level, max_risk, data=data, headers=headers) + + +def main(): + parser = argparse.ArgumentParser( + description="SQLMap CLI - Beautiful automated SQL injection testing", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=""" +Examples: + # Quick scan with default settings (GET parameter) + python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" + + # Test with POST data (JSON) + python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"pass123"}' + + # Comprehensive scan (all risk and level combinations) + python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive + + # Batch mode - test multiple endpoints from JSON file + python sqlmapcli.py -b endpoints.json --level 2 --risk 2 + + # Interactive mode + python sqlmapcli.py --interactive +""", + ) + + parser.add_argument( + "-u", "--url", help='Target URL (e.g., "http://example.com/page?id=1")' + ) + parser.add_argument( + "--comprehensive", action="store_true", help="Run comprehensive scan" + ) + parser.add_argument( + "--level", + type=int, + default=1, + choices=[1, 2, 3, 4, 5], + help="Level (1-5, default: 1)", + ) + parser.add_argument( + "--risk", type=int, default=1, choices=[1, 2, 3], help="Risk (1-3, default: 1)" + ) + parser.add_argument( + "--max-level", + type=int, + default=5, + choices=[1, 2, 3, 4, 5], + help="Max level for comprehensive", + ) + parser.add_argument( + "--max-risk", + type=int, + default=3, + choices=[1, 2, 3], + help="Max risk for comprehensive", + ) + parser.add_argument( + "--technique", + type=str, + default="BEUSTQ", + help="SQL techniques (default: BEUSTQ)", + ) + parser.add_argument("--data", type=str, help="POST data") + parser.add_argument("--headers", type=str, help="Extra headers") + parser.add_argument("--raw", action="store_true", help="Show raw sqlmap output") + parser.add_argument( + "--verbose", type=int, choices=[0, 1, 2, 3, 4, 5, 6], help="Verbosity (0-6)" + ) + parser.add_argument( + "-i", "--interactive", action="store_true", help="Interactive mode" + ) + parser.add_argument("-b", "--batch-file", type=str, help="Path to batch JSON") + parser.add_argument( + "-c", + "--concurrency", + type=int, + default=0, + help="Number of concurrent scans (default: 0 for auto-scale)", + ) + parser.add_argument("--no-logs", action="store_true", help="Disable logs") + + args = parser.parse_args() + + scanner = SQLMapScanner(enable_logging=not args.no_logs) + print_banner() + + if not SQLMAP_PATH.exists(): + console.print( + f"[bold red]Error: sqlmap.py not found at {SQLMAP_PATH}[/bold red]" + ) + sys.exit(1) + + if args.interactive: + interactive_mode(scanner) + return + + if args.batch_file: + try: + with open(args.batch_file, "r") as f: + endpoints = json.load(f) + + if not isinstance(endpoints, list): + console.print( + "[bold red]Error: Batch file must contain a JSON array[/bold red]" + ) + sys.exit(1) + + verbose_level = args.verbose if args.verbose is not None else 1 + scanner.batch_scan( + endpoints, + level=args.level, + risk=args.risk, + concurrency=args.concurrency, + verbose=verbose_level, + ) + return + except FileNotFoundError: + console.print( + f"[bold red]Error: Batch file not found: {args.batch_file}[/bold red]" + ) + sys.exit(1) + except json.JSONDecodeError as e: + console.print( + f"[bold red]Error: Invalid JSON in batch file '{args.batch_file}': {e}[/bold red]" + ) + sys.exit(1) + except PermissionError: + console.print( + f"[bold red]Error: Permission denied when reading batch file: {args.batch_file}[/bold red]" + ) + sys.exit(1) + except Exception as e: + console.print(f"[bold red]Error loading batch file: {e}[/bold red]") + sys.exit(1) + + if not args.url: + console.print("[bold red]Error: URL is required[/bold red]") + parser.print_help() + sys.exit(1) + + verbose_level = args.verbose if args.verbose is not None else 1 + + if args.comprehensive: + scanner.comprehensive_scan( + args.url, + max_level=args.max_level, + max_risk=args.max_risk, + techniques=args.technique, + data=args.data, + headers=args.headers, + verbose=verbose_level, + ) + else: + scanner.quick_scan( + args.url, + level=args.level, + risk=args.risk, + data=args.data, + headers=args.headers, + raw=args.raw, + verbose=verbose_level, + ) + + +if __name__ == "__main__": + main() diff --git a/test_endpoints.json.example b/test_endpoints.json.example new file mode 100644 index 000000000..18ce192c0 --- /dev/null +++ b/test_endpoints.json.example @@ -0,0 +1,19 @@ +[ + { + "url": "https://httpbin.org/get?id=1", + "headers": [ + "X-Test-Header: value1", + "X-Auth-Token: secret123" + ] + }, + { + "url": "https://httpbin.org/post", + "data": { + "username": "admin", + "id": 123 + }, + "headers": [ + "Content-Type: application/json" + ] + } +]