Mirror of https://github.com/carrotquest/django-clickhouse.git (synced 2025-11-04)
# Storages
A storage class is a facade that records the operations performed on Django models.

It has three main purposes:

* Storage should be fast at inserting single records. It accumulates a batch of data, which is then inserted into ClickHouse.
* Storage guarantees that no data is lost.
  Intermediate data in storage is deleted only after the import batch finishes successfully.
  If the import fails at some point, starting a new import process imports the failed data again.
* Storage keeps information about the sync process: for instance, the last time a model sync was called.
To distinguish one model from another, the storage uses an `import_key`.
By default it is generated by the `ClickHouseModel.get_import_key()` method and equals the class name.

Each method of the abstract `Storage` class takes `kwargs` parameters, which concrete storages may use.
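For illustration, the default key derivation can be sketched like this (the stub class below is hypothetical; in the library the method lives on `ClickHouseModel`):

```python
class ClickHouseModelStub:
    """Hypothetical stand-in for ClickHouseModel, illustrating the
    default import key: it is simply the class name."""

    @classmethod
    def get_import_key(cls) -> str:
        return cls.__name__


class ClickHouseUser(ClickHouseModelStub):
    pass


print(ClickHouseUser.get_import_key())  # -> ClickHouseUser
```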
## Storage methods
* `register_operations(import_key: str, operation: str, *pks: Any) -> int`

  Saves a new operation on the source database to storage. This method should be fast:
  it is called after the source database transaction is committed.
  Returns the number of operations registered.
  `operation` is one of `insert`, `update` or `delete`.
  `pks` is an iterable of strings, sufficient to select the affected records from the source database.
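  The contract above can be sketched with a toy in-memory storage (illustrative only; `InMemoryStorage` is not part of the library):

  ```python
  from collections import defaultdict


  class InMemoryStorage:
      """Toy storage: keeps a FIFO of (operation, pk) pairs per import key."""

      def __init__(self):
          self._queues = defaultdict(list)

      def register_operations(self, import_key, operation, *pks):
          if operation not in ('insert', 'update', 'delete'):
              raise ValueError('unknown operation: %s' % operation)
          # Primary keys are stored as strings, as the storage contract requires
          self._queues[import_key].extend((operation, str(pk)) for pk in pks)
          return len(pks)


  storage = InMemoryStorage()
  registered = storage.register_operations('ClickHouseUser', 'insert', 1, 2, 3)
  print(registered)  # -> 3
  ```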
* `get_last_sync_time(import_key: str) -> Optional[datetime.datetime]`

  Returns the last time a model sync was called. If no sync has been performed yet, returns `None`.
* `set_last_sync_time(import_key: str, dt: datetime.datetime) -> None`

  Saves the datetime at which a sync process was last called.
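  Together these two methods implement simple per-key bookkeeping. A minimal sketch (the class below is hypothetical, not the library's API):

  ```python
  import datetime
  from typing import Optional


  class SyncTimeStore:
      """Hypothetical bookkeeping for last-sync timestamps per import key."""

      def __init__(self):
          self._times = {}

      def get_last_sync_time(self, import_key: str) -> Optional[datetime.datetime]:
          # None means this model has never been synced
          return self._times.get(import_key)

      def set_last_sync_time(self, import_key: str, dt: datetime.datetime) -> None:
          self._times[import_key] = dt
  ```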
* `register_operations_wrapped(import_key: str, operation: str, *pks: Any) -> int`

  A wrapper for `register_operations`. Its goal is to write metrics and logs.
* `pre_sync(import_key: str, **kwargs) -> None`

  Called before the import process starts. It initializes storage for importing a new batch.
* `operations_count(import_key: str, **kwargs) -> int`

  Counts how many operations are waiting for import in storage.
* `get_operations(import_key: str, count: int, **kwargs) -> List[Tuple[str, str]]`

  Returns the next batch of operations to import. The `count` parameter limits the number of operations returned.
  Each operation is a tuple `(operation, primary_key)`, where `operation` is one of `insert`, `update` or `delete`,
  and `primary_key` is a string sufficient to select the record from the source database.
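  Since intermediate data is deleted only after a successful import, a batch read here leaves the queue intact; cleanup happens later in `post_sync`. A minimal sketch of the batching semantics (a plain list stands in for a real storage):

  ```python
  # Pending operations for one import key, oldest first
  queue = [('insert', '10'), ('insert', '11'), ('update', '10'), ('delete', '12')]


  def get_operations(pending, count):
      """Return (but do not remove) up to `count` of the oldest operations."""
      return pending[:count]


  print(get_operations(queue, 2))  # -> [('insert', '10'), ('insert', '11')]
  ```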
* `post_sync(import_key: str, **kwargs) -> None`

  Called after the import process has finished. It cleans storage after importing a batch.
* `post_batch_removed(import_key: str, batch_size: int) -> None`

  Should be called by the `post_sync` method after data is removed from storage.
  By default, it writes the queue size metric.
* `post_sync_failed(import_key: str, exception: Exception, **kwargs) -> None`

  Called if an exception occurs during the import process. It cleans storage after an unsuccessful import.
  Note that if the import process is killed abruptly (by the OOM killer, for instance), this method is not called.
* `flush() -> None`

  *Dangerous*. Drops all data kept by the storage. It is used for cleaning up between tests.
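The methods above fit together in a sync cycle roughly like this (a sketch, not the library's actual sync code; `import_operations` is a hypothetical stand-in for the model's import step):

```python
def sync_batch(storage, clickhouse_model, import_key, batch_size=1000):
    """Sketch of one sync cycle built from the Storage interface."""
    storage.pre_sync(import_key)
    try:
        operations = storage.get_operations(import_key, batch_size)
        if operations:
            # Hypothetical step: fetch source records by primary key
            # and insert the batch into ClickHouse
            clickhouse_model.import_operations(operations)
        # Intermediate data is removed only after a successful import
        storage.post_sync(import_key)
    except Exception as exc:
        storage.post_sync_failed(import_key, exc)
        raise
```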
## Predefined storages
### RedisStorage
This storage uses a [Redis database](https://redis.io/) as intermediate storage.
It communicates with Redis via the [redis-py](https://redis-py.readthedocs.io/en/latest/) library.
The library is not a hard dependency of this package, but it must be installed to use RedisStorage.
In order to use RedisStorage you must also fill in the [CLICKHOUSE_REDIS_CONFIG](configuration.md#clickhouse_redis_config) parameter.
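A plausible settings fragment, assuming `CLICKHOUSE_REDIS_CONFIG` is passed through to redis-py's client constructor (the values below are illustrative; check the configuration docs for the exact supported keys):

```python
# settings.py (illustrative values)
CLICKHOUSE_REDIS_CONFIG = {
    'host': '127.0.0.1',
    'port': 6379,
    'db': 8,
    'socket_timeout': 10,
}
```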
A stored operation contains:

* The Django database alias where the original record can be found.
* The record's primary key.
* The operation performed (`insert`, `update` or `delete`).
This storage does not allow multi-threaded sync.