mirror of
https://github.com/carrotquest/django-clickhouse.git
synced 2025-02-23 22:40:32 +03:00
Merge pull request #53 from carrotquest/fix-lock-hard-release
Fix bug in Storage lock hard release system if sync process works on multiple hosts
This commit is contained in:
commit
3b79ad697a
2
setup.py
2
setup.py
|
@ -13,7 +13,7 @@ with open('requirements.txt') as f:
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='django-clickhouse',
|
name='django-clickhouse',
|
||||||
version='1.2.1',
|
version='1.2.2',
|
||||||
packages=['django_clickhouse', 'django_clickhouse.management.commands'],
|
packages=['django_clickhouse', 'django_clickhouse.management.commands'],
|
||||||
package_dir={'': 'src'},
|
package_dir={'': 'src'},
|
||||||
url='https://github.com/carrotquest/django-clickhouse',
|
url='https://github.com/carrotquest/django-clickhouse',
|
||||||
|
|
|
@ -12,13 +12,14 @@ from typing import Any, Optional, List, Tuple
|
||||||
|
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
from celery.utils.nodenames import gethostname
|
||||||
from django.utils.timezone import now
|
from django.utils.timezone import now
|
||||||
from statsd.defaults.django import statsd
|
from statsd.defaults.django import statsd
|
||||||
|
|
||||||
from .configuration import config
|
from .configuration import config
|
||||||
from .exceptions import ConfigurationError, RedisLockTimeoutError
|
from .exceptions import ConfigurationError, RedisLockTimeoutError
|
||||||
from .redis import redis_zadd
|
from .redis import redis_zadd
|
||||||
from .utils import check_pid, get_subclasses, SingletonMeta
|
from .utils import check_pid_exists, get_subclasses, SingletonMeta
|
||||||
|
|
||||||
logger = logging.getLogger('django-clickhouse')
|
logger = logging.getLogger('django-clickhouse')
|
||||||
|
|
||||||
|
@ -215,19 +216,31 @@ class RedisStorage(Storage, metaclass=SingletonMeta):
|
||||||
# Block process to be single threaded. Default sync delay is 10 * default sync delay.
|
# Block process to be single threaded. Default sync delay is 10 * default sync delay.
|
||||||
# It can be changed for model, by passing `lock_timeout` argument to pre_sync
|
# It can be changed for model, by passing `lock_timeout` argument to pre_sync
|
||||||
lock = self.get_lock(import_key, **kwargs)
|
lock = self.get_lock(import_key, **kwargs)
|
||||||
|
current_host_name = gethostname()
|
||||||
lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
|
lock_pid_key = self.REDIS_KEY_LOCK_PID.format(import_key=import_key)
|
||||||
try:
|
try:
|
||||||
lock.acquire()
|
lock.acquire()
|
||||||
self._redis.set(lock_pid_key, os.getpid())
|
self._redis.set(lock_pid_key, '%s:%s' % (current_host_name, os.getpid()))
|
||||||
except RedisLockTimeoutError:
|
except RedisLockTimeoutError:
|
||||||
statsd.incr('%s.sync.%s.lock.timeout' % (config.STATSD_PREFIX, import_key))
|
statsd.incr('%s.sync.%s.lock.timeout' % (config.STATSD_PREFIX, import_key))
|
||||||
|
|
||||||
# Lock is busy. But If the process has been killed, I don't want to wait any more.
|
# Lock is busy. But If the process has been killed, I don't want to wait any more.
|
||||||
# Let's check if pid exists
|
# I assume that lock has been killed if it works on the same host (other than localhost)
|
||||||
pid = int(self._redis.get(lock_pid_key) or 0)
|
# and there is no process alive.
|
||||||
if pid and not check_pid(pid):
|
# I also assume that there are no hosts with same hostname other than localhost.
|
||||||
|
# Note: previously value contained only pid. Let's support old value for back compatibility
|
||||||
|
active_lock_data = self._redis.get(lock_pid_key).split(b":")
|
||||||
|
active_pid = int(active_lock_data[-1] or 0)
|
||||||
|
active_host_name = active_lock_data[0] \
|
||||||
|
if len(active_lock_data) > 1 and active_lock_data[0] != "localhost" else None
|
||||||
|
|
||||||
|
if (
|
||||||
|
active_pid and active_host_name
|
||||||
|
and active_host_name == current_host_name and not check_pid_exists(active_pid)
|
||||||
|
):
|
||||||
statsd.incr('%s.sync.%s.lock.hard_release' % (config.STATSD_PREFIX, import_key))
|
statsd.incr('%s.sync.%s.lock.hard_release' % (config.STATSD_PREFIX, import_key))
|
||||||
logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d (process is dead)'
|
logger.warning('django-clickhouse: hard releasing lock "%s" locked by pid %d (process is dead)'
|
||||||
% (import_key, pid))
|
% (import_key, active_pid))
|
||||||
self._redis.delete(lock_pid_key)
|
self._redis.delete(lock_pid_key)
|
||||||
lock.hard_release()
|
lock.hard_release()
|
||||||
self.pre_sync(import_key, **kwargs)
|
self.pre_sync(import_key, **kwargs)
|
||||||
|
|
|
@ -127,7 +127,7 @@ def model_to_dict(instance: DjangoModel, fields: Optional[Iterable[str]] = None
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
def check_pid(pid):
|
def check_pid_exists(pid):
|
||||||
"""
|
"""
|
||||||
Check For the existence of a unix pid.
|
Check For the existence of a unix pid.
|
||||||
"""
|
"""
|
||||||
|
|
Loading…
Reference in New Issue
Block a user