mirror of
https://github.com/encode/django-rest-framework.git
synced 2024-11-23 01:57:00 +03:00
37f210a455
Co-authored-by: Lucidiot <lucidiot@protonmail.com> Co-authored-by: dongfangtianyu <dongfangtianyu@qq.com>
617 lines
21 KiB
Python
617 lines
21 KiB
Python
import re
|
|
import warnings
|
|
from collections import Counter, OrderedDict
|
|
from urllib import parse
|
|
|
|
from django.db import models
|
|
from django.utils.encoding import force_text, smart_text
|
|
|
|
from rest_framework import exceptions, serializers
|
|
from rest_framework.compat import coreapi, coreschema, uritemplate
|
|
from rest_framework.settings import api_settings
|
|
from rest_framework.utils import formatting
|
|
|
|
from .generators import BaseSchemaGenerator
|
|
from .inspectors import ViewInspector
|
|
from .utils import get_pk_description, is_list_view
|
|
|
|
# Used in _get_description_section()
|
|
# TODO: ???: move up to base.
|
|
# Recognises a section header at the very start of a description line:
# an ASCII letter, then letters/digits/underscores, then a colon
# (for example ``get:`` or ``partial_update:``).
header_regex = re.compile(r'^[a-zA-Z][0-9A-Za-z_]*:')
|
|
|
|
# Generator #
|
|
# TODO: Pull some of this into base.
|
|
|
|
|
|
def is_custom_action(action):
    """
    Return True if `action` is not one of the standard viewset actions.

    The standard actions are 'retrieve', 'list', 'create', 'update',
    'partial_update' and 'destroy'; anything else counts as custom.
    """
    return action not in {
        'retrieve', 'list', 'create', 'update', 'partial_update', 'destroy'
    }
|
|
|
|
|
|
def distribute_links(obj):
    """
    Recursively move every node's pending links into the node itself.

    `obj` must be a mapping that also exposes a `links` sequence of
    `(preferred_key, link)` pairs and a `get_available_key()` method.
    Child values are processed first; afterwards each pending link is stored
    on `obj` under whatever key `get_available_key(preferred_key)` returns.
    """
    # Children are handled before this node's own links are inserted, so the
    # links added below are never themselves recursed into.
    for child in obj.values():
        distribute_links(child)

    for preferred_key, link in obj.links:
        key = obj.get_available_key(preferred_key)
        obj[key] = link
|
|
|
|
|
|
# Error message template used by `insert_into()`; formatted with the
# `value_url`, `target_url` and `keys` fields.
INSERT_INTO_COLLISION_FMT = """
Schema Naming Collision.

coreapi.Link for URL path {value_url} cannot be inserted into schema.
Position conflicts with coreapi.Link for URL path {target_url}.

Attempted to insert link with keys: {keys}.

Adjust URLs to avoid naming collision or override `SchemaGenerator.get_keys()`
to customise schema structure.
"""
|
|
|
|
|
|
class LinkNode(OrderedDict):
    """
    Ordered mapping node that also carries a queue of not-yet-placed links.

    Attributes:

    * `links`: list that `insert_into()` appends `(preferred_key, link)`
      pairs to.
    * `methods_counter`: per-preferred-key counter used to generate numeric
      suffixes when the preferred key is already taken.
    """
    def __init__(self):
        self.links = []
        self.methods_counter = Counter()
        super().__init__()

    def get_available_key(self, preferred_key):
        """
        Return a key based on `preferred_key` that is not yet present in self.

        The preferred key is returned unchanged when it is free. Otherwise
        candidates of the form '<preferred_key>_<n>' are tried, where n comes
        from `methods_counter[preferred_key]`, which is incremented on every
        attempt, until an unused key is found.
        """
        if preferred_key not in self:
            return preferred_key

        while True:
            current_val = self.methods_counter[preferred_key]
            self.methods_counter[preferred_key] += 1

            key = '{}_{}'.format(preferred_key, current_val)
            if key not in self:
                return key
|
|
|
|
|
|
def insert_into(target, keys, value):
    """
    Nested dictionary insertion.

    Walks `target` along every key in `keys` except the last, creating an
    empty `LinkNode` for each missing key, then appends
    `(keys[-1], value)` to the `links` list of the node reached.

    For example, after `insert_into(root, ['a', 'b', 'c'], 123)` on an empty
    `LinkNode` called `root`, `root['a']['b'].links == [('c', 123)]`.

    Raises `ValueError` (built from `INSERT_INTO_COLLISION_FMT`) if
    appending to the node's links raises `TypeError`.
    """
    for key in keys[:-1]:
        if key not in target:
            target[key] = LinkNode()
        target = target[key]

    try:
        target.links.append((keys[-1], value))
    except TypeError:
        # The message reads `.url` from both the new value and the node that
        # rejected it, so both are expected to expose that attribute here.
        msg = INSERT_INTO_COLLISION_FMT.format(
            value_url=value.url,
            target_url=target.url,
            keys=keys
        )
        raise ValueError(msg)
