Fixing it up to use the declarative API

This commit is contained in:
Jeremy Feinstein 2015-11-25 20:31:49 -05:00
parent 487543206c
commit 9a0b33ca77
11 changed files with 557 additions and 101 deletions

View File

@ -2,16 +2,18 @@ from sqlalchemy import types
from sqlalchemy.orm import interfaces
from singledispatch import singledispatch
from graphene.contrib.sqlalchemy.fields import ConnectionOrListField, SQLAlchemyModelField
from graphene.core.fields import BooleanField, FloatField, IDField, IntField, StringField
from ...core.types.scalars import Boolean, Float, ID, Int, String
from .fields import ConnectionOrListField, SQLAlchemyModelField
def convert_sqlalchemy_relationship(relationship):
model_field = SQLAlchemyModelField(field.table, description=relationship.key)
if relationship.direction == interfaces.ONETOMANY:
direction = relationship.direction
model = relationship.mapper.entity
model_field = SQLAlchemyModelField(model, description=relationship.doc)
if direction == interfaces.MANYTOONE:
return model_field
elif (relationship.direction == interfaces.MANYTOONE or
relationship.direction == interfaces.MANYTOMANY):
elif (direction == interfaces.ONETOMANY or
direction == interfaces.MANYTOMANY):
return ConnectionOrListField(model_field)
@ -19,43 +21,43 @@ def convert_sqlalchemy_column(column):
try:
return convert_sqlalchemy_type(column.type, column)
except Exception:
raise
raise Exception(
"Don't know how to convert the SQLAlchemy field %s (%s)" % (column, column.__class__))
@singledispatch
def convert_sqlalchemy_type():
raise Exception(
"Don't know how to convert the SQLAlchemy column %s (%s)" % (column, column.__class__))
def convert_sqlalchemy_type(type, column):
raise Exception()
@convert_sqlalchemy_type.register(types.Date)
@convert_sqlalchemy_type.register(types.DateTime)
@convert_sqlalchemy_type.register(types.Time)
@convert_sqlalchemy_type.register(types.Text)
@convert_sqlalchemy_type.register(types.String)
@convert_sqlalchemy_type.register(types.Text)
@convert_sqlalchemy_type.register(types.Unicode)
@convert_sqlalchemy_type.register(types.UnicodeText)
@convert_sqlalchemy_type.register(types.Enum)
def convert_column_to_string(type, column):
return StringField(description=column.description)
return String(description=column.doc)
@convert_sqlalchemy_type.register(types.SmallInteger)
@convert_sqlalchemy_type.register(types.BigInteger)
@convert_sqlalchemy_type.register(types.Integer)
def convert_column_to_int_or_id(column):
def convert_column_to_int_or_id(type, column):
if column.primary_key:
return IDField(description=column.description)
return ID(description=column.doc)
else:
return IntField(description=column.description)
return Int(description=column.doc)
@convert_sqlalchemy_type.register(types.Boolean)
def convert_column_to_boolean(column):
return BooleanField(description=column.description)
def convert_column_to_boolean(type, column):
return Boolean(description=column.doc)
@convert_sqlalchemy_type.register(types.Float)
@convert_sqlalchemy_type.register(types.Numeric)
def convert_column_to_float(column):
return FloatField(description=column.description)
def convert_column_to_float(type, column):
return Float(description=column.doc)

View File

