mirror of
				https://github.com/graphql-python/graphene-django.git
				synced 2025-10-26 05:30:59 +03:00 
			
		
		
		
	Co-authored-by: Firas Kafri <3097061+firaskafri@users.noreply.github.com> Co-authored-by: Kien Dang <mail@kien.ai>
		
			
				
	
	
		
			304 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			304 lines
		
	
	
		
			11 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import warnings
 | |
| from collections import OrderedDict
 | |
| from typing import Type  # noqa: F401
 | |
| 
 | |
| from django.db.models import Model  # noqa: F401
 | |
| 
 | |
| import graphene
 | |
| from graphene.relay import Connection, Node
 | |
| from graphene.types.objecttype import ObjectType, ObjectTypeOptions
 | |
| from graphene.types.utils import yank_fields_from_attrs
 | |
| 
 | |
| from .converter import convert_django_field_with_choices
 | |
| from .registry import Registry, get_global_registry
 | |
| from .settings import graphene_settings
 | |
| from .utils import (
 | |
|     DJANGO_FILTER_INSTALLED,
 | |
|     camelize,
 | |
|     get_model_fields,
 | |
|     is_valid_django_model,
 | |
| )
 | |
| 
 | |
| ALL_FIELDS = "__all__"
 | |
| 
 | |
| 
 | |
| def construct_fields(
 | |
|     model, registry, only_fields, exclude_fields, convert_choices_to_enum=None
 | |
| ):
 | |
|     _model_fields = get_model_fields(model)
 | |
| 
 | |
|     fields = OrderedDict()
 | |
|     for name, field in _model_fields:
 | |
|         is_not_in_only = (
 | |
|             only_fields is not None
 | |
|             and only_fields != ALL_FIELDS
 | |
|             and name not in only_fields
 | |
|         )
 | |
|         # is_already_created = name in options.fields
 | |
|         is_excluded = (
 | |
|             exclude_fields is not None and name in exclude_fields
 | |
|         )  # or is_already_created
 | |
|         # https://docs.djangoproject.com/en/1.10/ref/models/fields/#django.db.models.ForeignKey.related_query_name
 | |
|         is_no_backref = str(name).endswith("+")
 | |
|         if is_not_in_only or is_excluded or is_no_backref:
 | |
|             # We skip this field if we specify only_fields and is not
 | |
|             # in there. Or when we exclude this field in exclude_fields.
 | |
|             # Or when there is no back reference.
 | |
|             continue
 | |
| 
 | |
|         _convert_choices_to_enum = convert_choices_to_enum
 | |
|         if isinstance(_convert_choices_to_enum, list):
 | |
|             # then `convert_choices_to_enum` is a list of field names to convert
 | |
|             if name in _convert_choices_to_enum:
 | |
|                 _convert_choices_to_enum = True
 | |
|             else:
 | |
|                 _convert_choices_to_enum = False
 | |
| 
 | |
|         converted = convert_django_field_with_choices(
 | |
|             field, registry, convert_choices_to_enum=_convert_choices_to_enum
 | |
|         )
 | |
|         fields[name] = converted
 | |
| 
 | |
|     return fields
 | |
| 
 | |
| 
 | |
| def validate_fields(type_, model, fields, only_fields, exclude_fields):
 | |
|     # Validate the given fields against the model's fields and custom fields
 | |
|     all_field_names = set(fields.keys())
 | |
|     only_fields = only_fields if only_fields is not ALL_FIELDS else ()
 | |
|     for name in only_fields or ():
 | |
|         if name in all_field_names:
 | |
|             continue
 | |
| 
 | |
|         if hasattr(model, name):
 | |
|             warnings.warn(
 | |
|                 (
 | |
|                     'Field name "{field_name}" matches an attribute on Django model "{app_label}.{object_name}" '
 | |
|                     "but it's not a model field so Graphene cannot determine what type it should be. "
 | |
|                     'Either define the type of the field on DjangoObjectType "{type_}" or remove it from the "fields" list.'
 | |
|                 ).format(
 | |
|                     field_name=name,
 | |
|                     app_label=model._meta.app_label,
 | |
|                     object_name=model._meta.object_name,
 | |
|                     type_=type_,
 | |
|                 )
 | |
|             )
 | |
| 
 | |
|         else:
 | |
|             warnings.warn(
 | |
|                 (
 | |
|                     'Field name "{field_name}" doesn\'t exist on Django model "{app_label}.{object_name}". '
 | |
|                     'Consider removing the field from the "fields" list of DjangoObjectType "{type_}" because it has no effect.'
 | |
|                 ).format(
 | |
|                     field_name=name,
 | |
|                     app_label=model._meta.app_label,
 | |
|                     object_name=model._meta.object_name,
 | |
|                     type_=type_,
 | |
|                 )
 | |
|             )
 | |