|
|
|
|
|
|
class SchemaGenerator(BaseSchemaGenerator):
    """
    Original CoreAPI version.
    """
    # Map HTTP methods onto actions.
    default_mapping = {
        'get': 'retrieve',
        'post': 'create',
        'put': 'update',
        'patch': 'partial_update',
        'delete': 'destroy',
    }

    # Map the method names we use for viewset actions onto external schema names.
    # These give us names that are more suitable for the external representation.
    # Set by 'SCHEMA_COERCE_METHOD_NAMES'.
    coerce_method_names = None

    def __init__(self, title=None, url=None, description=None, patterns=None, urlconf=None):
        """
        Check that `coreapi` and `coreschema` are available, initialise the
        base generator and load `SCHEMA_COERCE_METHOD_NAMES` from settings.
        """
        assert coreapi, '`coreapi` must be installed for schema support.'
        assert coreschema, '`coreschema` must be installed for schema support.'

        super().__init__(title, url, description, patterns, urlconf)
        self.coerce_method_names = api_settings.SCHEMA_COERCE_METHOD_NAMES

    def get_links(self, request=None):
        """
        Return a dictionary containing all the links that should be
        included in the API schema.

        Returns None when there are no paths to describe.
        """
        links = LinkNode()

        paths, view_endpoints = self._get_paths_and_endpoints(request)

        # Only generate the path prefix for paths that will be included
        if not paths:
            return None
        prefix = self.determine_path_prefix(paths)

        for path, method, view in view_endpoints:
            if not self.has_view_permissions(path, method, view):
                continue
            link = view.schema.get_link(path, method, base_url=self.url)
            # Keys are derived from the path with the common prefix removed.
            subpath = path[len(prefix):]
            keys = self.get_keys(subpath, method, view)
            insert_into(links, keys, link)

        return links

    def get_schema(self, request=None, public=False):
        """
        Generate a `coreapi.Document` representing the API schema.

        Returns None when `get_links()` yields nothing. When `public` is
        true the request is not passed on to `get_links()`.
        """
        self._initialise_endpoints()

        links = self.get_links(None if public else request)
        if not links:
            return None

        # Fall back to the request's absolute URI when no URL was configured.
        url = self.url
        if not url and request is not None:
            url = request.build_absolute_uri()

        distribute_links(links)
        return coreapi.Document(
            title=self.title, description=self.description,
            url=url, content=links
        )

    # Method for generating the link layout....
    def get_keys(self, subpath, method, view):
        """
        Return a list of keys that should be used to layout a link within
        the schema document.

        /users/                   ("users", "list"), ("users", "create")
        /users/{pk}/              ("users", "read"), ("users", "update"), ("users", "delete")
        /users/enabled/           ("users", "enabled")  # custom viewset list action
        /users/{pk}/star/         ("users", "star")     # custom viewset detail action
        /users/{pk}/groups/       ("users", "groups", "list"), ("users", "groups", "create")
        /users/{pk}/groups/{pk}/  ("users", "groups", "read"), ("users", "groups", "update"), ("users", "groups", "delete")
        """
        if hasattr(view, 'action'):
            # Viewsets have explicitly named actions.
            action = view.action
        else:
            # Views have no associated action, so we determine one from the method.
            if is_list_view(subpath, method, view):
                action = 'list'
            else:
                action = self.default_mapping[method.lower()]

        # Templated components such as "{pk}" never become keys.
        named_path_components = [
            component for component
            in subpath.strip('/').split('/')
            if '{' not in component
        ]

        if is_custom_action(action):
            # Custom action, eg "/users/{pk}/activate/", "/users/active/"
            if len(view.action_map) > 1:
                # Several HTTP methods share this custom action, so the key is
                # taken from the method's default action instead.
                action = self.default_mapping[method.lower()]
                if action in self.coerce_method_names:
                    action = self.coerce_method_names[action]
                return named_path_components + [action]
            else:
                # The final path component is replaced by the action name.
                return named_path_components[:-1] + [action]

        if action in self.coerce_method_names:
            action = self.coerce_method_names[action]

        # Default action, eg "/users/", "/users/{pk}/"
        return named_path_components + [action]
