Moved Graphene-Django and Graphene-SQLAlchemy to individual packages

This commit is contained in:
Syrus Akbary 2016-09-17 19:39:35 -07:00
parent aa84d6d8db
commit 92b04225b2
100 changed files with 50 additions and 5141 deletions

View File

@ -10,13 +10,22 @@ Please read [UPGRADE-v1.0.md](/UPGRADE-v1.0.md) to learn how to upgrade.
- **Easy to use:** Graphene helps you use GraphQL in Python without effort.
- **Relay:** Graphene has builtin support for Relay
- **Django:** Automatic *Django model* mapping to Graphene Types. Check a fully working [Django](http://github.com/graphql-python/swapi-graphene) implementation
- **Data agnostic:** Graphene supports any kind of data source: SQL (Django, SQLAlchemy), NoSQL, custom Python objects...
We believe that by providing a complete API you could plug Graphene anywhere your data lives and make your data available
through GraphQL.
Graphene also supports *SQLAlchemy*!
*What is supported in this Python version?* **Everything**: Interfaces, ObjectTypes, Scalars, Unions and Relay (Nodes, Connections), in addition to queries, mutations and subscriptions.
## Integrations
Graphene has multiple integrations with different frameworks:
| integration | Package |
|---------------|-------------------|
| Django | [graphene-django](https://github.com/graphql-python/graphene-django/) |
| SQLAlchemy | [graphene-sqlalchemy](https://github.com/graphql-python/graphene-sqlalchemy/) |
| Google App Engine | [graphene-gae](https://github.com/graphql-python/graphene-gae/) |
| Peewee | *In progress* ([Tracking Issue](https://github.com/graphql-python/graphene/issues/289)) |
**NEW**!: [Try graphene online](http://graphene-python.org/playground/)
## Installation
@ -24,10 +33,6 @@ For instaling graphene, just run this command in your shell
```bash
pip install "graphene>=1.0.dev"
# In case of need Django model support
pip install "graphene-django>=1.0.dev"
# Or in case of need SQLAlchemy support
pip install "graphene-sqlalchemy>=1.0.dev"
```
## 1.0 Upgrade Guide
@ -42,15 +47,10 @@ Here is one example for get you started:
```python
class Query(graphene.ObjectType):
hello = graphene.String(description='A typical hello world')
ping = graphene.String(description='Ping someone',
to=graphene.String())
def resolve_hello(self, args, context, info):
return 'World'
def resolve_ping(self, args, context, info):
return 'Pinging {}'.format(args.get('to'))
schema = graphene.Schema(query=Query)
```
@ -60,7 +60,6 @@ Then Querying `graphene.Schema` is as simple as:
query = '''
query SayHello {
hello
ping(to:"peter")
}
'''
result = schema.execute(query)

View File

@ -1,31 +1,38 @@
You are in the ``next`` unreleased version of Graphene (``1.0.dev``).
Please read `UPGRADE-v1.0.md </UPGRADE-v1.0.md>`__ to learn how to
upgrade.
Please read `UPGRADE-v1.0.md`_ to learn how to upgrade.
--------------
|Graphene Logo| `Graphene <http://graphene-python.org>`__ |Build Status| |PyPI version| |Coverage Status|
=========================================================================================================
|Graphene Logo| `Graphene`_ |Build Status| |PyPI version| |Coverage Status|
===========================================================================
`Graphene <http://graphene-python.org>`__ is a Python library for
building GraphQL schemas/types fast and easily.
`Graphene`_ is a Python library for building GraphQL schemas/types fast
and easily.
- **Easy to use:** Graphene helps you use GraphQL in Python without
effort.
- **Relay:** Graphene has builtin support for Relay
- **Django:** Automatic *Django model* mapping to Graphene Types. Check
a fully working
`Django <http://github.com/graphql-python/swapi-graphene>`__
implementation
- **Data agnostic:** Graphene supports any kind of data source: SQL
(Django, SQLAlchemy), NoSQL, custom Python objects… We believe that
by providing a complete API you could plug Graphene anywhere your
data lives and make your data available through GraphQL.
Graphene also supports *SQLAlchemy*!
Integrations
------------
*What is supported in this Python version?* **Everything**: Interfaces,
ObjectTypes, Scalars, Unions and Relay (Nodes, Connections), in addition
to queries, mutations and subscriptions.
Graphene has multiple integrations with different frameworks:
**NEW**!: `Try graphene
online <http://graphene-python.org/playground/>`__
+---------------------+-------------------------------------+
| integration | Package |
+=====================+=====================================+
| Django | `graphene-django`_ |
+---------------------+-------------------------------------+
| SQLAlchemy | `graphene-sqlalchemy`_ |
+---------------------+-------------------------------------+
| Google App Engine | `graphene-gae`_ |
+---------------------+-------------------------------------+
| Peewee | *In progress* (`Tracking Issue`_) |
+---------------------+-------------------------------------+
Installation
------------
@ -35,16 +42,11 @@ For instaling graphene, just run this command in your shell
.. code:: bash
pip install "graphene>=1.0.dev"
# In case of need Django model support
pip install "graphene-django>=1.0.dev"
# Or in case of need SQLAlchemy support
pip install "graphene-sqlalchemy>=1.0.dev"
1.0 Upgrade Guide
-----------------
Please read `UPGRADE-v1.0.md </UPGRADE-v1.0.md>`__ to learn how to
upgrade.
Please read `UPGRADE-v1.0.md`_ to learn how to upgrade.
Examples
--------
@ -55,15 +57,10 @@ Here is one example for get you started:
class Query(graphene.ObjectType):
hello = graphene.String(description='A typical hello world')
ping = graphene.String(description='Ping someone',
to=graphene.String())
def resolve_hello(self, args, context, info):
return 'World'
def resolve_ping(self, args, context, info):
return 'Pinging {}'.format(args.get('to'))
schema = graphene.Schema(query=Query)
Then Querying ``graphene.Schema`` is as simple as:
@ -73,17 +70,15 @@ Then Querying ``graphene.Schema`` is as simple as:
query = '''
query SayHello {
hello
ping(to:"peter")
}
'''
result = schema.execute(query)
If you want to learn even more, you can also check the following
`examples <examples/>`__:
`examples`_:
- **Basic Schema**: `Starwars example <examples/starwars>`__
- **Relay Schema**: `Starwars Relay
example <examples/starwars_relay>`__
- **Basic Schema**: `Starwars example`_
- **Relay Schema**: `Starwars Relay example`_
Contributing
------------
@ -100,6 +95,16 @@ After developing, the full test suite can be evaluated by running:
python setup.py test # Use --pytest-args="-v -s" for verbose mode
.. _UPGRADE-v1.0.md: /UPGRADE-v1.0.md
.. _Graphene: http://graphene-python.org
.. _graphene-django: https://github.com/graphql-python/graphene-django/
.. _graphene-sqlalchemy: https://github.com/graphql-python/graphene-sqlalchemy/
.. _graphene-gae: https://github.com/graphql-python/graphene-gae/
.. _Tracking Issue: https://github.com/graphql-python/graphene/issues/289
.. _examples: examples/
.. _Starwars example: examples/starwars
.. _Starwars Relay example: examples/starwars_relay
.. |Graphene Logo| image:: http://graphene-python.org/favicon.png
.. |Build Status| image:: https://travis-ci.org/graphql-python/graphene.svg?branch=master
:target: https://travis-ci.org/graphql-python/graphene

View File

@ -1,18 +0,0 @@
import sys, os
ROOT_PATH = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, ROOT_PATH + '/examples/')
SECRET_KEY = 1
INSTALLED_APPS = [
'graphene_django',
'graphene_django.tests',
'starwars',
]
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': 'django_test.sqlite',
}
}

View File

@ -1,64 +0,0 @@
Cookbook Example Django Project
===============================
This example project demos integration between Graphene and Django.
The project contains two apps, one named `ingredients` and another
named `recepies`.
Getting started
---------------
First you'll need to get the source of the project. Do this by cloning the
whole Graphene repository:
```bash
# Get the example project code
git clone https://github.com/graphql-python/graphene.git
cd graphene/examples/cookbook
```
It is good idea (but not required) to create a virtual environment
for this project. We'll do this using
[virtualenv](http://docs.python-guide.org/en/latest/dev/virtualenvs/)
to keep things simple,
but you may also find something like
[virtualenvwrapper](https://virtualenvwrapper.readthedocs.org/en/latest/)
to be useful:
```bash
# Create a virtualenv in which we can install the dependencies
virtualenv env
source env/bin/activate
```
Now we can install our dependencies:
```bash
pip install -r requirements.txt
```
Now setup our database:
```bash
# Setup the database
./manage.py migrate
# Load some example data
./manage.py loaddata ingredients
# Create an admin user (useful for logging into the admin UI
# at http://127.0.0.1:8000/admin)
./manage.py createsuperuser
```
Now you should be ready to start the server:
```bash
./manage.py runserver
```
Now head on over to
[http://127.0.0.1:8000/graphiql](http://127.0.0.1:8000/graphiql)
and run some queries!
(See the [Django quickstart guide](http://graphene-python.org/docs/quickstart-django/)
for some example queries)

View File

@ -1,6 +0,0 @@
from django.contrib import admin
from cookbook.ingredients.models import Category, Ingredient
admin.site.register(Ingredient)
admin.site.register(Category)

View File

@ -1,7 +0,0 @@
from django.apps import AppConfig
class IngredientsConfig(AppConfig):
name = 'cookbook.ingredients'
label = 'ingredients'
verbose_name = 'Ingredients'

View File

@ -1 +0,0 @@
[{"model": "ingredients.category", "pk": 1, "fields": {"name": "Dairy"}}, {"model": "ingredients.category", "pk": 2, "fields": {"name": "Meat"}}, {"model": "ingredients.ingredient", "pk": 1, "fields": {"name": "Eggs", "notes": "Good old eggs", "category": 1}}, {"model": "ingredients.ingredient", "pk": 2, "fields": {"name": "Milk", "notes": "Comes from a cow", "category": 1}}, {"model": "ingredients.ingredient", "pk": 3, "fields": {"name": "Beef", "notes": "Much like milk, this comes from a cow", "category": 2}}, {"model": "ingredients.ingredient", "pk": 4, "fields": {"name": "Chicken", "notes": "Definitely doesn't come from a cow", "category": 2}}]

View File

@ -1,33 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2015-12-04 18:15
from __future__ import unicode_literals
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
]
operations = [
migrations.CreateModel(
name='Category',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=100)),
],
),
migrations.CreateModel(
name='Ingredient',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('name', models.CharField(max_length=100)),
('notes', models.TextField()),
('category', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='ingredients', to='ingredients.Category')),
],
),
]

View File

@ -1,17 +0,0 @@
from django.db import models
class Category(models.Model):
name = models.CharField(max_length=100)
def __str__(self):
return self.name
class Ingredient(models.Model):
name = models.CharField(max_length=100)
notes = models.TextField()
category = models.ForeignKey(Category, related_name='ingredients')
def __str__(self):
return self.name

View File

@ -1,36 +0,0 @@
from cookbook.ingredients.models import Category, Ingredient
from graphene import ObjectType, Field
from graphene_django.filter import DjangoFilterConnectionField
from graphene_django.types import DjangoNode, DjangoObjectType
# Graphene will automatically map the User model's fields onto the UserType.
# This is configured in the UserType's Meta class (as you can see below)
class CategoryNode(DjangoNode, DjangoObjectType):
class Meta:
model = Category
filter_fields = ['name', 'ingredients']
filter_order_by = ['name']
class IngredientNode(DjangoNode, DjangoObjectType):
class Meta:
model = Ingredient
# Allow for some more advanced filtering here
filter_fields = {
'name': ['exact', 'icontains', 'istartswith'],
'notes': ['exact', 'icontains'],
'category': ['exact'],
'category__name': ['exact'],
}
filter_order_by = ['name', 'category__name']
class Query(ObjectType):
category = Field(CategoryNode)
all_categories = DjangoFilterConnectionField(CategoryNode)
ingredient = Field(IngredientNode)
all_ingredients = DjangoFilterConnectionField(IngredientNode)

View File

@ -1,2 +0,0 @@
# Create your tests here.

View File

@ -1,2 +0,0 @@
# Create your views here.

View File

@ -1,6 +0,0 @@
from django.contrib import admin
from cookbook.recipes.models import Recipe, RecipeIngredient
admin.site.register(Recipe)
admin.site.register(RecipeIngredient)

View File

@ -1,7 +0,0 @@
from django.apps import AppConfig
class RecipesConfig(AppConfig):
name = 'cookbook.recipes'
label = 'recipes'
verbose_name = 'Recipes'

View File

@ -1,36 +0,0 @@
# -*- coding: utf-8 -*-
# Generated by Django 1.9 on 2015-12-04 18:20
from __future__ import unicode_literals
import django.db.models.deletion
from django.db import migrations, models
class Migration(migrations.Migration):
initial = True
dependencies = [
('ingredients', '0001_initial'),
]
operations = [
migrations.CreateModel(
name='Recipe',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('title', models.CharField(max_length=100)),
('instructions', models.TextField()),
],
),
migrations.CreateModel(
name='RecipeIngredient',
fields=[
('id', models.AutoField(auto_created=True, primary_key=True, serialize=False, verbose_name='ID')),
('amount', models.FloatField()),
('unit', models.CharField(choices=[('kg', 'Kilograms'), ('l', 'Litres'), ('', 'Units')], max_length=20)),
('ingredient', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='used_by', to='ingredients.Ingredient')),
('recipes', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='amounts', to='recipes.Recipe')),
],
),
]

View File

@ -1,19 +0,0 @@
from django.db import models
from cookbook.ingredients.models import Ingredient
class Recipe(models.Model):
title = models.CharField(max_length=100)
instructions = models.TextField()
class RecipeIngredient(models.Model):
recipes = models.ForeignKey(Recipe, related_name='amounts')
ingredient = models.ForeignKey(Ingredient, related_name='used_by')
amount = models.FloatField()
unit = models.CharField(max_length=20, choices=(
('kg', 'Kilograms'),
('l', 'Litres'),
('', 'Units'),
))

View File

@ -1,2 +0,0 @@
# Create your tests here.

View File

@ -1,2 +0,0 @@
# Create your views here.

View File

@ -1,9 +0,0 @@
import graphene
import cookbook.ingredients.schema
# print cookbook.ingredients.schema.Query._meta.graphql_type.get_fields()['allIngredients'].args
class Query(cookbook.ingredients.schema.Query):
pass
schema = graphene.Schema(query=Query)

View File

