Storages
The Storage class is a facade that stores information about operations performed on Django models.
It has three main purposes:
- Storage should be fast at inserting single records. It accumulates a batch of data, which is then inserted into ClickHouse.
- Storage guarantees that no data is lost. Intermediate data in storage is deleted only after a batch import finishes successfully. If the import fails at some point, starting a new import process imports the failed data again.
- Storage keeps information about the sync process, for instance the last time a model sync was called.
To distinguish models from each other, storage uses import_key.
By default, it is generated by the ClickHouseModel.get_import_key() method and is equal to the class name.
Each method of the abstract Storage class takes kwargs parameters, which can be used in a concrete storage.
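The default import key described above can be illustrated with a minimal sketch. ClickHouseModelSketch, UserStats and ArchivedUserStats are hypothetical stand-ins rather than library classes, and the sketch assumes get_import_key() is a classmethod:

```python
class ClickHouseModelSketch:
    """Hypothetical stand-in for ClickHouseModel (not the library class)."""

    @classmethod
    def get_import_key(cls) -> str:
        # Default behaviour described in the text: the key is the class name.
        return cls.__name__


class UserStats(ClickHouseModelSketch):
    pass


class ArchivedUserStats(ClickHouseModelSketch):
    @classmethod
    def get_import_key(cls) -> str:
        # Hypothetical override: any string unique among models would do.
        return 'archived_user_stats'


default_key = UserStats.get_import_key()
custom_key = ArchivedUserStats.get_import_key()
```

Because the storage separates queued operations by this key, two models that returned the same key would presumably share one queue, so an override should keep keys unique.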
Storage methods
- 
register_operations(import_key: str, operation: str, *pks: Any) -> int
 Saves a new operation on the source database to storage. This method should be fast. It is called after the source database transaction is committed.
 The method returns the number of operations registered.
 operation is one of insert, update or delete.
 pks is an iterable of strings, enough to select the needed records from the source database.
- 
get_last_sync_time(import_key: str) -> Optional[datetime.datetime]
 Returns the last time a model sync was called. If no sync has been done, returns None.
- 
set_last_sync_time(import_key: str, dt: datetime.datetime) -> None
 Saves the datetime when a sync process was last called.
- 
register_operations_wrapped(import_key: str, operation: str, *pks: Any) -> int
 A wrapper for register_operations. Its goal is to write metrics and logs.
- 
pre_sync(import_key: str, **kwargs) -> None
 Called before the import process starts. It initializes storage for importing a new batch.
- 
operations_count(import_key: str, **kwargs) -> int
 Counts how many operations are waiting for import in storage.
- 
get_operations(import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]
 Returns the next batch of operations to import. The count parameter gives the number of operations to return. An operation is a tuple (operation, primary_key), where operation is one of insert, update or delete and primary_key is a string sufficient to select the record from the source database.
- 
post_sync(import_key: str, **kwargs) -> None
 Called after the import process has finished. It cleans storage after importing a batch.
- 
post_batch_removed(import_key: str, batch_size: int) -> None
 This method should be called by the post_sync method after data is removed from storage. By default, it marks the queue size metric.
- 
post_sync_failed(import_key: str, exception: Exception, **kwargs) -> None
 Called if any exception has occurred during the import process. It cleans storage after an unsuccessful import. Note that if the import process is killed forcibly (by the OOM killer, for instance), this method is not called.
- 
flush() -> None
 Dangerous. Drops all data kept by storage. It is used for cleaning up between tests.
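The method lifecycle listed above can be sketched with a simple in-memory class. This is an illustration only: InMemoryStorage is a hypothetical name, it does not inherit from the library's Storage class, and it skips metrics, logging and thread safety. It shows one way to meet the "no data is lost" guarantee: a batch is removed from the queue only in post_sync, so a failed import leaves it in place for the next attempt.

```python
import datetime
from collections import defaultdict
from typing import Any, Dict, List, Optional, Tuple


class InMemoryStorage:
    """Hypothetical in-memory sketch of the storage interface."""

    def __init__(self) -> None:
        self._queue: Dict[str, List[Tuple[str, str]]] = defaultdict(list)
        self._batch_size: Dict[str, int] = {}
        self._last_sync: Dict[str, datetime.datetime] = {}

    def register_operations(self, import_key: str, operation: str, *pks: Any) -> int:
        if operation not in ('insert', 'update', 'delete'):
            raise ValueError('Unknown operation: %s' % operation)
        # Appending to a list keeps registration cheap.
        self._queue[import_key].extend((operation, str(pk)) for pk in pks)
        return len(pks)

    def operations_count(self, import_key: str, **kwargs) -> int:
        return len(self._queue[import_key])

    def pre_sync(self, import_key: str, **kwargs) -> None:
        # Forget any batch left over from a previous, interrupted import.
        self._batch_size[import_key] = 0

    def get_operations(self, import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]:
        # Return the batch without removing it from the queue yet.
        batch = self._queue[import_key][:count]
        self._batch_size[import_key] = len(batch)
        return batch

    def post_sync(self, import_key: str, **kwargs) -> None:
        # Import succeeded: only now drop the imported batch.
        size = self._batch_size.pop(import_key, 0)
        del self._queue[import_key][:size]

    def post_sync_failed(self, import_key: str, exception: Exception, **kwargs) -> None:
        # Import failed: keep the queue intact so the batch is retried.
        self._batch_size.pop(import_key, None)

    def get_last_sync_time(self, import_key: str) -> Optional[datetime.datetime]:
        return self._last_sync.get(import_key)

    def set_last_sync_time(self, import_key: str, dt: datetime.datetime) -> None:
        self._last_sync[import_key] = dt

    def flush(self) -> None:
        self._queue.clear()
        self._batch_size.clear()
        self._last_sync.clear()


storage = InMemoryStorage()
storage.register_operations('MyModel', 'insert', 1, 2, 3)

# A failed import leaves all three operations queued.
storage.pre_sync('MyModel')
storage.get_operations('MyModel', 2)
storage.post_sync_failed('MyModel', RuntimeError('import failed'))
remaining_after_failure = storage.operations_count('MyModel')  # 3

# A successful import removes only the batch that was handed out.
storage.pre_sync('MyModel')
storage.get_operations('MyModel', 2)
storage.post_sync('MyModel')
remaining_after_success = storage.operations_count('MyModel')  # 1
```

A real storage would also have to survive a process restart, which is why the predefined implementation keeps its data in an external database rather than in process memory.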
Predefined storages
RedisStorage
This storage uses a Redis database as intermediate storage. To communicate with Redis it uses the redis-py library. The library is not a required dependency, but it must be installed to use RedisStorage. To use RedisStorage you must also fill in the CLICKHOUSE_REDIS_CONFIG parameter.
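As a rough illustration, the setting might look like the fragment below in a Django settings module. This is a sketch under an assumption not stated in this section: that CLICKHOUSE_REDIS_CONFIG is a dictionary of keyword arguments passed on to the redis-py client. The host, port, database number and timeout are placeholder values.

```python
# settings.py (sketch). Assumption: the dictionary is forwarded to the
# redis-py client as keyword arguments. All values are placeholders.
CLICKHOUSE_REDIS_CONFIG = {
    'host': '127.0.0.1',
    'port': 6379,
    'db': 8,
    'socket_timeout': 10,
}
```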
Stored operation contains:
- Django database alias where original record can be found.
- Record primary key
- Operation performed (insert, update, delete)
This storage does not allow multi-threaded sync.