@ -1,67 +1,69 @@
from graphene import relay
from graphene.contrib.sqlalchemy.utils import get_type_for_model, lazy_map
from graphene.core.fields import Field, LazyField, ListField
from graphene.relay.utils import is_node
from sqlalchemy.orm import Query
from ...core.exceptions import SkipField
from ...core.fields import Field
from ...core.types.base import FieldType
from ...core.types.definitions import List
from ...relay import ConnectionField
from ...relay.utils import is_node
from ...utils import LazyMap
from .utils import get_type_for_model
class SQLAlchemyConnectionField(relay.ConnectionField):
class SQLAlchemyConnectionField(ConnectionField):
def wrap_resolved(self, value, instance, args, info):
schema = info.schema.graphene_schema
return lazy_map(value, self.get_object_type(schema))
if isinstance(value, Query):
return LazyMap(value, self.type)
return value
class LazyListField(ListField):
class LazyListField(Field):
def resolve(self, instance, args, info):
schema = info.schema.graphene_schema
resolved = super(LazyListField, self).resolve(instance, args, info)
return lazy_map(resolved, self.get_object_type(schema))
def get_type(self, schema):
return List(self.type)
def resolver(self, instance, args, info):
resolved = super(LazyListField, self).resolver(instance, args, info)
return LazyMap(resolved, self.type)
class ConnectionOrListField(LazyField):
class ConnectionOrListField(Field):
def get_field(self, schema):
model_field = self.field_type
def internal_type(self, schema):
model_field = self.type
field_object_type = model_field.get_object_type(schema)
if not field_object_type:
raise SkipField()
if is_node(field_object_type):
field = SQLAlchemyConnectionField(model_field)
field = SQLAlchemyConnectionField(field_object_type)
else:
field = LazyListField(model_field)
field.contribute_to_class(self.object_type, self.name)
return field
field = LazyListField(field_object_type)
field.contribute_to_class(self.object_type, self.attname)
return schema.T(field)
class SQLAlchemyModelField(Field):
class SQLAlchemyModelField(FieldType):
def __init__(self, model, *args, **kwargs):
super(SQLAlchemyModelField, self).__init__(None, *args, **kwargs)
self.model = model
def resolve(self, instance, args, info):
resolved = super(SQLAlchemyModelField, self).resolve(instance, args, info)
schema = info.schema.graphene_schema
_type = self.get_object_type(schema)
assert _type, ("Field %s cannot be retrieved as the "
"ObjectType is not registered by the schema" % (
self.attname
))
return _type(resolved)
super(SQLAlchemyModelField, self).__init__(*args, **kwargs)
def internal_type(self, schema):
_type = self.get_object_type(schema)
if not _type and self.object_type._meta.only_fields:
if not _type and self.parent._meta.only_fields:
raise Exception(
"Model %r is not accessible by the schema. "
"Table %r is not accessible by the schema. "
"You can either register the type manually "
"using @schema.register. "
"Or disable the field %s in %s" % (
"Or disable the field in %s" % (
self.model,
self.attname,
self.object_type
self.parent,
)
)
return schema.T(_type) or Field.SKIP
if not _type:
raise SkipField()
return schema.T(_type)
def get_object_type(self, schema):
return get_type_for_model(schema, self.model)

View File

@ -1,23 +1,23 @@
import inspect
from sqlalchemy import Table
from sqlalchemy.ext.declarative.api import DeclarativeMeta
from graphene.core.options import Options
from graphene.relay.types import Node
from graphene.relay.utils import is_node
from ...core.options import Options
from ...relay.types import Node
from ...relay.utils import is_node
VALID_ATTRS = ('table', 'only_columns', 'exclude_columns')
VALID_ATTRS = ('model', 'only_fields', 'exclude_fields')
def is_base(cls):
from graphene.contrib.SQLAlchemy.types import SQLAlchemyObjectType
from graphene.contrib.sqlalchemy.types import SQLAlchemyObjectType
return SQLAlchemyObjectType in cls.__bases__
class SQLAlchemyOptions(Options):
def __init__(self, *args, **kwargs):
self.table = None
self.model = None
super(SQLAlchemyOptions, self).__init__(*args, **kwargs)
self.valid_attrs += VALID_ATTRS
self.only_fields = None
@ -30,8 +30,8 @@ class SQLAlchemyOptions(Options):
self.interfaces.append(Node)
if not is_node(cls) and not is_base(cls):
return
if not self.table:
if not self.model:
raise Exception(
'SQLAlchemy ObjectType %s must have a table in the Meta class attr' % cls)
elif not inspect.isclass(self.table) or not issubclass(self.table, Table):
raise Exception('Provided table in %s is not a SQLAlchemy table' % cls)
'SQLAlchemy ObjectType %s must have a model in the Meta class attr' % cls)
elif not inspect.isclass(self.model) or not isinstance(self.model, DeclarativeMeta):
raise Exception('Provided model in %s is not a SQLAlchemy model' % cls)

