From 9803ef57df7cd340f734b39ca4e0fcfa111f0cf0 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Wed, 7 Jan 2026 12:30:45 +0000 Subject: [PATCH] Add batch processing with concurrency and automatic log saving to logs folder Co-authored-by: GilbertKrantz <90319182+GilbertKrantz@users.noreply.github.com> --- .gitignore | 3 +- EXAMPLES.md | 55 +++++++++ README.md | 7 ++ endpoints.json.example | 12 ++ sqlmapcli.py | 252 ++++++++++++++++++++++++++++++++++++++++- 5 files changed, 323 insertions(+), 6 deletions(-) create mode 100644 endpoints.json.example diff --git a/.gitignore b/.gitignore index 1f7f94a3b..d1cbbb82b 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,5 @@ __pycache__/ traffic.txt *~ req*.txt -.idea/ \ No newline at end of file +.idea/ +logs/ \ No newline at end of file diff --git a/EXAMPLES.md b/EXAMPLES.md index 0b30193d4..239218c13 100644 --- a/EXAMPLES.md +++ b/EXAMPLES.md @@ -71,6 +71,61 @@ python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{ **Note**: The `--raw` flag ensures the CLI output matches sqlmap exactly, bypassing all formatting and parsing. +### 7. Batch Mode - Test Multiple Endpoints +Test multiple endpoints with concurrency: + +```bash +# Test multiple endpoints from a JSON file with 5 concurrent scans (default) +python sqlmapcli.py -b endpoints.json --level 2 --risk 2 + +# Test with higher concurrency (10 concurrent scans) +python sqlmapcli.py -b endpoints.json --level 2 --risk 2 --concurrency 10 + +# Test with custom settings +python sqlmapcli.py -b endpoints.json --level 3 --risk 2 --concurrency 5 +``` + +**Batch File Format** (`endpoints.json`): +```json +[ + { + "url": "https://demo.owasp-juice.shop/rest/products/search?q=test" + }, + { + "url": "https://demo.owasp-juice.shop/rest/user/login", + "data": "{\"email\":\"test@example.com\",\"password\":\"password123\"}" + }, + { + "url": "https://demo.owasp-juice.shop/api/Users/1" + } +] +``` + +**Features**: +- Tests N endpoints with M concurrency +- Automatically saves logs for each endpoint +- Displays progress and summary table +- Supports both GET and POST requests + +### 8. Log Management + +Logs are automatically saved to the `logs/` folder: + +```bash +# Run scan with logging (default behavior) +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" +# Log saved to: logs/sqlmap_https___demo_owasp_juice_shop_rest_produ_20260107_123456.log + +# Disable logging if needed +python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --no-logs +``` + +**Log Features**: +- Automatic log folder creation +- Timestamped log files +- Sanitized filenames based on URL +- Complete sqlmap output saved + ## Real-World Testing Example **Using OWASP Juice Shop Demo** (a legitimate vulnerable application for security testing): diff --git a/README.md b/README.md index 5ddba9cd5..93bba356f 100644 --- a/README.md +++ b/README.md @@ -66,11 +66,15 @@ python sqlmapcli.py --interactive 📊 **Clear result summaries** - vulnerability tables with color-coded findings 🎯 **Interactive mode** - guided prompts for easy testing, including POST data support ⏱️ **Progress tracking** - see exactly what's being tested in real-time +🔄 **Batch processing** - test multiple endpoints with configurable concurrency +📝 **Automatic logging** - saves all scan results to logs/ folder #### CLI Options ``` -u, --url Target URL +-b, --batch-file JSON file with multiple endpoints +-c, --concurrency Concurrent scans for batch mode (default: 5) --comprehensive Run all risk/level combinations (1-3 risk, 1-5 levels) --level {1-5} Test level (default: 1) --risk {1-3} Test risk (default: 1) @@ -80,11 +84,14 @@ python sqlmapcli.py --interactive --data POST data string (JSON or form data) --raw Show raw sqlmap output (bypasses formatting) --verbose {0-6} Sqlmap verbosity level (default: 1) +--no-logs Disable automatic log saving -i, --interactive Interactive mode ``` **Note**: Use `--raw` flag to see the exact same output as running sqlmap directly. This ensures you get all details that sqlmap provides without any formatting or parsing. +**Batch Mode**: Test multiple endpoints from a JSON file with concurrent scanning. See `endpoints.json.example` for format. + --- ### Original SQLMap Usage diff --git a/endpoints.json.example b/endpoints.json.example new file mode 100644 index 000000000..8afdd417b --- /dev/null +++ b/endpoints.json.example @@ -0,0 +1,12 @@ +[ + { + "url": "https://demo.owasp-juice.shop/rest/products/search?q=test" + }, + { + "url": "https://demo.owasp-juice.shop/rest/user/login", + "data": "{\"email\":\"test@example.com\",\"password\":\"password123\"}" + }, + { + "url": "https://demo.owasp-juice.shop/api/Users/1" + } +] diff --git a/sqlmapcli.py b/sqlmapcli.py index 41b2847fe..4b65b3643 100755 --- a/sqlmapcli.py +++ b/sqlmapcli.py @@ -9,9 +9,12 @@ import sys import argparse import time import re +import json +import os from pathlib import Path from typing import List, Dict, Tuple from datetime import datetime +from concurrent.futures import ThreadPoolExecutor, as_completed try: from rich.console import Console @@ -31,6 +34,7 @@ except ImportError: console = Console() SQLMAP_PATH = Path(__file__).parent / "sqlmap.py" +LOGS_DIR = Path(__file__).parent / "logs" # SQL injection techniques TECHNIQUES = { @@ -43,8 +47,9 @@ TECHNIQUES = { } class SQLMapCLI: - def __init__(self): + def __init__(self, enable_logging: bool = True): self.console = Console() + self.enable_logging = enable_logging self.results = { 'total_tests': 0, 'vulnerabilities': [], @@ -52,6 +57,26 @@ class SQLMapCLI: 'end_time': None, 'target': None } + + # Create logs directory if it doesn't exist + if self.enable_logging: + LOGS_DIR.mkdir(exist_ok=True) + + def get_log_filename(self, url: str) -> Path: + """Generate a log filename based on URL and timestamp""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + # Sanitize URL for filename + safe_url = re.sub(r'[^\w\-_\.]', '_', url)[:50] + return LOGS_DIR / f"sqlmap_{safe_url}_{timestamp}.log" + + def save_log(self, log_file: Path, content: str): + """Save content to log file""" + try: + with open(log_file, 'w', encoding='utf-8') as f: + f.write(content) + self.console.print(f"[dim]Log saved to: {log_file}[/dim]") + except Exception as e: + self.console.print(f"[yellow]Warning: Could not save log: {e}[/yellow]") def print_banner(self): """Display a beautiful banner""" @@ -245,6 +270,11 @@ class SQLMapCLI: self.console.print("[cyan]Running sqlmap...[/cyan]\n") success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose) self.console.print(output) + + # Save log + if self.enable_logging: + log_file = self.get_log_filename(url) + self.save_log(log_file, output) return with Progress( @@ -258,6 +288,11 @@ class SQLMapCLI: success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose) progress.update(task, completed=True) + # Save log + if self.enable_logging: + log_file = self.get_log_filename(url) + self.save_log(log_file, output) + parsed = self.parse_results(output) self.results['vulnerabilities'] = parsed['vulnerabilities'] self.results['total_tests'] = 1 @@ -317,6 +352,162 @@ class SQLMapCLI: self.console.print() + def process_single_endpoint(self, endpoint: Dict, level: int, risk: int, verbose: int) -> Dict: + """Process a single endpoint for batch mode""" + url = endpoint.get('url') + data = endpoint.get('data') + + try: + success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose) + + # Save log + if self.enable_logging: + log_file = self.get_log_filename(url) + self.save_log(log_file, output) + + parsed = self.parse_results(output) + + return { + 'url': url, + 'data': data, + 'success': success, + 'vulnerabilities': parsed['vulnerabilities'], + 'is_vulnerable': parsed['is_vulnerable'] + } + except Exception as e: + return { + 'url': url, + 'data': data, + 'success': False, + 'error': str(e), + 'vulnerabilities': [], + 'is_vulnerable': False + } + + def batch_scan(self, endpoints: List[Dict], level: int = 1, risk: int = 1, + concurrency: int = 5, verbose: int = 1): + """Run batch scan on multiple endpoints with concurrency""" + self.console.print( + Panel( + f"[cyan]Batch Scan Mode[/cyan]\n" + f"[dim]Testing {len(endpoints)} endpoint(s) with concurrency={concurrency}[/dim]\n" + f"[dim]Level: {level}, Risk: {risk}[/dim]", + border_style="cyan", + box=box.ROUNDED + ) + ) + + results = [] + completed = 0 + + with Progress( + SpinnerColumn(), + TextColumn("[progress.description]{task.description}"), + BarColumn(), + TextColumn("[progress.percentage]{task.percentage:>3.0f}%"), + TextColumn("({task.completed}/{task.total})"), + TimeElapsedColumn(), + console=self.console + ) as progress: + + task = progress.add_task( + "[cyan]Processing endpoints...", + total=len(endpoints) + ) + + with ThreadPoolExecutor(max_workers=concurrency) as executor: + future_to_endpoint = { + executor.submit( + self.process_single_endpoint, + endpoint, + level, + risk, + verbose + ): endpoint + for endpoint in endpoints + } + + for future in as_completed(future_to_endpoint): + endpoint = future_to_endpoint[future] + try: + result = future.result() + results.append(result) + completed += 1 + progress.update(task, advance=1) + except Exception as e: + results.append({ + 'url': endpoint.get('url'), + 'data': endpoint.get('data'), + 'success': False, + 'error': str(e), + 'vulnerabilities': [], + 'is_vulnerable': False + }) + completed += 1 + progress.update(task, advance=1) + + # Display batch results + self.display_batch_results(results) + + return results + + def display_batch_results(self, results: List[Dict]): + """Display batch scan results in a table""" + self.console.print() + + # Create results table + results_table = Table(title="Batch Scan Results", box=box.ROUNDED) + results_table.add_column("URL", style="cyan", no_wrap=False) + results_table.add_column("Status", justify="center") + results_table.add_column("Vulnerabilities", style="magenta") + + vulnerable_count = 0 + successful_count = 0 + + for result in results: + url = result['url'][:60] + '...' if len(result['url']) > 60 else result['url'] + + if result.get('error'): + status = "[red]✗ Error[/red]" + vulns = f"[red]{result['error'][:40]}[/red]" + elif result['success']: + successful_count += 1 + if result['is_vulnerable']: + vulnerable_count += 1 + status = "[red]✓ Vulnerable[/red]" + vulns = f"[red]{len(result['vulnerabilities'])} found[/red]" + else: + status = "[green]✓ Clean[/green]" + vulns = "[green]None[/green]" + else: + status = "[yellow]✗ Failed[/yellow]" + vulns = "[yellow]N/A[/yellow]" + + results_table.add_row(url, status, vulns) + + self.console.print(results_table) + + # Summary + self.console.print() + summary = f""" +[cyan]Batch Summary:[/cyan] + Total Endpoints: {len(results)} + Successful Scans: {successful_count} + Vulnerable: [red]{vulnerable_count}[/red] + Clean: [green]{successful_count - vulnerable_count}[/green] + """ + + border_color = "red" if vulnerable_count > 0 else "green" + self.console.print( + Panel( + summary.strip(), + title="[bold]Summary[/bold]", + border_style=border_color, + box=box.DOUBLE + ) + ) + self.console.print() + def interactive_mode(self): """Interactive mode for user input""" self.console.print() @@ -370,8 +561,14 @@ Examples: # Comprehensive scan (all risk and level combinations) python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive - # Custom level and risk with POST data - python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"pass123"}' --level 3 --risk 2 + # Batch mode - test multiple endpoints from JSON file + python sqlmapcli.py -b endpoints.json --level 2 --risk 2 --concurrency 10 + + # Batch mode example JSON file format: + # [ + # {"url": "https://example.com/api/users?id=1"}, + # {"url": "https://example.com/api/login", "data": "{\\"user\\":\\"test\\",\\"pass\\":\\"test\\"}"} + # ] # Interactive mode python sqlmapcli.py --interactive @@ -453,9 +650,28 @@ Examples: help='Run in interactive mode' ) + parser.add_argument( + '-b', '--batch-file', + type=str, + help='Path to JSON file containing multiple endpoints to test' + ) + + parser.add_argument( + '-c', '--concurrency', + type=int, + default=5, + help='Number of concurrent scans for batch mode (default: 5)' + ) + + parser.add_argument( + '--no-logs', + action='store_true', + help='Disable saving logs to the logs folder' + ) + args = parser.parse_args() - cli = SQLMapCLI() + cli = SQLMapCLI(enable_logging=not args.no_logs) cli.print_banner() # Check if sqlmap exists @@ -472,9 +688,35 @@ Examples: cli.interactive_mode() return + # Batch mode + if args.batch_file: + try: + with open(args.batch_file, 'r') as f: + endpoints = json.load(f) + + if not isinstance(endpoints, list): + console.print("[bold red]Error: Batch file must contain a JSON array of endpoints[/bold red]") + sys.exit(1) + + verbose_level = args.verbose if args.verbose is not None else 1 + cli.batch_scan( + endpoints, + level=args.level, + risk=args.risk, + concurrency=args.concurrency, + verbose=verbose_level + ) + return + except FileNotFoundError: + console.print(f"[bold red]Error: Batch file not found: {args.batch_file}[/bold red]") + sys.exit(1) + except json.JSONDecodeError as e: + console.print(f"[bold red]Error: Invalid JSON in batch file: {e}[/bold red]") + sys.exit(1) + # Check if URL is provided if not args.url: - console.print("[bold red]Error: URL is required (use -u or --interactive)[/bold red]") + console.print("[bold red]Error: URL is required (use -u, -b, or --interactive)[/bold red]") parser.print_help() sys.exit(1)