Merge pull request #2 from carrotquest/namedtuple

Namedtuple
This commit is contained in:
M1ha Shvn 2019-02-04 11:13:22 +05:00 committed by GitHub
commit 2128dd0efc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 268 additions and 137 deletions

4
.gitignore vendored
View File

@ -3,6 +3,9 @@ __pycache__/
*.py[cod]
*$py.class
# JetBrains files
.idea
# C extensions
*.so
@ -102,3 +105,4 @@ venv.bak/
# mypy
.mypy_cache/
/.idea/dataSources.xml

View File

@ -5,4 +5,5 @@ typing
psycopg2
infi.clickhouse-orm
celery
statsd
statsd
django-pg-returning

View File

@ -2,28 +2,27 @@
This file defines base abstract models to inherit from
"""
import datetime
from collections import defaultdict
from itertools import chain
from typing import List, Tuple, Iterable, Set, Any, Dict, Optional
import logging
import pytz
from collections import defaultdict
from copy import deepcopy
from itertools import chain
from typing import List, Tuple, Iterable, Set, Any, Optional
from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet
from infi.clickhouse_orm.database import Database
from infi.clickhouse_orm.engines import CollapsingMergeTree
from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase
from six import with_metaclass
from statsd.defaults.django import statsd
from .compatibility import namedtuple
from .configuration import config
from .database import connections
from .database import connections, Database
from .exceptions import RedisLockTimeoutError
from .models import ClickHouseSyncModel
from .query import QuerySet
from .serializers import Django2ClickHouseModelSerializer
from .utils import lazy_class_import, exec_multi_arg_func
logger = logging.getLogger('django-clickhouse')
@ -63,40 +62,20 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
# This attribute is initialized in metaclass, as it must get model class as a parameter
objects = None # type: QuerySet
def __init__(self, **kwargs):
multi_init = kwargs.pop('__multi_init', False)
if multi_init:
pass
else:
super(ClickHouseModel, self).__init__(**kwargs)
@classmethod
def init_many(cls, kwargs_list, database=None):
# type: (Iterable[Dict[str, Any]], Optional[Database]) -> Iterable['ClickHouseModel']
"""
Basic __init__ methods if not effective if we need to init 100k objects
:return: A list of inited classes
"""
assert database is None or isinstance(database, Database), "database must be database.Database instance"
def get_tuple_class(cls, field_names=None, defaults=None):
field_names = field_names or cls.fields(writable=False).keys()
# Assign default values
valid_field_names = set(cls._fields.keys())
for kwargs in kwargs_list:
invalid_fields = set(kwargs.keys()) - valid_field_names
if invalid_fields:
raise AttributeError('%s does not have a fields called %s' % (cls.__name__, ', '.join(invalid_fields)))
# Strange, but sometimes the columns are in different order...
field_names = tuple(sorted(field_names))
item = cls(__multi_init=True)
item.__dict__.update(cls._defaults)
item._database = database
if defaults:
defaults_new = deepcopy(cls._defaults)
defaults_new.update(defaults)
else:
defaults_new = cls._defaults
for name in kwargs:
field = cls._fields[name]
kwargs[name] = field.to_python(kwargs[name], pytz.utc)
field.validate(kwargs[name])
item.__dict__.update(kwargs)
yield item
return namedtuple("%sTuple" % cls.__name__, field_names, defaults=defaults_new)
@classmethod
def objects_in(cls, database): # type: (Database) -> QuerySet
@ -128,9 +107,10 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
return connections[db_alias]
@classmethod
def get_django_model_serializer(cls, writable=False): # type: (bool) -> Django2ClickHouseModelSerializer
def get_django_model_serializer(cls, writable=False, defaults=None):
# type: (bool, Optional[dict]) -> Django2ClickHouseModelSerializer
serializer_cls = lazy_class_import(cls.django_model_serializer)
return serializer_cls(cls, writable=writable)
return serializer_cls(cls, writable=writable, defaults=defaults)
@classmethod
def get_sync_batch_size(cls):
@ -223,8 +203,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
:return:
"""
if batch:
# +10 constant is to avoid 0 insert, generated by infi
cls.get_database(for_write=True).insert(batch, batch_size=len(batch) + 10)
cls.get_database(for_write=True).insert_tuples(cls, batch)
@classmethod
def sync_batch_from_storage(cls):
@ -257,7 +236,6 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
if import_objects:
with statsd.timer(statsd_key.format('steps.get_insert_batch')):
batch = cls.get_insert_batch(import_objects)
statsd.incr(statsd_key.format('insert_batch'), len(batch))
with statsd.timer(statsd_key.format('steps.insert')):
cls.insert_batch(batch)
@ -333,8 +311,6 @@ class ClickHouseMultiModel(ClickHouseModel):
model_statsd_key = "%s.sync.%s.{0}" % (config.STATSD_PREFIX, model_cls.__name__)
with statsd.timer(model_statsd_key.format('steps.get_insert_batch')):
batch = model_cls.get_insert_batch(import_objects)
statsd.incr(model_statsd_key.format('insert_batch'), len(batch))
statsd.incr(statsd_key.format('insert_batch'), len(batch))
return model_cls, batch
res = exec_multi_arg_func(_sub_model_func, cls.sub_models, threads_count=len(cls.sub_models))

