Support for temporary tables

This commit is contained in:
sw 2022-05-29 18:30:48 +08:00
parent 7a58546669
commit a84d0af959
4 changed files with 58 additions and 7 deletions

View File

@ -156,6 +156,7 @@ class Database(object):
raise DatabaseException("You can't create system table") raise DatabaseException("You can't create system table")
if getattr(model_class, 'engine') is None: if getattr(model_class, 'engine') is None:
raise DatabaseException("%s class must define an engine" % model_class.__name__) raise DatabaseException("%s class must define an engine" % model_class.__name__)
print(model_class.create_table_sql(self))
self._send(model_class.create_table_sql(self)) self._send(model_class.create_table_sql(self))
def drop_table(self, model_class: Type[MODEL]) -> None: def drop_table(self, model_class: Type[MODEL]) -> None:
@ -435,6 +436,8 @@ class Database(object):
if model_class: if model_class:
if model_class.is_system_model(): if model_class.is_system_model():
mapping['table'] = "`system`.`%s`" % model_class.table_name() mapping['table'] = "`system`.`%s`" % model_class.table_name()
elif model_class.is_temporary_model():
mapping['table'] = "`%s`" % model_class.table_name()
else: else:
mapping['table'] = "`%s`.`%s`" % (self.db_name, model_class.table_name()) mapping['table'] = "`%s`.`%s`" % (self.db_name, model_class.table_name())
query = Template(query).safe_substitute(mapping) query = Template(query).safe_substitute(mapping)

View File

@ -92,9 +92,11 @@ class MergeTree(Engine):
elif not self.date_col: elif not self.date_col:
# Can't import it globally due to circular import # Can't import it globally due to circular import
from clickhouse_orm.database import DatabaseException from clickhouse_orm.database import DatabaseException
raise DatabaseException("Custom partitioning is not supported before ClickHouse 1.1.54310. " raise DatabaseException(
"Please update your server or use date_col syntax." "Custom partitioning is not supported before ClickHouse 1.1.54310. "
"https://clickhouse.tech/docs/en/table_engines/custom_partitioning_key/") "Please update your server or use date_col syntax."
"https://clickhouse.tech/docs/en/table_engines/custom_partitioning_key/"
)
else: else:
partition_sql = '' partition_sql = ''

View File

@ -11,7 +11,7 @@ from .fields import Field, StringField
from .utils import parse_tsv, NO_VALUE, get_subclass_names, arg_to_sql from .utils import parse_tsv, NO_VALUE, get_subclass_names, arg_to_sql
from .query import QuerySet from .query import QuerySet
from .funcs import F from .funcs import F
from .engines import Merge, Distributed from .engines import Merge, Distributed, Memory
logger = getLogger('clickhouse_orm') logger = getLogger('clickhouse_orm')
@ -273,6 +273,8 @@ class Model(metaclass=ModelBase):
# Create table, drop table, insert operations are restricted for system models # Create table, drop table, insert operations are restricted for system models
_system = False _system = False
_temporary = False
_database = None _database = None
_fields: Dict[str, Field] _fields: Dict[str, Field]
@ -483,6 +485,13 @@ class Model(metaclass=ModelBase):
""" """
return cls._system return cls._system
@classmethod
def is_temporary_model(cls):
"""
Returns true if the model represents a temporary table.
"""
return cls._temporary
class BufferModel(Model): class BufferModel(Model):
@ -603,8 +612,6 @@ class DistributedModel(Model):
""" """
assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance" assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance"
cls.fix_engine_table()
parts = [ parts = [
'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format( 'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format(
db.db_name, cls.table_name(), cls.engine.table_name), db.db_name, cls.table_name(), cls.engine.table_name),
@ -612,6 +619,44 @@ class DistributedModel(Model):
return '\n'.join(parts) return '\n'.join(parts)
class TemporaryModel(Model):
"""Temporary Tables
Temporary tables disappear when the session ends, including if the connection is lost.
A temporary table uses the Memory engine only.
The DB cant be specified for a temporary table. It is created outside of databases.
Impossible to create a temporary table with distributed DDL query on all cluster servers
(by using ON CLUSTER): this table exists only in the current session.
If a temporary table has the same name as another one and a query specifies the table name
without specifying the DB, the temporary table will be used.
For distributed query processing, temporary tables used in a query are passed to remote servers.
https://clickhouse.com/docs/en/sql-reference/statements/create/table/#temporary-tables
"""
_temporary = True
@classmethod
def create_table_sql(cls, db) -> str:
assert isinstance(cls.engine, Memory), "engine must be engines.Memory instance"
parts = ['CREATE TEMPORARY TABLE IF NOT EXISTS `%s` (' % cls.table_name()]
# Fields
items = []
for name, field in cls.fields().items():
items.append(' %s %s' % (name, field.get_sql(db=db)))
# Constraints
for c in cls._constraints.values():
items.append(' %s' % c.create_table_sql())
# Indexes
for i in cls._indexes.values():
items.append(' %s' % i.create_table_sql())
parts.append(',\n'.join(items))
# Engine
parts.append(')')
parts.append('ENGINE = Memory')
return '\n'.join(parts)
# Expose only relevant classes in import * # Expose only relevant classes in import *
MODEL = TypeVar('MODEL', bound=Model) MODEL = TypeVar('MODEL', bound=Model)
__all__ = get_subclass_names(locals(), (Model, Constraint, Index)) __all__ = get_subclass_names(locals(), (Model, Constraint, Index))

View File

@ -13,9 +13,10 @@ class SessionContext:
self.token1 = None self.token1 = None
self.token2 = None self.token2 = None
def __enter__(self): def __enter__(self) -> str:
self.token1 = ctx_session_id.set(self.session) self.token1 = ctx_session_id.set(self.session)
self.token2 = ctx_session_timeout.set(self.timeout) self.token2 = ctx_session_timeout.set(self.timeout)
return self.session
def __exit__(self, exc_type, exc_val, exc_tb): def __exit__(self, exc_type, exc_val, exc_tb):
ctx_session_id.reset(self.token1) ctx_session_id.reset(self.token1)