From 8f13e35f3ff798982f540634a7f011209e032d1e Mon Sep 17 00:00:00 2001 From: donnd-t Date: Fri, 17 Sep 2021 15:06:20 +1000 Subject: [PATCH] refactor swagger code to separate module --- lib/core/common.py | 119 +------------------------------------------ lib/core/swagger.py | 120 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 122 insertions(+), 117 deletions(-) create mode 100644 lib/core/swagger.py diff --git a/lib/core/common.py b/lib/core/common.py index 242b70445..0f918fc03 100644 --- a/lib/core/common.py +++ b/lib/core/common.py @@ -36,7 +36,6 @@ import threading import time import types import unicodedata -import json from difflib import SequenceMatcher from math import sqrt @@ -184,6 +183,7 @@ from lib.core.settings import VERSION_COMPARISON_CORRECTION from lib.core.settings import VERSION_STRING from lib.core.settings import ZIP_HEADER from lib.core.settings import WEBSCARAB_SPLITTER +from lib.core.swagger import parse as _parseSwagger from lib.core.threads import getCurrentThreadData from lib.utils.safe2bin import safecharencode from lib.utils.sqlalchemy import _sqlalchemy @@ -5363,121 +5363,6 @@ def parseRequestFile(reqFile, checkParams=True): if not(conf.scope and not re.search(conf.scope, url, re.I)): yield (url, conf.method or method, data, cookie, tuple(headers)) - def _swaggerOperationParameters(parameters, types): - return list(filter(lambda p: (p["in"] in types), parameters)) - - def _swaggerOperationQueryString(parameters): - queryParameters = _swaggerOperationParameters(parameters, ["query"]) - if len(queryParameters) < 1: - return None - queryString = "" - for qp in queryParameters: - queryString += "&%s=%s" %(qp["name"], qp["example"]) - - return queryString.replace('&', '', 1) - - def _swaggerOperationPath(path, parameters): - pathParameters = _swaggerOperationParameters(parameters, ["path"]) - if len(pathParameters) < 1: - return path - parameterPath = path - for p in pathParameters: - parameterPath = parameterPath.replace("{%s}" %p["name"], "%s*" %p["example"]) - return parameterPath - - def _swaggerRef(swagger, refPath): - paths = refPath.replace("#/", "", 1).split('/') - r = swagger - for p in paths: - r = r[p] - return r - - def _swaggerBody(swagger, refPath): - body = {} - ref = _swaggerRef(swagger, refPath) - if "type" in ref and ref["type"] == "object" and "properties" in ref: - properties = ref["properties"] - for prop in properties: - if "example" in properties[prop]: - value = properties[prop]["example"] - #if properties[prop]["type"] in ["string", "enum"] and value[0] != '"': - # value = "\"%s\"" %value - body[prop] = value - elif "$ref" in properties[prop]: - body[prop] = _swaggerBody(swagger, properties[prop]["$ref"]) - elif properties[prop]["type"] == "array" and "$ref" in properties[prop]["items"]: - body[prop] = [ _swaggerBody(swagger, properties[prop]["items"]["$ref"]) ] - - return body - - - def _parseSwagger(content): - """ - Parses Swagger OpenAPI 3.x.x JSON documents - """ - - try: - swagger = json.loads(content) - - # extra validations - if "openapi" not in swagger or not swagger["openapi"].startswith("3."): - errMsg = "swagger must be OpenAPI 3.x.x!" - raise SqlmapSyntaxException(errMsg) - - if ("servers" not in swagger or - not isinstance(swagger["servers"], list) or - len(swagger["servers"]) < 1 or - "url" not in swagger["servers"][0]): - errMsg = "swagger server is missing!" - raise SqlmapSyntaxException(errMsg) - - server = swagger["servers"][0]["url"] - - logger.info("swagger OpenAPI version '%s', server '%s'" %(swagger["openapi"], server)) - - for path in swagger["paths"]: - for operation in swagger["paths"][path]: - op = swagger["paths"][path][operation] - - tags = conf.swaggerTags - - if tags is None or any(tag in op["tags"] for tag in tags): - - body = {} - if "requestBody" in op: - ref = op["requestBody"]["content"]["application/json"]["schema"]["$ref"] - body = _swaggerBody(swagger, ref) - - # header injection is not currently supported - if (len(_swaggerOperationParameters(op["parameters"], ["query", "path"]))) > 0 or body: - url = None - method = None - data = None - cookie = None - - parameterPath = _swaggerOperationPath(path, op["parameters"]) - qs = _swaggerOperationQueryString(op["parameters"]) - url = "%s%s" % (server, parameterPath) - method = operation.upper() - if body: - data = json.dumps(body) - - if qs is not None: - url += "?" + qs - - logger.debug("swagger url '%s', method '%s', data '%s', cookie '%s'" %(url, method, data, cookie)) - yield (url, method, data, cookie, None) - - else: - logger.info("excluding path '%s', operation '%s' as there are no parameters to inject" %(path, operation)) - - - except json.decoder.JSONDecodeError: - errMsg = "swagger file is not valid JSON" - raise SqlmapSyntaxException(errMsg) - - - content = readCachedFileContent(reqFile) if conf.scope: @@ -5490,7 +5375,7 @@ def parseRequestFile(reqFile, checkParams=True): yield target if conf.swaggerFile: - for target in _parseSwagger(content): + for target in _parseSwagger(content, conf.swaggerTags): yield target def getSafeExString(ex, encoding=None): diff --git a/lib/core/swagger.py b/lib/core/swagger.py new file mode 100644 index 000000000..1697aecbd --- /dev/null +++ b/lib/core/swagger.py @@ -0,0 +1,120 @@ +#!/usr/bin/env python + +""" +Copyright (c) 2006-2021 sqlmap developers (https://sqlmap.org/) +See the file 'LICENSE' for copying permission +""" + +import json + +from lib.core.data import logger +from lib.core.exception import SqlmapSyntaxException + +def _operationParameters(parameters, types): + return list(filter(lambda p: (p["in"] in types), parameters)) + +def _operationQueryString(parameters): + queryParameters = _operationParameters(parameters, ["query"]) + if len(queryParameters) < 1: + return None + queryString = "" + for qp in queryParameters: + queryString += "&%s=%s" %(qp["name"], qp["example"]) + + return queryString.replace('&', '', 1) + +def _operationPath(path, parameters): + pathParameters = _operationParameters(parameters, ["path"]) + if len(pathParameters) < 1: + return path + parameterPath = path + for p in pathParameters: + parameterPath = parameterPath.replace("{%s}" %p["name"], "%s*" %p["example"]) + return parameterPath + +def _ref(swagger, refPath): + paths = refPath.replace("#/", "", 1).split('/') + r = swagger + for p in paths: + r = r[p] + return r + +def _body(swagger, refPath): + body = {} + ref = _ref(swagger, refPath) + if "type" in ref and ref["type"] == "object" and "properties" in ref: + properties = ref["properties"] + for prop in properties: + if "example" in properties[prop]: + value = properties[prop]["example"] + body[prop] = value + elif "$ref" in properties[prop]: + body[prop] = _body(swagger, properties[prop]["$ref"]) + elif properties[prop]["type"] == "array" and "$ref" in properties[prop]["items"]: + body[prop] = [ _body(swagger, properties[prop]["items"]["$ref"]) ] + + return body + +def parse(content, tags): + """ + Parses Swagger OpenAPI 3.x.x JSON documents + """ + + try: + swagger = json.loads(content) + + # extra validations + if "openapi" not in swagger or not swagger["openapi"].startswith("3."): + errMsg = "swagger must be OpenAPI 3.x.x!" + raise SqlmapSyntaxException(errMsg) + + if ("servers" not in swagger or + not isinstance(swagger["servers"], list) or + len(swagger["servers"]) < 1 or + "url" not in swagger["servers"][0]): + errMsg = "swagger server is missing!" + raise SqlmapSyntaxException(errMsg) + + server = swagger["servers"][0]["url"] + + logger.info("swagger OpenAPI version '%s', server '%s'" %(swagger["openapi"], server)) + + for path in swagger["paths"]: + for operation in swagger["paths"][path]: + op = swagger["paths"][path][operation] + + # skip any operations without one of our tags + if tags is not None and not any(tag in op["tags"] for tag in tags): + continue + + body = {} + if "requestBody" in op: + ref = op["requestBody"]["content"]["application/json"]["schema"]["$ref"] + body = _body(swagger, ref) + + # header injection is not currently supported + if (len(_operationParameters(op["parameters"], ["query", "path"]))) < 1 and not body: + logger.info("excluding path '%s', operation '%s' as there are no parameters to inject" %(path, operation)) + continue + + url = None + method = None + data = None + cookie = None + + parameterPath = _operationPath(path, op["parameters"]) + qs = _operationQueryString(op["parameters"]) + url = "%s%s" % (server, parameterPath) + method = operation.upper() + if body: + data = json.dumps(body) + + if qs is not None: + url += "?" + qs + + logger.debug("swagger url '%s', method '%s', data '%s', cookie '%s'" %(url, method, data, cookie)) + yield (url, method, data, cookie, None) + + except json.decoder.JSONDecodeError: + errMsg = "swagger file is not valid JSON" + raise SqlmapSyntaxException(errMsg)