mirror of
https://github.com/carrotquest/django-clickhouse.git
synced 2025-05-28 09:43:11 +03:00
Fix linter errors
This commit is contained in:
parent
116b6470ba
commit
a15af0c1b5
|
@ -2,19 +2,20 @@
|
|||
This file contains wrappers for infi.clckhouse_orm engines to use in django-clickhouse
|
||||
"""
|
||||
import datetime
|
||||
from typing import List, Type, Union, Iterable, Generator, Optional
|
||||
from typing import List, Type, Union, Iterable, Optional
|
||||
|
||||
from django.db.models import Model as DjangoModel
|
||||
from infi.clickhouse_orm import engines as infi_engines
|
||||
from statsd.defaults.django import statsd
|
||||
|
||||
from .clickhouse_models import ClickHouseModel
|
||||
from .configuration import config
|
||||
from .database import connections
|
||||
from .utils import format_datetime
|
||||
|
||||
|
||||
class InsertOnlyEngineMixin:
|
||||
def get_insert_batch(self, model_cls: Type['ClickHouseModel'], objects: List[DjangoModel]) -> Iterable[tuple]:
|
||||
def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
|
||||
"""
|
||||
Gets a list of model_cls instances to insert into database
|
||||
:param model_cls: ClickHouseModel subclass to import
|
||||
|
@ -68,7 +69,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
|
|||
max_date=max_date, object_pks=','.join(object_pks))
|
||||
return connections[db_alias].select_tuples(query, model_cls)
|
||||
|
||||
def get_final_versions(self, model_cls: Type['ClickHouseModel'], objects: Iterable[DjangoModel],
|
||||
def get_final_versions(self, model_cls: Type[ClickHouseModel], objects: Iterable[DjangoModel],
|
||||
date_col: Optional[str] = None) -> Iterable[tuple]:
|
||||
"""
|
||||
Get objects, that are currently stored in ClickHouse.
|
||||
|
@ -122,7 +123,7 @@ class CollapsingMergeTree(InsertOnlyEngineMixin, infi_engines.CollapsingMergeTre
|
|||
else:
|
||||
return self._get_final_versions_by_final(*params)
|
||||
|
||||
def get_insert_batch(self, model_cls: Type['ClickHouseModel'], objects: List[DjangoModel]) -> Iterable[tuple]:
|
||||
def get_insert_batch(self, model_cls: Type[ClickHouseModel], objects: List[DjangoModel]) -> Iterable[tuple]:
|
||||
"""
|
||||
Gets a list of model_cls instances to insert into database
|
||||
:param model_cls: ClickHouseModel subclass to import
|
||||
|
|
Loading…
Reference in New Issue
Block a user