| 
 | |
|     # Validate exclude fields
 | |
|     for name in exclude_fields or ():
 | |
|         if name in all_field_names:
 | |
|             # Field is a custom field
 | |
|             warnings.warn(
 | |
|                 f'Excluding the custom field "{name}" on DjangoObjectType "{type_}" has no effect. '
 | |
|                 'Either remove the custom field or remove the field from the "exclude" list.'
 | |
|             )
 | |
|         else:
 | |
|             if not hasattr(model, name):
 | |
|                 warnings.warn(
 | |
|                     (
 | |
|                         'Django model "{app_label}.{object_name}" does not have a field or attribute named "{field_name}". '
 | |
|                         'Consider removing the field from the "exclude" list of DjangoObjectType "{type_}" because it has no effect'
 | |
|                     ).format(
 | |
|                         field_name=name,
 | |
|                         app_label=model._meta.app_label,
 | |
|                         object_name=model._meta.object_name,
 | |
|                         type_=type_,
 | |
|                     )
 | |
|                 )
 | |
| 
 | |
| 
 | |
| class DjangoObjectTypeOptions(ObjectTypeOptions):
 | |
|     model = None  # type: Type[Model]
 | |
|     registry = None  # type: Registry
 | |
|     connection = None  # type: Type[Connection]
 | |
| 
 | |
|     filter_fields = ()
 | |
|     filterset_class = None
 | |
| 
 | |
| 
 | |
| class DjangoObjectType(ObjectType):
 | |
|     @classmethod
 | |
|     def __init_subclass_with_meta__(
 | |
|         cls,
 | |
|         model=None,
 | |
|         registry=None,
 | |
|         skip_registry=False,
 | |
|         only_fields=None,  # deprecated in favour of `fields`
 | |
|         fields=None,
 | |
|         exclude_fields=None,  # deprecated in favour of `exclude`
 | |
|         exclude=None,
 | |
|         filter_fields=None,
 | |
|         filterset_class=None,
 | |
|         connection=None,
 | |
|         connection_class=None,
 | |
|         use_connection=None,
 | |
|         interfaces=(),
 | |
|         convert_choices_to_enum=None,
 | |
|         _meta=None,
 | |
|         **options,
 | |
|     ):
 | |
|         assert is_valid_django_model(model), (
 | |
|             'You need to pass a valid Django Model in {}.Meta, received "{}".'
 | |
|         ).format(cls.__name__, model)
 | |
| 
 | |
|         if not registry:
 | |
|             registry = get_global_registry()
 | |
| 
 | |
|         assert isinstance(registry, Registry), (
 | |
|             f"The attribute registry in {cls.__name__} needs to be an instance of "
 | |
|             f'Registry, received "{registry}".'
 | |
|         )
 | |
| 
 | |
|         if filter_fields and filterset_class:
 | |
|             raise Exception("Can't set both filter_fields and filterset_class")
 | |
| 
 | |
|         if not DJANGO_FILTER_INSTALLED and (filter_fields or filterset_class):
 | |
|             raise Exception(
 | |
|                 "Can only set filter_fields or filterset_class if "
 | |
|                 "Django-Filter is installed"
 | |
|             )
 | |
| 
 | |
|         assert not (fields and exclude), (
 | |
|             "Cannot set both 'fields' and 'exclude' options on "
 | |
|             f"DjangoObjectType {cls.__name__}."
 | |
|         )
 | |
| 
 | |
|         # Alias only_fields -> fields
 | |
|         if only_fields and fields:
 | |
|             raise Exception("Can't set both only_fields and fields")
 | |
|         if only_fields:
 | |
|             warnings.warn(
 | |
|                 "Defining `only_fields` is deprecated in favour of `fields`.",
 | |
|                 DeprecationWarning,
 | |
|                 stacklevel=2,
 | |
|             )
 | |
|             fields = only_fields
 | |
|         if fields and fields != ALL_FIELDS and not isinstance(fields, (list, tuple)):
 | |
|             raise TypeError(
 | |
|                 'The `fields` option must be a list or tuple or "__all__". '
 | |
|                 "Got %s." % type(fields).__name__
 | |
|             )
 | |
| 
 | |
|         # Alias exclude_fields -> exclude
 | |
|         if exclude_fields and exclude:
 | |
