added advanced utils + self written constructors

This commit is contained in:
MardanovTimur 2019-03-23 04:04:31 +03:00
parent 87a98fc6f8
commit 7ab0e806c1
9 changed files with 443 additions and 3 deletions

View File

@ -0,0 +1,11 @@
from graphene.relay import Connection
from graphene.types.scalars import Int
class CountableConnectionInitial(Connection):
class Meta:
abstract = True
total_count = Int()
def resolve_total_count(self, info, **kwargs):
return len(self.iterable)

View File

@ -0,0 +1,78 @@
from inspect import isclass
from types import GeneratorType
from typing import Callable
from functools import wraps, partial, singledispatch
from graphene.relay.node import from_global_id
from graphene.types.objecttype import ObjectType
@singledispatch
def paginate_instance(qs, kwargs):
""" Paginate difference of type qs.
If list or tuple just primitive slicing
If <NodeSet>
"""
raise NotImplementedError("Type {} not implemented yet.".format(type(qs)))
def paginate(resolver):
""" Paginator for resolver functions
Input types (iterable):
list, tuple, NodeSet
"""
@wraps(resolver)
def wrapper(root, info, **kwargs):
qs = resolver(root, info, **kwargs)
qs = paginate_instance(qs, kwargs)
return qs
return wrapper
@paginate_instance.register(list)
@paginate_instance.register(tuple)
@paginate_instance.register(GeneratorType)
def paginate_list(qs, kwargs):
""" Base pagination dispatcher by iterable pythonic collections
"""
if 'first' in kwargs and 'last' in kwargs:
qs = qs[:kwargs['first']]
qs = qs[kwargs['last']:]
elif 'first' in kwargs:
qs = qs[:kwargs['first']]
elif 'last' in kwargs:
qs = qs[-kwargs['last']:]
return qs
try:
from neomodel.match import NodeSet # noqa
@paginate_instance.register(NodeSet)
def paginate_nodeset(qs, kwargs):
# Warning. Type of pagination is lazy
if 'first' in kwargs and 'last' in kwargs:
qs = qs.set_skip(kwargs['first'] - kwargs['last'])
qs = qs.set_limit(kwargs['last'])
elif 'last' in kwargs:
count = len(qs)
qs = qs.set_skip(count - kwargs['last'])
qs = qs.set_limit(kwargs['last'])
elif 'first' in kwargs:
qs = qs.set_limit(kwargs['first'])
return qs
except:
raise NotImplementedError("Neomodel does not installed")
finally:
print('Install custom neomodel (ver=3.0.0)')
def check_connection(func):
""" Check that node is ObjectType
"""
@wraps(func)
def wrapper(node_, resolver, *args, **kwargs):
if not (isclass(node_) and issubclass(node_, ObjectType)):
raise NotImplementedError("{} not implemented.".format(type(node_)))
kwargs['registry_name'] = node_.__name__
return func(node_, resolver, *args, **kwargs)
return wrapper

View File

View File