View File

@ -0,0 +1,26 @@
import sys
from collections import namedtuple as basenamedtuple
from functools import lru_cache
from copy import deepcopy

try:
    from collections.abc import Mapping
except ImportError:  # Python < 3.3 fallback; collections.Mapping is removed in 3.10
    from collections import Mapping


def namedtuple(*args, **kwargs):
    """
    Changes namedtuple to support the ``defaults`` parameter as python 3.7 does,
    and additionally accepts a Mapping of {field_name: default_value}, which the
    standard library does not (an iterable passed to stdlib ``defaults`` is applied
    to the rightmost fields; a dict would iterate its keys and silently produce
    wrong defaults).
    https://docs.python.org/3.7/library/collections.html#collections.namedtuple
    See https://stackoverflow.com/questions/11351032/namedtuple-and-default-values-for-optional-keyword-arguments
    :return: namedtuple class
    """
    defaults = kwargs.pop('defaults', None)

    if sys.version_info >= (3, 7) and not isinstance(defaults, Mapping):
        # Standard library handles iterable defaults natively since 3.7
        if defaults is not None:
            kwargs['defaults'] = defaults
        return basenamedtuple(*args, **kwargs)

    TupleClass = basenamedtuple(*args, **kwargs)

    # Temporarily default every field to None, so a prototype instance can be
    # built from a partial set of defaults.
    # NOTE: as a side effect, fields without an explicit default become optional
    # (filled with None) instead of required - a known limitation of this shim
    # (see the commented-out case in NamedTupleTest.test_exceptions).
    TupleClass.__new__.__defaults__ = (None,) * len(TupleClass._fields)

    if defaults is not None:
        if isinstance(defaults, Mapping):
            prototype = TupleClass(**defaults)
        else:
            prototype = TupleClass(*defaults)
        TupleClass.__new__.__defaults__ = tuple(prototype)

    return TupleClass

View File

