infi.clickhouse_orm/tests/test_querysets.py

432 lines
19 KiB
Python
Raw Permalink Normal View History

2017-04-26 15:46:34 +03:00
# -*- coding: utf-8 -*-
2017-08-16 23:48:18 +03:00
from __future__ import unicode_literals, print_function
2017-04-26 15:46:34 +03:00
import unittest
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.query import Q
2017-04-26 15:46:34 +03:00
from .base_test_with_data import *
import logging
from datetime import date, datetime
try:
Enum # exists in Python 3.4+
except NameError:
from enum import Enum # use the enum34 library instead
class QuerySetTestCase(TestCaseWithData):
def setUp(self):
super(QuerySetTestCase, self).setUp()
self.database.insert(self._sample_data())
2017-06-24 12:28:42 +03:00
2017-04-26 15:46:34 +03:00
def _test_qs(self, qs, expected_count):
2017-05-05 15:21:55 +03:00
logging.info(qs.as_sql())
2017-09-10 17:17:04 +03:00
count = 0
2017-04-26 15:46:34 +03:00
for instance in qs:
2017-09-10 17:17:04 +03:00
count += 1
logging.info('\t[%d]\t%s' % (count, instance.to_dict()))
self.assertEqual(count, expected_count)
self.assertEqual(qs.count(), expected_count)
2017-04-26 15:46:34 +03:00
def test_no_filtering(self):
qs = Person.objects_in(self.database)
self._test_qs(qs, len(data))
def test_truthiness(self):
qs = Person.objects_in(self.database)
self.assertTrue(qs.filter(first_name='Connor'))
self.assertFalse(qs.filter(first_name='Willy'))
2018-06-26 15:20:11 +03:00
def test_filter_null_value(self):
qs = Person.objects_in(self.database)
self._test_qs(qs.filter(passport=None), 98)
self._test_qs(qs.exclude(passport=None), 2)
self._test_qs(qs.filter(passport__ne=None), 2)
self._test_qs(qs.exclude(passport__ne=None), 98)
2017-04-26 15:46:34 +03:00
def test_filter_string_field(self):
qs = Person.objects_in(self.database)
self._test_qs(qs.filter(first_name='Ciaran'), 2)
self._test_qs(qs.filter(first_name='ciaran'), 0) # case sensitive
self._test_qs(qs.filter(first_name__iexact='ciaran'), 2) # case insensitive
self._test_qs(qs.filter(first_name__gt='Whilemina'), 4)
self._test_qs(qs.filter(first_name__gte='Whilemina'), 5)
self._test_qs(qs.filter(first_name__lt='Adam'), 1)
self._test_qs(qs.filter(first_name__lte='Adam'), 2)
self._test_qs(qs.filter(first_name__in=('Connor', 'Courtney')), 3) # in tuple
self._test_qs(qs.filter(first_name__in=['Connor', 'Courtney']), 3) # in list
self._test_qs(qs.filter(first_name__in="'Connor', 'Courtney'"), 3) # in string
self._test_qs(qs.filter(first_name__not_in="'Connor', 'Courtney'"), 97)
2017-04-26 15:46:34 +03:00
self._test_qs(qs.filter(first_name__contains='sh'), 3) # case sensitive
self._test_qs(qs.filter(first_name__icontains='sh'), 6) # case insensitive
self._test_qs(qs.filter(first_name__startswith='le'), 0) # case sensitive
self._test_qs(qs.filter(first_name__istartswith='Le'), 2) # case insensitive
self._test_qs(qs.filter(first_name__istartswith=''), 100) # empty prefix
self._test_qs(qs.filter(first_name__endswith='IA'), 0) # case sensitive
self._test_qs(qs.filter(first_name__iendswith='ia'), 3) # case insensitive
self._test_qs(qs.filter(first_name__iendswith=''), 100) # empty suffix
def test_filter_with_q_objects(self):
qs = Person.objects_in(self.database)
self._test_qs(qs.filter(Q(first_name='Ciaran')), 2)
self._test_qs(qs.filter(Q(first_name='Ciaran') | Q(first_name='Chelsea')), 3)
self._test_qs(qs.filter(Q(first_name__in=['Warren', 'Whilemina', 'Whitney']) & Q(height__gte=1.7)), 3)
self._test_qs(qs.filter((Q(first_name__in=['Warren', 'Whilemina', 'Whitney']) & Q(height__gte=1.7) |
(Q(first_name__in=['Victoria', 'Victor', 'Venus']) & Q(height__lt=1.7)))), 4)
self._test_qs(qs.filter(Q(first_name='Elton') & ~Q(last_name='Smith')), 1)
# Check operator precendence
self._test_qs(qs.filter(first_name='Cassady').filter(Q(last_name='Knapp') | Q(last_name='Rogers') | Q(last_name='Gregory')), 2)
self._test_qs(qs.filter(Q(first_name='Cassady') & Q(last_name='Knapp') | Q(first_name='Beatrice') & Q(last_name='Gregory')), 2)
self._test_qs(qs.filter(Q(first_name='Courtney') | Q(first_name='Cassady') & Q(last_name='Knapp')), 3)
def test_filter_unicode_string(self):
self.database.insert([
Person(first_name=u'דונלד', last_name=u'דאק')
])
qs = Person.objects_in(self.database)
self._test_qs(qs.filter(first_name=u'דונלד'), 1)
2017-04-26 15:46:34 +03:00
def test_filter_float_field(self):
qs = Person.objects_in(self.database)
self._test_qs(qs.filter(height__gt=2), 0)
self._test_qs(qs.filter(height__lt=1.61), 4)
self._test_qs(qs.filter(height__lt='1.61'), 4)
self._test_qs(qs.exclude(height__lt='1.61'), 96)
self._test_qs(qs.filter(height__gt=0), 100)
self._test_qs(qs.exclude(height__gt=0), 0)
def test_filter_date_field(self):
qs = Person.objects_in(self.database)
self._test_qs(qs.filter(birthday='1970-12-02'), 1)
self._test_qs(qs.filter(birthday__eq='1970-12-02'), 1)
self._test_qs(qs.filter(birthday__ne='1970-12-02'), 99)
2017-04-26 15:46:34 +03:00
self._test_qs(qs.filter(birthday=date(1970, 12, 2)), 1)
self._test_qs(qs.filter(birthday__lte=date(1970, 12, 2)), 3)
def test_only(self):
qs = Person.objects_in(self.database).only('first_name', 'last_name')
for person in qs:
self.assertTrue(person.first_name)
self.assertTrue(person.last_name)
self.assertFalse(person.height)
self.assertEqual(person.birthday, date(1970, 1, 1))
2017-04-26 15:46:34 +03:00
def test_order_by(self):
qs = Person.objects_in(self.database)
self.assertFalse('ORDER BY' in qs.as_sql())
self.assertFalse(qs.order_by_as_sql())
2017-04-26 15:46:34 +03:00
person = list(qs.order_by('first_name', 'last_name'))[0]
self.assertEqual(person.first_name, 'Abdul')
2017-04-26 15:46:34 +03:00
person = list(qs.order_by('-first_name', '-last_name'))[0]
self.assertEqual(person.first_name, 'Yolanda')
2017-04-26 15:46:34 +03:00
person = list(qs.order_by('height'))[0]
self.assertEqual(person.height, 1.59)
2017-04-26 15:46:34 +03:00
person = list(qs.order_by('-height'))[0]
self.assertEqual(person.height, 1.8)
2017-04-26 15:46:34 +03:00
def test_in_subquery(self):
qs = Person.objects_in(self.database)
self._test_qs(qs.filter(height__in='SELECT max(height) FROM $table'), 2)
self._test_qs(qs.filter(first_name__in=qs.only('last_name')), 2)
self._test_qs(qs.filter(first_name__not_in=qs.only('last_name')), 98)
2017-04-26 15:46:34 +03:00
def _insert_sample_model(self):
self.database.create_table(SampleModel)
now = datetime.now()
self.database.insert([
SampleModel(timestamp=now, num=1, color=Color.red),
SampleModel(timestamp=now, num=2, color=Color.red),
SampleModel(timestamp=now, num=3, color=Color.blue),
SampleModel(timestamp=now, num=4, color=Color.white),
])
def _insert_sample_collapsing_model(self):
self.database.create_table(SampleCollapsingModel)
now = datetime.now()
self.database.insert([
SampleCollapsingModel(timestamp=now, num=1, color=Color.red),
SampleCollapsingModel(timestamp=now, num=2, color=Color.red),
SampleCollapsingModel(timestamp=now, num=2, color=Color.red, sign=-1),
SampleCollapsingModel(timestamp=now, num=2, color=Color.green),
SampleCollapsingModel(timestamp=now, num=3, color=Color.white),
SampleCollapsingModel(timestamp=now, num=4, color=Color.white, sign=1),
SampleCollapsingModel(timestamp=now, num=4, color=Color.white, sign=-1),
SampleCollapsingModel(timestamp=now, num=4, color=Color.blue, sign=1),
])
2017-04-26 15:46:34 +03:00
def test_filter_enum_field(self):
self._insert_sample_model()
qs = SampleModel.objects_in(self.database)
self._test_qs(qs.filter(color=Color.red), 2)
self._test_qs(qs.exclude(color=Color.white), 3)
# Different ways to specify blue
self._test_qs(qs.filter(color__gt=Color.blue), 1)
self._test_qs(qs.filter(color__gt='blue'), 1)
self._test_qs(qs.filter(color__gt=2), 1)
def test_filter_int_field(self):
self._insert_sample_model()
qs = SampleModel.objects_in(self.database)
self._test_qs(qs.filter(num=1), 1)
self._test_qs(qs.filter(num__eq=1), 1)
self._test_qs(qs.filter(num__ne=1), 3)
2017-04-26 15:46:34 +03:00
self._test_qs(qs.filter(num__gt=1), 3)
self._test_qs(qs.filter(num__gte=1), 4)
self._test_qs(qs.filter(num__in=(1, 2, 3)), 3)
2017-04-28 18:18:23 +03:00
self._test_qs(qs.filter(num__in=range(1, 4)), 3)
2017-04-26 15:46:34 +03:00
2017-06-24 12:28:42 +03:00
def test_slicing(self):
db = Database('system')
2017-08-16 17:03:49 +03:00
numbers = list(range(100))
2017-06-24 12:28:42 +03:00
qs = Numbers.objects_in(db)
self.assertEqual(qs[0].number, numbers[0])
self.assertEqual(qs[5].number, numbers[5])
self.assertEqual([row.number for row in qs[:1]], numbers[:1])
self.assertEqual([row.number for row in qs[:10]], numbers[:10])
self.assertEqual([row.number for row in qs[3:10]], numbers[3:10])
self.assertEqual([row.number for row in qs[9:10]], numbers[9:10])
self.assertEqual([row.number for row in qs[10:10]], numbers[10:10])
2017-06-24 12:28:42 +03:00
def test_invalid_slicing(self):
db = Database('system')
qs = Numbers.objects_in(db)
with self.assertRaises(AssertionError):
qs[3:10:2]
with self.assertRaises(AssertionError):
qs[-5]
with self.assertRaises(AssertionError):
qs[:-5]
with self.assertRaises(AssertionError):
qs[50:1]
def test_pagination(self):
qs = Person.objects_in(self.database).order_by('first_name', 'last_name')
# Try different page sizes
for page_size in (1, 2, 7, 10, 30, 100, 150):
# Iterate over pages and collect all intances
page_num = 1
instances = set()
while True:
page = qs.paginate(page_num, page_size)
self.assertEqual(page.number_of_objects, len(data))
self.assertGreater(page.pages_total, 0)
[instances.add(obj.to_tsv()) for obj in page.objects]
if page.pages_total == page_num:
break
page_num += 1
# Verify that all instances were returned
self.assertEqual(len(instances), len(data))
def test_pagination_last_page(self):
qs = Person.objects_in(self.database).order_by('first_name', 'last_name')
# Try different page sizes
for page_size in (1, 2, 7, 10, 30, 100, 150):
# Ask for the last page in two different ways and verify equality
page_a = qs.paginate(-1, page_size)
page_b = qs.paginate(page_a.pages_total, page_size)
self.assertEqual(page_a[1:], page_b[1:])
self.assertEqual([obj.to_tsv() for obj in page_a.objects],
[obj.to_tsv() for obj in page_b.objects])
def test_pagination_invalid_page(self):
qs = Person.objects_in(self.database).order_by('first_name', 'last_name')
for page_num in (0, -2, -100):
with self.assertRaises(ValueError):
qs.paginate(page_num, 100)
def test_pagination_with_conditions(self):
qs = Person.objects_in(self.database).order_by('first_name', 'last_name').filter(first_name__lt='Ava')
page = qs.paginate(1, 100)
self.assertEqual(page.number_of_objects, 10)
2017-09-10 17:17:04 +03:00
def test_distinct(self):
qs = Person.objects_in(self.database).distinct()
self._test_qs(qs, 100)
self._test_qs(qs.only('first_name'), 94)
def test_materialized_field(self):
self._insert_sample_model()
qs = SampleModel.objects_in(self.database)
for obj in qs:
self.assertTrue(obj.materialized_date != DateField.min_value)
def test_alias_field(self):
self._insert_sample_model()
qs = SampleModel.objects_in(self.database)
for obj in qs:
self.assertTrue(obj.num_squared == obj.num ** 2)
def test_count_of_slice(self):
qs = Person.objects_in(self.database)
self._test_qs(qs[:70], 70)
self._test_qs(qs[70:80], 10)
self._test_qs(qs[80:], 20)
def test_final(self):
# Final can be used with CollapsingMergeTree engine only
with self.assertRaises(TypeError):
Person.objects_in(self.database).final()
self._insert_sample_collapsing_model()
res = list(SampleCollapsingModel.objects_in(self.database).final().order_by('num'))
self.assertEqual(4, len(res))
for item, exp_color in zip(res, (Color.red, Color.green, Color.white, Color.blue)):
self.assertEqual(exp_color, item.color)
class AggregateTestCase(TestCaseWithData):
def setUp(self):
super(AggregateTestCase, self).setUp()
self.database.insert(self._sample_data())
def test_aggregate_no_grouping(self):
qs = Person.objects_in(self.database).aggregate(average_height='avg(height)', count='count()')
2017-08-16 17:03:49 +03:00
print(qs.as_sql())
self.assertEqual(qs.count(), 1)
for row in qs:
self.assertAlmostEqual(row.average_height, 1.6923, places=4)
self.assertEqual(row.count, 100)
def test_aggregate_with_filter(self):
# When filter comes before aggregate
qs = Person.objects_in(self.database).filter(first_name='Warren').aggregate(average_height='avg(height)', count='count()')
2017-08-16 17:03:49 +03:00
print(qs.as_sql())
self.assertEqual(qs.count(), 1)
for row in qs:
self.assertAlmostEqual(row.average_height, 1.675, places=4)
self.assertEqual(row.count, 2)
# When filter comes after aggregate
qs = Person.objects_in(self.database).aggregate(average_height='avg(height)', count='count()').filter(first_name='Warren')
2017-08-16 17:03:49 +03:00
print(qs.as_sql())
self.assertEqual(qs.count(), 1)
for row in qs:
self.assertAlmostEqual(row.average_height, 1.675, places=4)
self.assertEqual(row.count, 2)
def test_aggregate_with_implicit_grouping(self):
qs = Person.objects_in(self.database).aggregate('first_name', average_height='avg(height)', count='count()')
2017-08-16 17:03:49 +03:00
print(qs.as_sql())
self.assertEqual(qs.count(), 94)
total = 0
for row in qs:
self.assertTrue(1.5 < row.average_height < 2)
self.assertTrue(0 < row.count < 3)
total += row.count
self.assertEqual(total, 100)
def test_aggregate_with_explicit_grouping(self):
qs = Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').group_by('weekday')
2017-08-16 17:03:49 +03:00
print(qs.as_sql())
self.assertEqual(qs.count(), 7)
total = 0
for row in qs:
total += row.count
self.assertEqual(total, 100)
def test_aggregate_with_order_by(self):
qs = Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').group_by('weekday')
days = [row.weekday for row in qs.order_by('weekday')]
self.assertEqual(days, list(range(1, 8)))
def test_aggregate_with_indexing(self):
qs = Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').group_by('weekday')
total = 0
for i in range(7):
total += qs[i].count
self.assertEqual(total, 100)
def test_aggregate_with_slicing(self):
qs = Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').group_by('weekday')
total = sum(row.count for row in qs[:3]) + sum(row.count for row in qs[3:])
self.assertEqual(total, 100)
def test_aggregate_with_pagination(self):
qs = Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').group_by('weekday')
total = 0
page_num = 1
while True:
page = qs.paginate(page_num, page_size=3)
self.assertEqual(page.number_of_objects, 7)
total += sum(row.count for row in page.objects)
if page.pages_total == page_num:
break
page_num += 1
self.assertEqual(total, 100)
def test_aggregate_with_wrong_grouping(self):
with self.assertRaises(AssertionError):
Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').group_by('first_name')
def test_aggregate_with_no_calculated_fields(self):
with self.assertRaises(AssertionError):
Person.objects_in(self.database).aggregate()
def test_aggregate_with_only(self):
# Cannot put only() after aggregate()
with self.assertRaises(NotImplementedError):
Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').only('weekday')
# When only() comes before aggregate(), it gets overridden
qs = Person.objects_in(self.database).only('last_name').aggregate(average_height='avg(height)', count='count()')
self.assertTrue('last_name' not in qs.as_sql())
def test_aggregate_on_aggregate(self):
with self.assertRaises(NotImplementedError):
Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').aggregate(s='sum(height)')
def test_filter_on_calculated_field(self):
# This is currently not supported, so we expect it to fail
with self.assertRaises(AttributeError):
qs = Person.objects_in(self.database).aggregate(weekday='toDayOfWeek(birthday)', count='count()').group_by('weekday')
qs = qs.filter(weekday=1)
self.assertEqual(qs.count(), 1)
2017-09-10 17:17:04 +03:00
def test_aggregate_with_distinct(self):
# In this case distinct has no effect
qs = Person.objects_in(self.database).aggregate(average_height='avg(height)').distinct()
print(qs.as_sql())
self.assertEqual(qs.count(), 1)
2017-09-10 17:17:04 +03:00
def test_double_underscore_field(self):
class Mdl(Model):
the__number = Int32Field()
the__next__number = Int32Field()
engine = Memory()
qs = Mdl.objects_in(self.database).filter(the__number=1)
self.assertEqual(qs.conditions_as_sql(), 'the__number = 1')
qs = Mdl.objects_in(self.database).filter(the__number__gt=1)
self.assertEqual(qs.conditions_as_sql(), 'the__number > 1')
qs = Mdl.objects_in(self.database).filter(the__next__number=1)
self.assertEqual(qs.conditions_as_sql(), 'the__next__number = 1')
qs = Mdl.objects_in(self.database).filter(the__next__number__gt=1)
self.assertEqual(qs.conditions_as_sql(), 'the__next__number > 1')
2017-04-26 15:46:34 +03:00
Color = Enum('Color', u'red blue green yellow brown white black')
class SampleModel(Model):
timestamp = DateTimeField()
materialized_date = DateField(materialized='toDate(timestamp)')
num = Int32Field()
color = Enum8Field(Color)
num_squared = Int32Field(alias='num*num')
2017-04-26 15:46:34 +03:00
2017-06-24 12:28:42 +03:00
engine = MergeTree('materialized_date', ('materialized_date',))
class SampleCollapsingModel(SampleModel):
sign = Int8Field(default=1)
engine = CollapsingMergeTree('materialized_date', ('num',), 'sign')
2017-06-24 12:28:42 +03:00
class Numbers(Model):
2017-08-16 17:03:49 +03:00
number = UInt64Field()