add depth limit validator

This commit is contained in:
Aryan Iyappan 2021-08-13 18:22:12 +05:30
parent fce45ef552
commit aa11681048
6 changed files with 519 additions and 0 deletions

View File

@ -10,3 +10,4 @@ Execution
dataloader
fileuploading
subscriptions
validators

View File

@ -0,0 +1,35 @@
Middleware
==========
Validation rules help validate a given GraphQL query, before executing it.To help with common use
cases, graphene provides a few validation rules out of the box.
Depth limit Validator
-----------------
The depth limit validator helps to prevent execution of malicious
queries. It takes in the following arguments.
- ``max_depth`` is the maximum allowed depth for any operation in a GraphQL document.
- ``ignore`` Stops recursive depth checking based on a field name. Either a string or regexp to match the name, or a function that returns a boolean
- ``callback`` Called each time validation runs. Receives an Object which is a map of the depths for each operation.
Example
-------
Here is how you would implement depth-limiting on your schema.
.. code:: python
from graphene.validators import depth_limit_validator
# The following schema doesn't execute queries
# which have a depth more than 20.
result = schema.execute(
'THE QUERY',
validation_rules=[
depth_limit_validator(
max_depth=20
)
]
)

View File

@ -0,0 +1,6 @@
from .depth_limit_validator import depth_limit_validator
__all__ = [
"depth_limit_validator"
]

View File

@ -0,0 +1,198 @@
# This is a Python port of https://github.com/stems/graphql-depth-limit
# which is licensed under the terms of the MIT license, reproduced below.
#
# -----------
#
# MIT License
#
# Copyright (c) 2017 Stem
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
import re
from typing import Callable, Dict, List, Optional, Union
from graphql import GraphQLError, is_introspection_type
from graphql.language import (
DefinitionNode,
FieldNode,
FragmentDefinitionNode,
FragmentSpreadNode,
InlineFragmentNode,
Node,
OperationDefinitionNode,
)
from graphql.validation import ValidationContext, ValidationRule
IgnoreType = Union[Callable[[str], bool], re.Pattern, str]
def depth_limit_validator(
max_depth: int,
ignore: Optional[List[IgnoreType]] = None,
callback: Callable[[Dict[str, int]], None] = None,
):
class DepthLimitValidator(ValidationRule):
def __init__(self, validation_context: ValidationContext):
document = validation_context.document
definitions = document.definitions
fragments = get_fragments(definitions)
queries = get_queries_and_mutations(definitions)
query_depths = {}
for name in queries:
query_depths[name] = determine_depth(
node=queries[name],
fragments=fragments,
depth_so_far=0,
max_depth=max_depth,
context=validation_context,
operation_name=name,
ignore=ignore,
)
if callable(callback):
callback(query_depths)
super().__init__(validation_context)
return DepthLimitValidator
def get_fragments(
definitions: List[DefinitionNode],
) -> Dict[str, FragmentDefinitionNode]:
fragments = {}
for definition in definitions:
if isinstance(definition, FragmentDefinitionNode):
fragments[definition.name.value] = definition
return fragments
# This will actually get both queries and mutations.
# We can basically treat those the same
def get_queries_and_mutations(
definitions: List[DefinitionNode],
) -> Dict[str, OperationDefinitionNode]:
operations = {}
for definition in definitions:
if isinstance(definition, OperationDefinitionNode):
operation = definition.name.value if definition.name else "anonymous"
operations[operation] = definition
return operations
def determine_depth(
node: Node,
fragments: Dict[str, FragmentDefinitionNode],
depth_so_far: int,
max_depth: int,
context: ValidationContext,
operation_name: str,
ignore: Optional[List[IgnoreType]] = None,
) -> int:
if depth_so_far > max_depth:
context.report_error(
GraphQLError(
f"'{operation_name}' exceeds maximum operation depth of {max_depth}",
[node],
)
)
return depth_so_far
if isinstance(node, FieldNode):
# from: https://spec.graphql.org/June2018/#sec-Schema
# > All types and directives defined within a schema must not have a name which
# > begins with "__" (two underscores), as this is used exclusively
# > by GraphQLs introspection system.
should_ignore = str(node.name.value).startswith("__") or is_ignored(
node, ignore
)
if should_ignore or not node.selection_set:
return 0
return 1 + max(
map(
lambda selection: determine_depth(
node=selection,
fragments=fragments,
depth_so_far=depth_so_far + 1,
max_depth=max_depth,
context=context,
operation_name=operation_name,
ignore=ignore,
),
node.selection_set.selections,
)
)
elif isinstance(node, FragmentSpreadNode):
return determine_depth(
node=fragments[node.name.value],
fragments=fragments,
depth_so_far=depth_so_far,
max_depth=max_depth,
context=context,
operation_name=operation_name,
ignore=ignore,
)
elif isinstance(
node, (InlineFragmentNode, FragmentDefinitionNode, OperationDefinitionNode)
):
return max(
map(
lambda selection: determine_depth(
node=selection,
fragments=fragments,
depth_so_far=depth_so_far,
max_depth=max_depth,
context=context,
operation_name=operation_name,
ignore=ignore,
),
node.selection_set.selections,
)
)
else:
raise Exception(f"Depth crawler cannot handle: {node.kind}") # pragma: no cover
def is_ignored(node: FieldNode, ignore: Optional[List[IgnoreType]] = None) -> bool:
if ignore is None:
return False
for rule in ignore:
field_name = node.name.value
if isinstance(rule, str):
if field_name == rule:
return True
elif isinstance(rule, re.Pattern):
if rule.match(field_name):
return True
elif callable(rule):
if rule(field_name):
return True
else:
raise ValueError(f"Invalid ignore option: {rule}")
return False

