3.7 KiB
Storages
Storage class is a facade, that stores information about operations, which where performed on django models.
It has three main purposes:
- Storage should be fast to insert single records. It forms a batch of data, which is then inserted to ClickHouse.
- Storage guarantees, that no data is lost. Intermediate data in storage is deleted only after importing batch finishes successfully. If it fails in some point - starting new import process should import failed data again.
- Keep information about sync process. For instance, last time the model sync has been called.
In order to determine different models from each other storage uses import_key
.
By default, it is generated by ClickHouseModel.get_import_key()
method and is equal to class name.
Each method of abstract Storage
class takes kwargs
parameters, which can be used in concrete storage.
Storage methods
-
register_operations(import_key: str, operation: str, *pks: *Any) -> int
Saves a new operation in source database to storage. This method should be fast. It is called after source database transaction is committed.
Method returns number of operations registered.
operation
is one ofinsert
,update
ordelete
pks
is an iterable of strings, enough to select needed records from source database. -
get_last_sync_time(import_key: str) -> Optional[datetime.datetime]
Returns last time, a model sync has been called. If no sync has been done, returns None. -
set_last_sync_time(import_key: str, dt: datetime.datetime) -> None
Saves datetime, when a sync process has been called last time. -
register_operations_wrapped(self, import_key: str, operation: str, *pks: *Any) -> int
A wrapper for register_operations. It's goal is to write metrics and logs. -
pre_sync(import_key: str, **kwargs) -> None
Called before import process starts. It initializes storage for importing new batch. -
operations_count(import_key: str, **kwargs) -> int
Counts, how many operations are waiting for import in storage. -
get_operations(import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]
Returns a next batch of operations to import.count
parameter gives a number of operations to return. Operation is a tuple(operation, primary_key)
, whereoperation
is one of insert, update or delete andprimary_key
is a string enough to select record from source database. -
post_sync(import_key: str, **kwargs) -> None
Called after import process have finished. It cleans storage after importing a batch. -
post_batch_removed(import_key: str, batch_size: int) -> None
This method should be called bypost_sync
method after data is removed from storage. By default, it marks queue size metric. -
post_sync_failed(import_key: str, exception: Exception, **kwargs) -> None:
Called if any exception has occurred during import process. It cleans storage after unsuccessful import. Note that if import process is hardly killed (with OOM killer, for instance) this method is not called. -
flush() -> None
Dangerous. Drops all data, kept by storage. It is used for cleaning up between tests.
Predefined storages
RedisStorage
This storage uses Redis database as intermediate storage. To communicate with Redis it uses redis-py library. It is not required, but should be installed to use RedisStorage. In order to use RedisStorage you must also fill CLICKHOUSE_REDIS_CONFIG parameter.
Stored operation contains:
- Django database alias where original record can be found.
- Record primary key
- Operation performed (insert, update, delete)
This storage does not allow multi-threaded sync.