Support for mutations: QuerySet.update and QuerySet.delete

This commit is contained in:
Itai Shirav 2020-06-27 00:02:11 +03:00
parent 40a1e21348
commit 436c296609
7 changed files with 181 additions and 2 deletions

View File

@ -5,6 +5,7 @@ Unreleased
---------- ----------
- Support for model constraints - Support for model constraints
- Support for data skipping indexes - Support for data skipping indexes
- Support for mutations: `QuerySet.update` and `QuerySet.delete`
- Support FINAL for `ReplacingMergeTree` (chripede) - Support FINAL for `ReplacingMergeTree` (chripede)
- Added `DateTime64Field` (NiyazNz) - Added `DateTime64Field` (NiyazNz)
- Make `DateTimeField` and `DateTime64Field` timezone-aware (NiyazNz) - Make `DateTimeField` and `DateTime64Field` timezone-aware (NiyazNz)

View File

@ -1192,6 +1192,13 @@ Returns the contents of the query's `WHERE` or `PREWHERE` clause as a string.
Returns the number of matching model instances. Returns the number of matching model instances.
#### delete()
Deletes all records matched by this queryset's conditions.
Note that ClickHouse performs deletions in the background, so they are not immediate.
#### distinct() #### distinct()
@ -1268,6 +1275,14 @@ The result is a namedtuple containing `objects` (list), `number_of_objects`,
Returns the selected fields or expressions as a SQL string. Returns the selected fields or expressions as a SQL string.
#### update(**kwargs)
Updates all records matched by this queryset's conditions.
Keyword arguments specify the field names and expressions to use for the update.
Note that ClickHouse performs updates in the background, so they are not immediate.
### AggregateQuerySet ### AggregateQuerySet
Extends QuerySet Extends QuerySet
@ -1315,6 +1330,13 @@ Returns the contents of the query's `WHERE` or `PREWHERE` clause as a string.
Returns the number of rows after aggregation. Returns the number of rows after aggregation.
#### delete()
Deletes all records matched by this queryset's conditions.
Note that ClickHouse performs deletions in the background, so they are not immediate.
#### distinct() #### distinct()
@ -1397,6 +1419,14 @@ The result is a namedtuple containing `objects` (list), `number_of_objects`,
Returns the selected fields or expressions as a SQL string. Returns the selected fields or expressions as a SQL string.
#### update(**kwargs)
Updates all records matched by this queryset's conditions.
Keyword arguments specify the field names and expressions to use for the update.
Note that ClickHouse performs updates in the background, so they are not immediate.
#### with_totals() #### with_totals()

View File

@ -151,7 +151,7 @@ Adds a DISTINCT clause to the query, meaning that any duplicate rows in the resu
94 94
Final Final
-------- -----
This method can be used only with `CollapsingMergeTree` engine. This method can be used only with `CollapsingMergeTree` engine.
Adds a FINAL modifier to the query, meaning that the selected data is fully "collapsed" by the engine's sign field. Adds a FINAL modifier to the query, meaning that the selected data is fully "collapsed" by the engine's sign field.
@ -203,6 +203,23 @@ The `paginate` method returns a `namedtuple` containing the following fields:
Note that you should use `QuerySet.order_by` so that the ordering is unique, otherwise there might be inconsistencies in the pagination (such as an instance that appears on two different pages). Note that you should use `QuerySet.order_by` so that the ordering is unique, otherwise there might be inconsistencies in the pagination (such as an instance that appears on two different pages).
Mutations
---------
To delete all records that match a queryset's conditions use the `delete` method:
Person.objects_in(database).filter(first_name='Max').delete()
To update records that match a queryset's conditions call the `update` method and provide the field names to update and the expressions to use (as keyword arguments):
Person.objects_in(database).filter(first_name='Max').update(first_name='Maximilian')
Note a few caveats:
- ClickHouse cannot update columns that are used in the calculation of the primary or the partition key.
- Mutations happen in the background, so they are not immediate.
- Only tables in the `MergeTree` family support mutations.
Aggregation Aggregation
----------- -----------

View File

