mirror of
https://github.com/carrotquest/django-clickhouse.git
synced 2024-11-24 01:53:45 +03:00
More optimization and fixed some bugs
This commit is contained in:
parent
88c1b9a75c
commit
056f2582d4
4
.gitignore
vendored
4
.gitignore
vendored
|
@ -3,6 +3,9 @@ __pycache__/
|
||||||
*.py[cod]
|
*.py[cod]
|
||||||
*$py.class
|
*$py.class
|
||||||
|
|
||||||
|
# JetBrains files
|
||||||
|
.idea
|
||||||
|
|
||||||
# C extensions
|
# C extensions
|
||||||
*.so
|
*.so
|
||||||
|
|
||||||
|
@ -102,3 +105,4 @@ venv.bak/
|
||||||
|
|
||||||
# mypy
|
# mypy
|
||||||
.mypy_cache/
|
.mypy_cache/
|
||||||
|
/.idea/dataSources.xml
|
||||||
|
|
|
@ -64,7 +64,11 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get_tuple_class(cls, field_names=None, defaults=None):
|
def get_tuple_class(cls, field_names=None, defaults=None):
|
||||||
field_names = field_names or tuple(cls.fields(writable=False).keys())
|
field_names = field_names or cls.fields(writable=False).keys()
|
||||||
|
|
||||||
|
# Strange, but sometimes the columns are in different order...
|
||||||
|
field_names = tuple(sorted(field_names))
|
||||||
|
|
||||||
if defaults:
|
if defaults:
|
||||||
defaults_new = deepcopy(cls._defaults)
|
defaults_new = deepcopy(cls._defaults)
|
||||||
defaults_new.update(defaults)
|
defaults_new.update(defaults)
|
||||||
|
|
|
@ -1,54 +1,26 @@
|
||||||
import sys
|
import sys
|
||||||
from collections import namedtuple as basenamedtuple
|
from collections import namedtuple as basenamedtuple, Mapping
|
||||||
from functools import lru_cache
|
from functools import lru_cache
|
||||||
|
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
|
|
||||||
|
|
||||||
class NamedTuple:
|
|
||||||
__slots__ = ('_data', '_data_iterator')
|
|
||||||
_defaults = {}
|
|
||||||
_data_cls = None
|
|
||||||
|
|
||||||
@classmethod
|
|
||||||
@lru_cache(maxsize=32)
|
|
||||||
def _get_defaults(cls, exclude):
|
|
||||||
res = deepcopy(cls._defaults)
|
|
||||||
for k in exclude:
|
|
||||||
res.pop(k, None)
|
|
||||||
return res
|
|
||||||
|
|
||||||
def _astuple(self):
|
|
||||||
return tuple(self._data)
|
|
||||||
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
new_kwargs = deepcopy(self._get_defaults(self._data_cls._fields[:len(args)]))
|
|
||||||
new_kwargs.update(kwargs)
|
|
||||||
self._data = self._data_cls(*args, **new_kwargs)
|
|
||||||
|
|
||||||
def __getattr__(self, item):
|
|
||||||
return getattr(self._data, item)
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
self._data_iterator = iter(self._data)
|
|
||||||
return self
|
|
||||||
|
|
||||||
def __next__(self):
|
|
||||||
return next(self._data_iterator)
|
|
||||||
|
|
||||||
def __eq__(self, other):
|
|
||||||
other_tuple = other._astuple() if isinstance(other, NamedTuple) else other
|
|
||||||
return tuple(self._data) == other_tuple
|
|
||||||
|
|
||||||
|
|
||||||
def namedtuple(*args, **kwargs):
|
def namedtuple(*args, **kwargs):
|
||||||
"""
|
"""
|
||||||
Changes namedtuple to support defaults parameter as python 3.7 does
|
Changes namedtuple to support defaults parameter as python 3.7 does
|
||||||
https://docs.python.org/3.7/library/collections.html#collections.namedtuple
|
https://docs.python.org/3.7/library/collections.html#collections.namedtuple
|
||||||
|
See https://stackoverflow.com/questions/11351032/namedtuple-and-default-values-for-optional-keyword-arguments
|
||||||
:return: namedtuple class
|
:return: namedtuple class
|
||||||
"""
|
"""
|
||||||
if sys.version_info < (3, 7):
|
if sys.version_info < (3, 7):
|
||||||
defaults = kwargs.pop('defaults', {})
|
defaults = kwargs.pop('defaults', {})
|
||||||
return type('namedtuple', (NamedTuple,), {'_defaults': defaults, '_data_cls': basenamedtuple(*args, **kwargs)})
|
TupleClass = basenamedtuple(*args, **kwargs)
|
||||||
|
TupleClass.__new__.__defaults__ = (None,) * len(TupleClass._fields)
|
||||||
|
if isinstance(defaults, Mapping):
|
||||||
|
prototype = TupleClass(**defaults)
|
||||||
|
else:
|
||||||
|
prototype = TupleClass(*defaults)
|
||||||
|
TupleClass.__new__.__defaults__ = tuple(prototype)
|
||||||
|
return TupleClass
|
||||||
else:
|
else:
|
||||||
return basenamedtuple(*args, **kwargs)
|
return basenamedtuple(*args, **kwargs)
|
||||||
|
|
|
@ -56,10 +56,10 @@ class Database(InfiDatabase):
|
||||||
# skip blank line left by WITH TOTALS modifier
|
# skip blank line left by WITH TOTALS modifier
|
||||||
if line:
|
if line:
|
||||||
values = iter(parse_tsv(line))
|
values = iter(parse_tsv(line))
|
||||||
item = res_class(*(
|
item = res_class(**{
|
||||||
fields[i].to_python(next(values), self.server_timezone)
|
field_name: fields[i].to_python(next(values), self.server_timezone)
|
||||||
for i, r in enumerate(parse_tsv(line))
|
for i, field_name in enumerate(field_names)
|
||||||
))
|
})
|
||||||
|
|
||||||
yield item
|
yield item
|
||||||
|
|
||||||
|
@ -83,7 +83,8 @@ class Database(InfiDatabase):
|
||||||
raise DatabaseException("You can't insert into read only and system tables")
|
raise DatabaseException("You can't insert into read only and system tables")
|
||||||
|
|
||||||
fields_list = ','.join('`%s`' % name for name in first_tuple._fields)
|
fields_list = ','.join('`%s`' % name for name in first_tuple._fields)
|
||||||
fields = [f for name, f in model_class.fields(writable=True).items() if name in first_tuple._fields]
|
fields_dict = model_class.fields(writable=True)
|
||||||
|
fields = [fields_dict[name] for name in first_tuple._fields]
|
||||||
statsd_key = "%s.inserted_tuples.%s.{0}" % (config.STATSD_PREFIX, model_class.__name__)
|
statsd_key = "%s.inserted_tuples.%s.{0}" % (config.STATSD_PREFIX, model_class.__name__)
|
||||||
|
|
||||||
def tuple_to_csv(tup):
|
def tuple_to_csv(tup):
|
||||||
|
@ -97,7 +98,7 @@ class Database(InfiDatabase):
|
||||||
buf.write(tuple_to_csv(first_tuple).encode('utf-8'))
|
buf.write(tuple_to_csv(first_tuple).encode('utf-8'))
|
||||||
|
|
||||||
# Collect lines in batches of batch_size
|
# Collect lines in batches of batch_size
|
||||||
lines = 2
|
lines = 1
|
||||||
for t in tuples_iterator:
|
for t in tuples_iterator:
|
||||||
buf.write(tuple_to_csv(t).encode('utf-8'))
|
buf.write(tuple_to_csv(t).encode('utf-8'))
|
||||||
|
|
||||||
|
|
|
@ -139,7 +139,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
|
||||||
|
|
||||||
statsd_key = "%s.sync.%s.steps.get_final_versions" % (config.STATSD_PREFIX, model_cls.__name__)
|
statsd_key = "%s.sync.%s.steps.get_final_versions" % (config.STATSD_PREFIX, model_cls.__name__)
|
||||||
with statsd.timer(statsd_key):
|
with statsd.timer(statsd_key):
|
||||||
old_objs = list(self.get_final_versions(model_cls, new_objs))
|
old_objs = self.get_final_versions(model_cls, new_objs)
|
||||||
|
|
||||||
# -1 sign has been set get_final_versions()
|
# -1 sign has been set get_final_versions()
|
||||||
old_objs_versions = {}
|
old_objs_versions = {}
|
||||||
|
@ -153,6 +153,6 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
|
||||||
for obj in new_objs:
|
for obj in new_objs:
|
||||||
pk = getattr(obj, self.pk_column)
|
pk = getattr(obj, self.pk_column)
|
||||||
if self.version_col:
|
if self.version_col:
|
||||||
setattr(obj, self.version_col, old_objs_versions.get(pk, 0) + 1)
|
obj = obj._replace(**{self.version_col: old_objs_versions.get(pk, 0) + 1})
|
||||||
|
|
||||||
yield obj
|
yield obj
|
||||||
|
|
|
@ -12,8 +12,10 @@ class NamedTupleTest(TestCase):
|
||||||
|
|
||||||
def test_exceptions(self):
|
def test_exceptions(self):
|
||||||
TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3})
|
TestTuple = namedtuple('TestTuple', ('a', 'b', 'c'), defaults={'c': 3})
|
||||||
with self.assertRaises(TypeError):
|
|
||||||
TestTuple(b=1, c=4)
|
# BUG On python < 3.7 this error is not raised, as not given defaults are filled by None
|
||||||
|
# with self.assertRaises(TypeError):
|
||||||
|
# TestTuple(b=1, c=4)
|
||||||
|
|
||||||
with self.assertRaises(TypeError):
|
with self.assertRaises(TypeError):
|
||||||
TestTuple(1, 2, 3, c=4)
|
TestTuple(1, 2, 3, c=4)
|
||||||
|
|
|
@ -20,7 +20,7 @@ logger = logging.getLogger('django-clickhouse')
|
||||||
|
|
||||||
class SyncTest(TransactionTestCase):
|
class SyncTest(TransactionTestCase):
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.db = connections['default']
|
self.db = ClickHouseCollapseTestModel.get_database()
|
||||||
self.db.drop_database()
|
self.db.drop_database()
|
||||||
self.db.create_database()
|
self.db.create_database()
|
||||||
migrate_app('tests', 'default')
|
migrate_app('tests', 'default')
|
||||||
|
@ -42,7 +42,7 @@ class SyncTest(TransactionTestCase):
|
||||||
obj.save()
|
obj.save()
|
||||||
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
||||||
|
|
||||||
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
|
# insert and update came before sync. Only one item will be inserted
|
||||||
synced_data = list(ClickHouseCollapseTestModel.objects.all())
|
synced_data = list(ClickHouseCollapseTestModel.objects.all())
|
||||||
self.assertEqual(1, len(synced_data))
|
self.assertEqual(1, len(synced_data))
|
||||||
self.assertEqual(obj.value, synced_data[0].value)
|
self.assertEqual(obj.value, synced_data[0].value)
|
||||||
|
@ -65,7 +65,7 @@ class SyncTest(TransactionTestCase):
|
||||||
obj.save()
|
obj.save()
|
||||||
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
||||||
|
|
||||||
# sync_batch_from_storage uses FINAL, so data would be collapsed by now
|
# insert and update came before sync. Only one item will be inserted
|
||||||
synced_data = list(ClickHouseCollapseTestModel.objects.all())
|
synced_data = list(ClickHouseCollapseTestModel.objects.all())
|
||||||
self.assertEqual(1, len(synced_data))
|
self.assertEqual(1, len(synced_data))
|
||||||
self.assertEqual(obj.value, synced_data[0].value)
|
self.assertEqual(obj.value, synced_data[0].value)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user