View File

@ -0,0 +1,37 @@
from __future__ import absolute_import
from sqlalchemy import Table, Column, Integer, String, Date, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
Base = declarative_base()
association_table = Table('association', Base.metadata,
Column('pet_id', Integer, ForeignKey('pets.id')),
Column('reporter_id', Integer, ForeignKey('reporters.id')))
class Pet(Base):
__tablename__ = 'pets'
id = Column(Integer(), primary_key=True)
name = Column(String(30))
reporter_id = Column(Integer(), ForeignKey('reporters.id'))
class Reporter(Base):
__tablename__ = 'reporters'
id = Column(Integer(), primary_key=True)
first_name = Column(String(30))
last_name = Column(String(30))
email = Column(String())
pets = relationship('Pet', secondary=association_table, backref='reporters')
articles = relationship('Article', backref='reporter')
class Article(Base):
__tablename__ = 'articles'
id = Column(Integer(), primary_key=True)
headline = Column(String(100))
pub_date = Column(Date())
reporter_id = Column(Integer(), ForeignKey('reporters.id'))

View File

@ -0,0 +1,103 @@
from sqlalchemy import types, Column
from py.test import raises
import graphene
from graphene.contrib.sqlalchemy.converter import convert_sqlalchemy_column, convert_sqlalchemy_relationship
from graphene.contrib.sqlalchemy.fields import ConnectionOrListField, SQLAlchemyModelField
from .models import Article, Reporter, Pet
def assert_column_conversion(sqlalchemy_type, graphene_field, **kwargs):
column = Column(sqlalchemy_type, doc='Custom Help Text', **kwargs)
graphene_type = convert_sqlalchemy_column(column)
assert isinstance(graphene_type, graphene_field)
field = graphene_type.as_field()
assert field.description == 'Custom Help Text'
return field
def test_should_unknown_sqlalchemy_field_raise_exception():
with raises(Exception) as excinfo:
convert_sqlalchemy_column(None)
assert 'Don\'t know how to convert the SQLAlchemy field' in str(excinfo.value)
def test_should_date_convert_string():
assert_column_conversion(types.Date(), graphene.String)
def test_should_datetime_convert_string():
assert_column_conversion(types.DateTime(), graphene.String)
def test_should_time_convert_string():
assert_column_conversion(types.Time(), graphene.String)
def test_should_string_convert_string():
assert_column_conversion(types.String(), graphene.String)
def test_should_text_convert_string():
assert_column_conversion(types.Text(), graphene.String)
def test_should_unicode_convert_string():
assert_column_conversion(types.Unicode(), graphene.String)
def test_should_unicodetext_convert_string():
assert_column_conversion(types.UnicodeText(), graphene.String)
def test_should_enum_convert_string():
assert_column_conversion(types.Enum(), graphene.String)
def test_should_small_integer_convert_int():
assert_column_conversion(types.SmallInteger(), graphene.Int)
def test_should_big_integer_convert_int():
assert_column_conversion(types.BigInteger(), graphene.Int)
def test_should_integer_convert_int():
assert_column_conversion(types.Integer(), graphene.Int)
def test_should_integer_convert_id():
assert_column_conversion(types.Integer(), graphene.ID, primary_key=True)
def test_should_boolean_convert_boolean():
field = assert_column_conversion(types.Boolean(), graphene.Boolean)
def test_should_float_convert_float():
assert_column_conversion(types.Float(), graphene.Float)
def test_should_numeric_convert_float():
assert_column_conversion(types.Numeric(), graphene.Float)
def test_should_manytomany_convert_connectionorlist():
graphene_type = convert_sqlalchemy_relationship(Reporter.pets.property)
assert isinstance(graphene_type, ConnectionOrListField)
assert isinstance(graphene_type.type, SQLAlchemyModelField)
assert graphene_type.type.model == Pet
def test_should_manytoone_convert_connectionorlist():
field = convert_sqlalchemy_relationship(Article.reporter.property)
assert isinstance(field, SQLAlchemyModelField)
assert field.model == Reporter
def test_should_onetomany_convert_model():
graphene_type = convert_sqlalchemy_relationship(Reporter.articles.property)
assert isinstance(graphene_type, ConnectionOrListField)
assert isinstance(graphene_type.type, SQLAlchemyModelField)
assert graphene_type.type.model == Article

