added Ctrl+C check in detection phase

This commit is contained in:
Miroslav Stampar 2010-12-18 10:42:09 +00:00
parent e355f92f22
commit 03220d34ba

View File

@ -77,335 +77,350 @@ def checkSqlInjection(place, parameter, value):
kb.testMode = True
for test in conf.tests:
title = test.title
stype = test.stype
clause = test.clause
try:
title = test.title
stype = test.stype
clause = test.clause
# Skip test if the user's wants to test only for a specific
# technique
if conf.technique and isinstance(conf.technique, int) and stype != conf.technique:
debugMsg = "skipping test '%s' because the user " % title
debugMsg += "specified to test only for "
debugMsg += "%s" % PAYLOAD.SQLINJECTION[conf.technique]
logger.debug(debugMsg)
continue
# Skip test if the risk is higher than the provided (or default)
# value
# Parse test's <risk>
if test.risk > conf.risk:
debugMsg = "skipping test '%s' because the risk " % title
debugMsg += "is higher than the provided"
logger.debug(debugMsg)
continue
# Skip test if the level is higher than the provided (or default)
# value
# Parse test's <level>
if test.level > conf.level:
debugMsg = "skipping test '%s' because the level " % title
debugMsg += "is higher than the provided"
logger.debug(debugMsg)
continue
# Skip DBMS-specific test if it does not match either the
# previously identified or the user's provided DBMS
if "details" in test and "dbms" in test.details:
dbms = test.details.dbms
else:
dbms = None
if dbms is not None:
if injection.dbms is not None and injection.dbms != dbms:
debugMsg = "skipping test '%s' because " % title
debugMsg += "the back-end DBMS identified is "
debugMsg += "%s" % injection.dbms
# Skip test if the user's wants to test only for a specific
# technique
if conf.technique and isinstance(conf.technique, int) and stype != conf.technique:
debugMsg = "skipping test '%s' because the user " % title
debugMsg += "specified to test only for "
debugMsg += "%s" % PAYLOAD.SQLINJECTION[conf.technique]
logger.debug(debugMsg)
continue
if conf.dbms is not None and conf.dbms.lower() != dbms.lower():
debugMsg = "skipping test '%s' because " % title
debugMsg += "the provided DBMS is %s" % conf.dbms
# Skip test if the risk is higher than the provided (or default)
# value
# Parse test's <risk>
if test.risk > conf.risk:
debugMsg = "skipping test '%s' because the risk " % title
debugMsg += "is higher than the provided"
logger.debug(debugMsg)
continue
# Skip test if it is the same SQL injection type already
# identified by another test
if injection.data and stype in injection.data:
debugMsg = "skipping test '%s' because " % title
debugMsg += "the payload for %s has " % PAYLOAD.SQLINJECTION[stype]
debugMsg += "already been identified"
logger.debug(debugMsg)
continue
# Skip test if the level is higher than the provided (or default)
# value
# Parse test's <level>
if test.level > conf.level:
debugMsg = "skipping test '%s' because the level " % title
debugMsg += "is higher than the provided"
logger.debug(debugMsg)
continue
# Skip test if it does not match the same SQL injection clause
# already identified by another test
clauseMatch = False
for clauseTest in clause:
if injection.clause is not None and clauseTest in injection.clause:
clauseMatch = True
break
if clause != [ 0 ] and injection.clause and injection.clause != [ 0 ] and not clauseMatch:
debugMsg = "skipping test '%s' because the clauses " % title
debugMsg += "differs from the clause already identified"
logger.debug(debugMsg)
continue
infoMsg = "testing '%s'" % title
logger.info(infoMsg)
# Parse test's <request>
comment = agent.getComment(test.request)
fstPayload = agent.cleanupPayload(test.request.payload, value)
fstPayload = unescapeDbms(fstPayload, injection, dbms)
fstPayload = "%s%s" % (fstPayload, comment)
if stype != 4 and clause != [2, 3] and clause != [ 3 ]:
space = " "
else:
space = ""
if conf.prefix is not None and conf.suffix is not None:
# Create a custom boundary object for user's supplied prefix
# and suffix
boundary = advancedDict()
boundary.level = 1
boundary.clause = [ 0 ]
boundary.where = [ 1, 2, 3 ]
boundary.prefix = conf.prefix
boundary.suffix = conf.suffix
if " like" in boundary.suffix.lower():
if "'" in boundary.suffix.lower():
boundary.ptype = 3
elif '"' in boundary.suffix.lower():
boundary.ptype = 5
elif "'" in boundary.suffix:
boundary.ptype = 2
elif '"' in boundary.suffix:
boundary.ptype = 4
# Skip DBMS-specific test if it does not match either the
# previously identified or the user's provided DBMS
if "details" in test and "dbms" in test.details:
dbms = test.details.dbms
else:
boundary.ptype = 1
dbms = None
# Prepend user's provided boundaries to all others boundaries
conf.boundaries.insert(0, boundary)
if dbms is not None:
if injection.dbms is not None and injection.dbms != dbms:
debugMsg = "skipping test '%s' because " % title
debugMsg += "the back-end DBMS identified is "
debugMsg += "%s" % injection.dbms
logger.debug(debugMsg)
for boundary in conf.boundaries:
injectable = False
continue
# Skip boundary if the level is higher than the provided (or
# default) value
# Parse boundary's <level>
if boundary.level > conf.level:
# NOTE: shall we report every single skipped boundary too?
if conf.dbms is not None and conf.dbms.lower() != dbms.lower():
debugMsg = "skipping test '%s' because " % title
debugMsg += "the provided DBMS is %s" % conf.dbms
logger.debug(debugMsg)
continue
# Skip test if it is the same SQL injection type already
# identified by another test
if injection.data and stype in injection.data:
debugMsg = "skipping test '%s' because " % title
debugMsg += "the payload for %s has " % PAYLOAD.SQLINJECTION[stype]
debugMsg += "already been identified"
logger.debug(debugMsg)
continue
# Skip boundary if it does not match against test's <clause>
# Parse test's <clause> and boundary's <clause>
# Skip test if it does not match the same SQL injection clause
# already identified by another test
clauseMatch = False
for clauseTest in test.clause:
if clauseTest in boundary.clause:
for clauseTest in clause:
if injection.clause is not None and clauseTest in injection.clause:
clauseMatch = True
break
if test.clause != [ 0 ] and boundary.clause != [ 0 ] and not clauseMatch:
if clause != [ 0 ] and injection.clause and injection.clause != [ 0 ] and not clauseMatch:
debugMsg = "skipping test '%s' because the clauses " % title
debugMsg += "differs from the clause already identified"
logger.debug(debugMsg)
continue
# Skip boundary if it does not match against test's <where>
# Parse test's <where> and boundary's <where>
whereMatch = False
infoMsg = "testing '%s'" % title
logger.info(infoMsg)
for where in test.where:
if where in boundary.where:
whereMatch = True
break
# Parse test's <request>
comment = agent.getComment(test.request)
fstPayload = agent.cleanupPayload(test.request.payload, value)
fstPayload = unescapeDbms(fstPayload, injection, dbms)
fstPayload = "%s%s" % (fstPayload, comment)
if not whereMatch:
continue
if stype != 4 and clause != [2, 3] and clause != [ 3 ]:
space = " "
else:
space = ""
# Parse boundary's <prefix>, <suffix> and <ptype>
prefix = boundary.prefix if boundary.prefix else ""
suffix = boundary.suffix if boundary.suffix else ""
ptype = boundary.ptype
if conf.prefix is not None and conf.suffix is not None:
# Create a custom boundary object for user's supplied prefix
# and suffix
boundary = advancedDict()
# If the previous injections succeeded, we know which prefix,
# suffix and parameter type to use for further tests, no
# need to cycle through the boundaries for the following tests
condBound = (injection.prefix is not None and injection.suffix is not None)
condBound &= (injection.prefix != prefix or injection.suffix != suffix)
condType = injection.ptype is not None and injection.ptype != ptype
boundary.level = 1
boundary.clause = [ 0 ]
boundary.where = [ 1, 2, 3 ]
boundary.prefix = conf.prefix
boundary.suffix = conf.suffix
if condBound or condType:
continue
if " like" in boundary.suffix.lower():
if "'" in boundary.suffix.lower():
boundary.ptype = 3
elif '"' in boundary.suffix.lower():
boundary.ptype = 5
elif "'" in boundary.suffix:
boundary.ptype = 2
elif '"' in boundary.suffix:
boundary.ptype = 4
else:
boundary.ptype = 1
# For each test's <where>
for where in test.where:
templatePayload = None
# Prepend user's provided boundaries to all others boundaries
conf.boundaries.insert(0, boundary)
# Threat the parameter original value according to the
# test's <where> tag
if where == 1:
origValue = value
elif where == 2:
origValue = "-%s" % randomInt()
# Use different page template than the original one
# as we are changing parameters value, which will result
# most definitely with a different content
templatePayload = agent.payload(place, parameter, value, origValue)
elif where == 3:
origValue = ""
for boundary in conf.boundaries:
injectable = False
kb.pageTemplate = getPageTemplate(templatePayload, place)
# Skip boundary if the level is higher than the provided (or
# default) value
# Parse boundary's <level>
if boundary.level > conf.level:
# NOTE: shall we report every single skipped boundary too?
continue
# Forge request payload by prepending with boundary's
# prefix and appending the boundary's suffix to the
# test's ' <payload><comment> ' string
boundPayload = "%s%s%s%s %s" % (origValue, prefix, space, fstPayload, suffix)
boundPayload = boundPayload.strip()
boundPayload = agent.cleanupPayload(boundPayload, value)
reqPayload = agent.payload(place, parameter, value, boundPayload)
# Skip boundary if it does not match against test's <clause>
# Parse test's <clause> and boundary's <clause>
clauseMatch = False
# Perform the test's request and check whether or not the
# payload was successful
# Parse test's <response>
for method, check in test.response.items():
check = agent.cleanupPayload(check, value)
for clauseTest in test.clause:
if clauseTest in boundary.clause:
clauseMatch = True
break
# In case of boolean-based blind SQL injection
if method == PAYLOAD.METHOD.COMPARISON:
sndPayload = agent.cleanupPayload(test.response.comparison, value)
sndPayload = unescapeDbms(sndPayload, injection, dbms)
sndPayload = "%s%s" % (sndPayload, comment)
if test.clause != [ 0 ] and boundary.clause != [ 0 ] and not clauseMatch:
continue
# Forge response payload by prepending with
# boundary's prefix and appending the boundary's
# suffix to the test's ' <payload><comment> '
# string
boundPayload = "%s%s%s%s %s" % (origValue, prefix, space, sndPayload, suffix)
boundPayload = boundPayload.strip()
boundPayload = agent.cleanupPayload(boundPayload, value)
cmpPayload = agent.payload(place, parameter, value, boundPayload)
# Skip boundary if it does not match against test's <where>
# Parse test's <where> and boundary's <where>
whereMatch = False
# Useful to set kb.matchRatio at first based on
# the False response content
kb.matchRatio = None
_ = Request.queryPage(cmpPayload, place)
for where in test.where:
if where in boundary.where:
whereMatch = True
break
# Perform the test's True request
trueResult = Request.queryPage(reqPayload, place)
if not whereMatch:
continue
if trueResult:
falseResult = Request.queryPage(cmpPayload, place)
# Parse boundary's <prefix>, <suffix> and <ptype>
prefix = boundary.prefix if boundary.prefix else ""
suffix = boundary.suffix if boundary.suffix else ""
ptype = boundary.ptype
# Perform the test's False request
if not falseResult:
infoMsg = "%s parameter '%s' is '%s' injectable " % (place, parameter, title)
logger.info(infoMsg)
# If the previous injections succeeded, we know which prefix,
# suffix and parameter type to use for further tests, no
# need to cycle through the boundaries for the following tests
condBound = (injection.prefix is not None and injection.suffix is not None)
condBound &= (injection.prefix != prefix or injection.suffix != suffix)
condType = injection.ptype is not None and injection.ptype != ptype
injectable = True
if condBound or condType:
continue
# In case of error-based or UNION query SQL injections
elif method == PAYLOAD.METHOD.GREP:
# Perform the test's request and grep the response
# body for the test's <grep> regular expression
reqBody, _ = Request.queryPage(reqPayload, place, content=True)
output = extractRegexResult(check, reqBody, re.DOTALL | re.IGNORECASE)
# For each test's <where>
for where in test.where:
templatePayload = None
if output:
result = output.replace(kb.misc.space, " ") == "1"
# Threat the parameter original value according to the
# test's <where> tag
if where == 1:
origValue = value
elif where == 2:
origValue = "-%s" % randomInt()
# Use different page template than the original one
# as we are changing parameters value, which will result
# most definitely with a different content
templatePayload = agent.payload(place, parameter, value, origValue)
elif where == 3:
origValue = ""
if result:
infoMsg = "%s parameter '%s' is '%s' injectable " % (place, parameter, title)
logger.info(infoMsg)
kb.pageTemplate = getPageTemplate(templatePayload, place)
injectable = True
# Forge request payload by prepending with boundary's
# prefix and appending the boundary's suffix to the
# test's ' <payload><comment> ' string
boundPayload = "%s%s%s%s %s" % (origValue, prefix, space, fstPayload, suffix)
boundPayload = boundPayload.strip()
boundPayload = agent.cleanupPayload(boundPayload, value)
reqPayload = agent.payload(place, parameter, value, boundPayload)
# In case of time-based blind or stacked queries
# SQL injections
elif method == PAYLOAD.METHOD.TIME:
# Store old value of socket timeout
pushValue(socket.getdefaulttimeout())
# Perform the test's request and check whether or not the
# payload was successful
# Parse test's <response>
for method, check in test.response.items():
check = agent.cleanupPayload(check, value)
# Set socket timeout to 2 minutes as some
# time based checks can take awhile
socket.setdefaulttimeout(120)
# In case of boolean-based blind SQL injection
if method == PAYLOAD.METHOD.COMPARISON:
sndPayload = agent.cleanupPayload(test.response.comparison, value)
sndPayload = unescapeDbms(sndPayload, injection, dbms)
sndPayload = "%s%s" % (sndPayload, comment)
# Perform the test's request
trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True)
# Forge response payload by prepending with
# boundary's prefix and appending the boundary's
# suffix to the test's ' <payload><comment> '
# string
boundPayload = "%s%s%s%s %s" % (origValue, prefix, space, sndPayload, suffix)
boundPayload = boundPayload.strip()
boundPayload = agent.cleanupPayload(boundPayload, value)
cmpPayload = agent.payload(place, parameter, value, boundPayload)
if trueResult:
# Confirm test's results
# Useful to set kb.matchRatio at first based on
# the False response content
kb.matchRatio = None
_ = Request.queryPage(cmpPayload, place)
# Perform the test's True request
trueResult = Request.queryPage(reqPayload, place)
if trueResult:
falseResult = Request.queryPage(cmpPayload, place)
# Perform the test's False request
if not falseResult:
infoMsg = "%s parameter '%s' is '%s' injectable " % (place, parameter, title)
logger.info(infoMsg)
injectable = True
# In case of error-based or UNION query SQL injections
elif method == PAYLOAD.METHOD.GREP:
# Perform the test's request and grep the response
# body for the test's <grep> regular expression
reqBody, _ = Request.queryPage(reqPayload, place, content=True)
output = extractRegexResult(check, reqBody, re.DOTALL | re.IGNORECASE)
if output:
result = output.replace(kb.misc.space, " ") == "1"
if result:
infoMsg = "%s parameter '%s' is '%s' injectable " % (place, parameter, title)
logger.info(infoMsg)
injectable = True
# In case of time-based blind or stacked queries
# SQL injections
elif method == PAYLOAD.METHOD.TIME:
# Store old value of socket timeout
pushValue(socket.getdefaulttimeout())
# Set socket timeout to 2 minutes as some
# time based checks can take awhile
socket.setdefaulttimeout(120)
# Perform the test's request
trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True)
if trueResult:
infoMsg = "%s parameter '%s' is '%s' injectable " % (place, parameter, title)
logger.info(infoMsg)
# Confirm test's results
trueResult = Request.queryPage(reqPayload, place, timeBasedCompare=True)
injectable = True
if trueResult:
infoMsg = "%s parameter '%s' is '%s' injectable " % (place, parameter, title)
logger.info(infoMsg)
# Restore value of socket timeout
socket.setdefaulttimeout(popValue())
injectable = True
# If the injection test was successful feed the injection
# object with the test's details
if injectable is True:
# Feed with the boundaries details only the first time a
# test has been successful
if injection.place is None or injection.parameter is None:
if place == PLACE.UA:
injection.parameter = conf.agent
# Restore value of socket timeout
socket.setdefaulttimeout(popValue())
# If the injection test was successful feed the injection
# object with the test's details
if injectable is True:
# Feed with the boundaries details only the first time a
# test has been successful
if injection.place is None or injection.parameter is None:
if place == PLACE.UA:
injection.parameter = conf.agent
else:
injection.parameter = parameter
injection.place = place
injection.ptype = ptype
injection.prefix = prefix
injection.suffix = suffix
injection.clause = clause
if "vector" in test and test.vector is not None:
vector = "%s%s" % (test.vector, comment)
else:
injection.parameter = parameter
vector = None
injection.place = place
injection.ptype = ptype
injection.prefix = prefix
injection.suffix = suffix
injection.clause = clause
# Feed with test details every time a test is successful
injection.data[stype] = advancedDict()
injection.data[stype].title = title
injection.data[stype].payload = agent.removePayloadDelimiters(reqPayload, False)
injection.data[stype].where = where
injection.data[stype].vector = vector
injection.data[stype].comment = comment
injection.data[stype].matchRatio = kb.matchRatio
injection.data[stype].templatePayload = templatePayload
if "vector" in test and test.vector is not None:
vector = "%s%s" % (test.vector, comment)
else:
vector = None
if hasattr(test, "details"):
for detailKey, detailValue in test.details.items():
if detailKey == "dbms" and injection.dbms is None:
injection.dbms = detailValue
kb.dbms = detailValue
elif detailKey == "dbms_version" and injection.dbms_version is None:
injection.dbms_version = detailValue
kb.dbmsVersion = [ detailValue ]
elif detailKey == "os" and injection.os is None:
injection.os = detailValue
# Feed with test details every time a test is successful
injection.data[stype] = advancedDict()
injection.data[stype].title = title
injection.data[stype].payload = agent.removePayloadDelimiters(reqPayload, False)
injection.data[stype].where = where
injection.data[stype].vector = vector
injection.data[stype].comment = comment
injection.data[stype].matchRatio = kb.matchRatio
injection.data[stype].templatePayload = templatePayload
if conf.beep:
beep()
if hasattr(test, "details"):
for detailKey, detailValue in test.details.items():
if detailKey == "dbms" and injection.dbms is None:
injection.dbms = detailValue
kb.dbms = detailValue
elif detailKey == "dbms_version" and injection.dbms_version is None:
injection.dbms_version = detailValue
kb.dbmsVersion = [ detailValue ]
elif detailKey == "os" and injection.os is None:
injection.os = detailValue
# There is no need to perform this test for other
# <where> tags
break
if conf.beep:
beep()
# There is no need to perform this test for other
# <where> tags
if injectable is True:
# There is no need to perform this test with others
# boundaries
break
if injectable is True:
# There is no need to perform this test with others
# boundaries
except KeyboardInterrupt:
warnMsg = "Ctrl+C detected in detection mode"
logger.warn(warnMsg)
message = "What do you want to do? [(S)kip current/(a)bort detection/(q)uit]"
test = readInput(message, default="S")
if not test or test[0] in ("s", "S"):
pass
elif test[0] in ("a", "A"):
break
elif test[0] in ("q", "Q"):
raise sqlmapUserQuitException
# Flush the flag
kb.testMode = False