fix test_freeze

This commit is contained in:
Itai Shirav 2017-08-20 09:46:00 +03:00
parent 59a4f1cecc
commit 06ed53e4ec

View File

@ -1,6 +1,5 @@
import unittest
from datetime import date
from time import sleep
import os
import shutil
from infi.clickhouse_orm.database import Database
@ -11,7 +10,8 @@ from infi.clickhouse_orm.system_models import SystemPart
class SystemPartTest(unittest.TestCase):
BACKUP_DIR = '/opt/clickhouse/shadow/'
BACKUP_DIRS = ['/var/lib/clickhouse/shadow', '/opt/clickhouse/shadow/']
def setUp(self):
self.database = Database('test-db')
@ -22,10 +22,11 @@ class SystemPartTest(unittest.TestCase):
self.database.drop_database()
def _get_backups(self):
if not os.path.exists(self.BACKUP_DIR):
return []
_, dirnames, _ = next(os.walk(self.BACKUP_DIR))
for dir in self.BACKUP_DIRS:
if os.path.exists(dir):
_, dirnames, _ = next(os.walk(dir))
return dirnames
raise unittest.SkipTest('Cannot find backups dir')
def test_get_all(self):
parts = SystemPart.get(self.database)
@ -61,11 +62,8 @@ class SystemPartTest(unittest.TestCase):
# There can be other backups in the folder
prev_backups = set(self._get_backups())
parts[0].freeze()
sleep(1)
backups = set(self._get_backups())
self.assertEqual(len(backups), len(prev_backups) + 1)
# Clean created backup
shutil.rmtree(self.BACKUP_DIR + '{0}'.format(list(backups - prev_backups)[0]))
def test_fetch(self):
# TODO Not tested, as I have no replication set