@ -1,125 +0,0 @@
"""
Django settings for cookbook project.
Generated by 'django-admin startproject' using Django 1.9.
For more information on this file, see
https://docs.djangoproject.com/en/1.9/topics/settings/
For the full list of settings and their values, see
https://docs.djangoproject.com/en/1.9/ref/settings/
"""
import os
# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
# Quick-start development settings - unsuitable for production
# See https://docs.djangoproject.com/en/1.9/howto/deployment/checklist/
# SECURITY WARNING: keep the secret key used in production secret!
SECRET_KEY = '_$=$%eqxk$8ss4n7mtgarw^5$8^d5+c83!vwatr@i_81myb=e4'
# SECURITY WARNING: don't run with debug turned on in production!
DEBUG = True
ALLOWED_HOSTS = []
# Application definition
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'django_graphiql',
'cookbook.ingredients.apps.IngredientsConfig',
'cookbook.recipes.apps.RecipesConfig',
]
MIDDLEWARE_CLASSES = [
'django.middleware.security.SecurityMiddleware',
'django.contrib.sessions.middleware.SessionMiddleware',
'django.middleware.common.CommonMiddleware',
'django.middleware.csrf.CsrfViewMiddleware',
'django.contrib.auth.middleware.AuthenticationMiddleware',
'django.contrib.auth.middleware.SessionAuthenticationMiddleware',
'django.contrib.messages.middleware.MessageMiddleware',
'django.middleware.clickjacking.XFrameOptionsMiddleware',
]
ROOT_URLCONF = 'cookbook.urls'
TEMPLATES = [
{
'BACKEND': 'django.template.backends.django.DjangoTemplates',
'DIRS': [],
'APP_DIRS': True,
'OPTIONS': {
'context_processors': [
'django.template.context_processors.debug',
'django.template.context_processors.request',
'django.contrib.auth.context_processors.auth',
'django.contrib.messages.context_processors.messages',
],
},
},
]
WSGI_APPLICATION = 'cookbook.wsgi.application'
# Database
# https://docs.djangoproject.com/en/1.9/ref/settings/#databases
DATABASES = {
'default': {
'ENGINE': 'django.db.backends.sqlite3',
'NAME': os.path.join(BASE_DIR, 'db.sqlite3'),
}
}
# Password validation
# https://docs.djangoproject.com/en/1.9/ref/settings/#auth-password-validators
AUTH_PASSWORD_VALIDATORS = [
{
'NAME': 'django.contrib.auth.password_validation.UserAttributeSimilarityValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.MinimumLengthValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.CommonPasswordValidator',
},
{
'NAME': 'django.contrib.auth.password_validation.NumericPasswordValidator',
},
]
# Internationalization
# https://docs.djangoproject.com/en/1.9/topics/i18n/
LANGUAGE_CODE = 'en-us'
TIME_ZONE = 'UTC'
USE_I18N = True
USE_L10N = True
USE_TZ = True
# Static files (CSS, JavaScript, Images)
# https://docs.djangoproject.com/en/1.9/howto/static-files/
STATIC_URL = '/static/'

View File

@ -1,12 +0,0 @@
from django.conf.urls import include, url
from django.contrib import admin
from django.views.decorators.csrf import csrf_exempt
from cookbook.schema import schema
from graphene_django.views import GraphQLView
urlpatterns = [
url(r'^admin/', admin.site.urls),
url(r'^graphql', csrf_exempt(GraphQLView.as_view(schema=schema))),
url(r'^graphiql', include('django_graphiql.urls')),
]

View File

@ -1,16 +0,0 @@
"""
WSGI config for cookbook project.
It exposes the WSGI callable as a module-level variable named ``application``.
For more information on this file, see
https://docs.djangoproject.com/en/1.9/howto/deployment/wsgi/
"""
import os
from django.core.wsgi import get_wsgi_application
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cookbook.settings")
application = get_wsgi_application()

View File

@ -1,10 +0,0 @@
#!/usr/bin/env python
import os
import sys
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "cookbook.settings")
from django.core.management import execute_from_command_line
execute_from_command_line(sys.argv)

View File

@ -1,5 +0,0 @@
graphene[django]
django_graphiql
graphql-core
django==1.9
django-filter==0.11.0

View File

@ -1,114 +0,0 @@
from .models import Character, Faction, Ship
def initialize():
human = Character(
name='Human'
)
human.save()
droid = Character(
name='Droid'
)
droid.save()
rebels = Faction(
id='1',
name='Alliance to Restore the Republic',
hero=human
)
rebels.save()
empire = Faction(
id='2',
name='Galactic Empire',
hero=droid
)
empire.save()
xwing = Ship(
id='1',
name='X-Wing',
faction=rebels,
)
xwing.save()
ywing = Ship(
id='2',
name='Y-Wing',
faction=rebels,
)
ywing.save()
awing = Ship(
id='3',
name='A-Wing',
faction=rebels,
)
awing.save()
# Yeah, technically it's Corellian. But it flew in the service of the rebels,
# so for the purposes of this demo it's a rebel ship.
falcon = Ship(
id='4',
name='Millenium Falcon',
faction=rebels,
)
falcon.save()
homeOne = Ship(
id='5',
name='Home One',
faction=rebels,
)
homeOne.save()
tieFighter = Ship(
id='6',
name='TIE Fighter',
faction=empire,
)
tieFighter.save()
tieInterceptor = Ship(
id='7',
name='TIE Interceptor',
faction=empire,
)
tieInterceptor.save()
executor = Ship(
id='8',
name='Executor',
faction=empire,
)
executor.save()
def create_ship(ship_name, faction_id):
new_ship = Ship(
name=ship_name,
faction_id=faction_id
)
new_ship.save()
return new_ship
def get_ship(_id):
return Ship.objects.get(id=_id)
def get_ships():
return Ship.objects.all()
def get_faction(_id):
return Faction.objects.get(id=_id)
def get_rebels():
return get_faction(1)
def get_empire():
return get_faction(2)

View File

@ -1,26 +0,0 @@
from __future__ import absolute_import
from django.db import models
class Character(models.Model):
name = models.CharField(max_length=50)
def __str__(self):
return self.name
class Faction(models.Model):
name = models.CharField(max_length=50)
hero = models.ForeignKey(Character)
def __str__(self):
return self.name
class Ship(models.Model):
name = models.CharField(max_length=50)
faction = models.ForeignKey(Faction, related_name='ships')
def __str__(self):
return self.name

View File

@ -1,87 +0,0 @@
import graphene
from graphene import relay, resolve_only_args, Schema
from graphene_django import DjangoObjectType
from .data import (create_ship, get_empire, get_faction, get_rebels, get_ship,
get_ships)
from .models import (
Character as CharacterModel,
Faction as FactionModel,
Ship as ShipModel
)
class Ship(DjangoObjectType):
class Meta:
model = ShipModel
interfaces = (relay.Node, )
@classmethod
def get_node(cls, id, context, info):
node = get_ship(id)
print(node)
return node
class Character(DjangoObjectType):
class Meta:
model = CharacterModel
class Faction(DjangoObjectType):
class Meta:
model = FactionModel
interfaces = (relay.Node, )
@classmethod
def get_node(cls, id, context, info):
return get_faction(id)
class IntroduceShip(relay.ClientIDMutation):
class Input:
ship_name = graphene.String(required=True)
faction_id = graphene.String(required=True)
ship = graphene.Field(Ship)
faction = graphene.Field(Faction)
@classmethod
def mutate_and_get_payload(cls, input, context, info):
ship_name = input.get('shipName')
faction_id = input.get('factionId')
ship = create_ship(ship_name, faction_id)
faction = get_faction(faction_id)
return IntroduceShip(ship=ship, faction=faction)
class Query(graphene.ObjectType):
rebels = graphene.Field(Faction)
empire = graphene.Field(Faction)
node = relay.Node.Field()
ships = relay.ConnectionField(Ship, description='All the ships.')
@resolve_only_args
def resolve_ships(self):
return get_ships()
@resolve_only_args
def resolve_rebels(self):
return get_rebels()
@resolve_only_args
def resolve_empire(self):
return get_empire()
class Mutation(graphene.ObjectType):
introduce_ship = IntroduceShip.Field()
# We register the Character Model because if not would be
# inaccessible for the schema
schema = Schema(query=Query, mutation=Mutation, types=[Ship, Character])

View File

