diff --git a/CHANGELOG.md b/CHANGELOG.md index 979edeb..90f49fe 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ Unreleased ---------- - Support for model constraints - Support for data skipping indexes +- Support for mutations: `QuerySet.update` and `QuerySet.delete` - Support FINAL for `ReplacingMergeTree` (chripede) - Added `DateTime64Field` (NiyazNz) - Make `DateTimeField` and `DateTime64Field` timezone-aware (NiyazNz) diff --git a/docs/class_reference.md b/docs/class_reference.md index d415b13..cb718c5 100644 --- a/docs/class_reference.md +++ b/docs/class_reference.md @@ -1192,6 +1192,13 @@ Returns the contents of the query's `WHERE` or `PREWHERE` clause as a string. Returns the number of matching model instances. +#### delete() + + +Deletes all records matched by this queryset's conditions. +Note that ClickHouse performs deletions in the background, so they are not immediate. + + #### distinct() @@ -1268,6 +1275,14 @@ The result is a namedtuple containing `objects` (list), `number_of_objects`, Returns the selected fields or expressions as a SQL string. +#### update(**kwargs) + + +Updates all records matched by this queryset's conditions. +Keyword arguments specify the field names and expressions to use for the update. +Note that ClickHouse performs updates in the background, so they are not immediate. + + ### AggregateQuerySet Extends QuerySet @@ -1315,6 +1330,13 @@ Returns the contents of the query's `WHERE` or `PREWHERE` clause as a string. Returns the number of rows after aggregation. +#### delete() + + +Deletes all records matched by this queryset's conditions. +Note that ClickHouse performs deletions in the background, so they are not immediate. + + #### distinct() @@ -1397,6 +1419,14 @@ The result is a namedtuple containing `objects` (list), `number_of_objects`, Returns the selected fields or expressions as a SQL string. +#### update(**kwargs) + + +Updates all records matched by this queryset's conditions. +Keyword arguments specify the field names and expressions to use for the update. +Note that ClickHouse performs updates in the background, so they are not immediate. + + #### with_totals() diff --git a/docs/querysets.md b/docs/querysets.md index d85ca06..76ecb0e 100644 --- a/docs/querysets.md +++ b/docs/querysets.md @@ -151,7 +151,7 @@ Adds a DISTINCT clause to the query, meaning that any duplicate rows in the resu 94 Final --------- +----- This method can be used only with `CollapsingMergeTree` engine. Adds a FINAL modifier to the query, meaning that the selected data is fully "collapsed" by the engine's sign field. @@ -203,6 +203,23 @@ The `paginate` method returns a `namedtuple` containing the following fields: Note that you should use `QuerySet.order_by` so that the ordering is unique, otherwise there might be inconsistencies in the pagination (such as an instance that appears on two different pages). +Mutations +--------- + +To delete all records that match a queryset's conditions use the `delete` method: + + Person.objects_in(database).filter(first_name='Max').delete() + +To update records that match a queryset's conditions call the `update` method and provide the field names to update and the expressions to use (as keyword arguments): + + Person.objects_in(database).filter(first_name='Max').update(first_name='Maximilian') + +Note a few caveats: + +- ClickHouse cannot update columns that are used in the calculation of the primary or the partition key. +- Mutations happen in the background, so they are not immediate. +- Only tables in the `MergeTree` family support mutations. + Aggregation ----------- diff --git a/docs/toc.md b/docs/toc.md index a0f7e5e..2fd878f 100644 --- a/docs/toc.md +++ b/docs/toc.md @@ -32,6 +32,7 @@ * [Final](querysets.md#final) * [Slicing](querysets.md#slicing) * [Pagination](querysets.md#pagination) + * [Mutations](querysets.md#mutations) * [Aggregation](querysets.md#aggregation) * [Adding totals](querysets.md#adding-totals) diff --git a/src/infi/clickhouse_orm/query.py b/src/infi/clickhouse_orm/query.py index 19cd9f4..92efec4 100644 --- a/src/infi/clickhouse_orm/query.py +++ b/src/infi/clickhouse_orm/query.py @@ -4,7 +4,7 @@ import pytz from copy import copy, deepcopy from math import ceil from datetime import date, datetime -from .utils import comma_join, string_or_func +from .utils import comma_join, string_or_func, arg_to_sql # TODO @@ -547,6 +547,40 @@ class QuerySet(object): qs._final = True return qs + def delete(self): + """ + Deletes all records matched by this queryset's conditions. + Note that ClickHouse performs deletions in the background, so they are not immediate. + """ + self._verify_mutation_allowed() + conditions = (self._where_q & self._prewhere_q).to_sql(self._model_cls) + sql = 'ALTER TABLE $db.`%s` DELETE WHERE %s' % (self._model_cls.table_name(), conditions) + self._database.raw(sql) + return self + + def update(self, **kwargs): + """ + Updates all records matched by this queryset's conditions. + Keyword arguments specify the field names and expressions to use for the update. + Note that ClickHouse performs updates in the background, so they are not immediate. + """ + assert kwargs, 'No fields specified for update' + self._verify_mutation_allowed() + fields = comma_join('`%s` = %s' % (name, arg_to_sql(expr)) for name, expr in kwargs.items()) + conditions = (self._where_q & self._prewhere_q).to_sql(self._model_cls) + sql = 'ALTER TABLE $db.`%s` UPDATE %s WHERE %s' % (self._model_cls.table_name(), fields, conditions) + self._database.raw(sql) + return self + + def _verify_mutation_allowed(self): + ''' + Checks that the queryset's state allows mutations. Raises an AssertionError if not. + ''' + assert not self._limits, 'Mutations are not allowed after slicing the queryset' + assert not self._limit_by, 'Mutations are not allowed after calling limit_by(...)' + assert not self._distinct, 'Mutations are not allowed after calling distinct()' + assert not self._final, 'Mutations are not allowed after calling final()' + def aggregate(self, *args, **kwargs): """ Returns an `AggregateQuerySet` over this query, with `args` serving as @@ -647,6 +681,9 @@ class AggregateQuerySet(QuerySet): qs._grouping_with_totals = True return qs + def _verify_mutation_allowed(self): + raise AssertionError('Cannot mutate an AggregateQuerySet') + # Expose only relevant classes in import * __all__ = [c.__name__ for c in [Q, QuerySet, AggregateQuerySet]] diff --git a/tests/base_test_with_data.py b/tests/base_test_with_data.py index 8cbea48..66d6a25 100644 --- a/tests/base_test_with_data.py +++ b/tests/base_test_with_data.py @@ -21,6 +21,10 @@ class TestCaseWithData(unittest.TestCase): self.database.drop_table(Person) self.database.drop_database() + def _insert_all(self): + self.database.insert(self._sample_data()) + self.assertTrue(self.database.count(Person)) + def _insert_and_check(self, data, count, batch_size=1000): self.database.insert(data, batch_size=batch_size) self.assertEqual(count, self.database.count(Person)) @@ -32,6 +36,7 @@ class TestCaseWithData(unittest.TestCase): yield Person(**entry) + class Person(Model): first_name = StringField() diff --git a/tests/test_mutations.py b/tests/test_mutations.py new file mode 100644 index 0000000..d677ae1 --- /dev/null +++ b/tests/test_mutations.py @@ -0,0 +1,88 @@ +from infi.clickhouse_orm import F +from .base_test_with_data import * +from time import sleep + + +class MutationsTestCase(TestCaseWithData): + + def _wait_for_mutations(self): + sql = 'SELECT * FROM system.mutations WHERE is_done = 0' + while list(self.database.raw(sql)): + sleep(0.25) + + def test_delete_all(self): + self._insert_all() + Person.objects_in(self.database).delete() + self._wait_for_mutations() + self.assertFalse(Person.objects_in(self.database)) + + def test_delete_with_where_cond(self): + self._insert_all() + cond = Person.first_name == 'Cassady' + self.assertTrue(Person.objects_in(self.database).filter(cond)) + Person.objects_in(self.database).filter(cond).delete() + self._wait_for_mutations() + self.assertFalse(Person.objects_in(self.database).filter(cond)) + self.assertTrue(Person.objects_in(self.database).exclude(cond)) + + def test_delete_with_prewhere_cond(self): + self._insert_all() + cond = F.toYear(Person.birthday) == 1977 + self.assertTrue(Person.objects_in(self.database).filter(cond)) + Person.objects_in(self.database).filter(cond, prewhere=True).delete() + self._wait_for_mutations() + self.assertFalse(Person.objects_in(self.database).filter(cond)) + self.assertTrue(Person.objects_in(self.database).exclude(cond)) + + def test_update_all(self): + self._insert_all() + Person.objects_in(self.database).update(height=0) + self._wait_for_mutations() + for p in Person.objects_in(self.database): print(p.height) + self.assertFalse(Person.objects_in(self.database).exclude(height=0)) + + def test_update_with_where_cond(self): + self._insert_all() + cond = Person.first_name == 'Cassady' + Person.objects_in(self.database).filter(cond).update(height=0) + self._wait_for_mutations() + self.assertFalse(Person.objects_in(self.database).filter(cond).exclude(height=0)) + + def test_update_with_prewhere_cond(self): + self._insert_all() + cond = F.toYear(Person.birthday) == 1977 + Person.objects_in(self.database).filter(cond, prewhere=True).update(height=0) + self._wait_for_mutations() + self.assertFalse(Person.objects_in(self.database).filter(cond).exclude(height=0)) + + def test_update_multiple_fields(self): + self._insert_all() + Person.objects_in(self.database).update(height=0, passport=None) + self._wait_for_mutations() + self.assertFalse(Person.objects_in(self.database).exclude(height=0)) + self.assertFalse(Person.objects_in(self.database).exclude(passport=None)) + + def test_chained_update(self): + self._insert_all() + Person.objects_in(self.database).update(height=F.rand()).update(passport=99999) + self._wait_for_mutations() + self.assertFalse(Person.objects_in(self.database).exclude(passport=99999)) + + def test_invalid_state_for_mutations(self): + base_query = Person.objects_in(self.database) + queries = [ + base_query[0:1], + base_query.limit_by(5, 'first_name'), + base_query.distinct(), + base_query.aggregate('first_name', count=F.count()) + ] + for query in queries: + print(query) + with self.assertRaises(AssertionError): + query.delete() + with self.assertRaises(AssertionError): + query.update(height=1.8) + + def test_missing_fields_for_update(self): + with self.assertRaises(AssertionError): + Person.objects_in(self.database).update()