Make Schema compatible with GraphQL-core-next

This commit is contained in:
Christoph Zwerschke 2019-06-30 02:30:51 +02:00 committed by Mel van Londen
parent 2986658470
commit 6a0afb08a1
10 changed files with 542 additions and 490 deletions

View File

@ -1,7 +1,5 @@
from __future__ import absolute_import
from graphql.pyutils.compat import Enum
try:
from inspect import signature
except ImportError:

View File

@ -8,7 +8,7 @@ from graphql import (
)
class GrapheneGraphQLType(object):
class GrapheneGraphQLType:
"""
A class for extending the base GraphQLType with the related
graphene_type

View File

@ -1,7 +1,8 @@
from collections import OrderedDict
from enum import Enum as PyEnum
from graphene.utils.subclass_with_meta import SubclassWithMeta_Meta
from ..pyutils.compat import Enum as PyEnum
from .base import BaseOptions, BaseType
from .unmountedtype import UnmountedType

View File

@ -1,88 +1,133 @@
import inspect
from functools import partial
from graphql import GraphQLObjectType, GraphQLSchema, graphql, is_type
from graphql.type.directives import (
GraphQLDirective,
GraphQLIncludeDirective,
GraphQLSkipDirective,
from graphql import (
default_type_resolver,
get_introspection_query,
graphql,
graphql_sync,
introspection_types,
is_type,
print_schema,
GraphQLArgument,
GraphQLBoolean,
GraphQLEnumValue,
GraphQLField,
GraphQLFloat,
GraphQLID,
GraphQLInputField,
GraphQLInt,
GraphQLList,
GraphQLNonNull,
GraphQLObjectType,
GraphQLSchema,
GraphQLString,
INVALID,
)
from graphql.type.introspection import IntrospectionSchema
from graphql.utils.introspection_query import introspection_query
from graphql.utils.schema_printer import print_schema
from .definitions import GrapheneGraphQLType
from ..utils.str_converters import to_camel_case
from ..utils.get_unbound_function import get_unbound_function
from .definitions import (
GrapheneEnumType,
GrapheneGraphQLType,
GrapheneInputObjectType,
GrapheneInterfaceType,
GrapheneObjectType,
GrapheneScalarType,
GrapheneUnionType,
)
from .dynamic import Dynamic
from .enum import Enum
from .field import Field
from .inputobjecttype import InputObjectType
from .interface import Interface
from .objecttype import ObjectType
from .typemap import TypeMap, is_graphene_type
from .resolver import get_default_resolver
from .scalars import ID, Boolean, Float, Int, Scalar, String
from .structures import List, NonNull
from .union import Union
from .utils import get_field_as
introspection_query = get_introspection_query()
IntrospectionSchema = introspection_types["__Schema"]
def assert_valid_root_type(_type):
if _type is None:
def assert_valid_root_type(type_):
if type_ is None:
return
is_graphene_objecttype = inspect.isclass(_type) and issubclass(_type, ObjectType)
is_graphql_objecttype = isinstance(_type, GraphQLObjectType)
is_graphene_objecttype = inspect.isclass(type_) and issubclass(type_, ObjectType)
is_graphql_objecttype = isinstance(type_, GraphQLObjectType)
assert is_graphene_objecttype or is_graphql_objecttype, (
"Type {} is not a valid ObjectType."
).format(_type)
).format(type_)
class Schema(GraphQLSchema):
"""
Schema Definition
def is_graphene_type(type_):
if isinstance(type_, (List, NonNull)):
return True
if inspect.isclass(type_) and issubclass(
type_, (ObjectType, InputObjectType, Scalar, Interface, Union, Enum)
):
return True
A Schema is created by supplying the root types of each type of operation,
query and mutation (optional).
"""
def resolve_type(resolve_type_func, map_, type_name, root, info, _type):
type_ = resolve_type_func(root, info)
if not type_:
return_type = map_[type_name]
return default_type_resolver(root, info, return_type)
if inspect.isclass(type_) and issubclass(type_, ObjectType):
graphql_type = map_.get(type_._meta.name)
assert graphql_type, "Can't find type {} in schema".format(type_._meta.name)
assert graphql_type.graphene_type == type_, (
"The type {} does not match with the associated graphene type {}."
).format(type_, graphql_type.graphene_type)
return graphql_type
return type_
def is_type_of_from_possible_types(possible_types, root, _info):
return isinstance(root, possible_types)
class GrapheneGraphQLSchema(GraphQLSchema):
"""A GraphQLSchema that can deal with Graphene types as well."""
def __init__(
self,
query=None,
mutation=None,
subscription=None,
directives=None,
types=None,
directives=None,
auto_camelcase=True,
):
assert_valid_root_type(query)
assert_valid_root_type(mutation)
assert_valid_root_type(subscription)
self._query = query
self._mutation = mutation
self._subscription = subscription
self.types = types
self.auto_camelcase = auto_camelcase
if directives is None:
directives = [GraphQLIncludeDirective, GraphQLSkipDirective]
super().__init__(query, mutation, subscription, types, directives)
assert all(
isinstance(d, GraphQLDirective) for d in directives
), "Schema directives must be List[GraphQLDirective] if provided but got: {}.".format(
directives
)
self._directives = directives
self.build_typemap()
def get_query_type(self):
return self.get_graphql_type(self._query)
def get_mutation_type(self):
return self.get_graphql_type(self._mutation)
def get_subscription_type(self):
return self.get_graphql_type(self._subscription)
def __getattr__(self, type_name):
"""
This function let the developer select a type in a given schema
by accessing its attrs.
Example: using schema.Query for accessing the "Query" type in the Schema
"""
_type = super(Schema, self).get_type(type_name)
if _type is None:
raise AttributeError('Type "{}" not found in the Schema'.format(type_name))
if isinstance(_type, GrapheneGraphQLType):
return _type.graphene_type
return _type
if query:
self.query_type = self.get_type(
query.name if isinstance(query, GraphQLObjectType) else query._meta.name
)
if mutation:
self.mutation_type = self.get_type(
mutation.name
if isinstance(mutation, GraphQLObjectType)
else mutation._meta.name
)
if subscription:
self.subscription_type = self.get_type(
subscription.name
if isinstance(subscription, GraphQLObjectType)
else subscription._meta.name
)
def get_graphql_type(self, _type):
if not _type:
@ -98,30 +143,343 @@ class Schema(GraphQLSchema):
return graphql_type
raise Exception("{} is not a valid GraphQL type.".format(_type))
def execute(self, *args, **kwargs):
return graphql(self, *args, **kwargs)
# noinspection PyMethodOverriding
def type_map_reducer(self, map_, type_):
if not type_:
return map_
if inspect.isfunction(type_):
type_ = type_()
if is_graphene_type(type_):
return self.graphene_reducer(map_, type_)
return super().type_map_reducer(map_, type_)
def introspect(self):
instrospection = self.execute(introspection_query)
if instrospection.errors:
raise instrospection.errors[0]
return instrospection.data
def graphene_reducer(self, map_, type_):
if isinstance(type_, (List, NonNull)):
return self.type_map_reducer(map_, type_.of_type)
if type_._meta.name in map_:
_type = map_[type_._meta.name]
if isinstance(_type, GrapheneGraphQLType):
assert _type.graphene_type == type_, (
"Found different types with the same name in the schema: {}, {}."
).format(_type.graphene_type, type_)
return map_
if issubclass(type_, ObjectType):
internal_type = self.construct_objecttype(map_, type_)
elif issubclass(type_, InputObjectType):
internal_type = self.construct_inputobjecttype(map_, type_)
elif issubclass(type_, Interface):
internal_type = self.construct_interface(map_, type_)
elif issubclass(type_, Scalar):
internal_type = self.construct_scalar(type_)
elif issubclass(type_, Enum):
internal_type = self.construct_enum(type_)
elif issubclass(type_, Union):
internal_type = self.construct_union(map_, type_)
else:
raise Exception("Expected Graphene type, but received: {}.".format(type_))
return super().type_map_reducer(map_, internal_type)
@staticmethod
def construct_scalar(type_):
# We have a mapping to the original GraphQL types
# so there are no collisions.
_scalars = {
String: GraphQLString,
Int: GraphQLInt,
Float: GraphQLFloat,
Boolean: GraphQLBoolean,
ID: GraphQLID,
}
if type_ in _scalars:
return _scalars[type_]
return GrapheneScalarType(
graphene_type=type_,
name=type_._meta.name,
description=type_._meta.description,
serialize=getattr(type_, "serialize", None),
parse_value=getattr(type_, "parse_value", None),
parse_literal=getattr(type_, "parse_literal", None),
)
@staticmethod
def construct_enum(type_):
values = {}
for name, value in type_._meta.enum.__members__.items():
description = getattr(value, "description", None)
deprecation_reason = getattr(value, "deprecation_reason", None)
if not description and callable(type_._meta.description):
description = type_._meta.description(value)
if not deprecation_reason and callable(type_._meta.deprecation_reason):
deprecation_reason = type_._meta.deprecation_reason(value)
values[name] = GraphQLEnumValue(
value=value.value,
description=description,
deprecation_reason=deprecation_reason,
)
type_description = (
type_._meta.description(None)
if callable(type_._meta.description)
else type_._meta.description
)
return GrapheneEnumType(
graphene_type=type_,
values=values,
name=type_._meta.name,
description=type_description,
)
def construct_objecttype(self, map_, type_):
if type_._meta.name in map_:
_type = map_[type_._meta.name]
if isinstance(_type, GrapheneGraphQLType):
assert _type.graphene_type == type_, (
"Found different types with the same name in the schema: {}, {}."
).format(_type.graphene_type, type_)
return _type
def interfaces():
interfaces = []
for interface in type_._meta.interfaces:
self.graphene_reducer(map_, interface)
internal_type = map_[interface._meta.name]
assert internal_type.graphene_type == interface
interfaces.append(internal_type)
return interfaces
if type_._meta.possible_types:
is_type_of = partial(
is_type_of_from_possible_types, type_._meta.possible_types
)
else:
is_type_of = type_.is_type_of
return GrapheneObjectType(
graphene_type=type_,
name=type_._meta.name,
description=type_._meta.description,
fields=partial(self.construct_fields_for_type, map_, type_),
is_type_of=is_type_of,
interfaces=interfaces,
)
def construct_interface(self, map_, type_):
if type_._meta.name in map_:
_type = map_[type_._meta.name]
if isinstance(_type, GrapheneInterfaceType):
assert _type.graphene_type == type_, (
"Found different types with the same name in the schema: {}, {}."
).format(_type.graphene_type, type_)
return _type
_resolve_type = None
if type_.resolve_type:
_resolve_type = partial(
resolve_type, type_.resolve_type, map_, type_._meta.name
)
return GrapheneInterfaceType(
graphene_type=type_,
name=type_._meta.name,
description=type_._meta.description,
fields=partial(self.construct_fields_for_type, map_, type_),
resolve_type=_resolve_type,
)
def construct_inputobjecttype(self, map_, type_):
return GrapheneInputObjectType(
graphene_type=type_,
name=type_._meta.name,
description=type_._meta.description,
# TODO: container_type not supported by core-next (is this needed?)
# container_type=type_._meta.container,
fields=partial(
self.construct_fields_for_type, map_, type_, is_input_type=True
),
)
def construct_union(self, map_, type_):
_resolve_type = None
if type_.resolve_type:
_resolve_type = partial(
resolve_type, type_.resolve_type, map_, type_._meta.name
)
def types():
union_types = []
for objecttype in type_._meta.types:
self.graphene_reducer(map_, objecttype)
internal_type = map_[objecttype._meta.name]
assert internal_type.graphene_type == objecttype
union_types.append(internal_type)
return union_types
return GrapheneUnionType(
graphene_type=type_,
name=type_._meta.name,
description=type_._meta.description,
types=types,
resolve_type=_resolve_type,
)
def get_name(self, name):
if self.auto_camelcase:
return to_camel_case(name)
return name
def construct_fields_for_type(self, map_, type_, is_input_type=False):
fields = {}
for name, field in type_._meta.fields.items():
if isinstance(field, Dynamic):
field = get_field_as(field.get_type(self), _as=Field)
if not field:
continue
map_ = self.type_map_reducer(map_, field.type)
field_type = self.get_field_type(map_, field.type)
if is_input_type:
_field = GraphQLInputField(
field_type,
default_value=field.default_value,
# TODO: out_name not (yet) supported by core-next
# out_name=name,
description=field.description,
)
else:
args = {}
for arg_name, arg in field.args.items():
map_ = self.type_map_reducer(map_, arg.type)
arg_type = self.get_field_type(map_, arg.type)
processed_arg_name = arg.name or self.get_name(arg_name)
args[processed_arg_name] = GraphQLArgument(
arg_type,
# TODO: out_name not (yet) supported by core-next
# out_name=arg_name,
description=arg.description,
default_value=INVALID
if isinstance(arg.type, NonNull)
else arg.default_value,
)
_field = GraphQLField(
field_type,
args=args,
resolve=field.get_resolver(
self.get_resolver_for_type(type_, name, field.default_value)
),
deprecation_reason=field.deprecation_reason,
description=field.description,
)
field_name = field.name or self.get_name(name)
fields[field_name] = _field
return fields
def get_resolver_for_type(self, type_, name, default_value):
if not issubclass(type_, ObjectType):
return
resolver = getattr(type_, "resolve_{}".format(name), None)
if not resolver:
# If we don't find the resolver in the ObjectType class, then try to
# find it in each of the interfaces
interface_resolver = None
for interface in type_._meta.interfaces:
if name not in interface._meta.fields:
continue
interface_resolver = getattr(interface, "resolve_{}".format(name), None)
if interface_resolver:
break
resolver = interface_resolver
# Only if is not decorated with classmethod
if resolver:
return get_unbound_function(resolver)
default_resolver = type_._meta.default_resolver or get_default_resolver()
return partial(default_resolver, name, default_value)
def get_field_type(self, map_, type_):
if isinstance(type_, List):
return GraphQLList(self.get_field_type(map_, type_.of_type))
if isinstance(type_, NonNull):
return GraphQLNonNull(self.get_field_type(map_, type_.of_type))
return map_.get(type_._meta.name)
class Schema:
"""
Schema Definition
A Schema is created by supplying the root types of each type of operation,
query and mutation (optional).
"""
def __init__(
self,
query=None,
mutation=None,
subscription=None,
types=None,
directives=None,
auto_camelcase=True,
):
self.query = query
self.mutation = mutation
self.subscription = subscription
self.graphql_schema = GrapheneGraphQLSchema(
query,
mutation,
subscription,
types,
directives,
auto_camelcase=auto_camelcase,
)
def __str__(self):
return print_schema(self)
return print_schema(self.graphql_schema)
def __getattr__(self, type_name):
"""
This function let the developer select a type in a given schema
by accessing its attrs.
Example: using schema.Query for accessing the "Query" type in the Schema
"""
_type = self.graphql_schema.get_type(type_name)
if _type is None:
raise AttributeError('Type "{}" not found in the Schema'.format(type_name))
if isinstance(_type, GrapheneGraphQLType):
return _type.graphene_type
return _type
def lazy(self, _type):
return lambda: self.get_type(_type)
def build_typemap(self):
initial_types = [
self._query,
self._mutation,
self._subscription,
IntrospectionSchema,
]
if self.types:
initial_types += self.types
self._type_map = TypeMap(
initial_types, auto_camelcase=self.auto_camelcase, schema=self
async def async_execute(self, *args, **kwargs):
return graphql(self.graphql_schema, *args, **normalize_execute_kwargs(kwargs))
def execute(self, *args, **kwargs):
return graphql_sync(
self.graphql_schema, *args, **normalize_execute_kwargs(kwargs)
)
def introspect(self):
introspection = self.execute(introspection_query)
if introspection.errors:
raise introspection.errors[0]
return introspection.data
def normalize_execute_kwargs(kwargs):
"""Replace alias names in keyword arguments for graphql()"""
if "root" in kwargs and "root_value" not in kwargs:
kwargs["root_value"] = kwargs.pop("root")
if "context" in kwargs and "context_value" not in kwargs:
kwargs["context_value"] = kwargs.pop("context")
if "variables" in kwargs and "variable_values" not in kwargs:
kwargs["variable_values"] = kwargs.pop("variables")
if "operation" in kwargs and "operation_name" not in kwargs:
kwargs["operation_name"] = kwargs.pop("operation")
return kwargs

View File

@ -69,7 +69,8 @@ class MyInputObjectType(InputObjectType):
def test_defines_a_query_only_schema():
blog_schema = Schema(Query)
assert blog_schema.get_query_type().graphene_type == Query
assert blog_schema.query == Query
assert blog_schema.graphql_schema.query_type.graphene_type == Query
article_field = Query._meta.fields["article"]
assert article_field.type == Article
@ -95,7 +96,8 @@ def test_defines_a_query_only_schema():
def test_defines_a_mutation_schema():
blog_schema = Schema(Query, mutation=Mutation)
assert blog_schema.get_mutation_type().graphene_type == Mutation
assert blog_schema.mutation == Mutation
assert blog_schema.graphql_schema.mutation_type.graphene_type == Mutation
write_mutation = Mutation._meta.fields["write_article"]
assert write_mutation.type == Article
@ -105,7 +107,8 @@ def test_defines_a_mutation_schema():
def test_defines_a_subscription_schema():
blog_schema = Schema(Query, subscription=Subscription)
assert blog_schema.get_subscription_type().graphene_type == Subscription
assert blog_schema.subscription == Subscription
assert blog_schema.graphql_schema.subscription_type.graphene_type == Subscription
subscription = Subscription._meta.fields["article_subscribe"]
assert subscription.type == Article
@ -126,8 +129,9 @@ def test_includes_nested_input_objects_in_the_map():
subscribe_to_something = Field(Article, input=Argument(SomeInputObject))
schema = Schema(query=Query, mutation=SomeMutation, subscription=SomeSubscription)
type_map = schema.graphql_schema.type_map
assert schema.get_type_map()["NestedInputObject"].graphene_type is NestedInputObject
assert type_map["NestedInputObject"].graphene_type is NestedInputObject
def test_includes_interfaces_thunk_subtypes_in_the_type_map():
@ -142,8 +146,9 @@ def test_includes_interfaces_thunk_subtypes_in_the_type_map():
iface = Field(lambda: SomeInterface)
schema = Schema(query=Query, types=[SomeSubtype])
type_map = schema.graphql_schema.type_map
assert schema.get_type_map()["SomeSubtype"].graphene_type is SomeSubtype
assert type_map["SomeSubtype"].graphene_type is SomeSubtype
def test_includes_types_in_union():
@ -161,9 +166,10 @@ def test_includes_types_in_union():
union = Field(MyUnion)
schema = Schema(query=Query)
type_map = schema.graphql_schema.type_map
assert schema.get_type_map()["OtherType"].graphene_type is OtherType
assert schema.get_type_map()["SomeType"].graphene_type is SomeType
assert type_map["OtherType"].graphene_type is OtherType
assert type_map["SomeType"].graphene_type is SomeType
def test_maps_enum():
@ -181,9 +187,10 @@ def test_maps_enum():
union = Field(MyUnion)
schema = Schema(query=Query)
type_map = schema.graphql_schema.type_map
assert schema.get_type_map()["OtherType"].graphene_type is OtherType
assert schema.get_type_map()["SomeType"].graphene_type is SomeType
assert type_map["OtherType"].graphene_type is OtherType
assert type_map["SomeType"].graphene_type is SomeType
def test_includes_interfaces_subtypes_in_the_type_map():
@ -198,8 +205,9 @@ def test_includes_interfaces_subtypes_in_the_type_map():
iface = Field(SomeInterface)
schema = Schema(query=Query, types=[SomeSubtype])
type_map = schema.graphql_schema.type_map
assert schema.get_type_map()["SomeSubtype"].graphene_type is SomeSubtype
assert type_map["SomeSubtype"].graphene_type is SomeSubtype
def test_stringifies_simple_types():

View File

@ -105,6 +105,7 @@ def test_generate_inputobjecttype_inherit_abstracttype_reversed():
]
# TOOD: I think this fails because container_type is not supported in core-next
def test_inputobjecttype_of_input():
class Child(InputObjectType):
first_name = String()

View File

@ -1,7 +1,13 @@
import json
from functools import partial
from graphql import GraphQLError, ResolveInfo, Source, execute, parse
from graphql import (
GraphQLError,
GraphQLResolveInfo as ResolveInfo,
Source,
execute,
parse,
)
from ..context import Context
from ..dynamic import Dynamic
@ -175,7 +181,7 @@ def test_query_wrong_default_value():
assert len(executed.errors) == 1
assert (
executed.errors[0].message
== GraphQLError('Expected value of type "MyType" but got: str.').message
== GraphQLError("Expected value of type 'MyType' but got: 'hello'.").message
)
assert executed.data == {"hello": None}

View File

@ -1,5 +1,7 @@
import pytest
from graphql.pyutils import dedent
from ..field import Field
from ..objecttype import ObjectType
from ..scalars import String
@ -15,8 +17,8 @@ class Query(ObjectType):
def test_schema():
schema = Schema(Query)
assert schema.get_query_type() == schema.get_graphql_type(Query)
schema = Schema(Query).graphql_schema
assert schema.query_type == schema.get_graphql_type(Query)
def test_schema_get_type():
@ -35,23 +37,23 @@ def test_schema_get_type_error():
def test_schema_str():
schema = Schema(Query)
assert (
str(schema)
== """schema {
query: Query
}
assert str(schema) == dedent(
"""
type MyOtherType {
field: String
}
type MyOtherType {
field: String
}
type Query {
inner: MyOtherType
}
"""
type Query {
inner: MyOtherType
}
"""
)
def test_schema_introspect():
schema = Schema(Query)
print()
print(schema)
print()
print(schema.introspect())
assert "__schema" in schema.introspect()

View File

@ -4,7 +4,7 @@ from graphql.type import (
GraphQLEnumType,
GraphQLEnumValue,
GraphQLField,
GraphQLInputObjectField,
GraphQLInputField,
GraphQLInputObjectType,
GraphQLInterfaceType,
GraphQLObjectType,
@ -20,7 +20,13 @@ from ..interface import Interface
from ..objecttype import ObjectType
from ..scalars import Int, String
from ..structures import List, NonNull
from ..typemap import TypeMap, resolve_type
from ..schema import GrapheneGraphQLSchema, resolve_type
def create_type_map(types, auto_camelcase=True):
query = GraphQLObjectType("Query", {})
schema = GrapheneGraphQLSchema(query, types=types, auto_camelcase=auto_camelcase)
return schema.type_map
def test_enum():
@ -39,22 +45,18 @@ def test_enum():
if self == MyEnum.foo:
return "Is deprecated"
typemap = TypeMap([MyEnum])
assert "MyEnum" in typemap
graphql_enum = typemap["MyEnum"]
type_map = create_type_map([MyEnum])
assert "MyEnum" in type_map
graphql_enum = type_map["MyEnum"]
assert isinstance(graphql_enum, GraphQLEnumType)
assert graphql_enum.name == "MyEnum"
assert graphql_enum.description == "Description"
values = graphql_enum.values
assert values == [
GraphQLEnumValue(
name="foo",
value=1,
description="Description foo=1",
deprecation_reason="Is deprecated",
assert graphql_enum.values == {
'foo': GraphQLEnumValue(
value=1, description="Description foo=1", deprecation_reason="Is deprecated"
),
GraphQLEnumValue(name="bar", value=2, description="Description bar=2"),
]
'bar': GraphQLEnumValue(value=2, description="Description bar=2"),
}
def test_objecttype():
@ -70,9 +72,9 @@ def test_objecttype():
def resolve_foo(self, bar):
return bar
typemap = TypeMap([MyObjectType])
assert "MyObjectType" in typemap
graphql_type = typemap["MyObjectType"]
type_map = create_type_map([MyObjectType])
assert "MyObjectType" in type_map
graphql_type = type_map["MyObjectType"]
assert isinstance(graphql_type, GraphQLObjectType)
assert graphql_type.name == "MyObjectType"
assert graphql_type.description == "Description"
@ -88,7 +90,8 @@ def test_objecttype():
GraphQLString,
description="Argument description",
default_value="x",
out_name="bar",
# TODO: out_name not (yet) supported by core-next
# out_name="bar",
)
}
@ -100,10 +103,10 @@ def test_dynamic_objecttype():
bar = Dynamic(lambda: Field(String))
own = Field(lambda: MyObjectType)
typemap = TypeMap([MyObjectType])
assert "MyObjectType" in typemap
type_map = create_type_map([MyObjectType])
assert "MyObjectType" in type_map
assert list(MyObjectType._meta.fields.keys()) == ["bar", "own"]
graphql_type = typemap["MyObjectType"]
graphql_type = type_map["MyObjectType"]
fields = graphql_type.fields
assert list(fields.keys()) == ["bar", "own"]
@ -125,9 +128,9 @@ def test_interface():
def resolve_foo(self, args, info):
return args.get("bar")
typemap = TypeMap([MyInterface])
assert "MyInterface" in typemap
graphql_type = typemap["MyInterface"]
type_map = create_type_map([MyInterface])
assert "MyInterface" in type_map
graphql_type = type_map["MyInterface"]
assert isinstance(graphql_type, GraphQLInterfaceType)
assert graphql_type.name == "MyInterface"
assert graphql_type.description == "Description"
@ -139,13 +142,14 @@ def test_interface():
foo_field = fields["foo"]
assert isinstance(foo_field, GraphQLField)
assert foo_field.description == "Field description"
assert not foo_field.resolver # Resolver not attached in interfaces
assert not foo_field.resolve # Resolver not attached in interfaces
assert foo_field.args == {
"bar": GraphQLArgument(
GraphQLString,
description="Argument description",
default_value="x",
out_name="bar",
# TODO: out_name not (yet) supported by core-next
# out_name="bar",
)
}
@ -169,15 +173,16 @@ def test_inputobject():
def resolve_foo_bar(self, args, info):
return args.get("bar")
typemap = TypeMap([MyInputObjectType])
assert "MyInputObjectType" in typemap
graphql_type = typemap["MyInputObjectType"]
type_map = create_type_map([MyInputObjectType])
assert "MyInputObjectType" in type_map
graphql_type = type_map["MyInputObjectType"]
assert isinstance(graphql_type, GraphQLInputObjectType)
assert graphql_type.name == "MyInputObjectType"
assert graphql_type.description == "Description"
other_graphql_type = typemap["OtherObjectType"]
inner_graphql_type = typemap["MyInnerObjectType"]
other_graphql_type = type_map["OtherObjectType"]
inner_graphql_type = type_map["MyInnerObjectType"]
# TODO: create_container not supported by core-next
container = graphql_type.create_container(
{
"bar": "oh!",
@ -205,7 +210,7 @@ def test_inputobject():
own_field = fields["own"]
assert own_field.type == graphql_type
foo_field = fields["fooBar"]
assert isinstance(foo_field, GraphQLInputObjectField)
assert isinstance(foo_field, GraphQLInputField)
assert foo_field.description == "Field description"
@ -215,9 +220,9 @@ def test_objecttype_camelcase():
foo_bar = String(bar_foo=String())
typemap = TypeMap([MyObjectType])
assert "MyObjectType" in typemap
graphql_type = typemap["MyObjectType"]
type_map = create_type_map([MyObjectType])
assert "MyObjectType" in type_map
graphql_type = type_map["MyObjectType"]
assert isinstance(graphql_type, GraphQLObjectType)
assert graphql_type.name == "MyObjectType"
assert graphql_type.description == "Description"
@ -227,7 +232,12 @@ def test_objecttype_camelcase():
foo_field = fields["fooBar"]
assert isinstance(foo_field, GraphQLField)
assert foo_field.args == {
"barFoo": GraphQLArgument(GraphQLString, out_name="bar_foo")
"barFoo": GraphQLArgument(
GraphQLString,
default_value=None,
# TODO: out_name not (yet) supported by core-next
# out_name="bar_foo"
)
}
@ -237,9 +247,9 @@ def test_objecttype_camelcase_disabled():
foo_bar = String(bar_foo=String())
typemap = TypeMap([MyObjectType], auto_camelcase=False)
assert "MyObjectType" in typemap
graphql_type = typemap["MyObjectType"]
type_map = create_type_map([MyObjectType], auto_camelcase=False)
assert "MyObjectType" in type_map
graphql_type = type_map["MyObjectType"]
assert isinstance(graphql_type, GraphQLObjectType)
assert graphql_type.name == "MyObjectType"
assert graphql_type.description == "Description"
@ -249,7 +259,12 @@ def test_objecttype_camelcase_disabled():
foo_field = fields["foo_bar"]
assert isinstance(foo_field, GraphQLField)
assert foo_field.args == {
"bar_foo": GraphQLArgument(GraphQLString, out_name="bar_foo")
"bar_foo": GraphQLArgument(
GraphQLString,
default_value=None,
# TODO: out_name not (yet) supported by core-next
# out_name="bar_foo"
)
}
@ -262,8 +277,8 @@ def test_objecttype_with_possible_types():
foo_bar = String()
typemap = TypeMap([MyObjectType])
graphql_type = typemap["MyObjectType"]
type_map = create_type_map([MyObjectType])
graphql_type = type_map["MyObjectType"]
assert graphql_type.is_type_of
assert graphql_type.is_type_of({}, None) is True
assert graphql_type.is_type_of(MyObjectType(), None) is False
@ -279,8 +294,8 @@ def test_resolve_type_with_missing_type():
def resolve_type_func(root, info):
return MyOtherObjectType
typemap = TypeMap([MyObjectType])
type_map = create_type_map([MyObjectType])
with pytest.raises(AssertionError) as excinfo:
resolve_type(resolve_type_func, typemap, "MyOtherObjectType", {}, {})
resolve_type(resolve_type_func, type_map, "MyOtherObjectType", {}, {}, None)
assert "MyOtherObjectTyp" in str(excinfo.value)

View File

@ -1,337 +0,0 @@
import inspect
from collections import OrderedDict
from functools import partial
from graphql import (
GraphQLArgument,
GraphQLBoolean,
GraphQLField,
GraphQLFloat,
GraphQLID,
GraphQLInputObjectField,
GraphQLInt,
GraphQLList,
GraphQLNonNull,
GraphQLString,
)
from graphql.execution.executor import get_default_resolve_type_fn
from graphql.type import GraphQLEnumValue
from graphql.type.typemap import GraphQLTypeMap
from ..utils.get_unbound_function import get_unbound_function
from ..utils.str_converters import to_camel_case
from .definitions import (
GrapheneEnumType,
GrapheneGraphQLType,
GrapheneInputObjectType,
GrapheneInterfaceType,
GrapheneObjectType,
GrapheneScalarType,
GrapheneUnionType,
)
from .dynamic import Dynamic
from .enum import Enum
from .field import Field
from .inputobjecttype import InputObjectType
from .interface import Interface
from .objecttype import ObjectType
from .resolver import get_default_resolver
from .scalars import ID, Boolean, Float, Int, Scalar, String
from .structures import List, NonNull
from .union import Union
from .utils import get_field_as
def is_graphene_type(_type):
if isinstance(_type, (List, NonNull)):
return True
if inspect.isclass(_type) and issubclass(
_type, (ObjectType, InputObjectType, Scalar, Interface, Union, Enum)
):
return True
def resolve_type(resolve_type_func, map, type_name, root, info):
_type = resolve_type_func(root, info)
if not _type:
return_type = map[type_name]
return get_default_resolve_type_fn(root, info, return_type)
if inspect.isclass(_type) and issubclass(_type, ObjectType):
graphql_type = map.get(_type._meta.name)
assert graphql_type, "Can't find type {} in schema".format(_type._meta.name)
assert graphql_type.graphene_type == _type, (
"The type {} does not match with the associated graphene type {}."
).format(_type, graphql_type.graphene_type)
return graphql_type
return _type
def is_type_of_from_possible_types(possible_types, root, info):
return isinstance(root, possible_types)
class TypeMap(GraphQLTypeMap):
def __init__(self, types, auto_camelcase=True, schema=None):
self.auto_camelcase = auto_camelcase
self.schema = schema
super(TypeMap, self).__init__(types)
def reducer(self, map, type):
if not type:
return map
if inspect.isfunction(type):
type = type()
if is_graphene_type(type):
return self.graphene_reducer(map, type)
return GraphQLTypeMap.reducer(map, type)
def graphene_reducer(self, map, type):
if isinstance(type, (List, NonNull)):
return self.reducer(map, type.of_type)
if type._meta.name in map:
_type = map[type._meta.name]
if isinstance(_type, GrapheneGraphQLType):
assert _type.graphene_type == type, (
"Found different types with the same name in the schema: {}, {}."
).format(_type.graphene_type, type)
return map
if issubclass(type, ObjectType):
internal_type = self.construct_objecttype(map, type)
elif issubclass(type, InputObjectType):
internal_type = self.construct_inputobjecttype(map, type)
elif issubclass(type, Interface):
internal_type = self.construct_interface(map, type)
elif issubclass(type, Scalar):
internal_type = self.construct_scalar(map, type)
elif issubclass(type, Enum):
internal_type = self.construct_enum(map, type)
elif issubclass(type, Union):
internal_type = self.construct_union(map, type)
else:
raise Exception("Expected Graphene type, but received: {}.".format(type))
return GraphQLTypeMap.reducer(map, internal_type)
def construct_scalar(self, map, type):
# We have a mapping to the original GraphQL types
# so there are no collisions.
_scalars = {
String: GraphQLString,
Int: GraphQLInt,
Float: GraphQLFloat,
Boolean: GraphQLBoolean,
ID: GraphQLID,
}
if type in _scalars:
return _scalars[type]
return GrapheneScalarType(
graphene_type=type,
name=type._meta.name,
description=type._meta.description,
serialize=getattr(type, "serialize", None),
parse_value=getattr(type, "parse_value", None),
parse_literal=getattr(type, "parse_literal", None),
)
def construct_enum(self, map, type):
values = OrderedDict()
for name, value in type._meta.enum.__members__.items():
description = getattr(value, "description", None)
deprecation_reason = getattr(value, "deprecation_reason", None)
if not description and callable(type._meta.description):
description = type._meta.description(value)
if not deprecation_reason and callable(type._meta.deprecation_reason):
deprecation_reason = type._meta.deprecation_reason(value)
values[name] = GraphQLEnumValue(
name=name,
value=value.value,
description=description,
deprecation_reason=deprecation_reason,
)
type_description = (
type._meta.description(None)
if callable(type._meta.description)
else type._meta.description
)
return GrapheneEnumType(
graphene_type=type,
values=values,
name=type._meta.name,
description=type_description,
)
def construct_objecttype(self, map, type):
if type._meta.name in map:
_type = map[type._meta.name]
if isinstance(_type, GrapheneGraphQLType):
assert _type.graphene_type == type, (
"Found different types with the same name in the schema: {}, {}."
).format(_type.graphene_type, type)
return _type
def interfaces():
interfaces = []
for interface in type._meta.interfaces:
self.graphene_reducer(map, interface)
internal_type = map[interface._meta.name]
assert internal_type.graphene_type == interface
interfaces.append(internal_type)
return interfaces
if type._meta.possible_types:
is_type_of = partial(
is_type_of_from_possible_types, type._meta.possible_types
)
else:
is_type_of = type.is_type_of
return GrapheneObjectType(
graphene_type=type,
name=type._meta.name,
description=type._meta.description,
fields=partial(self.construct_fields_for_type, map, type),
is_type_of=is_type_of,
interfaces=interfaces,
)
def construct_interface(self, map, type):
if type._meta.name in map:
_type = map[type._meta.name]
if isinstance(_type, GrapheneInterfaceType):
assert _type.graphene_type == type, (
"Found different types with the same name in the schema: {}, {}."
).format(_type.graphene_type, type)
return _type
_resolve_type = None
if type.resolve_type:
_resolve_type = partial(
resolve_type, type.resolve_type, map, type._meta.name
)
return GrapheneInterfaceType(
graphene_type=type,
name=type._meta.name,
description=type._meta.description,
fields=partial(self.construct_fields_for_type, map, type),
resolve_type=_resolve_type,
)
def construct_inputobjecttype(self, map, type):
return GrapheneInputObjectType(
graphene_type=type,
name=type._meta.name,
description=type._meta.description,
container_type=type._meta.container,
fields=partial(
self.construct_fields_for_type, map, type, is_input_type=True
),
)
def construct_union(self, map, type):
_resolve_type = None
if type.resolve_type:
_resolve_type = partial(
resolve_type, type.resolve_type, map, type._meta.name
)
def types():
union_types = []
for objecttype in type._meta.types:
self.graphene_reducer(map, objecttype)
internal_type = map[objecttype._meta.name]
assert internal_type.graphene_type == objecttype
union_types.append(internal_type)
return union_types
return GrapheneUnionType(
graphene_type=type,
name=type._meta.name,
description=type._meta.description,
types=types,
resolve_type=_resolve_type,
)
def get_name(self, name):
if self.auto_camelcase:
return to_camel_case(name)
return name
def construct_fields_for_type(self, map, type, is_input_type=False):
fields = OrderedDict()
for name, field in type._meta.fields.items():
if isinstance(field, Dynamic):
field = get_field_as(field.get_type(self.schema), _as=Field)
if not field:
continue
map = self.reducer(map, field.type)
field_type = self.get_field_type(map, field.type)
if is_input_type:
_field = GraphQLInputObjectField(
field_type,
default_value=field.default_value,
out_name=name,
description=field.description,
)
else:
args = OrderedDict()
for arg_name, arg in field.args.items():
map = self.reducer(map, arg.type)
arg_type = self.get_field_type(map, arg.type)
processed_arg_name = arg.name or self.get_name(arg_name)
args[processed_arg_name] = GraphQLArgument(
arg_type,
out_name=arg_name,
description=arg.description,
default_value=arg.default_value,
)
_field = GraphQLField(
field_type,
args=args,
resolver=field.get_resolver(
self.get_resolver_for_type(type, name, field.default_value)
),
deprecation_reason=field.deprecation_reason,
description=field.description,
)
field_name = field.name or self.get_name(name)
fields[field_name] = _field
return fields
def get_resolver_for_type(self, type, name, default_value):
if not issubclass(type, ObjectType):
return
resolver = getattr(type, "resolve_{}".format(name), None)
if not resolver:
# If we don't find the resolver in the ObjectType class, then try to
# find it in each of the interfaces
interface_resolver = None
for interface in type._meta.interfaces:
if name not in interface._meta.fields:
continue
interface_resolver = getattr(interface, "resolve_{}".format(name), None)
if interface_resolver:
break
resolver = interface_resolver
# Only if is not decorated with classmethod
if resolver:
return get_unbound_function(resolver)
default_resolver = type._meta.default_resolver or get_default_resolver()
return partial(default_resolver, name, default_value)
def get_field_type(self, map, type):
if isinstance(type, List):
return GraphQLList(self.get_field_type(map, type.of_type))
if isinstance(type, NonNull):
return GraphQLNonNull(self.get_field_type(map, type.of_type))
return map.get(type._meta.name)