From 43a18214083be1f1c27f2f97f87eab624c8fa98d Mon Sep 17 00:00:00 2001 From: M1ha-Shvn Date: Wed, 19 Feb 2025 15:18:16 +0500 Subject: [PATCH] Fix bug in Storage lock hard release system if sync process works on multiple hosts --- setup.py | 2 +- src/django_clickhouse/storages.py | 25 +++++++++++++++++++------ src/django_clickhouse/utils.py | 2 +- 3 files changed, 21 insertions(+), 8 deletions(-) diff --git a/setup.py b/setup.py index 206ae93..53fadb6 100644 --- a/setup.py +++ b/setup.py @@ -13,7 +13,7 @@ with open('requirements.txt') as f: setup( name='django-clickhouse', - version='1.2.1', + version='1.2.2', packages=['django_clickhouse', 'django_clickhouse.management.commands'], package_dir={'': 'src'}, url='https://github.com/carrotquest/django-clickhouse', diff --git a/src/django_clickhouse/storages.py b/src/django_clickhouse/storages.py index 9502024..039b100 100644 --- a/src/django_clickhouse/storages.py +++ b/src/django_clickhouse/storages.py @@ -12,13 +12,14 @@ from typing import Any, Optional, List, Tuple import os +from celery.utils.nodenames import gethostname from django.utils.timezone import now from statsd.defaults.django import statsd from .configuration import config from .exceptions import ConfigurationError, RedisLockTimeoutError from .redis import redis_zadd -from .utils import check_pid, get_subclasses, SingletonMeta +from .utils import check_pid_exists, get_subclasses, SingletonMeta logger = logging.getLogger('django-clickhouse') @@ -215,19 +216,31 @@ class RedisStorage(Storage, metaclass=SingletonMeta): # Block process to be single threaded. Default sync delay is 10 * default sync delay. # It can be changed for model, by passing `lock_timeout` argument to pre_sync lock = self.get_lock(import_key, **kwargs) + current_host_name = gethostname() lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key) try: lock.acquire() - self._redis.set(lock_pid_key, os.getpid()) + self._redis.set(lock_pid_key, '%s:%s' % (current_host_name, os.getpid())) except RedisLockTimeoutError: statsd.incr('%s.sync.%s.lock.timeout' % (config.STATSD_PREFIX, import_key)) + # Lock is busy. But If the process has been killed, I don't want to wait any more. - # Let's check if pid exists - pid = int(self._redis.get(lock_pid_key) or 0) - if pid and not check_pid(pid): + # I assume that lock has been killed if it works on the same host (other than localhost) + # and there is no process alive. + # I also assume that there are no hosts with same hostname other than localhost. + # Note: previously value contained only pid. Let's support old value for back compatibility + active_lock_data = self._redis.get(lock_pid_key).split(b":") + active_pid = int(active_lock_data[-1] or 0) + active_host_name = active_lock_data[0] \ + if len(active_lock_data) > 1 and active_lock_data[0] != "localhost" else None + + if ( + active_pid and active_host_name + and active_host_name == current_host_name and not check_pid_exists(active_pid) + ): statsd.incr('%s.sync.%s.lock.hard_release' % (config.STATSD_PREFIX, import_key)) logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d (process is dead)' - % (import_key, pid)) + % (import_key, active_pid)) self._redis.delete(lock_pid_key) lock.hard_release() self.pre_sync(import_key, **kwargs) diff --git a/src/django_clickhouse/utils.py b/src/django_clickhouse/utils.py index 74588a9..07fabd0 100644 --- a/src/django_clickhouse/utils.py +++ b/src/django_clickhouse/utils.py @@ -127,7 +127,7 @@ def model_to_dict(instance: DjangoModel, fields: Optional[Iterable[str]] = None return data -def check_pid(pid): +def check_pid_exists(pid): """ Check For the existence of a unix pid. """