#!/usr/bin/env python """ vulnserver.py - Trivial SQLi vulnerable HTTP server (Note: for testing purposes) Copyright (c) 2006-2026 sqlmap developers (https://sqlmap.org) See the file 'LICENSE' for copying permission """ from __future__ import print_function import base64 import json import random import re import sqlite3 import string import sys import threading import traceback PY3 = sys.version_info >= (3, 0) UNICODE_ENCODING = "utf-8" DEBUG = False if PY3: from http.client import INTERNAL_SERVER_ERROR from http.client import NOT_FOUND from http.client import OK from http.server import BaseHTTPRequestHandler from http.server import HTTPServer from socketserver import ThreadingMixIn from urllib.parse import parse_qs from urllib.parse import unquote_plus else: from BaseHTTPServer import BaseHTTPRequestHandler from BaseHTTPServer import HTTPServer from httplib import INTERNAL_SERVER_ERROR from httplib import NOT_FOUND from httplib import OK from SocketServer import ThreadingMixIn from urlparse import parse_qs from urllib import unquote_plus SCHEMA = """ CREATE TABLE users ( id INTEGER, name TEXT, surname TEXT, PRIMARY KEY (id) ); INSERT INTO users (id, name, surname) VALUES (1, 'luther', 'blisset'); INSERT INTO users (id, name, surname) VALUES (2, 'fluffy', 'bunny'); INSERT INTO users (id, name, surname) VALUES (3, 'wu', 'ming'); INSERT INTO users (id, name, surname) VALUES (4, 'sqlmap/1.0-dev (https://sqlmap.org)', 'user agent header'); INSERT INTO users (id, name, surname) VALUES (5, NULL, 'nameisnull'); CREATE TABLE creds ( user_id INTEGER, password_hash TEXT, FOREIGN KEY (user_id) REFERENCES users(id) ); INSERT INTO creds (user_id, password_hash) VALUES (1, 'db3a16990a0008a3b04707fdef6584a0'); INSERT INTO creds (user_id, password_hash) VALUES (2, '4db967ce67b15e7fb84c266a76684729'); INSERT INTO creds (user_id, password_hash) VALUES (3, 'f5a2950eaa10f9e99896800eacbe8275'); INSERT INTO creds (user_id, password_hash) VALUES (4, NULL); INSERT INTO creds (user_id, password_hash) VALUES (5, '179ad45c6ce2cb97cf1029e212046e81'); """ LISTEN_ADDRESS = "localhost" LISTEN_PORT = 8440 _conn = None _cursor = None _lock = None _server = None _alive = False _csrf_token = None def init(quiet=False): global _conn global _cursor global _lock global _csrf_token _csrf_token = "".join(random.sample(string.ascii_letters + string.digits, 20)) _conn = sqlite3.connect(":memory:", isolation_level=None, check_same_thread=False) _cursor = _conn.cursor() _lock = threading.Lock() _cursor.executescript(SCHEMA) if quiet: global print def _(*args, **kwargs): pass print = _ class ThreadingServer(ThreadingMixIn, HTTPServer): def finish_request(self, *args, **kwargs): try: HTTPServer.finish_request(self, *args, **kwargs) except Exception: if DEBUG: traceback.print_exc() class ReqHandler(BaseHTTPRequestHandler): def do_REQUEST(self): path, query = self.path.split('?', 1) if '?' in self.path else (self.path, "") params = {} if query: params.update(parse_qs(query)) if "