Update inference.py

This commit is contained in:
Hu@ngD0w 2025-09-01 01:12:54 +08:00 committed by GitHub
parent d9d9b5eeb7
commit bb1120d183
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

View File

@ -9,6 +9,8 @@ from __future__ import division
import re
import time
import os
from difflib import SequenceMatcher
from lib.core.agent import agent
from lib.core.common import Backend
@ -34,6 +36,7 @@ from lib.core.data import conf
from lib.core.data import kb
from lib.core.data import logger
from lib.core.data import queries
from lib.core.data import paths
from lib.core.enums import ADJUST_TIME_DELAY
from lib.core.enums import CHARSET_TYPE
from lib.core.enums import DBMS
@ -64,6 +67,146 @@ from lib.utils.safe2bin import safecharencode
from lib.utils.xrange import xrange
from thirdparty import six
# Similarity threshold for dictionary matching
SIMILARITY_THRESHOLD = 0.8
# Cache for dictionary content
_dict_cache = None
_dict_cache_columns = None
def loadDictionary():
"""
Load common database/table names from dictionary file
"""
global _dict_cache
if _dict_cache is not None:
return _dict_cache
_dict_cache = []
dict_file = paths.COMMON_TABLES
if os.path.exists(dict_file):
try:
with open(dict_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
_dict_cache.append(line.lower())
except Exception as e:
logger.debug("Failed to load dictionary file: %s" % e)
return _dict_cache
def loadColumnsDictionary():
"""
Load common column names from dictionary file
"""
global _dict_cache_columns
if _dict_cache_columns is not None:
return _dict_cache_columns
_dict_cache_columns = []
dict_file = paths.COMMON_COLUMNS
if os.path.exists(dict_file):
try:
with open(dict_file, 'r', encoding='utf-8') as f:
for line in f:
line = line.strip()
if line and not line.startswith('#'):
_dict_cache_columns.append(line.lower())
except Exception as e:
logger.debug("Failed to load columns dictionary file: %s" % e)
return _dict_cache_columns
def isColumnQuery(expression):
"""
Check if the query is related to columns
"""
if not expression:
return False
expression_lower = expression.lower()
# Common patterns that indicate column queries
column_patterns = [
'information_schema.*columns',
'pg_attribute',
'sys\\.columns',
'describe',
'show.*columns',
'show.*fields',
'column_name',
'col_name'
]
for pattern in column_patterns:
if pattern in expression_lower:
return True
return False
def checkSimilarity(partial_value, expression=None):
"""
Check similarity between partial inferred value and dictionary items
Returns the best match and similarity ratio
"""
if not partial_value:
return None, 0.0
# Choose dictionary based on query type
if expression and isColumnQuery(expression):
dictionary = loadColumnsDictionary()
dict_type = "columns"
else:
dictionary = loadDictionary()
dict_type = "tables"
if not dictionary:
return None, 0.0
best_match = None
best_ratio = 0.0
partial_lower = partial_value.lower()
for dict_item in dictionary:
# Calculate similarity ratio
ratio = SequenceMatcher(None, partial_lower, dict_item).ratio()
# Give higher weight if partial value matches dictionary item prefix
if dict_item.startswith(partial_lower):
ratio = max(ratio, 0.9)
if ratio > best_ratio:
best_ratio = ratio
best_match = dict_item
return best_match, best_ratio, dict_type
def testDictionaryMatch(expression, payload, best_match, timeBasedCompare):
"""
Test if dictionary match is successful
"""
try:
# Build test query
testValue = unescaper.escape("'%s'" % best_match) if "'" not in best_match else unescaper.escape("%s" % best_match, quote=False)
query = getTechniqueData().vector
query = agent.prefixQuery(query.replace(INFERENCE_MARKER, "(%s)%s%s" % (expression, INFERENCE_EQUALS_CHAR, testValue)))
query = agent.suffixQuery(query)
result = Request.queryPage(agent.payload(newValue=query), timeBasedCompare=timeBasedCompare, raise404=False)
incrementCounter(getTechnique())
return result
except Exception as e:
logger.debug("Dictionary match test failed: %s" % e)
return False
def bisection(payload, expression, length=None, charsetType=None, firstChar=None, lastChar=None, dump=False):
"""
Bisection algorithm that can be used to perform blind SQL injection
@ -517,6 +660,35 @@ def bisection(payload, expression, length=None, charsetType=None, firstChar=None
else:
break
# Dictionary similarity check for multi-threading
# Check if we have enough characters to test similarity
with kb.locks.value:
currentPartialValue = ""
for i in xrange(currentCharIndex - firstChar):
if threadData.shared.value[i] is not None:
currentPartialValue += threadData.shared.value[i]
if len(currentPartialValue) >= 3:
best_match, similarity_ratio, dict_type = checkSimilarity(currentPartialValue, expression)
if best_match and similarity_ratio >= SIMILARITY_THRESHOLD:
infoMsg = "checking %s dictionary similarity for '%s' (similarity: %.2f)" % (dict_type, currentPartialValue, similarity_ratio)
logger.info(infoMsg)
if testDictionaryMatch(expressionUnescaped, payload, best_match, timeBasedCompare):
infoMsg = "%s dictionary match successful: '%s'" % (dict_type.capitalize(), best_match)
logger.info(infoMsg)
# Fill the remaining characters with the matched value
remaining_chars = best_match[len(currentPartialValue):]
for i, char in enumerate(remaining_chars):
if currentCharIndex + i - firstChar < len(threadData.shared.value):
threadData.shared.value[currentCharIndex + i - 1 - firstChar] = char
# Update the index to skip the matched characters
threadData.shared.index[0] += len(remaining_chars)
continue
# NOTE: https://github.com/sqlmapproject/sqlmap/issues/4629
if not isListLike(threadData.shared.value):
break
@ -596,6 +768,27 @@ def bisection(payload, expression, length=None, charsetType=None, firstChar=None
while True:
index += 1
# Dictionary similarity check feature
# Check if partial value has high similarity with dictionary items
if len(partialValue) >= 3: # Only check if we have at least 3 characters
best_match, similarity_ratio, dict_type = checkSimilarity(partialValue, expression)
if best_match and similarity_ratio >= SIMILARITY_THRESHOLD:
infoMsg = "checking %s dictionary similarity for '%s' (similarity: %.2f)" % (dict_type, partialValue, similarity_ratio)
logger.info(infoMsg)
if testDictionaryMatch(expressionUnescaped, payload, best_match, timeBasedCompare):
infoMsg = "%s dictionary match successful: '%s'" % (dict_type.capitalize(), best_match)
logger.info(infoMsg)
if showEta:
progress.progress(len(best_match))
elif conf.verbose in (1, 2) or conf.api:
dataToStdout(filterControlChars(best_match[index - 1:]))
finalValue = best_match
break
# Common prediction feature (a.k.a. "good samaritan")
# NOTE: to be used only when multi-threading is not set for
# the moment