|
|
|
|
# View Inspectors #
|
|
|
|
|
|
def field_to_schema(field):
    """
    Return the `coreschema` object describing a serializer `field`.

    The field's `label` and `help_text` become the schema's title and
    description (empty strings when unset). The checks below are ordered:
    the first matching `isinstance` test wins, and anything unmatched falls
    back to `coreschema.String`.
    """
    title = force_text(field.label) if field.label else ''
    description = force_text(field.help_text) if field.help_text else ''

    if isinstance(field, (serializers.ListSerializer, serializers.ListField)):
        child_schema = field_to_schema(field.child)
        return coreschema.Array(
            items=child_schema,
            title=title,
            description=description
        )
    elif isinstance(field, serializers.DictField):
        return coreschema.Object(
            title=title,
            description=description
        )
    elif isinstance(field, serializers.Serializer):
        # Nested serializer: describe every sub-field recursively.
        return coreschema.Object(
            properties=OrderedDict([
                (key, field_to_schema(value))
                for key, value
                in field.fields.items()
            ]),
            title=title,
            description=description
        )
    elif isinstance(field, serializers.ManyRelatedField):
        related_field_schema = field_to_schema(field.child_relation)

        return coreschema.Array(
            items=related_field_schema,
            title=title,
            description=description
        )
    elif isinstance(field, serializers.PrimaryKeyRelatedField):
        # Integer only when the related model's primary key is an AutoField;
        # String otherwise, including when no queryset/model is available.
        schema_cls = coreschema.String
        model = getattr(field.queryset, 'model', None)
        if model is not None:
            model_field = model._meta.pk
            if isinstance(model_field, models.AutoField):
                schema_cls = coreschema.Integer
        return schema_cls(title=title, description=description)
    elif isinstance(field, serializers.RelatedField):
        return coreschema.String(title=title, description=description)
    elif isinstance(field, serializers.MultipleChoiceField):
        return coreschema.Array(
            items=coreschema.Enum(enum=list(field.choices)),
            title=title,
            description=description
        )
    elif isinstance(field, serializers.ChoiceField):
        return coreschema.Enum(
            enum=list(field.choices),
            title=title,
            description=description
        )
    elif isinstance(field, serializers.BooleanField):
        return coreschema.Boolean(title=title, description=description)
    elif isinstance(field, (serializers.DecimalField, serializers.FloatField)):
        return coreschema.Number(title=title, description=description)
    elif isinstance(field, serializers.IntegerField):
        return coreschema.Integer(title=title, description=description)
    elif isinstance(field, serializers.DateField):
        return coreschema.String(
            title=title,
            description=description,
            format='date'
        )
    elif isinstance(field, serializers.DateTimeField):
        return coreschema.String(
            title=title,
            description=description,
            format='date-time'
        )
    elif isinstance(field, serializers.JSONField):
        return coreschema.Object(title=title, description=description)

    if field.style.get('base_template') == 'textarea.html':
        return coreschema.String(
            title=title,
            description=description,
            format='textarea'
        )

    return coreschema.String(title=title, description=description)