@ -1,6 +1,10 @@
from infi.clickhouse_orm.database import Database as InfiDatabase
from typing import Generator, Optional, Type, Iterable
from infi.clickhouse_orm.database import Database as InfiDatabase, DatabaseException
from infi.clickhouse_orm.utils import parse_tsv
from six import next
from io import BytesIO
from statsd.defaults.django import statsd
from .configuration import config
from .exceptions import DBAliasError
@ -27,29 +31,92 @@ class Database(InfiDatabase):
def _get_applied_migrations(self, migrations_package_name):
raise NotImplementedError("This method is not supported by django_clickhouse.")
def select_init_many(self, query, model_class, settings=None):
def select_tuples(self, query, model_class, settings=None):
# type: (str, Type['ClickHouseModel'], Optional[dict], Optional[dict]) -> Generator[tuple]
"""
Base select doesn't use init_mult which is ineffective on big result lists
This method selects model_class namedtuples, instead of class instances.
Less memory consumption, greater speed
:param query: Query to execute. Can contain substitution variables.
:param model_class: A class of model to get fields from
:param settings: Optional connection settings
:return: Generator of namedtuple objects
"""
query += ' FORMAT TabSeparatedWithNames'
query = self._substitute(query, model_class)
r = self._send(query, settings, True)
lines = r.iter_lines()
field_names = parse_tsv(next(lines))
fields = [
field for name, field in model_class.fields(writable=True).items()
if name in field_names
]
res_class = model_class.get_tuple_class(field_names)
kwargs_list = []
for line in lines:
# skip blank line left by WITH TOTALS modifier
if line:
values = iter(parse_tsv(line))
kwargs = {}
for name in field_names:
field = getattr(model_class, name)
kwargs[name] = field.to_python(next(values), self.server_timezone)
item = res_class(**{
field_name: fields[i].to_python(next(values), self.server_timezone)
for i, field_name in enumerate(field_names)
})
kwargs_list.append(kwargs)
yield item
return model_class.init_many(kwargs_list, database=self)
def insert_tuples(self, model_class, model_tuples, batch_size=None):
    # type: (Type['ClickHouseModel'], Iterable[tuple], Optional[int]) -> None
    """
    Inserts an iterable of model_class named tuples into the database.
    :param model_class: ClickHouse model the named tuples are made from
    :param model_tuples: An iterable of tuples to insert
    :param batch_size: Optional number of rows sent per HTTP body chunk
    :return: None
    """
    tuples_iterator = iter(model_tuples)
    try:
        head = next(tuples_iterator)
    except StopIteration:
        # Nothing to insert
        return

    if model_class.is_read_only() or model_class.is_system_model():
        raise DatabaseException("You can't insert into read only and system tables")

    # Column order is taken from the tuple class, not the model definition
    column_sql = ','.join('`%s`' % name for name in head._fields)
    writable_fields = model_class.fields(writable=True)
    field_objects = [writable_fields[name] for name in head._fields]

    statsd_key = "%s.inserted_tuples.%s.{0}" % (config.STATSD_PREFIX, model_class.__name__)

    def serialize_row(row):
        # One TabSeparated line per tuple
        return '\t'.join(
            field.to_db_string(value, quote=False)
            for field, value in zip(field_objects, row)
        ) + '\n'

    def gen():
        buffer = BytesIO()
        # The INSERT header is written only once, at the start of the stream;
        # subsequent chunks carry raw TSV rows of the same HTTP request body
        query = 'INSERT INTO `%s`.`%s` (%s) FORMAT TabSeparated\n' \
                % (self.db_name, model_class.table_name(), column_sql)
        buffer.write(query.encode('utf-8'))
        buffer.write(serialize_row(head).encode('utf-8'))

        rows_in_batch = 1
        for row in tuples_iterator:
            buffer.write(serialize_row(row).encode('utf-8'))
            rows_in_batch += 1
            if batch_size is not None and rows_in_batch >= batch_size:
                # Flush the current chunk and start a fresh buffer
                statsd.incr(statsd_key.format('insert_batch'), rows_in_batch)
                yield buffer.getvalue()
                buffer = BytesIO()
                rows_in_batch = 0

        # Flush any remaining rows in a partial chunk
        if rows_in_batch:
            statsd.incr(statsd_key.format('insert_batch'), count=rows_in_batch)
            yield buffer.getvalue()

    self._send(gen())
class ConnectionProxy:

View File

