mirror of
https://github.com/carrotquest/django-clickhouse.git
synced 2025-04-15 14:22:01 +03:00
Compare commits
8 Commits
Author | SHA1 | Date | |
---|---|---|---|
|
3b79ad697a | ||
|
95f1bdb26a | ||
|
85d27a65f9 | ||
|
43a1821408 | ||
|
1b247e95fe | ||
|
244b4fbd37 | ||
|
0ac4615649 | ||
|
ee723fca2a |
52
.github/workflows/python-tests.yml
vendored
52
.github/workflows/python-tests.yml
vendored
|
@ -11,12 +11,56 @@ jobs:
|
|||
runs-on: ubuntu-latest
|
||||
strategy:
|
||||
matrix:
|
||||
python-version: ["3.6", "3.7", "3.8", "3.9", "3.10"]
|
||||
postgres-version: ["9.6", "10", "11", "12", "13"]
|
||||
django-version: ["2.1", "2.2", "3.0", "3.1", "3.2"]
|
||||
python-version: ["3.8", "3.9", "3.10", "3.11", "3.12"]
|
||||
postgres-version: ["9.6", "10", "11", "12", "13", "14", "15", "16"]
|
||||
django-version: ["3.2", "4.0", "4.1", "4.2", "5.0", "5.1"]
|
||||
clickhouse-version: ["latest"]
|
||||
redis-version: ["latest"]
|
||||
|
||||
exclude:
|
||||
# Django 4.0+ doesn't support PostgreSQL 9.6
|
||||
- django-version: "4.0"
|
||||
postgres-version: "9.6"
|
||||
- django-version: "4.1"
|
||||
postgres-version: "9.6"
|
||||
- django-version: "4.2"
|
||||
postgres-version: "9.6"
|
||||
- django-version: "5.0"
|
||||
postgres-version: "9.6"
|
||||
- django-version: "5.1"
|
||||
postgres-version: "9.6"
|
||||
|
||||
# Django 4.1+ doesn't support PostgreSQL 10
|
||||
- django-version: "4.1"
|
||||
postgres-version: "10"
|
||||
- django-version: "4.2"
|
||||
postgres-version: "10"
|
||||
- django-version: "5.0"
|
||||
postgres-version: "10"
|
||||
- django-version: "5.1"
|
||||
postgres-version: "10"
|
||||
|
||||
# Django 4.2+ doesn't support PostgreSQL 11
|
||||
- django-version: "4.2"
|
||||
postgres-version: "11"
|
||||
- django-version: "5.0"
|
||||
postgres-version: "11"
|
||||
- django-version: "5.1"
|
||||
postgres-version: "11"
|
||||
|
||||
# Django 5.1+ doesn't support PostgreSQL 12
|
||||
- django-version: "5.1"
|
||||
postgres-version: "12"
|
||||
|
||||
# Django 5.0+ does not support python 3.8, 3.9
|
||||
- django-version: "5.0"
|
||||
python-version: "3.8"
|
||||
- django-version: "5.0"
|
||||
python-version: "3.9"
|
||||
- django-version: "5.1"
|
||||
python-version: "3.8"
|
||||
- django-version: "5.1"
|
||||
python-version: "3.9"
|
||||
|
||||
services:
|
||||
postgres:
|
||||
image: postgres:${{ matrix.postgres-version }}
|
||||
|
|
|
@ -96,6 +96,19 @@ class ClickHouseUser(ClickHouseModel):
|
|||
engine = MergeTree('birthday', ('birthday',))
|
||||
```
|
||||
|
||||
**Important note**: the `clickhouse_models.py` file is not imported automatically by Django's initialization code. If your ClickHouse models are not referenced anywhere outside this file, you must import it somewhere in your code for synchronization to work correctly. For instance, you can customise [AppConfig](https://docs.djangoproject.com/en/5.0/ref/applications/#django.apps.AppConfig.ready) like:
|
||||
|
||||
```python
|
||||
from django.apps import AppConfig
|
||||
|
||||
|
||||
class MyAppConfig(AppConfig):
|
||||
name = 'my_app'
|
||||
|
||||
def ready(self):
|
||||
from my_app.clickhouse_models import ClickHouseUser
|
||||
```
|
||||
|
||||
## Migration to create table in ClickHouse
|
||||
1. Read [migrations](migrations.md) section
|
||||
2. Create `clickhouse_migrations` package in your django app
|
||||
|
@ -112,7 +125,7 @@ class ClickHouseUser(ClickHouseModel):
|
|||
4. Add content to file `0001_initial.py`:
|
||||
```python
|
||||
from django_clickhouse import migrations
|
||||
from my_app.cilckhouse_models import ClickHouseUser
|
||||
from my_app.clickhouse_models import ClickHouseUser
|
||||
|
||||
class Migration(migrations.Migration):
|
||||
operations = [
|
||||
|
|
2
setup.py
2
setup.py
|
@ -13,7 +13,7 @@ with open('requirements.txt') as f:
|
|||
|
||||
setup(
|
||||
name='django-clickhouse',
|
||||
version='1.2.1',
|
||||
version='1.2.2',
|
||||
packages=['django_clickhouse', 'django_clickhouse.management.commands'],
|
||||
package_dir={'': 'src'},
|
||||
url='https://github.com/carrotquest/django-clickhouse',
|
||||
|
|
|
@ -9,6 +9,7 @@ from itertools import chain
|
|||
from typing import List, Tuple, Iterable, Set, Any, Optional
|
||||
|
||||
from django.db.models import Model as DjangoModel, QuerySet as DjangoQuerySet
|
||||
from django.utils.timezone import now
|
||||
from infi.clickhouse_orm.engines import CollapsingMergeTree
|
||||
from infi.clickhouse_orm.models import Model as InfiModel, ModelBase as InfiModelBase
|
||||
from statsd.defaults.django import statsd
|
||||
|
@ -290,7 +291,7 @@ class ClickHouseModel(InfiModel, metaclass=ClickHouseModelMeta):
|
|||
res = (datetime.datetime.now() - last_sync_time).total_seconds() >= cls.get_sync_delay()
|
||||
logger.debug('django-clickhouse: need_sync returned %s for class %s as no last sync found'
|
||||
' (now: %s, last: %s, delay: %d)'
|
||||
% (res, cls.__name__, datetime.datetime.now().isoformat(), last_sync_time.isoformat(),
|
||||
% (res, cls.__name__, now().isoformat(), last_sync_time.isoformat(),
|
||||
cls.get_sync_delay()))
|
||||
|
||||
return res
|
||||
|
|
|
@ -11,12 +11,15 @@ import logging
|
|||
from typing import Any, Optional, List, Tuple
|
||||
|
||||
import os
|
||||
|
||||
from celery.utils.nodenames import gethostname
|
||||
from django.utils.timezone import now
|
||||
from statsd.defaults.django import statsd
|
||||
|
||||
from .configuration import config
|
||||
from .exceptions import ConfigurationError, RedisLockTimeoutError
|
||||
from .redis import redis_zadd
|
||||
from .utils import check_pid, get_subclasses, SingletonMeta
|
||||
from .utils import check_pid_exists, get_subclasses, SingletonMeta
|
||||
|
||||
logger = logging.getLogger('django-clickhouse')
|
||||
|
||||
|
@ -186,8 +189,7 @@ class RedisStorage(Storage, metaclass=SingletonMeta):
|
|||
|
||||
def get_operations(self, import_key, count, **kwargs):
|
||||
ops_key = self.REDIS_KEY_OPS_TEMPLATE.format(import_key=import_key)
|
||||
res = self._redis.zrangebyscore(ops_key, '-inf', datetime.datetime.now().timestamp(), start=0, num=count,
|
||||
withscores=True)
|
||||
res = self._redis.zrangebyscore(ops_key, '-inf', now().timestamp(), start=0, num=count, withscores=True)
|
||||
|
||||
if res:
|
||||
ops, scores = zip(*res)
|
||||
|
@ -214,19 +216,31 @@ class RedisStorage(Storage, metaclass=SingletonMeta):
|
|||
# Block process to be single threaded. Default sync delay is 10 * default sync delay.
|
||||
# It can be changed for model, by passing `lock_timeout` argument to pre_sync
|
||||
lock = self.get_lock(import_key, **kwargs)
|
||||
current_host_name = gethostname()
|
||||
lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
|
||||
try:
|
||||
lock.acquire()
|
||||
self._redis.set(lock_pid_key, os.getpid())
|
||||
self._redis.set(lock_pid_key, '%s:%s' % (current_host_name, os.getpid()))
|
||||
except RedisLockTimeoutError:
|
||||
statsd.incr('%s.sync.%s.lock.timeout' % (config.STATSD_PREFIX, import_key))
|
||||
|
||||
# Lock is busy. But If the process has been killed, I don't want to wait any more.
|
||||
# Let's check if pid exists
|
||||
pid = int(self._redis.get(lock_pid_key) or 0)
|
||||
if pid and not check_pid(pid):
|
||||
# I assume that lock has been killed if it works on the same host (other than localhost)
|
||||
# and there is no process alive.
|
||||
# I also assume that there are no hosts with same hostname other than localhost.
|
||||
# Note: previously value contained only pid. Let's support old value for back compatibility
|
||||
active_lock_data = self._redis.get(lock_pid_key).split(b":")
|
||||
active_pid = int(active_lock_data[-1] or 0)
|
||||
active_host_name = active_lock_data[0] \
|
||||
if len(active_lock_data) > 1 and active_lock_data[0] != "localhost" else None
|
||||
|
||||
if (
|
||||
active_pid and active_host_name
|
||||
and active_host_name == current_host_name and not check_pid_exists(active_pid)
|
||||
):
|
||||
statsd.incr('%s.sync.%s.lock.hard_release' % (config.STATSD_PREFIX, import_key))
|
||||
logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d (process is dead)'
|
||||
% (import_key, pid))
|
||||
% (import_key, active_pid))
|
||||
self._redis.delete(lock_pid_key)
|
||||
lock.hard_release()
|
||||
self.pre_sync(import_key, **kwargs)
|
||||
|
|
|
@ -127,7 +127,7 @@ def model_to_dict(instance: DjangoModel, fields: Optional[Iterable[str]] = None
|
|||
return data
|
||||
|
||||
|
||||
def check_pid(pid):
|
||||
def check_pid_exists(pid):
|
||||
"""
|
||||
Check For the existence of a unix pid.
|
||||
"""
|
||||
|
|
10
tests/fixtures/test_model.json
vendored
10
tests/fixtures/test_model.json
vendored
|
@ -5,7 +5,7 @@
|
|||
"fields": {
|
||||
"value": 100,
|
||||
"created_date": "2018-01-01",
|
||||
"created": "2018-01-01 00:00:00"
|
||||
"created": "2018-01-01 00:00:00+0000"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
@ -14,7 +14,7 @@
|
|||
"fields": {
|
||||
"value": 200,
|
||||
"created_date": "2018-02-01",
|
||||
"created": "2018-02-01 00:00:00"
|
||||
"created": "2018-02-01 00:00:00+0000"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
@ -23,7 +23,7 @@
|
|||
"fields": {
|
||||
"value": 300,
|
||||
"created_date": "2018-03-01",
|
||||
"created": "2018-03-01 00:00:00"
|
||||
"created": "2018-03-01 00:00:00+0000"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
@ -32,7 +32,7 @@
|
|||
"fields": {
|
||||
"value": 400,
|
||||
"created_date": "2018-04-01",
|
||||
"created": "2018-04-01 00:00:00"
|
||||
"created": "2018-04-01 00:00:00+0000"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
@ -41,7 +41,7 @@
|
|||
"fields": {
|
||||
"value": 500,
|
||||
"created_date": "2018-05-01",
|
||||
"created": "2018-05-01 00:00:00"
|
||||
"created": "2018-05-01 00:00:00+0000"
|
||||
}
|
||||
}
|
||||
]
|
4
tests/fixtures/test_secondary_model.json
vendored
4
tests/fixtures/test_secondary_model.json
vendored
|
@ -5,7 +5,7 @@
|
|||
"fields": {
|
||||
"value": 100,
|
||||
"created_date": "2018-01-01",
|
||||
"created": "2018-02-01 00:00:00"
|
||||
"created": "2018-02-01 00:00:00+0000"
|
||||
}
|
||||
},
|
||||
{
|
||||
|
@ -14,7 +14,7 @@
|
|||
"fields": {
|
||||
"value": 200,
|
||||
"created_date": "2018-02-01",
|
||||
"created": "2018-02-01 00:00:00"
|
||||
"created": "2018-02-01 00:00:00+0000"
|
||||
}
|
||||
}
|
||||
]
|
|
@ -8,6 +8,7 @@ from time import sleep
|
|||
|
||||
import datetime
|
||||
|
||||
from django.utils.timezone import now
|
||||
|
||||
# set Django environment
|
||||
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
|
@ -25,7 +26,7 @@ logger = logging.getLogger('django-clickhouse')
|
|||
def create(batch_size=1000, test_time=60, period=1, **kwargs):
|
||||
for iteration in range(int(test_time / period)):
|
||||
res = TestModel.objects.db_manager('test_db').bulk_create([
|
||||
TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=iteration * batch_size + i)
|
||||
TestModel(created=now(), created_date='2018-01-01', value=iteration * batch_size + i)
|
||||
for i in range(batch_size)
|
||||
])
|
||||
logger.info('django-clickhouse: test created %d records' % len(res))
|
||||
|
@ -54,8 +55,8 @@ def sync(period=1, test_time=60, **kwargs):
|
|||
if kwargs['once']:
|
||||
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
||||
else:
|
||||
start = datetime.datetime.now()
|
||||
while (datetime.datetime.now() - start).total_seconds() < test_time:
|
||||
start = now()
|
||||
while (now() - start).total_seconds() < test_time:
|
||||
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
||||
sleep(period)
|
||||
|
||||
|
|
|
@ -4,6 +4,7 @@ This file contains django settings to run tests with runtests.py
|
|||
from os import environ
|
||||
|
||||
SECRET_KEY = 'fake-key'
|
||||
USE_TZ = True
|
||||
|
||||
DATABASES = {
|
||||
'default': {
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
import datetime
|
||||
|
||||
from django.test import TestCase
|
||||
from django.utils.timezone import now
|
||||
|
||||
from tests.clickhouse_models import ClickHouseTestModel
|
||||
|
||||
|
@ -20,11 +21,11 @@ class ClickHouseModelTest(TestCase):
|
|||
self.assertTrue(ClickHouseTestModel.need_sync())
|
||||
|
||||
# Time hasn't passed - no sync
|
||||
self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(), datetime.datetime.now())
|
||||
self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(), now())
|
||||
self.assertFalse(ClickHouseTestModel.need_sync())
|
||||
|
||||
# Time has passed
|
||||
sync_delay = ClickHouseTestModel.get_sync_delay()
|
||||
self.storage.set_last_sync_time(ClickHouseTestModel.get_import_key(),
|
||||
datetime.datetime.now() - datetime.timedelta(seconds=sync_delay + 1))
|
||||
now() - datetime.timedelta(seconds=sync_delay + 1))
|
||||
self.assertTrue(ClickHouseTestModel.need_sync())
|
||||
|
|
|
@ -40,7 +40,7 @@ class TestOperations(TransactionTestCase):
|
|||
|
||||
def test_save(self):
|
||||
# INSERT operation
|
||||
instance = self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=2)
|
||||
instance = self.django_model(created_date=datetime.date.today(), created=now(), value=2)
|
||||
instance.save()
|
||||
self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
|
||||
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
|
||||
|
@ -52,13 +52,13 @@ class TestOperations(TransactionTestCase):
|
|||
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
|
||||
|
||||
def test_create(self):
|
||||
instance = self.django_model.objects.create(pk=100555, created_date=datetime.date.today(),
|
||||
created=datetime.datetime.now(), value=2)
|
||||
instance = self.django_model.objects.create(pk=100555, created_date=datetime.date.today(), created=now(),
|
||||
value=2)
|
||||
self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
|
||||
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
|
||||
|
||||
def test_bulk_create(self):
|
||||
items = [self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=i)
|
||||
items = [self.django_model(created_date=datetime.date.today(), created=now(), value=i)
|
||||
for i in range(5)]
|
||||
items = self.django_model.objects.bulk_create(items)
|
||||
self.assertEqual(5, len(items))
|
||||
|
@ -187,7 +187,7 @@ class TestOperations(TransactionTestCase):
|
|||
|
||||
def test_get_or_create(self):
|
||||
instance, created = self.django_model.objects. \
|
||||
get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': datetime.datetime.now(),
|
||||
get_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': now(),
|
||||
'value': 2})
|
||||
|
||||
self.assertTrue(created)
|
||||
|
@ -203,8 +203,7 @@ class TestOperations(TransactionTestCase):
|
|||
|
||||
def test_update_or_create(self):
|
||||
instance, created = self.django_model.objects. \
|
||||
update_or_create(pk=100, defaults={'created_date': datetime.date.today(),
|
||||
'created': datetime.datetime.now(), 'value': 2})
|
||||
update_or_create(pk=100, defaults={'created_date': datetime.date.today(), 'created': now(), 'value': 2})
|
||||
self.assertTrue(created)
|
||||
self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
|
||||
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
|
||||
|
@ -229,7 +228,7 @@ class TestOperations(TransactionTestCase):
|
|||
|
||||
def test_bulk_create_returning(self):
|
||||
items = [
|
||||
self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=i)
|
||||
self.django_model(created_date=datetime.date.today(), created=now(), value=i)
|
||||
for i in range(5)
|
||||
]
|
||||
items = self.django_model.objects.bulk_create_returning(items)
|
||||
|
@ -260,7 +259,7 @@ class TestOperations(TransactionTestCase):
|
|||
|
||||
def test_save_returning(self):
|
||||
# INSERT operation
|
||||
instance = self.django_model(created_date=datetime.date.today(), created=datetime.datetime.now(), value=2)
|
||||
instance = self.django_model(created_date=datetime.date.today(), created=now(), value=2)
|
||||
instance.save_returning()
|
||||
self.assertListEqual([('insert', "%s.%d" % (self.db_alias, instance.pk))],
|
||||
self.storage.get_operations(self.clickhouse_model.get_import_key(), 10))
|
||||
|
@ -284,6 +283,7 @@ class TestOperations(TransactionTestCase):
|
|||
|
||||
|
||||
class TestSecondaryOperations(TestOperations):
|
||||
# from django.db.models.fields import *
|
||||
fixtures = ['test_secondary_model']
|
||||
django_model = SecondaryTestModel
|
||||
clickhouse_model = ClickHouseSecondTestModel
|
||||
|
|
|
@ -30,7 +30,7 @@ class SyncTest(TransactionTestCase):
|
|||
ClickHouseTestModel.get_storage().flush()
|
||||
|
||||
def test_simple(self):
|
||||
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
|
||||
obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
|
||||
ClickHouseTestModel.sync_batch_from_storage()
|
||||
|
||||
synced_data = list(ClickHouseTestModel.objects.all())
|
||||
|
@ -40,7 +40,7 @@ class SyncTest(TransactionTestCase):
|
|||
self.assertEqual(obj.id, synced_data[0].id)
|
||||
|
||||
def test_collapsing_update_by_final(self):
|
||||
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
|
||||
obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
|
||||
obj.value = 2
|
||||
obj.save()
|
||||
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
||||
|
@ -63,7 +63,7 @@ class SyncTest(TransactionTestCase):
|
|||
def test_collapsing_update_by_version(self):
|
||||
ClickHouseCollapseTestModel.engine.version_col = 'version'
|
||||
|
||||
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
|
||||
obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
|
||||
obj.value = 2
|
||||
obj.save()
|
||||
ClickHouseCollapseTestModel.sync_batch_from_storage()
|
||||
|
@ -97,7 +97,7 @@ class SyncTest(TransactionTestCase):
|
|||
self.assertEqual(0, len(synced_data))
|
||||
|
||||
def test_multi_model(self):
|
||||
obj = TestModel.objects.create(value=1, created=datetime.datetime.now(), created_date=datetime.date.today())
|
||||
obj = TestModel.objects.create(value=1, created=now(), created_date=datetime.date.today())
|
||||
obj.value = 2
|
||||
obj.save()
|
||||
ClickHouseMultiTestModel.sync_batch_from_storage()
|
||||
|
@ -268,7 +268,7 @@ class ProfileTest(TransactionTestCase):
|
|||
ClickHouseTestModel.sync_enabled = False
|
||||
|
||||
TestModel.objects.bulk_create([
|
||||
TestModel(created=datetime.datetime.now(), created_date='2018-01-01', value=i)
|
||||
TestModel(created=now(), created_date='2018-01-01', value=i)
|
||||
for i in range(self.BATCH_SIZE)
|
||||
])
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user