View File

View File

@ -0,0 +1,279 @@
import re
from pytest import raises
from graphql import parse, get_introspection_query, validate
from ...types import Schema, ObjectType, Interface
from ...types import String, Int, List, Field
from ..depth_limit_validator import depth_limit_validator
class PetType(Interface):
name = String(required=True)
class meta:
name = "Pet"
class CatType(ObjectType):
class meta:
name = "Cat"
interfaces = (PetType,)
class DogType(ObjectType):
class meta:
name = "Dog"
interfaces = (PetType,)
class AddressType(ObjectType):
street = String(required=True)
number = Int(required=True)
city = String(required=True)
country = String(required=True)
class Meta:
name = "Address"
class HumanType(ObjectType):
name = String(required=True)
email = String(required=True)
address = Field(AddressType, required=True)
pets = List(PetType, required=True)
class Meta:
name = "Human"
class Query(ObjectType):
user = Field(
HumanType,
required=True,
name=String()
)
version = String(
required=True
)
user1 = Field(
HumanType,
required=True
)
user2 = Field(
HumanType,
required=True
)
user3 = Field(
HumanType,
required=True
)
@staticmethod
def resolve_user(root, info, name=None):
pass
schema = Schema(query=Query)
def run_query(query: str, max_depth: int, ignore=None):
document = parse(query)
result = None
def callback(query_depths):
nonlocal result
result = query_depths
errors = validate(
schema.graphql_schema,
document,
rules=(
depth_limit_validator(
max_depth=max_depth,
ignore=ignore,
callback=callback
),
),
)
return errors, result
def test_should_count_depth_without_fragment():
query = """
query read0 {
version
}
query read1 {
version
user {
name
}
}
query read2 {
matt: user(name: "matt") {
email
}
andy: user(name: "andy") {
email
address {
city
}
}
}
query read3 {
matt: user(name: "matt") {
email
}
andy: user(name: "andy") {
email
address {
city
}
pets {
name
owner {
name
}
}
}
}
"""
expected = {"read0": 0, "read1": 1, "read2": 2, "read3": 3}
errors, result = run_query(query, 10)
assert not errors
assert result == expected
def test_should_count_with_fragments():
query = """
query read0 {
... on Query {
version
}
}
query read1 {
version
user {
... on Human {
name
}
}
}
fragment humanInfo on Human {
email
}
fragment petInfo on Pet {
name
owner {
name
}
}
query read2 {
matt: user(name: "matt") {
...humanInfo
}
andy: user(name: "andy") {
...humanInfo
address {
city
}
}
}
query read3 {
matt: user(name: "matt") {
...humanInfo
}
andy: user(name: "andy") {
... on Human {
email
}
address {
city
}
pets {
...petInfo
}
}
}
"""
expected = {"read0": 0, "read1": 1, "read2": 2, "read3": 3}
errors, result = run_query(query, 10)
assert not errors
assert result == expected
def test_should_ignore_the_introspection_query():
errors, result = run_query(get_introspection_query(), 10)
assert not errors
assert result == {"IntrospectionQuery": 0}
def test_should_catch_very_deep_query():
query = """{
user {
pets {
owner {
pets {
owner {
pets {
name
}
}
}
}
}
}
}
"""
errors, result = run_query(query, 4)
assert len(errors) == 1
assert errors[0].message == "'anonymous' exceeds maximum operation depth of 4"
def test_should_ignore_field():
query = """
query read1 {
user { address { city } }
}
query read2 {
user1 { address { city } }
user2 { address { city } }
user3 { address { city } }
}
"""
errors, result = run_query(
query,
10,
ignore=[
"user1",
re.compile("user2"),
lambda field_name: field_name == "user3",
],
)
expected = {"read1": 2, "read2": 0}
assert not errors
assert result == expected
def test_should_raise_invalid_ignore():
query = """
query read1 {
user { address { city } }
}
"""
with raises(ValueError, match="Invalid ignore option:"):
run_query(
query,
10,
ignore=[True],
)