django-clickhouse/docs/storages.md
2019-04-21 10:04:41 +05:00


# Storages
The `Storage` class is a facade that stores information about operations performed on Django models.
It has three main purposes:
* Fast insertion of single records. The storage accumulates a batch of data, which is then inserted into ClickHouse.
* A guarantee that no data is lost.
  Intermediate data is deleted from the storage only after a batch import finishes successfully.
  If the import fails at some point, the next import process imports the failed data again.
* Bookkeeping for the sync process, for instance the last time a model sync was called.

To distinguish models from each other, the storage uses an `import_key`.
By default, it is generated by the `ClickHouseModel.get_import_key()` method and is equal to the class name.
Each method of the abstract `Storage` class takes `kwargs` parameters, which can be used in a concrete storage.
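
As a minimal sketch of the default key behaviour described above, the following standalone classes mimic it without importing the library. `ClickHouseModelSketch`, `UserStats`, `OrderStats` and the overridden key value are hypothetical names used only for illustration; only `get_import_key()` and its "class name" default come from this document.

```python
class ClickHouseModelSketch:
    """Hypothetical stand-in for ClickHouseModel, for illustration only."""

    @classmethod
    def get_import_key(cls) -> str:
        # Default described above: the import key equals the class name.
        return cls.__name__


class UserStats(ClickHouseModelSketch):
    pass


class OrderStats(ClickHouseModelSketch):
    @classmethod
    def get_import_key(cls) -> str:
        # Hypothetical override, e.g. to keep two same-named classes apart.
        return "shop.OrderStats"


user_key = UserStats.get_import_key()
order_key = OrderStats.get_import_key()
```

Because the storage tells models apart only by this key, two models that share a key would presumably share one operations queue, so an override like the one above is worth considering when class names collide.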
## Storage methods
* `register_operations(import_key: str, operation: str, *pks: Any) -> int`
  Saves a new operation performed in the source database to the storage. This method should be fast.
  It is called after the source database transaction is committed.
  Returns the number of operations registered.
  `operation` is one of `insert`, `update` or `delete`.
  `pks` is an iterable of strings sufficient to select the needed records from the source database.
* `get_last_sync_time(import_key: str) -> Optional[datetime.datetime]`
  Returns the last time a model sync was called. If no sync has been done yet, returns `None`.
* `set_last_sync_time(import_key: str, dt: datetime.datetime) -> None`
  Saves the datetime at which the sync process was last called.
* `register_operations_wrapped(import_key: str, operation: str, *pks: Any) -> int`
  A wrapper for `register_operations`. Its goal is to write metrics and logs.
* `pre_sync(import_key: str, **kwargs) -> None`
  Called before the import process starts. It initializes the storage for importing a new batch.
* `operations_count(import_key: str, **kwargs) -> int`
  Counts how many operations are waiting for import in the storage.
* `get_operations(import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]`
  Returns the next batch of operations to import. The `count` parameter sets the number of operations to return.
  Each operation is a tuple `(operation, primary_key)`, where `operation` is one of `insert`, `update` or `delete`
  and `primary_key` is a string sufficient to select the record from the source database.
* `post_sync(import_key: str, **kwargs) -> None`
  Called after the import process has finished. It cleans up the storage after a batch has been imported.
* `post_batch_removed(import_key: str, batch_size: int) -> None`
  This method should be called by the `post_sync` method after data is removed from the storage.
  By default, it updates the queue size metric.
* `post_sync_failed(import_key: str, exception: Exception, **kwargs) -> None`
  Called if any exception occurs during the import process. It cleans up the storage after an unsuccessful import.
  Note that if the import process is killed abruptly (by the OOM killer, for instance), this method is not called.
* `flush() -> None`
  *Dangerous*. Drops all data kept by the storage. It is used for cleaning up between tests.
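
To make the lifecycle of these methods concrete, here is a rough in-memory sketch of the contract described above. It is not the library's implementation and does not subclass the real `Storage`; `InMemoryStorageSketch` and its private attributes are hypothetical. It only illustrates the "no data is lost" rule: records handed out by `get_operations` stay queued until `post_sync` confirms the import.

```python
import datetime
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple


class InMemoryStorageSketch:
    """Illustrative only: not durable, not thread-safe, no metrics."""

    def __init__(self) -> None:
        self._queue: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
        self._batch_size: Dict[str, int] = {}
        self._last_sync: Dict[str, datetime.datetime] = {}

    def register_operations(self, import_key: str, operation: str, *pks: Any) -> int:
        if operation not in ("insert", "update", "delete"):
            raise ValueError("unknown operation: %s" % operation)
        self._queue[import_key].extend((operation, str(pk)) for pk in pks)
        return len(pks)

    def get_last_sync_time(self, import_key: str) -> Optional[datetime.datetime]:
        return self._last_sync.get(import_key)

    def set_last_sync_time(self, import_key: str, dt: datetime.datetime) -> None:
        self._last_sync[import_key] = dt

    def pre_sync(self, import_key: str, **kwargs) -> None:
        # Start a fresh batch: nothing has been handed out yet.
        self._batch_size[import_key] = 0

    def operations_count(self, import_key: str, **kwargs) -> int:
        return len(self._queue[import_key])

    def get_operations(self, import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]:
        batch = self._queue[import_key][:count]
        # Remember the batch size; the records themselves stay queued.
        self._batch_size[import_key] = len(batch)
        return batch

    def post_sync(self, import_key: str, **kwargs) -> None:
        # Import succeeded: only now remove the imported records.
        size = self._batch_size.pop(import_key, 0)
        del self._queue[import_key][:size]

    def post_sync_failed(self, import_key: str, exception: Exception, **kwargs) -> None:
        # Import failed: drop the batch marker, keep the records for a retry.
        self._batch_size.pop(import_key, None)

    def flush(self) -> None:
        self._queue.clear()
        self._batch_size.clear()
        self._last_sync.clear()


storage = InMemoryStorageSketch()
registered = storage.register_operations("UserStats", "insert", 1, 2, 3)

# First attempt fails: the batch must still be available afterwards.
storage.pre_sync("UserStats")
failed_batch = storage.get_operations("UserStats", 2)
storage.post_sync_failed("UserStats", RuntimeError("import failed"))
remaining_after_failure = storage.operations_count("UserStats")

# Second attempt succeeds: the same batch is returned and then removed.
storage.pre_sync("UserStats")
retried_batch = storage.get_operations("UserStats", 2)
storage.post_sync("UserStats")
remaining_after_success = storage.operations_count("UserStats")
```

In this sketch the failed batch and the retried batch are identical, which is the behaviour the "no data is lost" guarantee asks for. A real storage would also need to survive a hard process kill, where `post_sync_failed` is never called.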
## Predefined storages
### <a name="redis_storage">RedisStorage</a>
This storage uses the [Redis database](https://redis.io/) as intermediate storage.
To communicate with Redis, it uses the [redis-py](https://redis-py.readthedocs.io/en/latest/) library.
redis-py is an optional dependency, but it must be installed to use RedisStorage.
To use RedisStorage, you must also set the [CLICKHOUSE_REDIS_CONFIG](configuration.md#redis_config) parameter.
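
A settings fragment might look like the sketch below. It assumes that `CLICKHOUSE_REDIS_CONFIG` is a dictionary of keyword arguments handed to the redis-py client; that shape, and every value shown, are assumptions to be checked against [configuration.md](configuration.md#redis_config). The keys themselves (`host`, `port`, `db`, `socket_timeout`) are standard redis-py connection parameters.

```python
# settings.py (fragment)
# Assumed shape: keyword arguments for the redis-py client.
# Host, port and database number are placeholder values.
CLICKHOUSE_REDIS_CONFIG = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 8,
    "socket_timeout": 10,
}
```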

A stored operation contains:
* The Django database alias where the original record can be found
* The record's primary key
* The operation performed (insert, update, delete)

This storage does not allow multi-threaded sync.
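
The three fields of a stored operation listed above could be packed into a single string value, for example as follows. The colon-separated format, the `StoredOperation` type and the `encode`/`decode` helpers are hypothetical and are not taken from RedisStorage; the sketch also assumes that neither the operation name nor the database alias contains a colon.

```python
from typing import NamedTuple


class StoredOperation(NamedTuple):
    operation: str  # "insert", "update" or "delete"
    db_alias: str   # Django database alias holding the original record
    pk: str         # primary key of the record, as a string


def encode(op: StoredOperation) -> str:
    # Hypothetical wire format: "operation:db_alias:pk".
    return ":".join(op)


def decode(raw: str) -> StoredOperation:
    # Split at most twice so that a primary key may itself contain colons.
    operation, db_alias, pk = raw.split(":", 2)
    return StoredOperation(operation, db_alias, pk)


encoded = encode(StoredOperation("update", "default", "42"))
decoded = decode("delete:replica:a:b")
```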