|
|
|
|
|
|
class AutoSchema(ViewInspector):
    """
    Default inspector for APIView

    Responsible for per-view introspection and schema generation.
    """
    def __init__(self, manual_fields=None):
        """
        Parameters:

        * `manual_fields`: list of `coreapi.Field` instances that
            will be added to auto-generated fields, overwriting on `Field.name`
        """
        super().__init__()
        if manual_fields is None:
            manual_fields = []
        self._manual_fields = manual_fields

    def get_link(self, path, method, base_url):
        """
        Generate `coreapi.Link` for self.view, path and method.

        This is the main _public_ access point.

        Parameters:

        * path: Route path for view from URLConf.
        * method: The HTTP request method.
        * base_url: The project "mount point" as given to SchemaGenerator
        """
        fields = self.get_path_fields(path, method)
        fields += self.get_serializer_fields(path, method)
        fields += self.get_pagination_fields(path, method)
        fields += self.get_filter_fields(path, method)

        manual_fields = self.get_manual_fields(path, method)
        fields = self.update_fields(fields, manual_fields)

        # An encoding is only looked up when some field is sent as form or
        # body data.
        if fields and any(field.location in ('form', 'body') for field in fields):
            encoding = self.get_encoding(path, method)
        else:
            encoding = None

        description = self.get_description(path, method)

        # Drop the leading slash so urljoin() resolves the path relative to
        # base_url rather than to the host root.
        if base_url and path.startswith('/'):
            path = path[1:]

        return coreapi.Link(
            url=parse.urljoin(base_url, path),
            action=method.lower(),
            encoding=encoding,
            fields=fields,
            description=description
        )

    def get_description(self, path, method):
        """
        Determine a link description.

        This will be based on the method docstring if one exists,
        or else the class docstring.
        """
        view = self.view

        method_name = getattr(view, 'action', method.lower())
        handler = getattr(view, method_name, None)
        # Do not fall back to None's own __doc__ when the view has no
        # attribute with this name; treat it as "no method docstring".
        method_docstring = None if handler is None else handler.__doc__
        if method_docstring:
            # An explicit docstring on the method or action.
            return self._get_description_section(view, method.lower(), formatting.dedent(smart_text(method_docstring)))
        else:
            return self._get_description_section(view, getattr(view, 'action', method.lower()), view.get_view_description())

    def _get_description_section(self, view, header, description):
        """
        Return the part of `description` that belongs to `header`.

        Lines matching `header_regex` (for example "get: ...") start a new
        named section; all other lines are appended to the current section,
        which starts out as the unnamed section ''. The lookup order is:
        the section named `header`, then the section named by
        `SCHEMA_COERCE_METHOD_NAMES[header]`, then the unnamed section.
        """
        current_section = ''
        sections = {'': ''}

        for line in description.splitlines():
            if header_regex.match(line):
                current_section, _, lead = line.partition(':')
                sections[current_section] = lead.strip()
            else:
                sections[current_section] += '\n' + line

        # TODO: SCHEMA_COERCE_METHOD_NAMES appears here and in `SchemaGenerator.get_keys`
        coerce_method_names = api_settings.SCHEMA_COERCE_METHOD_NAMES
        if header in sections:
            return sections[header].strip()
        if header in coerce_method_names:
            if coerce_method_names[header] in sections:
                return sections[coerce_method_names[header]].strip()
        return sections[''].strip()

    def get_path_fields(self, path, method):
        """
        Return a list of `coreapi.Field` instances corresponding to any
        templated path variables.
        """
        view = self.view
        model = getattr(getattr(view, 'queryset', None), 'model', None)
        fields = []

        for variable in uritemplate.variables(path):
            title = ''
            description = ''
            schema_cls = coreschema.String
            kwargs = {}
            if model is not None:
                # Attempt to infer a field description if possible.
                try:
                    model_field = model._meta.get_field(variable)
                except Exception:
                    model_field = None

                if model_field is not None and model_field.verbose_name:
                    title = force_text(model_field.verbose_name)

                if model_field is not None and model_field.help_text:
                    description = force_text(model_field.help_text)
                elif model_field is not None and model_field.primary_key:
                    description = get_pk_description(model, model_field)

                # A lookup regex on the view takes precedence over inferring
                # an Integer schema from an AutoField.
                if hasattr(view, 'lookup_value_regex') and view.lookup_field == variable:
                    kwargs['pattern'] = view.lookup_value_regex
                elif isinstance(model_field, models.AutoField):
                    schema_cls = coreschema.Integer

            field = coreapi.Field(
                name=variable,
                location='path',
                required=True,
                schema=schema_cls(title=title, description=description, **kwargs)
            )
            fields.append(field)

        return fields

    def get_serializer_fields(self, path, method):
        """
        Return a list of `coreapi.Field` instances corresponding to any
        request body input, as determined by the serializer class.
        """
        view = self.view

        if method not in ('PUT', 'PATCH', 'POST'):
            return []

        if not hasattr(view, 'get_serializer'):
            return []

        try:
            serializer = view.get_serializer()
        except exceptions.APIException:
            serializer = None
            warnings.warn('{}.get_serializer() raised an exception during '
                          'schema generation. Serializer fields will not be '
                          'generated for {} {}.'
                          .format(view.__class__.__name__, method, path))

        if isinstance(serializer, serializers.ListSerializer):
            return [
                coreapi.Field(
                    name='data',
                    location='body',
                    required=True,
                    schema=coreschema.Array()
                )
            ]

        if not isinstance(serializer, serializers.Serializer):
            return []

        fields = []
        for serializer_field in serializer.fields.values():
            if serializer_field.read_only or isinstance(serializer_field, serializers.HiddenField):
                continue

            # Nothing is marked required for PATCH.
            required = serializer_field.required and method != 'PATCH'
            fields.append(coreapi.Field(
                name=serializer_field.field_name,
                location='form',
                required=required,
                schema=field_to_schema(serializer_field)
            ))

        return fields

    def get_pagination_fields(self, path, method):
        """
        Return the paginator's schema fields for list views, or an empty list
        when the view is not a list view or has no pagination class.
        """
        view = self.view

        if not is_list_view(path, method, view):
            return []

        pagination = getattr(view, 'pagination_class', None)
        if not pagination:
            return []

        paginator = view.pagination_class()
        return paginator.get_schema_fields(view)

    def _allows_filters(self, path, method):
        """
        Determine whether to include filter Fields in schema.

        Default implementation looks for ModelViewSet or GenericAPIView
        actions/methods that cause filtering on the default implementation.

        Override to adjust behaviour for your view.

        Note: Introduced in v3.7: Initially "private" (i.e. with leading underscore)
            to allow changes based on user experience.
        """
        if getattr(self.view, 'filter_backends', None) is None:
            return False

        if hasattr(self.view, 'action'):
            return self.view.action in ["list", "retrieve", "update", "partial_update", "destroy"]

        return method.lower() in ["get", "put", "patch", "delete"]

    def get_filter_fields(self, path, method):
        """
        Return the schema fields of every filter backend on the view, or an
        empty list when `_allows_filters()` says filters do not apply.
        """
        if not self._allows_filters(path, method):
            return []

        fields = []
        for filter_backend in self.view.filter_backends:
            fields += filter_backend().get_schema_fields(self.view)
        return fields

    def get_manual_fields(self, path, method):
        """
        Return the manual fields given to the constructor.
        """
        return self._manual_fields

    @staticmethod
    def update_fields(fields, update_with):
        """
        Update list of coreapi.Field instances, overwriting on `Field.name`.

        Utility function to handle replacing coreapi.Field fields
        from a list by name. Used to handle `manual_fields`.

        Parameters:

        * `fields`: list of `coreapi.Field` instances to update
        * `update_with`: list of `coreapi.Field` instances to add or replace.
        """
        if not update_with:
            return fields

        # Keyed by name so a replacement keeps the original field's position
        # and new names are appended at the end.
        by_name = OrderedDict((f.name, f) for f in fields)
        for f in update_with:
            by_name[f.name] = f
        fields = list(by_name.values())
        return fields

    def get_encoding(self, path, method):
        """
        Return the 'encoding' parameter to use for a given endpoint.

        The first parser class with a supported (or wildcard) media type
        decides the result; None is returned when no parser class matches.
        """
        view = self.view

        # Core API supports the following request encodings over HTTP...
        supported_media_types = {
            'application/json',
            'application/x-www-form-urlencoded',
            'multipart/form-data',
        }
        parser_classes = getattr(view, 'parser_classes', [])
        for parser_class in parser_classes:
            media_type = getattr(parser_class, 'media_type', None)
            if media_type in supported_media_types:
                return media_type
            # Raw binary uploads are supported with "application/octet-stream"
            if media_type == '*/*':
                return 'application/octet-stream'

        return None
