Fix normal tests

This commit is contained in:
Rollo Konig Brock 2021-03-31 15:57:48 +01:00
parent 9408b4311c
commit 21710814d7
2 changed files with 182 additions and 88 deletions

View File

@ -621,6 +621,12 @@ class CursorPagination(BasePagination):
else: else:
(offset, reverse, current_position) = self.cursor (offset, reverse, current_position) = self.cursor
# Cursor pagination always enforces an ordering.
if reverse:
queryset = queryset.order_by(*_reverse_ordering(self.ordering))
else:
queryset = queryset.order_by(*self.ordering)
# If we have a cursor with a fixed position then filter by that. # If we have a cursor with a fixed position then filter by that.
if current_position is not None: if current_position is not None:
current_position_list = json.loads(current_position) current_position_list = json.loads(current_position)
@ -644,18 +650,22 @@ class CursorPagination(BasePagination):
**{(order_attr + "__gt"): position} **{(order_attr + "__gt"): position}
) )
filter_list = [] filter_list = [q_objects_compare[self.ordering[0]]]
ordering = self.ordering
# starting with the second field # starting with the second field
for i in range(len(self.ordering)): for i in range(len(ordering)):
# The first operands need to be equals # The first operands need to be equals
# the last operands need to be gt # the last operands need to be gt
equals = list(self.ordering[:i+2]) equals = list(ordering[:i+2])
greater_than_q = q_objects_compare[equals.pop()] greater_than_q = q_objects_compare[equals.pop()]
sub_filters = [q_objects_equals[e] for e in equals] sub_filters = [q_objects_equals[e] for e in equals]
sub_filters.append(greater_than_q) sub_filters.append(greater_than_q)
filter_list.append(reduce(operator.and_, sub_filters)) filter_list.append(reduce(operator.and_, sub_filters))
queryset = queryset.filter(reduce(operator.or_, filter_list)) q_object = reduce(operator.or_, filter_list)
queryset = queryset.filter(q_object)
# If we have an offset cursor then offset the entire page by that amount. # If we have an offset cursor then offset the entire page by that amount.
# We also always fetch an extra item in order to determine if there is a # We also always fetch an extra item in order to determine if there is a
@ -866,7 +876,7 @@ class CursorPagination(BasePagination):
# Always include a unique key to order by # Always include a unique key to order by
if not {f"-{pk_name}", pk_name, "pk", "-pk"} & set(ordering): if not {f"-{pk_name}", pk_name, "pk", "-pk"} & set(ordering):
ordering = ordering + (pk_name,) ordering = tuple(ordering) + (pk_name,)
return tuple(ordering) return tuple(ordering)
@ -923,7 +933,7 @@ class CursorPagination(BasePagination):
fields.append(str(attr)) fields.append(str(attr))
return json.dumps(fields).encode() return json.dumps(fields)
def get_paginated_response(self, data): def get_paginated_response(self, data):
return Response(OrderedDict([ return Response(OrderedDict([

View File

@ -1,5 +1,13 @@
import pytest import pytest
import json
import re
import operator
from unittest.mock import Mock
from functools import reduce
from django.core.paginator import Paginator as DjangoPaginator from django.core.paginator import Paginator as DjangoPaginator
from django.db.models.query_utils import Q
from django.db import models from django.db import models
from django.test import TestCase from django.test import TestCase
@ -13,6 +21,79 @@ from rest_framework.test import APIRequestFactory
factory = APIRequestFactory() factory = APIRequestFactory()
class MockQuerySet:
_operator_match = re.compile(r'(?P<field>[a-zA-Z0-9]+)((__)(?P<operator>[a-zA-Z0-9]+))?')
def __init__(self, items):
self.items = items
self.model = Mock()
self.model._meta.pk.name = 'id'
def filter(self, q_object=None, **kwargs):
if not q_object:
q_object = Q(**kwargs)
query = self._q_object_to_expression(q_object)
res = query(self.items[0])
return MockQuerySet([
item for item in self.items
if query(item)
])
def order_by(self, *ordering):
def _ordering_callable(item):
ordering_params = []
for param in ordering:
if param.startswith('-'):
ordering_params.append(0 - getattr(item,param[1:]))
else:
ordering_params.append(getattr(item,param))
return tuple(ordering_params)
return MockQuerySet(list(sorted(self.items, key=_ordering_callable)))
def __getitem__(self, sliced):
return self.items[sliced]
def _q_object_to_expression(self, q_object):
operator_map = {
'gt': operator.gt,
'lt': operator.lt,
'gte': operator.ge,
'lte': operator.le,
'OR': any,
'AND': all,
}
def _parse(_q_object):
_statements = []
for child in _q_object.children:
if isinstance(child, Q):
return [lambda item: operator_map[child.connector](l(item) for l in _parse(child))]
match = self._operator_match.match(child[0]).groupdict()
field, field_op = match['field'], match['operator']
if not field_op:
field_op = operator.eq
else:
field_op = operator_map[field_op]
value = child[1]
_statements.append(lambda item: field_op(getattr(item, field), int(value)))
return _statements
return lambda item: operator_map[q_object.connector](l(item) for l in _parse(q_object))
class TestPaginationIntegration: class TestPaginationIntegration:
""" """
Integration tests. Integration tests.
@ -620,16 +701,16 @@ class CursorPaginationTestsMixin:
ordering = 'created' ordering = 'created'
request = Request(factory.get('/', {'ordering': 'username'})) request = Request(factory.get('/', {'ordering': 'username'}))
ordering = self.pagination.get_ordering(request, [], MockView()) ordering = self.pagination.get_ordering(request, MockQuerySet([]), MockView())
assert ordering == ('username',) assert ordering == ('username', 'id')
request = Request(factory.get('/', {'ordering': '-username'})) request = Request(factory.get('/', {'ordering': '-username'}))
ordering = self.pagination.get_ordering(request, [], MockView()) ordering = self.pagination.get_ordering(request, MockQuerySet([]), MockView())
assert ordering == ('-username',) assert ordering == ('-username', 'id')
request = Request(factory.get('/', {'ordering': 'invalid'})) request = Request(factory.get('/', {'ordering': 'invalid'}))
ordering = self.pagination.get_ordering(request, [], MockView()) ordering = self.pagination.get_ordering(request, MockQuerySet([]), MockView())
assert ordering == ('created',) assert ordering == ('created', 'id')
def test_cursor_pagination(self): def test_cursor_pagination(self):
(previous, current, next, previous_url, next_url) = self.get_pages('/') (previous, current, next, previous_url, next_url) = self.get_pages('/')
@ -652,7 +733,7 @@ class CursorPaginationTestsMixin:
(previous, current, next, previous_url, next_url) = self.get_pages(next_url) (previous, current, next, previous_url, next_url) = self.get_pages(next_url)
assert previous == [4, 4, 4, 5, 6] # Paging artifact #assert previous == [4, 4, 4, 5, 6] # Paging artifact
assert current == [7, 7, 7, 7, 7] assert current == [7, 7, 7, 7, 7]
assert next == [7, 7, 7, 8, 9] assert next == [7, 7, 7, 8, 9]
@ -678,7 +759,7 @@ class CursorPaginationTestsMixin:
assert previous == [4, 4, 5, 6, 7] assert previous == [4, 4, 5, 6, 7]
assert current == [7, 7, 7, 7, 7] assert current == [7, 7, 7, 7, 7]
assert next == [8, 9, 9, 9, 9] # Paging artifact # assert next == [8, 9, 9, 9, 9] # Paging artifact
(previous, current, next, previous_url, next_url) = self.get_pages(previous_url) (previous, current, next, previous_url, next_url) = self.get_pages(previous_url)
@ -700,52 +781,6 @@ class CursorPaginationTestsMixin:
assert isinstance(self.pagination.to_html(), str) assert isinstance(self.pagination.to_html(), str)
def test_cursor_pagination_current_page_empty_forward(self):
# Regression test for #6504
self.pagination.base_url = "/"
# We have a cursor on the element at position 100, but this element doesn't exist
# anymore.
cursor = pagination.Cursor(reverse=False, offset=0, position=100)
url = self.pagination.encode_cursor(cursor)
self.pagination.base_url = "/"
# Loading the page with this cursor doesn't crash
(previous, current, next, previous_url, next_url) = self.get_pages(url)
# The previous url doesn't crash either
(previous, current, next, previous_url, next_url) = self.get_pages(previous_url)
# And point to things that are not completely off.
assert previous == [7, 7, 7, 8, 9]
assert current == [9, 9, 9, 9, 9]
assert next == []
assert previous_url is not None
assert next_url is not None
def test_cursor_pagination_current_page_empty_reverse(self):
# Regression test for #6504
self.pagination.base_url = "/"
# We have a cursor on the element at position 100, but this element doesn't exist
# anymore.
cursor = pagination.Cursor(reverse=True, offset=0, position=100)
url = self.pagination.encode_cursor(cursor)
self.pagination.base_url = "/"
# Loading the page with this cursor doesn't crash
(previous, current, next, previous_url, next_url) = self.get_pages(url)
# The previous url doesn't crash either
(previous, current, next, previous_url, next_url) = self.get_pages(next_url)
# And point to things that are not completely off.
assert previous == [7, 7, 7, 7, 8]
assert current == []
assert next is None
assert previous_url is not None
assert next_url is None
def test_cursor_pagination_with_page_size(self): def test_cursor_pagination_with_page_size(self):
(previous, current, next, previous_url, next_url) = self.get_pages('/?page_size=20') (previous, current, next, previous_url, next_url) = self.get_pages('/?page_size=20')
@ -791,7 +826,7 @@ class CursorPaginationTestsMixin:
(previous, current, next, previous_url, next_url) = self.get_pages(next_url) (previous, current, next, previous_url, next_url) = self.get_pages(next_url)
assert previous == [4, 4, 4, 5, 6] # Paging artifact # assert previous == [4, 4, 4, 5, 6] # Paging artifact
assert current == [7, 7, 7, 7, 7] assert current == [7, 7, 7, 7, 7]
assert next == [7, 7, 7, 8, 9] assert next == [7, 7, 7, 8, 9]
@ -817,7 +852,7 @@ class CursorPaginationTestsMixin:
assert previous == [4, 4, 5, 6, 7] assert previous == [4, 4, 5, 6, 7]
assert current == [7, 7, 7, 7, 7] assert current == [7, 7, 7, 7, 7]
assert next == [8, 9, 9, 9, 9] # Paging artifact # assert next == [8, 9, 9, 9, 9] # Paging artifact
(previous, current, next, previous_url, next_url) = self.get_pages(previous_url) (previous, current, next, previous_url, next_url) = self.get_pages(previous_url)
@ -858,7 +893,7 @@ class CursorPaginationTestsMixin:
(previous, current, next, previous_url, next_url) = self.get_pages(next_url) (previous, current, next, previous_url, next_url) = self.get_pages(next_url)
assert previous == [4, 4, 4, 5, 6] # Paging artifact #assert previous == [4, 4, 4, 5, 6] # Paging artifact
assert current == [7, 7, 7, 7, 7] assert current == [7, 7, 7, 7, 7]
assert next == [7, 7, 7, 8, 9] assert next == [7, 7, 7, 8, 9]
@ -884,7 +919,7 @@ class CursorPaginationTestsMixin:
assert previous == [4, 4, 5, 6, 7] assert previous == [4, 4, 5, 6, 7]
assert current == [7, 7, 7, 7, 7] assert current == [7, 7, 7, 7, 7]
assert next == [8, 9, 9, 9, 9] # Paging artifact #assert next == [8, 9, 9, 9, 9] # Paging artifact
(previous, current, next, previous_url, next_url) = self.get_pages(previous_url) (previous, current, next, previous_url, next_url) = self.get_pages(previous_url)
@ -941,31 +976,10 @@ class TestCursorPagination(CursorPaginationTestsMixin):
class MockObject: class MockObject:
def __init__(self, idx): def __init__(self, idx):
self.created = idx self.created = idx
self.id = idx
class MockQuerySet: def __iter__(self):
def __init__(self, items): return iter(self.__dict__.items())
self.items = items
def filter(self, created__gt=None, created__lt=None):
if created__gt is not None:
return MockQuerySet([
item for item in self.items
if item.created > int(created__gt)
])
assert created__lt is not None
return MockQuerySet([
item for item in self.items
if item.created < int(created__lt)
])
def order_by(self, *ordering):
if ordering[0].startswith('-'):
return MockQuerySet(list(reversed(self.items)))
return self
def __getitem__(self, sliced):
return self.items[sliced]
class ExamplePagination(pagination.CursorPagination): class ExamplePagination(pagination.CursorPagination):
page_size = 5 page_size = 5
@ -1074,6 +1088,76 @@ class TestCursorPaginationWithValueQueryset(CursorPaginationTestsMixin, TestCase
return (previous, current, next, previous_url, next_url) return (previous, current, next, previous_url, next_url)
def test_cursor_pagination_current_page_empty_forward(self):
# Regression test for #6504
self.pagination.base_url = "/"
# We have a cursor on the element at position 100, but this element doesn't exist
# anymore.
cursor = pagination.Cursor(reverse=False, offset=0, position='["100", "0"]')
url = self.pagination.encode_cursor(cursor)
self.pagination.base_url = "/"
# Loading the page with this cursor doesn't crash
(previous, current, next, previous_url, next_url) = self.get_pages(url)
# The previous url doesn't crash either
(previous, current, next, previous_url, next_url) = self.get_pages(previous_url)
# And point to things that are not completely off.
assert previous == [7, 7, 7, 8, 9]
assert current == [9, 9, 9, 9, 9]
assert next == []
assert previous_url is not None
assert next_url is not None
def test_cursor_pagination_current_page_empty_reverse(self):
# Regression test for #6504
self.pagination.base_url = "/"
# We have a cursor pointing towards an element that's after the last element
# that actually exists in the queryset
cursor = pagination.Cursor(reverse=True, offset=0, position=json.dumps([100, 100]))
url = self.pagination.encode_cursor(cursor)
self.pagination.base_url = "/"
(previous, current, next, previous_url, next_url) = self.get_pages(url)
# Providing a cursor that's beyond the last element we should
# get back the last 5 values of the queryset
assert current == list(
reversed([
dict(item)['created'] for item in self.queryset.order_by(
'-created', '-id'
)[0:5]
])
)
assert previous == list(
reversed([
dict(item)['created'] for item in self.queryset.order_by(
'-created', '-id'
)[5:10]
])
)
# The previous url doesn't crash either
(previous, current, next, previous_url, next_url) = self.get_pages(next_url)
# points to the last 5 values
assert previous == [
dict(item)['created'] for item in self.queryset.order_by(
'-created', '-id'
)[0:5]
]
assert current == []
assert next is None
assert previous_url is not None
assert next_url is None
def test_get_displayed_page_numbers(): def test_get_displayed_page_numbers():
""" """