@ -32,6 +32,7 @@
* [Final](querysets.md#final) * [Final](querysets.md#final)
* [Slicing](querysets.md#slicing) * [Slicing](querysets.md#slicing)
* [Pagination](querysets.md#pagination) * [Pagination](querysets.md#pagination)
* [Mutations](querysets.md#mutations)
* [Aggregation](querysets.md#aggregation) * [Aggregation](querysets.md#aggregation)
* [Adding totals](querysets.md#adding-totals) * [Adding totals](querysets.md#adding-totals)

View File

@ -4,7 +4,7 @@ import pytz
from copy import copy, deepcopy from copy import copy, deepcopy
from math import ceil from math import ceil
from datetime import date, datetime from datetime import date, datetime
from .utils import comma_join, string_or_func from .utils import comma_join, string_or_func, arg_to_sql
# TODO # TODO
@ -547,6 +547,40 @@ class QuerySet(object):
qs._final = True qs._final = True
return qs return qs
def delete(self):
"""
Deletes all records matched by this queryset's conditions.
Note that ClickHouse performs deletions in the background, so they are not immediate.
"""
self._verify_mutation_allowed()
conditions = (self._where_q & self._prewhere_q).to_sql(self._model_cls)
sql = 'ALTER TABLE $db.`%s` DELETE WHERE %s' % (self._model_cls.table_name(), conditions)
self._database.raw(sql)
return self
def update(self, **kwargs):
"""
Updates all records matched by this queryset's conditions.
Keyword arguments specify the field names and expressions to use for the update.
Note that ClickHouse performs updates in the background, so they are not immediate.
"""
assert kwargs, 'No fields specified for update'
self._verify_mutation_allowed()
fields = comma_join('`%s` = %s' % (name, arg_to_sql(expr)) for name, expr in kwargs.items())
conditions = (self._where_q & self._prewhere_q).to_sql(self._model_cls)
sql = 'ALTER TABLE $db.`%s` UPDATE %s WHERE %s' % (self._model_cls.table_name(), fields, conditions)
self._database.raw(sql)
return self
def _verify_mutation_allowed(self):
'''
Checks that the queryset's state allows mutations. Raises an AssertionError if not.
'''
assert not self._limits, 'Mutations are not allowed after slicing the queryset'
assert not self._limit_by, 'Mutations are not allowed after calling limit_by(...)'
assert not self._distinct, 'Mutations are not allowed after calling distinct()'
assert not self._final, 'Mutations are not allowed after calling final()'
def aggregate(self, *args, **kwargs): def aggregate(self, *args, **kwargs):
""" """
Returns an `AggregateQuerySet` over this query, with `args` serving as Returns an `AggregateQuerySet` over this query, with `args` serving as
@ -647,6 +681,9 @@ class AggregateQuerySet(QuerySet):
qs._grouping_with_totals = True qs._grouping_with_totals = True
return qs return qs
def _verify_mutation_allowed(self):
raise AssertionError('Cannot mutate an AggregateQuerySet')
# Expose only relevant classes in import * # Expose only relevant classes in import *
__all__ = [c.__name__ for c in [Q, QuerySet, AggregateQuerySet]] __all__ = [c.__name__ for c in [Q, QuerySet, AggregateQuerySet]]

View File

@ -21,6 +21,10 @@ class TestCaseWithData(unittest.TestCase):
self.database.drop_table(Person) self.database.drop_table(Person)
self.database.drop_database() self.database.drop_database()
def _insert_all(self):
self.database.insert(self._sample_data())
self.assertTrue(self.database.count(Person))
def _insert_and_check(self, data, count, batch_size=1000): def _insert_and_check(self, data, count, batch_size=1000):
self.database.insert(data, batch_size=batch_size) self.database.insert(data, batch_size=batch_size)
self.assertEqual(count, self.database.count(Person)) self.assertEqual(count, self.database.count(Person))
@ -32,6 +36,7 @@ class TestCaseWithData(unittest.TestCase):
yield Person(**entry) yield Person(**entry)
class Person(Model): class Person(Model):
first_name = StringField() first_name = StringField()

88
tests/test_mutations.py Normal file
View File

@ -0,0 +1,88 @@
from infi.clickhouse_orm import F
from .base_test_with_data import *
from time import sleep
class MutationsTestCase(TestCaseWithData):
def _wait_for_mutations(self):
sql = 'SELECT * FROM system.mutations WHERE is_done = 0'
while list(self.database.raw(sql)):
sleep(0.25)
def test_delete_all(self):
self._insert_all()
Person.objects_in(self.database).delete()
self._wait_for_mutations()
self.assertFalse(Person.objects_in(self.database))
def test_delete_with_where_cond(self):
self._insert_all()
cond = Person.first_name == 'Cassady'
self.assertTrue(Person.objects_in(self.database).filter(cond))
Person.objects_in(self.database).filter(cond).delete()
self._wait_for_mutations()
self.assertFalse(Person.objects_in(self.database).filter(cond))
self.assertTrue(Person.objects_in(self.database).exclude(cond))
def test_delete_with_prewhere_cond(self):
self._insert_all()
cond = F.toYear(Person.birthday) == 1977
self.assertTrue(Person.objects_in(self.database).filter(cond))
Person.objects_in(self.database).filter(cond, prewhere=True).delete()
self._wait_for_mutations()
self.assertFalse(Person.objects_in(self.database).filter(cond))
self.assertTrue(Person.objects_in(self.database).exclude(cond))
def test_update_all(self):
self._insert_all()
Person.objects_in(self.database).update(height=0)
self._wait_for_mutations()
for p in Person.objects_in(self.database): print(p.height)
self.assertFalse(Person.objects_in(self.database).exclude(height=0))
def test_update_with_where_cond(self):
self._insert_all()
cond = Person.first_name == 'Cassady'
Person.objects_in(self.database).filter(cond).update(height=0)
self._wait_for_mutations()
self.assertFalse(Person.objects_in(self.database).filter(cond).exclude(height=0))
def test_update_with_prewhere_cond(self):
self._insert_all()
cond = F.toYear(Person.birthday) == 1977
Person.objects_in(self.database).filter(cond, prewhere=True).update(height=0)
self._wait_for_mutations()
self.assertFalse(Person.objects_in(self.database).filter(cond).exclude(height=0))
def test_update_multiple_fields(self):
self._insert_all()
Person.objects_in(self.database).update(height=0, passport=None)
self._wait_for_mutations()
self.assertFalse(Person.objects_in(self.database).exclude(height=0))
self.assertFalse(Person.objects_in(self.database).exclude(passport=None))
def test_chained_update(self):
self._insert_all()
Person.objects_in(self.database).update(height=F.rand()).update(passport=99999)
self._wait_for_mutations()
self.assertFalse(Person.objects_in(self.database).exclude(passport=99999))
def test_invalid_state_for_mutations(self):
base_query = Person.objects_in(self.database)
queries = [
base_query[0:1],
base_query.limit_by(5, 'first_name'),
base_query.distinct(),
base_query.aggregate('first_name', count=F.count())
]
for query in queries:
print(query)
with self.assertRaises(AssertionError):
query.delete()
with self.assertRaises(AssertionError):
query.update(height=1.8)
def test_missing_fields_for_update(self):
with self.assertRaises(AssertionError):
Person.objects_in(self.database).update()