# Storages
A storage class is a facade that stores information about operations performed on Django models.
It has three main purposes:
* Storage should be fast at inserting single records. It forms a batch of data, which is then inserted into ClickHouse.
* Storage guarantees that no data is lost.
Intermediate data in storage is deleted only after a batch import finishes successfully.
If an import fails at some point, starting a new import process should import the failed data again.
* Storage keeps information about the sync process. For instance, the last time a model sync was called.
To distinguish models from each other, storage uses an `import_key`.
By default, it is generated by the `ClickHouseModel.get_import_key()` method and is equal to the class name.
Each method of the abstract `Storage` class accepts `kwargs` parameters, which can be used by a concrete storage implementation.
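For example, assuming a hypothetical `ClickHouseUser` model (the class name and its omitted field definitions are purely illustrative), the default import key is simply the class name:

```python
# Sketch: by default, get_import_key() returns the ClickHouseModel subclass name.
from django_clickhouse.clickhouse_models import ClickHouseModel


class ClickHouseUser(ClickHouseModel):
    # field, engine and django_model definitions omitted for brevity
    pass


ClickHouseUser.get_import_key()  # -> 'ClickHouseUser'
```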
## Storage methods
* `register_operations(import_key: str, operation: str, *pks: Any) -> int`
Saves a new operation on the source database to storage. This method should be fast.
It is called after the source database transaction is committed.
The method returns the number of operations registered.
`operation` is one of `insert`, `update` or `delete`.
`pks` is an iterable of strings sufficient to select the needed records from the source database.
* `get_last_sync_time(import_key: str) -> Optional[datetime.datetime]`
Returns the last time a model sync was called. If no sync has been done, returns `None`.
* `set_last_sync_time(import_key: str, dt: datetime.datetime) -> None`
Saves the datetime when a sync process was last called.
* `register_operations_wrapped(import_key: str, operation: str, *pks: Any) -> int`
A wrapper around `register_operations`. Its goal is to write metrics and logs.
* `pre_sync(import_key: str, **kwargs) -> None`
Called before the import process starts. It initializes storage for importing a new batch (a usage sketch of the full sync cycle follows this list).
* `operations_count(import_key: str, **kwargs) -> int`
Counts how many operations are waiting for import in storage.
* `get_operations(import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]`
Returns the next batch of operations to import. The `count` parameter gives the number of operations to return.
An operation is a tuple `(operation, primary_key)`, where `operation` is one of `insert`, `update` or `delete`,
and `primary_key` is a string sufficient to select the record from the source database.
* `post_sync(import_key: str, **kwargs) -> None`
Called after the import process has finished. It cleans up storage after importing a batch.
* `post_batch_removed(import_key: str, batch_size: int) -> None`
This method should be called by the `post_sync` method after data is removed from storage.
By default, it reports the queue size metric.
* `post_sync_failed(import_key: str, exception: Exception, **kwargs) -> None`
Called if an exception occurs during the import process. It cleans up storage after an unsuccessful import.
Note that if the import process is killed abruptly (by the OOM killer, for instance), this method is not called.
* `flush() -> None`
*Dangerous*. Drops all data kept by the storage. It is used for cleaning up between tests.
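Below is a minimal sketch of how these methods fit together in a single sync cycle. This is not the library's actual sync loop; the `RedisStorage` instance, the `'ClickHouseUser'` import key and the batch size of 1000 are assumptions made for illustration:

```python
import datetime

from django_clickhouse.storages import RedisStorage

storage = RedisStorage()
import_key = 'ClickHouseUser'  # hypothetical import key

storage.pre_sync(import_key)
operations = storage.get_operations(import_key, 1000)
try:
    # ... convert the (operation, primary_key) tuples into ClickHouse rows
    # and insert them here ...
    storage.post_sync(import_key)
    storage.set_last_sync_time(import_key, datetime.datetime.now())
except Exception as e:
    storage.post_sync_failed(import_key, e)
    raise
```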
## Predefined storages
### RedisStorage
This storage uses a [Redis database](https://redis.io/) as intermediate storage.
To communicate with Redis it uses the [redis-py](https://redis-py.readthedocs.io/en/latest/) library.
The library is not a hard dependency of this package, but it must be installed in order to use RedisStorage.
In order to use RedisStorage you must also set the [CLICKHOUSE_REDIS_CONFIG](configuration.md#clickhouse_redis_config) parameter.
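For example, a sketch of the relevant setting with placeholder values, assuming a local Redis instance (see [configuration.md](configuration.md#clickhouse_redis_config) for the authoritative description; the keys shown are standard redis-py connection parameters):

```python
# settings.py (sketch): placeholder connection values for a local Redis.
CLICKHOUSE_REDIS_CONFIG = {
    'host': '127.0.0.1',
    'port': 6379,
    'db': 8,
    'socket_timeout': 10,
}
```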
A stored operation contains:
* The Django database alias where the original record can be found.
* The record's primary key.
* The operation performed (`insert`, `update` or `delete`).
This storage does not allow multi-threaded sync. A short illustration of the stored data follows.
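The snippet below is an illustration only, not a guaranteed format: it assumes two hypothetical `ClickHouseUser` records were changed on the `default` database, and the `'alias.pk'` shape of the primary key string is an internal detail of RedisStorage:

```python
from django_clickhouse.storages import RedisStorage

storage = RedisStorage()

# Hypothetical result: each tuple is (operation, primary_key), where the
# primary key string also carries the source database alias.
storage.get_operations('ClickHouseUser', 100)
# [('insert', 'default.1'), ('update', 'default.2')]
```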