From 7a58546669aa9f91abeb93a16c1eb8827219a5f1 Mon Sep 17 00:00:00 2001 From: sw <935405794@qq.com> Date: Sun, 29 May 2022 17:50:01 +0800 Subject: [PATCH] Add session_id support for clients --- src/clickhouse_orm/database.py | 29 ++++++++++++++++++++++++----- src/clickhouse_orm/session.py | 27 +++++++++++++++++++++++++++ 2 files changed, 51 insertions(+), 5 deletions(-) create mode 100644 src/clickhouse_orm/session.py diff --git a/src/clickhouse_orm/database.py b/src/clickhouse_orm/database.py index 783a7e6..332568d 100644 --- a/src/clickhouse_orm/database.py +++ b/src/clickhouse_orm/database.py @@ -13,6 +13,7 @@ import requests from .models import ModelBase, MODEL from .utils import parse_tsv, import_submodules from .query import Q +from .session import ctx_session_id, ctx_session_timeout logger = logging.getLogger('clickhouse_orm') @@ -114,11 +115,13 @@ class Database(object): self.request_session.auth = (username, password or '') self.log_statements = log_statements self.settings = {} - self.db_exists = False # this is required before running _is_existing_database + self.db_exists = False # this is required before running _is_existing_database self.db_exists = self._is_existing_database() if readonly: if not self.db_exists: - raise DatabaseException('Database does not exist, and cannot be created under readonly connection') + raise DatabaseException( + 'Database does not exist, and cannot be created under readonly connection' + ) self.connection_readonly = self._is_connection_readonly() self.readonly = True elif autocreate and not self.db_exists: @@ -379,6 +382,19 @@ class Database(object): if int(name[:4]) >= up_to: break + @property + def session_id(self): + """return current client session_id""" + return ctx_session_id.get(None) + + @property + def _context_params(self): + """return context params""" + params = {} + if ctx_session_id.get(None): + params.update(session_id=self.session_id, session_timeout=ctx_session_timeout.get(60)) + return params + def _get_applied_migrations(self, migrations_package_name): from .migrations import MigrationHistory self.create_table(MigrationHistory) @@ -392,7 +408,9 @@ class Database(object): if self.log_statements: logger.info(data) params = self._build_params(settings) - r = self.request_session.post(self.db_url, params=params, data=data, stream=stream, timeout=self.timeout) + r = self.request_session.post( + self.db_url, params=params, data=data, stream=stream, timeout=self.timeout + ) if r.status_code != 200: raise ServerError(r.text) return r @@ -400,6 +418,7 @@ class Database(object): def _build_params(self, settings): params = dict(settings or {}) params.update(self.settings) + params.update(self._context_params) if self.db_exists: params['database'] = self.db_name # Send the readonly flag, unless the connection is already readonly (to prevent db error) @@ -408,9 +427,9 @@ class Database(object): return params def _substitute(self, query, model_class=None): - ''' + """ Replaces $db and $table placeholders in the query. - ''' + """ if '$' in query: mapping = dict(db="`%s`" % self.db_name) if model_class: diff --git a/src/clickhouse_orm/session.py b/src/clickhouse_orm/session.py new file mode 100644 index 0000000..b6062c0 --- /dev/null +++ b/src/clickhouse_orm/session.py @@ -0,0 +1,27 @@ +import uuid +from typing import Optional +from contextvars import ContextVar + +ctx_session_id: ContextVar[str] = ContextVar('ck.session_id') +ctx_session_timeout: ContextVar[int] = ContextVar('ck.session_timeout') + + +class SessionContext: + def __init__(self, session: str, timeout: int): + self.session = session + self.timeout = timeout + self.token1 = None + self.token2 = None + + def __enter__(self): + self.token1 = ctx_session_id.set(self.session) + self.token2 = ctx_session_timeout.set(self.timeout) + + def __exit__(self, exc_type, exc_val, exc_tb): + ctx_session_id.reset(self.token1) + ctx_session_timeout.reset(self.token2) + + +def in_session(session: Optional[str] = None, timeout: int = 60): + session = session or str(uuid.uuid4()) + return SessionContext(session, timeout)