Add batch processing with concurrency and automatic log saving to logs folder

Co-authored-by: GilbertKrantz <90319182+GilbertKrantz@users.noreply.github.com>
This commit is contained in:
copilot-swe-agent[bot] 2026-01-07 12:30:45 +00:00
parent 3a975b79c1
commit 9803ef57df
5 changed files with 323 additions and 6 deletions

3
.gitignore vendored
View File

@ -5,4 +5,5 @@ __pycache__/
traffic.txt traffic.txt
*~ *~
req*.txt req*.txt
.idea/ .idea/
logs/

View File

@ -71,6 +71,61 @@ python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{
**Note**: The `--raw` flag ensures the CLI output matches sqlmap exactly, bypassing all formatting and parsing. **Note**: The `--raw` flag ensures the CLI output matches sqlmap exactly, bypassing all formatting and parsing.
### 7. Batch Mode - Test Multiple Endpoints
Test multiple endpoints with concurrency:
```bash
# Test multiple endpoints from a JSON file with 5 concurrent scans (default)
python sqlmapcli.py -b endpoints.json --level 2 --risk 2
# Test with higher concurrency (10 concurrent scans)
python sqlmapcli.py -b endpoints.json --level 2 --risk 2 --concurrency 10
# Test with custom settings
python sqlmapcli.py -b endpoints.json --level 3 --risk 2 --concurrency 5
```
**Batch File Format** (`endpoints.json`):
```json
[
{
"url": "https://demo.owasp-juice.shop/rest/products/search?q=test"
},
{
"url": "https://demo.owasp-juice.shop/rest/user/login",
"data": "{\"email\":\"test@example.com\",\"password\":\"password123\"}"
},
{
"url": "https://demo.owasp-juice.shop/api/Users/1"
}
]
```
**Features**:
- Tests N endpoints with M concurrency
- Automatically saves logs for each endpoint
- Displays progress and summary table
- Supports both GET and POST requests
### 8. Log Management
Logs are automatically saved to the `logs/` folder:
```bash
# Run scan with logging (default behavior)
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test"
# Log saved to: logs/sqlmap_https___demo_owasp_juice_shop_rest_produ_20260107_123456.log
# Disable logging if needed
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --no-logs
```
**Log Features**:
- Automatic log folder creation
- Timestamped log files
- Sanitized filenames based on URL
- Complete sqlmap output saved
## Real-World Testing Example ## Real-World Testing Example
**Using OWASP Juice Shop Demo** (a legitimate vulnerable application for security testing): **Using OWASP Juice Shop Demo** (a legitimate vulnerable application for security testing):

View File

@ -66,11 +66,15 @@ python sqlmapcli.py --interactive
📊 **Clear result summaries** - vulnerability tables with color-coded findings 📊 **Clear result summaries** - vulnerability tables with color-coded findings
🎯 **Interactive mode** - guided prompts for easy testing, including POST data support 🎯 **Interactive mode** - guided prompts for easy testing, including POST data support
⏱️ **Progress tracking** - see exactly what's being tested in real-time ⏱️ **Progress tracking** - see exactly what's being tested in real-time
🔄 **Batch processing** - test multiple endpoints with configurable concurrency
📝 **Automatic logging** - saves all scan results to logs/ folder
#### CLI Options #### CLI Options
``` ```
-u, --url Target URL -u, --url Target URL
-b, --batch-file JSON file with multiple endpoints
-c, --concurrency Concurrent scans for batch mode (default: 5)
--comprehensive Run all risk/level combinations (1-3 risk, 1-5 levels) --comprehensive Run all risk/level combinations (1-3 risk, 1-5 levels)
--level {1-5} Test level (default: 1) --level {1-5} Test level (default: 1)
--risk {1-3} Test risk (default: 1) --risk {1-3} Test risk (default: 1)
@ -80,11 +84,14 @@ python sqlmapcli.py --interactive
--data POST data string (JSON or form data) --data POST data string (JSON or form data)
--raw Show raw sqlmap output (bypasses formatting) --raw Show raw sqlmap output (bypasses formatting)
--verbose {0-6} Sqlmap verbosity level (default: 1) --verbose {0-6} Sqlmap verbosity level (default: 1)
--no-logs Disable automatic log saving
-i, --interactive Interactive mode -i, --interactive Interactive mode
``` ```
**Note**: Use `--raw` flag to see the exact same output as running sqlmap directly. This ensures you get all details that sqlmap provides without any formatting or parsing. **Note**: Use `--raw` flag to see the exact same output as running sqlmap directly. This ensures you get all details that sqlmap provides without any formatting or parsing.
**Batch Mode**: Test multiple endpoints from a JSON file with concurrent scanning. See `endpoints.json.example` for format.
--- ---
### Original SQLMap Usage ### Original SQLMap Usage

12
endpoints.json.example Normal file
View File

@ -0,0 +1,12 @@
[
{
"url": "https://demo.owasp-juice.shop/rest/products/search?q=test"
},
{
"url": "https://demo.owasp-juice.shop/rest/user/login",
"data": "{\"email\":\"test@example.com\",\"password\":\"password123\"}"
},
{
"url": "https://demo.owasp-juice.shop/api/Users/1"
}
]

View File

@ -9,9 +9,12 @@ import sys
import argparse import argparse
import time import time
import re import re
import json
import os
from pathlib import Path from pathlib import Path
from typing import List, Dict, Tuple from typing import List, Dict, Tuple
from datetime import datetime from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
try: try:
from rich.console import Console from rich.console import Console
@ -31,6 +34,7 @@ except ImportError:
console = Console() console = Console()
SQLMAP_PATH = Path(__file__).parent / "sqlmap.py" SQLMAP_PATH = Path(__file__).parent / "sqlmap.py"
LOGS_DIR = Path(__file__).parent / "logs"
# SQL injection techniques # SQL injection techniques
TECHNIQUES = { TECHNIQUES = {
@ -43,8 +47,9 @@ TECHNIQUES = {
} }
class SQLMapCLI: class SQLMapCLI:
def __init__(self): def __init__(self, enable_logging: bool = True):
self.console = Console() self.console = Console()
self.enable_logging = enable_logging
self.results = { self.results = {
'total_tests': 0, 'total_tests': 0,
'vulnerabilities': [], 'vulnerabilities': [],
@ -52,6 +57,26 @@ class SQLMapCLI:
'end_time': None, 'end_time': None,
'target': None 'target': None
} }
# Create logs directory if it doesn't exist
if self.enable_logging:
LOGS_DIR.mkdir(exist_ok=True)
def get_log_filename(self, url: str) -> Path:
"""Generate a log filename based on URL and timestamp"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# Sanitize URL for filename
safe_url = re.sub(r'[^\w\-_\.]', '_', url)[:50]
return LOGS_DIR / f"sqlmap_{safe_url}_{timestamp}.log"
def save_log(self, log_file: Path, content: str):
"""Save content to log file"""
try:
with open(log_file, 'w', encoding='utf-8') as f:
f.write(content)
self.console.print(f"[dim]Log saved to: {log_file}[/dim]")
except Exception as e:
self.console.print(f"[yellow]Warning: Could not save log: {e}[/yellow]")
def print_banner(self): def print_banner(self):
"""Display a beautiful banner""" """Display a beautiful banner"""
@ -245,6 +270,11 @@ class SQLMapCLI:
self.console.print("[cyan]Running sqlmap...[/cyan]\n") self.console.print("[cyan]Running sqlmap...[/cyan]\n")
success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose) success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose)
self.console.print(output) self.console.print(output)
# Save log
if self.enable_logging:
log_file = self.get_log_filename(url)
self.save_log(log_file, output)
return return
with Progress( with Progress(
@ -258,6 +288,11 @@ class SQLMapCLI:
success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose) success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose)
progress.update(task, completed=True) progress.update(task, completed=True)
# Save log
if self.enable_logging:
log_file = self.get_log_filename(url)
self.save_log(log_file, output)
parsed = self.parse_results(output) parsed = self.parse_results(output)
self.results['vulnerabilities'] = parsed['vulnerabilities'] self.results['vulnerabilities'] = parsed['vulnerabilities']
self.results['total_tests'] = 1 self.results['total_tests'] = 1
@ -317,6 +352,162 @@ class SQLMapCLI:
self.console.print() self.console.print()
def process_single_endpoint(self, endpoint: Dict, level: int, risk: int, verbose: int) -> Dict:
"""Process a single endpoint for batch mode"""
url = endpoint.get('url')
data = endpoint.get('data')
try:
success, output = self.run_sqlmap_test(url, level, risk, data=data, verbose=verbose)
# Save log
if self.enable_logging:
log_file = self.get_log_filename(url)
self.save_log(log_file, output)
parsed = self.parse_results(output)
return {
'url': url,
'data': data,
'success': success,
'vulnerabilities': parsed['vulnerabilities'],
'is_vulnerable': parsed['is_vulnerable']
}
except Exception as e:
return {
'url': url,
'data': data,
'success': False,
'error': str(e),
'vulnerabilities': [],
'is_vulnerable': False
}
def batch_scan(self, endpoints: List[Dict], level: int = 1, risk: int = 1,
concurrency: int = 5, verbose: int = 1):
"""Run batch scan on multiple endpoints with concurrency"""
self.console.print(
Panel(
f"[cyan]Batch Scan Mode[/cyan]\n"
f"[dim]Testing {len(endpoints)} endpoint(s) with concurrency={concurrency}[/dim]\n"
f"[dim]Level: {level}, Risk: {risk}[/dim]",
border_style="cyan",
box=box.ROUNDED
)
)
results = []
completed = 0
with Progress(
SpinnerColumn(),
TextColumn("[progress.description]{task.description}"),
BarColumn(),
TextColumn("[progress.percentage]{task.percentage:>3.0f}%"),
TextColumn("({task.completed}/{task.total})"),
TimeElapsedColumn(),
console=self.console
) as progress:
task = progress.add_task(
"[cyan]Processing endpoints...",
total=len(endpoints)
)
with ThreadPoolExecutor(max_workers=concurrency) as executor:
future_to_endpoint = {
executor.submit(
self.process_single_endpoint,
endpoint,
level,
risk,
verbose
): endpoint
for endpoint in endpoints
}
for future in as_completed(future_to_endpoint):
endpoint = future_to_endpoint[future]
try:
result = future.result()
results.append(result)
completed += 1
progress.update(task, advance=1)
except Exception as e:
results.append({
'url': endpoint.get('url'),
'data': endpoint.get('data'),
'success': False,
'error': str(e),
'vulnerabilities': [],
'is_vulnerable': False
})
completed += 1
progress.update(task, advance=1)
# Display batch results
self.display_batch_results(results)
return results
def display_batch_results(self, results: List[Dict]):
"""Display batch scan results in a table"""
self.console.print()
# Create results table
results_table = Table(title="Batch Scan Results", box=box.ROUNDED)
results_table.add_column("URL", style="cyan", no_wrap=False)
results_table.add_column("Status", justify="center")
results_table.add_column("Vulnerabilities", style="magenta")
vulnerable_count = 0
successful_count = 0
for result in results:
url = result['url'][:60] + '...' if len(result['url']) > 60 else result['url']
if result.get('error'):
status = "[red]✗ Error[/red]"
vulns = f"[red]{result['error'][:40]}[/red]"
elif result['success']:
successful_count += 1
if result['is_vulnerable']:
vulnerable_count += 1
status = "[red]✓ Vulnerable[/red]"
vulns = f"[red]{len(result['vulnerabilities'])} found[/red]"
else:
status = "[green]✓ Clean[/green]"
vulns = "[green]None[/green]"
else:
status = "[yellow]✗ Failed[/yellow]"
vulns = "[yellow]N/A[/yellow]"
results_table.add_row(url, status, vulns)
self.console.print(results_table)
# Summary
self.console.print()
summary = f"""
[cyan]Batch Summary:[/cyan]
Total Endpoints: {len(results)}
Successful Scans: {successful_count}
Vulnerable: [red]{vulnerable_count}[/red]
Clean: [green]{successful_count - vulnerable_count}[/green]
"""
border_color = "red" if vulnerable_count > 0 else "green"
self.console.print(
Panel(
summary.strip(),
title="[bold]Summary[/bold]",
border_style=border_color,
box=box.DOUBLE
)
)
self.console.print()
def interactive_mode(self): def interactive_mode(self):
"""Interactive mode for user input""" """Interactive mode for user input"""
self.console.print() self.console.print()
@ -370,8 +561,14 @@ Examples:
# Comprehensive scan (all risk and level combinations) # Comprehensive scan (all risk and level combinations)
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/products/search?q=test" --comprehensive
# Custom level and risk with POST data # Batch mode - test multiple endpoints from JSON file
python sqlmapcli.py -u "https://demo.owasp-juice.shop/rest/user/login" --data='{"email":"test@example.com","password":"pass123"}' --level 3 --risk 2 python sqlmapcli.py -b endpoints.json --level 2 --risk 2 --concurrency 10
# Batch mode example JSON file format:
# [
# {"url": "https://example.com/api/users?id=1"},
# {"url": "https://example.com/api/login", "data": "{\\"user\\":\\"test\\",\\"pass\\":\\"test\\"}"}
# ]
# Interactive mode # Interactive mode
python sqlmapcli.py --interactive python sqlmapcli.py --interactive
@ -453,9 +650,28 @@ Examples:
help='Run in interactive mode' help='Run in interactive mode'
) )
parser.add_argument(
'-b', '--batch-file',
type=str,
help='Path to JSON file containing multiple endpoints to test'
)
parser.add_argument(
'-c', '--concurrency',
type=int,
default=5,
help='Number of concurrent scans for batch mode (default: 5)'
)
parser.add_argument(
'--no-logs',
action='store_true',
help='Disable saving logs to the logs folder'
)
args = parser.parse_args() args = parser.parse_args()
cli = SQLMapCLI() cli = SQLMapCLI(enable_logging=not args.no_logs)
cli.print_banner() cli.print_banner()
# Check if sqlmap exists # Check if sqlmap exists
@ -472,9 +688,35 @@ Examples:
cli.interactive_mode() cli.interactive_mode()
return return
# Batch mode
if args.batch_file:
try:
with open(args.batch_file, 'r') as f:
endpoints = json.load(f)
if not isinstance(endpoints, list):
console.print("[bold red]Error: Batch file must contain a JSON array of endpoints[/bold red]")
sys.exit(1)
verbose_level = args.verbose if args.verbose is not None else 1
cli.batch_scan(
endpoints,
level=args.level,
risk=args.risk,
concurrency=args.concurrency,
verbose=verbose_level
)
return
except FileNotFoundError:
console.print(f"[bold red]Error: Batch file not found: {args.batch_file}[/bold red]")
sys.exit(1)
except json.JSONDecodeError as e:
console.print(f"[bold red]Error: Invalid JSON in batch file: {e}[/bold red]")
sys.exit(1)
# Check if URL is provided # Check if URL is provided
if not args.url: if not args.url:
console.print("[bold red]Error: URL is required (use -u or --interactive)[/bold red]") console.print("[bold red]Error: URL is required (use -u, -b, or --interactive)[/bold red]")
parser.print_help() parser.print_help()
sys.exit(1) sys.exit(1)