From 4919492582547d227a22852ad2339fa73739cc94 Mon Sep 17 00:00:00 2001 From: Tom Christie Date: Sat, 17 Jan 2015 00:10:43 +0000 Subject: [PATCH] First pass at cursor pagination --- rest_framework/pagination.py | 51 +++++++++++++++++++++ tests/test_pagination.py | 88 ++++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) diff --git a/rest_framework/pagination.py b/rest_framework/pagination.py index 1b7524c6c..89d6f9f4f 100644 --- a/rest_framework/pagination.py +++ b/rest_framework/pagination.py @@ -3,10 +3,12 @@ Pagination serializers determine the structure of the output that should be used for paginated responses. """ from __future__ import unicode_literals +from base64 import b64encode, b64decode from collections import namedtuple from django.core.paginator import InvalidPage, Paginator as DjangoPaginator from django.template import Context, loader from django.utils import six +from django.utils.six.moves.urllib import parse as urlparse from django.utils.translation import ugettext as _ from rest_framework.compat import OrderedDict from rest_framework.exceptions import NotFound @@ -377,3 +379,52 @@ class LimitOffsetPagination(BasePagination): template = loader.get_template(self.template) context = Context(self.get_html_context()) return template.render(context) + + +class CursorPagination(BasePagination): + # reverse + # limit + # multiple orderings + cursor_query_param = 'cursor' + page_size = 5 + + def paginate_queryset(self, queryset, request, view=None): + self.base_url = request.build_absolute_uri() + self.ordering = self.get_ordering() + encoded = request.query_params.get(self.cursor_query_param) + + if encoded is None: + cursor = None + else: + cursor = self.decode_cursor(encoded, self.ordering) + + if cursor is not None: + kwargs = {self.ordering + '__gt': cursor} + queryset = queryset.filter(**kwargs) + + results = list(queryset[:self.page_size + 1]) + self.page = results[:self.page_size] + self.has_next = len(results) > len(self.page) + return self.page + + def get_next_link(self): + if not self.has_next: + return None + last_item = self.page[-1] + cursor = self.get_cursor_from_instance(last_item, self.ordering) + encoded = self.encode_cursor(cursor, self.ordering) + return replace_query_param(self.base_url, self.cursor_query_param, encoded) + + def get_ordering(self): + return 'created' + + def get_cursor_from_instance(self, instance, ordering): + return getattr(instance, ordering) + + def decode_cursor(self, encoded, ordering): + items = urlparse.parse_qs(b64decode(encoded)) + return items.get(ordering)[0] + + def encode_cursor(self, cursor, ordering): + items = [(ordering, cursor)] + return b64encode(urlparse.urlencode(items, doseq=True)) diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 7cc923472..7f18b446b 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -422,6 +422,94 @@ class TestLimitOffset: assert queryset == [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] +class TestCursorPagination: + """ + Unit tests for `pagination.CursorPagination`. + """ + + def setup(self): + class MockObject(object): + def __init__(self, idx): + self.created = idx + + class MockQuerySet(object): + def __init__(self, items): + self.items = items + + def filter(self, created__gt): + return [ + item for item in self.items + if item.created > int(created__gt) + ] + + def __getitem__(self, sliced): + return self.items[sliced] + + self.pagination = pagination.CursorPagination() + self.queryset = MockQuerySet( + [MockObject(idx) for idx in range(1, 21)] + ) + + def paginate_queryset(self, request): + return list(self.pagination.paginate_queryset(self.queryset, request)) + + # def get_paginated_content(self, queryset): + # response = self.pagination.get_paginated_response(queryset) + # return response.data + + # def get_html_context(self): + # return self.pagination.get_html_context() + + def test_following_cursor(self): + request = Request(factory.get('/')) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [1, 2, 3, 4, 5] + + next_url = self.pagination.get_next_link() + assert next_url + + request = Request(factory.get(next_url)) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [6, 7, 8, 9, 10] + + next_url = self.pagination.get_next_link() + assert next_url + + request = Request(factory.get(next_url)) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [11, 12, 13, 14, 15] + + next_url = self.pagination.get_next_link() + assert next_url + + request = Request(factory.get(next_url)) + queryset = self.paginate_queryset(request) + assert [item.created for item in queryset] == [16, 17, 18, 19, 20] + + next_url = self.pagination.get_next_link() + assert next_url is None + + # assert content == { + # 'results': [1, 2, 3, 4, 5], + # 'previous': None, + # 'next': 'http://testserver/?limit=5&offset=5', + # 'count': 100 + # } + # assert context == { + # 'previous_url': None, + # 'next_url': 'http://testserver/?limit=5&offset=5', + # 'page_links': [ + # PageLink('http://testserver/?limit=5', 1, True, False), + # PageLink('http://testserver/?limit=5&offset=5', 2, False, False), + # PageLink('http://testserver/?limit=5&offset=10', 3, False, False), + # PAGE_BREAK, + # PageLink('http://testserver/?limit=5&offset=95', 20, False, False), + # ] + # } + # assert self.pagination.display_page_controls + # assert isinstance(self.pagination.to_html(), type('')) + + def test_get_displayed_page_numbers(): """ Test our contextual page display function.