Support Distributed DDL create table queries

This commit is contained in:
Jake Lazarus 2021-01-05 15:07:49 -08:00
parent 7c90c1e4c3
commit caac044257
3 changed files with 18 additions and 12 deletions

View File

@ -145,6 +145,9 @@ Using the `Database` instance you can create a table for your model, and insert
db.create_table(Person) db.create_table(Person)
db.insert([dan, suzy]) db.insert([dan, suzy])
Including the `cluster` parameter in `create_table` uses a [Distributed DDL](https://clickhouse.tech/docs/en/sql-reference/distributed-ddl/) to create the table on
the cluster.
The `insert` method can take any iterable of model instances, but they all must belong to the same model class. The `insert` method can take any iterable of model instances, but they all must belong to the same model class.

View File

@ -138,7 +138,7 @@ class Database(object):
self._send('DROP DATABASE `%s`' % self.db_name) self._send('DROP DATABASE `%s`' % self.db_name)
self.db_exists = False self.db_exists = False
def create_table(self, model_class): def create_table(self, model_class, cluster=None):
''' '''
Creates a table for the given model class, if it does not exist already. Creates a table for the given model class, if it does not exist already.
''' '''
@ -146,7 +146,7 @@ class Database(object):
raise DatabaseException("You can't create system table") raise DatabaseException("You can't create system table")
if getattr(model_class, 'engine') is None: if getattr(model_class, 'engine') is None:
raise DatabaseException("%s class must define an engine" % model_class.__name__) raise DatabaseException("%s class must define an engine" % model_class.__name__)
self._send(model_class.create_table_sql(self)) self._send(model_class.create_table_sql(self, cluster=cluster))
def drop_table(self, model_class): def drop_table(self, model_class):
''' '''

View File

@ -348,11 +348,12 @@ class Model(metaclass=ModelBase):
return cls._has_funcs_as_defaults return cls._has_funcs_as_defaults
@classmethod @classmethod
def create_table_sql(cls, db): def create_table_sql(cls, db, cluster):
''' '''
Returns the SQL statement for creating a table for this model. Returns the SQL statement for creating a table for this model.
''' '''
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())] parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` %s (' % (
db.db_name, cls.table_name(), 'ON CLUSTER `%s`' % (cluster) if cluster else '')]
# Fields # Fields
items = [] items = []
for name, field in cls.fields().items(): for name, field in cls.fields().items():
@ -483,12 +484,13 @@ class Model(metaclass=ModelBase):
class BufferModel(Model): class BufferModel(Model):
@classmethod @classmethod
def create_table_sql(cls, db): def create_table_sql(cls, db, cluster):
''' '''
Returns the SQL statement for creating a table for this model. Returns the SQL statement for creating a table for this model.
''' '''
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` AS `%s`.`%s`' % (db.db_name, cls.table_name(), db.db_name, parts = ['CREATE TABLE IF NOT EXISTS `{0}`.`{1}` {2} AS `{0}`.`{3}`'.format(
cls.engine.main_model.table_name())] db.db_name, cls.table_name(), 'ON CLUSTER `%s`' % (cluster) if cluster else '',
cls.engine.main_model.table_name())]
engine_str = cls.engine.create_table_sql(db) engine_str = cls.engine.create_table_sql(db)
parts.append(engine_str) parts.append(engine_str)
return ' '.join(parts) return ' '.join(parts)
@ -506,12 +508,13 @@ class MergeModel(Model):
_table = StringField(readonly=True) _table = StringField(readonly=True)
@classmethod @classmethod
def create_table_sql(cls, db): def create_table_sql(cls, db, cluster):
''' '''
Returns the SQL statement for creating a table for this model. Returns the SQL statement for creating a table for this model.
''' '''
assert isinstance(cls.engine, Merge), "engine must be an instance of engines.Merge" assert isinstance(cls.engine, Merge), "engine must be an instance of engines.Merge"
parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` (' % (db.db_name, cls.table_name())] parts = ['CREATE TABLE IF NOT EXISTS `%s`.`%s` %s (' % (
db.db_name, cls.table_name(), 'ON CLUSTER `%s`' % (cluster) if cluster else '')]
cols = [] cols = []
for name, field in cls.fields().items(): for name, field in cls.fields().items():
if name != '_table': if name != '_table':
@ -590,7 +593,7 @@ class DistributedModel(Model):
cls.engine.table = storage_models[0] cls.engine.table = storage_models[0]
@classmethod @classmethod
def create_table_sql(cls, db): def create_table_sql(cls, db, cluster):
''' '''
Returns the SQL statement for creating a table for this model. Returns the SQL statement for creating a table for this model.
''' '''
@ -599,8 +602,8 @@ class DistributedModel(Model):
cls.fix_engine_table() cls.fix_engine_table()
parts = [ parts = [
'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` AS `{0}`.`{2}`'.format( 'CREATE TABLE IF NOT EXISTS `{0}`.`{1}` {3} AS `{0}`.`{2}`'.format(
db.db_name, cls.table_name(), cls.engine.table_name), db.db_name, cls.table_name(), cls.engine.table_name, 'ON CLUSTER `%s`' % (cluster) if cluster else ''),
'ENGINE = ' + cls.engine.create_table_sql(db)] 'ENGINE = ' + cls.engine.create_table_sql(db)]
return '\n'.join(parts) return '\n'.join(parts)