Implementing additional self-test stuff (--vuln-test)

This commit is contained in:
Miroslav Stampar 2019-04-19 13:28:11 +02:00
parent bb7bd51d94
commit 10fe87fb4e
6 changed files with 79 additions and 7 deletions

View File

@ -53,7 +53,7 @@ _conn = None
_cursor = None
_server = None
def init():
def init(quiet=False):
global _conn
global _cursor
@ -62,6 +62,14 @@ def init():
_cursor.executescript(SCHEMA)
if quiet:
global print
def _(*args, **kwargs):
pass
print = _
class ThreadingServer(ThreadingMixIn, HTTPServer):
def finish_request(self, *args, **kwargs):
try:
@ -130,6 +138,9 @@ class ReqHandler(BaseHTTPRequestHandler):
self.data = data
self.do_REQUEST()
def log_message(self, format, *args):
return
def run(address=LISTEN_ADDRESS, port=LISTEN_PORT):
global _server
try:

View File

@ -2091,6 +2091,19 @@ def getConsoleWidth(default=80):
return width or default
def shellExec(cmd):
"""
Executes arbitrary shell command
>>> shellExec('echo 1').strip()
'1'
"""
try:
return subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT).communicate()[0] or ""
except Exception as ex:
return six.text_type(ex)
def clearConsoleLine(forceOutput=False):
"""
Clears current console line
@ -2597,11 +2610,12 @@ def adjustTimeDelay(lastQueryDuration, lowerStdLimit):
kb.delayCandidates = [candidate] + kb.delayCandidates[:-1]
if all((_ == candidate for _ in kb.delayCandidates)) and candidate < conf.timeSec:
conf.timeSec = candidate
if lastQueryDuration / (1.0 * conf.timeSec / candidate) > MIN_VALID_DELAYED_RESPONSE: # Note: to prevent problems with fast responses for heavy-queries like RANDOMBLOB
conf.timeSec = candidate
infoMsg = "adjusting time delay to "
infoMsg += "%d second%s due to good response times" % (conf.timeSec, 's' if conf.timeSec > 1 else '')
logger.info(infoMsg)
infoMsg = "adjusting time delay to "
infoMsg += "%d second%s due to good response times" % (conf.timeSec, 's' if conf.timeSec > 1 else '')
logger.info(infoMsg)
def getLastRequestHTTPError():
"""

View File

@ -17,7 +17,7 @@ from lib.core.enums import DBMS_DIRECTORY_NAME
from lib.core.enums import OS
# sqlmap version (<major>.<minor>.<month>.<monthly commit>)
VERSION = "1.3.4.29"
VERSION = "1.3.4.30"
TYPE = "dev" if VERSION.count('.') > 2 and VERSION.split('.')[-1] != '0' else "stable"
TYPE_COLORS = {"dev": 33, "stable": 90, "pip": 34}
VERSION_STRING = "sqlmap/%s#%s" % ('.'.join(VERSION.split('.')[:-1]) if VERSION.count('.') > 2 and VERSION.split('.')[-1] == '0' else VERSION, TYPE)

View File

@ -10,12 +10,15 @@ import doctest
import os
import re
import shutil
import subprocess
import sys
import tempfile
import threading
import time
import traceback
from extra.beep.beep import beep
from extra.vulnserver import vulnserver
from lib.controller.controller import start
from lib.core.common import checkIntegrity
from lib.core.common import clearConsoleLine
@ -23,6 +26,7 @@ from lib.core.common import dataToStdout
from lib.core.common import getUnicode
from lib.core.common import randomStr
from lib.core.common import readXmlFile
from lib.core.common import shellExec
from lib.core.data import conf
from lib.core.data import logger
from lib.core.data import paths
@ -44,6 +48,43 @@ class Failures(object):
_failures = Failures()
def vulnTest():
"""
Runs the testing against 'vulnserver'
"""
retVal = True
count, length = 0, 5
def _thread():
vulnserver.init(quiet=True)
vulnserver.run()
thread = threading.Thread(target=_thread)
thread.daemon = True
thread.start()
for options, checks in (
("--flush-session", ("Type: boolean-based blind", "Type: time-based blind", "Type: UNION query", "back-end DBMS: SQLite", "3 columns")),
("--banner --schema --dump -T users --binary-fields=surname --where 'id>3'", ("banner: '3", "INTEGER", "TEXT", "id", "name", "surname", "2 entries", "6E616D6569736E756C6C")),
("--all", ("5 entries", "luther", "blisset", "fluffy", "ming", "NULL", "nameisnull")),
("--technique=B --hex --fresh-queries --sql-query='SELECT 987654321'", ("single-thread", ": '987654321'",)),
("--technique=T --fresh-queries --sql-query='SELECT 987654321'", (": '987654321'",)),
):
output = shellExec("python sqlmap.py -u http://%s:%d/?id=1 --batch %s" % (vulnserver.LISTEN_ADDRESS, vulnserver.LISTEN_PORT, options))
if not all(check in output for check in checks):
retVal = False
count += 1
status = '%d/%d (%d%%) ' % (count, length, round(100.0 * count / length))
dataToStdout("\r[%s] [INFO] complete: %s" % (time.strftime("%X"), status))
clearConsoleLine()
if retVal:
logger.info("vuln test final result: PASSED")
else:
logger.error("vuln test final result: FAILED")
def smokeTest():
"""
Runs the basic smoke testing of a program

View File

@ -722,6 +722,9 @@ def cmdLineParser(argv=None):
parser.add_option("--live-test", dest="liveTest", action="store_true",
help=SUPPRESS_HELP)
parser.add_option("--vuln-test", dest="vulnTest", action="store_true",
help=SUPPRESS_HELP)
parser.add_option("--stop-fail", dest="stopFail", action="store_true",
help=SUPPRESS_HELP)
@ -913,7 +916,7 @@ def cmdLineParser(argv=None):
if args.dummy:
args.url = args.url or DUMMY_URL
if not any((args.direct, args.url, args.logFile, args.bulkFile, args.googleDork, args.configFile, args.requestFile, args.updateAll, args.smokeTest, args.liveTest, args.wizard, args.dependencies, args.purge, args.sitemapUrl, args.listTampers, args.hashFile)):
if not any((args.direct, args.url, args.logFile, args.bulkFile, args.googleDork, args.configFile, args.requestFile, args.updateAll, args.smokeTest, args.vulnTest, args.liveTest, args.wizard, args.dependencies, args.purge, args.sitemapUrl, args.listTampers, args.hashFile)):
errMsg = "missing a mandatory option (-d, -u, -l, -m, -r, -g, -c, -x, --list-tampers, --wizard, --update, --purge or --dependencies). "
errMsg += "Use -h for basic and -hh for advanced help\n"
parser.error(errMsg)

View File

@ -160,6 +160,9 @@ def main():
if conf.smokeTest:
from lib.core.testing import smokeTest
smokeTest()
elif conf.vulnTest:
from lib.core.testing import vulnTest
vulnTest()
elif conf.liveTest:
from lib.core.testing import liveTest
liveTest()