mirror of
synced 2025-03-05 04:15:47 +03:00
Refactored all graphene code moving to 1.0
This commit is contained in:
@ -1,70 +0,0 @@
from graphene import signals
from .core import (
from graphene.core.fields import (
from graphene.utils import (
__all__ = [
@ -1,12 +0,0 @@
from graphene.contrib.django.types import (
from graphene.contrib.django.fields import (
__all__ = ['DjangoObjectType', 'DjangoNode', 'DjangoConnection',
'DjangoModelField', 'DjangoConnectionField']
@ -1,24 +0,0 @@
from django.db import models
class MissingType(object):
UUIDField = models.UUIDField
except AttributeError:
# Improved compatibility for Django 1.6
UUIDField = MissingType
from django.db.models.related import RelatedObject
# Improved compatibility for Django 1.6
RelatedObject = MissingType
# Postgres fields are only available in Django 1.8+
from django.contrib.postgres.fields import ArrayField, HStoreField, JSONField, RangeField
except ImportError:
ArrayField, HStoreField, JSONField, RangeField = (MissingType, ) * 4
@ -1,136 +0,0 @@
from django.db import models
from django.utils.encoding import force_text
from ...core.classtypes.enum import Enum
from ...core.types.custom_scalars import DateTime, JSONString
from ...core.types.definitions import List
from ...core.types.scalars import ID, Boolean, Float, Int, String
from ...utils import to_const
from .compat import (ArrayField, HStoreField, JSONField, RangeField,
RelatedObject, UUIDField)
from .utils import get_related_model, import_single_dispatch
singledispatch = import_single_dispatch()
def convert_choices(choices):
for value, name in choices:
if isinstance(name, (tuple, list)):
for choice in convert_choices(name):
yield choice
yield to_const(force_text(name)), value
def convert_django_field_with_choices(field):
choices = getattr(field, 'choices', None)
if choices:
meta = field.model._meta
name = '{}_{}_{}'.format(meta.app_label, meta.object_name, field.name)
graphql_choices = list(convert_choices(choices))
return Enum(name.upper(), graphql_choices, description=field.help_text)
return convert_django_field(field)
def convert_django_field(field):
raise Exception(
"Don't know how to convert the Django field %s (%s)" %
(field, field.__class__))
def convert_field_to_string(field):
return String(description=field.help_text)
def convert_field_to_id(field):
return ID(description=field.help_text)
def convert_field_to_int(field):
return Int(description=field.help_text)
def convert_field_to_boolean(field):
return Boolean(description=field.help_text, required=True)
def convert_field_to_nullboolean(field):
return Boolean(description=field.help_text)
def convert_field_to_float(field):
return Float(description=field.help_text)
def convert_date_to_string(field):
return DateTime(description=field.help_text)
def convert_onetoone_field_to_djangomodel(field):
from .fields import DjangoModelField
return DjangoModelField(get_related_model(field))
def convert_field_to_list_or_connection(field):
from .fields import DjangoModelField, ConnectionOrListField
model_field = DjangoModelField(get_related_model(field))
return ConnectionOrListField(model_field)
# For Django 1.6
def convert_relatedfield_to_djangomodel(field):
from .fields import DjangoModelField, ConnectionOrListField
model_field = DjangoModelField(field.model)
if isinstance(field.field, models.OneToOneField):
return model_field
return ConnectionOrListField(model_field)
def convert_field_to_djangomodel(field):
from .fields import DjangoModelField
return DjangoModelField(get_related_model(field), description=field.help_text)
def convert_postgres_array_to_list(field):
base_type = convert_django_field(field.base_field)
return List(base_type, description=field.help_text)
def convert_posgres_field_to_string(field):
return JSONString(description=field.help_text)
def convert_posgres_range_to_string(field):
inner_type = convert_django_field(field.base_field)
return List(inner_type, description=field.help_text)
@ -1,4 +0,0 @@
from .middleware import DjangoDebugMiddleware
from .types import DjangoDebug
__all__ = ['DjangoDebugMiddleware', 'DjangoDebug']
@ -1,56 +0,0 @@
from promise import Promise
from django.db import connections
from .sql.tracking import unwrap_cursor, wrap_cursor
from .types import DjangoDebug
class DjangoDebugContext(object):
def __init__(self):
self.debug_promise = None
self.promises = []
self.object = DjangoDebug(sql=[])
def get_debug_promise(self):
if not self.debug_promise:
self.debug_promise = Promise.all(self.promises)
return self.debug_promise.then(self.on_resolve_all_promises)
def on_resolve_all_promises(self, values):
return self.object
def add_promise(self, promise):
if self.debug_promise and not self.debug_promise.is_fulfilled:
def enable_instrumentation(self):
# This is thread-safe because database connections are thread-local.
for connection in connections.all():
wrap_cursor(connection, self)
def disable_instrumentation(self):
for connection in connections.all():
class DjangoDebugMiddleware(object):
def resolve(self, next, root, args, context, info):
django_debug = getattr(context, 'django_debug', None)
if not django_debug:
if context is None:
raise Exception('DjangoDebug cannot be executed in None contexts')
context.django_debug = DjangoDebugContext()
except Exception:
raise Exception('DjangoDebug need the context to be writable, context received: {}.'.format(
if info.schema.graphene_schema.T(DjangoDebug) == info.return_type:
return context.django_debug.get_debug_promise()
promise = next(root, args, context, info)
return promise
@ -1,170 +0,0 @@
# Code obtained from django-debug-toolbar sql panel tracking
from __future__ import absolute_import, unicode_literals
import json
from threading import local
from time import time
from django.utils import six
from django.utils.encoding import force_text
from .types import DjangoDebugSQL, DjangoDebugPostgreSQL
class SQLQueryTriggered(Exception):
"""Thrown when template panel triggers a query"""
class ThreadLocalState(local):
def __init__(self):
self.enabled = True
def Wrapper(self):
if self.enabled:
return NormalCursorWrapper
return ExceptionCursorWrapper
def recording(self, v):
self.enabled = v
state = ThreadLocalState()
recording = state.recording # export function
def wrap_cursor(connection, panel):
if not hasattr(connection, '_graphene_cursor'):
connection._graphene_cursor = connection.cursor
def cursor():
return state.Wrapper(connection._graphene_cursor(), connection, panel)
connection.cursor = cursor
return cursor
def unwrap_cursor(connection):
if hasattr(connection, '_graphene_cursor'):
previous_cursor = connection._graphene_cursor
connection.cursor = previous_cursor
del connection._graphene_cursor
class ExceptionCursorWrapper(object):
Wraps a cursor and raises an exception on any operation.
Used in Templates panel.
def __init__(self, cursor, db, logger):
def __getattr__(self, attr):
raise SQLQueryTriggered()
class NormalCursorWrapper(object):
Wraps a cursor and logs queries.
def __init__(self, cursor, db, logger):
self.cursor = cursor
# Instance of a BaseDatabaseWrapper subclass
self.db = db
# logger must implement a ``record`` method
self.logger = logger
def _quote_expr(self, element):
if isinstance(element, six.string_types):
return "'%s'" % force_text(element).replace("'", "''")
return repr(element)
def _quote_params(self, params):
if not params:
return params
if isinstance(params, dict):
return dict((key, self._quote_expr(value))
for key, value in params.items())
return list(map(self._quote_expr, params))
def _decode(self, param):
return force_text(param, strings_only=True)
except UnicodeDecodeError:
return '(encoded string)'
def _record(self, method, sql, params):
start_time = time()
return method(sql, params)
stop_time = time()
duration = (stop_time - start_time)
_params = ''
_params = json.dumps(list(map(self._decode, params)))
except Exception:
pass # object not JSON serializable
alias = getattr(self.db, 'alias', 'default')
conn = self.db.connection
vendor = getattr(conn, 'vendor', 'unknown')
params = {
'vendor': vendor,
'alias': alias,
'sql': self.db.ops.last_executed_query(
self.cursor, sql, self._quote_params(params)),
'duration': duration,
'raw_sql': sql,
'params': _params,
'start_time': start_time,
'stop_time': stop_time,
'is_slow': duration > 10,
'is_select': sql.lower().strip().startswith('select'),
if vendor == 'postgresql':
# If an erroneous query was ran on the connection, it might
# be in a state where checking isolation_level raises an
# exception.
iso_level = conn.isolation_level
except conn.InternalError:
iso_level = 'unknown'
'trans_id': self.logger.get_transaction_id(alias),
'trans_status': conn.get_transaction_status(),
'iso_level': iso_level,
'encoding': conn.encoding,
_sql = DjangoDebugPostgreSQL(**params)
_sql = DjangoDebugSQL(**params)
# We keep `sql` to maintain backwards compatibility
def callproc(self, procname, params=()):
return self._record(self.cursor.callproc, procname, params)
def execute(self, sql, params=()):
return self._record(self.cursor.execute, sql, params)
def executemany(self, sql, param_list):
return self._record(self.cursor.executemany, sql, param_list)
def __getattr__(self, attr):
return getattr(self.cursor, attr)
def __iter__(self):
return iter(self.cursor)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
@ -1,25 +0,0 @@
from .....core import Boolean, Float, ObjectType, String
class DjangoDebugBaseSQL(ObjectType):
vendor = String()
alias = String()
sql = String()
duration = Float()
raw_sql = String()
params = String()
start_time = Float()
stop_time = Float()
is_slow = Boolean()
is_select = Boolean()
class DjangoDebugSQL(DjangoDebugBaseSQL):
class DjangoDebugPostgreSQL(DjangoDebugBaseSQL):
trans_id = String()
trans_status = String()
iso_level = String()
encoding = String()
@ -1,219 +0,0 @@
import pytest
import graphene
from graphene.contrib.django import DjangoConnectionField, DjangoNode
from graphene.contrib.django.utils import DJANGO_FILTER_INSTALLED
from ...tests.models import Reporter
from ..middleware import DjangoDebugMiddleware
from ..types import DjangoDebug
class context(object):
# from examples.starwars_django.models import Character
pytestmark = pytest.mark.django_db
def test_should_query_field():
r1 = Reporter(last_name='ABA')
r2 = Reporter(last_name='Griffin')
class ReporterType(DjangoNode):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_reporter(self, *args, **kwargs):
return Reporter.objects.first()
query = '''
query ReporterQuery {
reporter {
__debug {
sql {
expected = {
'reporter': {
'lastName': 'ABA',
'__debug': {
'sql': [{
'rawSql': str(Reporter.objects.order_by('pk')[:1].query)
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data == expected
def test_should_query_list():
r1 = Reporter(last_name='ABA')
r2 = Reporter(last_name='Griffin')
class ReporterType(DjangoNode):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
all_reporters = ReporterType.List()
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters {
__debug {
sql {
expected = {
'allReporters': [{
'lastName': 'ABA',
}, {
'lastName': 'Griffin',
'__debug': {
'sql': [{
'rawSql': str(Reporter.objects.all().query)
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data == expected
def test_should_query_connection():
r1 = Reporter(last_name='ABA')
r2 = Reporter(last_name='Griffin')
class ReporterType(DjangoNode):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters(first:1) {
edges {
node {
__debug {
sql {
expected = {
'allReporters': {
'edges': [{
'node': {
'lastName': 'ABA',
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data['allReporters'] == expected['allReporters']
assert 'COUNT' in result.data['__debug']['sql'][0]['rawSql']
query = str(Reporter.objects.all()[:1].query)
assert result.data['__debug']['sql'][1]['rawSql'] == query
@pytest.mark.skipif(not DJANGO_FILTER_INSTALLED,
reason="requires django-filter")
def test_should_query_connectionfilter():
from graphene.contrib.django.filter import DjangoFilterConnectionField
r1 = Reporter(last_name='ABA')
r2 = Reporter(last_name='Griffin')
class ReporterType(DjangoNode):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters(first:1) {
edges {
node {
__debug {
sql {
expected = {
'allReporters': {
'edges': [{
'node': {
'lastName': 'ABA',
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data['allReporters'] == expected['allReporters']
assert 'COUNT' in result.data['__debug']['sql'][0]['rawSql']
query = str(Reporter.objects.all()[:1].query)
assert result.data['__debug']['sql'][1]['rawSql'] == query
@ -1,7 +0,0 @@
from ....core.classtypes.objecttype import ObjectType
from ....core.types import Field
from .sql.types import DjangoDebugBaseSQL
class DjangoDebug(ObjectType):
sql = Field(DjangoDebugBaseSQL.List())
@ -1,80 +0,0 @@
from ...core.exceptions import SkipField
from ...core.fields import Field
from ...core.types.base import FieldType
from ...core.types.definitions import List
from ...relay import ConnectionField
from ...relay.utils import is_node
from .utils import DJANGO_FILTER_INSTALLED, get_type_for_model, maybe_queryset
class DjangoConnectionField(ConnectionField):
def __init__(self, *args, **kwargs):
self.on = kwargs.pop('on', False)
kwargs['default'] = kwargs.pop('default', self.get_manager)
return super(DjangoConnectionField, self).__init__(*args, **kwargs)
def model(self):
return self.type._meta.model
def get_manager(self):
if self.on:
return getattr(self.model, self.on)
return self.model._default_manager
def get_queryset(self, resolved_qs, args, info):
return resolved_qs
def from_list(self, connection_type, resolved, args, context, info):
resolved_qs = maybe_queryset(resolved)
qs = self.get_queryset(resolved_qs, args, info)
return super(DjangoConnectionField, self).from_list(connection_type, qs, args, context, info)
class ConnectionOrListField(Field):
def internal_type(self, schema):
from .filter.fields import DjangoFilterConnectionField
model_field = self.type
field_object_type = model_field.get_object_type(schema)
if not field_object_type:
raise SkipField()
if is_node(field_object_type):
if field_object_type._meta.filter_fields:
field = DjangoFilterConnectionField(field_object_type)
field = DjangoConnectionField(field_object_type)
field = Field(List(field_object_type))
field.contribute_to_class(self.object_type, self.attname)
return schema.T(field)
class DjangoModelField(FieldType):
def __init__(self, model, *args, **kwargs):
self.model = model
super(DjangoModelField, self).__init__(*args, **kwargs)
def internal_type(self, schema):
_type = self.get_object_type(schema)
if not _type and self.parent._meta.only_fields:
raise Exception(
"Model %r is not accessible by the schema. "
"You can either register the type manually "
"using @schema.register. "
"Or disable the field in %s" % (
if not _type:
raise SkipField()
return schema.T(_type)
def get_object_type(self, schema):
return get_type_for_model(schema, self.model)
@ -1,14 +0,0 @@
import warnings
from graphene.contrib.django.utils import DJANGO_FILTER_INSTALLED
"Use of django filtering requires the django-filter package "
"be installed. You can do so using `pip install django-filter`", ImportWarning
from .fields import DjangoFilterConnectionField
from .filterset import GrapheneFilterSet, GlobalIDFilter, GlobalIDMultipleChoiceFilter
__all__ = ['DjangoFilterConnectionField', 'GrapheneFilterSet',
'GlobalIDFilter', 'GlobalIDMultipleChoiceFilter']
@ -1,36 +0,0 @@
from ..fields import DjangoConnectionField
from .utils import get_filtering_args_from_filterset, get_filterset_class
class DjangoFilterConnectionField(DjangoConnectionField):
def __init__(self, type, fields=None, order_by=None,
extra_filter_meta=None, filterset_class=None,
*args, **kwargs):
self.order_by = order_by or type._meta.filter_order_by
self.fields = fields or type._meta.filter_fields
meta = dict(model=type._meta.model,
if extra_filter_meta:
self.filterset_class = get_filterset_class(filterset_class, **meta)
self.filtering_args = get_filtering_args_from_filterset(self.filterset_class, type)
kwargs.setdefault('args', {})
super(DjangoFilterConnectionField, self).__init__(type, *args, **kwargs)
def get_queryset(self, qs, args, info):
filterset_class = self.filterset_class
filter_kwargs = self.get_filter_kwargs(args)
order = self.get_order(args)
if order:
qs = qs.order_by(order)
return filterset_class(data=filter_kwargs, queryset=qs)
def get_filter_kwargs(self, args):
return {k: v for k, v in args.items() if k in self.filtering_args}
def get_order(self, args):
return args.get('order_by', None)
@ -1,116 +0,0 @@
import six
from django.conf import settings
from django.db import models
from django.utils.text import capfirst
from django_filters import Filter, MultipleChoiceFilter
from django_filters.filterset import FilterSet, FilterSetMetaclass
from graphene.contrib.django.forms import (GlobalIDFormField,
from graphql_relay.node.node import from_global_id
class GlobalIDFilter(Filter):
field_class = GlobalIDFormField
def filter(self, qs, value):
_type, _id = from_global_id(value)
return super(GlobalIDFilter, self).filter(qs, _id)
class GlobalIDMultipleChoiceFilter(MultipleChoiceFilter):
field_class = GlobalIDMultipleChoiceField
def filter(self, qs, value):
gids = [from_global_id(v)[1] for v in value]
return super(GlobalIDMultipleChoiceFilter, self).filter(qs, gids)
ORDER_BY_FIELD = getattr(settings, 'GRAPHENE_ORDER_BY_FIELD', 'order_by')
models.AutoField: {
'filter_class': GlobalIDFilter,
models.OneToOneField: {
'filter_class': GlobalIDFilter,
models.ForeignKey: {
'filter_class': GlobalIDFilter,
models.ManyToManyField: {
'filter_class': GlobalIDMultipleChoiceFilter,
class GrapheneFilterSetMetaclass(FilterSetMetaclass):
def __new__(cls, name, bases, attrs):
new_class = super(GrapheneFilterSetMetaclass, cls).__new__(cls, name, bases, attrs)
# Customise the filter_overrides for Graphene
new_class.filter_overrides.setdefault(k, v)
return new_class
class GrapheneFilterSetMixin(object):
order_by_field = ORDER_BY_FIELD
def filter_for_reverse_field(cls, f, name):
"""Handles retrieving filters for reverse relationships
We override the default implementation so that we can handle
Global IDs (the default implementation expects database
primary keys)
rel = f.field.rel
default = {
'name': name,
'label': capfirst(rel.related_name)
if rel.multiple:
# For to-many relationships
return GlobalIDMultipleChoiceFilter(**default)
# For to-one relationships
return GlobalIDFilter(**default)
class GrapheneFilterSet(six.with_metaclass(GrapheneFilterSetMetaclass, GrapheneFilterSetMixin, FilterSet)):
""" Base class for FilterSets used by Graphene
You shouldn't usually need to use this class. The
DjangoFilterConnectionField will wrap FilterSets with this class as
def setup_filterset(filterset_class):
""" Wrap a provided filterset in Graphene-specific functionality
return type(
(six.with_metaclass(GrapheneFilterSetMetaclass, GrapheneFilterSetMixin, filterset_class),),
def custom_filterset_factory(model, filterset_base_class=GrapheneFilterSet,
""" Create a filterset for the given model using the provided meta data
'model': model,
meta_class = type(str('Meta'), (object,), meta)
filterset = type(
str('%sFilterSet' % model._meta.object_name),
'Meta': meta_class
return filterset
@ -1,31 +0,0 @@
import django_filters
from graphene.contrib.django.tests.models import Article, Pet, Reporter
class ArticleFilter(django_filters.FilterSet):
class Meta:
model = Article
fields = {
'headline': ['exact', 'icontains'],
'pub_date': ['gt', 'lt', 'exact'],
'reporter': ['exact'],
order_by = True
class ReporterFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['first_name', 'last_name', 'email', 'pets']
order_by = False
class PetFilter(django_filters.FilterSet):
class Meta:
model = Pet
fields = ['name']
order_by = False
@ -1,287 +0,0 @@
from datetime import datetime
import pytest
from graphene import ObjectType, Schema
from graphene.contrib.django import DjangoNode
from graphene.contrib.django.forms import (GlobalIDFormField,
from graphene.contrib.django.tests.models import Article, Pet, Reporter
from graphene.contrib.django.utils import DJANGO_FILTER_INSTALLED
from graphene.relay import NodeField
pytestmark = []
import django_filters
from graphene.contrib.django.filter import (GlobalIDFilter, DjangoFilterConnectionField,
from graphene.contrib.django.filter.tests.filters import ArticleFilter, PetFilter
pytestmark.append(pytest.mark.skipif(True, reason='django_filters not installed'))
class ArticleNode(DjangoNode):
class Meta:
model = Article
class ReporterNode(DjangoNode):
class Meta:
model = Reporter
class PetNode(DjangoNode):
class Meta:
model = Pet
schema = Schema()
def assert_arguments(field, *arguments):
ignore = ('after', 'before', 'first', 'last', 'orderBy')
actual = [
for name in schema.T(field.arguments)
if name not in ignore and not name.startswith('_')
assert set(arguments) == set(actual), \
'Expected arguments ({}) did not match actual ({})'.format(
def assert_orderable(field):
assert 'orderBy' in schema.T(field.arguments), \
'Field cannot be ordered'
def assert_not_orderable(field):
assert 'orderBy' not in schema.T(field.arguments), \
'Field can be ordered'
def test_filter_explicit_filterset_arguments():
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleFilter)
'headline', 'headline_Icontains',
'pubDate', 'pubDate_Gt', 'pubDate_Lt',
def test_filter_shortcut_filterset_arguments_list():
field = DjangoFilterConnectionField(ArticleNode, fields=['pub_date', 'reporter'])
def test_filter_shortcut_filterset_arguments_dict():
field = DjangoFilterConnectionField(ArticleNode, fields={
'headline': ['exact', 'icontains'],
'reporter': ['exact'],
'headline', 'headline_Icontains',
def test_filter_explicit_filterset_orderable():
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleFilter)
def test_filter_shortcut_filterset_orderable_true():
field = DjangoFilterConnectionField(ArticleNode, order_by=True)
def test_filter_shortcut_filterset_orderable_headline():
field = DjangoFilterConnectionField(ArticleNode, order_by=['headline'])
def test_filter_explicit_filterset_not_orderable():
field = DjangoFilterConnectionField(PetNode, filterset_class=PetFilter)
def test_filter_shortcut_filterset_extra_meta():
field = DjangoFilterConnectionField(ArticleNode, extra_filter_meta={
'order_by': True
def test_filter_filterset_information_on_meta():
class ReporterFilterNode(DjangoNode):
class Meta:
model = Reporter
filter_fields = ['first_name', 'articles']
filter_order_by = True
field = DjangoFilterConnectionField(ReporterFilterNode)
assert_arguments(field, 'firstName', 'articles')
def test_filter_filterset_information_on_meta_related():
class ReporterFilterNode(DjangoNode):
class Meta:
model = Reporter
filter_fields = ['first_name', 'articles']
filter_order_by = True
class ArticleFilterNode(DjangoNode):
class Meta:
model = Article
filter_fields = ['headline', 'reporter']
filter_order_by = True
class Query(ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterFilterNode)
all_articles = DjangoFilterConnectionField(ArticleFilterNode)
reporter = NodeField(ReporterFilterNode)
article = NodeField(ArticleFilterNode)
schema = Schema(query=Query)
schema.schema # Trigger the schema loading
articles_field = schema.get_type('ReporterFilterNode')._meta.fields_map['articles']
assert_arguments(articles_field, 'headline', 'reporter')
def test_filter_filterset_related_results():
class ReporterFilterNode(DjangoNode):
class Meta:
model = Reporter
filter_fields = ['first_name', 'articles']
filter_order_by = True
class ArticleFilterNode(DjangoNode):
class Meta:
model = Article
filter_fields = ['headline', 'reporter']
filter_order_by = True
class Query(ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterFilterNode)
all_articles = DjangoFilterConnectionField(ArticleFilterNode)
reporter = NodeField(ReporterFilterNode)
article = NodeField(ArticleFilterNode)
r1 = Reporter.objects.create(first_name='r1', last_name='r1', email='r1@test.com')
r2 = Reporter.objects.create(first_name='r2', last_name='r2', email='r2@test.com')
Article.objects.create(headline='a1', pub_date=datetime.now(), reporter=r1)
Article.objects.create(headline='a2', pub_date=datetime.now(), reporter=r2)
query = '''
query {
allReporters {
edges {
node {
articles {
edges {
node {
schema = Schema(query=Query)
result = schema.execute(query)
assert not result.errors
# We should only get back a single article for each reporter
assert len(result.data['allReporters']['edges'][0]['node']['articles']['edges']) == 1
assert len(result.data['allReporters']['edges'][1]['node']['articles']['edges']) == 1
def test_global_id_field_implicit():
field = DjangoFilterConnectionField(ArticleNode, fields=['id'])
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['id']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_field_explicit():
class ArticleIdFilter(django_filters.FilterSet):
class Meta:
model = Article
fields = ['id']
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleIdFilter)
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['id']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_field_relation():
field = DjangoFilterConnectionField(ArticleNode, fields=['reporter'])
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['reporter']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_multiple_field_implicit():
field = DjangoFilterConnectionField(ReporterNode, fields=['pets'])
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['pets']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_explicit():
class ReporterPetsFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['pets']
field = DjangoFilterConnectionField(ReporterNode, filterset_class=ReporterPetsFilter)
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['pets']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_implicit_reverse():
field = DjangoFilterConnectionField(ReporterNode, fields=['articles'])
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['articles']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_explicit_reverse():
class ReporterPetsFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['articles']
field = DjangoFilterConnectionField(ReporterNode, filterset_class=ReporterPetsFilter)
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['articles']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
@ -1,31 +0,0 @@
import six
from ....core.types import Argument, String
from .filterset import custom_filterset_factory, setup_filterset
def get_filtering_args_from_filterset(filterset_class, type):
""" Inspect a FilterSet and produce the arguments to pass to
a Graphene Field. These arguments will be available to
filter against in the GraphQL
from graphene.contrib.django.form_converter import convert_form_field
args = {}
for name, filter_field in six.iteritems(filterset_class.base_filters):
field_type = Argument(convert_form_field(filter_field.field))
args[name] = field_type
# Also add the 'order_by' field
if filterset_class._meta.order_by:
args[filterset_class.order_by_field] = Argument(String())
return args
def get_filterset_class(filterset_class, **meta):
"""Get the class to be used as the FilterSet"""
if filterset_class:
# If were given a FilterSet class, then set it up and
# return it
return setup_filterset(filterset_class)
return custom_filterset_factory(**meta)
@ -1,73 +0,0 @@
from django import forms
from django.forms.fields import BaseTemporalField
from graphene import ID, Boolean, Float, Int, String
from graphene.contrib.django.forms import (GlobalIDFormField,
from graphene.contrib.django.utils import import_single_dispatch
from graphene.core.types.definitions import List
singledispatch = import_single_dispatch()
UUIDField = forms.UUIDField
except AttributeError:
class UUIDField(object):
def convert_form_field(field):
raise Exception(
"Don't know how to convert the Django form field %s (%s) "
"to Graphene type" %
(field, field.__class__)
def convert_form_field_to_string(field):
return String(description=field.help_text)
def convert_form_field_to_int(field):
return Int(description=field.help_text)
def convert_form_field_to_boolean(field):
return Boolean(description=field.help_text, required=True)
def convert_form_field_to_nullboolean(field):
return Boolean(description=field.help_text)
def convert_form_field_to_float(field):
return Float(description=field.help_text)
def convert_form_field_to_list(field):
return List(ID())
def convert_form_field_to_id(field):
return ID()
@ -1,42 +0,0 @@
import binascii
from django.core.exceptions import ValidationError
from django.forms import CharField, Field, IntegerField, MultipleChoiceField
from django.utils.translation import ugettext_lazy as _
from graphql_relay import from_global_id
class GlobalIDFormField(Field):
default_error_messages = {
'invalid': _('Invalid ID specified.'),
def clean(self, value):
if not value and not self.required:
return None
_type, _id = from_global_id(value)
except (TypeError, ValueError, UnicodeDecodeError, binascii.Error):
raise ValidationError(self.error_messages['invalid'])
except ValidationError:
raise ValidationError(self.error_messages['invalid'])
return value
class GlobalIDMultipleChoiceField(MultipleChoiceField):
default_error_messages = {
'invalid_choice': _('One of the specified IDs was invalid (%(value)s).'),
'invalid_list': _('Enter a list of values.'),
def valid_value(self, value):
# Clean will raise a validation error if there is a problem
return True
@ -1,72 +0,0 @@
import importlib
import json
from distutils.version import StrictVersion
from optparse import make_option
from django import get_version as get_django_version
from django.core.management.base import BaseCommand, CommandError
LT_DJANGO_1_8 = StrictVersion(get_django_version()) < StrictVersion('1.8')
if LT_DJANGO_1_8:
class CommandArguments(BaseCommand):
option_list = BaseCommand.option_list + (
help='Django app containing schema to dump, e.g. myproject.core.schema',
help='Output file (default: schema.json)'
class CommandArguments(BaseCommand):
def add_arguments(self, parser):
from django.conf import settings
default=getattr(settings, 'GRAPHENE_SCHEMA', ''),
help='Django app containing schema to dump, e.g. myproject.core.schema')
default=getattr(settings, 'GRAPHENE_SCHEMA_OUTPUT', 'schema.json'),
help='Output file (default: schema.json)')
class Command(CommandArguments):
help = 'Dump Graphene schema JSON to file'
can_import_settings = True
def save_file(self, out, schema_dict):
with open(out, 'w') as outfile:
json.dump(schema_dict, outfile)
def handle(self, *args, **options):
from django.conf import settings
schema = options.get('schema') or getattr(settings, 'GRAPHENE_SCHEMA', '')
out = options.get('out') or getattr(settings, 'GRAPHENE_SCHEMA_OUTPUT', 'schema.json')
if schema == '':
raise CommandError('Specify schema on GRAPHENE_SCHEMA setting or by using --schema')
i = importlib.import_module(schema)
schema_dict = {'data': i.schema.introspect()}
self.save_file(out, schema_dict)
style = getattr(self, 'style', None)
SUCCESS = getattr(style, 'SUCCESS', lambda x: x)
self.stdout.write(SUCCESS('Successfully dumped GraphQL schema to %s' % out))
@ -1,27 +0,0 @@
from ...core.classtypes.objecttype import ObjectTypeOptions
from ...relay.types import Node
from ...relay.utils import is_node
from .utils import DJANGO_FILTER_INSTALLED
VALID_ATTRS = ('model', 'only_fields', 'exclude_fields')
VALID_ATTRS += ('filter_fields', 'filter_order_by')
class DjangoOptions(ObjectTypeOptions):
def __init__(self, *args, **kwargs):
super(DjangoOptions, self).__init__(*args, **kwargs)
self.model = None
self.valid_attrs += VALID_ATTRS
self.only_fields = None
self.exclude_fields = []
self.filter_fields = None
self.filter_order_by = None
def contribute_to_class(self, cls, name):
super(DjangoOptions, self).contribute_to_class(cls, name)
if is_node(cls):
self.exclude_fields = list(self.exclude_fields) + ['id']
@ -1,52 +0,0 @@
from __future__ import absolute_import
from django.db import models
from django.utils.translation import ugettext_lazy as _
(1, 'this'),
(2, _('that'))
class Pet(models.Model):
name = models.CharField(max_length=30)
class FilmDetails(models.Model):
location = models.CharField(max_length=30)
film = models.OneToOneField('Film', related_name='details')
class Film(models.Model):
reporters = models.ManyToManyField('Reporter',
class Reporter(models.Model):
first_name = models.CharField(max_length=30)
last_name = models.CharField(max_length=30)
email = models.EmailField()
pets = models.ManyToManyField('self')
a_choice = models.CharField(max_length=30, choices=CHOICES)
def __str__(self): # __unicode__ on Python 2
return "%s %s" % (self.first_name, self.last_name)
class Article(models.Model):
headline = models.CharField(max_length=100)
pub_date = models.DateField()
reporter = models.ForeignKey(Reporter, related_name='articles')
lang = models.CharField(max_length=2, help_text='Language', choices=[
('es', 'Spanish'),
('en', 'English')
], default='es')
importance = models.IntegerField('Importance', null=True, blank=True,
choices=[(1, u'Very important'), (2, u'Not as important')])
def __str__(self): # __unicode__ on Python 2
return self.headline
class Meta:
ordering = ('headline',)
@ -1,11 +0,0 @@
from django.core import management
from mock import patch
from six import StringIO
def test_generate_file_on_call_graphql_schema(savefile_mock, settings):
settings.GRAPHENE_SCHEMA = 'graphene.contrib.django.tests.test_urls'
out = StringIO()
management.call_command('graphql_schema', schema='', stdout=out)
assert "Successfully dumped GraphQL schema to schema.json" in out.getvalue()
@ -1,227 +0,0 @@
import pytest
from django.db import models
from django.utils.translation import ugettext_lazy as _
from py.test import raises
import graphene
from graphene.core.types.custom_scalars import DateTime, JSONString
from ..compat import (ArrayField, HStoreField, JSONField, MissingType,
from ..converter import convert_django_field, convert_django_field_with_choices
from ..fields import ConnectionOrListField, DjangoModelField
from .models import Article, Reporter, Film, FilmDetails
def assert_conversion(django_field, graphene_field, *args, **kwargs):
field = django_field(help_text='Custom Help Text', *args, **kwargs)
graphene_type = convert_django_field(field)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.as_field()
assert field.description == 'Custom Help Text'
return field
def test_should_unknown_django_field_raise_exception():
with raises(Exception) as excinfo:
assert 'Don\'t know how to convert the Django field' in str(excinfo.value)
def test_should_date_convert_string():
assert_conversion(models.DateField, DateTime)
def test_should_char_convert_string():
assert_conversion(models.CharField, graphene.String)
def test_should_text_convert_string():
assert_conversion(models.TextField, graphene.String)
def test_should_email_convert_string():
assert_conversion(models.EmailField, graphene.String)
def test_should_slug_convert_string():
assert_conversion(models.SlugField, graphene.String)
def test_should_url_convert_string():
assert_conversion(models.URLField, graphene.String)
def test_should_ipaddress_convert_string():
assert_conversion(models.GenericIPAddressField, graphene.String)
def test_should_file_convert_string():
assert_conversion(models.FileField, graphene.String)
def test_should_image_convert_string():
assert_conversion(models.ImageField, graphene.String)
def test_should_auto_convert_id():
assert_conversion(models.AutoField, graphene.ID, primary_key=True)
def test_should_positive_integer_convert_int():
assert_conversion(models.PositiveIntegerField, graphene.Int)
def test_should_positive_small_convert_int():
assert_conversion(models.PositiveSmallIntegerField, graphene.Int)
def test_should_small_integer_convert_int():
assert_conversion(models.SmallIntegerField, graphene.Int)
def test_should_big_integer_convert_int():
assert_conversion(models.BigIntegerField, graphene.Int)
def test_should_integer_convert_int():
assert_conversion(models.IntegerField, graphene.Int)
def test_should_boolean_convert_boolean():
field = assert_conversion(models.BooleanField, graphene.Boolean)
assert field.required is True
def test_should_nullboolean_convert_boolean():
field = assert_conversion(models.NullBooleanField, graphene.Boolean)
assert field.required is False
def test_field_with_choices_convert_enum():
field = models.CharField(help_text='Language', choices=(
('es', 'Spanish'),
('en', 'English')
class TranslatedModel(models.Model):
language = field
class Meta:
app_label = 'test'
graphene_type = convert_django_field_with_choices(field)
assert issubclass(graphene_type, graphene.Enum)
assert graphene_type._meta.type_name == 'TEST_TRANSLATEDMODEL_LANGUAGE'
assert graphene_type._meta.description == 'Language'
assert graphene_type.__enum__.__members__['SPANISH'].value == 'es'
assert graphene_type.__enum__.__members__['ENGLISH'].value == 'en'
def test_field_with_grouped_choices():
field = models.CharField(help_text='Language', choices=(
('Europe', (
('es', 'Spanish'),
('en', 'English'),
class GroupedChoicesModel(models.Model):
language = field
class Meta:
app_label = 'test'
def test_field_with_choices_gettext():
field = models.CharField(help_text='Language', choices=(
('es', _('Spanish')),
('en', _('English'))
class TranslatedChoicesModel(models.Model):
language = field
class Meta:
app_label = 'test'
def test_should_float_convert_float():
assert_conversion(models.FloatField, graphene.Float)
def test_should_manytomany_convert_connectionorlist():
graphene_type = convert_django_field(Reporter._meta.local_many_to_many[0])
assert isinstance(graphene_type, ConnectionOrListField)
assert isinstance(graphene_type.type, DjangoModelField)
assert graphene_type.type.model == Reporter
def test_should_manytoone_convert_connectionorlist():
# Django 1.9 uses 'rel', <1.9 uses 'related
related = getattr(Reporter.articles, 'rel', None) or \
getattr(Reporter.articles, 'related')
graphene_type = convert_django_field(related)
assert isinstance(graphene_type, ConnectionOrListField)
assert isinstance(graphene_type.type, DjangoModelField)
assert graphene_type.type.model == Article
def test_should_onetoone_reverse_convert_model():
# Django 1.9 uses 'rel', <1.9 uses 'related
related = getattr(Film.details, 'rel', None) or \
getattr(Film.details, 'related')
graphene_type = convert_django_field(related)
assert isinstance(graphene_type, DjangoModelField)
assert graphene_type.model == FilmDetails
def test_should_onetoone_convert_model():
field = assert_conversion(models.OneToOneField, DjangoModelField, Article)
assert field.type.model == Article
def test_should_foreignkey_convert_model():
field = assert_conversion(models.ForeignKey, DjangoModelField, Article)
assert field.type.model == Article
@pytest.mark.skipif(ArrayField is MissingType,
reason="ArrayField should exist")
def test_should_postgres_array_convert_list():
field = assert_conversion(ArrayField, graphene.List, models.CharField(max_length=100))
assert isinstance(field.type, graphene.List)
assert isinstance(field.type.of_type, graphene.String)
@pytest.mark.skipif(ArrayField is MissingType,
reason="ArrayField should exist")
def test_should_postgres_array_multiple_convert_list():
field = assert_conversion(ArrayField, graphene.List, ArrayField(models.CharField(max_length=100)))
assert isinstance(field.type, graphene.List)
assert isinstance(field.type.of_type, graphene.List)
assert isinstance(field.type.of_type.of_type, graphene.String)
@pytest.mark.skipif(HStoreField is MissingType,
reason="HStoreField should exist")
def test_should_postgres_hstore_convert_string():
assert_conversion(HStoreField, JSONString)
@pytest.mark.skipif(JSONField is MissingType,
reason="JSONField should exist")
def test_should_postgres_json_convert_string():
assert_conversion(JSONField, JSONString)
@pytest.mark.skipif(RangeField is MissingType,
reason="RangeField should exist")
def test_should_postgres_range_convert_list():
from django.contrib.postgres.fields import IntegerRangeField
field = assert_conversion(IntegerRangeField, graphene.List)
assert isinstance(field.type.of_type, graphene.Int)
@ -1,103 +0,0 @@
from django import forms
from py.test import raises
import graphene
from graphene.contrib.django.form_converter import convert_form_field
from graphene.core.types import ID, List
from .models import Reporter
def assert_conversion(django_field, graphene_field, *args):
field = django_field(*args, help_text='Custom Help Text')
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.as_field()
assert field.description == 'Custom Help Text'
return field
def test_should_unknown_django_field_raise_exception():
with raises(Exception) as excinfo:
assert 'Don\'t know how to convert the Django form field' in str(excinfo.value)
def test_should_date_convert_string():
assert_conversion(forms.DateField, graphene.String)
def test_should_time_convert_string():
assert_conversion(forms.TimeField, graphene.String)
def test_should_date_time_convert_string():
assert_conversion(forms.DateTimeField, graphene.String)
def test_should_char_convert_string():
assert_conversion(forms.CharField, graphene.String)
def test_should_email_convert_string():
assert_conversion(forms.EmailField, graphene.String)
def test_should_slug_convert_string():
assert_conversion(forms.SlugField, graphene.String)
def test_should_url_convert_string():
assert_conversion(forms.URLField, graphene.String)
def test_should_choice_convert_string():
assert_conversion(forms.ChoiceField, graphene.String)
def test_should_base_field_convert_string():
assert_conversion(forms.Field, graphene.String)
def test_should_regex_convert_string():
assert_conversion(forms.RegexField, graphene.String, '[0-9]+')
def test_should_uuid_convert_string():
if hasattr(forms, 'UUIDField'):
assert_conversion(forms.UUIDField, graphene.String)
def test_should_integer_convert_int():
assert_conversion(forms.IntegerField, graphene.Int)
def test_should_boolean_convert_boolean():
field = assert_conversion(forms.BooleanField, graphene.Boolean)
assert field.required is True
def test_should_nullboolean_convert_boolean():
field = assert_conversion(forms.NullBooleanField, graphene.Boolean)
assert field.required is False
def test_should_float_convert_float():
assert_conversion(forms.FloatField, graphene.Float)
def test_should_decimal_convert_float():
assert_conversion(forms.DecimalField, graphene.Float)
def test_should_multiple_choice_convert_connectionorlist():
field = forms.ModelMultipleChoiceField(Reporter.objects.all())
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, List)
assert isinstance(graphene_type.of_type, ID)
def test_should_manytoone_convert_connectionorlist():
field = forms.ModelChoiceField(Reporter.objects.all())
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, graphene.ID)
@ -1,36 +0,0 @@
from django.core.exceptions import ValidationError
from py.test import raises
from graphene.contrib.django.forms import GlobalIDFormField
# 'TXlUeXBlOjEwMA==' -> 'MyType', 100
# 'TXlUeXBlOmFiYw==' -> 'MyType', 'abc'
def test_global_id_valid():
field = GlobalIDFormField()
def test_global_id_invalid():
field = GlobalIDFormField()
with raises(ValidationError):
def test_global_id_none():
field = GlobalIDFormField()
with raises(ValidationError):
def test_global_id_none_optional():
field = GlobalIDFormField(required=False)
def test_global_id_bad_int():
field = GlobalIDFormField()
with raises(ValidationError):
@ -1,201 +0,0 @@
import datetime
import pytest
from django.db import models
from py.test import raises
import graphene
from graphene import relay
from ..compat import MissingType, RangeField
from ..types import DjangoNode, DjangoObjectType
from .models import Article, Reporter
pytestmark = pytest.mark.django_db
def test_should_query_only_fields():
with raises(Exception):
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
only_fields = ('articles', )
schema = graphene.Schema(query=ReporterType)
query = '''
query ReporterQuery {
result = schema.execute(query)
assert not result.errors
def test_should_query_well():
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
def resolve_reporter(self, *args, **kwargs):
return ReporterType(Reporter(first_name='ABA', last_name='X'))
query = '''
query ReporterQuery {
reporter {
expected = {
'reporter': {
'firstName': 'ABA',
'lastName': 'X',
'email': ''
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
@pytest.mark.skipif(RangeField is MissingType,
reason="RangeField should exist")
def test_should_query_postgres_fields():
from django.contrib.postgres.fields import IntegerRangeField, ArrayField, JSONField, HStoreField
class Event(models.Model):
ages = IntegerRangeField(help_text='The age ranges')
data = JSONField(help_text='Data')
store = HStoreField()
tags = ArrayField(models.CharField(max_length=50))
class EventType(DjangoObjectType):
class Meta:
model = Event
class Query(graphene.ObjectType):
event = graphene.Field(EventType)
def resolve_event(self, *args, **kwargs):
return Event(
ages=(0, 10),
data={'angry_babies': True},
store={'h': 'store'},
tags=['child', 'angry', 'babies']
schema = graphene.Schema(query=Query)
query = '''
query myQuery {
event {
expected = {
'event': {
'ages': [0, 10],
'tags': ['child', 'angry', 'babies'],
'data': '{"angry_babies": true}',
'store': '{"h": "store"}',
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_should_node():
class ReporterNode(DjangoNode):
class Meta:
model = Reporter
def get_node(cls, id, info):
return ReporterNode(Reporter(id=2, first_name='Cookie Monster'))
def resolve_articles(self, *args, **kwargs):
return [ArticleNode(Article(headline='Hi!'))]
class ArticleNode(DjangoNode):
class Meta:
model = Article
def get_node(cls, id, info):
return ArticleNode(Article(id=1, headline='Article node', pub_date=datetime.date(2002, 3, 11)))
class Query(graphene.ObjectType):
node = relay.NodeField()
reporter = graphene.Field(ReporterNode)
article = graphene.Field(ArticleNode)
def resolve_reporter(self, *args, **kwargs):
return ReporterNode(
Reporter(id=1, first_name='ABA', last_name='X'))
query = '''
query ReporterQuery {
reporter {
articles {
edges {
node {
myArticle: node(id:"QXJ0aWNsZU5vZGU6MQ==") {
... on ReporterNode {
... on ArticleNode {
expected = {
'reporter': {
'id': 'UmVwb3J0ZXJOb2RlOjE=',
'firstName': 'ABA',
'lastName': 'X',
'email': '',
'articles': {
'edges': [{
'node': {
'headline': 'Hi!'
'myArticle': {
'id': 'QXJ0aWNsZU5vZGU6MQ==',
'headline': 'Article node',
'pubDate': '2002-03-11',
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
@ -1,45 +0,0 @@
from py.test import raises
from graphene.contrib.django import DjangoObjectType
from tests.utils import assert_equal_lists
from .models import Reporter
def test_should_raise_if_no_model():
with raises(Exception) as excinfo:
class Character1(DjangoObjectType):
assert 'model in the Meta' in str(excinfo.value)
def test_should_raise_if_model_is_invalid():
with raises(Exception) as excinfo:
class Character2(DjangoObjectType):
class Meta:
model = 1
assert 'not a Django model' in str(excinfo.value)
def test_should_map_fields_correctly():
class ReporterType2(DjangoObjectType):
class Meta:
model = Reporter
['articles', 'first_name', 'last_name', 'email', 'pets', 'id', 'films']
def test_should_map_only_few_fields():
class Reporter2(DjangoObjectType):
class Meta:
model = Reporter
only_fields = ('id', 'email')
['id', 'email']
@ -1,102 +0,0 @@
from graphql.type import GraphQLObjectType
from mock import patch
from graphene import Schema, Interface
from graphene.contrib.django.types import DjangoNode, DjangoObjectType
from graphene.core.fields import Field
from graphene.core.types.scalars import Int
from graphene.relay.fields import GlobalIDField
from tests.utils import assert_equal_lists
from .models import Article, Reporter
schema = Schema()
class Character(DjangoObjectType):
'''Character description'''
class Meta:
model = Reporter
class Human(DjangoNode):
'''Human description'''
pub_date = Int()
class Meta:
model = Article
def test_django_interface():
assert DjangoNode._meta.interface is True
@patch('graphene.contrib.django.tests.models.Article.objects.get', return_value=Article(id=1))
def test_django_get_node(get):
human = Human.get_node(1, None)
assert human.id == 1
def test_djangonode_idfield():
idfield = DjangoNode._meta.fields_map['id']
assert isinstance(idfield, GlobalIDField)
def test_node_idfield():
idfield = Human._meta.fields_map['id']
assert isinstance(idfield, GlobalIDField)
def test_node_replacedfield():
idfield = Human._meta.fields_map['pub_date']
assert isinstance(idfield, Field)
assert schema.T(idfield).type == schema.T(Int())
def test_objecttype_init_none():
h = Human()
assert h._root is None
def test_objecttype_init_good():
instance = Article()
h = Human(instance)
assert h._root == instance
def test_object_type():
object_type = schema.T(Human)
assert Human._meta.interface is False
assert isinstance(object_type, GraphQLObjectType)
['headline', 'id', 'reporter', 'pubDate']
assert schema.T(DjangoNode) in object_type.get_interfaces()
def test_node_notinterface():
assert Human._meta.interface is False
assert DjangoNode in Human._meta.interfaces
def test_django_objecttype_could_extend_interface():
schema = Schema()
class Customer(Interface):
id = Int()
class UserType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = [Customer]
object_type = schema.T(UserType)
assert schema.T(Customer) in object_type.get_interfaces()
@ -1,45 +0,0 @@
from django.conf.urls import url
import graphene
from graphene import Schema
from graphene.contrib.django.types import DjangoNode
from graphene.contrib.django.views import GraphQLView
from .models import Article, Reporter
class Character(DjangoNode):
class Meta:
model = Reporter
def get_node(self, id):
class Human(DjangoNode):
raises = graphene.String()
class Meta:
model = Article
def resolve_raises(self, *args):
raise Exception("This field should raise exception")
def get_node(self, id):
class Query(graphene.ObjectType):
human = graphene.Field(Human)
def resolve_human(self, args, info):
return Human()
schema = Schema(query=Query)
urlpatterns = [
url(r'^graphql', GraphQLView.as_view(schema=schema)),
@ -1,57 +0,0 @@
import json
def format_response(response):
return json.loads(response.content.decode())
def test_client_get_good_query(settings, client):
settings.ROOT_URLCONF = 'graphene.contrib.django.tests.test_urls'
response = client.get('/graphql', {'query': '{ human { headline } }'})
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
assert json_response == expected_json
def test_client_get_good_query_with_raise(settings, client):
settings.ROOT_URLCONF = 'graphene.contrib.django.tests.test_urls'
response = client.get('/graphql', {'query': '{ human { raises } }'})
json_response = format_response(response)
assert json_response['errors'][0]['message'] == 'This field should raise exception'
assert json_response['data']['human']['raises'] is None
def test_client_post_good_query_json(settings, client):
settings.ROOT_URLCONF = 'graphene.contrib.django.tests.test_urls'
response = client.post(
'/graphql', json.dumps({'query': '{ human { headline } }'}), 'application/json')
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
assert json_response == expected_json
def test_client_post_good_query_graphql(settings, client):
settings.ROOT_URLCONF = 'graphene.contrib.django.tests.test_urls'
response = client.post(
'/graphql', '{ human { headline } }', 'application/graphql')
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
assert json_response == expected_json
@ -1,106 +0,0 @@
import inspect
import six
from django.db import models
from ...core.classtypes.objecttype import ObjectType, ObjectTypeMeta
from ...relay.types import Connection, Node, NodeMeta
from .converter import convert_django_field_with_choices
from .options import DjangoOptions
from .utils import get_reverse_fields
class DjangoObjectTypeMeta(ObjectTypeMeta):
options_class = DjangoOptions
def construct_fields(cls):
only_fields = cls._meta.only_fields
reverse_fields = get_reverse_fields(cls._meta.model)
all_fields = sorted(list(cls._meta.model._meta.fields) +
all_fields += list(reverse_fields)
already_created_fields = {f.attname for f in cls._meta.local_fields}
for field in all_fields:
is_not_in_only = only_fields and field.name not in only_fields
is_already_created = field.name in already_created_fields
is_excluded = field.name in cls._meta.exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we exclude this field in exclude_fields
converted_field = convert_django_field_with_choices(field)
cls.add_to_class(field.name, converted_field)
def construct(cls, *args, **kwargs):
cls = super(DjangoObjectTypeMeta, cls).construct(*args, **kwargs)
if not cls._meta.abstract:
if not cls._meta.model:
raise Exception(
'Django ObjectType %s must have a model in the Meta class attr' %
elif not inspect.isclass(cls._meta.model) or not issubclass(cls._meta.model, models.Model):
raise Exception('Provided model in %s is not a Django model' % cls)
return cls
class InstanceObjectType(ObjectType):
class Meta:
abstract = True
def __init__(self, _root=None):
super(InstanceObjectType, self).__init__(_root=_root)
assert not self._root or isinstance(self._root, self._meta.model), (
'{} received a non-compatible instance ({}) '
'when expecting {}'.format(
def instance(self):
return self._root
def instance(self, value):
self._root = value
class DjangoObjectType(six.with_metaclass(
DjangoObjectTypeMeta, InstanceObjectType)):
class Meta:
abstract = True
class DjangoConnection(Connection):
class DjangoNodeMeta(DjangoObjectTypeMeta, NodeMeta):
class NodeInstance(Node, InstanceObjectType):
class Meta:
abstract = True
class DjangoNode(six.with_metaclass(
DjangoNodeMeta, NodeInstance)):
class Meta:
abstract = True
def get_node(cls, id, info=None):
instance = cls._meta.model.objects.get(id=id)
return cls(instance)
except cls._meta.model.DoesNotExist:
return None
@ -1,87 +0,0 @@
from django.db import models
from django.db.models.manager import Manager
from django.db.models.query import QuerySet
from graphene.utils import LazyList
from .compat import RelatedObject
import django_filters # noqa
except (ImportError, AttributeError):
# AtributeError raised if DjangoFilters installed with a incompatible Django Version
def get_type_for_model(schema, model):
schema = schema
types = schema.types.values()
for _type in types:
type_model = hasattr(_type, '_meta') and getattr(
_type._meta, 'model', None)
if model == type_model:
return _type
def get_reverse_fields(model):
for name, attr in model.__dict__.items():
# Django =>1.9 uses 'rel', django <1.9 uses 'related'
related = getattr(attr, 'rel', None) or \
getattr(attr, 'related', None)
if isinstance(related, RelatedObject):
# Hack for making it compatible with Django 1.6
new_related = RelatedObject(related.parent_model, related.model, related.field)
new_related.name = name
yield new_related
elif isinstance(related, models.ManyToOneRel):
yield related
elif isinstance(related, models.ManyToManyRel) and not related.symmetrical:
yield related
class WrappedQueryset(LazyList):
def __len__(self):
# Dont calculate the length using len(queryset), as this will
# evaluate the whole queryset and return it's length.
# Use .count() instead
return self._origin.count()
def maybe_queryset(value):
if isinstance(value, Manager):
value = value.get_queryset()
if isinstance(value, QuerySet):
return WrappedQueryset(value)
return value
def get_related_model(field):
if hasattr(field, 'rel'):
# Django 1.6, 1.7
return field.rel.to
return field.related_model
def import_single_dispatch():
from functools import singledispatch
except ImportError:
singledispatch = None
if not singledispatch:
from singledispatch import singledispatch
except ImportError:
if not singledispatch:
raise Exception(
"It seems your python version does not include "
"functools.singledispatch. Please install the 'singledispatch' "
"package. More information here: "
return singledispatch
@ -1,13 +0,0 @@
from graphql_django_view import GraphQLView as BaseGraphQLView
class GraphQLView(BaseGraphQLView):
graphene_schema = None
def __init__(self, schema, **kwargs):
super(GraphQLView, self).__init__(
@ -1,11 +0,0 @@
from graphene.contrib.sqlalchemy.types import (
from graphene.contrib.sqlalchemy.fields import (
__all__ = ['SQLAlchemyObjectType', 'SQLAlchemyNode',
'SQLAlchemyConnectionField', 'SQLAlchemyModelField']
@ -1,73 +0,0 @@
from singledispatch import singledispatch
from sqlalchemy import types
from sqlalchemy.orm import interfaces
from ...core.classtypes.enum import Enum
from ...core.types.scalars import ID, Boolean, Float, Int, String
from .fields import ConnectionOrListField, SQLAlchemyModelField
from sqlalchemy_utils.types.choice import ChoiceType
except ImportError:
class ChoiceType(object):
def convert_sqlalchemy_relationship(relationship):
direction = relationship.direction
model = relationship.mapper.entity
model_field = SQLAlchemyModelField(model, description=relationship.doc)
if direction == interfaces.MANYTOONE:
return model_field
elif (direction == interfaces.ONETOMANY or
direction == interfaces.MANYTOMANY):
return ConnectionOrListField(model_field)
def convert_sqlalchemy_column(column):
return convert_sqlalchemy_type(getattr(column, 'type', None), column)
def convert_sqlalchemy_type(type, column):
raise Exception(
"Don't know how to convert the SQLAlchemy field %s (%s)" % (column, column.__class__))
def convert_column_to_string(type, column):
return String(description=column.doc)
def convert_column_to_int_or_id(type, column):
if column.primary_key:
return ID(description=column.doc)
return Int(description=column.doc)
def convert_column_to_boolean(type, column):
return Boolean(description=column.doc)
def convert_column_to_float(type, column):
return Float(description=column.doc)
def convert_column_to_enum(type, column):
name = '{}_{}'.format(column.table.name, column.name).upper()
return Enum(name, type.choices, description=column.doc)
@ -1,69 +0,0 @@
from ...core.exceptions import SkipField
from ...core.fields import Field
from ...core.types.base import FieldType
from ...core.types.definitions import List
from ...relay import ConnectionField
from ...relay.utils import is_node
from .utils import get_query, get_type_for_model, maybe_query
class DefaultQuery(object):
class SQLAlchemyConnectionField(ConnectionField):
def __init__(self, *args, **kwargs):
kwargs['default'] = kwargs.pop('default', lambda: DefaultQuery)
return super(SQLAlchemyConnectionField, self).__init__(*args, **kwargs)
def model(self):
return self.type._meta.model
def from_list(self, connection_type, resolved, args, context, info):
if resolved is DefaultQuery:
resolved = get_query(self.model, info)
query = maybe_query(resolved)
return super(SQLAlchemyConnectionField, self).from_list(connection_type, query, args, context, info)
class ConnectionOrListField(Field):
def internal_type(self, schema):
model_field = self.type
field_object_type = model_field.get_object_type(schema)
if not field_object_type:
raise SkipField()
if is_node(field_object_type):
field = SQLAlchemyConnectionField(field_object_type)
field = Field(List(field_object_type))
field.contribute_to_class(self.object_type, self.attname)
return schema.T(field)
class SQLAlchemyModelField(FieldType):
def __init__(self, model, *args, **kwargs):
self.model = model
super(SQLAlchemyModelField, self).__init__(*args, **kwargs)
def internal_type(self, schema):
_type = self.get_object_type(schema)
if not _type and self.parent._meta.only_fields:
raise Exception(
"Table %r is not accessible by the schema. "
"You can either register the type manually "
"using @schema.register. "
"Or disable the field in %s" % (
if not _type:
raise SkipField()
return schema.T(_type)
def get_object_type(self, schema):
return get_type_for_model(schema, self.model)
@ -1,24 +0,0 @@
from ...core.classtypes.objecttype import ObjectTypeOptions
from ...relay.types import Node
from ...relay.utils import is_node
VALID_ATTRS = ('model', 'only_fields', 'exclude_fields', 'identifier')
class SQLAlchemyOptions(ObjectTypeOptions):
def __init__(self, *args, **kwargs):
super(SQLAlchemyOptions, self).__init__(*args, **kwargs)
self.model = None
self.identifier = "id"
self.valid_attrs += VALID_ATTRS
self.only_fields = None
self.exclude_fields = []
self.filter_fields = None
self.filter_order_by = None
def contribute_to_class(self, cls, name):
super(SQLAlchemyOptions, self).contribute_to_class(cls, name)
if is_node(cls):
self.exclude_fields = list(self.exclude_fields) + ['id']
@ -1,42 +0,0 @@
from __future__ import absolute_import
from sqlalchemy import Column, Date, ForeignKey, Integer, String, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
Base = declarative_base()
association_table = Table('association', Base.metadata,
Column('pet_id', Integer, ForeignKey('pets.id')),
Column('reporter_id', Integer, ForeignKey('reporters.id')))
class Editor(Base):
__tablename__ = 'editors'
editor_id = Column(Integer(), primary_key=True)
name = Column(String(100))
class Pet(Base):
__tablename__ = 'pets'
id = Column(Integer(), primary_key=True)
name = Column(String(30))
reporter_id = Column(Integer(), ForeignKey('reporters.id'))
class Reporter(Base):
__tablename__ = 'reporters'
id = Column(Integer(), primary_key=True)
first_name = Column(String(30))
last_name = Column(String(30))
email = Column(String())
pets = relationship('Pet', secondary=association_table, backref='reporters')
articles = relationship('Article', backref='reporter')
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer(), primary_key=True)
headline = Column(String(100))
pub_date = Column(Date())
reporter_id = Column(Integer(), ForeignKey('reporters.id'))
@ -1,124 +0,0 @@
from py.test import raises
from sqlalchemy import Column, Table, types
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy_utils.types.choice import ChoiceType
import graphene
from graphene.contrib.sqlalchemy.converter import (convert_sqlalchemy_column,
from graphene.contrib.sqlalchemy.fields import (ConnectionOrListField,
from .models import Article, Pet, Reporter
def assert_column_conversion(sqlalchemy_type, graphene_field, **kwargs):
column = Column(sqlalchemy_type, doc='Custom Help Text', **kwargs)
graphene_type = convert_sqlalchemy_column(column)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.as_field()
assert field.description == 'Custom Help Text'
return field
def test_should_unknown_sqlalchemy_field_raise_exception():
with raises(Exception) as excinfo:
assert 'Don\'t know how to convert the SQLAlchemy field' in str(excinfo.value)
def test_should_date_convert_string():
assert_column_conversion(types.Date(), graphene.String)
def test_should_datetime_convert_string():
assert_column_conversion(types.DateTime(), graphene.String)
def test_should_time_convert_string():
assert_column_conversion(types.Time(), graphene.String)
def test_should_string_convert_string():
assert_column_conversion(types.String(), graphene.String)
def test_should_text_convert_string():
assert_column_conversion(types.Text(), graphene.String)
def test_should_unicode_convert_string():
assert_column_conversion(types.Unicode(), graphene.String)
def test_should_unicodetext_convert_string():
assert_column_conversion(types.UnicodeText(), graphene.String)
def test_should_enum_convert_string():
assert_column_conversion(types.Enum(), graphene.String)
def test_should_small_integer_convert_int():
assert_column_conversion(types.SmallInteger(), graphene.Int)
def test_should_big_integer_convert_int():
assert_column_conversion(types.BigInteger(), graphene.Int)
def test_should_integer_convert_int():
assert_column_conversion(types.Integer(), graphene.Int)
def test_should_integer_convert_id():
assert_column_conversion(types.Integer(), graphene.ID, primary_key=True)
def test_should_boolean_convert_boolean():
assert_column_conversion(types.Boolean(), graphene.Boolean)
def test_should_float_convert_float():
assert_column_conversion(types.Float(), graphene.Float)
def test_should_numeric_convert_float():
assert_column_conversion(types.Numeric(), graphene.Float)
def test_should_choice_convert_enum():
(u'es', u'Spanish'),
(u'en', u'English')
column = Column(ChoiceType(TYPES), doc='Language', name='language')
Base = declarative_base()
Table('translatedmodel', Base.metadata, column)
graphene_type = convert_sqlalchemy_column(column)
assert issubclass(graphene_type, graphene.Enum)
assert graphene_type._meta.type_name == 'TRANSLATEDMODEL_LANGUAGE'
assert graphene_type._meta.description == 'Language'
assert graphene_type.__enum__.__members__['es'].value == 'Spanish'
assert graphene_type.__enum__.__members__['en'].value == 'English'
def test_should_manytomany_convert_connectionorlist():
graphene_type = convert_sqlalchemy_relationship(Reporter.pets.property)
assert isinstance(graphene_type, ConnectionOrListField)
assert isinstance(graphene_type.type, SQLAlchemyModelField)
assert graphene_type.type.model == Pet
def test_should_manytoone_convert_connectionorlist():
field = convert_sqlalchemy_relationship(Article.reporter.property)
assert isinstance(field, SQLAlchemyModelField)
assert field.model == Reporter
def test_should_onetomany_convert_model():
graphene_type = convert_sqlalchemy_relationship(Reporter.articles.property)
assert isinstance(graphene_type, ConnectionOrListField)
assert isinstance(graphene_type.type, SQLAlchemyModelField)
assert graphene_type.type.model == Article
@ -1,239 +0,0 @@
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
import graphene
from graphene import relay
from graphene.contrib.sqlalchemy import (SQLAlchemyConnectionField,
SQLAlchemyNode, SQLAlchemyObjectType)
from .models import Article, Base, Editor, Reporter
db = create_engine('sqlite:///test_sqlalchemy.sqlite3')
def session():
connection = db.engine.connect()
transaction = connection.begin()
# options = dict(bind=connection, binds={})
session_factory = sessionmaker(bind=connection)
session = scoped_session(session_factory)
yield session
# Finalize test here
def setup_fixtures(session):
reporter = Reporter(first_name='ABA', last_name='X')
reporter2 = Reporter(first_name='ABO', last_name='Y')
article = Article(headline='Hi!')
editor = Editor(name="John")
def test_should_query_well(session):
class ReporterType(SQLAlchemyObjectType):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
reporters = ReporterType.List()
def resolve_reporter(self, *args, **kwargs):
return session.query(Reporter).first()
def resolve_reporters(self, *args, **kwargs):
return session.query(Reporter)
query = '''
query ReporterQuery {
reporter {
reporters {
expected = {
'reporter': {
'firstName': 'ABA',
'lastName': 'X',
'email': None
'reporters': [{
'firstName': 'ABA',
}, {
'firstName': 'ABO',
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_should_node(session):
class ReporterNode(SQLAlchemyNode):
class Meta:
model = Reporter
def get_node(cls, id, info):
return Reporter(id=2, first_name='Cookie Monster')
def resolve_articles(self, *args, **kwargs):
return [Article(headline='Hi!')]
class ArticleNode(SQLAlchemyNode):
class Meta:
model = Article
# @classmethod
# def get_node(cls, id, info):
# return Article(id=1, headline='Article node')
class Query(graphene.ObjectType):
node = relay.NodeField()
reporter = graphene.Field(ReporterNode)
article = graphene.Field(ArticleNode)
all_articles = SQLAlchemyConnectionField(ArticleNode)
def resolve_reporter(self, *args, **kwargs):
return Reporter(id=1, first_name='ABA', last_name='X')
def resolve_article(self, *args, **kwargs):
return Article(id=1, headline='Article node')
query = '''
query ReporterQuery {
reporter {
articles {
edges {
node {
allArticles {
edges {
node {
myArticle: node(id:"QXJ0aWNsZU5vZGU6MQ==") {
... on ReporterNode {
... on ArticleNode {
expected = {
'reporter': {
'id': 'UmVwb3J0ZXJOb2RlOjE=',
'firstName': 'ABA',
'lastName': 'X',
'email': None,
'articles': {
'edges': [{
'node': {
'headline': 'Hi!'
'allArticles': {
'edges': [{
'node': {
'headline': 'Hi!'
'myArticle': {
'id': 'QXJ0aWNsZU5vZGU6MQ==',
'headline': 'Hi!'
schema = graphene.Schema(query=Query, session=session)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_should_custom_identifier(session):
class EditorNode(SQLAlchemyNode):
class Meta:
model = Editor
identifier = "editor_id"
class Query(graphene.ObjectType):
node = relay.NodeField(EditorNode)
all_editors = SQLAlchemyConnectionField(EditorNode)
query = '''
query EditorQuery {
allEditors {
edges {
node {
node(id: "RWRpdG9yTm9kZTox") {
expected = {
'allEditors': {
'edges': [{
'node': {
'id': 'RWRpdG9yTm9kZTox',
'name': 'John'
'node': {
'name': 'John'
schema = graphene.Schema(query=Query, session=session)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
@ -1,45 +0,0 @@
from py.test import raises
from graphene.contrib.sqlalchemy import SQLAlchemyObjectType
from tests.utils import assert_equal_lists
from .models import Reporter
def test_should_raise_if_no_model():
with raises(Exception) as excinfo:
class Character1(SQLAlchemyObjectType):
assert 'model in the Meta' in str(excinfo.value)
def test_should_raise_if_model_is_invalid():
with raises(Exception) as excinfo:
class Character2(SQLAlchemyObjectType):
class Meta:
model = 1
assert 'not a SQLAlchemy model' in str(excinfo.value)
def test_should_map_fields_correctly():
class ReporterType2(SQLAlchemyObjectType):
class Meta:
model = Reporter
['articles', 'first_name', 'last_name', 'email', 'pets', 'id']
def test_should_map_only_few_fields():
class Reporter2(SQLAlchemyObjectType):
class Meta:
model = Reporter
only_fields = ('id', 'email')
['id', 'email']
@ -1,102 +0,0 @@
from graphql.type import GraphQLObjectType
from pytest import raises
from graphene import Schema
from graphene.contrib.sqlalchemy.types import (SQLAlchemyNode,
from graphene.core.fields import Field
from graphene.core.types.scalars import Int
from graphene.relay.fields import GlobalIDField
from tests.utils import assert_equal_lists
from .models import Article, Reporter
schema = Schema()
class Character(SQLAlchemyObjectType):
'''Character description'''
class Meta:
model = Reporter
class Human(SQLAlchemyNode):
'''Human description'''
pub_date = Int()
class Meta:
model = Article
exclude_fields = ('id', )
def test_sqlalchemy_interface():
assert SQLAlchemyNode._meta.interface is True
# @patch('graphene.contrib.sqlalchemy.tests.models.Article.filter', return_value=Article(id=1))
# def test_sqlalchemy_get_node(get):
# human = Human.get_node(1, None)
# get.assert_called_with(id=1)
# assert human.id == 1
def test_objecttype_registered():
object_type = schema.T(Character)
assert isinstance(object_type, GraphQLObjectType)
assert Character._meta.model == Reporter
['articles', 'firstName', 'lastName', 'email', 'id']
def test_sqlalchemynode_idfield():
idfield = SQLAlchemyNode._meta.fields_map['id']
assert isinstance(idfield, GlobalIDField)
def test_node_idfield():
idfield = Human._meta.fields_map['id']
assert isinstance(idfield, GlobalIDField)
def test_node_replacedfield():
idfield = Human._meta.fields_map['pub_date']
assert isinstance(idfield, Field)
assert schema.T(idfield).type == schema.T(Int())
def test_interface_objecttype_init_none():
h = Human()
assert h._root is None
def test_interface_objecttype_init_good():
instance = Article()
h = Human(instance)
assert h._root == instance
def test_interface_objecttype_init_unexpected():
with raises(AssertionError) as excinfo:
assert str(excinfo.value) == "Human received a non-compatible instance (object) when expecting Article"
def test_object_type():
object_type = schema.T(Human)
assert Human._meta.interface is False
assert isinstance(object_type, GraphQLObjectType)
['headline', 'id', 'reporter', 'reporterId', 'pubDate']
assert schema.T(SQLAlchemyNode) in object_type.get_interfaces()
def test_node_notinterface():
assert Human._meta.interface is False
assert SQLAlchemyNode in Human._meta.interfaces
@ -1,25 +0,0 @@
from graphene import ObjectType, Schema, String
from ..utils import get_session
def test_get_session():
session = 'My SQLAlchemy session'
schema = Schema(session=session)
class Query(ObjectType):
x = String()
def resolve_x(self, args, info):
return get_session(info)
query = '''
query ReporterQuery {
schema = Schema(query=Query, session=session)
result = schema.execute(query)
assert not result.errors
assert result.data['x'] == session
@ -1,125 +0,0 @@
import inspect
import six
from sqlalchemy.inspection import inspect as sqlalchemyinspect
from sqlalchemy.orm.exc import NoResultFound
from ...core.classtypes.objecttype import ObjectType, ObjectTypeMeta
from ...relay.types import Connection, Node, NodeMeta
from .converter import (convert_sqlalchemy_column,
from .options import SQLAlchemyOptions
from .utils import get_query, is_mapped
class SQLAlchemyObjectTypeMeta(ObjectTypeMeta):
options_class = SQLAlchemyOptions
def construct_fields(cls):
only_fields = cls._meta.only_fields
exclude_fields = cls._meta.exclude_fields
already_created_fields = {f.attname for f in cls._meta.local_fields}
inspected_model = sqlalchemyinspect(cls._meta.model)
# Get all the columns for the relationships on the model
for relationship in inspected_model.relationships:
is_not_in_only = only_fields and relationship.key not in only_fields
is_already_created = relationship.key in already_created_fields
is_excluded = relationship.key in exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we excldue this field in exclude_fields
converted_relationship = convert_sqlalchemy_relationship(relationship)
cls.add_to_class(relationship.key, converted_relationship)
for column in inspected_model.columns:
is_not_in_only = only_fields and column.name not in only_fields
is_already_created = column.name in already_created_fields
is_excluded = column.name in exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we excldue this field in exclude_fields
converted_column = convert_sqlalchemy_column(column)
cls.add_to_class(column.name, converted_column)
def construct(cls, *args, **kwargs):
cls = super(SQLAlchemyObjectTypeMeta, cls).construct(*args, **kwargs)
if not cls._meta.abstract:
if not cls._meta.model:
raise Exception(
'SQLAlchemy ObjectType %s must have a model in the Meta class attr' %
elif not inspect.isclass(cls._meta.model) or not is_mapped(cls._meta.model):
raise Exception('Provided model in %s is not a SQLAlchemy model' % cls)
return cls
class InstanceObjectType(ObjectType):
class Meta:
abstract = True
def __init__(self, _root=None):
super(InstanceObjectType, self).__init__(_root=_root)
assert not self._root or isinstance(self._root, self._meta.model), (
'{} received a non-compatible instance ({}) '
'when expecting {}'.format(
def instance(self):
return self._root
def instance(self, value):
self._root = value
class SQLAlchemyObjectType(six.with_metaclass(
SQLAlchemyObjectTypeMeta, InstanceObjectType)):
class Meta:
abstract = True
class SQLAlchemyConnection(Connection):
class SQLAlchemyNodeMeta(SQLAlchemyObjectTypeMeta, NodeMeta):
class NodeInstance(Node, InstanceObjectType):
class Meta:
abstract = True
class SQLAlchemyNode(six.with_metaclass(
SQLAlchemyNodeMeta, NodeInstance)):
class Meta:
abstract = True
def to_global_id(self):
id_ = getattr(self.instance, self._meta.identifier)
return self.global_id(id_)
def get_node(cls, id, info=None):
model = cls._meta.model
identifier = cls._meta.identifier
query = get_query(model, info)
instance = query.filter(getattr(model, identifier) == id).one()
return cls(instance)
except NoResultFound:
return None
@ -1,49 +0,0 @@
from sqlalchemy.ext.declarative.api import DeclarativeMeta
from sqlalchemy.orm.query import Query
from graphene.utils import LazyList
def get_type_for_model(schema, model):
schema = schema
types = schema.types.values()
for _type in types:
type_model = hasattr(_type, '_meta') and getattr(
_type._meta, 'model', None)
if model == type_model:
return _type
def get_session(info):
schema = info.schema.graphene_schema
return schema.options.get('session')
def get_query(model, info):
query = getattr(model, 'query', None)
if not query:
session = get_session(info)
if not session:
raise Exception('A query in the model Base or a session in the schema is required for querying.\n'
'Read more http://graphene-python.org/docs/sqlalchemy/tips/#querying')
query = session.query(model)
return query
class WrappedQuery(LazyList):
def __len__(self):
# Dont calculate the length using len(query), as this will
# evaluate the whole queryset and return it's length.
# Use .count() instead
return self._origin.count()
def maybe_query(value):
if isinstance(value, Query):
return WrappedQuery(value)
return value
def is_mapped(obj):
return isinstance(obj, DeclarativeMeta)
@ -1,48 +0,0 @@
from .schema import (
from .classtypes import (
from .types import (
__all__ = [
@ -1,18 +0,0 @@
from .inputobjecttype import InputObjectType
from .interface import Interface
from .mutation import Mutation
from .objecttype import ObjectType
from .options import Options
from .scalar import Scalar
from .enum import Enum
from .uniontype import UnionType
__all__ = [
@ -1,131 +0,0 @@
import copy
import inspect
from collections import OrderedDict
from functools import partial
import six
from .options import Options
class ClassTypeMeta(type):
options_class = Options
def __new__(mcs, name, bases, attrs):
super_new = super(ClassTypeMeta, mcs).__new__
module = attrs.pop('__module__', None)
doc = attrs.pop('__doc__', None)
new_class = super_new(mcs, name, bases, {
'__module__': module,
'__doc__': doc
attr_meta = attrs.pop('Meta', None)
if not attr_meta:
meta = getattr(new_class, 'Meta', None)
meta = attr_meta
new_class.add_to_class('_meta', new_class.get_options(meta))
return mcs.construct(new_class, bases, attrs)
def get_options(cls, meta):
return cls.options_class(meta)
def add_to_class(cls, name, value):
# We should call the contribute_to_class method only if it's bound
if not inspect.isclass(value) and hasattr(
value, 'contribute_to_class'):
value.contribute_to_class(cls, name)
setattr(cls, name, value)
def construct(cls, bases, attrs):
# Add all attributes to the class.
for obj_name, obj in attrs.items():
cls.add_to_class(obj_name, obj)
if not cls._meta.abstract:
from ..types import List, NonNull
setattr(cls, 'NonNull', partial(NonNull, cls))
setattr(cls, 'List', partial(List, cls))
return cls
class ClassType(six.with_metaclass(ClassTypeMeta)):
class Meta:
abstract = True
def internal_type(cls, schema):
raise NotImplementedError("Function internal_type not implemented in type {}".format(cls))
class FieldsOptions(Options):
def __init__(self, *args, **kwargs):
super(FieldsOptions, self).__init__(*args, **kwargs)
self.local_fields = []
def add_field(self, field):
def fields(self):
return sorted(self.local_fields)
def fields_map(self):
return OrderedDict([(f.attname, f) for f in self.fields])
def fields_group_type(self):
from ..types.field import FieldsGroupType
return FieldsGroupType(*self.local_fields)
class FieldsClassTypeMeta(ClassTypeMeta):
options_class = FieldsOptions
def extend_fields(cls, bases):
new_fields = cls._meta.local_fields
field_names = {f.attname: f for f in new_fields}
for base in bases:
if not isinstance(base, FieldsClassTypeMeta):
parent_fields = base._meta.local_fields
for field in parent_fields:
if field.attname in field_names and field.type.__class__ != field_names[
raise Exception(
'Local field %r in class %r (%r) clashes '
'with field with similar name from '
'Interface %s (%r)' % (
new_field = copy.copy(field)
cls.add_to_class(field.attname, new_field)
def construct(cls, bases, attrs):
cls = super(FieldsClassTypeMeta, cls).construct(bases, attrs)
return cls
class FieldsClassType(six.with_metaclass(FieldsClassTypeMeta, ClassType)):
class Meta:
abstract = True
def fields_internal_types(cls, schema):
return schema.T(cls._meta.fields_group_type)
@ -1,51 +0,0 @@
import six
from graphql.type import GraphQLEnumType, GraphQLEnumValue
from ...utils.enum import Enum as PyEnum
from ..types.base import MountedType
from .base import ClassType, ClassTypeMeta
class EnumMeta(ClassTypeMeta):
def construct(cls, bases, attrs):
__enum__ = attrs.get('__enum__', None)
if not cls._meta.abstract and not __enum__:
__enum__ = PyEnum(cls._meta.type_name, attrs)
setattr(cls, '__enum__', __enum__)
if __enum__:
for k, v in __enum__.__members__.items():
attrs[k] = v.value
return super(EnumMeta, cls).construct(bases, attrs)
def __call__(cls, *args, **kwargs):
if cls is Enum:
return cls.create_enum(*args, **kwargs)
return super(EnumMeta, cls).__call__(*args, **kwargs)
def create_enum(cls, name, names=None, description=None):
attrs = {
'__enum__': PyEnum(name, names)
if description:
attrs['__doc__'] = description
return type(name, (Enum,), attrs)
class Enum(six.with_metaclass(EnumMeta, ClassType, MountedType)):
class Meta:
abstract = True
def internal_type(cls, schema):
if cls._meta.abstract:
raise Exception("Abstract Enum don't have a specific type.")
values = {k: GraphQLEnumValue(v.value) for k, v in cls.__enum__.__members__.items()}
# GraphQLEnumValue
return GraphQLEnumType(
@ -1,25 +0,0 @@
from functools import partial
from graphql.type import GraphQLInputObjectType
from .base import FieldsClassType
class InputObjectType(FieldsClassType):
class Meta:
abstract = True
def __init__(self, *args, **kwargs):
raise Exception("An InputObjectType cannot be initialized")
def internal_type(cls, schema):
if cls._meta.abstract:
raise Exception("Abstract InputObjectTypes don't have a specific type.")
return GraphQLInputObjectType(
fields=partial(cls.fields_internal_types, schema),
@ -1,53 +0,0 @@
from functools import partial
import six
from graphql.type import GraphQLInterfaceType
from .base import FieldsClassTypeMeta
from .objecttype import ObjectType, ObjectTypeMeta
class InterfaceMeta(ObjectTypeMeta):
def construct(cls, bases, attrs):
if cls._meta.abstract or Interface in bases:
# Return Interface type
cls = FieldsClassTypeMeta.construct(cls, bases, attrs)
setattr(cls._meta, 'interface', True)
return cls
# Return ObjectType class with all the inherited interfaces
cls = super(InterfaceMeta, cls).construct(bases, attrs)
for interface in bases:
is_interface = issubclass(interface, Interface) and getattr(interface._meta, 'interface', False)
if not is_interface:
return cls
class Interface(six.with_metaclass(InterfaceMeta, ObjectType)):
class Meta:
abstract = True
def __init__(self, *args, **kwargs):
if self._meta.interface:
raise Exception("An interface cannot be initialized")
return super(Interface, self).__init__(*args, **kwargs)
def _resolve_type(cls, schema, instance, *args):
return schema.T(instance.__class__)
def internal_type(cls, schema):
if not cls._meta.interface:
return super(Interface, cls).internal_type(schema)
return GraphQLInterfaceType(
resolve_type=partial(cls._resolve_type, schema),
fields=partial(cls.fields_internal_types, schema)
@ -1,32 +0,0 @@
import six
from .objecttype import ObjectType, ObjectTypeMeta
class MutationMeta(ObjectTypeMeta):
def construct(cls, bases, attrs):
input_class = attrs.pop('Input', None)
if input_class:
items = dict(vars(input_class))
items.pop('__dict__', None)
items.pop('__doc__', None)
items.pop('__module__', None)
items.pop('__weakref__', None)
cls.add_to_class('arguments', cls.construct_arguments(items))
cls = super(MutationMeta, cls).construct(bases, attrs)
return cls
def construct_arguments(cls, items):
from ..types.argument import ArgumentsGroup
return ArgumentsGroup(**items)
class Mutation(six.with_metaclass(MutationMeta, ObjectType)):
class Meta:
abstract = True
def get_arguments(cls):
return cls.arguments
@ -1,109 +0,0 @@
from functools import partial
import six
from graphql.type import GraphQLObjectType
from graphene import signals
from .base import FieldsClassType, FieldsClassTypeMeta, FieldsOptions
from .uniontype import UnionType
def is_objecttype(cls):
if not issubclass(cls, ObjectType):
return False
return not(cls._meta.abstract or cls._meta.interface)
class ObjectTypeOptions(FieldsOptions):
def __init__(self, *args, **kwargs):
super(ObjectTypeOptions, self).__init__(*args, **kwargs)
self.interface = False
self.valid_attrs += ['interfaces']
self.interfaces = []
class ObjectTypeMeta(FieldsClassTypeMeta):
def construct(cls, bases, attrs):
cls = super(ObjectTypeMeta, cls).construct(bases, attrs)
if not cls._meta.abstract:
union_types = list(filter(is_objecttype, bases))
if len(union_types) > 1:
meta_attrs = dict(cls._meta.original_attrs, types=union_types)
Meta = type('Meta', (object, ), meta_attrs)
attrs['Meta'] = Meta
attrs['__module__'] = cls.__module__
attrs['__doc__'] = cls.__doc__
return type(cls.__name__, (UnionType, ), attrs)
return cls
options_class = ObjectTypeOptions
class ObjectType(six.with_metaclass(ObjectTypeMeta, FieldsClassType)):
class Meta:
abstract = True
def __getattr__(self, name):
if name == '_root':
return getattr(self._root, name)
def __init__(self, *args, **kwargs):
signals.pre_init.send(self.__class__, args=args, kwargs=kwargs)
self._root = kwargs.pop('_root', None)
args_len = len(args)
fields = self._meta.fields
if args_len > len(fields):
# Daft, but matches old exception sans the err msg.
raise IndexError("Number of args exceeds number of fields")
fields_iter = iter(fields)
if not kwargs:
for val, field in zip(args, fields_iter):
setattr(self, field.attname, val)
for val, field in zip(args, fields_iter):
setattr(self, field.attname, val)
kwargs.pop(field.attname, None)
for field in fields_iter:
val = kwargs.pop(field.attname)
setattr(self, field.attname, val)
except KeyError:
if kwargs:
for prop in list(kwargs):
if isinstance(getattr(self.__class__, prop), property):
setattr(self, prop, kwargs.pop(prop))
except AttributeError:
if kwargs:
raise TypeError(
"'%s' is an invalid keyword argument for this function" %
signals.post_init.send(self.__class__, instance=self)
def internal_type(cls, schema):
if cls._meta.abstract:
raise Exception("Abstract ObjectTypes don't have a specific type.")
return GraphQLObjectType(
interfaces=list(map(schema.T, cls._meta.interfaces)),
fields=partial(cls.fields_internal_types, schema),
is_type_of=getattr(cls, 'is_type_of', None)
def wrap(cls, instance, args, info):
return cls(_root=instance)
@ -1,21 +0,0 @@
from graphql.type import GraphQLScalarType
from ..types.base import MountedType
from .base import ClassType
class Scalar(ClassType, MountedType):
def internal_type(cls, schema):
serialize = getattr(cls, 'serialize')
parse_literal = getattr(cls, 'parse_literal')
parse_value = getattr(cls, 'parse_value')
return GraphQLScalarType(
@ -1,78 +0,0 @@
from ...schema import Schema
from ...types import Field, List, NonNull, String
from ..base import ClassType, FieldsClassType
def test_classtype_basic():
class Character(ClassType):
'''Character description'''
assert Character._meta.type_name == 'Character'
assert Character._meta.description == 'Character description'
def test_classtype_advanced():
class Character(ClassType):
class Meta:
type_name = 'OtherCharacter'
description = 'OtherCharacter description'
assert Character._meta.type_name == 'OtherCharacter'
assert Character._meta.description == 'OtherCharacter description'
def test_classtype_definition_list():
class Character(ClassType):
'''Character description'''
assert isinstance(Character.List(), List)
assert Character.List().of_type == Character
def test_classtype_definition_nonnull():
class Character(ClassType):
'''Character description'''
assert isinstance(Character.NonNull(), NonNull)
assert Character.NonNull().of_type == Character
def test_fieldsclasstype_definition_order():
class Character(ClassType):
'''Character description'''
class Query(FieldsClassType):
name = String()
char = Character.NonNull()
assert list(Query._meta.fields_map.keys()) == ['name', 'char']
def test_fieldsclasstype():
f = Field(String())
class Character(FieldsClassType):
field_name = f
assert Character._meta.fields == [f]
def test_fieldsclasstype_fieldtype():
f = Field(String())
class Character(FieldsClassType):
field_name = f
schema = Schema(query=Character)
assert Character.fields_internal_types(schema)['fieldName'] == schema.T(f)
assert Character._meta.fields_map['field_name'] == f
def test_fieldsclasstype_inheritfields():
name_field = Field(String())
last_name_field = Field(String())
class Fields1(FieldsClassType):
name = name_field
class Fields2(Fields1):
last_name = last_name_field
assert list(Fields2._meta.fields_map.keys()) == ['name', 'last_name']
@ -1,49 +0,0 @@
from graphql.type import GraphQLEnumType
from graphene.core.schema import Schema
from ..enum import Enum
from ..objecttype import ObjectType
def test_enum():
class RGB(Enum):
'''RGB enum description'''
RED = 0
BLUE = 2
schema = Schema()
object_type = schema.T(RGB)
assert isinstance(object_type, GraphQLEnumType)
assert RGB._meta.type_name == 'RGB'
assert RGB._meta.description == 'RGB enum description'
assert RGB.RED == 0
assert RGB.GREEN == 1
assert RGB.BLUE == 2
def test_enum_values():
RGB = Enum('RGB', dict(RED=0, GREEN=1, BLUE=2), description='RGB enum description')
schema = Schema()
object_type = schema.T(RGB)
assert isinstance(object_type, GraphQLEnumType)
assert RGB._meta.type_name == 'RGB'
assert RGB._meta.description == 'RGB enum description'
assert RGB.RED == 0
assert RGB.GREEN == 1
assert RGB.BLUE == 2
def test_enum_instance():
RGB = Enum('RGB', dict(RED=0, GREEN=1, BLUE=2))
RGB_field = RGB(description='RGB enum description')
class ObjectWithColor(ObjectType):
color = RGB_field
object_field = ObjectWithColor._meta.fields_map['color']
assert object_field.description == 'RGB enum description'
@ -1,21 +0,0 @@
from graphql.type import GraphQLInputObjectType
from graphene.core.schema import Schema
from graphene.core.types import String
from ..inputobjecttype import InputObjectType
def test_inputobjecttype():
class InputCharacter(InputObjectType):
'''InputCharacter description'''
name = String()
schema = Schema()
object_type = schema.T(InputCharacter)
assert isinstance(object_type, GraphQLInputObjectType)
assert InputCharacter._meta.type_name == 'InputCharacter'
assert object_type.description == 'InputCharacter description'
assert list(object_type.get_fields().keys()) == ['name']
@ -1,86 +0,0 @@
from graphql.type import GraphQLInterfaceType, GraphQLObjectType
from py.test import raises
from graphene.core.schema import Schema
from graphene.core.types import String
from ..interface import Interface
from ..objecttype import ObjectType
def test_interface():
class Character(Interface):
'''Character description'''
name = String()
schema = Schema()
object_type = schema.T(Character)
assert issubclass(Character, Interface)
assert isinstance(object_type, GraphQLInterfaceType)
assert Character._meta.interface
assert Character._meta.type_name == 'Character'
assert object_type.description == 'Character description'
assert list(object_type.get_fields().keys()) == ['name']
def test_interface_cannot_initialize():
class Character(Interface):
with raises(Exception) as excinfo:
assert 'An interface cannot be initialized' == str(excinfo.value)
def test_interface_inheritance_abstract():
class Character(Interface):
class ShouldBeInterface(Character):
class Meta:
abstract = True
class ShouldBeObjectType(ShouldBeInterface):
assert ShouldBeInterface._meta.interface
assert not ShouldBeObjectType._meta.interface
assert issubclass(ShouldBeObjectType, ObjectType)
def test_interface_inheritance():
class Character(Interface):
class GeneralInterface(Interface):
class ShouldBeObjectType(GeneralInterface, Character):
schema = Schema()
assert Character._meta.interface
assert not ShouldBeObjectType._meta.interface
assert issubclass(ShouldBeObjectType, ObjectType)
assert Character in ShouldBeObjectType._meta.interfaces
assert GeneralInterface in ShouldBeObjectType._meta.interfaces
assert isinstance(schema.T(Character), GraphQLInterfaceType)
assert isinstance(schema.T(ShouldBeObjectType), GraphQLObjectType)
def test_interface_inheritance_non_objects():
class CommonClass(object):
common_attr = True
class Character(CommonClass, Interface):
class ShouldBeObjectType(Character):
assert Character._meta.interface
assert Character.common_attr
assert ShouldBeObjectType.common_attr
@ -1,27 +0,0 @@
from graphql.type import GraphQLObjectType
from graphene.core.schema import Schema
from graphene.core.types import String
from ...types.argument import ArgumentsGroup
from ..mutation import Mutation
def test_mutation():
class MyMutation(Mutation):
'''MyMutation description'''
class Input:
arg_name = String()
name = String()
schema = Schema()
object_type = schema.T(MyMutation)
assert MyMutation._meta.type_name == 'MyMutation'
assert isinstance(object_type, GraphQLObjectType)
assert object_type.description == 'MyMutation description'
assert list(object_type.get_fields().keys()) == ['name']
assert MyMutation._meta.fields_map['name'].object_type == MyMutation
assert isinstance(MyMutation.arguments, ArgumentsGroup)
assert 'argName' in schema.T(MyMutation.arguments)
@ -1,116 +0,0 @@
from graphql.type import GraphQLObjectType
from py.test import raises
from graphene.core.schema import Schema
from graphene.core.types import String
from ..objecttype import ObjectType
from ..uniontype import UnionType
def test_object_type():
class Human(ObjectType):
'''Human description'''
name = String()
friends = String()
schema = Schema()
object_type = schema.T(Human)
assert Human._meta.type_name == 'Human'
assert isinstance(object_type, GraphQLObjectType)
assert object_type.description == 'Human description'
assert list(object_type.get_fields().keys()) == ['name', 'friends']
assert Human._meta.fields_map['name'].object_type == Human
def test_object_type_container():
class Human(ObjectType):
name = String()
friends = String()
h = Human(name='My name')
assert h.name == 'My name'
def test_object_type_set_properties():
class Human(ObjectType):
name = String()
friends = String()
def readonly_prop(self):
return 'readonly'
def write_prop(self):
return self._write_prop
def write_prop(self, value):
self._write_prop = value
h = Human(readonly_prop='custom', write_prop='custom')
assert h.readonly_prop == 'readonly'
assert h.write_prop == 'custom'
def test_object_type_container_invalid_kwarg():
class Human(ObjectType):
name = String()
with raises(TypeError):
Human(invalid='My name')
def test_object_type_container_too_many_args():
class Human(ObjectType):
name = String()
with raises(IndexError):
Human('Peter', 'No friends :(', None)
def test_object_type_union():
class Human(ObjectType):
name = String()
class Pet(ObjectType):
name = String()
class Thing(Human, Pet):
'''Thing union description'''
my_attr = True
assert issubclass(Thing, UnionType)
assert Thing._meta.types == [Human, Pet]
assert Thing._meta.type_name == 'Thing'
assert Thing._meta.description == 'Thing union description'
assert Thing.my_attr
def test_object_type_not_union_if_abstract():
schema = Schema()
class Query1(ObjectType):
field1 = String()
class Meta:
abstract = True
class Query2(ObjectType):
field2 = String()
class Meta:
abstract = True
class Query(Query1, Query2):
'''Query description'''
my_attr = True
object_type = schema.T(Query)
assert issubclass(Query, ObjectType)
assert Query._meta.type_name == 'Query'
assert Query._meta.description == 'Query description'
assert isinstance(object_type, GraphQLObjectType)
assert list(Query._meta.fields_map.keys()) == ['field1', 'field2']
@ -1,54 +0,0 @@
from py.test import raises
from graphene.core.classtypes import Options
class Meta:
type_name = 'Character'
class InvalidMeta:
other_value = True
def test_options_contribute():
opt = Options(Meta)
class ObjectType(object):
opt.contribute_to_class(ObjectType, '_meta')
assert ObjectType._meta == opt
def test_options_typename():
opt = Options(Meta)
class ObjectType(object):
opt.contribute_to_class(ObjectType, '_meta')
assert opt.type_name == 'Character'
def test_options_description():
opt = Options(Meta)
class ObjectType(object):
'''False description'''
opt.contribute_to_class(ObjectType, '_meta')
assert opt.description == 'False description'
def test_field_no_contributed_raises_error():
opt = Options(InvalidMeta)
class ObjectType(object):
with raises(Exception) as excinfo:
opt.contribute_to_class(ObjectType, '_meta')
assert 'invalid attribute' in str(excinfo.value)
@ -1,32 +0,0 @@
from graphql.type import GraphQLScalarType
from ...schema import Schema
from ..scalar import Scalar
def test_custom_scalar():
import datetime
from graphql.language import ast
class DateTimeScalar(Scalar):
'''DateTimeScalar Documentation'''
def serialize(dt):
return dt.isoformat()
def parse_literal(node):
if isinstance(node, ast.StringValue):
return datetime.datetime.strptime(
node.value, "%Y-%m-%dT%H:%M:%S.%f")
def parse_value(value):
return datetime.datetime.strptime(value, "%Y-%m-%dT%H:%M:%S.%f")
schema = Schema()
scalar_type = schema.T(DateTimeScalar)
assert isinstance(scalar_type, GraphQLScalarType)
assert scalar_type.name == 'DateTimeScalar'
assert scalar_type.description == 'DateTimeScalar Documentation'
@ -1,28 +0,0 @@
from graphql.type import GraphQLUnionType
from graphene.core.schema import Schema
from graphene.core.types import String
from ..objecttype import ObjectType
from ..uniontype import UnionType
def test_uniontype():
class Human(ObjectType):
name = String()
class Pet(ObjectType):
name = String()
class Thing(UnionType):
'''Thing union description'''
class Meta:
types = [Human, Pet]
schema = Schema()
object_type = schema.T(Thing)
assert isinstance(object_type, GraphQLUnionType)
assert Thing._meta.type_name == 'Thing'
assert object_type.description == 'Thing union description'
assert object_type.get_types() == [schema.T(Human), schema.T(Pet)]
@ -1,42 +0,0 @@
from functools import partial
import six
from graphql.type import GraphQLUnionType
from .base import FieldsClassType, FieldsClassTypeMeta, FieldsOptions
class UnionTypeOptions(FieldsOptions):
def __init__(self, *args, **kwargs):
super(UnionTypeOptions, self).__init__(*args, **kwargs)
self.types = []
class UnionTypeMeta(FieldsClassTypeMeta):
options_class = UnionTypeOptions
def get_options(cls, meta):
return cls.options_class(meta, types=[])
class UnionType(six.with_metaclass(UnionTypeMeta, FieldsClassType)):
class Meta:
abstract = True
def _resolve_type(cls, schema, instance, *args):
return schema.T(instance.__class__)
def internal_type(cls, schema):
if cls._meta.abstract:
raise Exception("Abstract ObjectTypes don't have a specific type.")
return GraphQLUnionType(
types=list(map(schema.T, cls._meta.types)),
resolve_type=partial(cls._resolve_type, schema),
@ -1,2 +0,0 @@
class SkipField(Exception):
@ -1,49 +0,0 @@
import warnings
from .types.base import FieldType
from .types.definitions import List, NonNull
from .types.field import Field
from .types.scalars import ID, Boolean, Float, Int, String
class DeprecatedField(FieldType):
def __init__(self, *args, **kwargs):
cls = self.__class__
warnings.warn("Using {} is not longer supported".format(
cls.__name__), FutureWarning)
if 'resolve' in kwargs:
kwargs['resolver'] = kwargs.pop('resolve')
return super(DeprecatedField, self).__init__(*args, **kwargs)
class StringField(DeprecatedField, String):
class IntField(DeprecatedField, Int):
class BooleanField(DeprecatedField, Boolean):
class IDField(DeprecatedField, ID):
class FloatField(DeprecatedField, Float):
class ListField(DeprecatedField, List):
class NonNullField(DeprecatedField, NonNull):
__all__ = ['Field', 'StringField', 'IntField', 'BooleanField',
'IDField', 'FloatField', 'ListField', 'NonNullField']
@ -1,133 +0,0 @@
import inspect
from graphql import graphql
from graphql.type import GraphQLSchema as _GraphQLSchema
from graphql.utils.introspection_query import introspection_query
from graphql.utils.schema_printer import print_schema
from graphene import signals
from ..middlewares import MiddlewareManager, CamelCaseArgsMiddleware
from .classtypes.base import ClassType
from .types.base import InstanceType
class GraphQLSchema(_GraphQLSchema):
def __init__(self, schema, *args, **kwargs):
self.graphene_schema = schema
super(GraphQLSchema, self).__init__(*args, **kwargs)
class Schema(object):
_executor = None
def __init__(self, query=None, mutation=None, subscription=None,
name='Schema', executor=None, middlewares=None, auto_camelcase=True, **options):
self._types_names = {}
self._types = {}
self.mutation = mutation
self.query = query
self.subscription = subscription
self.name = name
self.executor = executor
if 'plugins' in options:
raise Exception('Plugins are deprecated, please use middlewares.')
middlewares = middlewares or []
if auto_camelcase:
self.auto_camelcase = auto_camelcase
self.middleware_manager = MiddlewareManager(self, middlewares)
self.options = options
def __repr__(self):
return '<Schema: %s (%s)>' % (str(self.name), hash(self))
def T(self, _type):
if not _type:
if isinstance(_type, ClassType):
_type = type(_type)
is_classtype = inspect.isclass(_type) and issubclass(_type, ClassType)
is_instancetype = isinstance(_type, InstanceType)
if is_classtype or is_instancetype:
if _type not in self._types:
internal_type = _type.internal_type(self)
self._types[_type] = internal_type
if is_classtype:
return self._types[_type]
return _type
def executor(self):
return self._executor
def executor(self, value):
self._executor = value
def schema(self):
if not self.query:
raise Exception('You have to define a base query type')
return GraphQLSchema(
types=[self.T(_type) for _type in list(self._types_names.values())],
def register(self, object_type, force=False):
type_name = object_type._meta.type_name
registered_object_type = not force and self._types_names.get(type_name, None)
if registered_object_type:
assert registered_object_type == object_type, 'Type {} already registered with other object type'.format(
self._types_names[object_type._meta.type_name] = object_type
return object_type
def objecttype(self, type):
name = getattr(type, 'name', None)
if name:
objecttype = self._types_names.get(name, None)
if objecttype and inspect.isclass(
objecttype) and issubclass(objecttype, ClassType):
return objecttype
def __str__(self):
return print_schema(self.schema)
def setup(self):
assert self.query, 'The base query type is not set'
def get_type(self, type_name):
if type_name not in self._types_names:
raise KeyError('Type %r not found in %r' % (type_name, self))
return self._types_names[type_name]
def resolver_with_middleware(self, resolver):
return self.middleware_manager.wrap(resolver)
def types(self):
return self._types_names
def execute(self, request_string='', root_value=None, variable_values=None,
context_value=None, operation_name=None, executor=None):
return graphql(
executor=executor or self._executor
def introspect(self):
return graphql(self.schema, introspection_query).data
@ -1,63 +0,0 @@
import graphene
from graphene.core.schema import Schema
my_id = 0
class Query(graphene.ObjectType):
base = graphene.String()
class ChangeNumber(graphene.Mutation):
'''Result mutation'''
class Input:
to = graphene.Int()
result = graphene.String()
def mutate(cls, instance, args, info):
global my_id
my_id = args.get('to', my_id + 1)
return ChangeNumber(result=my_id)
class MyResultMutation(graphene.ObjectType):
change_number = graphene.Field(ChangeNumber)
schema = Schema(query=Query, mutation=MyResultMutation)
def test_mutation_input():
assert list(schema.T(ChangeNumber.arguments).keys()) == ['to']
def test_execute_mutations():
query = '''
mutation M{
first: changeNumber {
second: changeNumber {
third: changeNumber(to: 5) {
expected = {
'first': {
'result': '1',
'second': {
'result': '2',
'third': {
'result': '5',
result = schema.execute(query, root_value=object())
assert not result.errors
assert result.data == expected
@ -1,178 +0,0 @@
from graphql.type import (GraphQLBoolean, GraphQLField, GraphQLFloat,
GraphQLID, GraphQLInt, GraphQLNonNull, GraphQLString)
from py.test import raises
from graphene.core.fields import (BooleanField, Field, FloatField, IDField,
IntField, NonNullField, StringField)
from graphene.core.schema import Schema
from graphene.core.types import ObjectType
class MyOt(ObjectType):
def resolve_customdoc(self, *args, **kwargs):
'''Resolver documentation'''
return None
def __str__(self):
return "ObjectType"
schema = Schema()
def test_field_no_contributed_raises_error():
f = Field(GraphQLString)
with raises(Exception):
def test_field_type():
f = Field(GraphQLString)
f.contribute_to_class(MyOt, 'field_name')
assert isinstance(schema.T(f), GraphQLField)
assert schema.T(f).type == GraphQLString
def test_field_name():
f = Field(GraphQLString)
f.contribute_to_class(MyOt, 'field_name')
assert f.name is None
assert f.attname == 'field_name'
def test_field_name_use_name_if_exists():
f = Field(GraphQLString, name='my_custom_name')
f.contribute_to_class(MyOt, 'field_name')
assert f.name == 'my_custom_name'
def test_stringfield_type():
f = StringField()
f.contribute_to_class(MyOt, 'field_name')
assert schema.T(f) == GraphQLString
def test_idfield_type():
f = IDField()
f.contribute_to_class(MyOt, 'field_name')
assert schema.T(f) == GraphQLID
def test_booleanfield_type():
f = BooleanField()
f.contribute_to_class(MyOt, 'field_name')
assert schema.T(f) == GraphQLBoolean
def test_intfield_type():
f = IntField()
f.contribute_to_class(MyOt, 'field_name')
assert schema.T(f) == GraphQLInt
def test_floatfield_type():
f = FloatField()
f.contribute_to_class(MyOt, 'field_name')
assert schema.T(f) == GraphQLFloat
def test_nonnullfield_type():
f = NonNullField(StringField())
f.contribute_to_class(MyOt, 'field_name')
assert isinstance(schema.T(f), GraphQLNonNull)
def test_stringfield_type_required():
f = StringField(required=True).as_field()
f.contribute_to_class(MyOt, 'field_name')
assert isinstance(schema.T(f), GraphQLField)
assert isinstance(schema.T(f).type, GraphQLNonNull)
def test_field_resolve():
f = StringField(required=True, resolve=lambda *args: 'RESOLVED').as_field()
f.contribute_to_class(MyOt, 'field_name')
field_type = schema.T(f)
assert 'RESOLVED' == field_type.resolver(MyOt, None, None, None).value
def test_field_resolve_type_custom():
class MyCustomType(ObjectType):
f = Field('MyCustomType')
class OtherType(ObjectType):
field_name = f
s = Schema()
s.query = OtherType
assert s.T(f).type == s.T(MyCustomType)
def test_field_orders():
f1 = Field(None)
f2 = Field(None)
assert f1 < f2
def test_field_orders_wrong_type():
field = Field(None)
assert not field < 1
except TypeError:
# Fix exception raising in Python3+
def test_field_eq():
f1 = Field(None)
f2 = Field(None)
assert f1 != f2
def test_field_eq_wrong_type():
field = Field(None)
assert field != 1
def test_field_hash():
f1 = Field(None)
f2 = Field(None)
assert hash(f1) != hash(f2)
def test_field_none_type_raises_error():
s = Schema()
f = Field(None)
f.contribute_to_class(MyOt, 'field_name')
with raises(Exception) as excinfo:
assert str(
excinfo.value) == "Internal type for field MyOt.field_name is None"
def test_field_str():
f = StringField().as_field()
f.contribute_to_class(MyOt, 'field_name')
assert str(f) == "MyOt.field_name"
def test_field_repr():
f = StringField().as_field()
assert repr(f) == "<graphene.core.types.field.Field>"
def test_field_repr_contributed():
f = StringField().as_field()
f.contribute_to_class(MyOt, 'field_name')
assert repr(f) == "<graphene.core.types.field.Field: field_name>"
def test_field_resolve_objecttype_cos():
f = StringField().as_field()
f.contribute_to_class(MyOt, 'customdoc')
field = schema.T(f)
assert field.description == 'Resolver documentation'
@ -1,64 +0,0 @@
from graphql import graphql
from graphql.type import GraphQLSchema
from graphene.core.fields import Field
from graphene.core.schema import Schema
from graphene.core.types import Interface, List, ObjectType, String
class Character(Interface):
name = String()
class Pet(ObjectType):
type = String()
def resolve_type(self, args, info):
return 'Dog'
class Human(Character):
friends = List(Character)
pet = Field(Pet)
def resolve_name(self, *args):
return 'Peter'
def resolve_friend(self, *args):
return Human(object())
def resolve_pet(self, *args):
return Pet(object())
schema = Schema()
Human_type = schema.T(Human)
def test_type():
assert Human._meta.fields_map['name'].resolver(
Human(object()), {}, None, None) == 'Peter'
def test_query():
schema = GraphQLSchema(query=Human_type)
query = '''
pet {
expected = {
'name': 'Peter',
'pet': {
'type': 'Dog'
result = graphql(schema, query, root_value=Human(object()))
assert not result.errors
assert result.data == expected
@ -1,214 +0,0 @@
from graphql import graphql
from py.test import raises
from graphene import Interface, List, ObjectType, Schema, String
from graphene.core.fields import Field
from graphene.core.types.base import LazyType
from tests.utils import assert_equal_lists
schema = Schema(name='My own schema')
class Character(Interface):
name = String()
class Pet(ObjectType):
type = String(resolver=lambda *_: 'Dog')
class Human(Character):
friends = List(Character)
pet = Field(Pet)
def resolve_name(self, *args):
return 'Peter'
def resolve_friend(self, *args):
return Human(object())
def resolve_pet(self, *args):
return Pet(object())
schema.query = Human
def test_get_registered_type():
assert schema.get_type('Character') == Character
def test_get_unregistered_type():
with raises(Exception) as excinfo:
assert 'not found' in str(excinfo.value)
def test_schema_query():
assert schema.query == Human
def test_query_schema_graphql():
query = '''
pet {
expected = {
'name': 'Peter',
'pet': {
'type': 'Dog'
result = graphql(schema.schema, query, root_value=Human(object()))
assert not result.errors
assert result.data == expected
def test_query_schema_execute():
query = '''
pet {
expected = {
'name': 'Peter',
'pet': {
'type': 'Dog'
result = schema.execute(query, root_value=object())
assert not result.errors
assert result.data == expected
def test_schema_get_type_map():
['__Field', 'String', 'Pet', 'Character', '__InputValue',
'__Directive', '__DirectiveLocation', '__TypeKind', '__Schema',
'__Type', 'Human', '__EnumValue', 'Boolean'])
def test_schema_no_query():
schema = Schema(name='My own schema')
with raises(Exception) as excinfo:
assert 'define a base query type' in str(excinfo)
def test_auto_camelcase_off():
schema = Schema(name='My own schema', auto_camelcase=False)
class Query(ObjectType):
test_field = String(resolver=lambda *_: 'Dog')
schema.query = Query
query = "query {test_field}"
expected = {"test_field": "Dog"}
result = graphql(schema.schema, query, root_value=Query(object()))
assert not result.errors
assert result.data == expected
def test_schema_register():
schema = Schema(name='My own schema')
class MyType(ObjectType):
type = String(resolver=lambda *_: 'Dog')
schema.query = MyType
assert schema.get_type('MyType') == MyType
def test_schema_register_interfaces():
class Query(ObjectType):
f = Field(Character)
def resolve_f(self, args, info):
return Human()
schema = Schema(query=Query)
result = schema.execute('{ f { name } }')
assert not result.errors
def test_schema_register_no_query_type():
schema = Schema(name='My own schema')
class MyType(ObjectType):
type = String(resolver=lambda *_: 'Dog')
with raises(Exception) as excinfo:
assert 'base query type' in str(excinfo.value)
def test_schema_introspect():
schema = Schema(name='My own schema')
class MyType(ObjectType):
type = String(resolver=lambda *_: 'Dog')
schema.query = MyType
introspection = schema.introspect()
assert '__schema' in introspection
def test_lazytype():
schema = Schema(name='My own schema')
t = LazyType('MyType')
class MyType(ObjectType):
type = String(resolver=lambda *_: 'Dog')
schema.query = MyType
assert schema.T(t) == schema.T(MyType)
def test_deprecated_plugins_throws_exception():
with raises(Exception) as excinfo:
assert 'Plugins are deprecated, please use middlewares' in str(excinfo.value)
def test_schema_str():
expected = """
schema {
query: Human
interface Character {
name: String
type Human implements Character {
name: String
friends: [Character]
pet: Pet
type Pet {
type: String
assert str(schema) == expected
@ -1,29 +0,0 @@
import graphene
class Query(graphene.ObjectType):
base = graphene.String()
class Subscription(graphene.ObjectType):
subscribe_to_foo = graphene.Boolean(id=graphene.Int())
def resolve_subscribe_to_foo(self, args, info):
return args.get('id') == 1
schema = graphene.Schema(query=Query, subscription=Subscription)
def test_execute_subscription():
query = '''
subscription {
subscribeToFoo(id: 1)
expected = {
'subscribeToFoo': True
result = schema.execute(query)
assert not result.errors
assert result.data == expected
@ -1,29 +0,0 @@
from .base import InstanceType, LazyType, OrderedType
from .argument import Argument, ArgumentsGroup, to_arguments
from .definitions import List, NonNull
# Compatibility import
from .objecttype import Interface, ObjectType, Mutation, InputObjectType
from .scalars import String, ID, Boolean, Int, Float
from .field import Field, InputField
__all__ = [
@ -1,53 +0,0 @@
from itertools import chain
from graphql.type import GraphQLArgument
from .base import ArgumentType, GroupNamedType, NamedType, OrderedType
class Argument(NamedType, OrderedType):
def __init__(self, type, description=None, default=None,
name=None, _creation_counter=None):
super(Argument, self).__init__(name=name, _creation_counter=_creation_counter)
self.type = type
self.description = description
self.default = default
def internal_type(self, schema):
return GraphQLArgument(
self.default, self.description)
def __repr__(self):
return self.name
class ArgumentsGroup(GroupNamedType):
def __init__(self, *args, **kwargs):
arguments = to_arguments(*args, **kwargs)
super(ArgumentsGroup, self).__init__(*arguments)
def to_arguments(*args, **kwargs):
arguments = {}
iter_arguments = chain(kwargs.items(), [(None, a) for a in args])
for default_name, arg in iter_arguments:
if isinstance(arg, Argument):
argument = arg
elif isinstance(arg, ArgumentType):
argument = arg.as_argument()
raise ValueError('Unknown argument %s=%r' % (default_name, arg))
if default_name:
argument.default_name = default_name
name = argument.name or argument.default_name
assert name, 'Argument in field must have a name'
assert name not in arguments, 'Found more than one Argument with same name {}'.format(name)
arguments[name] = argument
return sorted(arguments.values())
@ -1,170 +0,0 @@
from collections import OrderedDict
from functools import partial, total_ordering
import six
from ...utils import to_camel_case
class InstanceType(object):
def internal_type(self, schema):
raise NotImplementedError("internal_type for type {} is not implemented".format(self.__class__.__name__))
class MountType(InstanceType):
parent = None
def mount(self, cls):
self.parent = cls
class LazyType(MountType):
def __init__(self, type):
self.type = type
def is_self(self):
return self.type == 'self'
def internal_type(self, schema):
type = None
if callable(self.type):
type = self.type(self.parent)
elif isinstance(self.type, six.string_types):
if self.is_self:
type = self.parent
type = schema.get_type(self.type)
assert type, 'Type in %s %r cannot be none' % (self.type, self.parent)
return schema.T(type)
class OrderedType(MountType):
creation_counter = 0
def __init__(self, _creation_counter=None):
self.creation_counter = _creation_counter or self.gen_counter()
def gen_counter():
counter = OrderedType.creation_counter
OrderedType.creation_counter += 1
return counter
def __eq__(self, other):
# Needed for @total_ordering
if isinstance(self, type(other)):
return self.creation_counter == other.creation_counter
return NotImplemented
def __lt__(self, other):
# This is needed because bisect does not take a comparison function.
if isinstance(other, OrderedType):
return self.creation_counter < other.creation_counter
return NotImplemented
def __gt__(self, other):
# This is needed because bisect does not take a comparison function.
if isinstance(other, OrderedType):
return self.creation_counter > other.creation_counter
return NotImplemented
def __hash__(self):
return hash((self.creation_counter))
class MirroredType(OrderedType):
def __init__(self, *args, **kwargs):
_creation_counter = kwargs.pop('_creation_counter', None)
super(MirroredType, self).__init__(_creation_counter=_creation_counter)
self.args = args
self.kwargs = kwargs
def List(self): # noqa
from .definitions import List
return List(self, *self.args, **self.kwargs)
def NonNull(self): # noqa
from .definitions import NonNull
return NonNull(self, *self.args, **self.kwargs)
class ArgumentType(MirroredType):
def as_argument(self):
from .argument import Argument
return Argument(
self, _creation_counter=self.creation_counter, *self.args, **self.kwargs)
class FieldType(MirroredType):
def contribute_to_class(self, cls, name):
from ..classtypes.base import FieldsClassType
from ..classtypes.inputobjecttype import InputObjectType
if issubclass(cls, (InputObjectType)):
inputfield = self.as_inputfield()
return inputfield.contribute_to_class(cls, name)
elif issubclass(cls, (FieldsClassType)):
field = self.as_field()
return field.contribute_to_class(cls, name)
def as_field(self):
from .field import Field
return Field(self, _creation_counter=self.creation_counter,
*self.args, **self.kwargs)
def as_inputfield(self):
from .field import InputField
return InputField(
self, _creation_counter=self.creation_counter, *self.args, **self.kwargs)
class MountedType(FieldType, ArgumentType):
class NamedType(InstanceType):
def __init__(self, name=None, default_name=None, *args, **kwargs):
self.name = name
self.default_name = None
super(NamedType, self).__init__(*args, **kwargs)
class GroupNamedType(InstanceType):
def __init__(self, *types):
self.types = types
def get_named_type(self, schema, type):
name = type.name
if not name and schema.auto_camelcase:
name = to_camel_case(type.default_name)
elif not name:
name = type.default_name
return name, schema.T(type)
def iter_types(self, schema):
return map(partial(self.get_named_type, schema), self.types)
def internal_type(self, schema):
return OrderedDict(self.iter_types(schema))
def __len__(self):
return len(self.types)
def __iter__(self):
return iter(self.types)
def __contains__(self, *args):
return self.types.__contains__(*args)
def __getitem__(self, *args):
return self.types.__getitem__(*args)
@ -1,40 +0,0 @@
import json
import iso8601
from graphql.language import ast
from ...core.classtypes.scalar import Scalar
class JSONString(Scalar):
'''JSON String'''
def serialize(dt):
return json.dumps(dt)
def parse_literal(node):
if isinstance(node, ast.StringValue):
return json.dumps(node.value)
def parse_value(value):
return json.dumps(value)
class DateTime(Scalar):
'''DateTime in ISO 8601 format'''
def serialize(dt):
return dt.isoformat()
def parse_literal(node):
if isinstance(node, ast.StringValue):
return iso8601.parse_date(node.value)
def parse_value(value):
return iso8601.parse_date(value)
@ -1,29 +0,0 @@
import six
from graphql.type import GraphQLList, GraphQLNonNull
from .base import LazyType, MountedType, MountType
class OfType(MountedType):
def __init__(self, of_type, *args, **kwargs):
if isinstance(of_type, six.string_types):
of_type = LazyType(of_type)
self.of_type = of_type
super(OfType, self).__init__(*args, **kwargs)
def internal_type(self, schema):
return self.T(schema.T(self.of_type))
def mount(self, cls):
self.parent = cls
if isinstance(self.of_type, MountType):
class List(OfType):
T = GraphQLList
class NonNull(OfType):
T = GraphQLNonNull
@ -1,189 +0,0 @@
from collections import OrderedDict
from functools import wraps
import six
from graphql.type import GraphQLField, GraphQLInputObjectField
from ...utils import maybe_func
from ...utils.wrap_resolver_function import wrap_resolver_function
from ..classtypes.base import FieldsClassType
from ..classtypes.inputobjecttype import InputObjectType
from ..classtypes.mutation import Mutation
from ..exceptions import SkipField
from .argument import Argument, ArgumentsGroup
from .base import (ArgumentType, GroupNamedType, LazyType, MountType,
NamedType, OrderedType)
from .definitions import NonNull
class Field(NamedType, OrderedType):
def __init__(
self, type, description=None, args=None, name=None, resolver=None,
source=None, required=False, default=None, deprecation_reason=None,
*args_list, **kwargs):
_creation_counter = kwargs.pop('_creation_counter', None)
if isinstance(name, (Argument, ArgumentType)):
kwargs['name'] = name
name = None
super(Field, self).__init__(name=name, _creation_counter=_creation_counter)
if isinstance(type, six.string_types):
type = LazyType(type)
self.required = required
self.type = type
self.description = description
self.deprecation_reason = deprecation_reason
args = OrderedDict(args or {}, **kwargs)
self.arguments = ArgumentsGroup(*args_list, **args)
self.object_type = None
self.attname = None
self.default_name = None
self.resolver_fn = resolver
self.source = source
assert not (self.source and self.resolver_fn), ('You cannot have a source'
' and a resolver at the same time')
self.default = default
def contribute_to_class(self, cls, attname):
assert issubclass(
cls, (FieldsClassType)), 'Field {} cannot be mounted in {}'.format(
self, cls)
self.attname = attname
self.default_name = attname
self.object_type = cls
if isinstance(self.type, MountType):
def resolver(self):
resolver = self.get_resolver_fn()
return resolver
def default(self):
if callable(self._default):
return self._default()
return self._default
def default(self, value):
self._default = value
def get_resolver_fn(self):
if self.resolver_fn:
return self.resolver_fn
resolve_fn_name = 'resolve_%s' % self.attname
if hasattr(self.object_type, resolve_fn_name):
return getattr(self.object_type, resolve_fn_name)
def default_getter(instance, args, info):
value = getattr(instance, self.source or self.attname, self.default)
return maybe_func(value)
return default_getter
def get_type(self, schema):
if self.required:
return NonNull(self.type)
return self.type
def internal_type(self, schema):
if not self.object_type:
raise Exception('The field is not mounted in any ClassType')
resolver = self.resolver
description = self.description
arguments = self.arguments
if not description and resolver:
description = resolver.__doc__
type = schema.T(self.get_type(schema))
type_objecttype = schema.objecttype(type)
if type_objecttype and issubclass(type_objecttype, Mutation):
assert len(arguments) == 0
arguments = type_objecttype.get_arguments()
resolver = getattr(type_objecttype, 'mutate')
resolver = wrap_resolver_function(resolver)
my_resolver = wrap_resolver_function(resolver)
def wrapped_func(instance, args, context, info):
if not isinstance(instance, self.object_type):
instance = self.object_type(_root=instance)
return my_resolver(instance, args, context, info)
resolver = wrapped_func
assert type, 'Internal type for field %s is None' % str(self)
return GraphQLField(
def __repr__(self):
Displays the module, class and name of the field.
path = '%s.%s' % (self.__class__.__module__, self.__class__.__name__)
name = getattr(self, 'attname', None)
if name is not None:
return '<%s: %s>' % (path, name)
return '<%s>' % path
def __str__(self):
""" Return "object_type.field_name". """
return '%s.%s' % (self.object_type.__name__, self.attname)
def __eq__(self, other):
eq = super(Field, self).__eq__(other)
if isinstance(self, type(other)):
return eq and self.object_type == other.object_type
return NotImplemented
def __hash__(self):
return hash((self.creation_counter, self.object_type))
class InputField(NamedType, OrderedType):
def __init__(self, type, description=None, default=None,
name=None, _creation_counter=None, required=False):
super(InputField, self).__init__(_creation_counter=_creation_counter)
if isinstance(type, six.string_types):
type = LazyType(type)
if required:
type = NonNull(type)
self.type = type
self.description = description
self.default = default
def contribute_to_class(self, cls, attname):
assert issubclass(
cls, (InputObjectType)), 'InputField {} cannot be mounted in {}'.format(
self, cls)
self.attname = attname
self.default_name = attname
self.object_type = cls
if isinstance(self.type, MountType):
def internal_type(self, schema):
return GraphQLInputObjectField(
default_value=self.default, description=self.description
class FieldsGroupType(GroupNamedType):
def iter_types(self, schema):
for field in sorted(self.types):
yield self.get_named_type(schema, field)
except SkipField:
@ -1,3 +0,0 @@
from ..classtypes import InputObjectType, Interface, Mutation, ObjectType
__all__ = ['ObjectType', 'Interface', 'Mutation', 'InputObjectType']
@ -1,30 +0,0 @@
from graphql.type import (GraphQLBoolean, GraphQLFloat, GraphQLID, GraphQLInt,
from .base import MountedType
class ScalarType(MountedType):
def internal_type(self, schema):
return self._internal_type
class String(ScalarType):
_internal_type = GraphQLString
class Int(ScalarType):
_internal_type = GraphQLInt
class Boolean(ScalarType):
_internal_type = GraphQLBoolean
class ID(ScalarType):
_internal_type = GraphQLID
class Float(ScalarType):
_internal_type = GraphQLFloat
@ -1,47 +0,0 @@
from graphql.type import GraphQLArgument
from pytest import raises
from graphene.core.schema import Schema
from graphene.core.types import ObjectType
from ..argument import Argument, to_arguments
from ..scalars import String
def test_argument_internal_type():
class MyObjectType(ObjectType):
schema = Schema(query=MyObjectType)
a = Argument(MyObjectType, description='My argument', default='3')
type = schema.T(a)
assert isinstance(type, GraphQLArgument)
assert type.description == 'My argument'
assert type.default_value == '3'
def test_to_arguments():
arguments = to_arguments(
Argument(String, name='myArg'),
assert [a.name or a.default_name for a in arguments] == [
'myArg', 'otherArg', 'my_kwarg', 'other_kwarg']
def test_to_arguments_no_name():
with raises(AssertionError) as excinfo:
assert 'must have a name' in str(excinfo.value)
def test_to_arguments_wrong_type():
with raises(ValueError) as excinfo:
assert 'Unknown argument p=3' == str(excinfo.value)
@ -1,97 +0,0 @@
from mock import patch
from graphene.core.types import InputObjectType, ObjectType
from ..argument import Argument
from ..base import MountedType, OrderedType
from ..definitions import List, NonNull
from ..field import Field, InputField
def test_orderedtype_equal():
a = OrderedType()
assert a == a
assert hash(a) == hash(a)
def test_orderedtype_different():
a = OrderedType()
b = OrderedType()
assert a != b
assert hash(a) != hash(b)
assert a < b
assert b > a
def test_type_as_field_called(Field):
def resolver(x):
return x
a = MountedType(2, description='A', resolver=resolver)
def test_type_as_argument_called(Argument):
a = MountedType(2, description='A')
a, 2, _creation_counter=a.creation_counter, description='A')
def test_type_as_field():
def resolver(x):
return x
class MyObjectType(ObjectType):
t = MountedType(description='A', resolver=resolver)
fields_map = MyObjectType._meta.fields_map
field = fields_map.get('t')
assert isinstance(field, Field)
assert field.description == 'A'
assert field.object_type == MyObjectType
def test_type_as_inputfield():
class MyObjectType(InputObjectType):
t = MountedType(description='A')
fields_map = MyObjectType._meta.fields_map
field = fields_map.get('t')
assert isinstance(field, InputField)
assert field.description == 'A'
assert field.object_type == MyObjectType
def test_type_as_argument():
a = MountedType(description='A')
argument = a.as_argument()
assert isinstance(argument, Argument)
def test_type_as_list():
m = MountedType(2, 3, my_c='A')
a = m.List
assert isinstance(a, List)
assert a.of_type == m
assert a.args == (2, 3)
assert a.kwargs == {'my_c': 'A'}
def test_type_as_nonnull():
m = MountedType(2, 3, my_c='A')
a = m.NonNull
assert isinstance(a, NonNull)
assert a.of_type == m
assert a.args == (2, 3)
assert a.kwargs == {'my_c': 'A'}
@ -1,26 +0,0 @@
import iso8601
from graphql.language.ast import StringValue
from ..custom_scalars import DateTime
def test_date_time():
test_iso_string = "2016-04-29T18:34:12.502Z"
def check_datetime(test_dt):
assert test_dt.tzinfo == iso8601.UTC
assert test_dt.year == 2016
assert test_dt.month == 4
assert test_dt.day == 29
assert test_dt.hour == 18
assert test_dt.minute == 34
assert test_dt.second == 12
test_dt = DateTime().parse_value(test_iso_string)
assert DateTime.serialize(test_dt) == "2016-04-29T18:34:12.502000+00:00"
node = StringValue(test_iso_string)
test_dt = DateTime.parse_literal(node)
@ -1,27 +0,0 @@
from graphql.type import GraphQLList, GraphQLNonNull, GraphQLString
from graphene.core.schema import Schema
from ..definitions import List, NonNull
from ..scalars import String
schema = Schema()
def test_list_scalar():
type = schema.T(List(String()))
assert isinstance(type, GraphQLList)
assert type.of_type == GraphQLString
def test_nonnull_scalar():
type = schema.T(NonNull(String()))
assert isinstance(type, GraphQLNonNull)
assert type.of_type == GraphQLString
def test_nested_scalars():
type = schema.T(NonNull(List(String())))
assert isinstance(type, GraphQLNonNull)
assert isinstance(type.of_type, GraphQLList)
assert type.of_type.of_type == GraphQLString
@ -1,236 +0,0 @@
from graphql.type import GraphQLField, GraphQLInputObjectField, GraphQLString
from graphene.core.schema import Schema
from graphene.core.types import InputObjectType, ObjectType
from ..base import LazyType
from ..definitions import List
from ..field import Field, InputField
from ..scalars import String
def test_field_internal_type():
def resolver(*args):
return 'RESOLVED'
field = Field(String(), description='My argument', resolver=resolver)
class Query(ObjectType):
my_field = field
schema = Schema(query=Query)
type = schema.T(field)
assert field.name is None
assert field.attname == 'my_field'
assert isinstance(type, GraphQLField)
assert type.description == 'My argument'
assert type.resolver(None, {}, None, None).value == 'RESOLVED'
assert type.type == GraphQLString
def test_field_objectype_resolver():
field = Field(String)
class Query(ObjectType):
my_field = field
def resolve_my_field(self, *args, **kwargs):
'''Custom description'''
return 'RESOLVED'
schema = Schema(query=Query)
type = schema.T(field)
assert isinstance(type, GraphQLField)
assert type.description == 'Custom description'
assert type.resolver(Query(), {}, None, None).value == 'RESOLVED'
def test_field_custom_name():
field = Field(None, name='my_customName')
class MyObjectType(ObjectType):
my_field = field
assert field.name == 'my_customName'
assert field.attname == 'my_field'
def test_field_self():
field = Field('self', name='my_customName')
class MyObjectType(ObjectType):
my_field = field
schema = Schema()
assert schema.T(field).type == schema.T(MyObjectType)
def test_field_eq():
field = Field('self', name='my_customName')
field2 = Field('self', name='my_customName')
assert field == field
assert field2 != field
def test_field_mounted():
field = Field(List('MyObjectType'), name='my_customName')
class MyObjectType(ObjectType):
my_field = field
assert field.parent == MyObjectType
assert field.type.parent == MyObjectType
def test_field_string_reference():
field = Field('MyObjectType', name='my_customName')
class MyObjectType(ObjectType):
my_field = field
schema = Schema(query=MyObjectType)
assert isinstance(field.type, LazyType)
assert schema.T(field.type) == schema.T(MyObjectType)
def test_field_custom_arguments():
field = Field(None, name='my_customName', p=String())
schema = Schema()
args = field.arguments
assert 'p' in schema.T(args)
def test_field_name_as_argument():
field = Field(None, name=String())
schema = Schema()
args = field.arguments
assert 'name' in schema.T(args)
def test_inputfield_internal_type():
field = InputField(String, description='My input field', default='3')
class MyObjectType(InputObjectType):
my_field = field
class Query(ObjectType):
input_ot = Field(MyObjectType)
schema = Schema(query=MyObjectType)
type = schema.T(field)
assert field.name is None
assert field.attname == 'my_field'
assert isinstance(type, GraphQLInputObjectField)
assert type.description == 'My input field'
assert type.default_value == '3'
def test_inputfield_string_reference():
class MyInput(InputObjectType):
my_field = InputField(String, description='My input field', default='3')
my_input_field = InputField('MyInput')
class OtherInput(InputObjectType):
my_input = my_input_field
class Query(ObjectType):
a = String()
schema = Schema(query=Query)
my_input_type = schema.T(MyInput)
my_input_field_type = schema.T(my_input_field)
assert my_input_field_type.type == my_input_type
def test_field_resolve_argument():
def resolver(instance, args, info):
return args.get('first_name')
field = Field(String(), first_name=String(), description='My argument', resolver=resolver)
class Query(ObjectType):
my_field = field
schema = Schema(query=Query)
type = schema.T(field)
assert type.resolver(None, {'firstName': 'Peter'}, None, None).value == 'Peter'
def test_field_resolve_vars():
class Query(ObjectType):
hello = String(first_name=String())
def resolve_hello(self, args, info):
return 'Hello ' + args.get('first_name')
schema = Schema(query=Query)
result = schema.execute("""
query foo($firstName:String)
""", variable_values={"firstName": "Serkan"})
expected = {
'hello': 'Hello Serkan'
assert result.data == expected
def test_field_internal_type_deprecated():
deprecation_reason = 'No more used'
field = Field(String(), description='My argument',
class Query(ObjectType):
my_field = field
schema = Schema(query=Query)
type = schema.T(field)
assert isinstance(type, GraphQLField)
assert type.deprecation_reason == deprecation_reason
def test_field_resolve_object():
class Root(object):
att = True
def att_func():
return True
field = Field(String(), description='My argument')
field_func = Field(String(), description='My argument')
class Query(ObjectType):
att = field
att_func = field_func
assert field.resolver(Root, {}, None) is True
def test_field_resolve_source_object():
class Root(object):
att_source = True
def att_func_source():
return True
field = Field(String(), source='att_source', description='My argument')
field_func = Field(String(), source='att_func_source', description='My argument')
class Query(ObjectType):
att = field
att_func = field_func
assert field.resolver(Root, {}, None) is True
@ -1,28 +0,0 @@
from graphql.type import (GraphQLBoolean, GraphQLFloat, GraphQLID, GraphQLInt,
from graphene.core.schema import Schema
from ..scalars import ID, Boolean, Float, Int, String
schema = Schema()
def test_string_scalar():
assert schema.T(String()) == GraphQLString
def test_int_scalar():
assert schema.T(Int()) == GraphQLInt
def test_boolean_scalar():
assert schema.T(Boolean()) == GraphQLBoolean
def test_id_scalar():
assert schema.T(ID()) == GraphQLID
def test_float_scalar():
assert schema.T(Float()) == GraphQLFloat
@ -1,6 +0,0 @@
from .base import MiddlewareManager
from .camel_case import CamelCaseArgsMiddleware
__all__ = [
'MiddlewareManager', 'CamelCaseArgsMiddleware'
@ -1,23 +0,0 @@
from ..utils import promise_middleware
class MiddlewareManager(object):
def __init__(self, schema, middlewares=None):
self.schema = schema
self.middlewares = middlewares or []
def add_middleware(self, middleware):
def get_middleware_resolvers(self):
for middleware in self.middlewares:
if not hasattr(middleware, MIDDLEWARE_RESOLVER_FUNCTION):
yield getattr(middleware, MIDDLEWARE_RESOLVER_FUNCTION)
def wrap(self, resolver):
middleware_resolvers = self.get_middleware_resolvers()
return promise_middleware(resolver, middleware_resolvers)
@ -1,8 +0,0 @@
from ..utils import ProxySnakeDict
class CamelCaseArgsMiddleware(object):
def resolve(self, next, root, args, context, info):
args = ProxySnakeDict(args)
return next(root, args, context, info)
@ -1,18 +0,0 @@
from .fields import (
from .types import (
from .utils import is_node
__all__ = ['ConnectionField', 'NodeField', 'GlobalIDField', 'Node',
'PageInfo', 'Edge', 'Connection', 'ClientIDMutation', 'is_node']
Some files were not shown because too many files have changed in this diff Show More
Reference in New Issue
Block a user