@ -0,0 +1,162 @@
from functools import singledispatch
from typing import Union, Callable, Optional, Type
from django.utils.translation import ugettext as _
from graphene import String, List, ID, ObjectType, Field
from graphene.types.mountedtype import MountedType
from graphene.types.unmountedtype import UnmountedType
from graphene_django.types import DjangoObjectType
from neomodel.core import StructuredNode
from graphene_ql.decorators import paginate
from .lib import (
GrapheneQLEdgeException,
know_parent,
pagination,
)
def EdgeNode(*args, **kwargs):
""" Edge between nodes
Attrs:
cls_node -> ObjectType
EdgeNode
target_model: StructuredNode
Target model in edge relationship
target_field: str
Field name of <StructuredNode> target model
resolver -> function (None)
override function if you need them
description: str
return_type:
which graphene field is set
kwargs : -> extra arguments
"""
return EdgeNodeClass(*args, **kwargs).build()
class EdgeNodeClass:
parent_type_exception = GrapheneQLEdgeException(_(
'Parent type is incorrect for this field. Say to back'))
def __init__(self, cls_node,
target_model = None,
target_field = None,
resolver = None,
description = "Edge Node",
return_type = List,
*args,
**kwargs):
"""
Args:
cls_node: Type[DjangoObjectType],
target_model: Optional[Type[StructuredNode]] = None,
target_field: Optional[str] = None,
resolver: Optional[Callable] = None,
description: str = "Edge Node",
return_type: Union[MountedType, UnmountedType] = List,
*args, **kwargs
"""
self.cls_node = cls_node
self._resolver = resolver
self.description = description
self._target_model = target_model
self._target_field = target_field
self.arg_fields = {
'id': ID(required=False),
**kwargs,
**know_parent,
**pagination
}
self.return_type = return_type
def build(self, ):
""" Build edgeNode manager
"""
return self.return_type(self.cls_node,
**self.arg_fields,
description=self.description,
resolver=self.resolver)
@property
def resolver(self) -> Callable:
""" Resolver function
"""
return self.get_default_resolver() if self._resolver is None else self._resolver
@property
def target_model(self, ):
if self._target_model is not None:
return self._target_model
raise GrapheneQLEdgeException(message="""
target_model or resolver in EdgeNode
should be defined""")
@property
def target_field(self, ):
if self._target_field is None:
return str(self.target_model.__class__).lower()
return self._target_field
def get_default_resolver(self, ):
return get_resolver(self.return_type(String), self) # just init list field
@singledispatch
def get_resolver(node_type, edge_node):
raise NotImplementedError(f"{node_type} type isn't implemented yet")
@get_resolver.register(List)
def list_resolver(node_type, edge_node) -> Callable:
@paginate
def default_resolver(root, info, **kwargs) -> List:
""" Default <List> resolver
"""
rel_data = []
relation_field = getattr(root, edge_node.target_field)
if not kwargs.get('know_parent'):
for rel_node in relation_field.filter():
rel_data.append(relation_field.relationship(rel_node))
else:
if not hasattr(root, '_parent'):
raise EdgeNodeClass.parent_type_exception
else:
rel_data = relation_field.filter_relationships(root._parent)
if kwargs.get('id'):
rel_data = relation_field.filter_relationships(
edge_node.target_model.nodes.get(uid=kwargs['id']))
return rel_data
return default_resolver
@get_resolver.register(Field)
def field_resolver(node_type, edge_node) -> Callable:
def default_resolver(root, info, **kwargs) -> Field:
""" Default <Field> resolver
"""
data = None
relation_field = getattr(root, edge_node.target_field)
if not kwargs.get('know_parent'):
data = relation_field.filter().first_or_none()
else:
if not hasattr(root, '_parent'):
raise EdgeNodeClass.parent_type_exception
else:
data = relation_field.relationship(root._parent)
if kwargs.get('id'):
data = relation_field.relationship(
edge_node.target_model.nodes.get(uid=kwargs['id']))
return data
return default_resolver

View File

@ -0,0 +1,16 @@
#encoding=utf-8
from graphene.types.scalars import Boolean, Int
__author__ = "TimurMardanov"
know_parent = dict(know_parent=Boolean(default_value=True))
pagination = dict(first=Int(default_value=100), last=Int())
class GrapheneQLEdgeException(Exception):
def __init__(self, message):
self.message = message
def __repr__(self, ):
return self.message

View File

