Converted all typing hints to python 3 style

M1ha 2020-02-07 12:19:53 +05:00
parent 210c36a127
commit 5cb43ca6cd
13 changed files with 91 additions and 85 deletions

View File

@@ -39,20 +39,27 @@ env:
   - PG=9.6 DJANGO=2.1
   - PG=10 DJANGO=2.1
   - PG=11 DJANGO=2.1
+  - PG=12 DJANGO=2.1
   - PG=9.6 DJANGO=2.2
   - PG=10 DJANGO=2.2
   - PG=11 DJANGO=2.2
+  - PG=12 DJANGO=2.2
   - PG=9.6 DJANGO=3.0
   - PG=10 DJANGO=3.0
   - PG=11 DJANGO=3.0
+  - PG=12 DJANGO=3.0

 before_install:
   # Use default PostgreSQL 11 port
   - sudo sed -i 's/port = 5433/port = 5432/' /etc/postgresql/11/main/postgresql.conf
   - sudo cp /etc/postgresql/{10,11}/main/pg_hba.conf
+  - sudo sed -i 's/port = 5434/port = 5432/' /etc/postgresql/12/main/postgresql.conf
+  - sudo cp /etc/postgresql/{10,12}/main/pg_hba.conf

   # Start PostgreSQL version we need
-  - sudo systemctl stop postgresql && sudo systemctl start postgresql@$PG-main
+  - sudo systemctl stop postgresql
+  - sudo systemctl start postgresql@$PG-main

   # ClickHouse sources
   - sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4

View File

@@ -94,12 +94,11 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return namedtuple("%sTuple" % cls.__name__, field_names, defaults=default_values)

     @classmethod
-    def objects_in(cls, database):  # type: (Database) -> QuerySet
+    def objects_in(cls, database: Database) -> QuerySet:
         return QuerySet(cls, database)

     @classmethod
-    def get_database_alias(cls, for_write=False):
-        # type: (bool) -> str
+    def get_database_alias(cls, for_write: bool = False) -> str:
         """
         Gets database alias for read or write purposes
         :param for_write: Boolean flag if database is needed for read or for write
@@ -112,8 +111,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
             return db_router.db_for_read(cls)

     @classmethod
-    def get_database(cls, for_write=False):
-        # type: (bool) -> Database
+    def get_database(cls, for_write: bool = False) -> Database:
         """
         Gets database for read or write purposes
         :param for_write: Boolean flag if database is needed for read or for write
@@ -123,8 +121,8 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return connections[db_alias]

     @classmethod
-    def get_django_model_serializer(cls, writable=False, defaults=None):
-        # type: (bool, Optional[dict]) -> Django2ClickHouseModelSerializer
+    def get_django_model_serializer(cls, writable: bool = False, defaults: Optional[dict] = None
+                                    ) -> Django2ClickHouseModelSerializer:
         serializer_cls = lazy_class_import(cls.django_model_serializer)
         return serializer_cls(cls, writable=writable, defaults=defaults)
@@ -171,7 +169,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return True

     @classmethod
-    def get_sync_query_set(cls, using, pk_set):  # type: (str, Set[Any]) -> DjangoQuerySet
+    def get_sync_query_set(cls, using: str, pk_set: Set[Any]) -> DjangoQuerySet:
         """
         Forms django queryset to fetch for sync
         :param using: Database to fetch from
@@ -181,7 +179,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return cls.django_model.objects.filter(pk__in=pk_set).using(using)

     @classmethod
-    def get_sync_objects(cls, operations):  # type: (List[Tuple[str, str]]) -> List[DjangoModel]
+    def get_sync_objects(cls, operations: List[Tuple[str, str]]) -> List[DjangoModel]:
         """
         Returns objects from main database to sync
         :param operations: A list of operations to perform
@@ -203,7 +201,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
         return list(chain(*objs))

     @classmethod
-    def get_insert_batch(cls, import_objects):  # type: (Iterable[DjangoModel]) -> List[ClickHouseModel]
+    def get_insert_batch(cls, import_objects: Iterable[DjangoModel]) -> List['ClickHouseModel']:
         """
         Formats django model objects to batch of ClickHouse objects
         :param import_objects: DjangoModel objects to import
@@ -267,7 +265,7 @@ class ClickHouseModel(with_metaclass(ClickHouseModelMeta, InfiModel)):
             raise ex

     @classmethod
-    def need_sync(cls):  # type: () -> bool
+    def need_sync(cls) -> bool:
         """
         Checks if this model needs synchronization: sync is enabled and delay has passed
         :return: Boolean
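
For orientation, a minimal usage sketch of the classmethods annotated above; it is not part of this commit, and the Django model, fields and engine choice are illustrative assumptions.

# Illustrative only: a hypothetical model pair showing the annotated classmethods in use.
from django.contrib.auth.models import User
from infi.clickhouse_orm import fields

from django_clickhouse.clickhouse_models import ClickHouseModel
from django_clickhouse.engines import MergeTree   # assumption: MergeTree wrapper lives here


class ClickHouseUser(ClickHouseModel):
    django_model = User                          # Django model mirrored into ClickHouse

    id = fields.UInt32Field()
    username = fields.StringField()
    date_joined = fields.DateField()

    engine = MergeTree('date_joined', ('id',))   # hypothetical partition and order columns


db = ClickHouseUser.get_database(for_write=False)            # -> Database, per the new annotation
qs = ClickHouseUser.objects_in(db)                           # -> QuerySet
serializer = ClickHouseUser.get_django_model_serializer()    # -> Django2ClickHouseModelSerializer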

View File

@@ -28,7 +28,7 @@ DEFAULTS = {


 class Config:
-    def __getattr__(self, item):  # type: (str) -> Any
+    def __getattr__(self, item: str) -> Any:
         if item not in DEFAULTS:
             raise AttributeError('Unknown config parameter `%s`' % item)
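
The lookup above can be sketched standalone; the default values below are invented, and the real Config additionally consults Django settings before falling back to DEFAULTS.

# Standalone illustration of the __getattr__ fallback pattern shown above.
from typing import Any

DEFAULTS = {
    'SYNC_BATCH_SIZE': 10000,   # made-up values, only for illustration
    'SYNC_DELAY': 5,
}


class Config:
    def __getattr__(self, item: str) -> Any:
        if item not in DEFAULTS:
            raise AttributeError('Unknown config parameter `%s`' % item)
        # The real implementation would prefer an override from django.conf.settings here.
        return DEFAULTS[item]


config = Config()
assert config.SYNC_DELAY == 5      # known parameter resolves to its default
# config.MISSING would raise AttributeError('Unknown config parameter `MISSING`')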

View File

@@ -35,8 +35,8 @@ class Database(InfiDatabase):
     def _get_applied_migrations(self, migrations_package_name):
         raise NotImplementedError("This method is not supported by django_clickhouse.")

-    def select_tuples(self, query, model_class, settings=None):
-        # type: (str, Type['ClickHouseModel'], Optional[dict], Optional[dict]) -> Generator[tuple]
+    def select_tuples(self, query: str, model_class: Type['ClickHouseModel'], settings: Optional[dict] = None
+                      ) -> Iterable[tuple]:
         """
         This method selects model_class namedtuples, instead of class instances.
         Less memory consumption, greater speed
@@ -67,11 +67,11 @@ class Database(InfiDatabase):
             yield item

-    def insert_tuples(self, model_class, model_tuples, batch_size=None, formatted=False):
-        # type: (Type['ClickHouseModel'], Iterable[tuple], Optional[int], bool) -> None
+    def insert_tuples(self, model_class: Type['ClickHouseModel'], model_tuples: Iterable[tuple],
+                      batch_size: Optional[int] = None, formatted: bool = False) -> None:
         """
         Inserts model_class namedtuples
-        :param model_class: Clickhouse model, namedtuples are made from
+        :param model_class: ClickHouse model, namedtuples are made from
         :param model_tuples: An iterable of tuples to insert
         :param batch_size: Size of batch
         :param formatted: If flag is set, tuples are expected to be ready to insert without calling field.to_db_string
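
A hedged usage sketch for select_tuples() and insert_tuples(), based only on the signatures shown here; the alias, model and query text are assumptions.

# Hypothetical usage of the two methods above.
from django_clickhouse.database import connections
from myapp.clickhouse_models import ClickHouseUser   # hypothetical ClickHouseModel subclass

db = connections['default']                           # assumption: 'default' alias is configured

# select_tuples() yields namedtuples instead of model instances (cheaper than full objects)
query = 'SELECT * FROM clickhouseuser WHERE id IN (1, 2, 3)'   # hypothetical table name
rows = list(db.select_tuples(query, ClickHouseUser))

# insert_tuples() writes such namedtuples back, splitting them into batches
db.insert_tuples(ClickHouseUser, rows, batch_size=1000)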

View File

@@ -2,7 +2,7 @@
 This file contains wrappers for infi.clickhouse_orm engines to use in django-clickhouse
 """
 import datetime
-from typing import List, Type, Union, Iterable, Generator
+from typing import List, Type, Union, Iterable, Generator, Optional

 from django.db.models import Model as DjangoModel
 from infi.clickhouse_orm import engines as infi_engines
@@ -14,8 +14,7 @@ from .utils import format_datetime


 class InsertOnlyEngineMixin:
-    def get_insert_batch(self, model_cls, objects):
-        # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple]
+    def get_insert_batch(self, model_cls: Type['ClickHouseModel'], objects: List[DjangoModel]) -> Iterable[tuple]:
         """
         Gets a list of model_cls instances to insert into database
         :param model_cls: ClickHouseModel subclass to import
@@ -69,8 +68,8 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
                                   max_date=max_date, object_pks=','.join(object_pks))
         return connections[db_alias].select_tuples(query, model_cls)

-    def get_final_versions(self, model_cls, objects, date_col=None):
-        # type: (Type['ClickHouseModel'], Iterable[DjangoModel], str) -> Generator[tuple]
+    def get_final_versions(self, model_cls: Type['ClickHouseModel'], objects: Iterable[DjangoModel],
+                           date_col: Optional[str] = None) -> Iterable[tuple]:
         """
         Get objects, that are currently stored in ClickHouse.
         Depending on the partition key this can be different for different models.
@@ -82,7 +81,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
         :return: A generator of named tuples, representing previous state
         """

-        def _dt_to_str(dt):  # type: (Union[datetime.date, datetime.datetime]) -> str
+        def _dt_to_str(dt: Union[datetime.date, datetime.datetime]) -> str:
             if isinstance(dt, datetime.datetime):
                 return format_datetime(dt, 0, db_alias=db_alias)
             elif isinstance(dt, datetime.date):
@@ -123,8 +122,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTree):
         else:
             return self._get_final_versions_by_final(*params)

-    def get_insert_batch(self, model_cls, objects):
-        # type: (Type['ClickHouseModel'], List[DjangoModel]) -> Generator[tuple]
+    def get_insert_batch(self, model_cls: Type['ClickHouseModel'], objects: List[DjangoModel]) -> Iterable[tuple]:
         """
         Gets a list of model_cls instances to insert into database
         :param model_cls: ClickHouseModel subclass to import
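
For context, a hedged sketch of a model built on the CollapsingMergeTree wrapper above, the kind of model get_insert_batch() and get_final_versions() operate on; the Django model, fields and column names are hypothetical.

# Hypothetical collapsing model; collapsing engines rely on a sign column.
from infi.clickhouse_orm import fields
from django_clickhouse.clickhouse_models import ClickHouseModel
from django_clickhouse.engines import CollapsingMergeTree
from myapp.models import Visit                       # hypothetical Django model


class ClickHouseVisit(ClickHouseModel):
    django_model = Visit
    id = fields.UInt32Field()
    visited = fields.DateField()
    sign = fields.Int8Field()                        # sign column used for collapsing

    engine = CollapsingMergeTree('visited', ('id',), 'sign')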

View File

@@ -23,7 +23,7 @@ class Migration:
     """
     operations = []

-    def apply(self, db_alias, database=None):  # type: (str, Optional[Database]) -> None
+    def apply(self, db_alias: str, database: Optional[Database] = None) -> None:
         """
         Applies migration to given database
         :param db_alias: Database alias to apply migration to
@@ -41,8 +41,7 @@ class Migration:
             op.apply(database)


-def migrate_app(app_label, db_alias, up_to=9999, database=None):
-    # type: (str, str, int, Optional[Database]) -> None
+def migrate_app(app_label: str, db_alias: str, up_to: int = 9999, database: Optional[Database] = None) -> None:
     """
     Migrates given django app
     :param app_label: App label to migrate
@@ -110,7 +109,7 @@ class MigrationHistory(ClickHouseModel):
     engine = MergeTree('applied', ('db_alias', 'package_name', 'module_name'))

     @classmethod
-    def set_migration_applied(cls, db_alias, migrations_package, name):  # type: (str, str, str) -> None
+    def set_migration_applied(cls, db_alias: str, migrations_package: str, name: str) -> None:
         """
         Sets migration apply status
         :param db_alias: Database alias migration is applied to
@@ -126,7 +125,7 @@ class MigrationHistory(ClickHouseModel):
                            applied=datetime.date.today())

     @classmethod
-    def get_applied_migrations(cls, db_alias, migrations_package):  # type: (str, str) -> Set[str]
+    def get_applied_migrations(cls, db_alias: str, migrations_package: str) -> Set[str]:
         """
         Returns applied migrations names
         :param db_alias: Database alias, to check
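
A short usage sketch of migrate_app() with the new signature; the app label, alias and module path are placeholders.

# Hypothetical call: apply ClickHouse migrations of app "my_app" to the "default" alias.
from django_clickhouse.migrations import migrate_app

migrate_app('my_app', 'default')            # runs all migrations up to the default 9999
migrate_app('my_app', 'default', up_to=5)   # or stop after migration number 5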

View File

@@ -5,7 +5,6 @@ It saves all operations to storage in order to write them to ClickHouse later.
 from typing import Optional, Any, Type, Set

-import functools
 import six
 from django.db import transaction
 from django.db.models import QuerySet as DjangoQuerySet, Model as DjangoModel, Manager as DjangoManager
@@ -147,7 +146,7 @@ class ClickHouseSyncModel(DjangoModel):
         abstract = True

     @classmethod
-    def get_clickhouse_storage(cls):  # type: () -> Storage
+    def get_clickhouse_storage(cls) -> Storage:
         """
         Returns Storage instance to save clickhouse sync data to
         :return:
@@ -156,8 +155,7 @@ class ClickHouseSyncModel(DjangoModel):
         return storage_cls()

     @classmethod
-    def register_clickhouse_sync_model(cls, model_cls):
-        # type: (Type['django_clickhouse.clickhouse_models.ClickHouseModel']) -> None
+    def register_clickhouse_sync_model(cls, model_cls: Type['ClickHouseModel']) -> None:
         """
         Registers ClickHouse model to listen to this model's updates
         :param model_cls: Model class to register
@@ -169,7 +167,7 @@ class ClickHouseSyncModel(DjangoModel):
         cls._clickhouse_sync_models.add(model_cls)

     @classmethod
-    def get_clickhouse_sync_models(cls):  # type: () -> Set['django_clickhouse.clickhouse_models.ClickHouseModel']
+    def get_clickhouse_sync_models(cls) -> Set['ClickHouseModel']:
         """
         Returns all ClickHouse models listening to this class
         :return: A set of model classes to sync
@@ -177,8 +175,7 @@ class ClickHouseSyncModel(DjangoModel):
         return getattr(cls, '_clickhouse_sync_models', set())

     @classmethod
-    def register_clickhouse_operations(cls, operation, *model_pks, using=None):
-        # type: (str, *Any, Optional[str]) -> None
+    def register_clickhouse_operations(cls, operation: str, *model_pks: Any, using: Optional[str] = None) -> None:
         """
         Registers model operation in storage
         :param operation: Operation type - one of [insert, update, delete]
@@ -197,10 +194,10 @@ class ClickHouseSyncModel(DjangoModel):
             storage = cls.get_clickhouse_storage()

         transaction.on_commit(_on_commit, using=using)

-    def post_save(self, created, using=None):  # type: (bool, Optional[str]) -> None
+    def post_save(self, created: bool, using: Optional[str] = None) -> None:
         self.register_clickhouse_operations('insert' if created else 'update', self.pk, using=using)

-    def post_delete(self, using=None):  # type: (Optional[str]) -> None
+    def post_delete(self, using: Optional[str] = None) -> None:
         self.register_clickhouse_operations('delete', self.pk, using=using)
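
For context, a sketch of a Django model inheriting ClickHouseSyncModel, which is what routes post_save()/post_delete() above into storage; the field names are hypothetical.

# Hypothetical Django model: inheriting ClickHouseSyncModel makes saves and deletes
# register 'insert'/'update'/'delete' operations inside transaction.on_commit(), as shown above.
from django.db import models
from django_clickhouse.models import ClickHouseSyncModel


class User(ClickHouseSyncModel):
    first_name = models.CharField(max_length=50)
    visits = models.IntegerField(default=0)
    birthday = models.DateField(null=True)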

View File

@@ -1,4 +1,4 @@
-from typing import Optional, Iterable, List
+from typing import Optional, Iterable, List, Type
 from copy import copy

 from infi.clickhouse_orm.database import Database
@@ -13,22 +13,22 @@ class QuerySet(InfiQuerySet):
     Basic QuerySet to use
     """

-    def __init__(self, model_cls, database=None):  # type: (Type[InfiModel], Optional[Database]) -> None
+    def __init__(self, model_cls: Type[InfiModel], database: Optional[Database] = None) -> None:
         super(QuerySet, self).__init__(model_cls, database)
         self._db_alias = None

     @property
-    def _database(self):  # type: () -> Database
+    def _database(self) -> Database:
         # HACK for correct work of all infi.clickhouse-orm methods
         # There are no write QuerySet methods now, so I use for_write=False by default
         return self.get_database(for_write=False)

     @_database.setter
-    def _database(self, database):  # type: (Database) -> None
+    def _database(self, database: Database) -> None:
         # HACK for correct work of all infi.clickhouse-orm methods
         self._db = database

-    def get_database(self, for_write=False):  # type: (bool) -> Database
+    def get_database(self, for_write: bool = False) -> Database:
         """
         Gets database to execute query on. Looks for constructor or using() method.
         If nothing was set tries to get database from model class using router.
@@ -43,7 +43,7 @@ class QuerySet(InfiQuerySet):
         return self._db

-    def using(self, db_alias):  # type: (str) -> QuerySet
+    def using(self, db_alias: str) -> 'QuerySet':
         """
         Sets database alias to use for this query
         :param db_alias: Database alias name from CLICKHOUSE_DATABASES config option
@@ -54,7 +54,7 @@ class QuerySet(InfiQuerySet):
         qs._db = None  # Previous database should be forgotten
         return qs

-    def all(self):  # type: () -> QuerySet
+    def all(self) -> 'QuerySet':
         """
         Returns all items of queryset
         :return: QuerySet
@@ -70,7 +70,7 @@ class QuerySet(InfiQuerySet):
         self.get_database(for_write=True).insert([instance])
         return instance

-    def bulk_create(self, model_instances, batch_size=1000):  # type: (Iterable[InfiModel], int) -> List[InfiModel]
+    def bulk_create(self, model_instances: Iterable[InfiModel], batch_size: int = 1000) -> List[InfiModel]:
         self.get_database(for_write=True).insert(model_instances=model_instances, batch_size=batch_size)
         return list(model_instances)
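
A hedged usage sketch of the QuerySet methods annotated above; the model and the 'secondary' alias are assumptions (the alias would have to exist in the CLICKHOUSE_DATABASES config option mentioned in the using() docstring).

# Hypothetical usage of the QuerySet above.
from myapp.clickhouse_models import ClickHouseUser    # hypothetical ClickHouseModel subclass

qs = ClickHouseUser.objects_in(ClickHouseUser.get_database())
rows = list(qs.using('secondary').all())               # read through another alias

new_rows = [ClickHouseUser(id=1, username='alice')]    # hypothetical field values
qs.bulk_create(new_rows, batch_size=1000)              # writes via get_database(for_write=True)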

View File

@@ -1,7 +1,7 @@
 """
 This file defines router to find appropriate database
 """
-from typing import Optional
+from typing import Type
 import random

 import six
@@ -13,8 +13,7 @@ from .utils import lazy_class_import


 class DefaultRouter:
-    def db_for_read(self, model, **hints):
-        # type: (ClickHouseModel, **dict) -> str
+    def db_for_read(self, model: Type[ClickHouseModel], **hints) -> str:
         """
         Gets database to read from for model
         :param model: Model to decide for
@@ -23,8 +22,7 @@ class DefaultRouter:
         """
         return random.choice(model.read_db_aliases)

-    def db_for_write(self, model, **hints):
-        # type: (ClickHouseModel, **dict) -> str
+    def db_for_write(self, model: Type[ClickHouseModel], **hints) -> str:
         """
         Gets database to write to for model
         :param model: Model to decide for
@@ -33,8 +31,8 @@ class DefaultRouter:
         """
         return random.choice(model.write_db_aliases)

-    def allow_migrate(self, db_alias, app_label, operation, model=None, **hints):
-        # type: (str, str, Operation, Optional[ClickHouseModel], **dict) -> bool
+    def allow_migrate(self, db_alias: str, app_label: str, operation: Operation,
+                      model=None, **hints) -> bool:
         """
         Checks if migration can be applied to given database
         :param db_alias: Database alias to check
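
A sketch of a custom router implementing the same interface; the alias names, and the assumption that it is plugged in through a CLICKHOUSE_DATABASE_ROUTER-style setting, are not taken from this diff.

# Hypothetical custom router: read from replicas, write and migrate on the primary alias.
import random
from typing import Type

from django_clickhouse.clickhouse_models import ClickHouseModel


class ReadReplicaRouter:
    def db_for_read(self, model: Type[ClickHouseModel], **hints) -> str:
        return random.choice(['replica1', 'replica2'])    # hypothetical aliases

    def db_for_write(self, model: Type[ClickHouseModel], **hints) -> str:
        return 'default'

    def allow_migrate(self, db_alias: str, app_label: str, operation, model=None, **hints) -> bool:
        return db_alias == 'default'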

View File

@@ -1,4 +1,4 @@
-from typing import NamedTuple
+from typing import NamedTuple, Optional, Iterable, Type

 import pytz
 from django.db.models import Model as DjangoModel
@@ -7,7 +7,19 @@ from .utils import model_to_dict


 class Django2ClickHouseModelSerializer:
-    def __init__(self, model_cls, fields=None, exclude_fields=None, writable=False, defaults=None):
+    def __init__(self, model_cls: Type['ClickHouseModel'], fields: Optional[Iterable[str]] = None,
+                 exclude_fields: Optional[Iterable[str]] = None, writable: bool = False,
+                 defaults: Optional[dict] = None) -> None:
+        """
+        Initializes serializer
+        :param model_cls: ClickHouseModel subclass to serialize to
+        :param fields: Optional. A list of fields to add into result tuple
+        :param exclude_fields: Fields to exclude from result tuple
+        :param writable: If fields parameter is not set directly,
+            this flag determines if only writable or all fields should be taken from model_cls
+        :param defaults: A dictionary of field: value which are taken as default values for model_cls instances
+        :return: None
+        """
         self._model_cls = model_cls
         if fields is not None:
             self.serialize_fields = fields
@@ -18,7 +30,7 @@ class Django2ClickHouseModelSerializer:
         self._result_class = self._model_cls.get_tuple_class(defaults=defaults)
         self._fields = self._model_cls.fields(writable=False)

-    def _get_serialize_kwargs(self, obj):
+    def _get_serialize_kwargs(self, obj: DjangoModel) -> dict:
         data = model_to_dict(obj, fields=self.serialize_fields, exclude_fields=self.exclude_serialize_fields)

         # Remove None values, they should be initialized as defaults
@@ -29,5 +41,5 @@ class Django2ClickHouseModelSerializer:
         return result

-    def serialize(self, obj):  # type: (DjangoModel) -> NamedTuple
+    def serialize(self, obj: DjangoModel) -> NamedTuple:
         return self._result_class(**self._get_serialize_kwargs(obj))
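
A hedged sketch of the serializer in use, following the signatures and the new __init__ docstring; the model and field names are hypothetical.

# Hypothetical serializer usage: turn a Django instance into a ClickHouse-ready namedtuple.
from django_clickhouse.serializers import Django2ClickHouseModelSerializer
from myapp.clickhouse_models import ClickHouseUser    # hypothetical ClickHouseModel subclass
from myapp.models import User                         # hypothetical Django model

serializer = Django2ClickHouseModelSerializer(
    ClickHouseUser,
    exclude_fields=('password',),   # keep this column out of the result tuple
    writable=True,                  # take only writable ClickHouseUser fields
)
row = serializer.serialize(User.objects.first())       # -> namedtuple ready for insert_tuples()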

View File

@@ -39,7 +39,7 @@ class Storage:
     But ClickHouse is idempotent to duplicate inserts. So we can insert one batch twice correctly.
     """

-    def pre_sync(self, import_key, **kwargs):  # type: (str, **dict) -> None
+    def pre_sync(self, import_key: str, **kwargs) -> None:
         """
         This method is called before import process starts
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -48,7 +48,7 @@ class Storage:
         """
         pass

-    def post_sync(self, import_key, **kwargs):  # type: (str, **dict) -> None
+    def post_sync(self, import_key: str, **kwargs) -> None:
         """
         This method is called after import process has finished.
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -57,7 +57,7 @@ class Storage:
         """
         pass

-    def post_sync_failed(self, import_key, **kwargs):  # type: (str, **dict) -> None
+    def post_sync_failed(self, import_key: str, **kwargs) -> None:
         """
         This method is called after import process has finished with exception.
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -66,7 +66,7 @@ class Storage:
         """
         pass

-    def post_batch_removed(self, import_key, batch_size):  # type: (str, int) -> None
+    def post_batch_removed(self, import_key: str, batch_size: int) -> None:
         """
         This method marks that batch has been removed in statsd
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -76,8 +76,7 @@ class Storage:
         key = "%s.sync.%s.queue" % (config.STATSD_PREFIX, import_key)
         statsd.gauge(key, self.operations_count(import_key))

-    def operations_count(self, import_key, **kwargs):
-        # type: (str, **dict) -> int
+    def operations_count(self, import_key: str, **kwargs) -> int:
         """
         Returns sync queue size
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -86,8 +85,7 @@ class Storage:
         """
         raise NotImplemented()

-    def get_operations(self, import_key, count, **kwargs):
-        # type: (str, int, **dict) -> List[Tuple[str, str]]
+    def get_operations(self, import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]:
         """
         Must return a list of operations on the model.
         Method should be error safe - if something goes wrong, import data should not be lost.
@@ -98,7 +96,7 @@ class Storage:
         """
         raise NotImplemented()

-    def register_operations(self, import_key, operation, *pks):  # type: (str, str, *Any) -> int
+    def register_operations(self, import_key: str, operation: str, *pks: Any) -> int:
         """
         Registers new incoming operation
         :param import_key: A key, returned by ClickHouseModel.get_import_key() method
@@ -108,8 +106,7 @@ class Storage:
         """
         raise NotImplementedError()

-    def register_operations_wrapped(self, import_key, operation, *pks):
-        # type: (str, str, *Any) -> int
+    def register_operations_wrapped(self, import_key: str, operation: str, *pks: Any) -> int:
         """
         This is a wrapper for register_operation method, checking main parameters.
         This method should be called from inner functions.
@@ -140,14 +137,14 @@ class Storage:
         """
         raise NotImplemented()

-    def get_last_sync_time(self, import_key):  # type: (str) -> Optional[datetime.datetime]
+    def get_last_sync_time(self, import_key: str) -> Optional[datetime.datetime]:
         """
         Gets the last time, sync has been executed
         :return: datetime.datetime if last sync has been. Otherwise - None.
         """
         raise NotImplemented()

-    def set_last_sync_time(self, import_key, dt):  # type: (str, datetime.datetime) -> None
+    def set_last_sync_time(self, import_key: str, dt: datetime.datetime) -> None:
         """
         Sets successful sync time
         :return: None
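
To make the interface above concrete, a purely illustrative in-memory Storage sketch; the module path is an assumption, and locking/flush handling is deliberately ignored, so this is not a substitute for the package's real (Redis-backed) storage.

# Illustrative in-memory storage implementing the annotated methods above.
import datetime
from collections import defaultdict
from typing import Any, List, Optional, Tuple

from django_clickhouse.storages import Storage        # assumption: module path


class InMemoryStorage(Storage):
    def __init__(self) -> None:
        self._ops = defaultdict(list)
        self._last_sync = {}

    def register_operations(self, import_key: str, operation: str, *pks: Any) -> int:
        self._ops[import_key].extend((operation, str(pk)) for pk in pks)
        return len(pks)

    def operations_count(self, import_key: str, **kwargs) -> int:
        return len(self._ops[import_key])

    def get_operations(self, import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]:
        return self._ops[import_key][:count]

    def get_last_sync_time(self, import_key: str) -> Optional[datetime.datetime]:
        return self._last_sync.get(import_key)

    def set_last_sync_time(self, import_key: str, dt: datetime.datetime) -> None:
        self._last_sync[import_key] = dt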

View File

@@ -11,14 +11,14 @@ from .utils import get_subclasses
 @shared_task(queue=config.CELERY_QUEUE)
-def sync_clickhouse_model(cls):  # type: (ClickHouseModel) -> None
+def sync_clickhouse_model(model_cls) -> None:
     """
     Syncs one batch of given ClickHouseModel
-    :param cls: ClickHouseModel subclass
+    :param model_cls: ClickHouseModel subclass
     :return: None
     """
-    cls.get_storage().set_last_sync_time(cls.get_import_key(), datetime.datetime.now())
-    cls.sync_batch_from_storage()
+    model_cls.get_storage().set_last_sync_time(model_cls.get_import_key(), datetime.datetime.now())
+    model_cls.sync_batch_from_storage()


 @shared_task(queue=config.CELERY_QUEUE)
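
A minimal, hedged sketch of invoking the renamed task directly; the model is hypothetical.

# Hypothetical direct call of the task above with a ClickHouseModel subclass.
from django_clickhouse.tasks import sync_clickhouse_model
from myapp.clickhouse_models import ClickHouseUser    # hypothetical model

sync_clickhouse_model(ClickHouseUser)    # records last sync time, then syncs one batch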

View File

@@ -18,7 +18,7 @@ from .database import connections
 T = TypeVar('T')


-def get_tz_offset(db_alias=None):  # type: (Optional[str]) -> int
+def get_tz_offset(db_alias: Optional[str] = None) -> int:
     """
     Returns ClickHouse server timezone offset in minutes
     :param db_alias: The database alias used
@@ -28,8 +28,8 @@ def get_tz_offset(db_alias=None): # type: (Optional[str]) -> int
     return int(db.server_timezone.utcoffset(datetime.datetime.utcnow()).total_seconds() / 60)


-def format_datetime(dt, timezone_offset=0, day_end=False, db_alias=None):
-    # type: (Union[datetime.date, datetime.datetime], int, bool, Optional[str]) -> str
+def format_datetime(dt: Union[datetime.date, datetime.datetime], timezone_offset: int = 0, day_end: bool = False,
+                    db_alias: Optional[str] = None) -> str:
     """
     Formats datetime and date objects to format that can be used in WHERE conditions of query
     :param dt: datetime.datetime or datetime.date object
@@ -58,9 +58,9 @@ def format_datetime(dt, timezone_offset=0, day_end=False, db_alias=None):
     return server_dt.strftime("%Y-%m-%d %H:%M:%S")


-def module_exists(module_name):  # type: (str) -> bool
+def module_exists(module_name: str) -> bool:
     """
-    Checks if moudle exists
+    Checks if module exists
     :param module_name: Dot-separated module name
     :return: Boolean
     """
@@ -69,7 +69,7 @@ def module_exists(module_name): # type: (str) -> bool
     return spam_spec is not None


-def lazy_class_import(obj):  # type: (Union[str, Any]) -> Any
+def lazy_class_import(obj: Union[str, Any]) -> Any:
     """
     If string is given, imports object by given module path.
     Otherwise returns the object
@@ -88,7 +88,7 @@ def lazy_class_import(obj): # type: (Union[str, Any]) -> Any
     return obj


-def get_subclasses(cls, recursive=False):  # type: (T, bool) -> Set[T]
+def get_subclasses(cls: T, recursive: bool = False) -> Set[T]:
     """
     Gets all subclasses of given class
     Attention!!! Classes would be found only if they were imported before using this function
@@ -105,8 +105,8 @@ def get_subclasses(cls, recursive=False): # type: (T, bool) -> Set[T]
     return subclasses


-def model_to_dict(instance, fields=None, exclude_fields=None):
-    # type: (DjangoModel, Optional[Iterable[str]], Optional[Iterable[str]]) -> Dict[str, Any]
+def model_to_dict(instance: DjangoModel, fields: Optional[Iterable[str]] = None,
+                  exclude_fields: Optional[Iterable[str]] = None) -> Dict[str, Any]:
     """
     Standard model_to_dict ignores some fields if they have invalid naming
     :param instance: Object to convert to dictionary
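
Finally, a hedged sketch of the helpers annotated above; the dotted path and the Django model are placeholders.

# Hypothetical calls to the utility helpers.
from django_clickhouse.utils import lazy_class_import, get_subclasses, model_to_dict
from django_clickhouse.clickhouse_models import ClickHouseModel
from myapp.models import User                                  # hypothetical Django model

storage_cls = lazy_class_import('myapp.storages.MyStorage')    # dotted path gets imported
same_cls = lazy_class_import(storage_cls)                      # non-strings are returned as-is

all_ch_models = get_subclasses(ClickHouseModel, recursive=True)    # only already-imported subclasses

data = model_to_dict(User.objects.first(), exclude_fields=('password',))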