From a84d0af9593c5e7de1a2914eecb13267279dcbb4 Mon Sep 17 00:00:00 2001 From: sw <935405794@qq.com> Date: Sun, 29 May 2022 18:30:48 +0800 Subject: [PATCH] Support for temporary tables --- src/clickhouse_orm/database.py | 3 ++ src/clickhouse_orm/engines.py | 8 ++++-- src/clickhouse_orm/models.py | 51 ++++++++++++++++++++++++++++++++-- src/clickhouse_orm/session.py | 3 +- 4 files changed, 58 insertions(+), 7 deletions(-) diff --git a/src/clickhouse_orm/database.py b/src/clickhouse_orm/database.py index 332568d..4698a5c 100644 --- a/src/clickhouse_orm/database.py +++ b/src/clickhouse_orm/database.py @@ -156,6 +156,7 @@ class Database(object): raise DatabaseException("You can't create system table") if getattr(model_class, 'engine') is None: raise DatabaseException("%s class must define an engine" % model_class.__name__) + print(model_class.create_table_sql(self)) self._send(model_class.create_table_sql(self)) def drop_table(self, model_class: Type[MODEL]) -> None: @@ -435,6 +436,8 @@ class Database(object): if model_class: if model_class.is_system_model(): mapping['table'] = "`system`.`%s`" % model_class.table_name() + elif model_class.is_temporary_model(): + mapping['table'] = "`%s`" % model_class.table_name() else: mapping['table'] = "`%s`.`%s`" % (self.db_name, model_class.table_name()) query = Template(query).safe_substitute(mapping) diff --git a/src/clickhouse_orm/engines.py b/src/clickhouse_orm/engines.py index 285a848..4938932 100644 --- a/src/clickhouse_orm/engines.py +++ b/src/clickhouse_orm/engines.py @@ -92,9 +92,11 @@ class MergeTree(Engine): elif not self.date_col: # Can't import it globally due to circular import from clickhouse_orm.database import DatabaseException - raise DatabaseException("Custom partitioning is not supported before ClickHouse 1.1.54310. " - "Please update your server or use date_col syntax." - "https://clickhouse.tech/docs/en/table_engines/custom_partitioning_key/") + raise DatabaseException( + "Custom partitioning is not supported before ClickHouse 1.1.54310. " + "Please update your server or use date_col syntax." + "https://clickhouse.tech/docs/en/table_engines/custom_partitioning_key/" + ) else: partition_sql = '' diff --git a/src/clickhouse_orm/models.py b/src/clickhouse_orm/models.py index 4a3d3ec..b6eeab9 100644 --- a/src/clickhouse_orm/models.py +++ b/src/clickhouse_orm/models.py @@ -11,7 +11,7 @@ from .fields import Field, StringField from .utils import parse_tsv, NO_VALUE, get_subclass_names, arg_to_sql from .query import QuerySet from .funcs import F -from .engines import Merge, Distributed +from .engines import Merge, Distributed, Memory logger = getLogger('clickhouse_orm') @@ -273,6 +273,8 @@ class Model(metaclass=ModelBase): # Create table, drop table, insert operations are restricted for system models _system = False + _temporary = False + _database = None _fields: Dict[str, Field] @@ -483,6 +485,13 @@ class Model(metaclass=ModelBase): """ return cls._system + @classmethod + def is_temporary_model(cls): + """ + Returns true if the model represents a temporary table. + """ + return cls._temporary + class BufferModel(Model): @@ -603,8 +612,6 @@ class DistributedModel(Model): """ assert isinstance(cls.engine, Distributed), "engine must be engines.Distributed instance" - cls.fix_engine_table() - parts = [ 'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format( db.db_name, cls.table_name(), cls.engine.table_name), @@ -612,6 +619,44 @@ class DistributedModel(Model): return '\n'.join(parts) +class TemporaryModel(Model): + """Temporary Tables + + Temporary tables disappear when the session ends, including if the connection is lost. + A temporary table uses the Memory engine only. + The DB can’t be specified for a temporary table. It is created outside of databases. + Impossible to create a temporary table with distributed DDL query on all cluster servers + (by using ON CLUSTER): this table exists only in the current session. + If a temporary table has the same name as another one and a query specifies the table name + without specifying the DB, the temporary table will be used. + For distributed query processing, temporary tables used in a query are passed to remote servers. + + https://clickhouse.com/docs/en/sql-reference/statements/create/table/#temporary-tables + """ + _temporary = True + + @classmethod + def create_table_sql(cls, db) -> str: + assert isinstance(cls.engine, Memory), "engine must be engines.Memory instance" + + parts = ['CREATE TEMPORARY TABLE IF NOT EXISTS `%s` (' % cls.table_name()] + # Fields + items = [] + for name, field in cls.fields().items(): + items.append(' %s %s' % (name, field.get_sql(db=db))) + # Constraints + for c in cls._constraints.values(): + items.append(' %s' % c.create_table_sql()) + # Indexes + for i in cls._indexes.values(): + items.append(' %s' % i.create_table_sql()) + parts.append(',\n'.join(items)) + # Engine + parts.append(')') + parts.append('ENGINE = Memory') + return '\n'.join(parts) + + # Expose only relevant classes in import * MODEL = TypeVar('MODEL', bound=Model) __all__ = get_subclass_names(locals(), (Model, Constraint, Index)) diff --git a/src/clickhouse_orm/session.py b/src/clickhouse_orm/session.py index b6062c0..ccd99bf 100644 --- a/src/clickhouse_orm/session.py +++ b/src/clickhouse_orm/session.py @@ -13,9 +13,10 @@ class SessionContext: self.token1 = None self.token2 = None - def __enter__(self): + def __enter__(self) -> str: self.token1 = ctx_session_id.set(self.session) self.token2 = ctx_session_timeout.set(self.timeout) + return self.session def __exit__(self, exc_type, exc_val, exc_tb): ctx_session_id.reset(self.token1)