2020-02-06 13:39:56 +05:00

Storages

A storage class is a facade that stores information about operations performed on Django models.
It has three main purposes:

  • Storage should insert single records fast. It accumulates them into a batch, which is then inserted into ClickHouse.
  • Storage guarantees that no data is lost. Intermediate data is deleted from the storage only after a batch import finishes successfully. If an import fails at some point, the next import process should import the failed data again.
  • Storage keeps information about the sync process, for instance, the last time a model sync was called.

To distinguish models from each other, the storage uses an import_key. By default, it is generated by the ClickHouseModel.get_import_key() method and equals the class name.
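The following is a minimal sketch of the default behaviour. It uses a simplified stand-in for ClickHouseModel rather than the library's real class, and VisitClickHouseModel and UserClickHouseModel are hypothetical names. Because the key is derived from the class name, each model subclass gets its own key without extra configuration.

```python
class ClickHouseModel:
    """Simplified stand-in for the library's ClickHouseModel (illustration only)."""

    @classmethod
    def get_import_key(cls) -> str:
        # Default described above: the import key is the class name.
        return cls.__name__


# Hypothetical models: each subclass gets its own key automatically.
class VisitClickHouseModel(ClickHouseModel):
    pass


class UserClickHouseModel(ClickHouseModel):
    pass
```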

Each method of the abstract Storage class takes kwargs parameters, which can be used by a concrete storage implementation.
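The sketch below illustrates this pattern with simplified, hypothetical classes. BaseStorage, LockingStorage and the lock_timeout option are illustrative names, not the library's API. The base method accepts and ignores extra keyword arguments, so a caller can pass the same options to any storage, and only a storage that understands an option acts on it.

```python
from typing import Any, Optional


class BaseStorage:
    """Simplified stand-in for an abstract storage (illustration only)."""

    def pre_sync(self, import_key: str, **kwargs: Any) -> None:
        # Extra keyword arguments are accepted and ignored here.
        pass


class LockingStorage(BaseStorage):
    """Hypothetical concrete storage that understands one extra option."""

    def __init__(self) -> None:
        self.lock_timeout: Optional[float] = None

    def pre_sync(self, import_key: str, **kwargs: Any) -> None:
        # Pick out the option this storage knows about; ignore the rest.
        self.lock_timeout = kwargs.get("lock_timeout")


# The caller can pass the same options to any storage.
storages = [BaseStorage(), LockingStorage()]
for storage in storages:
    storage.pre_sync("MyModel", lock_timeout=30)
```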

Storage methods

  • register_operations(import_key: str, operation: str, *pks: Any) -> int
    Saves a new operation performed in the source database to the storage. This method should be fast. It is called after the source database transaction is committed.
    Returns the number of operations registered.
    operation is one of insert, update or delete.
    pks is an iterable of strings sufficient to select the needed records from the source database.

  • get_last_sync_time(import_key: str) -> Optional[datetime.datetime]
    Returns the last time a model sync was called. If no sync has been done yet, returns None.

  • set_last_sync_time(import_key: str, dt: datetime.datetime) -> None
    Saves the datetime at which the sync process was last called.

  • register_operations_wrapped(import_key: str, operation: str, *pks: Any) -> int
    A wrapper for register_operations. Its goal is to write metrics and logs.

  • pre_sync(import_key: str, **kwargs) -> None
    Called before the import process starts. It initializes the storage for importing a new batch.

  • operations_count(import_key: str, **kwargs) -> int
    Counts how many operations in the storage are waiting to be imported.

  • get_operations(import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]
    Returns the next batch of operations to import. The count parameter gives the number of operations to return. Each operation is a tuple (operation, primary_key), where operation is one of insert, update or delete and primary_key is a string sufficient to select the record from the source database.

  • post_sync(import_key: str, **kwargs) -> None
    Called after the import process has finished. It cleans the storage after a batch has been imported.

  • post_batch_removed(import_key: str, batch_size: int) -> None
    This method should be called by the post_sync method after data is removed from the storage. By default, it records the queue size metric.

  • post_sync_failed(import_key: str, exception: Exception, **kwargs) -> None
    Called if any exception occurs during the import process. It cleans the storage after an unsuccessful import. Note that if the import process is killed abruptly (by the OOM killer, for instance), this method is not called.

  • flush() -> None
    Dangerous. Drops all data kept by the storage. It is used for cleaning up between tests.
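Here is a minimal in-memory storage that shows how these methods fit together. It is a hypothetical single-process sketch, not part of the library, and InMemoryStorage and sync_batch are illustrative names. The driver's call order, including when set_last_sync_time is called, is an assumption rather than the library's exact sync logic. Note how get_operations only reads a batch and post_sync deletes it, so a failed import leaves the data in place for the next attempt.

```python
import datetime
import logging
import time
from collections import defaultdict
from typing import Any, Callable, Dict, List, Optional, Tuple

logger = logging.getLogger(__name__)

Operation = Tuple[str, str]  # (operation, primary_key)


class InMemoryStorage:
    """Hypothetical single-process storage following the method list above."""

    def __init__(self) -> None:
        self._ops: Dict[str, List[Operation]] = defaultdict(list)  # oldest first
        self._batch_sizes: Dict[str, int] = {}  # size of the batch being imported
        self._last_sync: Dict[str, datetime.datetime] = {}

    def register_operations(self, import_key: str, operation: str, *pks: Any) -> int:
        # Appending to a list keeps single registrations cheap.
        self._ops[import_key].extend((operation, str(pk)) for pk in pks)
        return len(pks)

    def register_operations_wrapped(self, import_key: str, operation: str, *pks: Any) -> int:
        # Same as register_operations, plus timing and a log line.
        start = time.monotonic()
        count = self.register_operations(import_key, operation, *pks)
        logger.debug("%s: registered %d %s operation(s) in %.6fs",
                     import_key, count, operation, time.monotonic() - start)
        return count

    def get_last_sync_time(self, import_key: str) -> Optional[datetime.datetime]:
        return self._last_sync.get(import_key)

    def set_last_sync_time(self, import_key: str, dt: datetime.datetime) -> None:
        self._last_sync[import_key] = dt

    def pre_sync(self, import_key: str, **kwargs: Any) -> None:
        self._batch_sizes[import_key] = 0  # no batch handed out yet

    def operations_count(self, import_key: str, **kwargs: Any) -> int:
        return len(self._ops[import_key])

    def get_operations(self, import_key: str, count: int, **kwargs: Any) -> List[Operation]:
        # Return the oldest operations without deleting them.
        batch = self._ops[import_key][:count]
        self._batch_sizes[import_key] = len(batch)
        return batch

    def post_sync(self, import_key: str, **kwargs: Any) -> None:
        # The import succeeded, so the batch can now be removed.
        size = self._batch_sizes.pop(import_key, 0)
        del self._ops[import_key][:size]
        self.post_batch_removed(import_key, size)

    def post_batch_removed(self, import_key: str, batch_size: int) -> None:
        # Stands in for reporting the queue size metric.
        logger.debug("%s: removed %d, %d left",
                     import_key, batch_size, self.operations_count(import_key))

    def post_sync_failed(self, import_key: str, exception: Exception, **kwargs: Any) -> None:
        # Forget the batch but keep its operations for the next attempt.
        self._batch_sizes.pop(import_key, None)

    def flush(self) -> None:
        # Drops everything, e.g. between tests.
        self._ops.clear()
        self._batch_sizes.clear()
        self._last_sync.clear()


def sync_batch(storage: InMemoryStorage, import_key: str, batch_size: int,
               insert: Callable[[List[Operation]], None]) -> None:
    """Hypothetical driver showing one order of storage calls for one batch."""
    storage.set_last_sync_time(import_key, datetime.datetime.now())
    storage.pre_sync(import_key)
    try:
        batch = storage.get_operations(import_key, batch_size)
        insert(batch)  # stands in for the ClickHouse insert
        storage.post_sync(import_key)
    except Exception as exc:
        storage.post_sync_failed(import_key, exc)
        raise
```

Because register_operations only appends to a list, single registrations stay cheap. A persistent storage such as RedisStorage must additionally survive process restarts, which an in-memory dict cannot.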

Predefined storages

RedisStorage

This storage uses a Redis database as intermediate storage. It communicates with Redis through the redis-py library. redis-py is not a required dependency, but it must be installed to use RedisStorage. To use RedisStorage, you must also set the CLICKHOUSE_REDIS_CONFIG parameter.
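Below is a minimal settings sketch. The values are placeholders. It assumes the dictionary is passed as keyword arguments to the redis-py client, so only redis-py connection parameters (here host, port, db and socket_timeout) belong in it. Check the configuration documentation for the exact contract.

```python
# settings.py (sketch): placeholder values, assumed to be forwarded to the redis-py client
CLICKHOUSE_REDIS_CONFIG = {
    "host": "127.0.0.1",
    "port": 6379,
    "db": 8,
    "socket_timeout": 10,
}
```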

Each stored operation contains:

  • The Django database alias where the original record can be found
  • The record's primary key
  • The operation performed (insert, update, delete)
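The exact key layout RedisStorage uses is not described here. The following is only a hypothetical illustration of how the three fields could be packed into one string and recovered. StoredOperation and the "operation:alias.pk" layout are assumptions.

```python
from typing import NamedTuple


class StoredOperation(NamedTuple):
    """Hypothetical record holding the three fields listed above."""

    db_alias: str
    pk: str
    operation: str  # "insert", "update" or "delete"

    def encode(self) -> str:
        # Hypothetical "operation:alias.pk" layout for a single string value.
        return f"{self.operation}:{self.db_alias}.{self.pk}"

    @classmethod
    def decode(cls, raw: str) -> "StoredOperation":
        # Assumes the operation has no ":" and the alias has no "."; the pk may contain ".".
        operation, rest = raw.split(":", 1)
        db_alias, pk = rest.split(".", 1)
        return cls(db_alias, pk, operation)
```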

This storage does not allow multi-threaded sync.