First 1.0 with a separated Django version

This commit is contained in:
Syrus Akbary 2016-06-17 09:29:38 -07:00
parent 80f98c5fd3
commit feaa09616d
76 changed files with 2614 additions and 1 deletions

View File

@ -0,0 +1,14 @@
SECRET_KEY = 1
INSTALLED_APPS = [
'graphene_django',
'graphene_django.tests',
'examples.starwars',
]
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': 'django_test.sqlite',
}
}

View File

@ -0,0 +1,10 @@
from .types import (
DjangoObjectType,
DjangoNode
)
from .fields import (
DjangoConnectionField,
)
__all__ = ['DjangoObjectType', 'DjangoNode',
'DjangoConnectionField']

View File

@ -0,0 +1,24 @@
from django.db import models
class MissingType(object):
pass
try:
UUIDField = models.UUIDField
except AttributeError:
# Improved compatibility for Django 1.6
UUIDField = MissingType
try:
from django.db.models.related import RelatedObject
except:
# Improved compatibility for Django 1.6
RelatedObject = MissingType
try:
# Postgres fields are only available in Django 1.8+
from django.contrib.postgres.fields import ArrayField, HStoreField, JSONField, RangeField
except ImportError:
ArrayField, HStoreField, JSONField, RangeField = (MissingType, ) * 4

View File

@ -0,0 +1,158 @@
from django.db import models
from django.utils.encoding import force_text
from graphene import Enum, List, ID, Boolean, Float, Int, String, Field
from graphene.utils.str_converters import to_const
from graphene.relay import Node, ConnectionField
# from ...core.types.custom_scalars import DateTime, JSONString
from .compat import (ArrayField, HStoreField, JSONField, RangeField,
RelatedObject, UUIDField)
from .utils import get_related_model, import_single_dispatch
from .fields import DjangoConnectionField
singledispatch = import_single_dispatch()
class Registry(object):
def __init__(self):
self._registry = {}
self._registry_models = {}
def register(self, cls):
from .types import DjangoObjectType
print(cls.get_registry(), self)
assert issubclass(cls, DjangoObjectType), 'Only DjangoObjectTypes can be registered, received "{}"'.format(cls.__name__)
assert cls.get_registry() == self, 'Registry for a Model have to match.'
self._registry[cls._meta.model] = cls
def get_type_for_model(self, model):
return self._registry.get(model)
def convert_choices(choices):
for value, name in choices:
if isinstance(name, (tuple, list)):
for choice in convert_choices(name):
yield choice
else:
yield to_const(force_text(name)), value
def convert_django_field_with_choices(field, registry=None):
choices = getattr(field, 'choices', None)
if choices:
meta = field.model._meta
name = '{}_{}_{}'.format(meta.app_label, meta.object_name, field.name)
graphql_choices = list(convert_choices(choices))
return Enum(name.upper(), graphql_choices, description=field.help_text)
return convert_django_field(field, registry)
@singledispatch
def convert_django_field(field, registry=None):
raise Exception(
"Don't know how to convert the Django field %s (%s)" %
(field, field.__class__))
@convert_django_field.register(models.CharField)
@convert_django_field.register(models.TextField)
@convert_django_field.register(models.EmailField)
@convert_django_field.register(models.SlugField)
@convert_django_field.register(models.URLField)
@convert_django_field.register(models.GenericIPAddressField)
@convert_django_field.register(models.FileField)
@convert_django_field.register(UUIDField)
def convert_field_to_string(field, registry=None):
return String(description=field.help_text)
@convert_django_field.register(models.AutoField)
def convert_field_to_id(field, registry=None):
return ID(description=field.help_text)
@convert_django_field.register(models.PositiveIntegerField)
@convert_django_field.register(models.PositiveSmallIntegerField)
@convert_django_field.register(models.SmallIntegerField)
@convert_django_field.register(models.BigIntegerField)
@convert_django_field.register(models.IntegerField)
def convert_field_to_int(field, registry=None):
return Int(description=field.help_text)
@convert_django_field.register(models.BooleanField)
def convert_field_to_boolean(field, registry=None):
return Boolean(description=field.help_text, required=True)
@convert_django_field.register(models.NullBooleanField)
def convert_field_to_nullboolean(field, registry=None):
return Boolean(description=field.help_text)
@convert_django_field.register(models.DecimalField)
@convert_django_field.register(models.FloatField)
def convert_field_to_float(field, registry=None):
return Float(description=field.help_text)
@convert_django_field.register(models.DateField)
def convert_date_to_string(field, registry=None):
return DateTime(description=field.help_text)
@convert_django_field.register(models.OneToOneRel)
def convert_onetoone_field_to_djangomodel(field, registry=None):
model = get_related_model(field)
return Field(registry.get_type_for_model(model))
@convert_django_field.register(models.ManyToManyField)
@convert_django_field.register(models.ManyToManyRel)
@convert_django_field.register(models.ManyToOneRel)
def convert_field_to_list_or_connection(field, registry=None):
model = get_related_model(field)
_type = registry.get_type_for_model(model)
if not _type:
return
if issubclass(_type, Node):
return DjangoConnectionField(_type)
return Field(List(_type))
# For Django 1.6
@convert_django_field.register(RelatedObject)
def convert_relatedfield_to_djangomodel(field, registry=None):
model = field.model
_type = registry.get_type_for_model(model)
if issubclass(_type, Node):
return DjangoConnectionField(_type)
return Field(List(_type))
@convert_django_field.register(models.OneToOneField)
@convert_django_field.register(models.ForeignKey)
def convert_field_to_djangomodel(field, registry=None):
model = get_related_model(field)
_type = registry.get_type_for_model(model)
return Field(_type, description=field.help_text)
@convert_django_field.register(ArrayField)
def convert_postgres_array_to_list(field, registry=None):
base_type = convert_django_field(field.base_field)
return List(base_type, description=field.help_text)
@convert_django_field.register(HStoreField)
@convert_django_field.register(JSONField)
def convert_posgres_field_to_string(field, registry=None):
return JSONString(description=field.help_text)
@convert_django_field.register(RangeField)
def convert_posgres_range_to_string(field, registry=None):
inner_type = convert_django_field(field.base_field)
return List(inner_type, description=field.help_text)

View File

@ -0,0 +1,4 @@
from .middleware import DjangoDebugMiddleware
from .types import DjangoDebug
__all__ = ['DjangoDebugMiddleware', 'DjangoDebug']

View File

@ -0,0 +1,56 @@
from promise import Promise
from django.db import connections
from .sql.tracking import unwrap_cursor, wrap_cursor
from .types import DjangoDebug
class DjangoDebugContext(object):
def __init__(self):
self.debug_promise = None
self.promises = []
self.enable_instrumentation()
self.object = DjangoDebug(sql=[])
def get_debug_promise(self):
if not self.debug_promise:
self.debug_promise = Promise.all(self.promises)
return self.debug_promise.then(self.on_resolve_all_promises)
def on_resolve_all_promises(self, values):
self.disable_instrumentation()
return self.object
def add_promise(self, promise):
if self.debug_promise and not self.debug_promise.is_fulfilled:
self.promises.append(promise)
def enable_instrumentation(self):
# This is thread-safe because database connections are thread-local.
for connection in connections.all():
wrap_cursor(connection, self)
def disable_instrumentation(self):
for connection in connections.all():
unwrap_cursor(connection)
class DjangoDebugMiddleware(object):
def resolve(self, next, root, args, context, info):
django_debug = getattr(context, 'django_debug', None)
if not django_debug:
if context is None:
raise Exception('DjangoDebug cannot be executed in None contexts')
try:
context.django_debug = DjangoDebugContext()
except Exception:
raise Exception('DjangoDebug need the context to be writable, context received: {}.'.format(
context.__class__.__name__
))
if info.schema.graphene_schema.T(DjangoDebug) == info.return_type:
return context.django_debug.get_debug_promise()
promise = next(root, args, context, info)
context.django_debug.add_promise(promise)
return promise

View File