View File

@ -0,0 +1,141 @@
from py.test import raises
import graphene
from graphene import relay
from graphene.contrib.sqlalchemy import SQLAlchemyNode, SQLAlchemyObjectType
from .models import Article, Reporter
def test_should_query_only_fields():
with raises(Exception):
class ReporterType(SQLAlchemyObjectType):
class Meta:
model = Reporter
only_fields = ('articles', )
schema = graphene.Schema(query=ReporterType)
query = '''
query ReporterQuery {
articles
}
'''
result = schema.execute(query)
assert not result.errors
def test_should_query_well():
class ReporterType(SQLAlchemyObjectType):
class Meta:
model = Reporter
class Query(graphene.ObjectType):
reporter = graphene.Field(ReporterType)
def resolve_reporter(self, *args, **kwargs):
return ReporterType(Reporter(first_name='ABA', last_name='X'))
query = '''
query ReporterQuery {
reporter {
firstName,
lastName,
email
}
}
'''
expected = {
'reporter': {
'firstName': 'ABA',
'lastName': 'X',
'email': None
}
}
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected
def test_should_node():
class ReporterNode(SQLAlchemyNode):
class Meta:
model = Reporter
exclude_fields = ('id', )
@classmethod
def get_node(cls, id, info):
return ReporterNode(Reporter(id=2, first_name='Cookie Monster'))
def resolve_articles(self, *args, **kwargs):
return [ArticleNode(Article(headline='Hi!'))]
class ArticleNode(SQLAlchemyNode):
class Meta:
model = Article
exclude_fields = ('id', )
@classmethod
def get_node(cls, id, info):
return ArticleNode(Article(id=1, headline='Article node'))
class Query(graphene.ObjectType):
node = relay.NodeField()
reporter = graphene.Field(ReporterNode)
article = graphene.Field(ArticleNode)
def resolve_reporter(self, *args, **kwargs):
return ReporterNode(Reporter(id=1, first_name='ABA', last_name='X'))
query = '''
query ReporterQuery {
reporter {
id,
firstName,
articles {
edges {
node {
headline
}
}
}
lastName,
email
}
myArticle: node(id:"QXJ0aWNsZU5vZGU6MQ==") {
id
... on ReporterNode {
firstName
}
... on ArticleNode {
headline
}
}
}
'''
expected = {
'reporter': {
'id': 'UmVwb3J0ZXJOb2RlOjE=',
'firstName': 'ABA',
'lastName': 'X',
'email': None,
'articles': {
'edges': [{
'node': {
'headline': 'Hi!'
}
}]
},
},
'myArticle': {
'id': 'QXJ0aWNsZU5vZGU6MQ==',
'headline': 'Article node'
}
}
schema = graphene.Schema(query=Query)
result = schema.execute(query)
assert not result.errors
assert result.data == expected

View File

