sqlmap/lib/core/testing.py

418 lines
14 KiB
Python
Raw Normal View History

2019-05-08 13:47:52 +03:00
#!/usr/bin/env python
2010-09-15 17:28:56 +04:00
"""
2019-01-05 23:38:52 +03:00
Copyright (c) 2006-2019 sqlmap developers (http://sqlmap.org/)
2017-10-11 15:50:46 +03:00
See the file 'LICENSE' for copying permission
2010-09-15 17:28:56 +04:00
"""
2012-12-20 13:37:20 +04:00
import codecs
2010-09-27 17:26:46 +04:00
import doctest
2019-05-06 12:41:19 +03:00
import logging
2010-09-15 17:28:56 +04:00
import os
2019-04-29 15:19:56 +03:00
import random
2010-09-26 18:02:13 +04:00
import re
import shutil
2010-09-15 17:28:56 +04:00
import sys
2010-09-26 18:02:13 +04:00
import tempfile
import threading
2010-09-26 14:47:04 +04:00
import time
import traceback
2010-09-15 17:28:56 +04:00
2012-12-11 15:02:06 +04:00
from extra.beep.beep import beep
from extra.vulnserver import vulnserver
2010-09-26 18:02:13 +04:00
from lib.controller.controller import start
2010-11-24 00:00:42 +03:00
from lib.core.common import clearConsoleLine
2010-09-26 14:47:04 +04:00
from lib.core.common import dataToStdout
from lib.core.common import randomStr
2010-10-07 02:43:04 +04:00
from lib.core.common import readXmlFile
from lib.core.common import shellExec
2019-05-03 00:51:54 +03:00
from lib.core.compat import round
2019-05-03 14:20:15 +03:00
from lib.core.compat import xrange
2019-05-06 01:54:21 +03:00
from lib.core.convert import getUnicode
2010-09-15 17:28:56 +04:00
from lib.core.data import conf
2019-05-06 12:41:19 +03:00
from lib.core.data import kb
2010-09-15 17:28:56 +04:00
from lib.core.data import logger
from lib.core.data import paths
2016-05-31 14:02:26 +03:00
from lib.core.enums import MKSTEMP_PREFIX
from lib.core.exception import SqlmapBaseException
2013-01-22 15:25:01 +04:00
from lib.core.exception import SqlmapNotVulnerableException
from lib.core.log import LOGGER_HANDLER
2010-09-26 18:02:13 +04:00
from lib.core.option import init
2013-01-29 20:23:30 +04:00
from lib.core.option import initOptions
from lib.core.option import setVerbosity
2010-09-27 17:26:46 +04:00
from lib.core.optiondict import optDict
2012-12-20 13:37:20 +04:00
from lib.core.settings import UNICODE_ENCODING
2010-09-26 18:02:13 +04:00
from lib.parse.cmdline import cmdLineParser
2010-09-15 17:28:56 +04:00
2014-01-13 21:24:49 +04:00
class Failures(object):
failedItems = None
failedParseOn = None
failedTraceBack = None
2016-09-19 16:51:28 +03:00
_failures = Failures()
2019-05-03 14:20:15 +03:00
_rand = 0
2016-09-19 16:51:28 +03:00
def vulnTest():
"""
Runs the testing against 'vulnserver'
"""
retVal = True
count, length = 0, 6
2019-04-29 15:19:56 +03:00
address, port = "127.0.0.10", random.randint(1025, 65535)
def _thread():
vulnserver.init(quiet=True)
2019-04-29 15:19:56 +03:00
vulnserver.run(address=address, port=port)
thread = threading.Thread(target=_thread)
thread.daemon = True
thread.start()
for options, checks in (
("--flush-session --identify-waf", ("CloudFlare",)),
("--flush-session --parse-errors", (": syntax error", "Type: boolean-based blind", "Type: time-based blind", "Type: UNION query", "back-end DBMS: SQLite", "3 columns")),
("--banner --schema --dump -T users --binary-fields=surname --where 'id>3'", ("banner: '3", "INTEGER", "TEXT", "id", "name", "surname", "2 entries", "6E616D6569736E756C6C")),
2019-04-29 12:58:52 +03:00
("--all --tamper=between,randomcase", ("5 entries", "luther", "blisset", "fluffy", "179ad45c6ce2cb97cf1029e212046e81", "NULL", "nameisnull", "testpass")),
("--technique=B --hex --fresh-queries --threads=4 --sql-query='SELECT 987654321'", ("length of query output", ": '987654321'",)),
2019-04-29 15:19:56 +03:00
("--technique=T --fresh-queries --sql-query='SELECT 1234'", (": '1234'",)),
):
2019-05-02 01:45:44 +03:00
output = shellExec("%s %s -u http://%s:%d/?id=1 --batch %s" % (sys.executable, os.path.join(os.path.dirname(__file__), "..", "..", "sqlmap.py"), address, port, options))
2019-04-30 14:20:31 +03:00
output = getUnicode(output)
if not all(check in output for check in checks):
retVal = False
count += 1
status = '%d/%d (%d%%) ' % (count, length, round(100.0 * count / length))
dataToStdout("\r[%s] [INFO] complete: %s" % (time.strftime("%X"), status))
clearConsoleLine()
if retVal:
logger.info("vuln test final result: PASSED")
else:
logger.error("vuln test final result: FAILED")
return retVal
2019-05-03 14:20:15 +03:00
def dirtyPatchRandom():
"""
Unifying random generated data across different Python versions
"""
def _lcg():
global _rand
a = 1140671485
c = 128201163
m = 2 ** 24
_rand = (a * _rand + c) % m
return _rand
def _randint(a, b):
_ = a + (_lcg() % (b - a + 1))
return _
def _choice(seq):
return seq[_randint(0, len(seq) - 1)]
def _sample(population, k):
return [_choice(population) for _ in xrange(k)]
def _seed(seed):
global _rand
_rand = seed
random.choice = _choice
random.randint = _randint
random.sample = _sample
random.seed = _seed
2010-09-15 17:28:56 +04:00
def smokeTest():
"""
Runs the basic smoke testing of a program
2010-09-15 17:28:56 +04:00
"""
2019-05-03 14:20:15 +03:00
dirtyPatchRandom()
2010-09-15 17:28:56 +04:00
retVal = True
2010-09-26 14:47:04 +04:00
count, length = 0, 0
2019-01-06 02:37:30 +03:00
for root, _, files in os.walk(paths.SQLMAP_ROOT_PATH):
if any(_ in root for _ in ("thirdparty", "extra")):
continue
for filename in files:
if os.path.splitext(filename)[1].lower() == ".py" and filename != "__init__.py":
length += 1
for root, _, files in os.walk(paths.SQLMAP_ROOT_PATH):
if any(_ in root for _ in ("thirdparty", "extra")):
continue
for filename in files:
if os.path.splitext(filename)[1].lower() == ".py" and filename != "__init__.py":
path = os.path.join(root, os.path.splitext(filename)[0])
path = path.replace(paths.SQLMAP_ROOT_PATH, '.')
path = path.replace(os.sep, '.').lstrip('.')
try:
__import__(path)
module = sys.modules[path]
2019-01-22 03:20:27 +03:00
except Exception as ex:
2019-01-06 02:37:30 +03:00
retVal = False
dataToStdout("\r")
2019-01-22 03:20:27 +03:00
errMsg = "smoke test failed at importing module '%s' (%s):\n%s" % (path, os.path.join(root, filename), ex)
2019-01-06 02:37:30 +03:00
logger.error(errMsg)
else:
2019-05-06 12:41:19 +03:00
logger.setLevel(logging.CRITICAL)
kb.smokeMode = True
2019-01-06 02:37:30 +03:00
(failure_count, test_count) = doctest.testmod(module)
2019-05-06 12:41:19 +03:00
kb.smokeMode = False
logger.setLevel(logging.INFO)
2019-01-06 02:37:30 +03:00
if failure_count > 0:
2010-09-15 17:28:56 +04:00
retVal = False
2019-01-06 02:37:30 +03:00
count += 1
status = '%d/%d (%d%%) ' % (count, length, round(100.0 * count / length))
dataToStdout("\r[%s] [INFO] complete: %s" % (time.strftime("%X"), status))
2010-09-26 14:47:04 +04:00
2010-11-24 00:00:42 +03:00
clearConsoleLine()
2010-09-15 17:28:56 +04:00
if retVal:
logger.info("smoke test final result: PASSED")
2010-09-15 17:28:56 +04:00
else:
logger.error("smoke test final result: FAILED")
2010-09-15 17:28:56 +04:00
return retVal
2010-09-15 17:32:42 +04:00
2010-09-27 17:26:46 +04:00
def adjustValueType(tagName, value):
for family in optDict:
2010-09-27 17:26:46 +04:00
for name, type_ in optDict[family].items():
if type(type_) == tuple:
type_ = type_[0]
if tagName == name:
if type_ == "boolean":
value = (value == "True")
elif type_ == "integer":
value = int(value)
elif type_ == "float":
value = float(value)
break
return value
2010-09-15 17:32:42 +04:00
def liveTest():
"""
Runs the test of a program against the live testing environment
2010-09-15 17:32:42 +04:00
"""
2010-09-26 18:56:55 +04:00
retVal = True
count = 0
global_ = {}
vars_ = {}
2010-10-07 02:43:04 +04:00
livetests = readXmlFile(paths.LIVE_TESTS_XML)
length = len(livetests.getElementsByTagName("case"))
element = livetests.getElementsByTagName("global")
if element:
for item in element:
2010-09-26 18:02:13 +04:00
for child in item.childNodes:
if child.nodeType == child.ELEMENT_NODE and child.hasAttribute("value"):
2010-09-27 17:26:46 +04:00
global_[child.tagName] = adjustValueType(child.tagName, child.getAttribute("value"))
element = livetests.getElementsByTagName("vars")
if element:
for item in element:
for child in item.childNodes:
if child.nodeType == child.ELEMENT_NODE and child.hasAttribute("value"):
var = child.getAttribute("value")
vars_[child.tagName] = randomStr(6) if var == "random" else var
2010-09-26 18:02:13 +04:00
for case in livetests.getElementsByTagName("case"):
2013-01-30 14:32:56 +04:00
parse_from_console_output = False
count += 1
name = None
parse = []
switches = dict(global_)
value = ""
vulnerable = True
2013-01-21 21:10:56 +04:00
result = None
if case.hasAttribute("name"):
name = case.getAttribute("name")
2010-09-26 18:02:13 +04:00
if conf.runCase and ((conf.runCase.isdigit() and conf.runCase != count) or not re.search(conf.runCase, name, re.DOTALL)):
continue
2010-09-26 18:02:13 +04:00
if case.getElementsByTagName("switches"):
for child in case.getElementsByTagName("switches")[0].childNodes:
if child.nodeType == child.ELEMENT_NODE and child.hasAttribute("value"):
2010-09-27 17:26:46 +04:00
value = replaceVars(child.getAttribute("value"), vars_)
switches[child.tagName] = adjustValueType(child.tagName, value)
2010-09-26 18:02:13 +04:00
if case.getElementsByTagName("parse"):
for item in case.getElementsByTagName("parse")[0].getElementsByTagName("item"):
2010-09-26 18:02:13 +04:00
if item.hasAttribute("value"):
value = replaceVars(item.getAttribute("value"), vars_)
if item.hasAttribute("console_output"):
2013-01-30 14:32:56 +04:00
parse_from_console_output = bool(item.getAttribute("console_output"))
2013-01-30 14:32:56 +04:00
parse.append((value, parse_from_console_output))
2010-09-26 18:02:13 +04:00
conf.verbose = global_.get("verbose", 1)
setVerbosity()
msg = "running live test case: %s (%d/%d)" % (name, count, length)
logger.info(msg)
initCase(switches, count)
test_case_fd = codecs.open(os.path.join(paths.SQLMAP_OUTPUT_PATH, "test_case"), "wb", UNICODE_ENCODING)
2013-01-19 21:11:16 +04:00
test_case_fd.write("%s\n" % name)
2013-02-03 19:39:07 +04:00
try:
result = runCase(parse)
except SqlmapNotVulnerableException:
vulnerable = False
finally:
conf.verbose = global_.get("verbose", 1)
setVerbosity()
2013-02-03 19:39:07 +04:00
2013-01-21 21:10:56 +04:00
if result is True:
logger.info("test passed")
cleanCase()
else:
2014-01-13 21:24:49 +04:00
errMsg = "test failed"
2013-01-18 17:02:35 +04:00
2016-09-19 16:51:28 +03:00
if _failures.failedItems:
errMsg += " at parsing items: %s" % ", ".join(i for i in _failures.failedItems)
2013-01-18 17:02:35 +04:00
2014-01-13 21:24:49 +04:00
errMsg += " - scan folder: %s" % paths.SQLMAP_OUTPUT_PATH
2016-09-19 16:51:28 +03:00
errMsg += " - traceback: %s" % bool(_failures.failedTraceBack)
2013-01-18 17:02:35 +04:00
if not vulnerable:
2013-01-18 17:02:35 +04:00
errMsg += " - SQL injection not detected"
logger.error(errMsg)
2013-01-19 21:11:16 +04:00
test_case_fd.write("%s\n" % errMsg)
2013-01-18 17:02:35 +04:00
2016-09-19 16:51:28 +03:00
if _failures.failedParseOn:
2013-01-18 17:02:35 +04:00
console_output_fd = codecs.open(os.path.join(paths.SQLMAP_OUTPUT_PATH, "console_output"), "wb", UNICODE_ENCODING)
2016-09-19 16:51:28 +03:00
console_output_fd.write(_failures.failedParseOn)
console_output_fd.close()
2016-09-19 16:51:28 +03:00
if _failures.failedTraceBack:
2013-01-18 17:02:35 +04:00
traceback_fd = codecs.open(os.path.join(paths.SQLMAP_OUTPUT_PATH, "traceback"), "wb", UNICODE_ENCODING)
2016-09-19 16:51:28 +03:00
traceback_fd.write(_failures.failedTraceBack)
2013-01-18 17:02:35 +04:00
traceback_fd.close()
beep()
if conf.stopFail is True:
return retVal
test_case_fd.close()
2013-01-18 17:02:35 +04:00
retVal &= bool(result)
dataToStdout("\n")
2010-09-26 18:56:55 +04:00
if retVal:
logger.info("live test final result: PASSED")
2010-09-26 18:56:55 +04:00
else:
logger.error("live test final result: FAILED")
2010-09-26 18:56:55 +04:00
return retVal
2010-09-26 18:02:13 +04:00
def initCase(switches, count):
2016-09-19 16:51:28 +03:00
_failures.failedItems = []
_failures.failedParseOn = None
_failures.failedTraceBack = None
2016-05-31 14:02:26 +03:00
paths.SQLMAP_OUTPUT_PATH = tempfile.mkdtemp(prefix="%s%d-" % (MKSTEMP_PREFIX.TESTING, count))
2011-04-30 17:20:05 +04:00
paths.SQLMAP_DUMP_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "dump")
paths.SQLMAP_FILES_PATH = os.path.join(paths.SQLMAP_OUTPUT_PATH, "%s", "files")
logger.debug("using output directory '%s' for this test case" % paths.SQLMAP_OUTPUT_PATH)
LOGGER_HANDLER.stream = sys.stdout = tempfile.SpooledTemporaryFile(max_size=0, mode="w+b", prefix="sqlmapstdout-")
2010-09-26 18:02:13 +04:00
cmdLineOptions = cmdLineParser()
if switches:
for key, value in switches.items():
2010-09-27 17:26:46 +04:00
if key in cmdLineOptions.__dict__:
cmdLineOptions.__dict__[key] = value
2013-01-29 20:23:30 +04:00
initOptions(cmdLineOptions, True)
init()
2010-09-26 18:56:55 +04:00
def cleanCase():
shutil.rmtree(paths.SQLMAP_OUTPUT_PATH, True)
2010-09-26 18:02:13 +04:00
2013-02-03 19:39:07 +04:00
def runCase(parse):
2012-12-19 18:34:34 +04:00
retVal = True
handled_exception = None
unhandled_exception = None
result = False
console = ""
try:
result = start()
except KeyboardInterrupt:
2013-01-26 19:33:09 +04:00
pass
2019-01-22 03:20:27 +03:00
except SqlmapBaseException as ex:
handled_exception = ex
except Exception as ex:
unhandled_exception = ex
finally:
sys.stdout.seek(0)
console = sys.stdout.read()
2012-12-19 18:34:34 +04:00
LOGGER_HANDLER.stream = sys.stdout = sys.__stdout__
if unhandled_exception:
2016-09-19 16:51:28 +03:00
_failures.failedTraceBack = "unhandled exception: %s" % str(traceback.format_exc())
2013-01-18 17:02:35 +04:00
retVal = None
elif handled_exception:
2016-09-19 16:51:28 +03:00
_failures.failedTraceBack = "handled exception: %s" % str(traceback.format_exc())
2013-01-18 17:02:35 +04:00
retVal = None
elif result is False: # this means no SQL injection has been detected - if None, ignore
2010-09-26 18:56:55 +04:00
retVal = False
2014-11-04 02:34:35 +03:00
console = getUnicode(console, encoding=sys.stdin.encoding)
if parse and retVal:
2012-12-20 13:37:20 +04:00
with codecs.open(conf.dumper.getOutputFile(), "rb", UNICODE_ENCODING) as f:
content = f.read()
2012-12-19 17:47:17 +04:00
2013-01-30 14:32:56 +04:00
for item, parse_from_console_output in parse:
parse_on = console if parse_from_console_output else content
if item.startswith("r'") and item.endswith("'"):
if not re.search(item[2:-1], parse_on, re.DOTALL):
2013-01-18 17:02:35 +04:00
retVal = None
2016-09-19 16:51:28 +03:00
_failures.failedItems.append(item)
2012-12-20 13:37:20 +04:00
elif item not in parse_on:
2013-01-18 17:02:35 +04:00
retVal = None
2016-09-19 16:51:28 +03:00
_failures.failedItems.append(item)
2010-09-26 18:56:55 +04:00
2016-09-19 16:51:28 +03:00
if _failures.failedItems:
_failures.failedParseOn = console
elif retVal is False:
2016-09-19 16:51:28 +03:00
_failures.failedParseOn = console
2010-09-26 18:56:55 +04:00
return retVal
2010-09-26 18:02:13 +04:00
def replaceVars(item, vars_):
2010-09-26 18:02:13 +04:00
retVal = item
2012-12-19 18:25:29 +04:00
if item and vars_:
2019-05-08 13:28:50 +03:00
for var in re.findall(r"\$\{([^}]+)\}", item):
if var in vars_:
retVal = retVal.replace("${%s}" % var, vars_[var])
2012-12-19 18:25:29 +04:00
2012-12-06 17:15:44 +04:00
return retVal