diff --git a/rest_framework/pagination.py b/rest_framework/pagination.py index d5af2ac81..618352391 100644 --- a/rest_framework/pagination.py +++ b/rest_framework/pagination.py @@ -171,6 +171,8 @@ class PageNumberPagination(BasePagination): template = 'rest_framework/pagination/numbers.html' + invalid_page_message = _('Invalid page "{page_number}": {message}.') + def _handle_backwards_compat(self, view): """ Prior to version 3.1, pagination was handled in the view, and the @@ -203,7 +205,7 @@ class PageNumberPagination(BasePagination): try: self.page = paginator.page(page_number) except InvalidPage as exc: - msg = _('Invalid page "{page_number}": {message}.').format( + msg = self.invalid_page_message.format( page_number=page_number, message=six.text_type(exc) ) raise NotFound(msg) @@ -386,8 +388,8 @@ Cursor = namedtuple('Cursor', ['offset', 'reverse', 'position']) def decode_cursor(encoded): - tokens = urlparse.parse_qs(b64decode(encoded), keep_blank_values=True) try: + tokens = urlparse.parse_qs(b64decode(encoded), keep_blank_values=True) offset = int(tokens['offset'][0]) reverse = bool(int(tokens['reverse'][0])) position = tokens['position'][0] @@ -411,7 +413,8 @@ class CursorPagination(BasePagination): # Support case where ordering is already negative # Support tuple orderings cursor_query_param = 'cursor' - page_size = 5 + page_size = api_settings.PAGINATE_BY + invalid_cursor_message = _('Invalid cursor') def paginate_queryset(self, queryset, request, view=None): self.base_url = request.build_absolute_uri() @@ -424,8 +427,9 @@ class CursorPagination(BasePagination): (offset, reverse, current_position) = (0, False, '') else: self.cursor = decode_cursor(encoded) + if self.cursor is None: + raise NotFound(self.invalid_cursor_message) (offset, reverse, current_position) = self.cursor - # TODO: Invalid cursors should 404 # Cursor pagination always enforces an ordering. if reverse: @@ -458,7 +462,7 @@ class CursorPagination(BasePagination): # If we have a reverse queryset, then the query ordering was in reverse # so we need to reverse the items again before returning them to the user. if reverse: - self.page = reversed(self.page) + self.page = list(reversed(self.page)) if reverse: # Determine next and previous positions for reverse cursors. @@ -483,8 +487,14 @@ class CursorPagination(BasePagination): if not self.has_next: return None - compare = self.next_position + if self.cursor and self.cursor.reverse and self.cursor.offset != 0: + # If we're reversing direction and we have an offset cursor + # then we cannot use the first position we find as a marker. + compare = self._get_position_from_instance(self.page[-1], self.ordering) + else: + compare = self.next_position offset = 0 + for item in reversed(self.page): position = self._get_position_from_instance(item, self.ordering) if position != compare: @@ -526,8 +536,14 @@ class CursorPagination(BasePagination): if not self.has_previous: return None - compare = self.previous_position + if self.cursor and not self.cursor.reverse and self.cursor.offset != 0: + # If we're reversing direction and we have an offset cursor + # then we cannot use the first position we find as a marker. + compare = self._get_position_from_instance(self.page[0], self.ordering) + else: + compare = self.previous_position offset = 0 + for item in self.page: position = self._get_position_from_instance(item, self.ordering) if position != compare: diff --git a/tests/test_pagination.py b/tests/test_pagination.py index 4907a0807..e32dd0288 100644 --- a/tests/test_pagination.py +++ b/tests/test_pagination.py @@ -451,171 +451,127 @@ class TestCursorPagination: def order_by(self, ordering): if ordering.startswith('-'): - return MockQuerySet(reversed(self.items)) + return MockQuerySet(list(reversed(self.items))) return self def __getitem__(self, sliced): return self.items[sliced] - self.pagination = pagination.CursorPagination() - self.queryset = MockQuerySet( - [MockObject(idx) for idx in range(1, 16)] - ) + class ExamplePagination(pagination.CursorPagination): + page_size = 5 - def paginate_queryset(self, request): - return list(self.pagination.paginate_queryset(self.queryset, request)) - - # def get_paginated_content(self, queryset): - # response = self.pagination.get_paginated_response(queryset) - # return response.data - - # def get_html_context(self): - # return self.pagination.get_html_context() - - def test_following_cursor(self): - request = Request(factory.get('/')) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [1, 2, 3, 4, 5] - - next_url = self.pagination.get_next_link() - assert next_url - - request = Request(factory.get(next_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [6, 7, 8, 9, 10] - - next_url = self.pagination.get_next_link() - assert next_url - - request = Request(factory.get(next_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [11, 12, 13, 14, 15] - - next_url = self.pagination.get_next_link() - assert next_url is None - - # Now page back again - - previous_url = self.pagination.get_previous_link() - assert previous_url - - request = Request(factory.get(previous_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [6, 7, 8, 9, 10] - - previous_url = self.pagination.get_previous_link() - assert previous_url - - request = Request(factory.get(previous_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [1, 2, 3, 4, 5] - - previous_url = self.pagination.get_previous_link() - assert previous_url is None - - -class TestCrazyCursorPagination: - """ - Unit tests for `pagination.CursorPagination`. - """ - - def setup(self): - class MockObject(object): - def __init__(self, idx): - self.created = idx - - class MockQuerySet(object): - def __init__(self, items): - self.items = items - - def filter(self, created__gt=None, created__lt=None): - if created__gt is not None: - return MockQuerySet([ - item for item in self.items - if item.created > int(created__gt) - ]) - - assert created__lt is not None - return MockQuerySet([ - item for item in self.items - if item.created < int(created__lt) - ]) - - def order_by(self, ordering): - if ordering.startswith('-'): - return MockQuerySet(reversed(self.items)) - return self - - def __getitem__(self, sliced): - return self.items[sliced] - - self.pagination = pagination.CursorPagination() + self.pagination = ExamplePagination() self.queryset = MockQuerySet([ MockObject(idx) for idx in [ 1, 1, 1, 1, 1, - 1, 1, 1, 1, 1, - 1, 1, 2, 3, 4, - 5, 6, 7, 8, 9 + 1, 2, 3, 4, 4, + 4, 4, 5, 6, 7, + 7, 7, 7, 7, 7, + 7, 7, 7, 8, 9, + 9, 9, 9, 9, 9 ] ]) - def paginate_queryset(self, request): - return list(self.pagination.paginate_queryset(self.queryset, request)) + def get_pages(self, url): + """ + Given a URL return a tuple of: - def test_following_cursor_identical_items(self): - request = Request(factory.get('/')) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [1, 1, 1, 1, 1] + (previous page, current page, next page, previous url, next url) + """ + request = Request(factory.get(url)) + queryset = self.pagination.paginate_queryset(self.queryset, request) + current = [item.created for item in queryset] next_url = self.pagination.get_next_link() - assert next_url - - request = Request(factory.get(next_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [1, 1, 1, 1, 1] - - next_url = self.pagination.get_next_link() - assert next_url - - request = Request(factory.get(next_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [1, 1, 2, 3, 4] - - next_url = self.pagination.get_next_link() - assert next_url - - request = Request(factory.get(next_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [5, 6, 7, 8, 9] - - next_url = self.pagination.get_next_link() - assert next_url is None - - # Now page back again - previous_url = self.pagination.get_previous_link() - assert previous_url - request = Request(factory.get(previous_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [1, 1, 2, 3, 4] + if next_url is not None: + request = Request(factory.get(next_url)) + queryset = self.pagination.paginate_queryset(self.queryset, request) + next = [item.created for item in queryset] + else: + next = None - previous_url = self.pagination.get_previous_link() - assert previous_url + if previous_url is not None: + request = Request(factory.get(previous_url)) + queryset = self.pagination.paginate_queryset(self.queryset, request) + previous = [item.created for item in queryset] + else: + previous = None - request = Request(factory.get(previous_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [1, 1, 1, 1, 1] + return (previous, current, next, previous_url, next_url) - previous_url = self.pagination.get_previous_link() - assert previous_url + def test_invalid_cursor(self): + request = Request(factory.get('/', {'cursor': '123'})) + with pytest.raises(exceptions.NotFound): + self.pagination.paginate_queryset(self.queryset, request) - request = Request(factory.get(previous_url)) - queryset = self.paginate_queryset(request) - assert [item.created for item in queryset] == [1, 1, 1, 1, 1] + def test_cursor_pagination(self): + (previous, current, next, previous_url, next_url) = self.get_pages('/') - previous_url = self.pagination.get_previous_link() - assert previous_url is None + assert previous is None + assert current == [1, 1, 1, 1, 1] + assert next == [1, 2, 3, 4, 4] + + (previous, current, next, previous_url, next_url) = self.get_pages(next_url) + + assert previous == [1, 1, 1, 1, 1] + assert current == [1, 2, 3, 4, 4] + assert next == [4, 4, 5, 6, 7] + + (previous, current, next, previous_url, next_url) = self.get_pages(next_url) + + assert previous == [1, 2, 3, 4, 4] + assert current == [4, 4, 5, 6, 7] + assert next == [7, 7, 7, 7, 7] + + (previous, current, next, previous_url, next_url) = self.get_pages(next_url) + + assert previous == [4, 4, 4, 5, 6] # Paging artifact + assert current == [7, 7, 7, 7, 7] + assert next == [7, 7, 7, 8, 9] + + (previous, current, next, previous_url, next_url) = self.get_pages(next_url) + + assert previous == [7, 7, 7, 7, 7] + assert current == [7, 7, 7, 8, 9] + assert next == [9, 9, 9, 9, 9] + + (previous, current, next, previous_url, next_url) = self.get_pages(next_url) + + assert previous == [7, 7, 7, 8, 9] + assert current == [9, 9, 9, 9, 9] + assert next is None + + (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) + + assert previous == [7, 7, 7, 7, 7] + assert current == [7, 7, 7, 8, 9] + assert next == [9, 9, 9, 9, 9] + + (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) + + assert previous == [4, 4, 5, 6, 7] + assert current == [7, 7, 7, 7, 7] + assert next == [8, 9, 9, 9, 9] # Paging artifact + + (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) + + assert previous == [1, 2, 3, 4, 4] + assert current == [4, 4, 5, 6, 7] + assert next == [7, 7, 7, 7, 7] + + (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) + + assert previous == [1, 1, 1, 1, 1] + assert current == [1, 2, 3, 4, 4] + assert next == [4, 4, 5, 6, 7] + + (previous, current, next, previous_url, next_url) = self.get_pages(previous_url) + + assert previous is None + assert current == [1, 1, 1, 1, 1] + assert next == [1, 2, 3, 4, 4] def test_get_displayed_page_numbers():