@ -2,31 +2,28 @@
This file contains wrappers for infi.clickhouse_orm engines to use in django-clickhouse
"""
import datetime
from typing import List, TypeVar, Type, Union, Iterable
from typing import List, Type, Union, Iterable, Generator
from django.db.models import Model as DjangoModel
from infi.clickhouse_orm import engines as infi_engines
from infi.clickhouse_orm.models import Model as InfiModel
from statsd.defaults.django import statsd
from django_clickhouse.database import connections
from .configuration import config
from .database import connections
from .utils import format_datetime
T = TypeVar('T')
class InsertOnlyEngineMixin:
def get_insert_batch(self, model_cls, objects):
# type: (Type[T], List[DjangoModel]) -> List[T]
# type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple]
"""
Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import
:param objects: A list of django Model instances to sync
:return: An iterator of model_cls instances
:return: A generator of model_cls named tuples
"""
serializer = model_cls.get_django_model_serializer(writable=True)
return list(serializer.serialize_many(objects))
return (serializer.serialize(obj) for obj in objects)
class MergeTree(InsertOnlyEngineMixin, infi_engines.MergeTree):
@ -48,33 +45,32 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
self.version_col = kwargs.pop('version_col', None)
super(CollapsingMergeTree, self).__init__(*args, **kwargs)
def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col):
def _get_final_versions_by_version(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
query = """
SELECT * FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
SELECT {columns} FROM $table WHERE (`{pk_column}`, `{version_col}`) IN (
SELECT `{pk_column}`, MAX(`{version_col}`)
FROM $table
PREWHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks})
GROUP BY `{pk_column}`
)
""".format(version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
""".format(columns=','.join(columns), version_col=self.version_col, date_col=date_col, pk_column=self.pk_column,
min_date=min_date, max_date=max_date, object_pks=','.join(object_pks))
qs = connections[db_alias].select_init_many(query, model_cls)
return list(qs)
return connections[db_alias].select_tuples(query, model_cls)
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col):
def _get_final_versions_by_final(self, db_alias, model_cls, min_date, max_date, object_pks, date_col, columns):
query = """
SELECT * FROM $table FINAL
SELECT {columns} FROM $table FINAL
WHERE `{date_col}` >= '{min_date}' AND `{date_col}` <= '{max_date}'
AND `{pk_column}` IN ({object_pks})
"""
query = query.format(date_col=date_col, pk_column=self.pk_column, min_date=min_date,
query = query.format(columns=','.join(columns), date_col=date_col, pk_column=self.pk_column, min_date=min_date,
max_date=max_date, object_pks=','.join(object_pks))
qs = connections[db_alias].select_init_many(query, model_cls)
return list(qs)
return connections[db_alias].select_tuples(query, model_cls)
def get_final_versions(self, model_cls, objects, date_col=None):
# type: (Type['ClickHouseModel'], Iterable[DjangoModel], str) -> Generator[tuple]
"""
Get objects, that are currently stored in ClickHouse.
Depending on the partition key this can be different for different models.
@ -82,7 +78,8 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
It also supposes primary key to by self.pk_column
:param model_cls: ClickHouseModel subclass to import
:param objects: Objects for which final versions are searched
:return: A list of model objects
:param date_col: Optional column name, where partition date is held. Defaults to self.date_col
:return: A generator of named tuples, representing previous state
"""
def _dt_to_str(dt): # type: (Union[datetime.date, datetime.datetime]) -> str
@ -94,7 +91,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
raise Exception('Invalid date or datetime object: `%s`' % dt)
if not objects:
return []
raise StopIteration()
date_col = date_col or self.date_col
min_date, max_date = None, None
@ -114,41 +111,48 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
min_date = _dt_to_str(min_date)
max_date = _dt_to_str(max_date)
# Get fields. Sign is replaced to negative for further processing
columns = list(model_cls.fields(writable=True).keys())
columns.remove(self.sign_col)
columns.append('-1 AS sign')
params = (db_alias, model_cls, min_date, max_date, object_pks, date_col, columns)
if self.version_col:
return self._get_final_versions_by_version(db_alias, model_cls, min_date, max_date, object_pks, date_col)
return self._get_final_versions_by_version(*params)
else:
return self._get_final_versions_by_final(db_alias, model_cls, min_date, max_date, object_pks, date_col)
return self._get_final_versions_by_final(*params)
def get_insert_batch(self, model_cls, objects):
# type: (Type[T], List[DjangoModel]) -> List[T]
# type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple]
"""
Gets a list of model_cls instances to insert into database
:param model_cls: ClickHouseModel subclass to import
:param objects: A list of django Model instances to sync
:return: A list of model_cls objects
"""
new_objs = super(CollapsingMergeTree, self).get_insert_batch(model_cls, objects)
defaults = {self.sign_col: 1}
if self.version_col:
defaults[self.version_col] = 1
serializer = model_cls.get_django_model_serializer(writable=True, defaults=defaults)
new_objs = [serializer.serialize(obj) for obj in objects]
statsd_key = "%s.sync.%s.steps.get_final_versions" % (config.STATSD_PREFIX, model_cls.__name__)
with statsd.timer(statsd_key):
old_objs = self.get_final_versions(model_cls, new_objs)
# -1 sign has been set get_final_versions()
old_objs_versions = {}
for obj in old_objs:
self.set_obj_sign(obj, -1)
old_objs_versions[obj.id] = getattr(obj, self.version_col)
for obj in new_objs:
self.set_obj_sign(obj, 1)
pk = getattr(obj, self.pk_column)
if self.version_col:
setattr(obj, self.version_col, old_objs_versions.get(obj.id, 0) + 1)
old_objs_versions[pk] = getattr(obj, self.version_col)
yield obj
return old_objs + new_objs
# 1 sign is set by default in serializer
for obj in new_objs:
pk = getattr(obj, self.pk_column)
if self.version_col:
obj = obj._replace(**{self.version_col: old_objs_versions.get(pk, 0) + 1})
def set_obj_sign(self, obj, sign): # type: (InfiModel, int) -> None
"""
Sets objects sign. By default gets attribute name from sign_col
:return: None
"""
setattr(obj, self.sign_col, sign)
yield obj

View File

@ -1,11 +1,12 @@
from django.db.models import Model as DjangoModel
from typing import List, Iterable
import datetime
from typing import NamedTuple
from django.db.models import Model as DjangoModel
from django_clickhouse.utils import model_to_dict
class Django2ClickHouseModelSerializer:
def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False):
def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False, defaults=None):
self._model_cls = model_cls
if fields is not None:
self.serialize_fields = fields
@ -13,6 +14,7 @@ class Django2ClickHouseModelSerializer:
self.serialize_fields = model_cls.fields(writable=writable).keys()
self.exclude_serialize_fields = exclude_fields
self._result_class = self._model_cls.get_tuple_class(defaults=defaults)
def _get_serialize_kwargs(self, obj):
data = model_to_dict(obj, fields=self.serialize_fields, exclude_fields=self.exclude_serialize_fields)
@ -29,8 +31,5 @@ class Django2ClickHouseModelSerializer:
return result
def serialize(self, obj): # type: (DjangoModel) -> 'ClickHouseModel'
return self._model_cls(**self._get_serialize_kwargs(obj))
def serialize_many(self, objs): # type: (Iterable[DjangoModel]) -> Iterable['ClickHouseModel']
return self._model_cls.init_many((self._get_serialize_kwargs(obj) for obj in objs))
def serialize(self, obj): # type: (DjangoModel) -> NamedTuple
return self._result_class(**self._get_serialize_kwargs(obj))