@ -0,0 +1,45 @@
from py.test import raises
from graphene.contrib.sqlalchemy import SQLAlchemyObjectType
from tests.utils import assert_equal_lists
from .models import Reporter
def test_should_raise_if_no_model():
with raises(Exception) as excinfo:
class Character1(SQLAlchemyObjectType):
pass
assert 'model in the Meta' in str(excinfo.value)
def test_should_raise_if_model_is_invalid():
with raises(Exception) as excinfo:
class Character2(SQLAlchemyObjectType):
class Meta:
model = 1
assert 'not a SQLAlchemy model' in str(excinfo.value)
def test_should_map_fields_correctly():
class ReporterType2(SQLAlchemyObjectType):
class Meta:
model = Reporter
assert_equal_lists(
ReporterType2._meta.fields_map.keys(),
['articles', 'first_name', 'last_name', 'email', 'pets', 'id']
)
def test_should_map_only_few_fields():
class Reporter2(SQLAlchemyObjectType):
class Meta:
model = Reporter
only_fields = ('id', 'email')
assert_equal_lists(
Reporter2._meta.fields_map.keys(),
['id', 'email']
)

View File

@ -0,0 +1,106 @@
from graphql.core.type import GraphQLInterfaceType, GraphQLObjectType
from pytest import raises
from graphene import Schema
from graphene.contrib.sqlalchemy.types import SQLAlchemyInterface, SQLAlchemyNode
from graphene.core.fields import Field
from graphene.core.types.scalars import Int
from graphene.relay.fields import GlobalIDField
from tests.utils import assert_equal_lists
from .models import Article, Reporter
schema = Schema()
class Character(SQLAlchemyInterface):
'''Character description'''
class Meta:
model = Reporter
@schema.register
class Human(SQLAlchemyNode):
'''Human description'''
pub_date = Int()
class Meta:
model = Article
exclude_fields = ('id', )
def test_sqlalchemy_interface():
assert SQLAlchemyNode._meta.is_interface is True
def test_sqlalchemy_get_node(get):
human = Human.get_node(1, None)
get.assert_called_with(id=1)
assert human.id == 1
def test_pseudo_interface_registered():
object_type = schema.T(Character)
assert Character._meta.is_interface is True
assert isinstance(object_type, GraphQLInterfaceType)
assert Character._meta.model == Reporter
assert_equal_lists(
object_type.get_fields().keys(),
['articles', 'firstName', 'lastName', 'email', 'pets', 'id']
)
def test_sqlalchemynode_idfield():
idfield = SQLAlchemyNode._meta.fields_map['id']
assert isinstance(idfield, GlobalIDField)
def test_node_idfield():
idfield = Human._meta.fields_map['id']
assert isinstance(idfield, GlobalIDField)
def test_node_replacedfield():
idfield = Human._meta.fields_map['pub_date']
assert isinstance(idfield, Field)
assert schema.T(idfield).type == schema.T(Int())
def test_interface_resolve_type():
resolve_type = Character.resolve_type(schema, Human())
assert isinstance(resolve_type, GraphQLObjectType)
def test_interface_objecttype_init_none():
h = Human()
assert h._root is None
def test_interface_objecttype_init_good():
instance = Article()
h = Human(instance)
assert h._root == instance
def test_interface_objecttype_init_unexpected():
with raises(AssertionError) as excinfo:
Human(object())
assert str(excinfo.value) == "Human received a non-compatible instance (object) when expecting Article"
def test_object_type():
object_type = schema.T(Human)
Human._meta.fields_map
assert Human._meta.is_interface is False
assert isinstance(object_type, GraphQLObjectType)
assert_equal_lists(
object_type.get_fields().keys(),
['headline', 'id', 'reporter', 'reporterId', 'pubDate']
)
assert schema.T(SQLAlchemyNode) in object_type.get_interfaces()
def test_node_notinterface():
assert Human._meta.is_interface is False
assert SQLAlchemyNode in Human._meta.interfaces

View File

