diff --git a/lib/techniques/blind/inference.py b/lib/techniques/blind/inference.py index 223ac12c4..3a742345e 100644 --- a/lib/techniques/blind/inference.py +++ b/lib/techniques/blind/inference.py @@ -9,6 +9,8 @@ from __future__ import division import re import time +import os +from difflib import SequenceMatcher from lib.core.agent import agent from lib.core.common import Backend @@ -34,6 +36,7 @@ from lib.core.data import conf from lib.core.data import kb from lib.core.data import logger from lib.core.data import queries +from lib.core.data import paths from lib.core.enums import ADJUST_TIME_DELAY from lib.core.enums import CHARSET_TYPE from lib.core.enums import DBMS @@ -64,6 +67,146 @@ from lib.utils.safe2bin import safecharencode from lib.utils.xrange import xrange from thirdparty import six +# Similarity threshold for dictionary matching +SIMILARITY_THRESHOLD = 0.8 + +# Cache for dictionary content +_dict_cache = None +_dict_cache_columns = None + +def loadDictionary(): + """ + Load common database/table names from dictionary file + """ + global _dict_cache + + if _dict_cache is not None: + return _dict_cache + + _dict_cache = [] + dict_file = paths.COMMON_TABLES + + if os.path.exists(dict_file): + try: + with open(dict_file, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + _dict_cache.append(line.lower()) + except Exception as e: + logger.debug("Failed to load dictionary file: %s" % e) + + return _dict_cache + +def loadColumnsDictionary(): + """ + Load common column names from dictionary file + """ + global _dict_cache_columns + + if _dict_cache_columns is not None: + return _dict_cache_columns + + _dict_cache_columns = [] + dict_file = paths.COMMON_COLUMNS + + if os.path.exists(dict_file): + try: + with open(dict_file, 'r', encoding='utf-8') as f: + for line in f: + line = line.strip() + if line and not line.startswith('#'): + _dict_cache_columns.append(line.lower()) + except Exception as e: + logger.debug("Failed to load columns dictionary file: %s" % e) + + return _dict_cache_columns + +def isColumnQuery(expression): + """ + Check if the query is related to columns + """ + if not expression: + return False + + expression_lower = expression.lower() + + # Common patterns that indicate column queries + column_patterns = [ + 'information_schema.*columns', + 'pg_attribute', + 'sys\\.columns', + 'describe', + 'show.*columns', + 'show.*fields', + 'column_name', + 'col_name' + ] + + for pattern in column_patterns: + if pattern in expression_lower: + return True + + return False + +def checkSimilarity(partial_value, expression=None): + """ + Check similarity between partial inferred value and dictionary items + Returns the best match and similarity ratio + """ + if not partial_value: + return None, 0.0 + + # Choose dictionary based on query type + if expression and isColumnQuery(expression): + dictionary = loadColumnsDictionary() + dict_type = "columns" + else: + dictionary = loadDictionary() + dict_type = "tables" + + if not dictionary: + return None, 0.0 + + best_match = None + best_ratio = 0.0 + + partial_lower = partial_value.lower() + + for dict_item in dictionary: + # Calculate similarity ratio + ratio = SequenceMatcher(None, partial_lower, dict_item).ratio() + + # Give higher weight if partial value matches dictionary item prefix + if dict_item.startswith(partial_lower): + ratio = max(ratio, 0.9) + + if ratio > best_ratio: + best_ratio = ratio + best_match = dict_item + + return best_match, best_ratio, dict_type + +def testDictionaryMatch(expression, payload, best_match, timeBasedCompare): + """ + Test if dictionary match is successful + """ + try: + # Build test query + testValue = unescaper.escape("'%s'" % best_match) if "'" not in best_match else unescaper.escape("%s" % best_match, quote=False) + + query = getTechniqueData().vector + query = agent.prefixQuery(query.replace(INFERENCE_MARKER, "(%s)%s%s" % (expression, INFERENCE_EQUALS_CHAR, testValue))) + query = agent.suffixQuery(query) + + result = Request.queryPage(agent.payload(newValue=query), timeBasedCompare=timeBasedCompare, raise404=False) + incrementCounter(getTechnique()) + + return result + except Exception as e: + logger.debug("Dictionary match test failed: %s" % e) + return False + def bisection(payload, expression, length=None, charsetType=None, firstChar=None, lastChar=None, dump=False): """ Bisection algorithm that can be used to perform blind SQL injection @@ -517,6 +660,35 @@ def bisection(payload, expression, length=None, charsetType=None, firstChar=None else: break + # Dictionary similarity check for multi-threading + # Check if we have enough characters to test similarity + with kb.locks.value: + currentPartialValue = "" + for i in xrange(currentCharIndex - firstChar): + if threadData.shared.value[i] is not None: + currentPartialValue += threadData.shared.value[i] + + if len(currentPartialValue) >= 3: + best_match, similarity_ratio, dict_type = checkSimilarity(currentPartialValue, expression) + + if best_match and similarity_ratio >= SIMILARITY_THRESHOLD: + infoMsg = "checking %s dictionary similarity for '%s' (similarity: %.2f)" % (dict_type, currentPartialValue, similarity_ratio) + logger.info(infoMsg) + + if testDictionaryMatch(expressionUnescaped, payload, best_match, timeBasedCompare): + infoMsg = "%s dictionary match successful: '%s'" % (dict_type.capitalize(), best_match) + logger.info(infoMsg) + + # Fill the remaining characters with the matched value + remaining_chars = best_match[len(currentPartialValue):] + for i, char in enumerate(remaining_chars): + if currentCharIndex + i - firstChar < len(threadData.shared.value): + threadData.shared.value[currentCharIndex + i - 1 - firstChar] = char + + # Update the index to skip the matched characters + threadData.shared.index[0] += len(remaining_chars) + continue + # NOTE: https://github.com/sqlmapproject/sqlmap/issues/4629 if not isListLike(threadData.shared.value): break @@ -596,6 +768,27 @@ def bisection(payload, expression, length=None, charsetType=None, firstChar=None while True: index += 1 + # Dictionary similarity check feature + # Check if partial value has high similarity with dictionary items + if len(partialValue) >= 3: # Only check if we have at least 3 characters + best_match, similarity_ratio, dict_type = checkSimilarity(partialValue, expression) + + if best_match and similarity_ratio >= SIMILARITY_THRESHOLD: + infoMsg = "checking %s dictionary similarity for '%s' (similarity: %.2f)" % (dict_type, partialValue, similarity_ratio) + logger.info(infoMsg) + + if testDictionaryMatch(expressionUnescaped, payload, best_match, timeBasedCompare): + infoMsg = "%s dictionary match successful: '%s'" % (dict_type.capitalize(), best_match) + logger.info(infoMsg) + + if showEta: + progress.progress(len(best_match)) + elif conf.verbose in (1, 2) or conf.api: + dataToStdout(filterControlChars(best_match[index - 1:])) + + finalValue = best_match + break + # Common prediction feature (a.k.a. "good samaritan") # NOTE: to be used only when multi-threading is not set for # the moment