mirror of
https://github.com/Infinidat/infi.clickhouse_orm.git
synced 2025-08-02 19:20:14 +03:00
Chore: improve code quality for unit tests
This commit is contained in:
parent
bd62a3c1de
commit
c4e2175db3
|
@ -42,6 +42,7 @@ pytest = "^6.2.4"
|
|||
flake8-isort = "^4.0.0"
|
||||
black = "^21.7b0"
|
||||
isort = "^5.9.2"
|
||||
freezegun = "^1.1.0"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core>=1.0.0"]
|
||||
|
|
|
@ -7,10 +7,11 @@ from ipaddress import IPv4Address, IPv6Address
|
|||
import pytz
|
||||
|
||||
from clickhouse_orm.database import ServerError
|
||||
from clickhouse_orm.fields import DateTimeField
|
||||
from clickhouse_orm.funcs import F
|
||||
from clickhouse_orm.utils import NO_VALUE
|
||||
|
||||
from .base_test_with_data import *
|
||||
from .base_test_with_data import Person, TestCaseWithData
|
||||
from .test_querysets import SampleModel
|
||||
|
||||
|
||||
|
@ -28,15 +29,12 @@ class FuncsTestCase(TestCaseWithData):
|
|||
self.assertEqual(count, expected_count)
|
||||
self.assertEqual(qs.count(), expected_count)
|
||||
|
||||
def _test_func(self, func, expected_value=NO_VALUE):
|
||||
def _call_func(self, func):
|
||||
sql = "SELECT %s AS value" % func.to_sql()
|
||||
logging.info(sql)
|
||||
try:
|
||||
result = list(self.database.select(sql))
|
||||
logging.info("\t==> %s", result[0].value if result else "<empty>")
|
||||
if expected_value != NO_VALUE:
|
||||
print("Comparing %s to %s" % (result[0].value, expected_value))
|
||||
self.assertEqual(result[0].value, expected_value)
|
||||
return result[0].value if result else None
|
||||
except ServerError as e:
|
||||
if "Unknown function" in str(e):
|
||||
|
@ -44,6 +42,14 @@ class FuncsTestCase(TestCaseWithData):
|
|||
return # ignore functions that don't exist in the used ClickHouse version
|
||||
raise
|
||||
|
||||
def _test_func(self, func, expected_value=NO_VALUE):
|
||||
result = self._call_func(func)
|
||||
if expected_value != NO_VALUE:
|
||||
print("Comparing %s to %s" % (result, expected_value))
|
||||
self.assertEqual(result, expected_value)
|
||||
|
||||
return result if result else None
|
||||
|
||||
def _test_aggr(self, func, expected_value=NO_VALUE):
|
||||
qs = Person.objects_in(self.database).aggregate(value=func)
|
||||
logging.info(qs.as_sql())
|
||||
|
@ -313,7 +319,7 @@ class FuncsTestCase(TestCaseWithData):
|
|||
F.now() + F.toIntervalSecond(3000) - F.toIntervalDay(3000) == F.now() + timedelta(seconds=3000, days=-3000)
|
||||
)
|
||||
|
||||
def test_date_functions__utc_only(self):
|
||||
def test_date_functions_utc_only(self):
|
||||
if self.database.server_timezone != pytz.utc:
|
||||
raise unittest.SkipTest("This test must run with UTC as the server timezone")
|
||||
d = date(2018, 12, 31)
|
||||
|
@ -325,9 +331,6 @@ class FuncsTestCase(TestCaseWithData):
|
|||
self._test_func(F.toTime(dt, "Europe/Athens"), athens_tz.localize(datetime(1970, 1, 2, 13, 22, 33)))
|
||||
self._test_func(F.toTime(dt, athens_tz), athens_tz.localize(datetime(1970, 1, 2, 13, 22, 33)))
|
||||
self._test_func(F.toTimeZone(dt, "Europe/Athens"), athens_tz.localize(datetime(2018, 12, 31, 13, 22, 33)))
|
||||
self._test_func(
|
||||
F.now(), datetime.utcnow().replace(tzinfo=pytz.utc, microsecond=0)
|
||||
) # FIXME this may fail if the timing is just right
|
||||
self._test_func(F.today(), datetime.utcnow().date())
|
||||
self._test_func(F.yesterday(), datetime.utcnow().date() - timedelta(days=1))
|
||||
self._test_func(F.toYYYYMMDDhhmmss(dt), 20181231112233)
|
||||
|
@ -335,6 +338,10 @@ class FuncsTestCase(TestCaseWithData):
|
|||
self._test_func(F.addHours(d, 7), datetime(2018, 12, 31, 7, 0, 0, tzinfo=pytz.utc))
|
||||
self._test_func(F.addMinutes(d, 7), datetime(2018, 12, 31, 0, 7, 0, tzinfo=pytz.utc))
|
||||
|
||||
actual = self._call_func(F.now())
|
||||
expected = datetime.utcnow().replace(tzinfo=pytz.utc, microsecond=0)
|
||||
self.assertLess((actual - expected).total_seconds(), 1e-3)
|
||||
|
||||
def test_type_conversion_functions(self):
|
||||
for f in (
|
||||
F.toUInt8,
|
||||
|
|
|
@ -5,10 +5,13 @@ from enum import Enum
|
|||
from logging import getLogger
|
||||
|
||||
from clickhouse_orm.database import Database
|
||||
from clickhouse_orm.engines import CollapsingMergeTree, Memory, MergeTree
|
||||
from clickhouse_orm.fields import DateField, DateTimeField, Enum8Field, Int8Field, Int32Field, UInt64Field
|
||||
from clickhouse_orm.funcs import F
|
||||
from clickhouse_orm.models import Model
|
||||
from clickhouse_orm.query import Q
|
||||
|
||||
from .base_test_with_data import *
|
||||
from .base_test_with_data import Person, TestCaseWithData, data
|
||||
|
||||
logger = getLogger("tests")
|
||||
|
||||
|
|
|
@ -27,9 +27,7 @@ class ReadonlyTestCase(TestCaseWithData):
|
|||
except ServerError as e:
|
||||
if e.code == 192 and str(e).startswith("Unknown user"): # ClickHouse version < 20.3
|
||||
raise unittest.SkipTest('Database user "%s" is not defined' % username)
|
||||
elif e.code == 516 and str(e).startswith(
|
||||
"readonly: Authentication failed"
|
||||
): # ClickHouse version >= 20.3
|
||||
elif e.code == 516 and str(e).startswith("readonly: Authentication failed"): # ClickHouse version >= 20.3
|
||||
raise unittest.SkipTest('Database user "%s" is not defined' % username)
|
||||
else:
|
||||
raise
|
||||
|
|
|
@ -3,8 +3,8 @@ import unittest
|
|||
from datetime import date
|
||||
|
||||
from clickhouse_orm.database import Database, DatabaseException
|
||||
from clickhouse_orm.engines import *
|
||||
from clickhouse_orm.fields import *
|
||||
from clickhouse_orm.engines import MergeTree
|
||||
from clickhouse_orm.fields import DateField, UInt32Field
|
||||
from clickhouse_orm.models import Model
|
||||
from clickhouse_orm.system_models import SystemPart
|
||||
|
||||
|
@ -36,9 +36,9 @@ class SystemPartTest(unittest.TestCase):
|
|||
|
||||
def setUp(self):
|
||||
self.database = Database("test-db", log_statements=True)
|
||||
self.database.create_table(TestTable)
|
||||
self.database.create_table(SomeTestTable)
|
||||
self.database.create_table(CustomPartitionedTable)
|
||||
self.database.insert([TestTable(date_field=date.today())])
|
||||
self.database.insert([SomeTestTable(date_field=date.today())])
|
||||
self.database.insert([CustomPartitionedTable(date_field=date.today(), group_field=13)])
|
||||
|
||||
def tearDown(self):
|
||||
|
@ -69,7 +69,7 @@ class SystemPartTest(unittest.TestCase):
|
|||
self.assertEqual(len(parts), 1)
|
||||
|
||||
def test_get_conditions(self):
|
||||
parts = list(SystemPart.get(self.database, conditions="table='testtable'"))
|
||||
parts = list(SystemPart.get(self.database, conditions="table='sometesttable'"))
|
||||
self.assertEqual(len(parts), 1)
|
||||
parts = list(SystemPart.get(self.database, conditions=u"table='custompartitionedtable'"))
|
||||
self.assertEqual(len(parts), 1)
|
||||
|
@ -107,10 +107,10 @@ class SystemPartTest(unittest.TestCase):
|
|||
|
||||
def test_query(self):
|
||||
SystemPart.objects_in(self.database).count()
|
||||
list(SystemPart.objects_in(self.database).filter(table="testtable"))
|
||||
list(SystemPart.objects_in(self.database).filter(table="sometesttable"))
|
||||
|
||||
|
||||
class TestTable(Model):
|
||||
class SomeTestTable(Model):
|
||||
date_field = DateField()
|
||||
|
||||
engine = MergeTree("date_field", ("date_field",))
|
||||
|
|
|
@ -1,7 +1,16 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
from clickhouse_orm.utils import escape, unescape
|
||||
|
||||
SPECIAL_CHARS = {"\b": "\\x08", "\f": "\\x0c", "\r": "\\r", "\n": "\\n", "\t": "\\t", "\0": "\\x00", "\\": "\\\\", "'": "\\'"}
|
||||
SPECIAL_CHARS = {
|
||||
"\b": "\\x08",
|
||||
"\f": "\\x0c",
|
||||
"\r": "\\r",
|
||||
"\n": "\\n",
|
||||
"\t": "\\t",
|
||||
"\0": "\\x00",
|
||||
"\\": "\\\\",
|
||||
"'": "\\'",
|
||||
}
|
||||
|
||||
|
||||
def test_unescape():
|
||||
|
|
Loading…
Reference in New Issue
Block a user