@ -1,47 +0,0 @@
import pytest
from ..data import initialize
from ..schema import schema
pytestmark = pytest.mark.django_db
def test_correct_fetch_first_ship_rebels():
initialize()
query = '''
query RebelsShipsQuery {
rebels {
name,
hero {
name
}
ships(first: 1) {
edges {
node {
name
}
}
}
}
}
'''
expected = {
'rebels': {
'name': 'Alliance to Restore the Republic',
'hero': {
'name': 'Human'
},
'ships': {
'edges': [
{
'node': {
'name': 'X-Wing'
}
}
]
}
}
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected

View File

@ -1,79 +0,0 @@
import pytest
from ..data import initialize
from ..schema import schema
pytestmark = pytest.mark.django_db
def test_mutations():
initialize()
query = '''
mutation MyMutation {
introduceShip(input:{clientMutationId:"abc", shipName: "Peter", factionId: "1"}) {
ship {
id
name
}
faction {
name
ships {
edges {
node {
id
name
}
}
}
}
}
}
'''
expected = {
'introduceShip': {
'ship': {
'id': 'U2hpcDo5',
'name': 'Peter'
},
'faction': {
'name': 'Alliance to Restore the Republic',
'ships': {
'edges': [{
'node': {
'id': 'U2hpcDox',
'name': 'X-Wing'
}
}, {
'node': {
'id': 'U2hpcDoy',
'name': 'Y-Wing'
}
}, {
'node': {
'id': 'U2hpcDoz',
'name': 'A-Wing'
}
}, {
'node': {
'id': 'U2hpcDo0',
'name': 'Millenium Falcon'
}
}, {
'node': {
'id': 'U2hpcDo1',
'name': 'Home One'
}
}, {
'node': {
'id': 'U2hpcDo5',
'name': 'Peter'
}
}]
},
}
}
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected

View File

@ -1,117 +0,0 @@
import pytest
from ..data import initialize
from ..schema import schema
pytestmark = pytest.mark.django_db
def test_correctly_fetches_id_name_rebels():
initialize()
query = '''
query RebelsQuery {
rebels {
id
name
}
}
'''
expected = {
'rebels': {
'id': 'RmFjdGlvbjox',
'name': 'Alliance to Restore the Republic'
}
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_correctly_refetches_rebels():
initialize()
query = '''
query RebelsRefetchQuery {
node(id: "RmFjdGlvbjox") {
id
... on Faction {
name
}
}
}
'''
expected = {
'node': {
'id': 'RmFjdGlvbjox',
'name': 'Alliance to Restore the Republic'
}
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_correctly_fetches_id_name_empire():
initialize()
query = '''
query EmpireQuery {
empire {
id
name
}
}
'''
expected = {
'empire': {
'id': 'RmFjdGlvbjoy',
'name': 'Galactic Empire'
}
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_correctly_refetches_empire():
initialize()
query = '''
query EmpireRefetchQuery {
node(id: "RmFjdGlvbjoy") {
id
... on Faction {
name
}
}
}
'''
expected = {
'node': {
'id': 'RmFjdGlvbjoy',
'name': 'Galactic Empire'
}
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_correctly_refetches_xwing():
initialize()
query = '''
query XWingRefetchQuery {
node(id: "U2hpcDox") {
id
... on Ship {
name
}
}
}
'''
expected = {
'node': {
'id': 'U2hpcDox',
'name': 'X-Wing'
}
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected

View File

@ -1,9 +0,0 @@
from .types import (
DjangoObjectType,
)
from .fields import (
DjangoConnectionField,
)
__all__ = ['DjangoObjectType',
'DjangoConnectionField']

View File

@ -1,24 +0,0 @@
from django.db import models
class MissingType(object):
pass
try:
UUIDField = models.UUIDField
except AttributeError:
# Improved compatibility for Django 1.6
UUIDField = MissingType
try:
from django.db.models.related import RelatedObject
except:
# Improved compatibility for Django 1.6
RelatedObject = MissingType
try:
# Postgres fields are only available in Django 1.8+
from django.contrib.postgres.fields import ArrayField, HStoreField, JSONField, RangeField
except ImportError:
ArrayField, HStoreField, JSONField, RangeField = (MissingType, ) * 4

View File

@ -1,189 +0,0 @@
from django.db import models
from django.utils.encoding import force_text
from graphene import Enum, List, ID, Boolean, Float, Int, String, Field, NonNull, Field, Dynamic
from graphene.types.json import JSONString
from graphene.types.datetime import DateTime
from graphene.utils.str_converters import to_const
from graphene.relay import is_node
from .compat import (ArrayField, HStoreField, JSONField, RangeField,
RelatedObject, UUIDField)
from .utils import get_related_model, import_single_dispatch
from .fields import get_connection_field
singledispatch = import_single_dispatch()
def convert_choice_name(name):
return to_const(force_text(name))
def get_choices(choices):
for value, help_text in choices:
if isinstance(help_text, (tuple, list)):
for choice in get_choices(help_text):
yield choice
else:
name = convert_choice_name(help_text)
description = help_text
yield name, value, description
def convert_django_field_with_choices(field, registry=None):
choices = getattr(field, 'choices', None)
if choices:
meta = field.model._meta
name = '{}{}'.format(meta.object_name, field.name.capitalize())
choices = list(get_choices(choices))
named_choices = [(c[0], c[1]) for c in choices]
named_choices_descriptions = {c[0]:c[2] for c in choices}
class EnumWithDescriptionsType(object):
@property
def description(self):
return named_choices_descriptions[self.name]
enum = Enum(name, list(named_choices), type=EnumWithDescriptionsType)
return enum(description=field.help_text)
return convert_django_field(field, registry)
@singledispatch
def convert_django_field(field, registry=None):
raise Exception(
"Don't know how to convert the Django field %s (%s)" %
(field, field.__class__))
@convert_django_field.register(models.CharField)
@convert_django_field.register(models.TextField)
@convert_django_field.register(models.EmailField)
@convert_django_field.register(models.SlugField)
@convert_django_field.register(models.URLField)
@convert_django_field.register(models.GenericIPAddressField)
@convert_django_field.register(models.FileField)
@convert_django_field.register(UUIDField)
def convert_field_to_string(field, registry=None):
return String(description=field.help_text)
@convert_django_field.register(models.AutoField)
def convert_field_to_id(field, registry=None):
return ID(description=field.help_text)
@convert_django_field.register(models.PositiveIntegerField)
@convert_django_field.register(models.PositiveSmallIntegerField)
@convert_django_field.register(models.SmallIntegerField)
@convert_django_field.register(models.BigIntegerField)
@convert_django_field.register(models.IntegerField)
def convert_field_to_int(field, registry=None):
return Int(description=field.help_text)
@convert_django_field.register(models.BooleanField)
def convert_field_to_boolean(field, registry=None):
return NonNull(Boolean, description=field.help_text)
@convert_django_field.register(models.NullBooleanField)
def convert_field_to_nullboolean(field, registry=None):
return Boolean(description=field.help_text)
@convert_django_field.register(models.DecimalField)
@convert_django_field.register(models.FloatField)
def convert_field_to_float(field, registry=None):
return Float(description=field.help_text)
@convert_django_field.register(models.DateField)
def convert_date_to_string(field, registry=None):
return DateTime(description=field.help_text)
@convert_django_field.register(models.OneToOneRel)
def convert_onetoone_field_to_djangomodel(field, registry=None):
model = get_related_model(field)
def dynamic_type():
_type = registry.get_type_for_model(model)
if not _type:
return
return Field(_type)
return Dynamic(dynamic_type)
@convert_django_field.register(models.ManyToManyField)
@convert_django_field.register(models.ManyToManyRel)
@convert_django_field.register(models.ManyToOneRel)
def convert_field_to_list_or_connection(field, registry=None):
model = get_related_model(field)
def dynamic_type():
_type = registry.get_type_for_model(model)
if not _type:
return
if is_node(_type):
return get_connection_field(_type)
return Field(List(_type))
return Dynamic(dynamic_type)
# For Django 1.6
@convert_django_field.register(RelatedObject)
def convert_relatedfield_to_djangomodel(field, registry=None):
model = field.model
def dynamic_type():
_type = registry.get_type_for_model(model)
if not _type:
return
if is_node(_type):
return get_connection_field(_type)
return Field(List(_type))
return Dynamic(dynamic_type)
@convert_django_field.register(models.OneToOneField)
@convert_django_field.register(models.ForeignKey)
def convert_field_to_djangomodel(field, registry=None):
model = get_related_model(field)
def dynamic_type():
_type = registry.get_type_for_model(model)
if not _type:
return
return Field(_type, description=field.help_text)
return Dynamic(dynamic_type)
@convert_django_field.register(ArrayField)
def convert_postgres_array_to_list(field, registry=None):
base_type = convert_django_field(field.base_field)
if not isinstance(base_type, (List, NonNull)):
base_type = type(base_type)
return List(base_type, description=field.help_text)
@convert_django_field.register(HStoreField)
@convert_django_field.register(JSONField)
def convert_posgres_field_to_string(field, registry=None):
return JSONString(description=field.help_text)
@convert_django_field.register(RangeField)
def convert_posgres_range_to_string(field, registry=None):
inner_type = convert_django_field(field.base_field)
if not isinstance(inner_type, (List, NonNull)):
inner_type = type(inner_type)
return List(inner_type, description=field.help_text)

View File

@ -1,4 +0,0 @@
from .middleware import DjangoDebugMiddleware
from .types import DjangoDebug
__all__ = ['DjangoDebugMiddleware', 'DjangoDebug']

View File

@ -1,56 +0,0 @@
from promise import Promise
from django.db import connections
from .sql.tracking import unwrap_cursor, wrap_cursor
from .types import DjangoDebug
class DjangoDebugContext(object):
def __init__(self):
self.debug_promise = None
self.promises = []
self.enable_instrumentation()
self.object = DjangoDebug(sql=[])
def get_debug_promise(self):
if not self.debug_promise:
self.debug_promise = Promise.all(self.promises)
return self.debug_promise.then(self.on_resolve_all_promises)
def on_resolve_all_promises(self, values):
self.disable_instrumentation()
return self.object
def add_promise(self, promise):
if self.debug_promise and not self.debug_promise.is_fulfilled:
self.promises.append(promise)
def enable_instrumentation(self):
# This is thread-safe because database connections are thread-local.
for connection in connections.all():
wrap_cursor(connection, self)
def disable_instrumentation(self):
for connection in connections.all():
unwrap_cursor(connection)
class DjangoDebugMiddleware(object):
def resolve(self, next, root, args, context, info):
django_debug = getattr(context, 'django_debug', None)
if not django_debug:
if context is None:
raise Exception('DjangoDebug cannot be executed in None contexts')
try:
context.django_debug = DjangoDebugContext()
except Exception:
raise Exception('DjangoDebug need the context to be writable, context received: {}.'.format(
context.__class__.__name__
))
if info.schema.get_type('DjangoDebug') == info.return_type:
return context.django_debug.get_debug_promise()
promise = next(root, args, context, info)
context.django_debug.add_promise(promise)
return promise

View File

@ -1,169 +0,0 @@
# Code obtained from django-debug-toolbar sql panel tracking
from __future__ import absolute_import, unicode_literals
import json
from threading import local
from time import time
from django.utils import six
from django.utils.encoding import force_text
from .types import DjangoDebugSQL
class SQLQueryTriggered(Exception):
"""Thrown when template panel triggers a query"""
class ThreadLocalState(local):
def __init__(self):
self.enabled = True
@property
def Wrapper(self):
if self.enabled:
return NormalCursorWrapper
return ExceptionCursorWrapper
def recording(self, v):
self.enabled = v
state = ThreadLocalState()
recording = state.recording # export function
def wrap_cursor(connection, panel):
if not hasattr(connection, '_graphene_cursor'):
connection._graphene_cursor = connection.cursor
def cursor():
return state.Wrapper(connection._graphene_cursor(), connection, panel)
connection.cursor = cursor
return cursor
def unwrap_cursor(connection):
if hasattr(connection, '_graphene_cursor'):
previous_cursor = connection._graphene_cursor
connection.cursor = previous_cursor
del connection._graphene_cursor
class ExceptionCursorWrapper(object):
"""
Wraps a cursor and raises an exception on any operation.
Used in Templates panel.
"""
def __init__(self, cursor, db, logger):
pass
def __getattr__(self, attr):
raise SQLQueryTriggered()
class NormalCursorWrapper(object):
"""
Wraps a cursor and logs queries.
"""
def __init__(self, cursor, db, logger):
self.cursor = cursor
# Instance of a BaseDatabaseWrapper subclass
self.db = db
# logger must implement a ``record`` method
self.logger = logger
def _quote_expr(self, element):
if isinstance(element, six.string_types):
return "'%s'" % force_text(element).replace("'", "''")
else:
return repr(element)
def _quote_params(self, params):
if not params:
return params
if isinstance(params, dict):
return dict((key, self._quote_expr(value))
for key, value in params.items())
return list(map(self._quote_expr, params))
def _decode(self, param):
try:
return force_text(param, strings_only=True)
except UnicodeDecodeError:
return '(encoded string)'
def _record(self, method, sql, params):
start_time = time()
try:
return method(sql, params)
finally:
stop_time = time()
duration = (stop_time - start_time)
_params = ''
try:
_params = json.dumps(list(map(self._decode, params)))
except Exception:
pass # object not JSON serializable
alias = getattr(self.db, 'alias', 'default')
conn = self.db.connection
vendor = getattr(conn, 'vendor', 'unknown')
params = {
'vendor': vendor,
'alias': alias,
'sql': self.db.ops.last_executed_query(
self.cursor, sql, self._quote_params(params)),
'duration': duration,
'raw_sql': sql,
'params': _params,
'start_time': start_time,
'stop_time': stop_time,
'is_slow': duration > 10,
'is_select': sql.lower().strip().startswith('select'),
}
if vendor == 'postgresql':
# If an erroneous query was ran on the connection, it might
# be in a state where checking isolation_level raises an
# exception.
try:
iso_level = conn.isolation_level
except conn.InternalError:
iso_level = 'unknown'
params.update({
'trans_id': self.logger.get_transaction_id(alias),
'trans_status': conn.get_transaction_status(),
'iso_level': iso_level,
'encoding': conn.encoding,
})
_sql = DjangoDebugSQL(**params)
# We keep `sql` to maintain backwards compatibility
self.logger.object.sql.append(_sql)
def callproc(self, procname, params=()):
return self._record(self.cursor.callproc, procname, params)
def execute(self, sql, params=()):
return self._record(self.cursor.execute, sql, params)
def executemany(self, sql, param_list):
return self._record(self.cursor.executemany, sql, param_list)
def __getattr__(self, attr):
return getattr(self.cursor, attr)
def __iter__(self):
return iter(self.cursor)
def __enter__(self):
return self
def __exit__(self, type, value, traceback):
self.close()

View File

@ -1,20 +0,0 @@
from graphene import Boolean, Float, ObjectType, String
class DjangoDebugSQL(ObjectType):
vendor = String()
alias = String()
sql = String()
duration = Float()
raw_sql = String()
params = String()
start_time = Float()
stop_time = Float()
is_slow = Boolean()
is_select = Boolean()
# Postgres
trans_id = String()
trans_status = String()
iso_level = String()
encoding = String()

View File

@ -1,225 +0,0 @@
import pytest
import graphene
from graphene.relay import Node
from graphene_django import DjangoConnectionField, DjangoObjectType
from graphene_django.utils import DJANGO_FILTER_INSTALLED
from ...tests.models import Reporter
from ..middleware import DjangoDebugMiddleware
from ..types import DjangoDebug
class context(object):
pass
# from examples.starwars_django.models import Character
pytestmark = pytest.mark.django_db
def test_should_query_field():
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_reporter(self, *args, **kwargs):
return Reporter.objects.first()
query = '''
query ReporterQuery {
reporter {
lastName
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'reporter': {
'lastName': 'ABA',
},
'__debug': {
'sql': [{
'rawSql': str(Reporter.objects.order_by('pk')[:1].query)
}]
}
}
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data == expected
def test_should_query_list():
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
class Query(graphene.ObjectType):
all_reporters = graphene.List(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters {
lastName
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'allReporters': [{
'lastName': 'ABA',
}, {
'lastName': 'Griffin',
}],
'__debug': {
'sql': [{
'rawSql': str(Reporter.objects.all().query)
}]
}
}
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data == expected
def test_should_query_connection():
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters(first:1) {
edges {
node {
lastName
}
}
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'allReporters': {
'edges': [{
'node': {
'lastName': 'ABA',
}
}]
},
}
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data['allReporters'] == expected['allReporters']
assert 'COUNT' in result.data['__debug']['sql'][0]['rawSql']
query = str(Reporter.objects.all()[:1].query)
assert result.data['__debug']['sql'][1]['rawSql'] == query
@pytest.mark.skipif(not DJANGO_FILTER_INSTALLED,
reason="requires django-filter")
def test_should_query_connectionfilter():
from ...filter import DjangoFilterConnectionField
r1 = Reporter(last_name='ABA')
r1.save()
r2 = Reporter(last_name='Griffin')
r2.save()
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
class Query(graphene.ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterType)
s = graphene.String(resolver=lambda *_: "S")
debug = graphene.Field(DjangoDebug, name='__debug')
def resolve_all_reporters(self, *args, **kwargs):
return Reporter.objects.all()
query = '''
query ReporterQuery {
allReporters(first:1) {
edges {
node {
lastName
}
}
}
__debug {
sql {
rawSql
}
}
}
'''
expected = {
'allReporters': {
'edges': [{
'node': {
'lastName': 'ABA',
}
}]
},
}
schema = graphene.Schema(query=Query, middlewares=[DjangoDebugMiddleware()])
result = schema.execute(query, context_value=context())
assert not result.errors
assert result.data['allReporters'] == expected['allReporters']
assert 'COUNT' in result.data['__debug']['sql'][0]['rawSql']
query = str(Reporter.objects.all()[:1].query)
assert result.data['__debug']['sql'][1]['rawSql'] == query

View File

@ -1,6 +0,0 @@
from graphene import ObjectType, List
from .sql.types import DjangoDebugSQL
class DjangoDebug(ObjectType):
sql = List(DjangoDebugSQL)

View File

@ -1,57 +0,0 @@
from functools import partial
from django.db.models.query import QuerySet
from graphene.relay import ConnectionField, PageInfo
from graphql_relay.connection.arrayconnection import connection_from_list_slice
from .utils import maybe_queryset, DJANGO_FILTER_INSTALLED
class DjangoConnectionField(ConnectionField):
def __init__(self, *args, **kwargs):
self.on = kwargs.pop('on', False)
return super(DjangoConnectionField, self).__init__(*args, **kwargs)
@property
def model(self):
return self.type._meta.node._meta.model
def get_manager(self):
if self.on:
return getattr(self.model, self.on)
else:
return self.model._default_manager
@staticmethod
def connection_resolver(resolver, connection, default_manager, root, args, context, info):
iterable = resolver(root, args, context, info)
if iterable is None:
iterable = default_manager
iterable = maybe_queryset(iterable)
if isinstance(iterable, QuerySet):
_len = iterable.count()
else:
_len = len(iterable)
connection = connection_from_list_slice(
iterable,
args,
slice_start=0,
list_length=_len,
list_slice_length=_len,
connection_type=connection,
edge_type=connection.Edge,
pageinfo_type=PageInfo,
)
connection.iterable = iterable
connection.length = _len
return connection
def get_resolver(self, parent_resolver):
return partial(self.connection_resolver, parent_resolver, self.type, self.get_manager())
def get_connection_field(*args, **kwargs):
if DJANGO_FILTER_INSTALLED:
from .filter.fields import DjangoFilterConnectionField
return DjangoFilterConnectionField(*args, **kwargs)
return ConnectionField(*args, **kwargs)

View File

@ -1,14 +0,0 @@
import warnings
from ..utils import DJANGO_FILTER_INSTALLED
if not DJANGO_FILTER_INSTALLED:
warnings.warn(
"Use of django filtering requires the django-filter package "
"be installed. You can do so using `pip install django-filter`", ImportWarning
)
else:
from .fields import DjangoFilterConnectionField
from .filterset import GrapheneFilterSet, GlobalIDFilter, GlobalIDMultipleChoiceFilter
__all__ = ['DjangoFilterConnectionField', 'GrapheneFilterSet',
'GlobalIDFilter', 'GlobalIDMultipleChoiceFilter']

View File

@ -1,39 +0,0 @@
from functools import partial
from ..fields import DjangoConnectionField
from .utils import get_filtering_args_from_filterset, get_filterset_class
class DjangoFilterConnectionField(DjangoConnectionField):
def __init__(self, type, fields=None, order_by=None,
extra_filter_meta=None, filterset_class=None,
*args, **kwargs):
self.order_by = order_by or type._meta.filter_order_by
self.fields = fields or type._meta.filter_fields
meta = dict(model=type._meta.model,
fields=self.fields,
order_by=self.order_by)
if extra_filter_meta:
meta.update(extra_filter_meta)
self.filterset_class = get_filterset_class(filterset_class, **meta)
self.filtering_args = get_filtering_args_from_filterset(self.filterset_class, type)
kwargs.setdefault('args', {})
kwargs['args'].update(self.filtering_args)
super(DjangoFilterConnectionField, self).__init__(type, *args, **kwargs)
@staticmethod
def connection_resolver(resolver, connection, default_manager, filterset_class, filtering_args,
root, args, context, info):
filter_kwargs = {k: v for k, v in args.items() if k in filtering_args}
order = args.get('order_by', None)
qs = default_manager.get_queryset()
if order:
qs = qs.order_by(order)
qs = filterset_class(data=filter_kwargs, queryset=qs)
return DjangoConnectionField.connection_resolver(resolver, connection, qs, root, args, context, info)
def get_resolver(self, parent_resolver):
return partial(self.connection_resolver, parent_resolver, self.type, self.get_manager(),
self.filterset_class, self.filtering_args)

View File

@ -1,115 +0,0 @@
import six
from django.conf import settings
from django.db import models
from django.utils.text import capfirst
from django_filters import Filter, MultipleChoiceFilter
from django_filters.filterset import FilterSet, FilterSetMetaclass
from ..forms import GlobalIDFormField, GlobalIDMultipleChoiceField
from graphql_relay.node.node import from_global_id
class GlobalIDFilter(Filter):
field_class = GlobalIDFormField
def filter(self, qs, value):
_type, _id = from_global_id(value)
return super(GlobalIDFilter, self).filter(qs, _id)
class GlobalIDMultipleChoiceFilter(MultipleChoiceFilter):
field_class = GlobalIDMultipleChoiceField
def filter(self, qs, value):
gids = [from_global_id(v)[1] for v in value]
return super(GlobalIDMultipleChoiceFilter, self).filter(qs, gids)
ORDER_BY_FIELD = getattr(settings, 'GRAPHENE_ORDER_BY_FIELD', 'order_by')
GRAPHENE_FILTER_SET_OVERRIDES = {
models.AutoField: {
'filter_class': GlobalIDFilter,
},
models.OneToOneField: {
'filter_class': GlobalIDFilter,
},
models.ForeignKey: {
'filter_class': GlobalIDFilter,
},
models.ManyToManyField: {
'filter_class': GlobalIDMultipleChoiceFilter,
}
}
class GrapheneFilterSetMetaclass(FilterSetMetaclass):
def __new__(cls, name, bases, attrs):
new_class = super(GrapheneFilterSetMetaclass, cls).__new__(cls, name, bases, attrs)
# Customise the filter_overrides for Graphene
for k, v in GRAPHENE_FILTER_SET_OVERRIDES.items():
new_class.filter_overrides.setdefault(k, v)
return new_class
class GrapheneFilterSetMixin(object):
order_by_field = ORDER_BY_FIELD
@classmethod
def filter_for_reverse_field(cls, f, name):
"""Handles retrieving filters for reverse relationships
We override the default implementation so that we can handle
Global IDs (the default implementation expects database
primary keys)
"""
rel = f.field.rel
default = {
'name': name,
'label': capfirst(rel.related_name)
}
if rel.multiple:
# For to-many relationships
return GlobalIDMultipleChoiceFilter(**default)
else:
# For to-one relationships
return GlobalIDFilter(**default)
class GrapheneFilterSet(six.with_metaclass(GrapheneFilterSetMetaclass, GrapheneFilterSetMixin, FilterSet)):
""" Base class for FilterSets used by Graphene
You shouldn't usually need to use this class. The
DjangoFilterConnectionField will wrap FilterSets with this class as
necessary
"""
def setup_filterset(filterset_class):
""" Wrap a provided filterset in Graphene-specific functionality
"""
return type(
'Graphene{}'.format(filterset_class.__name__),
(six.with_metaclass(GrapheneFilterSetMetaclass, GrapheneFilterSetMixin, filterset_class),),
{},
)
def custom_filterset_factory(model, filterset_base_class=GrapheneFilterSet,
**meta):
""" Create a filterset for the given model using the provided meta data
"""
meta.update({
'model': model,
})
meta_class = type(str('Meta'), (object,), meta)
filterset = type(
str('%sFilterSet' % model._meta.object_name),
(filterset_base_class,),
{
'Meta': meta_class
}
)
return filterset

View File

@ -1,31 +0,0 @@
import django_filters
from graphene_django.tests.models import Article, Pet, Reporter
class ArticleFilter(django_filters.FilterSet):
class Meta:
model = Article
fields = {
'headline': ['exact', 'icontains'],
'pub_date': ['gt', 'lt', 'exact'],
'reporter': ['exact'],
}
order_by = True
class ReporterFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['first_name', 'last_name', 'email', 'pets']
order_by = False
class PetFilter(django_filters.FilterSet):
class Meta:
model = Pet
fields = ['name']
order_by = False

View File

@ -1,339 +0,0 @@
from datetime import datetime
import pytest
from graphene import ObjectType, Schema, Field
from graphene.relay import Node
from graphene_django import DjangoObjectType
from graphene_django.forms import (GlobalIDFormField,
GlobalIDMultipleChoiceField)
from graphene_django.tests.models import Article, Pet, Reporter
from graphene_django.utils import DJANGO_FILTER_INSTALLED
pytestmark = []
if DJANGO_FILTER_INSTALLED:
import django_filters
from graphene_django.filter import (GlobalIDFilter, DjangoFilterConnectionField,
GlobalIDMultipleChoiceFilter)
from graphene_django.filter.tests.filters import ArticleFilter, PetFilter
else:
pytestmark.append(pytest.mark.skipif(True, reason='django_filters not installed'))
pytestmark.append(pytest.mark.django_db)
class ArticleNode(DjangoObjectType):
class Meta:
model = Article
interfaces = (Node, )
class ReporterNode(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
class PetNode(DjangoObjectType):
class Meta:
model = Pet
interfaces = (Node, )
# schema = Schema()
def get_args(field):
if isinstance(field.args, list):
return {arg.name: arg for arg in field.args}
else:
return field.args
def assert_arguments(field, *arguments):
ignore = ('after', 'before', 'first', 'last', 'order_by')
args = get_args(field)
actual = [
name
for name in args
if name not in ignore and not name.startswith('_')
]
assert set(arguments) == set(actual), \
'Expected arguments ({}) did not match actual ({})'.format(
arguments,
actual
)
def assert_orderable(field):
args = get_args(field)
assert 'order_by' in args, \
'Field cannot be ordered'
def assert_not_orderable(field):
args = get_args(field)
assert 'order_by' not in args, \
'Field can be ordered'
def test_filter_explicit_filterset_arguments():
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleFilter)
assert_arguments(field,
'headline', 'headline__icontains',
'pub_date', 'pub_date__gt', 'pub_date__lt',
'reporter',
)
def test_filter_shortcut_filterset_arguments_list():
field = DjangoFilterConnectionField(ArticleNode, fields=['pub_date', 'reporter'])
assert_arguments(field,
'pub_date',
'reporter',
)
def test_filter_shortcut_filterset_arguments_dict():
field = DjangoFilterConnectionField(ArticleNode, fields={
'headline': ['exact', 'icontains'],
'reporter': ['exact'],
})
assert_arguments(field,
'headline', 'headline__icontains',
'reporter',
)
def test_filter_explicit_filterset_orderable():
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleFilter)
assert_orderable(field)
def test_filter_shortcut_filterset_orderable_true():
field = DjangoFilterConnectionField(ArticleNode, order_by=True)
assert_orderable(field)
def test_filter_shortcut_filterset_orderable_headline():
field = DjangoFilterConnectionField(ArticleNode, order_by=['headline'])
assert_orderable(field)
def test_filter_explicit_filterset_not_orderable():
field = DjangoFilterConnectionField(PetNode, filterset_class=PetFilter)
assert_not_orderable(field)
def test_filter_shortcut_filterset_extra_meta():
field = DjangoFilterConnectionField(ArticleNode, extra_filter_meta={
'order_by': True
})
assert_orderable(field)
def test_filter_filterset_information_on_meta():
class ReporterFilterNode(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
filter_fields = ['first_name', 'articles']
filter_order_by = True
field = DjangoFilterConnectionField(ReporterFilterNode)
assert_arguments(field, 'first_name', 'articles')
assert_orderable(field)
def test_filter_filterset_information_on_meta_related():
class ReporterFilterNode(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
filter_fields = ['first_name', 'articles']
filter_order_by = True
class ArticleFilterNode(DjangoObjectType):
class Meta:
model = Article
interfaces = (Node, )
filter_fields = ['headline', 'reporter']
filter_order_by = True
class Query(ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterFilterNode)
all_articles = DjangoFilterConnectionField(ArticleFilterNode)
reporter = Field(ReporterFilterNode)
article = Field(ArticleFilterNode)
schema = Schema(query=Query)
articles_field = ReporterFilterNode._meta.fields['articles'].get_type()
assert_arguments(articles_field, 'headline', 'reporter')
assert_orderable(articles_field)
def test_filter_filterset_related_results():
class ReporterFilterNode(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
filter_fields = ['first_name', 'articles']
filter_order_by = True
class ArticleFilterNode(DjangoObjectType):
class Meta:
interfaces = (Node, )
model = Article
filter_fields = ['headline', 'reporter']
filter_order_by = True
class Query(ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterFilterNode)
all_articles = DjangoFilterConnectionField(ArticleFilterNode)
reporter = Field(ReporterFilterNode)
article = Field(ArticleFilterNode)
r1 = Reporter.objects.create(first_name='r1', last_name='r1', email='r1@test.com')
r2 = Reporter.objects.create(first_name='r2', last_name='r2', email='r2@test.com')
Article.objects.create(headline='a1', pub_date=datetime.now(), reporter=r1)
Article.objects.create(headline='a2', pub_date=datetime.now(), reporter=r2)
query = '''
query {
allReporters {
edges {
node {
articles {
edges {
node {
headline
}
}
}
}
}
}
}
'''
schema = Schema(query=Query)
result = schema.execute(query)
assert not result.errors
# We should only get back a single article for each reporter
assert len(result.data['allReporters']['edges'][0]['node']['articles']['edges']) == 1
assert len(result.data['allReporters']['edges'][1]['node']['articles']['edges']) == 1
def test_global_id_field_implicit():
field = DjangoFilterConnectionField(ArticleNode, fields=['id'])
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['id']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_field_explicit():
class ArticleIdFilter(django_filters.FilterSet):
class Meta:
model = Article
fields = ['id']
field = DjangoFilterConnectionField(ArticleNode, filterset_class=ArticleIdFilter)
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['id']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_field_relation():
field = DjangoFilterConnectionField(ArticleNode, fields=['reporter'])
filterset_class = field.filterset_class
id_filter = filterset_class.base_filters['reporter']
assert isinstance(id_filter, GlobalIDFilter)
assert id_filter.field_class == GlobalIDFormField
def test_global_id_multiple_field_implicit():
field = DjangoFilterConnectionField(ReporterNode, fields=['pets'])
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['pets']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_explicit():
class ReporterPetsFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['pets']
field = DjangoFilterConnectionField(ReporterNode, filterset_class=ReporterPetsFilter)
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['pets']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_implicit_reverse():
field = DjangoFilterConnectionField(ReporterNode, fields=['articles'])
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['articles']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_global_id_multiple_field_explicit_reverse():
class ReporterPetsFilter(django_filters.FilterSet):
class Meta:
model = Reporter
fields = ['articles']
field = DjangoFilterConnectionField(ReporterNode, filterset_class=ReporterPetsFilter)
filterset_class = field.filterset_class
multiple_filter = filterset_class.base_filters['articles']
assert isinstance(multiple_filter, GlobalIDMultipleChoiceFilter)
assert multiple_filter.field_class == GlobalIDMultipleChoiceField
def test_filter_filterset_related_results():
class ReporterFilterNode(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
filter_fields = {
'first_name': ['icontains']
}
class Query(ObjectType):
all_reporters = DjangoFilterConnectionField(ReporterFilterNode)
r1 = Reporter.objects.create(first_name='A test user', last_name='Last Name', email='test1@test.com')
r2 = Reporter.objects.create(first_name='Other test user', last_name='Other Last Name', email='test2@test.com')
r3 = Reporter.objects.create(first_name='Random', last_name='RandomLast', email='random@test.com')
query = '''
query {
allReporters(firstName_Icontains: "test") {
edges {
node {
id
}
}
}
}
'''
schema = Schema(query=Query)
result = schema.execute(query)
assert not result.errors
# We should only get two reporters
assert len(result.data['allReporters']['edges']) == 2

View File

@ -1,31 +0,0 @@
import six
from graphene import Argument, String
from .filterset import custom_filterset_factory, setup_filterset
def get_filtering_args_from_filterset(filterset_class, type):
""" Inspect a FilterSet and produce the arguments to pass to
a Graphene Field. These arguments will be available to
filter against in the GraphQL
"""
from ..form_converter import convert_form_field
args = {}
for name, filter_field in six.iteritems(filterset_class.base_filters):
field_type = convert_form_field(filter_field.field)
args[name] = field_type
# Also add the 'order_by' field
if filterset_class._meta.order_by:
args[filterset_class.order_by_field] = String()
return args
def get_filterset_class(filterset_class, **meta):
"""Get the class to be used as the FilterSet"""
if filterset_class:
# If were given a FilterSet class, then set it up and
# return it
return setup_filterset(filterset_class)
return custom_filterset_factory(**meta)

View File

@ -1,70 +0,0 @@
from django import forms
from django.forms.fields import BaseTemporalField
from graphene import ID, Boolean, Float, Int, String, List
from .forms import GlobalIDFormField, GlobalIDMultipleChoiceField
from .utils import import_single_dispatch
singledispatch = import_single_dispatch()
try:
UUIDField = forms.UUIDField
except AttributeError:
class UUIDField(object):
pass
@singledispatch
def convert_form_field(field):
raise Exception(
"Don't know how to convert the Django form field %s (%s) "
"to Graphene type" %
(field, field.__class__)
)
@convert_form_field.register(BaseTemporalField)
@convert_form_field.register(forms.CharField)
@convert_form_field.register(forms.EmailField)
@convert_form_field.register(forms.SlugField)
@convert_form_field.register(forms.URLField)
@convert_form_field.register(forms.ChoiceField)
@convert_form_field.register(forms.RegexField)
@convert_form_field.register(forms.Field)
@convert_form_field.register(UUIDField)
def convert_form_field_to_string(field):
return String(description=field.help_text)
@convert_form_field.register(forms.IntegerField)
@convert_form_field.register(forms.NumberInput)
def convert_form_field_to_int(field):
return Int(description=field.help_text)
@convert_form_field.register(forms.BooleanField)
def convert_form_field_to_boolean(field):
return Boolean(description=field.help_text, required=True)
@convert_form_field.register(forms.NullBooleanField)
def convert_form_field_to_nullboolean(field):
return Boolean(description=field.help_text)
@convert_form_field.register(forms.DecimalField)
@convert_form_field.register(forms.FloatField)
def convert_form_field_to_float(field):
return Float(description=field.help_text)
@convert_form_field.register(forms.ModelMultipleChoiceField)
@convert_form_field.register(GlobalIDMultipleChoiceField)
def convert_form_field_to_list(field):
return List(ID)
@convert_form_field.register(forms.ModelChoiceField)
@convert_form_field.register(GlobalIDFormField)
def convert_form_field_to_id(field):
return ID()

View File

@ -1,42 +0,0 @@
import binascii
from django.core.exceptions import ValidationError
from django.forms import CharField, Field, MultipleChoiceField
from django.utils.translation import ugettext_lazy as _
from graphql_relay import from_global_id
class GlobalIDFormField(Field):
default_error_messages = {
'invalid': _('Invalid ID specified.'),
}
def clean(self, value):
if not value and not self.required:
return None
try:
_type, _id = from_global_id(value)
except (TypeError, ValueError, UnicodeDecodeError, binascii.Error):
raise ValidationError(self.error_messages['invalid'])
try:
CharField().clean(_id)
CharField().clean(_type)
except ValidationError:
raise ValidationError(self.error_messages['invalid'])
return value
class GlobalIDMultipleChoiceField(MultipleChoiceField):
default_error_messages = {
'invalid_choice': _('One of the specified IDs was invalid (%(value)s).'),
'invalid_list': _('Enter a list of values.'),
}
def valid_value(self, value):
# Clean will raise a validation error if there is a problem
GlobalIDFormField().clean(value)
return True

View File

@ -1,72 +0,0 @@
import importlib
import json
from distutils.version import StrictVersion
from optparse import make_option
from django import get_version as get_django_version
from django.core.management.base import BaseCommand, CommandError
LT_DJANGO_1_8 = StrictVersion(get_django_version()) < StrictVersion('1.8')
if LT_DJANGO_1_8:
class CommandArguments(BaseCommand):
option_list = BaseCommand.option_list + (
make_option(
'--schema',
type=str,
dest='schema',
default='',
help='Django app containing schema to dump, e.g. myproject.core.schema',
),
make_option(
'--out',
type=str,
dest='out',
default='',
help='Output file (default: schema.json)'
),
)
else:
class CommandArguments(BaseCommand):
def add_arguments(self, parser):
from django.conf import settings
parser.add_argument(
'--schema',
type=str,
dest='schema',
default=getattr(settings, 'GRAPHENE_SCHEMA', ''),
help='Django app containing schema to dump, e.g. myproject.core.schema')
parser.add_argument(
'--out',
type=str,
dest='out',
default=getattr(settings, 'GRAPHENE_SCHEMA_OUTPUT', 'schema.json'),
help='Output file (default: schema.json)')
class Command(CommandArguments):
help = 'Dump Graphene schema JSON to file'
can_import_settings = True
def save_file(self, out, schema_dict):
with open(out, 'w') as outfile:
json.dump(schema_dict, outfile)
def handle(self, *args, **options):
from django.conf import settings
schema = options.get('schema') or getattr(settings, 'GRAPHENE_SCHEMA', '')
out = options.get('out') or getattr(settings, 'GRAPHENE_SCHEMA_OUTPUT', 'schema.json')
if schema == '':
raise CommandError('Specify schema on GRAPHENE_SCHEMA setting or by using --schema')
i = importlib.import_module(schema)
schema_dict = {'data': i.schema.introspect()}
self.save_file(out, schema_dict)
style = getattr(self, 'style', None)
SUCCESS = getattr(style, 'SUCCESS', lambda x: x)
self.stdout.write(SUCCESS('Successfully dumped GraphQL schema to %s' % out))

View File

@ -1,29 +0,0 @@
class Registry(object):
def __init__(self):
self._registry = {}
self._registry_models = {}
def register(self, cls):
from .types import DjangoObjectType
assert issubclass(cls, DjangoObjectType), 'Only DjangoObjectTypes can be registered, received "{}"'.format(cls.__name__)
assert cls._meta.registry == self, 'Registry for a Model have to match.'
# assert self.get_type_for_model(cls._meta.model) == cls, 'Multiple DjangoObjectTypes registered for "{}"'.format(cls._meta.model)
self._registry[cls._meta.model] = cls
def get_type_for_model(self, model):
return self._registry.get(model)
registry = None
def get_global_registry():
global registry
if not registry:
registry = Registry()
return registry
def reset_global_registry():
global registry
registry = None

View File

@ -1,52 +0,0 @@
from __future__ import absolute_import
from django.db import models
from django.utils.translation import ugettext_lazy as _
CHOICES = (
(1, 'this'),
(2, _('that'))
)
class Pet(models.Model):
name = models.CharField(max_length=30)
class FilmDetails(models.Model):
location = models.CharField(max_length=30)
film = models.OneToOneField('Film', related_name='details')
class Film(models.Model):
reporters = models.ManyToManyField('Reporter',
related_name='films')
class Reporter(models.Model):
first_name = models.CharField(max_length=30)
last_name = models.CharField(max_length=30)
email = models.EmailField()
pets = models.ManyToManyField('self')
a_choice = models.CharField(max_length=30, choices=CHOICES)
def __str__(self): # __unicode__ on Python 2
return "%s %s" % (self.first_name, self.last_name)
class Article(models.Model):
headline = models.CharField(max_length=100)
pub_date = models.DateField()
reporter = models.ForeignKey(Reporter, related_name='articles')
lang = models.CharField(max_length=2, help_text='Language', choices=[
('es', 'Spanish'),
('en', 'English')
], default='es')
importance = models.IntegerField('Importance', null=True, blank=True,
choices=[(1, u'Very important'), (2, u'Not as important')])
def __str__(self): # __unicode__ on Python 2
return self.headline
class Meta:
ordering = ('headline',)

View File

@ -1,39 +0,0 @@
import graphene
from graphene import Schema, relay
from ..types import DjangoObjectType
from .models import Article, Reporter
class Character(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (relay.Node, )
def get_node(self, id, context, info):
pass
class Human(DjangoObjectType):
raises = graphene.String()
class Meta:
model = Article
interfaces = (relay.Node, )
def resolve_raises(self, *args):
raise Exception("This field should raise exception")
def get_node(self, id):
pass
class Query(graphene.ObjectType):
human = graphene.Field(Human)
def resolve_human(self, args, context, info):
return Human()
schema = Schema(query=Query)

View File

@ -1,11 +0,0 @@
from django.core import management
from mock import patch
from six import StringIO
@patch('graphene_django.management.commands.graphql_schema.Command.save_file')
def test_generate_file_on_call_graphql_schema(savefile_mock, settings):
settings.GRAPHENE_SCHEMA = 'graphene_django.tests.schema'
out = StringIO()
management.call_command('graphql_schema', schema='', stdout=out)
assert "Successfully dumped GraphQL schema to schema.json" in out.getvalue()

View File

@ -1,261 +0,0 @@
import pytest
from django.db import models
from django.utils.translation import ugettext_lazy as _
from py.test import raises
import graphene
from graphene.relay import Node, ConnectionField
from graphene.types.datetime import DateTime
from graphene.types.json import JSONString
# from graphene.core.types.custom_scalars import DateTime, JSONString
from ..compat import (ArrayField, HStoreField, JSONField, MissingType,
RangeField)
from ..converter import convert_django_field, convert_django_field_with_choices
from ..registry import Registry
from .models import Article, Reporter, Film, FilmDetails, Pet
from ..types import DjangoObjectType
def assert_conversion(django_field, graphene_field, *args, **kwargs):
field = django_field(help_text='Custom Help Text', *args, **kwargs)
graphene_type = convert_django_field(field)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.Field()
assert field.description == 'Custom Help Text'
return field
def test_should_unknown_django_field_raise_exception():
with raises(Exception) as excinfo:
convert_django_field(None)
assert 'Don\'t know how to convert the Django field' in str(excinfo.value)
def test_should_date_convert_string():
assert_conversion(models.DateField, DateTime)
def test_should_char_convert_string():
assert_conversion(models.CharField, graphene.String)
def test_should_text_convert_string():
assert_conversion(models.TextField, graphene.String)
def test_should_email_convert_string():
assert_conversion(models.EmailField, graphene.String)
def test_should_slug_convert_string():
assert_conversion(models.SlugField, graphene.String)
def test_should_url_convert_string():
assert_conversion(models.URLField, graphene.String)
def test_should_ipaddress_convert_string():
assert_conversion(models.GenericIPAddressField, graphene.String)
def test_should_file_convert_string():
assert_conversion(models.FileField, graphene.String)
def test_should_image_convert_string():
assert_conversion(models.ImageField, graphene.String)
def test_should_auto_convert_id():
assert_conversion(models.AutoField, graphene.ID, primary_key=True)
def test_should_positive_integer_convert_int():
assert_conversion(models.PositiveIntegerField, graphene.Int)
def test_should_positive_small_convert_int():
assert_conversion(models.PositiveSmallIntegerField, graphene.Int)
def test_should_small_integer_convert_int():
assert_conversion(models.SmallIntegerField, graphene.Int)
def test_should_big_integer_convert_int():
assert_conversion(models.BigIntegerField, graphene.Int)
def test_should_integer_convert_int():
assert_conversion(models.IntegerField, graphene.Int)
def test_should_boolean_convert_boolean():
field = assert_conversion(models.BooleanField, graphene.NonNull)
assert field.type.of_type == graphene.Boolean
def test_should_nullboolean_convert_boolean():
assert_conversion(models.NullBooleanField, graphene.Boolean)
def test_field_with_choices_convert_enum():
field = models.CharField(help_text='Language', choices=(
('es', 'Spanish'),
('en', 'English')
))
class TranslatedModel(models.Model):
language = field
class Meta:
app_label = 'test'
graphene_type = convert_django_field_with_choices(field)
assert isinstance(graphene_type, graphene.Enum)
assert graphene_type._meta.name == 'TranslatedModelLanguage'
assert graphene_type._meta.enum.__members__['SPANISH'].value == 'es'
assert graphene_type._meta.enum.__members__['SPANISH'].description == 'Spanish'
assert graphene_type._meta.enum.__members__['ENGLISH'].value == 'en'
assert graphene_type._meta.enum.__members__['ENGLISH'].description == 'English'
def test_field_with_grouped_choices():
field = models.CharField(help_text='Language', choices=(
('Europe', (
('es', 'Spanish'),
('en', 'English'),
)),
))
class GroupedChoicesModel(models.Model):
language = field
class Meta:
app_label = 'test'
convert_django_field_with_choices(field)
def test_field_with_choices_gettext():
field = models.CharField(help_text='Language', choices=(
('es', _('Spanish')),
('en', _('English'))
))
class TranslatedChoicesModel(models.Model):
language = field
class Meta:
app_label = 'test'
convert_django_field_with_choices(field)
def test_should_float_convert_float():
assert_conversion(models.FloatField, graphene.Float)
def test_should_manytomany_convert_connectionorlist():
registry = Registry()
dynamic_field = convert_django_field(Reporter._meta.local_many_to_many[0], registry)
assert not dynamic_field.get_type()
def test_should_manytomany_convert_connectionorlist_list():
class A(DjangoObjectType):
class Meta:
model = Reporter
graphene_field = convert_django_field(Reporter._meta.local_many_to_many[0], A._meta.registry)
assert isinstance(graphene_field, graphene.Dynamic)
dynamic_field = graphene_field.get_type()
assert isinstance(dynamic_field, graphene.Field)
assert isinstance(dynamic_field.type, graphene.List)
assert dynamic_field.type.of_type == A
def test_should_manytomany_convert_connectionorlist_connection():
class A(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
graphene_field = convert_django_field(Reporter._meta.local_many_to_many[0], A._meta.registry)
assert isinstance(graphene_field, graphene.Dynamic)
dynamic_field = graphene_field.get_type()
assert isinstance(dynamic_field, ConnectionField)
assert dynamic_field.type == A.Connection
def test_should_manytoone_convert_connectionorlist():
# Django 1.9 uses 'rel', <1.9 uses 'related
related = getattr(Reporter.articles, 'rel', None) or \
getattr(Reporter.articles, 'related')
class A(DjangoObjectType):
class Meta:
model = Article
graphene_field = convert_django_field(related, A._meta.registry)
assert isinstance(graphene_field, graphene.Dynamic)
dynamic_field = graphene_field.get_type()
assert isinstance(dynamic_field, graphene.Field)
assert isinstance(dynamic_field.type, graphene.List)
assert dynamic_field.type.of_type == A
def test_should_onetoone_reverse_convert_model():
# Django 1.9 uses 'rel', <1.9 uses 'related
related = getattr(Film.details, 'rel', None) or \
getattr(Film.details, 'related')
class A(DjangoObjectType):
class Meta:
model = FilmDetails
graphene_field = convert_django_field(related, A._meta.registry)
assert isinstance(graphene_field, graphene.Dynamic)
dynamic_field = graphene_field.get_type()
assert isinstance(dynamic_field, graphene.Field)
assert dynamic_field.type == A
@pytest.mark.skipif(ArrayField is MissingType,
reason="ArrayField should exist")
def test_should_postgres_array_convert_list():
field = assert_conversion(ArrayField, graphene.List, models.CharField(max_length=100))
assert isinstance(field.type, graphene.List)
assert field.type.of_type == graphene.String
@pytest.mark.skipif(ArrayField is MissingType,
reason="ArrayField should exist")
def test_should_postgres_array_multiple_convert_list():
field = assert_conversion(ArrayField, graphene.List, ArrayField(models.CharField(max_length=100)))
assert isinstance(field.type, graphene.List)
assert isinstance(field.type.of_type, graphene.List)
assert field.type.of_type.of_type == graphene.String
@pytest.mark.skipif(HStoreField is MissingType,
reason="HStoreField should exist")
def test_should_postgres_hstore_convert_string():
assert_conversion(HStoreField, JSONString)
@pytest.mark.skipif(JSONField is MissingType,
reason="JSONField should exist")
def test_should_postgres_json_convert_string():
assert_conversion(JSONField, JSONString)
@pytest.mark.skipif(RangeField is MissingType,
reason="RangeField should exist")
def test_should_postgres_range_convert_list():
from django.contrib.postgres.fields import IntegerRangeField
field = assert_conversion(IntegerRangeField, graphene.List)
assert isinstance(field.type, graphene.List)
assert field.type.of_type == graphene.Int

View File

@ -1,103 +0,0 @@
from django import forms
from py.test import raises
import graphene
from ..form_converter import convert_form_field
from graphene import ID, List, NonNull
from .models import Reporter
def assert_conversion(django_field, graphene_field, *args):
field = django_field(*args, help_text='Custom Help Text')
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.Field()
assert field.description == 'Custom Help Text'
return field
def test_should_unknown_django_field_raise_exception():
with raises(Exception) as excinfo:
convert_form_field(None)
assert 'Don\'t know how to convert the Django form field' in str(excinfo.value)
def test_should_date_convert_string():
assert_conversion(forms.DateField, graphene.String)
def test_should_time_convert_string():
assert_conversion(forms.TimeField, graphene.String)
def test_should_date_time_convert_string():
assert_conversion(forms.DateTimeField, graphene.String)
def test_should_char_convert_string():
assert_conversion(forms.CharField, graphene.String)
def test_should_email_convert_string():
assert_conversion(forms.EmailField, graphene.String)
def test_should_slug_convert_string():
assert_conversion(forms.SlugField, graphene.String)
def test_should_url_convert_string():
assert_conversion(forms.URLField, graphene.String)
def test_should_choice_convert_string():
assert_conversion(forms.ChoiceField, graphene.String)
def test_should_base_field_convert_string():
assert_conversion(forms.Field, graphene.String)
def test_should_regex_convert_string():
assert_conversion(forms.RegexField, graphene.String, '[0-9]+')
def test_should_uuid_convert_string():
if hasattr(forms, 'UUIDField'):
assert_conversion(forms.UUIDField, graphene.String)
def test_should_integer_convert_int():
assert_conversion(forms.IntegerField, graphene.Int)
def test_should_boolean_convert_boolean():
field = assert_conversion(forms.BooleanField, graphene.Boolean)
assert isinstance(field.type, NonNull)
def test_should_nullboolean_convert_boolean():
field = assert_conversion(forms.NullBooleanField, graphene.Boolean)
assert not isinstance(field.type, NonNull)
def test_should_float_convert_float():
assert_conversion(forms.FloatField, graphene.Float)
def test_should_decimal_convert_float():
assert_conversion(forms.DecimalField, graphene.Float)
def test_should_multiple_choice_convert_connectionorlist():
field = forms.ModelMultipleChoiceField(Reporter.objects.all())
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, List)
assert graphene_type.of_type == ID
def test_should_manytoone_convert_connectionorlist():
field = forms.ModelChoiceField(Reporter.objects.all())
graphene_type = convert_form_field(field)
assert isinstance(graphene_type, graphene.ID)

View File

@ -1,29 +0,0 @@
from django.core.exceptions import ValidationError
from py.test import raises
from ..forms import GlobalIDFormField
# 'TXlUeXBlOmFiYw==' -> 'MyType', 'abc'
def test_global_id_valid():
field = GlobalIDFormField()
field.clean('TXlUeXBlOmFiYw==')
def test_global_id_invalid():
field = GlobalIDFormField()
with raises(ValidationError):
field.clean('badvalue')
def test_global_id_none():
field = GlobalIDFormField()
with raises(ValidationError):
field.clean(None)
def test_global_id_none_optional():
field = GlobalIDFormField(required=False)
field.clean(None)

View File

@ -1,252 +0,0 @@
import datetime
import pytest
from django.db import models
from py.test import raises
import graphene
from graphene.relay import Node
from ..compat import MissingType, RangeField
from ..types import DjangoObjectType
from ..fields import DjangoConnectionField
from ..registry import reset_global_registry, get_global_registry
from .models import Article, Reporter
pytestmark = pytest.mark.django_db
def test_should_query_only_fields():
with raises(Exception):
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
only_fields = ('articles', )
schema = graphene.Schema(query=ReporterType)
query = '''
query ReporterQuery {
articles
}
'''
result = schema.execute(query)
assert not result.errors
def test_should_query_well():
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
def resolve_reporter(self, *args, **kwargs):
return Reporter(first_name='ABA', last_name='X')
query = '''
query ReporterQuery {
reporter {
firstName,
lastName,
email
}
}
'''
expected = {
'reporter': {
'firstName': 'ABA',
'lastName': 'X',
'email': ''
}
}
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
@pytest.mark.skipif(RangeField is MissingType,
reason="RangeField should exist")
def test_should_query_postgres_fields():
from django.contrib.postgres.fields import IntegerRangeField, ArrayField, JSONField, HStoreField
class Event(models.Model):
ages = IntegerRangeField(help_text='The age ranges')
data = JSONField(help_text='Data')
store = HStoreField()
tags = ArrayField(models.CharField(max_length=50))
class EventType(DjangoObjectType):
class Meta:
model = Event
class Query(graphene.ObjectType):
event = graphene.Field(EventType)
def resolve_event(self, *args, **kwargs):
return Event(
ages=(0, 10),
data={'angry_babies': True},
store={'h': 'store'},
tags=['child', 'angry', 'babies']
)
schema = graphene.Schema(query=Query)
query = '''
query myQuery {
event {
ages
tags
data
store
}
}
'''
expected = {
'event': {
'ages': [0, 10],
'tags': ['child', 'angry', 'babies'],
'data': '{"angry_babies": true}',
'store': '{"h": "store"}',
},
}
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_should_node():
# reset_global_registry()
# Node._meta.registry = get_global_registry()
class ReporterNode(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
@classmethod
def get_node(cls, id, context, info):
return Reporter(id=2, first_name='Cookie Monster')
def resolve_articles(self, *args, **kwargs):
return [Article(headline='Hi!')]
class ArticleNode(DjangoObjectType):
class Meta:
model = Article
interfaces = (Node, )
@classmethod
def get_node(cls, id, context, info):
return Article(id=1, headline='Article node', pub_date=datetime.date(2002, 3, 11))
class Query(graphene.ObjectType):
node = Node.Field()
reporter = graphene.Field(ReporterNode)
article = graphene.Field(ArticleNode)
def resolve_reporter(self, *args, **kwargs):
return Reporter(id=1, first_name='ABA', last_name='X')
query = '''
query ReporterQuery {
reporter {
id,
firstName,
articles {
edges {
node {
headline
}
}
}
lastName,
email
}
myArticle: node(id:"QXJ0aWNsZU5vZGU6MQ==") {
id
... on ReporterNode {
firstName
}
... on ArticleNode {
headline
pubDate
}
}
}
'''
expected = {
'reporter': {
'id': 'UmVwb3J0ZXJOb2RlOjE=',
'firstName': 'ABA',
'lastName': 'X',
'email': '',
'articles': {
'edges': [{
'node': {
'headline': 'Hi!'
}
}]
},
},
'myArticle': {
'id': 'QXJ0aWNsZU5vZGU6MQ==',
'headline': 'Article node',
'pubDate': '2002-03-11',
}
}
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_should_query_connectionfields():
class ReporterType(DjangoObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
only_fields = ('articles', )
class Query(graphene.ObjectType):
all_reporters = DjangoConnectionField(ReporterType)
def resolve_all_reporters(self, args, context, info):
return [Reporter(id=1)]
schema = graphene.Schema(query=Query)
query = '''
query ReporterConnectionQuery {
allReporters {
pageInfo {
hasNextPage
}
edges {
node {
id
}
}
}
}
'''
result = schema.execute(query)
assert not result.errors
assert result.data == {
'allReporters': {
'pageInfo': {
'hasNextPage': False,
},
'edges': [{
'node': {
'id': 'UmVwb3J0ZXJUeXBlOjE='
}
}]
}
}

View File

@ -1,40 +0,0 @@
from py.test import raises
from ..types import DjangoObjectType
from ..registry import Registry
from .models import Reporter
def test_should_raise_if_no_model():
with raises(Exception) as excinfo:
class Character1(DjangoObjectType):
pass
assert 'valid Django Model' in str(excinfo.value)
def test_should_raise_if_model_is_invalid():
with raises(Exception) as excinfo:
class Character2(DjangoObjectType):
class Meta:
model = 1
assert 'valid Django Model' in str(excinfo.value)
def test_should_map_fields_correctly():
class ReporterType2(DjangoObjectType):
class Meta:
model = Reporter
registry = Registry()
assert list(ReporterType2._meta.fields.keys()) == ['id', 'first_name', 'last_name', 'email', 'pets', 'a_choice', 'articles', 'films']
def test_should_map_only_few_fields():
class Reporter2(DjangoObjectType):
class Meta:
model = Reporter
only_fields = ('id', 'email')
assert list(Reporter2._meta.fields.keys()) == ['id', 'email']

View File

@ -1,123 +0,0 @@
from graphql.type import GraphQLObjectType
from mock import patch
from graphene import ObjectType, Field, Int, ID, Schema, Interface
from graphene.relay import Node, ConnectionField
from ..types import DjangoObjectType
from .models import Article as ArticleModel, Reporter as ReporterModel
from ..registry import reset_global_registry, Registry
reset_global_registry()
class Reporter(DjangoObjectType):
'''Reporter description'''
class Meta:
model = ReporterModel
class Article(DjangoObjectType):
'''Article description'''
class Meta:
model = ArticleModel
interfaces = (Node, )
class RootQuery(ObjectType):
node = Node.Field()
schema = Schema(query=RootQuery, types=[Article, Reporter])
def test_django_interface():
assert issubclass(Node, Interface)
assert issubclass(Node, Node)
@patch('graphene_django.tests.models.Article.objects.get', return_value=Article(id=1))
def test_django_get_node(get):
article = Article.get_node(1, None, None)
get.assert_called_with(id=1)
assert article.id == 1
def test_django_objecttype_map_correct_fields():
fields = Reporter._meta.fields
assert list(fields.keys()) == ['id', 'first_name', 'last_name', 'email', 'pets', 'a_choice', 'articles', 'films']
def test_django_objecttype_with_node_have_correct_fields():
fields = Article._meta.fields
assert list(fields.keys()) == ['id', 'headline', 'pub_date', 'reporter', 'lang', 'importance']
def test_schema_representation():
expected = """
schema {
query: RootQuery
}
type Article implements Node {
id: ID!
headline: String
pubDate: DateTime
reporter: Reporter
lang: ArticleLang
importance: ArticleImportance
}
type ArticleConnection {
pageInfo: PageInfo!
edges: [ArticleEdge]
}
type ArticleEdge {
node: Article
cursor: String!
}
enum ArticleImportance {
VERY_IMPORTANT
NOT_AS_IMPORTANT
}
enum ArticleLang {
SPANISH
ENGLISH
}
scalar DateTime
interface Node {
id: ID!
}
type PageInfo {
hasNextPage: Boolean!
hasPreviousPage: Boolean!
startCursor: String
endCursor: String
}
type Reporter {
id: ID
firstName: String
lastName: String
email: String
pets: [Reporter]
aChoice: ReporterA_choice
articles(before: String, after: String, first: Int, last: Int): ArticleConnection
}
enum ReporterA_choice {
THIS
THAT
}
type RootQuery {
node(id: ID!): Node
}
""".lstrip()
assert str(schema) == expected

View File

@ -1,57 +0,0 @@
import json
def format_response(response):
return json.loads(response.content.decode())
def test_client_get_good_query(settings, client):
settings.ROOT_URLCONF = 'graphene_django.tests.urls'
response = client.get('/graphql', {'query': '{ human { headline } }'})
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
}
}
}
assert json_response == expected_json
def test_client_get_good_query_with_raise(settings, client):
settings.ROOT_URLCONF = 'graphene_django.tests.urls'
response = client.get('/graphql', {'query': '{ human { raises } }'})
json_response = format_response(response)
assert json_response['errors'][0]['message'] == 'This field should raise exception'
assert json_response['data']['human']['raises'] is None
def test_client_post_good_query_json(settings, client):
settings.ROOT_URLCONF = 'graphene_django.tests.urls'
response = client.post(
'/graphql', json.dumps({'query': '{ human { headline } }'}), 'application/json')
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
}
}
}
assert json_response == expected_json
def test_client_post_good_query_graphql(settings, client):
settings.ROOT_URLCONF = 'graphene_django.tests.urls'
response = client.post(
'/graphql', '{ human { headline } }', 'application/graphql')
json_response = format_response(response)
expected_json = {
'data': {
'human': {
'headline': None
}
}
}
assert json_response == expected_json

View File

@ -1,8 +0,0 @@
from django.conf.urls import url
from ..views import GraphQLView
from .schema import schema
urlpatterns = [
url(r'^graphql', GraphQLView.as_view(schema=schema)),
]

View File

@ -1,114 +0,0 @@
from collections import OrderedDict
import six
from graphene import ObjectType, Field
from graphene.types.objecttype import ObjectTypeMeta
from .converter import convert_django_field_with_choices
from graphene.types.options import Options
from .utils import get_model_fields, is_valid_django_model, DJANGO_FILTER_INSTALLED
from .registry import Registry, get_global_registry
from graphene.utils.is_base_type import is_base_type
from graphene.types.utils import yank_fields_from_attrs, merge
def construct_fields(options):
_model_fields = get_model_fields(options.model)
only_fields = options.only_fields
exclude_fields = options.exclude_fields
fields = OrderedDict()
for field in _model_fields:
name = field.name
is_not_in_only = only_fields and name not in options.only_fields
is_already_created = name in options.fields
is_excluded = name in exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we exclude this field in exclude_fields
continue
converted = convert_django_field_with_choices(field, options.registry)
if not converted:
continue
fields[name] = converted
return fields
class DjangoObjectTypeMeta(ObjectTypeMeta):
@staticmethod
def __new__(cls, name, bases, attrs):
# Also ensure initialization is only performed for subclasses of
# DjangoObjectType
if not is_base_type(bases, DjangoObjectTypeMeta):
return type.__new__(cls, name, bases, attrs)
defaults = dict(
name=name,
description=attrs.pop('__doc__', None),
model=None,
local_fields=None,
only_fields=(),
exclude_fields=(),
interfaces=(),
registry=None
)
if DJANGO_FILTER_INSTALLED:
# In case Django filter is available, then
# we allow more attributes in Meta
defaults.update(
filter_fields=(),
filter_order_by=(),
)
options = Options(
attrs.pop('Meta', None),
**defaults
)
if not options.registry:
options.registry = get_global_registry()
assert isinstance(options.registry, Registry), (
'The attribute registry in {}.Meta needs to be an instance of '
'Registry, received "{}".'
).format(name, options.registry)
assert is_valid_django_model(options.model), (
'You need to pass a valid Django Model in {}.Meta, received "{}".'
).format(name, options.model)
cls = ObjectTypeMeta.__new__(cls, name, bases, dict(attrs, _meta=options))
options.registry.register(cls)
options.django_fields = yank_fields_from_attrs(
construct_fields(options),
_as=Field,
)
options.fields = merge(
options.interface_fields,
options.django_fields,
options.base_fields,
options.local_fields
)
return cls
class DjangoObjectType(six.with_metaclass(DjangoObjectTypeMeta, ObjectType)):
@classmethod
def is_type_of(cls, root, context, info):
if isinstance(root, cls):
return True
if not is_valid_django_model(type(root)):
raise Exception((
'Received incompatible instance "{}".'
).format(root))
model = root._meta.model
return model == cls._meta.model
@classmethod
def get_node(cls, id, context, info):
try:
return cls._meta.model.objects.get(id=id)
except cls._meta.model.DoesNotExist:
return None

View File

@ -1,81 +0,0 @@
import inspect
from django.db import models
from django.db.models.manager import Manager
# from graphene.utils import LazyList
class LazyList(object):
pass
from .compat import RelatedObject
try:
import django_filters # noqa
DJANGO_FILTER_INSTALLED = True
except (ImportError, AttributeError):
# AtributeError raised if DjangoFilters installed with a incompatible Django Version
DJANGO_FILTER_INSTALLED = False
def get_reverse_fields(model):
for name, attr in model.__dict__.items():
# Django =>1.9 uses 'rel', django <1.9 uses 'related'
related = getattr(attr, 'rel', None) or \
getattr(attr, 'related', None)
if isinstance(related, RelatedObject):
# Hack for making it compatible with Django 1.6
new_related = RelatedObject(related.parent_model, related.model, related.field)
new_related.name = name
yield new_related
elif isinstance(related, models.ManyToOneRel):
yield related
elif isinstance(related, models.ManyToManyRel) and not related.symmetrical:
yield related
def maybe_queryset(value):
if isinstance(value, Manager):
value = value.get_queryset()
return value
def get_model_fields(model):
reverse_fields = get_reverse_fields(model)
all_fields = sorted(list(model._meta.fields) +
list(model._meta.local_many_to_many))
all_fields += list(reverse_fields)
return all_fields
def get_related_model(field):
if hasattr(field, 'rel'):
# Django 1.6, 1.7
return field.rel.to
return field.related_model
def is_valid_django_model(model):
return inspect.isclass(model) and issubclass(model, models.Model)
def import_single_dispatch():
try:
from functools import singledispatch
except ImportError:
singledispatch = None
if not singledispatch:
try:
from singledispatch import singledispatch
except ImportError:
pass
if not singledispatch:
raise Exception(
"It seems your python version does not include "
"functools.singledispatch. Please install the 'singledispatch' "
"package. More information here: "
"https://pypi.python.org/pypi/singledispatch"
)
return singledispatch

View File

@ -1,9 +0,0 @@
from graphql_django_view import GraphQLView as BaseGraphQLView
class GraphQLView(BaseGraphQLView):
def __init__(self, schema, **kwargs):
super(GraphQLView, self).__init__(
schema=schema,
**kwargs
)

View File

@ -1,2 +0,0 @@
[pytest]
DJANGO_SETTINGS_MODULE = django_test_settings

View File

@ -1,50 +0,0 @@
from setuptools import find_packages, setup
setup(
name='graphene-django',
version='1.0.dev20160910000001',
description='Graphene Django integration',
# long_description=open('README.rst').read(),
url='https://github.com/graphql-python/graphene-django',
author='Syrus Akbary',
author_email='me@syrusakbary.com',
license='MIT',
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Topic :: Software Development :: Libraries',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: Implementation :: PyPy',
],
keywords='api graphql protocol rest relay graphene',
packages=find_packages(exclude=['tests']),
install_requires=[
'six>=1.10.0',
'graphene>=1.0.dev',
'Django>=1.6.0',
'iso8601',
'singledispatch>=3.4.0.3',
'graphql-django-view>=1.3',
],
tests_require=[
'django-filter>=0.10.0',
'pytest>=2.7.2',
'pytest-django',
'mock',
# Required for Django postgres fields testing
'psycopg2',
],
)

View File

@ -1,50 +0,0 @@
Example Flask+SQLAlchemy Project
================================
This example project demos integration between Graphene, Flask and SQLAlchemy.
The project contains two models, one named `Department` and another
named `Employee`.
Getting started
---------------
First you'll need to get the source of the project. Do this by cloning the
whole Graphene repository:
```bash
# Get the example project code
git clone https://github.com/graphql-python/graphene.git
cd graphene/examples/flask_sqlalchemy
```
It is good idea (but not required) to create a virtual environment
for this project. We'll do this using
[virtualenv](http://docs.python-guide.org/en/latest/dev/virtualenvs/)
to keep things simple,
but you may also find something like
[virtualenvwrapper](https://virtualenvwrapper.readthedocs.org/en/latest/)
to be useful:
```bash
# Create a virtualenv in which we can install the dependencies
virtualenv env
source env/bin/activate
```
Now we can install our dependencies:
```bash
pip install -r requirements.txt
```
Now the following command will setup the database, and start the server:
```bash
./app.py
```
Now head on over to
[http://127.0.0.1:5000/graphiql](http://127.0.0.1:5000/graphiql)
and run some queries!

View File

@ -1,40 +0,0 @@
from flask import Flask
from database import db_session, init_db
from flask_graphql import GraphQLView
from schema import schema
app = Flask(__name__)
app.debug = True
default_query = '''
{
allEmployees {
edges {
node {
id,
name,
department {
id,
name
},
role {
id,
name
}
}
}
}
}'''.strip()
app.add_url_rule('/graphql', view_func=GraphQLView.as_view('graphql', schema=schema, graphiql=True))
@app.teardown_appcontext
def shutdown_session(exception=None):
db_session.remove()
if __name__ == '__main__':
init_db()
app.run()

View File

@ -1,38 +0,0 @@
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import scoped_session, sessionmaker
engine = create_engine('sqlite:///database.sqlite3', convert_unicode=True)
db_session = scoped_session(sessionmaker(autocommit=False,
autoflush=False,
bind=engine))
Base = declarative_base()
Base.query = db_session.query_property()
def init_db():
# import all modules here that might define models so that
# they will be registered properly on the metadata. Otherwise
# you will have to import them first before calling init_db()
from models import Department, Employee, Role
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
# Create the fixtures
engineering = Department(name='Engineering')
db_session.add(engineering)
hr = Department(name='Human Resources')
db_session.add(hr)
manager = Role(name='manager')
db_session.add(manager)
engineer = Role(name='engineer')
db_session.add(engineer)
peter = Employee(name='Peter', department=engineering, role=engineer)
db_session.add(peter)
roy = Employee(name='Roy', department=engineering, role=engineer)
db_session.add(roy)
tracy = Employee(name='Tracy', department=hr, role=manager)
db_session.add(tracy)
db_session.commit()

View File

@ -1,39 +0,0 @@
from sqlalchemy import Column, DateTime, ForeignKey, Integer, String, func
from sqlalchemy.orm import backref, relationship
from database import Base
class Department(Base):
__tablename__ = 'department'
id = Column(Integer, primary_key=True)
name = Column(String)
class Role(Base):
__tablename__ = 'roles'
role_id = Column(Integer, primary_key=True)
name = Column(String)
class Employee(Base):
__tablename__ = 'employee'
id = Column(Integer, primary_key=True)
name = Column(String)
# Use default=func.now() to set the default hiring time
# of an Employee to be the current time when an
# Employee record was created
hired_on = Column(DateTime, default=func.now())
department_id = Column(Integer, ForeignKey('department.id'))
role_id = Column(Integer, ForeignKey('roles.role_id'))
# Use cascade='delete,all' to propagate the deletion of a Department onto its Employees
department = relationship(
Department,
backref=backref('employees',
uselist=True,
cascade='delete,all'))
role = relationship(
Role,
backref=backref('roles',
uselist=True,
cascade='delete,all'))

View File

@ -1,4 +0,0 @@
graphene[sqlalchemy]
SQLAlchemy==1.0.11
Flask==0.10.1
Flask-GraphQL==1.3.0

View File

@ -1,38 +0,0 @@
import graphene
from graphene import relay
from graphene_sqlalchemy import (SQLAlchemyConnectionField,
SQLAlchemyObjectType)
from models import Department as DepartmentModel
from models import Employee as EmployeeModel
from models import Role as RoleModel
class Department(SQLAlchemyObjectType):
class Meta:
model = DepartmentModel
interfaces = (relay.Node, )
class Employee(SQLAlchemyObjectType):
class Meta:
model = EmployeeModel
interfaces = (relay.Node, )
class Role(SQLAlchemyObjectType):
class Meta:
model = RoleModel
interfaces = (relay.Node, )
class Query(graphene.ObjectType):
node = relay.Node.Field()
all_employees = SQLAlchemyConnectionField(Employee)
all_roles = SQLAlchemyConnectionField(Role)
role = graphene.Field(Role)
schema = graphene.Schema(query=Query, types=[Department, Employee, Role])

View File

@ -1,15 +0,0 @@
from .types import (
SQLAlchemyObjectType,
)
from .fields import (
SQLAlchemyConnectionField
)
from .utils import (
get_query,
get_session
)
__all__ = ['SQLAlchemyObjectType',
'SQLAlchemyConnectionField',
'get_query',
'get_session']

View File

@ -1,133 +0,0 @@
from singledispatch import singledispatch
from sqlalchemy import types
from sqlalchemy.orm import interfaces
from sqlalchemy.dialects import postgresql
from graphene import Enum, ID, Boolean, Float, Int, String, List, Field, Dynamic
from graphene.relay import is_node
from graphene.types.json import JSONString
from .fields import SQLAlchemyConnectionField
try:
from sqlalchemy_utils import ChoiceType, ScalarListType
except ImportError:
class ChoiceType(object):
pass
class ScalarListType(object):
pass
def convert_sqlalchemy_relationship(relationship, registry):
direction = relationship.direction
model = relationship.mapper.entity
def dynamic_type():
_type = registry.get_type_for_model(model)
if not _type:
return None
if (direction == interfaces.MANYTOONE or not relationship.uselist):
return Field(_type)
elif (direction == interfaces.ONETOMANY or
direction == interfaces.MANYTOMANY):
if is_node(_type):
return SQLAlchemyConnectionField(_type)
return Field(List(_type))
return Dynamic(dynamic_type)
def convert_sqlalchemy_composite(composite, registry):
converter = registry.get_converter_for_composite(composite.composite_class)
if not converter:
try:
raise Exception(
"Don't know how to convert the composite field %s (%s)" %
(composite, composite.composite_class))
except AttributeError:
# handle fields that are not attached to a class yet (don't have a parent)
raise Exception(
"Don't know how to convert the composite field %r (%s)" %
(composite, composite.composite_class))
return converter(composite, registry)
def _register_composite_class(cls, registry=None):
if registry is None:
from .registry import get_global_registry
registry = get_global_registry()
def inner(fn):
registry.register_composite_converter(cls, fn)
return inner
convert_sqlalchemy_composite.register = _register_composite_class
def convert_sqlalchemy_column(column, registry=None):
return convert_sqlalchemy_type(getattr(column, 'type', None), column, registry)
@singledispatch
def convert_sqlalchemy_type(type, column, registry=None):
raise Exception(
"Don't know how to convert the SQLAlchemy field %s (%s)" % (column, column.__class__))
@convert_sqlalchemy_type.register(types.Date)
@convert_sqlalchemy_type.register(types.DateTime)
@convert_sqlalchemy_type.register(types.Time)
@convert_sqlalchemy_type.register(types.String)
@convert_sqlalchemy_type.register(types.Text)
@convert_sqlalchemy_type.register(types.Unicode)
@convert_sqlalchemy_type.register(types.UnicodeText)
@convert_sqlalchemy_type.register(types.Enum)
@convert_sqlalchemy_type.register(postgresql.ENUM)
@convert_sqlalchemy_type.register(postgresql.UUID)
def convert_column_to_string(type, column, registry=None):
return String(description=column.doc, required=not(column.nullable))
@convert_sqlalchemy_type.register(types.SmallInteger)
@convert_sqlalchemy_type.register(types.BigInteger)
@convert_sqlalchemy_type.register(types.Integer)
def convert_column_to_int_or_id(type, column, registry=None):
if column.primary_key:
return ID(description=column.doc, required=not(column.nullable))
else:
return Int(description=column.doc, required=not(column.nullable))
@convert_sqlalchemy_type.register(types.Boolean)
def convert_column_to_boolean(type, column, registry=None):
return Boolean(description=column.doc, required=not(column.nullable))
@convert_sqlalchemy_type.register(types.Float)
@convert_sqlalchemy_type.register(types.Numeric)
def convert_column_to_float(type, column, registry=None):
return Float(description=column.doc, required=not(column.nullable))
@convert_sqlalchemy_type.register(ChoiceType)
def convert_column_to_enum(type, column, registry=None):
name = '{}_{}'.format(column.table.name, column.name).upper()
return Enum(name, type.choices, description=column.doc)
@convert_sqlalchemy_type.register(ScalarListType)
def convert_scalar_list_to_list(type, column, registry=None):
return List(String, description=column.doc)
@convert_sqlalchemy_type.register(postgresql.ARRAY)
def convert_postgres_array_to_list(type, column, registry=None):
graphene_type = convert_sqlalchemy_type(column.type.item_type, column)
return List(graphene_type, description=column.doc, required=not(column.nullable))
@convert_sqlalchemy_type.register(postgresql.HSTORE)
@convert_sqlalchemy_type.register(postgresql.JSON)
@convert_sqlalchemy_type.register(postgresql.JSONB)
def convert_json_to_string(type, column, registry=None):
return JSONString(description=column.doc, required=not(column.nullable))

View File

@ -1,37 +0,0 @@
from functools import partial
from sqlalchemy.orm.query import Query
from graphene.relay import ConnectionField
from graphene.relay.connection import PageInfo
from graphql_relay.connection.arrayconnection import connection_from_list_slice
from .utils import get_query
class SQLAlchemyConnectionField(ConnectionField):
@property
def model(self):
return self.type._meta.node._meta.model
@staticmethod
def connection_resolver(resolver, connection, model, root, args, context, info):
iterable = resolver(root, args, context, info)
if iterable is None:
iterable = get_query(model, context)
if isinstance(iterable, Query):
_len = iterable.count()
else:
_len = len(iterable)
return connection_from_list_slice(
iterable,
args,
slice_start=0,
list_length=_len,
list_slice_length=_len,
connection_type=connection,
pageinfo_type=PageInfo,
edge_type=connection.Edge,
)
def get_resolver(self, parent_resolver):
return partial(self.connection_resolver, parent_resolver, self.type, self.model)

View File

@ -1,39 +0,0 @@
class Registry(object):
def __init__(self):
self._registry = {}
self._registry_models = {}
self._registry_composites = {}
def register(self, cls):
from .types import SQLAlchemyObjectType
assert issubclass(cls, SQLAlchemyObjectType), 'Only SQLAlchemyObjectType can be registered, received "{}"'.format(cls.__name__)
assert cls._meta.registry == self, 'Registry for a Model have to match.'
# assert self.get_type_for_model(cls._meta.model) in [None, cls], (
# 'SQLAlchemy model "{}" already associated with '
# 'another type "{}".'
# ).format(cls._meta.model, self._registry[cls._meta.model])
self._registry[cls._meta.model] = cls
def get_type_for_model(self, model):
return self._registry.get(model)
def register_composite_converter(self, composite, converter):
self._registry_composites[composite] = converter
def get_converter_for_composite(self, composite):
return self._registry_composites.get(composite)
registry = None
def get_global_registry():
global registry
if not registry:
registry = Registry()
return registry
def reset_global_registry():
global registry
registry = None

View File

@ -1,43 +0,0 @@
from __future__ import absolute_import
from sqlalchemy import Column, Date, ForeignKey, Integer, String, Table
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
Base = declarative_base()
association_table = Table('association', Base.metadata,
Column('pet_id', Integer, ForeignKey('pets.id')),
Column('reporter_id', Integer, ForeignKey('reporters.id')))
class Editor(Base):
__tablename__ = 'editors'
editor_id = Column(Integer(), primary_key=True)
name = Column(String(100))
class Pet(Base):
__tablename__ = 'pets'
id = Column(Integer(), primary_key=True)
name = Column(String(30))
reporter_id = Column(Integer(), ForeignKey('reporters.id'))
class Reporter(Base):
__tablename__ = 'reporters'
id = Column(Integer(), primary_key=True)
first_name = Column(String(30))
last_name = Column(String(30))
email = Column(String())
pets = relationship('Pet', secondary=association_table, backref='reporters')
articles = relationship('Article', backref='reporter')
favorite_article = relationship("Article", uselist=False)
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer(), primary_key=True)
headline = Column(String(100))
pub_date = Column(Date())
reporter_id = Column(Integer(), ForeignKey('reporters.id'))

View File

@ -1,266 +0,0 @@
from py.test import raises
from sqlalchemy import Column, Table, types
from sqlalchemy.orm import composite
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy_utils import ChoiceType, ScalarListType
from sqlalchemy.dialects import postgresql
import graphene
from graphene.relay import Node
from graphene.types.json import JSONString
from ..converter import (convert_sqlalchemy_column,
convert_sqlalchemy_composite,
convert_sqlalchemy_relationship)
from ..fields import SQLAlchemyConnectionField
from ..types import SQLAlchemyObjectType
from ..registry import Registry
from .models import Article, Pet, Reporter
def assert_column_conversion(sqlalchemy_type, graphene_field, **kwargs):
column = Column(sqlalchemy_type, doc='Custom Help Text', **kwargs)
graphene_type = convert_sqlalchemy_column(column)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.Field()
assert field.description == 'Custom Help Text'
return field
def assert_composite_conversion(composite_class, composite_columns, graphene_field,
registry, **kwargs):
composite_column = composite(composite_class, *composite_columns,
doc='Custom Help Text', **kwargs)
graphene_type = convert_sqlalchemy_composite(composite_column, registry)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.Field()
# SQLAlchemy currently does not persist the doc onto the column, even though
# the documentation says it does....
# assert field.description == 'Custom Help Text'
return field
def test_should_unknown_sqlalchemy_field_raise_exception():
with raises(Exception) as excinfo:
convert_sqlalchemy_column(None)
assert 'Don\'t know how to convert the SQLAlchemy field' in str(excinfo.value)
def test_should_date_convert_string():
assert_column_conversion(types.Date(), graphene.String)
def test_should_datetime_convert_string():
assert_column_conversion(types.DateTime(), graphene.String)
def test_should_time_convert_string():
assert_column_conversion(types.Time(), graphene.String)
def test_should_string_convert_string():
assert_column_conversion(types.String(), graphene.String)
def test_should_text_convert_string():
assert_column_conversion(types.Text(), graphene.String)
def test_should_unicode_convert_string():
assert_column_conversion(types.Unicode(), graphene.String)
def test_should_unicodetext_convert_string():
assert_column_conversion(types.UnicodeText(), graphene.String)
def test_should_enum_convert_string():
assert_column_conversion(types.Enum(), graphene.String)
def test_should_small_integer_convert_int():
assert_column_conversion(types.SmallInteger(), graphene.Int)
def test_should_big_integer_convert_int():
assert_column_conversion(types.BigInteger(), graphene.Int)
def test_should_integer_convert_int():
assert_column_conversion(types.Integer(), graphene.Int)
def test_should_integer_convert_id():
assert_column_conversion(types.Integer(), graphene.ID, primary_key=True)
def test_should_boolean_convert_boolean():
assert_column_conversion(types.Boolean(), graphene.Boolean)
def test_should_float_convert_float():
assert_column_conversion(types.Float(), graphene.Float)
def test_should_numeric_convert_float():
assert_column_conversion(types.Numeric(), graphene.Float)
def test_should_choice_convert_enum():
TYPES = [
(u'es', u'Spanish'),
(u'en', u'English')
]
column = Column(ChoiceType(TYPES), doc='Language', name='language')
Base = declarative_base()
Table('translatedmodel', Base.metadata, column)
graphene_type = convert_sqlalchemy_column(column)
assert issubclass(graphene_type, graphene.Enum)
assert graphene_type._meta.name == 'TRANSLATEDMODEL_LANGUAGE'
assert graphene_type._meta.description == 'Language'
assert graphene_type._meta.enum.__members__['es'].value == 'Spanish'
assert graphene_type._meta.enum.__members__['en'].value == 'English'
def test_should_scalar_list_convert_list():
assert_column_conversion(ScalarListType(), graphene.List)
def test_should_manytomany_convert_connectionorlist():
registry = Registry()
dynamic_field = convert_sqlalchemy_relationship(Reporter.pets.property, registry)
assert isinstance(dynamic_field, graphene.Dynamic)
assert not dynamic_field.get_type()
def test_should_manytomany_convert_connectionorlist_list():
class A(SQLAlchemyObjectType):
class Meta:
model = Pet
dynamic_field = convert_sqlalchemy_relationship(Reporter.pets.property, A._meta.registry)
assert isinstance(dynamic_field, graphene.Dynamic)
graphene_type = dynamic_field.get_type()
assert isinstance(graphene_type, graphene.Field)
assert isinstance(graphene_type.type, graphene.List)
assert graphene_type.type.of_type == A
def test_should_manytomany_convert_connectionorlist_connection():
class A(SQLAlchemyObjectType):
class Meta:
model = Pet
interfaces = (Node, )
dynamic_field = convert_sqlalchemy_relationship(Reporter.pets.property, A._meta.registry)
assert isinstance(dynamic_field, graphene.Dynamic)
assert isinstance(dynamic_field.get_type(), SQLAlchemyConnectionField)
def test_should_manytoone_convert_connectionorlist():
registry = Registry()
dynamic_field = convert_sqlalchemy_relationship(Article.reporter.property, registry)
assert isinstance(dynamic_field, graphene.Dynamic)
assert not dynamic_field.get_type()
def test_should_manytoone_convert_connectionorlist_list():
class A(SQLAlchemyObjectType):
class Meta:
model = Reporter
dynamic_field = convert_sqlalchemy_relationship(Article.reporter.property, A._meta.registry)
assert isinstance(dynamic_field, graphene.Dynamic)
graphene_type = dynamic_field.get_type()
assert isinstance(graphene_type, graphene.Field)
assert graphene_type.type == A
def test_should_manytoone_convert_connectionorlist_connection():
class A(SQLAlchemyObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
dynamic_field = convert_sqlalchemy_relationship(Article.reporter.property, A._meta.registry)
assert isinstance(dynamic_field, graphene.Dynamic)
graphene_type = dynamic_field.get_type()
assert isinstance(graphene_type, graphene.Field)
assert graphene_type.type == A
def test_should_onetoone_convert_field():
class A(SQLAlchemyObjectType):
class Meta:
model = Article
interfaces = (Node, )
dynamic_field = convert_sqlalchemy_relationship(Reporter.favorite_article.property, A._meta.registry)
assert isinstance(dynamic_field, graphene.Dynamic)
graphene_type = dynamic_field.get_type()
assert isinstance(graphene_type, graphene.Field)
assert graphene_type.type == A
def test_should_postgresql_uuid_convert():
assert_column_conversion(postgresql.UUID(), graphene.String)
def test_should_postgresql_enum_convert():
assert_column_conversion(postgresql.ENUM(), graphene.String)
def test_should_postgresql_array_convert():
assert_column_conversion(postgresql.ARRAY(types.Integer), graphene.List)
def test_should_postgresql_json_convert():
assert_column_conversion(postgresql.JSON(), JSONString)
def test_should_postgresql_jsonb_convert():
assert_column_conversion(postgresql.JSONB(), JSONString)
def test_should_postgresql_hstore_convert():
assert_column_conversion(postgresql.HSTORE(), JSONString)
def test_should_composite_convert():
class CompositeClass(object):
def __init__(self, col1, col2):
self.col1 = col1
self.col2 = col2
registry = Registry()
@convert_sqlalchemy_composite.register(CompositeClass, registry)
def convert_composite_class(composite, registry):
return graphene.String(description=composite.doc)
assert_composite_conversion(CompositeClass,
(Column(types.Unicode(50)),
Column(types.Unicode(50))),
graphene.String,
registry)
def test_should_unknown_sqlalchemy_composite_raise_exception():
registry = Registry()
with raises(Exception) as excinfo:
class CompositeClass(object):
def __init__(self, col1, col2):
self.col1 = col1
self.col2 = col2
assert_composite_conversion(CompositeClass,
(Column(types.Unicode(50)),
Column(types.Unicode(50))),
graphene.String,
registry)
assert 'Don\'t know how to convert the composite field' in str(excinfo.value)

View File

@ -1,330 +0,0 @@
import pytest
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker
import graphene
from graphene.relay import Node
from ..types import SQLAlchemyObjectType
from ..fields import SQLAlchemyConnectionField
from .models import Article, Base, Editor, Reporter
db = create_engine('sqlite:///test_sqlalchemy.sqlite3')
@pytest.yield_fixture(scope='function')
def session():
connection = db.engine.connect()
transaction = connection.begin()
Base.metadata.create_all(connection)
# options = dict(bind=connection, binds={})
session_factory = sessionmaker(bind=connection)
session = scoped_session(session_factory)
yield session
# Finalize test here
transaction.rollback()
connection.close()
session.remove()
def setup_fixtures(session):
reporter = Reporter(first_name='ABA', last_name='X')
session.add(reporter)
reporter2 = Reporter(first_name='ABO', last_name='Y')
session.add(reporter2)
article = Article(headline='Hi!')
article.reporter = reporter
session.add(article)
editor = Editor(name="John")
session.add(editor)
session.commit()
def test_should_query_well(session):
setup_fixtures(session)
class ReporterType(SQLAlchemyObjectType):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
reporters = graphene.List(ReporterType)
def resolve_reporter(self, *args, **kwargs):
return session.query(Reporter).first()
def resolve_reporters(self, *args, **kwargs):
return session.query(Reporter)
query = '''
query ReporterQuery {
reporter {
firstName,
lastName,
email
}
reporters {
firstName
}
}
'''
expected = {
'reporter': {
'firstName': 'ABA',
'lastName': 'X',
'email': None
},
'reporters': [{
'firstName': 'ABA',
}, {
'firstName': 'ABO',
}]
}
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_should_node(session):
setup_fixtures(session)
class ReporterNode(SQLAlchemyObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
@classmethod
def get_node(cls, id, info):
return Reporter(id=2, first_name='Cookie Monster')
class ArticleNode(SQLAlchemyObjectType):
class Meta:
model = Article
interfaces = (Node, )
# @classmethod
# def get_node(cls, id, info):
# return Article(id=1, headline='Article node')
class Query(graphene.ObjectType):
node = Node.Field()
reporter = graphene.Field(ReporterNode)
article = graphene.Field(ArticleNode)
all_articles = SQLAlchemyConnectionField(ArticleNode)
def resolve_reporter(self, *args, **kwargs):
return session.query(Reporter).first()
def resolve_article(self, *args, **kwargs):
return session.query(Article).first()
query = '''
query ReporterQuery {
reporter {
id,
firstName,
articles {
edges {
node {
headline
}
}
}
lastName,
email
}
allArticles {
edges {
node {
headline
}
}
}
myArticle: node(id:"QXJ0aWNsZU5vZGU6MQ==") {
id
... on ReporterNode {
firstName
}
... on ArticleNode {
headline
}
}
}
'''
expected = {
'reporter': {
'id': 'UmVwb3J0ZXJOb2RlOjE=',
'firstName': 'ABA',
'lastName': 'X',
'email': None,
'articles': {
'edges': [{
'node': {
'headline': 'Hi!'
}
}]
},
},
'allArticles': {
'edges': [{
'node': {
'headline': 'Hi!'
}
}]
},
'myArticle': {
'id': 'QXJ0aWNsZU5vZGU6MQ==',
'headline': 'Hi!'
}
}
schema = graphene.Schema(query=Query)
result = schema.execute(query, context_value={'session': session})
assert not result.errors
assert result.data == expected
def test_should_custom_identifier(session):
setup_fixtures(session)
class EditorNode(SQLAlchemyObjectType):
class Meta:
model = Editor
interfaces = (Node, )
class Query(graphene.ObjectType):
node = Node.Field()
all_editors = SQLAlchemyConnectionField(EditorNode)
query = '''
query EditorQuery {
allEditors {
edges {
node {
id,
name
}
}
},
node(id: "RWRpdG9yTm9kZTox") {
...on EditorNode {
name
}
}
}
'''
expected = {
'allEditors': {
'edges': [{
'node': {
'id': 'RWRpdG9yTm9kZTox',
'name': 'John'
}
}]
},
'node': {
'name': 'John'
}
}
schema = graphene.Schema(query=Query)
result = schema.execute(query, context_value={'session': session})
assert not result.errors
assert result.data == expected
def test_should_mutate_well(session):
setup_fixtures(session)
class EditorNode(SQLAlchemyObjectType):
class Meta:
model = Editor
interfaces = (Node, )
class ReporterNode(SQLAlchemyObjectType):
class Meta:
model = Reporter
interfaces = (Node, )
@classmethod
def get_node(cls, id, info):
return Reporter(id=2, first_name='Cookie Monster')
class ArticleNode(SQLAlchemyObjectType):
class Meta:
model = Article
interfaces = (Node, )
class CreateArticle(graphene.Mutation):
class Input:
headline = graphene.String()
reporter_id = graphene.ID()
ok = graphene.Boolean()
article = graphene.Field(ArticleNode)
@classmethod
def mutate(cls, instance, args, context, info):
new_article = Article(
headline=args.get('headline'),
reporter_id = args.get('reporter_id'),
)
session.add(new_article)
session.commit()
ok = True
return CreateArticle(article=new_article, ok=ok)
class Query(graphene.ObjectType):
node = Node.Field()
class Mutation(graphene.ObjectType):
create_article = CreateArticle.Field()
query = '''
mutation ArticleCreator {
createArticle(
headline: "My Article"
reporterId: "1"
) {
ok
article {
headline
reporter {
id
firstName
}
}
}
}
'''
expected = {
'createArticle': {
'ok': True,
'article': {
'headline': 'My Article',
'reporter': {
'id': 'UmVwb3J0ZXJOb2RlOjE=',
'firstName': 'ABA'
}
}
},
}
schema = graphene.Schema(query=Query, mutation=Mutation)
result = schema.execute(query, context_value={'session': session})
assert not result.errors
assert result.data == expected

View File

@ -1,40 +0,0 @@
from py.test import raises
from ..types import SQLAlchemyObjectType
from .models import Reporter
from ..registry import Registry
def test_should_raise_if_no_model():
with raises(Exception) as excinfo:
class Character1(SQLAlchemyObjectType):
pass
assert 'valid SQLAlchemy Model' in str(excinfo.value)
def test_should_raise_if_model_is_invalid():
with raises(Exception) as excinfo:
class Character2(SQLAlchemyObjectType):
class Meta:
model = 1
assert 'valid SQLAlchemy Model' in str(excinfo.value)
def test_should_map_fields_correctly():
class ReporterType2(SQLAlchemyObjectType):
class Meta:
model = Reporter
registry = Registry()
assert list(ReporterType2._meta.fields.keys()) == ['id', 'first_name', 'last_name', 'email', 'pets', 'articles', 'favorite_article']
def test_should_map_only_few_fields():
class Reporter2(SQLAlchemyObjectType):
class Meta:
model = Reporter
only_fields = ('id', 'email')
assert list(Reporter2._meta.fields.keys()) == ['id', 'email']

View File

@ -1,73 +0,0 @@
from graphql.type import GraphQLObjectType, GraphQLInterfaceType
from graphql import GraphQLInt
from pytest import raises
from graphene import Schema, Interface, ObjectType
from graphene.relay import Node, is_node
from ..types import SQLAlchemyObjectType
from ..registry import Registry
from graphene import Field, Int
from .models import Article, Reporter
registry = Registry()
class Character(SQLAlchemyObjectType):
'''Character description'''
class Meta:
model = Reporter
registry = registry
class Human(SQLAlchemyObjectType):
'''Human description'''
pub_date = Int()
class Meta:
model = Article
exclude_fields = ('id', )
registry = registry
interfaces = (Node, )
def test_sqlalchemy_interface():
assert issubclass(Node, Interface)
assert issubclass(Node, Node)
# @patch('graphene.contrib.sqlalchemy.tests.models.Article.filter', return_value=Article(id=1))
# def test_sqlalchemy_get_node(get):
# human = Human.get_node(1, None)
# get.assert_called_with(id=1)
# assert human.id == 1
def test_objecttype_registered():
assert issubclass(Character, ObjectType)
assert Character._meta.model == Reporter
assert list(Character._meta.fields.keys()) == ['id', 'first_name', 'last_name', 'email', 'pets', 'articles', 'favorite_article']
# def test_sqlalchemynode_idfield():
# idfield = Node._meta.fields_map['id']
# assert isinstance(idfield, GlobalIDField)
# def test_node_idfield():
# idfield = Human._meta.fields_map['id']
# assert isinstance(idfield, GlobalIDField)
def test_node_replacedfield():
idfield = Human._meta.fields['pub_date']
assert isinstance(idfield, Field)
assert idfield.type == Int
def test_object_type():
assert issubclass(Human, ObjectType)
assert list(Human._meta.fields.keys()) == ['id', 'headline', 'reporter_id', 'reporter', 'pub_date']
assert is_node(Human)

View File

@ -1,24 +0,0 @@
from graphene import ObjectType, Schema, String
from ..utils import get_session
def test_get_session():
session = 'My SQLAlchemy session'
class Query(ObjectType):
x = String()
def resolve_x(self, args, context, info):
return get_session(context)
query = '''
query ReporterQuery {
x
}
'''
schema = Schema(query=Query)
result = schema.execute(query, context_value={'session': session})
assert not result.errors
assert result.data['x'] == session

View File

@ -1,145 +0,0 @@
from collections import OrderedDict
import six
from sqlalchemy.inspection import inspect as sqlalchemyinspect
from sqlalchemy.orm.exc import NoResultFound
from graphene import ObjectType, Field
from graphene.relay import is_node
from .converter import (convert_sqlalchemy_column,
convert_sqlalchemy_composite,
convert_sqlalchemy_relationship)
from .utils import is_mapped
from graphene.types.objecttype import ObjectTypeMeta
from graphene.types.options import Options
from .registry import Registry, get_global_registry
from graphene.utils.is_base_type import is_base_type
from graphene.types.utils import yank_fields_from_attrs, merge
from .utils import get_query
def construct_fields(options):
only_fields = options.only_fields
exclude_fields = options.exclude_fields
inspected_model = sqlalchemyinspect(options.model)
fields = OrderedDict()
for name, column in inspected_model.columns.items():
is_not_in_only = only_fields and name not in only_fields
is_already_created = name in options.fields
is_excluded = name in exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we excldue this field in exclude_fields
continue
converted_column = convert_sqlalchemy_column(column, options.registry)
fields[name] = converted_column
for name, composite in inspected_model.composites.items():
is_not_in_only = only_fields and name not in only_fields
is_already_created = name in options.fields
is_excluded = name in exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we excldue this field in exclude_fields
continue
converted_composite = convert_sqlalchemy_composite(composite, options.registry)
fields[name] = converted_composite
# Get all the columns for the relationships on the model
for relationship in inspected_model.relationships:
is_not_in_only = only_fields and relationship.key not in only_fields
is_already_created = relationship.key in options.fields
is_excluded = relationship.key in exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we excldue this field in exclude_fields
continue
converted_relationship = convert_sqlalchemy_relationship(relationship, options.registry)
name = relationship.key
fields[name] = converted_relationship
return fields
class SQLAlchemyObjectTypeMeta(ObjectTypeMeta):
@staticmethod
def __new__(cls, name, bases, attrs):
# Also ensure initialization is only performed for subclasses of Model
# (excluding Model class itself).
if not is_base_type(bases, SQLAlchemyObjectTypeMeta):
return type.__new__(cls, name, bases, attrs)
options = Options(
attrs.pop('Meta', None),
name=name,
description=attrs.pop('__doc__', None),
model=None,
local_fields=None,
only_fields=(),
exclude_fields=(),
id='id',
interfaces=(),
registry=None
)
if not options.registry:
options.registry = get_global_registry()
assert isinstance(options.registry, Registry), (
'The attribute registry in {}.Meta needs to be an'
' instance of Registry, received "{}".'
).format(name, options.registry)
assert is_mapped(options.model), (
'You need to pass a valid SQLAlchemy Model in '
'{}.Meta, received "{}".'
).format(name, options.model)
cls = ObjectTypeMeta.__new__(cls, name, bases, dict(attrs, _meta=options))
options.registry.register(cls)
options.sqlalchemy_fields = yank_fields_from_attrs(
construct_fields(options),
_as=Field,
)
options.fields = merge(
options.interface_fields,
options.sqlalchemy_fields,
options.base_fields,
options.local_fields
)
return cls
class SQLAlchemyObjectType(six.with_metaclass(SQLAlchemyObjectTypeMeta, ObjectType)):
@classmethod
def is_type_of(cls, root, context, info):
if isinstance(root, cls):
return True
if not is_mapped(type(root)):
raise Exception((
'Received incompatible instance "{}".'
).format(root))
return type(root) == cls._meta.model
@classmethod
def get_query(cls, context):
model = cls._meta.model
return get_query(model, context)
@classmethod
def get_node(cls, id, context, info):
try:
return cls.get_query(context).get(id)
except NoResultFound:
return None
def resolve_id(root, args, context, info):
graphene_type = info.parent_type.graphene_type
if is_node(graphene_type):
return root.__mapper__.primary_key_from_instance(root)[0]
return getattr(root, graphene_type._meta.id, None)

View File

@ -1,20 +0,0 @@
from sqlalchemy.ext.declarative.api import DeclarativeMeta
def get_session(context):
return context.get('session')
def get_query(model, context):
query = getattr(model, 'query', None)
if not query:
session = get_session(context)
if not session:
raise Exception('A query in the model Base or a session in the schema is required for querying.\n'
'Read more http://graphene-python.org/docs/sqlalchemy/tips/#querying')
query = session.query(model)
return query
def is_mapped(obj):
return isinstance(obj, DeclarativeMeta)

View File

@ -1,44 +0,0 @@
from setuptools import find_packages, setup
setup(
name='graphene-sqlalchemy',
version='1.0.dev20160910000001',
description='Graphene SQLAlchemy integration',
# long_description=open('README.rst').read(),
url='https://github.com/graphql-python/graphene-sqlalchemy',
author='Syrus Akbary',
author_email='me@syrusakbary.com',
license='MIT',
classifiers=[
'Development Status :: 3 - Alpha',
'Intended Audience :: Developers',
'Topic :: Software Development :: Libraries',
'Programming Language :: Python :: 2',
'Programming Language :: Python :: 2.7',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.3',
'Programming Language :: Python :: 3.4',
'Programming Language :: Python :: 3.5',
'Programming Language :: Python :: Implementation :: PyPy',
],
keywords='api graphql protocol rest relay graphene',
packages=find_packages(exclude=['tests']),
install_requires=[
'six>=1.10.0',
'graphene>=1.0.dev',
'SQLAlchemy',
'singledispatch>=3.4.0.3',
],
tests_require=[
'pytest>=2.7.2',
'mock',
],
)