|             raise Exception("Can't set both exclude_fields and exclude")
 | |
|         if exclude_fields:
 | |
|             warnings.warn(
 | |
|                 "Defining `exclude_fields` is deprecated in favour of `exclude`.",
 | |
|                 DeprecationWarning,
 | |
|                 stacklevel=2,
 | |
|             )
 | |
|             exclude = exclude_fields
 | |
|         if exclude and not isinstance(exclude, (list, tuple)):
 | |
|             raise TypeError(
 | |
|                 "The `exclude` option must be a list or tuple. Got %s."
 | |
|                 % type(exclude).__name__
 | |
|             )
 | |
| 
 | |
|         if fields is None and exclude is None:
 | |
|             warnings.warn(
 | |
|                 "Creating a DjangoObjectType without either the `fields` "
 | |
|                 "or the `exclude` option is deprecated. Add an explicit `fields "
 | |
|                 f"= '__all__'` option on DjangoObjectType {cls.__name__} to use all "
 | |
|                 "fields",
 | |
|                 DeprecationWarning,
 | |
|                 stacklevel=2,
 | |
|             )
 | |
| 
 | |
|         django_fields = yank_fields_from_attrs(
 | |
|             construct_fields(model, registry, fields, exclude, convert_choices_to_enum),
 | |
|             _as=graphene.Field,
 | |
|         )
 | |
| 
 | |
|         if use_connection is None and interfaces:
 | |
|             use_connection = any(
 | |
|                 issubclass(interface, Node) for interface in interfaces
 | |
|             )
 | |
| 
 | |
|         if use_connection and not connection:
 | |
|             # We create the connection automatically
 | |
|             if not connection_class:
 | |
|                 connection_class = Connection
 | |
| 
 | |
|             connection = connection_class.create_type(
 | |
|                 "{}Connection".format(options.get("name") or cls.__name__), node=cls
 | |
|             )
 | |
| 
 | |
|         if connection is not None:
 | |
|             assert issubclass(
 | |
|                 connection, Connection
 | |
|             ), f"The connection must be a Connection. Received {connection.__name__}"
 | |
| 
 | |
|         if not _meta:
 | |
|             _meta = DjangoObjectTypeOptions(cls)
 | |
| 
 | |
|         _meta.model = model
 | |
|         _meta.registry = registry
 | |
|         _meta.filter_fields = filter_fields
 | |
|         _meta.filterset_class = filterset_class
 | |
|         _meta.fields = django_fields
 | |
|         _meta.connection = connection
 | |
|         _meta.convert_choices_to_enum = convert_choices_to_enum
 | |
| 
 | |
|         super().__init_subclass_with_meta__(
 | |
|             _meta=_meta, interfaces=interfaces, **options
 | |
|         )
 | |
| 
 | |
|         # Validate fields
 | |
|         validate_fields(cls, model, _meta.fields, fields, exclude)
 | |
| 
 | |
|         if not skip_registry:
 | |
|             registry.register(cls)
 | |
| 
 | |
|     def resolve_id(self, info):
 | |
|         return self.pk
 | |
| 
 | |
|     @classmethod
 | |
|     def is_type_of(cls, root, info):
 | |
|         if isinstance(root, cls):
 | |
|             return True
 | |
|         if not is_valid_django_model(root.__class__):
 | |
|             raise Exception(f'Received incompatible instance "{root}".')
 | |
| 
 | |
|         if cls._meta.model._meta.proxy:
 | |
|             model = root._meta.model
 | |
|         else:
 | |
|             model = root._meta.model._meta.concrete_model
 | |
| 
 | |
|         return model == cls._meta.model
 | |
| 
 | |
|     @classmethod
 | |
|     def get_queryset(cls, queryset, info):
 | |
|         return queryset
 | |
| 
 | |
|     @classmethod
 | |
|     def get_node(cls, info, id):
 | |
|         queryset = cls.get_queryset(cls._meta.model.objects, info)
 | |
|         try:
 | |
|             return queryset.get(pk=id)
 | |
|         except cls._meta.model.DoesNotExist:
 | |
|             return None
 | |
| 
 | |
| 
 | |
| class ErrorType(ObjectType):
 | |
|     field = graphene.String(required=True)
 | |
|     messages = graphene.List(graphene.NonNull(graphene.String), required=True)
 | |
| 
 | |
|     @classmethod
 | |
|     def from_errors(cls, errors):
 | |
|         data = camelize(errors) if graphene_settings.CAMELCASE_ERRORS else errors
 | |
|         return [cls(field=key, messages=value) for key, value in data.items()]
 |