First pass at cursor pagination

This commit is contained in:
Tom Christie 2015-01-17 00:10:43 +00:00
parent dc18040ba4
commit 4919492582
2 changed files with 139 additions and 0 deletions

View File

@ -3,10 +3,12 @@ Pagination serializers determine the structure of the output that should
be used for paginated responses. be used for paginated responses.
""" """
from __future__ import unicode_literals from __future__ import unicode_literals
from base64 import b64encode, b64decode
from collections import namedtuple from collections import namedtuple
from django.core.paginator import InvalidPage, Paginator as DjangoPaginator from django.core.paginator import InvalidPage, Paginator as DjangoPaginator
from django.template import Context, loader from django.template import Context, loader
from django.utils import six from django.utils import six
from django.utils.six.moves.urllib import parse as urlparse
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from rest_framework.compat import OrderedDict from rest_framework.compat import OrderedDict
from rest_framework.exceptions import NotFound from rest_framework.exceptions import NotFound
@ -377,3 +379,52 @@ class LimitOffsetPagination(BasePagination):
template = loader.get_template(self.template) template = loader.get_template(self.template)
context = Context(self.get_html_context()) context = Context(self.get_html_context())
return template.render(context) return template.render(context)
class CursorPagination(BasePagination):
# reverse
# limit
# multiple orderings
cursor_query_param = 'cursor'
page_size = 5
def paginate_queryset(self, queryset, request, view=None):
self.base_url = request.build_absolute_uri()
self.ordering = self.get_ordering()
encoded = request.query_params.get(self.cursor_query_param)
if encoded is None:
cursor = None
else:
cursor = self.decode_cursor(encoded, self.ordering)
if cursor is not None:
kwargs = {self.ordering + '__gt': cursor}
queryset = queryset.filter(**kwargs)
results = list(queryset[:self.page_size + 1])
self.page = results[:self.page_size]
self.has_next = len(results) > len(self.page)
return self.page
def get_next_link(self):
if not self.has_next:
return None
last_item = self.page[-1]
cursor = self.get_cursor_from_instance(last_item, self.ordering)
encoded = self.encode_cursor(cursor, self.ordering)
return replace_query_param(self.base_url, self.cursor_query_param, encoded)
def get_ordering(self):
return 'created'
def get_cursor_from_instance(self, instance, ordering):
return getattr(instance, ordering)
def decode_cursor(self, encoded, ordering):
items = urlparse.parse_qs(b64decode(encoded))
return items.get(ordering)[0]
def encode_cursor(self, cursor, ordering):
items = [(ordering, cursor)]
return b64encode(urlparse.urlencode(items, doseq=True))

View File

@ -422,6 +422,94 @@ class TestLimitOffset:
assert queryset == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] assert queryset == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
class TestCursorPagination:
"""
Unit tests for `pagination.CursorPagination`.
"""
def setup(self):
class MockObject(object):
def __init__(self, idx):
self.created = idx
class MockQuerySet(object):
def __init__(self, items):
self.items = items
def filter(self, created__gt):
return [
item for item in self.items
if item.created > int(created__gt)
]
def __getitem__(self, sliced):
return self.items[sliced]
self.pagination = pagination.CursorPagination()
self.queryset = MockQuerySet(
[MockObject(idx) for idx in range(1, 21)]
)
def paginate_queryset(self, request):
return list(self.pagination.paginate_queryset(self.queryset, request))
# def get_paginated_content(self, queryset):
# response = self.pagination.get_paginated_response(queryset)
# return response.data
# def get_html_context(self):
# return self.pagination.get_html_context()
def test_following_cursor(self):
request = Request(factory.get('/'))
queryset = self.paginate_queryset(request)
assert [item.created for item in queryset] == [1, 2, 3, 4, 5]
next_url = self.pagination.get_next_link()
assert next_url
request = Request(factory.get(next_url))
queryset = self.paginate_queryset(request)
assert [item.created for item in queryset] == [6, 7, 8, 9, 10]
next_url = self.pagination.get_next_link()
assert next_url
request = Request(factory.get(next_url))
queryset = self.paginate_queryset(request)
assert [item.created for item in queryset] == [11, 12, 13, 14, 15]
next_url = self.pagination.get_next_link()
assert next_url
request = Request(factory.get(next_url))
queryset = self.paginate_queryset(request)
assert [item.created for item in queryset] == [16, 17, 18, 19, 20]
next_url = self.pagination.get_next_link()
assert next_url is None
# assert content == {
# 'results': [1, 2, 3, 4, 5],
# 'previous': None,
# 'next': 'http://testserver/?limit=5&offset=5',
# 'count': 100
# }
# assert context == {
# 'previous_url': None,
# 'next_url': 'http://testserver/?limit=5&offset=5',
# 'page_links': [
# PageLink('http://testserver/?limit=5', 1, True, False),
# PageLink('http://testserver/?limit=5&offset=5', 2, False, False),
# PageLink('http://testserver/?limit=5&offset=10', 3, False, False),
# PAGE_BREAK,
# PageLink('http://testserver/?limit=5&offset=95', 20, False, False),
# ]
# }
# assert self.pagination.display_page_controls
# assert isinstance(self.pagination.to_html(), type(''))
def test_get_displayed_page_numbers(): def test_get_displayed_page_numbers():
""" """
Test our contextual page display function. Test our contextual page display function.