@ -0,0 +1,165 @@
from types import FunctionType, GeneratorType
from functools import reduce, partial
from graphene import (
NonNull, Boolean,
relay,
Int,
Connection as GConnection,
ObjectType,
Field,
List,
Enum,
JSONString as JString,
)
from graphene_django.fields import DjangoConnectionField as DjangoConnectionFieldOriginal
from graphene_django.forms.converter import convert_form_field
from graphene_django.filter.fields import DjangoFilterConnectionField
from graphene_django.types import DjangoObjectType
from graphene_django.filter.utils import (
get_filtering_args_from_filterset,
get_filterset_class
)
from lazy_import import lazy_callable, LazyCallable, lazy_module
from ..decorators import check_connection, paginate_instance
from ..utils import pagination_params
try:
from neomodel.match import NodeSet # noqa
except:
raise ImportError("Install neomodel.")
class ConnectionField(relay.ConnectionField):
""" Push node connection kwargs into ConnectionEdge.hidden_kwargs
"""
@classmethod
def resolve_connection(cls, connection_type, args, resolved):
connection = super(ConnectionField, cls).resolve_connection(connection_type, args, resolved)
connection.hidden_kwargs = args
return connection
@check_connection
def Connection(node_, resolver, *args, **kwargs):
"""
node_: [ObjectType, DjangoObjectType],
resolver,
*args,
**kwargs
Connection class which working with custom
ObjectTypes and Nodes and supports base connection.
node, resolver - required named arguments
args, kwargs - base Field arguments
Can custom count
"""
kwargs = {**pagination_params, **kwargs}
registry_name = kwargs.pop('registry_name')
override_name = kwargs.pop('name', '')
meta_name = "{}{}CustomEdgeConnection".format(registry_name,
override_name)
class EdgeConnection(ObjectType):
node = Field(node_)
class Meta:
name = meta_name
def __init__(self, node, *args, **kwargs):
if callable(node):
raise TypeError("node_resolver is not callable object")
super(EdgeConnection, self).__init__(*args, **kwargs)
self._node = node
def resolve_node(self, info, **kwargs):
return self._node
meta_name_connection = "{}{}CustomConnection".format(registry_name,
override_name)
class ConnectionDecorator(ObjectType):
edges = List(EdgeConnection)
total_count = Int()
class Meta:
name = meta_name_connection
def __init__(self, *args, **kwargs):
self.resolver_ = kwargs.pop('pr', None)
super(ConnectionDecorator, self).__init__(*args, **kwargs)
def resolve_edges(self, info, **kwargs):
items = self.resolver_(**kwargs)
return [EdgeConnection(node=item, **kwargs) for item in items]
def resolve_total_count(self, info, **kwargs):
""" Custom total count resolver
"""
result = self.resolver_(count=True)
if isinstance(result, GeneratorType):
result = list(result)
elif isinstance(result, int):
return result
elif isinstance(result, NodeSet):
return len(result.set_skip(0).set_limit(1))
if isinstance(result, (list, tuple)) and result:
if isinstance(result[0], int):
# if returned count manually
return result[0]
# if returned iterable object
return len(result)
def resolve_connection_decorator(root, info, **kwargs):
resolver_ = partial(resolver, root, info, *args, **kwargs)
return ConnectionDecorator(pr=resolver_)
return Field(ConnectionDecorator, resolver=resolve_connection_decorator, *args, **kwargs)
def RelayConnection(node_, *args, **kwargs):
""" node_: [ObjectType, DjangoObjectType, Callable],
*args,
**kwargs
Quick implementation of stock relay connection
# node: should contains relay.Node in Meta.interfaces
+ total_count implements
def - total_count_resolver: custom function for resolve total_count
"""
registry_name = kwargs.pop('name', '')
total_count_resolver = kwargs.pop('total_count_resolver', None)
def Connection(node_):
node_ = node_() if isinstance(node_, FunctionType) else node_
if isinstance(node_, LazyCallable):
node_()
meta_name = "{}{}CC".format(node_.__name__, registry_name)
class CustomConnection(relay.Connection):
class Meta:
node = node_
name = meta_name
def __init__(self, *ar, **kw):
super(CustomConnection, self).__init__(*ar, **kw)
self._extra_kwargs = kwargs
total_count = Int()
def resolve_total_count(self, info, **params):
if total_count_resolver:
return total_count_resolver(self, info, **self.hidden_kwargs)
if isinstance(self.iterable, NodeSet):
return len(self.iterable.set_skip(0).set_limit(1))
elif isinstance(self.iterable, (list, tuple)):
return len(list(self.iterable))
return 0
return CustomConnection
return ConnectionField(lambda: Connection(node_), *args, **kwargs)

View File

@ -9,6 +9,7 @@ from graphene.types.utils import yank_fields_from_attrs
from .converter import convert_django_field_with_choices
from .registry import Registry, get_global_registry
from .utils import DJANGO_FILTER_INSTALLED, get_model_fields, is_valid_neomodel_model
from .countable import CountableConnectionInitial as CountableConnection
from neomodel import (
DoesNotExist,
@ -60,7 +61,7 @@ class DjangoObjectType(ObjectType):
neomodel_filter_fields=None,
know_parent_fields=[],
connection=None,
connection_class=None,
connection_class=CountableConnection,
use_connection=None,
interfaces=(),
_meta=None,

View File

@ -1,11 +1,14 @@
import inspect
from django.db import models
from graphene.types.scalars import Int
from neomodel import (
NodeSet,
StructuredNode,
)
pagination_params = dict(first=Int(default_value=100), last=Int())
# from graphene.utils import LazyList
def is_parent_set(info):

View File

@ -13,7 +13,7 @@ with open("graphene_django/__init__.py", "rb") as f:
rest_framework_require = ["djangorestframework>=3.6.3"]
neomodel_require = [
# "neomodel==3.3.0",
"neomodel==3.3.0",
]
tests_require = [
@ -56,7 +56,11 @@ setup(
"Django>=1.11",
"singledispatch>=3.4.0.3",
"promise>=2.1",
*neomodel_require,
"lazy-import==0.2.2",
],
dependency_links=[
# TODO refactor this
"git+git://github.com/MardanovTimur/neomodel.git@arch_neomodel#egg=neomodel", # custom neomodel
],
setup_requires=["pytest-runner"],
tests_require=tests_require,