@ -0,0 +1,170 @@
# Code obtained from django-debug-toolbar sql panel tracking
from __future__ import absolute_import, unicode_literals
import json
from threading import local
from time import time
from django.utils import six
from django.utils.encoding import force_text
from .types import DjangoDebugSQL, DjangoDebugPostgreSQL
class SQLQueryTriggered(Exception):
"""Thrown when template panel triggers a query"""
class ThreadLocalState(local):
def __init__(self):
self.enabled = True
@property
def Wrapper(self):
if self.enabled:
return NormalCursorWrapper
return ExceptionCursorWrapper
def recording(self, v):
self.enabled = v
state = ThreadLocalState()
recording = state.recording # export function
def wrap_cursor(connection, panel):
if not hasattr(connection, '_graphene_cursor'):
connection._graphene_cursor = connection.cursor
def cursor():
return state.Wrapper(connection._graphene_cursor(), connection, panel)
connection.cursor = cursor
return cursor
def unwrap_cursor(connection):
if hasattr(connection, '_graphene_cursor'):
previous_cursor = connection._graphene_cursor
connection.cursor = previous_cursor
del connection._graphene_cursor
class ExceptionCursorWrapper(object):
"""
Wraps a cursor and raises an exception on any operation.
Used in Templates panel.
"""
def __init__(self, cursor, db, logger):
pass
def __getattr__(self, attr):
raise SQLQueryTriggered()
class NormalCursorWrapper(object):
"""
Wraps a cursor and logs queries.
"""
def __init__(self, cursor, db, logger):
self.cursor = cursor
# Instance of a BaseDatabaseWrapper subclass
self.db = db
# logger must implement a ``record`` method
self.logger = logger
def _quote_expr(self, element):
if isinstance(element, six.string_types):
return "'%s'" % force_text(element).replace("'", "''")
else:
return repr(element)
def _quote_params(self, params):
if not params:
return params
if isinstance(params, dict):
return dict((key, self._quote_expr(value))
for key, value in params.items())
return list(map(self._quote_expr, params))
def _decode(self, param):
try:
return force_text(param, strings_only=True)
except UnicodeDecodeError:
return '(encoded string)'
def _record(self, method, sql, params):
start_time = time()
try:
return method(sql, params)
finally:
stop_time = time()
duration = (stop_time - start_time)
_params = ''
try:
_params = json.dumps(list(map(self._decode, params)))
except Exception:
pass # object not JSON serializable
alias = getattr(self.db, 'alias', 'default')
conn = self.db.connection
vendor = getattr(conn, 'vendor', 'unknown')
params = {
'vendor': vendor,
'alias': alias,
'sql': self.db.ops.last_executed_query(
self.cursor, sql, self._quote_params(params)),
'duration': duration,
'raw_sql': sql,
'params': _params,
'start_time': start_time,
'stop_time': stop_time,
'is_slow': duration > 10,
'is_select': sql.lower().strip().startswith('select'),
}
if vendor == 'postgresql':
# If an erroneous query was ran on the connection, it might
# be in a state where checking isolation_level raises an
# exception.
try:
iso_level = conn.isolation_level
except conn.InternalError:
iso_level = 'unknown'
params.update({
'trans_id': self.logger.get_transaction_id(alias),
'trans_status': conn.get_transaction_status(),
'iso_level': iso_level,
'encoding': conn.encoding,
})
_sql = DjangoDebugPostgreSQL(**params)
else:
_sql = DjangoDebugSQL(**params)
# We keep `sql` to maintain backwards compatibility
self.logger.object.sql.append(_sql)
def callproc(self, procname, params=()):
return self._record(self.cursor.callproc, procname, params)
def execute(self, sql, params=()):
return self._record(self.cursor.execute, sql, params)
def executemany(self, sql, param_list):
return self._record(self.cursor.executemany, sql, param_list)
def __getattr__(self, attr):
return getattr(self.cursor, attr)
def __iter__(self):
return iter(self.cursor)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()

View File

@ -0,0 +1,25 @@
from .....core import Boolean, Float, ObjectType, String
class DjangoDebugBaseSQL(ObjectType):
vendor = String()
alias = String()
sql = String()
duration = Float()
raw_sql = String()
params = String()
start_time = Float()
stop_time = Float()
is_slow = Boolean()
is_select = Boolean()
class DjangoDebugSQL(DjangoDebugBaseSQL):
pass
class DjangoDebugPostgreSQL(DjangoDebugBaseSQL):
trans_id = String()
trans_status = String()
iso_level = String()
encoding = String()

View File