View File

@ -106,7 +106,7 @@ class Storage:
This method should be called from inner functions.
:param import_key: A key, returned by ClickHouseModel.get_import_key() method
:param operation: One of insert, update, delete
:param pk: Primary key to find records in main database. Should be string-serializable with str() method.
:param pks: Primary keys to find records in main database. Should be string-serializable with str() method.
:return: None
"""
if operation not in {'insert', 'update', 'delete'}:

View File

@ -69,6 +69,9 @@ if __name__ == '__main__':
parser.add_argument('--once', type=bool, required=False, default=False)
params = vars(parser.parse_args())
# Disable registering not needed models
TestModel._clickhouse_sync_models = {ClickHouseCollapseTestModel}
func_name = params['process']
method = locals()[func_name]
method(**params)

View File

@ -0,0 +1,45 @@
from unittest import TestCase
from django_clickhouse.compatibility import namedtuple
class NamedTupleTest(TestCase):
    """Tests for the namedtuple compatibility wrapper supporting dict defaults."""

    def test_defaults(self):
        # Defaults given as a mapping must fill omitted fields
        DefTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3})
        self.assertTupleEqual((1, 2, 3), tuple(DefTuple(1, b=2)))
        self.assertTupleEqual((1, 2, 4), tuple(DefTuple(1, 2, 4)))
        self.assertTupleEqual((1, 2, 4), tuple(DefTuple(a=1, b=2, c=4)))

    def test_exceptions(self):
        DefTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3})

        # BUG On python < 3.7 this error is not raised, as not given defaults are filled by None
        # with self.assertRaises(TypeError):
        #     DefTuple(b=1, c=4)

        # Passing the same field positionally and by keyword must fail
        with self.assertRaises(TypeError):
            DefTuple(1, 2, 3, c=4)

    def test_different_defaults(self):
        # Test that 2 tuple type defaults don't affect each other
        FirstTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3})
        SecondTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 4})
        first = FirstTuple(a=1, b=2)
        second = SecondTuple(a=3, b=4)
        self.assertTupleEqual((1, 2, 3), tuple(first))
        self.assertTupleEqual((3, 4, 4), tuple(second))

    def test_defaults_cache(self):
        # Test that 2 tuple instances don't affect each other's defaults
        DefTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3})
        self.assertTupleEqual((1, 2, 4), tuple(DefTuple(a=1, b=2, c=4)))
        self.assertTupleEqual((1, 2, 3), tuple(DefTuple(a=1, b=2)))

    def test_equal(self):
        PlainTuple = namedtuple('TestTuple', ('a', 'b', 'c'))
        first = PlainTuple(1, 2, 3)
        second = PlainTuple(1, 2, 3)
        self.assertEqual(first, second)
        self.assertEqual((1, 2, 3), first)

View File

@ -1,10 +1,7 @@
import datetime
import pytz
from django.test import TestCase
from django_clickhouse.migrations import migrate_app
from django_clickhouse.database import connections
from django_clickhouse.migrations import migrate_app
from tests.clickhouse_models import ClickHouseCollapseTestModel
from tests.models import TestModel
@ -16,36 +13,43 @@ class CollapsingMergeTreeTest(TestCase):
collapse_fixture = [{
"id": 1,
"created": "2018-01-01 00:00:00",
"value": 0,
"sign": 1,
"version": 1
}, {
"id": 1,
"created": "2018-01-01 00:00:00",
"value": 0,
"sign": -1,
"version": 1
}, {
"id": 1,
"created": "2018-01-01 00:00:00",
"value": 0,
"sign": 1,
"version": 2
}, {
"id": 1,
"created": "2018-01-01 00:00:00",
"value": 0,
"sign": -1,
"version": 2
}, {
"id": 1,
"created": "2018-01-01 00:00:00",
"value": 0,
"sign": 1,
"version": 3
}, {
"id": 1,
"created": "2018-01-01 00:00:00",
"value": 0,
"sign": -1,
"version": 3
}, {
"id": 1,
"created": "2018-01-01 00:00:00",
"value": 0,
"sign": 1,
"version": 4
}]
@ -62,43 +66,38 @@ class CollapsingMergeTreeTest(TestCase):
])
self.objects = TestModel.objects.filter(id=1)
def _test_final_versions(self, final_versions):
final_versions = list(final_versions)
self.assertEqual(1, len(final_versions))
item = (final_versions[0].id, final_versions[0].sign, final_versions[0].version, final_versions[0].value)
self.assertTupleEqual((1, -1, 4, 0), item)
def test_get_final_versions_by_final_date(self):
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects)
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
self._test_final_versions(final_versions)
def test_get_final_versions_by_version_date(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects)
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
self._test_final_versions(final_versions)
def test_get_final_versions_by_final_datetime(self):
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects, date_col='created')
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
self._test_final_versions(final_versions)
def test_get_final_versions_by_version_datetime(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
final_versions = ClickHouseCollapseTestModel.engine.get_final_versions(ClickHouseCollapseTestModel,
self.objects, date_col='created')
self.assertEqual(1, len(final_versions))
self.assertDictEqual({'id': 1, 'sign': 1, 'version': 4, 'value': 0},
final_versions[0].to_dict(field_names=('id', 'sign', 'value', 'version')))
self._test_final_versions(final_versions)
def test_versions(self):
ClickHouseCollapseTestModel.engine.version_col = 'version'
batch = ClickHouseCollapseTestModel.get_insert_batch(self.objects)
batch = list(batch)
self.assertEqual(2, len(batch))
self.assertEqual(4, batch[0].version)
self.assertEqual(-1, batch[0].sign)

View File

@ -16,7 +16,6 @@ class Django2ClickHouseModelSerializerTest(TestCase):
def test_all(self):
serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel)
res = serializer.serialize(self.obj)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(self.obj.id, res.id)
self.assertEqual(self.obj.value, res.value)
self.assertEqual(self.obj.created_date, res.created_date)
@ -24,7 +23,6 @@ class Django2ClickHouseModelSerializerTest(TestCase):
def test_fields(self):
serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, fields=('value',))
res = serializer.serialize(self.obj)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(0, res.id)
self.assertEqual(datetime.date(1970, 1, 1), res.created_date)
self.assertEqual(self.obj.value, res.value)
@ -32,7 +30,6 @@ class Django2ClickHouseModelSerializerTest(TestCase):
def test_exclude_fields(self):
serializer = Django2ClickHouseModelSerializer(ClickHouseTestModel, exclude_fields=('created_date',))
res = serializer.serialize(self.obj)
self.assertIsInstance(res, ClickHouseTestModel)
self.assertEqual(datetime.date(1970, 1, 1), res.created_date)
self.assertEqual(self.obj.id, res.id)
self.assertEqual(self.obj.value, res.value)

View File

@ -20,7 +20,7 @@ logger = logging.getLogger('django-clickhouse')
class SyncTest(TransactionTestCase):
def setUp(self):
self.db = connections['default']
self.db = ClickHouseCollapseTestModel.get_database()
self.db.drop_database()
self.db.create_database()
migrate_app('tests', 'default')
@ -42,7 +42,7 @@ class SyncTest(TransactionTestCase):
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
# insert and update came before sync. Only one item will be inserted
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.value, synced_data[0].value)
@ -53,7 +53,7 @@ class SyncTest(TransactionTestCase):
ClickHouseCollapseTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(1, len(synced_data))
self.assertGreaterEqual(len(synced_data), 1)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
@ -65,7 +65,7 @@ class SyncTest(TransactionTestCase):
obj.save()
ClickHouseCollapseTestModel.sync_batch_from_storage()
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
# insert and update came before sync. Only one item will be inserted
synced_data = list(ClickHouseCollapseTestModel.objects.all())
self.assertEqual(1, len(synced_data))
self.assertEqual(obj.value, synced_data[0].value)
@ -76,7 +76,7 @@ class SyncTest(TransactionTestCase):
ClickHouseCollapseTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(1, len(synced_data))
self.assertGreaterEqual(len(synced_data), 1)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
@ -116,7 +116,7 @@ class SyncTest(TransactionTestCase):
ClickHouseMultiTestModel.sync_batch_from_storage()
synced_data = list(self.db.select('SELECT * FROM $table FINAL', model_class=ClickHouseCollapseTestModel))
self.assertGreaterEqual(1, len(synced_data))
self.assertGreaterEqual(len(synced_data), 1)
self.assertEqual(obj.value, synced_data[0].value)
self.assertEqual(obj.id, synced_data[0].id)
@ -154,9 +154,14 @@ class KillTest(TransactionTestCase):
self.sync_iteration(False)
sync_left = storage.operations_count(import_key)
ch_data = list(connections['default'].select('SELECT * FROM $table FINAL ORDER BY id',
model_class=ClickHouseCollapseTestModel))
logger.debug('django_clickhouse: sync finished')
ch_data = list(connections['default'].select_tuples('SELECT * FROM $table FINAL ORDER BY id',
model_class=ClickHouseCollapseTestModel))
logger.debug('django_clickhouse: got clickhouse data')
pg_data = list(TestModel.objects.all().order_by('id'))
logger.debug('django_clickhouse: got postgres data')
if len(pg_data) != len(ch_data):
absent_ids = set(item.id for item in pg_data) - set(item.id for item in ch_data)
@ -165,8 +170,9 @@ class KillTest(TransactionTestCase):
min(item.id for item in pg_data), max(item.id for item in pg_data)))
self.assertEqual(len(pg_data), len(ch_data))
serializer = ClickHouseCollapseTestModel.get_django_model_serializer()
self.assertListEqual(ch_data, list(serializer.serialize_many(pg_data)))
for pg_item, ch_item in zip(pg_data, ch_data):
self.assertEqual(ch_item.id, pg_item.id)
self.assertEqual(ch_item.value, pg_item.value)
@classmethod
def sync_iteration(cls, kill=True):

View File

@ -1,5 +1,7 @@
import datetime
import time
from queue import Queue
from time import gmtime, localtime
import pytz
from django.test import TestCase
@ -12,7 +14,9 @@ from django_clickhouse.utils import get_tz_offset, format_datetime, lazy_class_i
class GetTZOffsetTest(TestCase):
def test_func(self):
self.assertEqual(300, get_tz_offset())
ts = time.time()
utc_offset = (datetime.datetime.fromtimestamp(ts) - datetime.datetime.utcfromtimestamp(ts)).total_seconds()
self.assertEqual(utc_offset / 60, get_tz_offset())
class FormatDateTimeTest(TestCase):