|
|
|
|
|
|
class ManualSchema(ViewInspector):
    """
    Allows providing a list of coreapi.Fields,
    plus an optional description.
    """
    def __init__(self, fields, description='', encoding=None):
        """
        Parameters:

        * `fields`: list of `coreapi.Field` instances.
        * `description`: String description for view. Optional.
        * `encoding`: Value passed through as the link's `encoding`. Optional.
        """
        super().__init__()
        assert all(isinstance(f, coreapi.Field) for f in fields), "`fields` must be a list of coreapi.Field instances"
        self._fields = fields
        self._description = description
        self._encoding = encoding

    def get_link(self, path, method, base_url):
        """
        Return a `coreapi.Link` built from the stored fields, description and
        encoding, with its URL formed by joining `base_url` and `path`.
        """
        # Drop the leading slash so urljoin() resolves the path relative to
        # base_url rather than to the host root.
        if base_url and path.startswith('/'):
            path = path[1:]

        return coreapi.Link(
            url=parse.urljoin(base_url, path),
            action=method.lower(),
            encoding=self._encoding,
            fields=self._fields,
            description=self._description
        )
|
|
|
|
|
|
def is_enabled():
    """
    Is CoreAPI Mode enabled?

    True when the configured `DEFAULT_SCHEMA_CLASS` is `AutoSchema` or a
    subclass of it.
    """
    return issubclass(api_settings.DEFAULT_SCHEMA_CLASS, AutoSchema)
|