@ -0,0 +1,219 @@
import pytest
import graphene
from graphene.contrib.django import DjangoConnectionField, DjangoNode
from graphene.contrib.django.utils import DJANGO_FILTER_INSTALLED
from ...tests.models import Reporter
from ..middleware import DjangoDebugMiddleware
from ..types import DjangoDebug
class context(object):
pass
# from examples.starwars_django.models import Character
pytestmark = pytest.mark.django_db
def test_should_query_field():
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoNode):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_reporter(self, *args, **kwargs):
return Reporter.objects.first()
query = '''
query ReporterQuery {
reporter {
lastName
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'reporter': {
'lastName': 'ABA',
},
'__debug': {
'sql': [{
'rawSql': str(Reporter.objects.order_by('pk')[:1].query)
}]
}
}
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data == expected
def test_should_query_list():
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoNode):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
all_reporters = ReporterType.List()
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters {
lastName
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'allReporters': [{
'lastName': 'ABA',
}, {
'lastName': 'Griffin',
}],
'__debug': {
'sql': [{
'rawSql': str(Reporter.objects.all().query)
}]
}
}
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data == expected
def test_should_query_connection():
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoNode):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters(first:1) {
edges {
node {
lastName
}
}
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'allReporters': {
'edges': [{
'node': {
'lastName': 'ABA',
}
}]
},
}
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data['allReporters'] == expected['allReporters']
assert 'COUNT' in result.data['__debug']['sql'][0]['rawSql']
query = str(Reporter.objects.all()[:1].query)
assert result.data['__debug']['sql'][1]['rawSql'] == query
@pytest.mark.skipif(not DJANGO_FILTER_INSTALLED,
reason="requires django-filter")
def test_should_query_connectionfilter():
from graphene.contrib.django.filter import DjangoFilterConnectionField
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoNode):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters(first:1) {
edges {
node {
lastName
}
}
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'allReporters': {
'edges': [{
'node': {
'lastName': 'ABA',
}
}]
},
}
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data['allReporters'] == expected['allReporters']
assert 'COUNT' in result.data['__debug']['sql'][0]['rawSql']
query = str(Reporter.objects.all()[:1].query)
assert result.data['__debug']['sql'][1]['rawSql'] == query

View File

@ -0,0 +1,7 @@
from ....core.classtypes.objecttype import ObjectType
from ....core.types import Field
from .sql.types import DjangoDebugBaseSQL
class DjangoDebug(ObjectType):
sql = Field(DjangoDebugBaseSQL.List())

View File

@ -0,0 +1,64 @@
# from ...core.exceptions import SkipField
from graphene import Field, List
from graphene.relay import ConnectionField
from .utils import DJANGO_FILTER_INSTALLED, get_type_for_model, maybe_queryset
class DjangoConnectionField(ConnectionField):
def __init__(self, *args, **kwargs):
self.on = kwargs.pop('on', False)
# kwargs['default'] = kwargs.pop('default', self.get_manager)
return super(DjangoConnectionField, self).__init__(*args, **kwargs)
@property
def model(self):
return self.type._meta.model
def get_manager(self):
if self.on:
return getattr(self.model, self.on)
else:
return self.model._default_manager
def get_queryset(self, resolved_qs, args, info):
return resolved_qs
def from_list(self, connection_type, resolved, args, context, info):
resolved_qs = maybe_queryset(resolved)
qs = self.get_queryset(resolved_qs, args, info)
return super(DjangoConnectionField, self).from_list(connection_type, qs, args, context, info)
def get_list_or_connection_type_for_model(model):
pass
# field_object_type = model_field.get_object_type(schema)
# if not field_object_type:
# raise SkipField()
# if isinstance(:
# if field_object_type._meta.filter_fields:
# field = DjangoFilterConnectionField(field_object_type)
# else:
# field = DjangoConnectionField(field_object_type)
# else:
# field = List(field_object_type)
# field.contribute_to_class(self.object_type, self.attname)
# return schema.T(field)
def get_graphene_type_from_model(model):
pass
# _type = self.get_object_type(schema)
# if not _type and self.parent._meta.only_fields:
# raise Exception(
# "Model %r is not accessible by the schema. "
# "You can either register the type manually "
# "using @schema.register. "
# "Or disable the field in %s" % (
# self.model,
# self.parent,
# )
# )
# if not _type:
# raise SkipField()
# return schema.T(_type)

View File

@ -0,0 +1,14 @@
import warnings
from graphene.contrib.django.utils import DJANGO_FILTER_INSTALLED
if not DJANGO_FILTER_INSTALLED:
warnings.warn(
"Use of django filtering requires the django-filter package "
"be installed. You can do so using `pip install django-filter`", ImportWarning
)
else:
from .fields import DjangoFilterConnectionField
from .filterset import GrapheneFilterSet, GlobalIDFilter, GlobalIDMultipleChoiceFilter
__all__ = ['DjangoFilterConnectionField', 'GrapheneFilterSet',
'GlobalIDFilter', 'GlobalIDMultipleChoiceFilter']

View File

@ -0,0 +1,36 @@
from ..fields import DjangoConnectionField
from .utils import get_filtering_args_from_filterset, get_filterset_class
class DjangoFilterConnectionField(DjangoConnectionField):
def __init__(self, type, fields=None, order_by=None,
extra_filter_meta=None, filterset_class=None,
*args, **kwargs):
self.order_by = order_by or type._meta.filter_order_by
self.fields = fields or type._meta.filter_fields
meta = dict(model=type._meta.model,
fields=self.fields,
order_by=self.order_by)
if extra_filter_meta:
meta.update(extra_filter_meta)
self.filterset_class = get_filterset_class(filterset_class, **meta)
self.filtering_args = get_filtering_args_from_filterset(self.filterset_class, type)
kwargs.setdefault('args', {})
kwargs['args'].update(**self.filtering_args)
super(DjangoFilterConnectionField, self).__init__(type, *args, **kwargs)
def get_queryset(self, qs, args, info):
filterset_class = self.filterset_class
filter_kwargs = self.get_filter_kwargs(args)
order = self.get_order(args)
if order:
qs = qs.order_by(order)
return filterset_class(data=filter_kwargs, queryset=qs)
def get_filter_kwargs(self, args):
return {k: v for k, v in args.items() if k in self.filtering_args}
def get_order(self, args):
return args.get('order_by', None)

View File

@ -0,0 +1,116 @@
import six
from django.conf import settings
from django.db import models
from django.utils.text import capfirst
from django_filters import Filter, MultipleChoiceFilter
from django_filters.filterset import FilterSet, FilterSetMetaclass
from graphene.contrib.django.forms import (GlobalIDFormField,
GlobalIDMultipleChoiceField)
from graphql_relay.node.node import from_global_id
class GlobalIDFilter(Filter):
field_class = GlobalIDFormField
def filter(self, qs, value):
_type, _id = from_global_id(value)
return super(GlobalIDFilter, self).filter(qs, _id)
class GlobalIDMultipleChoiceFilter(MultipleChoiceFilter):
field_class = GlobalIDMultipleChoiceField
def filter(self, qs, value):
gids = [from_global_id(v)[1] for v in value]
return super(GlobalIDMultipleChoiceFilter, self).filter(qs, gids)
ORDER_BY_FIELD = getattr(settings, 'GRAPHENE_ORDER_BY_FIELD', 'order_by')
GRAPHENE_FILTER_SET_OVERRIDES = {
models.AutoField: {
'filter_class': GlobalIDFilter,
},
models.OneToOneField: {
'filter_class': GlobalIDFilter,
},
models.ForeignKey: {
'filter_class': GlobalIDFilter,
},
models.ManyToManyField: {
'filter_class': GlobalIDMultipleChoiceFilter,
}
}
class GrapheneFilterSetMetaclass(FilterSetMetaclass):
def __new__(cls, name, bases, attrs):
new_class = super(GrapheneFilterSetMetaclass, cls).__new__(cls, name, bases, attrs)
# Customise the filter_overrides for Graphene
for k, v in GRAPHENE_FILTER_SET_OVERRIDES.items():
new_class.filter_overrides.setdefault(k, v)
return new_class
class GrapheneFilterSetMixin(object):
order_by_field = ORDER_BY_FIELD
@classmethod
def filter_for_reverse_field(cls, f, name):
"""Handles retrieving filters for reverse relationships
We override the default implementation so that we can handle
Global IDs (the default implementation expects database
primary keys)
"""
rel = f.field.rel
default = {
'name': name,
'label': capfirst(rel.related_name)
}
if rel.multiple:
# For to-many relationships
return GlobalIDMultipleChoiceFilter(**default)
else:
# For to-one relationships
return GlobalIDFilter(**default)
class GrapheneFilterSet(six.with_metaclass(GrapheneFilterSetMetaclass, GrapheneFilterSetMixin, FilterSet)):
""" Base class for FilterSets used by Graphene
You shouldn't usually need to use this class. The
DjangoFilterConnectionField will wrap FilterSets with this class as
necessary
"""
def setup_filterset(filterset_class):
""" Wrap a provided filterset in Graphene-specific functionality
"""
return type(
'Graphene{}'.format(filterset_class.__name__),
(six.with_metaclass(GrapheneFilterSetMetaclass, GrapheneFilterSetMixin, filterset_class),),
{},
)
def custom_filterset_factory(model, filterset_base_class=GrapheneFilterSet,
**meta):
""" Create a filterset for the given model using the provided meta data
"""
meta.update({
'model': model,
})
meta_class = type(str('Meta'), (object,), meta)
filterset = type(
str('%sFilterSet' % model._meta.object_name),
(filterset_base_class,),
{
'Meta': meta_class
}
)
return filterset

View File

@ -0,0 +1,31 @@
import django_filters
from graphene.contrib.django.tests.models import Article, Pet, Reporter
class ArticleFilter(django_filters.FilterSet):
class Meta:
model = Article
fields = {
'headline': ['exact', 'icontains'],
'pub_date': ['gt', 'lt', 'exact'],
'reporter': ['exact'],
}
order_by = True
class ReporterFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['first_name', 'last_name', 'email', 'pets']
order_by = False
class PetFilter(django_filters.FilterSet):
class Meta:
model = Pet
fields = ['name']
order_by = False

View File

@ -0,0 +1,287 @@
from datetime import datetime
import pytest
from graphene import ObjectType, Schema
from graphene.contrib.django import DjangoNode
from graphene.contrib.django.forms import (GlobalIDFormField,
GlobalIDMultipleChoiceField)
from graphene.contrib.django.tests.models import Article, Pet, Reporter
from graphene.contrib.django.utils import DJANGO_FILTER_INSTALLED
from graphene.relay import NodeField
pytestmark = []
if DJANGO_FILTER_INSTALLED:
import django_filters
from graphene.contrib.django.filter import (GlobalIDFilter, DjangoFilterConnectionField,
GlobalIDMultipleChoiceFilter)
from graphene.contrib.django.filter.tests.filters import ArticleFilter, PetFilter
else:
pytestmark.append(pytest.mark.skipif(True, reason='django_filters not installed'))
pytestmark.append(pytest.mark.django_db)
class ArticleNode(DjangoNode):
class Meta:
model = Article
class ReporterNode(DjangoNode):
class Meta:
model = Reporter
class PetNode(DjangoNode):
class Meta:
model = Pet
schema = Schema()
def assert_arguments(field, *arguments):
ignore = ('after', 'before', 'first', 'last', 'orderBy')
actual = [
name
for name in schema.T(field.arguments)
if name not in ignore and not name.startswith('_')
]
assert set(arguments) == set(actual), \
'Expected arguments ({}) did not match actual ({})'.format(
arguments,
actual
)
def assert_orderable(field):
assert 'orderBy' in schema.T(field.arguments), \
'Field cannot be ordered'
def assert_not_orderable(field):
assert 'orderBy' not in schema.T(field.arguments), \
'Field can be ordered'
def test_filter_explicit_filterset_arguments():
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleFilter)
assert_arguments(field,
'headline', 'headline_Icontains',
'pubDate', 'pubDate_Gt', 'pubDate_Lt',
'reporter',
)
def test_filter_shortcut_filterset_arguments_list():
field = DjangoFilterConnectionField(ArticleNode, fields=['pub_date', 'reporter'])
assert_arguments(field,
'pubDate',
'reporter',
)
def test_filter_shortcut_filterset_arguments_dict():
field = DjangoFilterConnectionField(ArticleNode, fields={
'headline': ['exact', 'icontains'],
'reporter': ['exact'],
})
assert_arguments(field,
'headline', 'headline_Icontains',
'reporter',
)
def test_filter_explicit_filterset_orderable():
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleFilter)
assert_orderable(field)
def test_filter_shortcut_filterset_orderable_true():
field = DjangoFilterConnectionField(ArticleNode, order_by=True)
assert_orderable(field)
def test_filter_shortcut_filterset_orderable_headline():
field = DjangoFilterConnectionField(ArticleNode, order_by=['headline'])
assert_orderable(field)
def test_filter_explicit_filterset_not_orderable():
field = DjangoFilterConnectionField(PetNode, filterset_class=PetFilter)
assert_not_orderable(field)
def test_filter_shortcut_filterset_extra_meta():
field = DjangoFilterConnectionField(ArticleNode, extra_filter_meta={
'order_by': True
})
assert_orderable(field)
def test_filter_filterset_information_on_meta():
class ReporterFilterNode(DjangoNode):
class Meta:
model = Reporter
filter_fields = ['first_name', 'articles']
filter_order_by = True
field = DjangoFilterConnectionField(ReporterFilterNode)
assert_arguments(field, 'firstName', 'articles')
assert_orderable(field)
def test_filter_filterset_information_on_meta_related():
class ReporterFilterNode(DjangoNode):
class Meta:
model = Reporter
filter_fields = ['first_name', 'articles']
filter_order_by = True
class ArticleFilterNode(DjangoNode):
class Meta:
model = Article
filter_fields = ['headline', 'reporter']
filter_order_by = True
class Query(ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterFilterNode)
all_articles = DjangoFilterConnectionField(ArticleFilterNode)
reporter = NodeField(ReporterFilterNode)
article = NodeField(ArticleFilterNode)
schema = Schema(query=Query)
schema.schema # Trigger the schema loading
articles_field = schema.get_type('ReporterFilterNode')._meta.fields_map['articles']
assert_arguments(articles_field, 'headline', 'reporter')
assert_orderable(articles_field)
def test_filter_filterset_related_results():
class ReporterFilterNode(DjangoNode):
class Meta:
model = Reporter
filter_fields = ['first_name', 'articles']
filter_order_by = True
class ArticleFilterNode(DjangoNode):
class Meta:
model = Article
filter_fields = ['headline', 'reporter']
filter_order_by = True
class Query(ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterFilterNode)
all_articles = DjangoFilterConnectionField(ArticleFilterNode)
reporter = NodeField(ReporterFilterNode)
article = NodeField(ArticleFilterNode)
r1 = Reporter.objects.create(first_name='r1', last_name='r1', email='r1@test.com')
r2 = Reporter.objects.create(first_name='r2', last_name='r2', email='r2@test.com')
Article.objects.create(headline='a1', pub_date=datetime.now(), reporter=r1)
Article.objects.create(headline='a2', pub_date=datetime.now(), reporter=r2)
query = '''
query {
allReporters {
edges {
node {
articles {
edges {
node {
headline
}
}
}
}
}
}
}
'''
schema = Schema(query=Query)
result = schema.execute(query)
assert not result.errors
# We should only get back a single article for each reporter
assert len(result.data['allReporters']['edges'][0]['node']['articles']['edges']) == 1
assert len(result.data['allReporters']['edges'][1]['node']['articles']['edges']) == 1
def test_global_id_field_implicit():
field = DjangoFilterConnectionField(ArticleNode, fields=['id'])
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['id']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_field_explicit():
class ArticleIdFilter(django_filters.FilterSet):
class Meta:
model = Article
fields = ['id']
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleIdFilter)
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['id']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_field_relation():
field = DjangoFilterConnectionField(ArticleNode, fields=['reporter'])
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['reporter']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_multiple_field_implicit():
field = DjangoFilterConnectionField(ReporterNode, fields=['pets'])
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['pets']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_explicit():
class ReporterPetsFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['pets']
field = DjangoFilterConnectionField(ReporterNode, filterset_class=ReporterPetsFilter)
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['pets']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_implicit_reverse():
field = DjangoFilterConnectionField(ReporterNode, fields=['articles'])
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['articles']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_explicit_reverse():
class ReporterPetsFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['articles']
field = DjangoFilterConnectionField(ReporterNode, filterset_class=ReporterPetsFilter)
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['articles']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField

View File

@ -0,0 +1,31 @@
import six
from ....core.types import Argument, String
from .filterset import custom_filterset_factory, setup_filterset
def get_filtering_args_from_filterset(filterset_class, type):
""" Inspect a FilterSet and produce the arguments to pass to
a Graphene Field. These arguments will be available to
filter against in the GraphQL
"""
from graphene.contrib.django.form_converter import convert_form_field
args = {}
for name, filter_field in six.iteritems(filterset_class.base_filters):
field_type = Argument(convert_form_field(filter_field.field))
args[name] = field_type
# Also add the 'order_by' field
if filterset_class._meta.order_by:
args[filterset_class.order_by_field] = Argument(String())
return args
def get_filterset_class(filterset_class, **meta):
"""Get the class to be used as the FilterSet"""
if filterset_class:
# If were given a FilterSet class, then set it up and
# return it
return setup_filterset(filterset_class)
return custom_filterset_factory(**meta)

View File

@ -0,0 +1,70 @@
from django import forms
from django.forms.fields import BaseTemporalField
from graphene import ID, Boolean, Float, Int, String, List
from .forms import GlobalIDFormField, GlobalIDMultipleChoiceField
from .utils import import_single_dispatch
singledispatch = import_single_dispatch()
try:
UUIDField = forms.UUIDField
except AttributeError:
class UUIDField(object):
pass
@singledispatch
def convert_form_field(field):
raise Exception(
"Don't know how to convert the Django form field %s (%s) "
"to Graphene type" %
(field, field.__class__)
)
@convert_form_field.register(BaseTemporalField)
@convert_form_field.register(forms.CharField)
@convert_form_field.register(forms.EmailField)
@convert_form_field.register(forms.SlugField)
@convert_form_field.register(forms.URLField)
@convert_form_field.register(forms.ChoiceField)
@convert_form_field.register(forms.RegexField)
@convert_form_field.register(forms.Field)
@convert_form_field.register(UUIDField)
def convert_form_field_to_string(field):
return String(description=field.help_text)
@convert_form_field.register(forms.IntegerField)
@convert_form_field.register(forms.NumberInput)
def convert_form_field_to_int(field):
return Int(description=field.help_text)
@convert_form_field.register(forms.BooleanField)
def convert_form_field_to_boolean(field):
return Boolean(description=field.help_text, required=True)
@convert_form_field.register(forms.NullBooleanField)
def convert_form_field_to_nullboolean(field):
return Boolean(description=field.help_text)
@convert_form_field.register(forms.DecimalField)
@convert_form_field.register(forms.FloatField)
def convert_form_field_to_float(field):
return Float(description=field.help_text)
@convert_form_field.register(forms.ModelMultipleChoiceField)
@convert_form_field.register(GlobalIDMultipleChoiceField)
def convert_form_field_to_list(field):
return List(ID())
@convert_form_field.register(forms.ModelChoiceField)
@convert_form_field.register(GlobalIDFormField)
def convert_form_field_to_id(field):
return ID()

View File

@ -0,0 +1,42 @@
import binascii
from django.core.exceptions import ValidationError
from django.forms import CharField, Field, IntegerField, MultipleChoiceField
from django.utils.translation import ugettext_lazy as _
from graphql_relay import from_global_id
class GlobalIDFormField(Field):
default_error_messages = {
'invalid': _('Invalid ID specified.'),
}
def clean(self, value):
if not value and not self.required:
return None
try:
_type, _id = from_global_id(value)
except (TypeError, ValueError, UnicodeDecodeError, binascii.Error):
raise ValidationError(self.error_messages['invalid'])
try:
IntegerField().clean(_id)
CharField().clean(_type)
except ValidationError:
raise ValidationError(self.error_messages['invalid'])
return value
class GlobalIDMultipleChoiceField(MultipleChoiceField):
default_error_messages = {
'invalid_choice': _('One of the specified IDs was invalid (%(value)s).'),
'invalid_list': _('Enter a list of values.'),
}
def valid_value(self, value):
# Clean will raise a validation error if there is a problem
GlobalIDFormField().clean(value)
return True

View File

@ -0,0 +1,72 @@
import importlib
import json
from distutils.version import StrictVersion
from optparse import make_option
from django import get_version as get_django_version
from django.core.management.base import BaseCommand, CommandError
LT_DJANGO_1_8 = StrictVersion(get_django_version()) < StrictVersion('1.8')
if LT_DJANGO_1_8:
class CommandArguments(BaseCommand):
option_list = BaseCommand.option_list + (
make_option(
'--schema',
type=str,
dest='schema',
default='',
help='Django app containing schema to dump, e.g. myproject.core.schema',
),
make_option(
'--out',
type=str,
dest='out',
default='',
help='Output file (default: schema.json)'
),
)
else:
class CommandArguments(BaseCommand):
def add_arguments(self, parser):
from django.conf import settings
parser.add_argument(
'--schema',
type=str,
dest='schema',
default=getattr(settings, 'GRAPHENE_SCHEMA', ''),
help='Django app containing schema to dump, e.g. myproject.core.schema')
parser.add_argument(
'--out',
type=str,
dest='out',
default=getattr(settings, 'GRAPHENE_SCHEMA_OUTPUT', 'schema.json'),
help='Output file (default: schema.json)')
class Command(CommandArguments):
help = 'Dump Graphene schema JSON to file'
can_import_settings = True
def save_file(self, out, schema_dict):
with open(out, 'w') as outfile:
json.dump(schema_dict, outfile)
def handle(self, *args, **options):
from django.conf import settings
schema = options.get('schema') or getattr(settings, 'GRAPHENE_SCHEMA', '')
out = options.get('out') or getattr(settings, 'GRAPHENE_SCHEMA_OUTPUT', 'schema.json')
if schema == '':
raise CommandError('Specify schema on GRAPHENE_SCHEMA setting or by using --schema')
i = importlib.import_module(schema)
schema_dict = {'data': i.schema.introspect()}
self.save_file(out, schema_dict)
style = getattr(self, 'style', None)
SUCCESS = getattr(style, 'SUCCESS', lambda x: x)
self.stdout.write(SUCCESS('Successfully dumped GraphQL schema to %s' % out))

View File

@ -0,0 +1,52 @@
from __future__ import absolute_import
from django.db import models
from django.utils.translation import ugettext_lazy as _
CHOICES = (
(1, 'this'),
(2, _('that'))
)
class Pet(models.Model):
name = models.CharField(max_length=30)
class FilmDetails(models.Model):
location = models.CharField(max_length=30)
film = models.OneToOneField('Film', related_name='details')
class Film(models.Model):
reporters = models.ManyToManyField('Reporter',
related_name='films')
class Reporter(models.Model):
first_name = models.CharField(max_length=30)
last_name = models.CharField(max_length=30)
email = models.EmailField()
pets = models.ManyToManyField('self')
a_choice = models.CharField(max_length=30, choices=CHOICES)
def __str__(self): # __unicode__ on Python 2
return "%s %s" % (self.first_name, self.last_name)
class Article(models.Model):
headline = models.CharField(max_length=100)
pub_date = models.DateField()
reporter = models.ForeignKey(Reporter, related_name='articles')
lang = models.CharField(max_length=2, help_text='Language', choices=[
('es', 'Spanish'),
('en', 'English')
], default='es')
importance = models.IntegerField('Importance', null=True, blank=True,
choices=[(1, u'Very important'), (2, u'Not as important')])
def __str__(self): # __unicode__ on Python 2
return self.headline
class Meta:
ordering = ('headline',)

View File

@ -0,0 +1,11 @@
from django.core import management
from mock import patch
from six import StringIO
@patch('graphene_django.management.commands.graphql_schema.Command.save_file')
def test_generate_file_on_call_graphql_schema(savefile_mock, settings):
settings.GRAPHENE_SCHEMA = 'graphene_django.tests.test_urls'
out = StringIO()
management.call_command('graphql_schema', schema='', stdout=out)
assert "Successfully dumped GraphQL schema to schema.json" in out.getvalue()

View File

@ -0,0 +1,258 @@
import pytest
from django.db import models
from django.utils.translation import ugettext_lazy as _
from py.test import raises
import graphene
from graphene.relay import Node, ConnectionField
from graphene.utils.get_graphql_type import get_graphql_type
# from graphene.core.types.custom_scalars import DateTime, JSONString
from ..compat import (ArrayField, HStoreField, JSONField, MissingType,
RangeField)
from ..converter import convert_django_field, convert_django_field_with_choices, Registry
from .models import Article, Reporter, Film, FilmDetails, Pet
from ..types import DjangoObjectType, DjangoNode
def assert_conversion(django_field, graphene_field, *args, **kwargs):
field = django_field(help_text='Custom Help Text', *args, **kwargs)
graphene_type = convert_django_field(field)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.as_field()
assert field.description == 'Custom Help Text'
return field
def test_should_unknown_django_field_raise_exception():
with raises(Exception) as excinfo:
convert_django_field(None)
assert 'Don\'t know how to convert the Django field' in str(excinfo.value)
def test_should_date_convert_string():
assert_conversion(models.DateField, DateTime)
def test_should_char_convert_string():
assert_conversion(models.CharField, graphene.String)
def test_should_text_convert_string():
assert_conversion(models.TextField, graphene.String)
def test_should_email_convert_string():
assert_conversion(models.EmailField, graphene.String)
def test_should_slug_convert_string():
assert_conversion(models.SlugField, graphene.String)
def test_should_url_convert_string():
assert_conversion(models.URLField, graphene.String)
def test_should_ipaddress_convert_string():
assert_conversion(models.GenericIPAddressField, graphene.String)
def test_should_file_convert_string():
assert_conversion(models.FileField, graphene.String)
def test_should_image_convert_string():
assert_conversion(models.ImageField, graphene.String)
def test_should_auto_convert_id():
assert_conversion(models.AutoField, graphene.ID, primary_key=True)
def test_should_positive_integer_convert_int():
assert_conversion(models.PositiveIntegerField, graphene.Int)
def test_should_positive_small_convert_int():
assert_conversion(models.PositiveSmallIntegerField, graphene.Int)
def test_should_small_integer_convert_int():
assert_conversion(models.SmallIntegerField, graphene.Int)
def test_should_big_integer_convert_int():
assert_conversion(models.BigIntegerField, graphene.Int)
def test_should_integer_convert_int():
assert_conversion(models.IntegerField, graphene.Int)
def test_should_boolean_convert_boolean():
field = assert_conversion(models.BooleanField, graphene.Boolean)
assert field.required is True
def test_should_nullboolean_convert_boolean():
field = assert_conversion(models.NullBooleanField, graphene.Boolean)
assert field.required is False
def test_field_with_choices_convert_enum():
field = models.CharField(help_text='Language', choices=(
('es', 'Spanish'),
('en', 'English')
))
class TranslatedModel(models.Model):
language = field
class Meta:
app_label = 'test'
graphene_type = convert_django_field_with_choices(field)
assert issubclass(graphene_type, graphene.Enum)
assert graphene_type._meta.graphql_type.name == 'TEST_TRANSLATEDMODEL_LANGUAGE'
assert graphene_type._meta.graphql_type.description == 'Language'
assert graphene_type._meta.enum.__members__['SPANISH'].value == 'es'
assert graphene_type._meta.enum.__members__['ENGLISH'].value == 'en'
def test_field_with_grouped_choices():
field = models.CharField(help_text='Language', choices=(
('Europe', (
('es', 'Spanish'),
('en', 'English'),
)),
))
class GroupedChoicesModel(models.Model):
language = field
class Meta:
app_label = 'test'
convert_django_field_with_choices(field)
def test_field_with_choices_gettext():
field = models.CharField(help_text='Language', choices=(
('es', _('Spanish')),
('en', _('English'))
))
class TranslatedChoicesModel(models.Model):
language = field
class Meta:
app_label = 'test'
convert_django_field_with_choices(field)
def test_should_float_convert_float():
assert_conversion(models.FloatField, graphene.Float)
def test_should_manytomany_convert_connectionorlist():
registry = Registry()
graphene_field = convert_django_field(Reporter._meta.local_many_to_many[0], registry)
assert not graphene_field
def test_should_manytomany_convert_connectionorlist_list():
registry = Registry()
class A(DjangoObjectType):
class Meta:
model = Reporter
registry.register(A)
graphene_field = convert_django_field(Reporter._meta.local_many_to_many[0], registry)
assert isinstance(graphene_field, graphene.Field)
assert isinstance(graphene_field.type, graphene.List)
assert graphene_field.type.of_type == get_graphql_type(A)
def test_should_manytomany_convert_connectionorlist_connection():
registry = Registry()
class A(DjangoNode, DjangoObjectType):
class Meta:
model = Reporter
registry.register(A)
graphene_field = convert_django_field(Reporter._meta.local_many_to_many[0], registry)
assert isinstance(graphene_field, ConnectionField)
assert graphene_field.type == get_graphql_type(A.get_default_connection())
def test_should_manytoone_convert_connectionorlist():
# Django 1.9 uses 'rel', <1.9 uses 'related
related = getattr(Reporter.articles, 'rel', None) or \
getattr(Reporter.articles, 'related')
registry = Registry()
class A(DjangoObjectType):
class Meta:
model = Article
registry.register(A)
graphene_field = convert_django_field(related, registry)
assert isinstance(graphene_field, graphene.Field)
assert isinstance(graphene_field.type, graphene.List)
assert graphene_field.type.of_type == get_graphql_type(A)
def test_should_onetoone_reverse_convert_model():
# Django 1.9 uses 'rel', <1.9 uses 'related
related = getattr(Film.details, 'rel', None) or \
getattr(Film.details, 'related')
class A(DjangoObjectType):
class Meta:
model = FilmDetails
registry = Registry()
registry.register(A)
graphene_field = convert_django_field(related, registry)
assert isinstance(graphene_field, graphene.Field)
assert graphene_field.type == get_graphql_type(A)
@pytest.mark.skipif(ArrayField is MissingType,
reason="ArrayField should exist")
def test_should_postgres_array_convert_list():
field = assert_conversion(ArrayField, graphene.List, models.CharField(max_length=100))
assert isinstance(field.type, graphene.List)
assert isinstance(field.type.of_type, graphene.String)
@pytest.mark.skipif(ArrayField is MissingType,
reason="ArrayField should exist")
def test_should_postgres_array_multiple_convert_list():
field = assert_conversion(ArrayField, graphene.List, ArrayField(models.CharField(max_length=100)))
assert isinstance(field.type, graphene.List)
assert isinstance(field.type.of_type, graphene.List)
assert isinstance(field.type.of_type.of_type, graphene.String)
@pytest.mark.skipif(HStoreField is MissingType,
reason="HStoreField should exist")
def test_should_postgres_hstore_convert_string():
assert_conversion(HStoreField, JSONString)
@pytest.mark.skipif(JSONField is MissingType,
reason="JSONField should exist")
def test_should_postgres_json_convert_string():
assert_conversion(JSONField, JSONString)
@pytest.mark.skipif(RangeField is MissingType,
reason="RangeField should exist")
def test_should_postgres_range_convert_list():
from django.contrib.postgres.fields import IntegerRangeField
field = assert_conversion(IntegerRangeField, graphene.List)
assert isinstance(field.type.of_type, graphene.Int)

View File

@ -0,0 +1,103 @@
from django import forms
from py.test import raises
import graphene
from ..form_converter import convert_form_field
from graphene import ID, List
from .models import Reporter
def assert_conversion(django_field, graphene_field, *args):
field = django_field(*args, help_text='Custom Help Text')
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.as_field()
assert field.description == 'Custom Help Text'
return field
def test_should_unknown_django_field_raise_exception():
with raises(Exception) as excinfo:
convert_form_field(None)
assert 'Don\'t know how to convert the Django form field' in str(excinfo.value)
def test_should_date_convert_string():
assert_conversion(forms.DateField, graphene.String)
def test_should_time_convert_string():
assert_conversion(forms.TimeField, graphene.String)
def test_should_date_time_convert_string():
assert_conversion(forms.DateTimeField, graphene.String)
def test_should_char_convert_string():
assert_conversion(forms.CharField, graphene.String)
def test_should_email_convert_string():
assert_conversion(forms.EmailField, graphene.String)
def test_should_slug_convert_string():
assert_conversion(forms.SlugField, graphene.String)
def test_should_url_convert_string():
assert_conversion(forms.URLField, graphene.String)
def test_should_choice_convert_string():
assert_conversion(forms.ChoiceField, graphene.String)
def test_should_base_field_convert_string():
assert_conversion(forms.Field, graphene.String)
def test_should_regex_convert_string():
assert_conversion(forms.RegexField, graphene.String, '[0-9]+')
def test_should_uuid_convert_string():
if hasattr(forms, 'UUIDField'):
assert_conversion(forms.UUIDField, graphene.String)
def test_should_integer_convert_int():
assert_conversion(forms.IntegerField, graphene.Int)
def test_should_boolean_convert_boolean():
field = assert_conversion(forms.BooleanField, graphene.Boolean)
assert field.required is True
def test_should_nullboolean_convert_boolean():
field = assert_conversion(forms.NullBooleanField, graphene.Boolean)
assert field.required is False
def test_should_float_convert_float():
assert_conversion(forms.FloatField, graphene.Float)
def test_should_decimal_convert_float():
assert_conversion(forms.DecimalField, graphene.Float)
def test_should_multiple_choice_convert_connectionorlist():
field = forms.ModelMultipleChoiceField(Reporter.objects.all())
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, List)
assert isinstance(graphene_type.of_type, ID)
def test_should_manytoone_convert_connectionorlist():
field = forms.ModelChoiceField(Reporter.objects.all())
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, graphene.ID)

View File

@ -0,0 +1,36 @@
from django.core.exceptions import ValidationError
from py.test import raises
from ..forms import GlobalIDFormField
# 'TXlUeXBlOjEwMA==' -> 'MyType', 100
# 'TXlUeXBlOmFiYw==' -> 'MyType', 'abc'
def test_global_id_valid():
field = GlobalIDFormField()
field.clean('TXlUeXBlOjEwMA==')
def test_global_id_invalid():
field = GlobalIDFormField()
with raises(ValidationError):
field.clean('badvalue')
def test_global_id_none():
field = GlobalIDFormField()
with raises(ValidationError):
field.clean(None)
def test_global_id_none_optional():
field = GlobalIDFormField(required=False)
field.clean(None)
def test_global_id_bad_int():
field = GlobalIDFormField()
with raises(ValidationError):
field.clean('TXlUeXBlOmFiYw==')

View File

@ -0,0 +1,201 @@
import datetime
import pytest
from django.db import models
from py.test import raises
import graphene
from graphene import relay
from ..compat import MissingType, RangeField
from ..types import DjangoNode, DjangoObjectType
from .models import Article, Reporter
pytestmark = pytest.mark.django_db
def test_should_query_only_fields():
with raises(Exception):
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
only_fields = ('articles', )
schema = graphene.Schema(query=ReporterType)
query = '''
query ReporterQuery {
articles
}
'''
result = schema.execute(query)
assert not result.errors
def test_should_query_well():
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
def resolve_reporter(self, *args, **kwargs):
return ReporterType(Reporter(first_name='ABA', last_name='X'))
query = '''
query ReporterQuery {
reporter {
firstName,
lastName,
email
}
}
'''
expected = {
'reporter': {
'firstName': 'ABA',
'lastName': 'X',
'email': ''
}
}
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
@pytest.mark.skipif(RangeField is MissingType,
reason="RangeField should exist")
def test_should_query_postgres_fields():
from django.contrib.postgres.fields import IntegerRangeField, ArrayField, JSONField, HStoreField
class Event(models.Model):
ages = IntegerRangeField(help_text='The age ranges')
data = JSONField(help_text='Data')
store = HStoreField()
tags = ArrayField(models.CharField(max_length=50))
class EventType(DjangoObjectType):
class Meta:
model = Event
class Query(graphene.ObjectType):
event = graphene.Field(EventType)
def resolve_event(self, *args, **kwargs):
return Event(
ages=(0, 10),
data={'angry_babies': True},
store={'h': 'store'},
tags=['child', 'angry', 'babies']
)
schema = graphene.Schema(query=Query)
query = '''
query myQuery {
event {
ages
tags
data
store
}
}
'''
expected = {
'event': {
'ages': [0, 10],
'tags': ['child', 'angry', 'babies'],
'data': '{"angry_babies": true}',
'store': '{"h": "store"}',
},
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_should_node():
class ReporterNode(DjangoNode):
class Meta:
model = Reporter
@classmethod
def get_node(cls, id, info):
return ReporterNode(Reporter(id=2, first_name='Cookie Monster'))
def resolve_articles(self, *args, **kwargs):
return [ArticleNode(Article(headline='Hi!'))]
class ArticleNode(DjangoNode):
class Meta:
model = Article
@classmethod
def get_node(cls, id, info):
return ArticleNode(Article(id=1, headline='Article node', pub_date=datetime.date(2002, 3, 11)))
class Query(graphene.ObjectType):
node = relay.NodeField()
reporter = graphene.Field(ReporterNode)
article = graphene.Field(ArticleNode)
def resolve_reporter(self, *args, **kwargs):
return ReporterNode(
Reporter(id=1, first_name='ABA', last_name='X'))
query = '''
query ReporterQuery {
reporter {
id,
firstName,
articles {
edges {
node {
headline
}
}
}
lastName,
email
}
myArticle: node(id:"QXJ0aWNsZU5vZGU6MQ==") {
id
... on ReporterNode {
firstName
}
... on ArticleNode {
headline
pubDate
}
}
}
'''
expected = {
'reporter': {
'id': 'UmVwb3J0ZXJOb2RlOjE=',
'firstName': 'ABA',
'lastName': 'X',
'email': '',
'articles': {
'edges': [{
'node': {
'headline': 'Hi!'
}
}]
},
},
'myArticle': {
'id': 'QXJ0aWNsZU5vZGU6MQ==',
'headline': 'Article node',
'pubDate': '2002-03-11',
}
}
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected

View File

@ -0,0 +1,45 @@
from py.test import raises
from ..types import DjangoObjectType
from tests.utils import assert_equal_lists
from .models import Reporter
def test_should_raise_if_no_model():
with raises(Exception) as excinfo:
class Character1(DjangoObjectType):
pass
assert 'model in the Meta' in str(excinfo.value)
def test_should_raise_if_model_is_invalid():
with raises(Exception) as excinfo:
class Character2(DjangoObjectType):
class Meta:
model = 1
assert 'not a Django model' in str(excinfo.value)
def test_should_map_fields_correctly():
class ReporterType2(DjangoObjectType):
class Meta:
model = Reporter
assert_equal_lists(
ReporterType2._meta.fields_map.keys(),
['articles', 'first_name', 'last_name', 'email', 'pets', 'id', 'films']
)
def test_should_map_only_few_fields():
class Reporter2(DjangoObjectType):
class Meta:
model = Reporter
only_fields = ('id', 'email')
assert_equal_lists(
Reporter2._meta.fields_map.keys(),
['id', 'email']
)

View File

@ -0,0 +1,102 @@
from graphql.type import GraphQLObjectType
from mock import patch
from graphene import Schema, Interface
from ..types import DjangoNode, DjangoObjectType
from graphene.core.fields import Field
from graphene.core.types.scalars import Int
from graphene.relay.fields import GlobalIDField
from tests.utils import assert_equal_lists
from .models import Article, Reporter
schema = Schema()
@schema.register
class Character(DjangoObjectType):
'''Character description'''
class Meta:
model = Reporter
@schema.register
class Human(DjangoNode):
'''Human description'''
pub_date = Int()
class Meta:
model = Article
def test_django_interface():
assert DjangoNode._meta.interface is True
@patch('graphene_django.tests.models.Article.objects.get', return_value=Article(id=1))
def test_django_get_node(get):
human = Human.get_node(1, None)
get.assert_called_with(id=1)
assert human.id == 1
def test_djangonode_idfield():
idfield = DjangoNode._meta.fields_map['id']
assert isinstance(idfield, GlobalIDField)
def test_node_idfield():
idfield = Human._meta.fields_map['id']
assert isinstance(idfield, GlobalIDField)
def test_node_replacedfield():
idfield = Human._meta.fields_map['pub_date']
assert isinstance(idfield, Field)
assert schema.T(idfield).type == schema.T(Int())
def test_objecttype_init_none():
h = Human()
assert h._root is None
def test_objecttype_init_good():
instance = Article()
h = Human(instance)
assert h._root == instance
def test_object_type():
object_type = schema.T(Human)
Human._meta.fields_map
assert Human._meta.interface is False
assert isinstance(object_type, GraphQLObjectType)
assert_equal_lists(
object_type.get_fields().keys(),
['headline', 'id', 'reporter', 'pubDate']
)
assert schema.T(DjangoNode) in object_type.get_interfaces()
def test_node_notinterface():
assert Human._meta.interface is False
assert DjangoNode in Human._meta.interfaces
def test_django_objecttype_could_extend_interface():
schema = Schema()
@schema.register
class Customer(Interface):
id = Int()
@schema.register
class UserType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = [Customer]
object_type = schema.T(UserType)
assert schema.T(Customer) in object_type.get_interfaces()

View File

@ -0,0 +1,45 @@
from django.conf.urls import url
import graphene
from graphene import Schema
from ..types import DjangoNode
from ..views import GraphQLView
from .models import Article, Reporter
class Character(DjangoNode):
class Meta:
model = Reporter
def get_node(self, id):
pass
class Human(DjangoNode):
raises = graphene.String()
class Meta:
model = Article
def resolve_raises(self, *args):
raise Exception("This field should raise exception")
def get_node(self, id):
pass
class Query(graphene.ObjectType):
human = graphene.Field(Human)
def resolve_human(self, args, info):
return Human()
schema = Schema(query=Query)
urlpatterns = [
url(r'^graphql', GraphQLView.as_view(schema=schema)),
]

View File

@ -0,0 +1,57 @@
import json
def format_response(response):
return json.loads(response.content.decode())
def test_client_get_good_query(settings, client):
settings.ROOT_URLCONF = 'graphene_django.tests.test_urls'
response = client.get('/graphql', {'query': '{ human { headline } }'})
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
}
}
}
assert json_response == expected_json
def test_client_get_good_query_with_raise(settings, client):
settings.ROOT_URLCONF = 'graphene_django.tests.test_urls'
response = client.get('/graphql', {'query': '{ human { raises } }'})
json_response = format_response(response)
assert json_response['errors'][0]['message'] == 'This field should raise exception'
assert json_response['data']['human']['raises'] is None
def test_client_post_good_query_json(settings, client):
settings.ROOT_URLCONF = 'graphene_django.tests.test_urls'
response = client.post(
'/graphql', json.dumps({'query': '{ human { headline } }'}), 'application/json')
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
}
}
}
assert json_response == expected_json
def test_client_post_good_query_graphql(settings, client):
settings.ROOT_URLCONF = 'graphene_django.tests.test_urls'
response = client.post(
'/graphql', '{ human { headline } }', 'application/graphql')
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
}
}
}
assert json_response == expected_json

View File

@ -0,0 +1,92 @@
import inspect
import six
from django.db import models
from graphene import Field, Interface
from graphene.types.objecttype import ObjectType, ObjectTypeMeta, attrs_without_fields, GrapheneObjectType, get_interfaces
from graphene.types.interface import InterfaceTypeMeta
from graphene.relay import Connection, Node
from graphene.relay.node import NodeMeta
from .converter import convert_django_field_with_choices, Registry
from graphene.types.options import Options
from graphene import String
from .utils import get_model_fields
from graphene.utils.is_base_type import is_base_type
from graphene.utils.copy_fields import copy_fields
from graphene.utils.get_fields import get_fields
from graphene.utils.is_base_type import is_base_type
class DjangoObjectTypeMeta(ObjectTypeMeta):
def __new__(cls, name, bases, attrs):
# super_new = super(DjangoObjectTypeMeta, cls).__new__
super_new = type.__new__
# Also ensure initialization is only performed for subclasses of Model
# (excluding Model class itself).
if not is_base_type(bases, DjangoObjectTypeMeta):
return super_new(cls, name, bases, attrs)
options = Options(
attrs.pop('Meta', None),
name=None,
description=None,
model=None,
fields=(),
exclude=(),
interfaces=(),
)
assert options.model, 'You need to pass a valid Django Model in {}.Meta'.format(name)
get_model_fields(options.model)
interfaces = tuple(options.interfaces)
fields = get_fields(ObjectType, attrs, bases, interfaces)
attrs = attrs_without_fields(attrs, fields)
cls = super_new(cls, name, bases, dict(attrs, _meta=options))
fields = copy_fields(Field, fields, parent=cls)
base_interfaces = tuple(b for b in bases if issubclass(b, Interface))
options.graphql_type = GrapheneObjectType(
graphene_type=cls,
name=options.name or cls.__name__,
description=options.description or cls.__doc__,
fields=fields,
interfaces=tuple(get_interfaces(interfaces + base_interfaces))
)
# for field in all_fields:
# is_not_in_only = only_fields and field.name not in only_fields
# is_already_created = field.name in already_created_fields
# is_excluded = field.name in cls._meta.exclude_fields or is_already_created
# if is_not_in_only or is_excluded:
# # We skip this field if we specify only_fields and is not
# # in there. Or when we exclude this field in exclude_fields
# continue
# converted_field = convert_django_field_with_choices(field)
return cls
class DjangoObjectType(six.with_metaclass(DjangoObjectTypeMeta, ObjectType)):
_registry = None
@classmethod
def get_registry(cls):
if not DjangoObjectType._registry:
DjangoObjectType._registry = Registry()
return DjangoObjectType._registry
class DjangoNodeMeta(DjangoObjectTypeMeta, NodeMeta):
pass
class DjangoNode(six.with_metaclass(DjangoNodeMeta, Node)):
@classmethod
def get_node(cls, id, context, info):
try:
instance = cls._meta.model.objects.get(id=id)
return cls(instance)
except cls._meta.model.DoesNotExist:
return None

View File

@ -0,0 +1,98 @@
from django.db import models
from django.db.models.manager import Manager
from django.db.models.query import QuerySet
# from graphene.utils import LazyList
class LazyList(object):
pass
from .compat import RelatedObject
try:
import django_filters # noqa
DJANGO_FILTER_INSTALLED = True
except (ImportError, AttributeError):
# AtributeError raised if DjangoFilters installed with a incompatible Django Version
DJANGO_FILTER_INSTALLED = False
def get_type_for_model(schema, model):
schema = schema
types = schema.types.values()
for _type in types:
type_model = hasattr(_type, '_meta') and getattr(
_type._meta, 'model', None)
if model == type_model:
return _type
def get_reverse_fields(model):
for name, attr in model.__dict__.items():
# Django =>1.9 uses 'rel', django <1.9 uses 'related'
related = getattr(attr, 'rel', None) or \
getattr(attr, 'related', None)
if isinstance(related, RelatedObject):
# Hack for making it compatible with Django 1.6
new_related = RelatedObject(related.parent_model, related.model, related.field)
new_related.name = name
yield new_related
elif isinstance(related, models.ManyToOneRel):
yield related
elif isinstance(related, models.ManyToManyRel) and not related.symmetrical:
yield related
class WrappedQueryset(LazyList):
def __len__(self):
# Dont calculate the length using len(queryset), as this will
# evaluate the whole queryset and return it's length.
# Use .count() instead
return self._origin.count()
def maybe_queryset(value):
if isinstance(value, Manager):
value = value.get_queryset()
if isinstance(value, QuerySet):
return WrappedQueryset(value)
return value
def get_model_fields(model):
reverse_fields = get_reverse_fields(model)
all_fields = sorted(list(model._meta.fields) +
list(model._meta.local_many_to_many))
all_fields += list(reverse_fields)
return all_fields
def get_related_model(field):
if hasattr(field, 'rel'):
# Django 1.6, 1.7
return field.rel.to
return field.related_model
def import_single_dispatch():
try:
from functools import singledispatch
except ImportError:
singledispatch = None
if not singledispatch:
try:
from singledispatch import singledispatch
except ImportError:
pass
if not singledispatch:
raise Exception(
"It seems your python version does not include "
"functools.singledispatch. Please install the 'singledispatch' "
"package. More information here: "
"https://pypi.python.org/pypi/singledispatch"
)
return singledispatch

View File

@ -0,0 +1,12 @@
from graphql_django_view import GraphQLView as BaseGraphQLView
class GraphQLView(BaseGraphQLView):
graphene_schema = None
def __init__(self, schema, **kwargs):
super(GraphQLView, self).__init__(
graphene_schema=schema,
schema=schema,
**kwargs
)

View File

@ -0,0 +1,2 @@
[pytest]
DJANGO_SETTINGS_MODULE = django_test_settings

49
graphene-django/setup.py Normal file
View File

@ -0,0 +1,49 @@
from setuptools import find_packages, setup
setup(
name='graphene-django',
version='1.0',
description='Graphene Django integration',
# long_description=open('README.rst').read(),
url='https://github.com/graphql-python/graphene-django',
author='Syrus Akbary',
author_email='me@syrusakbary.com',
license='MIT',
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Topic :: Software Development :: Libraries',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: Implementation :: PyPy',
],
keywords='api graphql protocol rest relay graphene',
packages=find_packages(exclude=['tests']),
install_requires=[
'six>=1.10.0',
'graphene>=1.0',
'Django>=1.6.0',
'singledispatch>=3.4.0.3',
'graphql-django-view>=1.3',
],
tests_require=[
'django-filter>=0.10.0',
'pytest>=2.7.2',
'pytest-django',
'mock',
# Required for Django postgres fields testing
'psycopg2',
],
)

View File

@ -24,7 +24,7 @@ class PyTest(TestCommand):
setup(
name='graphene',
version='0.10.2',
version='1.0.0',
description='GraphQL Framework for Python',
long_description=open('README.rst').read(),