@ -1,13 +1,11 @@
import six
from sqlalchemy.inspection import inspect
from graphene.contrib.sqlalchemy.converter import convert_sqlalchemy_column,
convert_sqlalchemy_relationship
from graphene.contrib.sqlalchemy.options import SQLAlchemyOptions
from graphene.contrib.sqlalchemy.utils import get_reverse_columns
from graphene.core.types import BaseObjectType, ObjectTypeMeta
from graphene.relay.fields import GlobalIDField
from graphene.relay.types import BaseNode
from ...core.types import BaseObjectType, ObjectTypeMeta
from ...relay.fields import GlobalIDField
from ...relay.types import BaseNode
from .converter import convert_sqlalchemy_column, convert_sqlalchemy_relationship
from .options import SQLAlchemyOptions
class SQLAlchemyObjectTypeMeta(ObjectTypeMeta):
@ -17,26 +15,60 @@ class SQLAlchemyObjectTypeMeta(ObjectTypeMeta):
return SQLAlchemyInterface in parents
def add_extra_fields(cls):
if not cls._meta.table:
if not cls._meta.model:
return
inspected_table = inspect(cls._meta.table)
# Get all the columns for the relationships on the table
for relationship in inspected_table.relationships:
only_fields = cls._meta.only_fields
exclude_fields = cls._meta.exclude_fields
already_created_fields = {f.attname for f in cls._meta.local_fields}
inspected_model = inspect(cls._meta.model)
# Get all the columns for the relationships on the model
for relationship in inspected_model.relationships:
is_not_in_only = only_fields and relationship.key not in only_fields
is_already_created = relationship.key in already_created_fields
is_excluded = relationship.key in exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we excldue this field in exclude_fields
continue
converted_relationship = convert_sqlalchemy_relationship(relationship)
cls.add_to_class(relationship.key, converted_relationship)
for column in inspected_table.columns:
for column in inspected_model.columns:
is_not_in_only = only_fields and column.name not in only_fields
is_already_created = column.name in already_created_fields
is_excluded = column.name in exclude_fields or is_already_created
if is_not_in_only or is_excluded:
# We skip this field if we specify only_fields and is not
# in there. Or when we excldue this field in exclude_fields
continue
converted_column = convert_sqlalchemy_column(column)
cls.add_to_class(column.name, converted_column)
class InstanceObjectType(BaseObjectType):
def __init__(self, instance=None):
self.instance = instance
super(InstanceObjectType, self).__init__()
def __init__(self, _root=None):
if _root:
assert isinstance(_root, self._meta.model), (
'{} received a non-compatible instance ({}) '
'when expecting {}'.format(
self.__class__.__name__,
_root.__class__.__name__,
self._meta.model.__name__
))
super(InstanceObjectType, self).__init__(_root=_root)
@property
def instance(self):
return self._root
@instance.setter
def instance(self, value):
self._root = value
def __getattr__(self, attr):
return getattr(self.instance, attr)
return getattr(self._root, attr)
class SQLAlchemyObjectType(six.with_metaclass(SQLAlchemyObjectTypeMeta, InstanceObjectType)):
@ -51,6 +83,9 @@ class SQLAlchemyNode(BaseNode, SQLAlchemyInterface):
id = GlobalIDField()
@classmethod
def get_node(cls, id):
instance = cls._meta.table.objects.filter(id=id).first()
def get_node(cls, id, info=None):
try:
instance = cls._meta.model.filter(id=id).one()
return cls(instance)
except cls._meta.model.DoesNotExist:
return None

View File

@ -1,10 +1,3 @@
from django.db import models
from django.db.models.manager import Manager
from django.db.models.query import QuerySet
from graphene.utils import LazyMap
def get_type_for_model(schema, model):
schema = schema
types = schema.types.values()
@ -13,11 +6,3 @@ def get_type_for_model(schema, model):
_type._meta, 'model', None)
if model == type_model:
return _type
def lazy_map(value, func):
if isinstance(value, Manager):
value = value.get_queryset()
if isinstance(value, QuerySet):
return LazyMap(value, func)
return value