Make it kinda work

This commit is contained in:
Fernando Tancini 2021-08-31 11:46:51 -03:00
parent 54815497e6
commit c81ab16e6a
2 changed files with 159 additions and 2 deletions

View File

@ -1,4 +1,4 @@
from graphene import List, ObjectType
from graphene import List, ObjectType, Int
from .sql.types import DjangoDebugSQL
from .exception.types import DjangoDebugException
@ -9,6 +9,10 @@ class DjangoDebug(ObjectType):
description = "Debugging information for the current query."
sql = List(DjangoDebugSQL, description="Executed SQL queries for this API query.")
sql_count = Int(description="number of executed SQL queries for this API query.")
exceptions = List(
DjangoDebugException, description="Raise exceptions for this API query."
)
def resolve_sql_count(root, info):
return len(root.sql)

View File

@ -1,12 +1,18 @@
import warnings
from collections import OrderedDict
from copy import deepcopy
from typing import Type
from django.db.models import Model, Prefetch, QuerySet
import graphene
from django.db.models import Model
from graphene.relay import Connection, Node
from graphene.types import Dynamic
from graphene.types.mountedtype import MountedType
from graphene.types.objecttype import ObjectType, ObjectTypeOptions
from graphene.types.unmountedtype import UnmountedType
from graphene.types.utils import yank_fields_from_attrs
from graphene.utils.str_converters import to_snake_case
from .converter import convert_django_field_with_choices
from .registry import Registry, get_global_registry
@ -151,6 +157,14 @@ class DjangoObjectType(ObjectType):
_meta=None,
**options
):
# ---- < Fernnado Code > ----
cls._preparation_functions_by_field = {}
cls.register_every_qs_preparation()
cls.register_meta_fields(model, fields)
cls.register_attr_fields()
# ---- </ Fernnado Code > ----
assert is_valid_django_model(model), (
'You need to pass a valid Django Model in {}.Meta, received "{}".'
).format(cls.__name__, model)
@ -295,6 +309,119 @@ class DjangoObjectType(ObjectType):
return None
# ---- < Fernando Code > ----
@staticmethod
def convert_model_field_to_graphene_type(model_field):
graphene_field = convert_django_field_with_choices(model_field, registry=get_global_registry())
return deepcopy(graphene_field)
@classmethod
def prepare(cls, value, selection, info):
if not isinstance(value, (Model, QuerySet,)):
return value # If value is not a qs, it cannot be prepared
if isinstance(value, Model):
should_return_model_instance = True
queryset = type(value._setupfunc()).objects.filter(pk=value.pk)
elif isinstance(value, QuerySet):
should_return_model_instance = False
queryset = value
if queryset.model != cls._meta.model:
raise Exception(f'{cls.__name__}.prepare() received a queryset from {queryset.model} model')
if hasattr(cls, '_every_qs_preparation'):
queryset = cls._every_qs_preparation(queryset, selection, info)
sub_selection_by_field_name = {to_snake_case(s.name.value): s for s in selection.selection_set.selections}
for field_name, functions in cls._preparation_functions_by_field.items():
if field_name in sub_selection_by_field_name:
sub_selection = sub_selection_by_field_name[field_name]
for func in functions:
queryset = func(queryset, sub_selection, info)
if should_return_model_instance:
return queryset.first()
else:
return queryset
@classmethod
def append_field_preparation_function(cls, field_name, preparation_function):
cls._preparation_functions_by_field[field_name] = cls._preparation_functions_by_field.get(field_name, [])
cls._preparation_functions_by_field[field_name].append(preparation_function)
@classmethod
def register_model_field(cls, model, field_name):
model_field = model._meta.get_field(field_name)
related_model = model_field.related_model
graphene_field = cls.convert_model_field_to_graphene_type(model_field)
setattr(cls, field_name, graphene_field)
if related_model:
def prepare_function(queryset, selection, info):
graphene_type = get_type_of_field(cls, field_name)
related_qs = related_model.objects.all()
related_qs = graphene_type.prepare(related_qs, selection, info)
return queryset.prefetch_related(Prefetch(field_name, queryset=related_qs))
cls.append_field_preparation_function(field_name, prepare_function)
@classmethod
def register_annotate_function(cls, field_name, annotate_function):
info_in_params = cls.validate_function_signature(annotate_function, num_params_besides_info=1)
def _annotate_function(queryset, _selection, info):
args = [queryset]
if info_in_params:
args.append(info)
return annotate_function( *args )
cls.append_field_preparation_function(field_name, _annotate_function)
@classmethod
def prepare_prefetch_obj(cls, prefetch_obj, selection, info):
related_model = get_related_model_from_lookup(cls._meta.model, prefetch_obj.prefetch_through)
related_queryset = prefetch_obj.queryset
if related_queryset is None:
related_queryset = related_model.objects.all()
related_type = get_global_registry().get_type_for_model(related_model)
prefetch_obj.queryset = related_type.prepare(related_queryset, selection, info)
@classmethod
def register_every_qs_preparation(cls):
if hasattr(cls, 'every_qs_preparation'):
info_in_params = cls.validate_function_signature(cls.every_qs_preparation, num_params_besides_info=1)
def _every_qs_preparation(queryset, _selection, info):
args = [queryset]
if info_in_params:
args.append(info)
return cls.every_qs_preparation( *args )
cls._every_qs_preparation = _every_qs_preparation
@classmethod
def register_meta_fields(cls, model, fields):
if not fields:
return
model_fields_set = {field.name for field in model._meta.get_fields()}
for field_name in fields:
if field_name in model_fields_set:
cls.register_model_field(model, field_name)
else:
raise Exception(f'{cls.__name__} field registration error ({field_name}): \
specification must be a string of model field name.')
@classmethod
def register_attr_fields(cls):
for field_name in dir(cls):
field_specification = getattr(cls, field_name)
if isinstance(field_specification, (UnmountedType, MountedType,)):
annotate_function = getattr(cls, f'annotate_{field_name}', None)
if annotate_function:
cls.register_annotate_function(field_name, annotate_function)
# ---- </ Fernando Code > ----
class ErrorType(ObjectType):
field = graphene.String(required=True)
messages = graphene.List(graphene.NonNull(graphene.String), required=True)
@ -303,3 +430,29 @@ class ErrorType(ObjectType):
def from_errors(cls, errors):
data = camelize(errors) if graphene_settings.CAMELCASE_ERRORS else errors
return [cls(field=key, messages=value) for key, value in data.items()]
# ---- < Fernando Code > ----
def strip_list_and_nonnull_off(graphene_type):
while hasattr(graphene_type, 'of_type'):
graphene_type = graphene_type.of_type
return graphene_type
def get_type_of_field(graphene_type, field_name):
field = graphene_type._meta.fields[field_name]
if isinstance(field, Dynamic):
field = field.type()
graphene_type_of_field = field.type
graphene_type_of_field = strip_list_and_nonnull_off(graphene_type_of_field)
return graphene_type_of_field
def get_related_model_from_lookup(model_class, lookup):
model_ptr = model_class
for field_name in lookup.split('__'):
model_ptr = model_ptr._meta.get_field(field_name).related_model
return model_ptr
# ---- </ Fernando Code > ----