From 3e1908c55d9c8aa3c92ab65c5754ff8a873187ba Mon Sep 17 00:00:00 2001 From: Christian Zagrodnick Date: Tue, 19 Mar 2013 11:41:23 +0100 Subject: [PATCH 1/3] retab to 8 whitespace characters, remove superflous whitespace --- ZPsycopgDA/db.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/ZPsycopgDA/db.py b/ZPsycopgDA/db.py index b594b3fd..37e6f9d5 100644 --- a/ZPsycopgDA/db.py +++ b/ZPsycopgDA/db.py @@ -26,7 +26,7 @@ import pool import psycopg2 from psycopg2.extensions import INTEGER, LONGINTEGER, FLOAT, BOOLEAN, DATE, TIME from psycopg2.extensions import TransactionRollbackError, register_type -from psycopg2 import NUMBER, STRING, ROWID, DATETIME +from psycopg2 import NUMBER, STRING, ROWID, DATETIME # the DB object, managing all the real query work @@ -107,17 +107,17 @@ class DB(TM, dbi_db.DB): def make_mappings(self): """Generate the mappings used later by self.convert_description().""" self.type_mappings = {} - for t, s in [(INTEGER,'i'), (LONGINTEGER, 'i'), (NUMBER, 'n'), - (BOOLEAN,'n'), (ROWID, 'i'), - (DATETIME, 'd'), (DATE, 'd'), (TIME, 'd')]: + for t, s in [(INTEGER,'i'), (LONGINTEGER, 'i'), (NUMBER, 'n'), + (BOOLEAN,'n'), (ROWID, 'i'), + (DATETIME, 'd'), (DATE, 'd'), (TIME, 'd')]: for v in t.values: - self.type_mappings[v] = (t, s) + self.type_mappings[v] = (t, s) def convert_description(self, desc, use_psycopg_types=False): """Convert DBAPI-2.0 description field to Zope format.""" items = [] for name, typ, width, ds, p, scale, null_ok in desc: - m = self.type_mappings.get(typ, (STRING, 's')) + m = self.type_mappings.get(typ, (STRING, 's')) items.append({ 'name': name, 'type': use_psycopg_types and m[0] or m[1], @@ -158,7 +158,7 @@ class DB(TM, dbi_db.DB): return () self.putconn() return self.convert_description(c.description, True) - + ## query execution ## def query(self, query_string, max_rows=None, query_data=None): @@ -205,5 +205,5 @@ class DB(TM, dbi_db.DB): except StandardError, err: self._abort() raise err - + return self.convert_description(desc), res From 5920d4f25dc5952ef32a4c4ceca93a75dc7f0fe0 Mon Sep 17 00:00:00 2001 From: Christian Zagrodnick Date: Tue, 19 Mar 2013 12:40:49 +0100 Subject: [PATCH 2/3] Fixed multi-thread connection initialization for ZPsycopgDA. The connection initialization (transaction isolation, typecasts) was only done for first connection. When there are multiple threads running in parallel, connections where used which had not been initialized correctly. --- ZPsycopgDA/test_da.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) create mode 100644 ZPsycopgDA/test_da.py diff --git a/ZPsycopgDA/test_da.py b/ZPsycopgDA/test_da.py new file mode 100644 index 00000000..83197671 --- /dev/null +++ b/ZPsycopgDA/test_da.py @@ -0,0 +1,42 @@ +# zopectl run script to test the DA/threading behavior +# +# Usage: bin/zopectl run test_da.py "dbname=xxx" +# +from Products.ZPsycopgDA.DA import ZDATETIME +from Products.ZPsycopgDA.db import DB +import sys +import threading + + +dsn = sys.argv[1] + + +typecasts = [ZDATETIME] + + +def DA_connect(): + db = DB(dsn, tilevel=2, typecasts=typecasts) + db.open() + return db + + +def assert_casts(conn, name): + connection = conn.getcursor().connection + if (connection.string_types == + {1114: ZDATETIME, 1184: ZDATETIME}): + print '%s pass\n' % name + else: + print '%s fail (%s)\n' % (name, connection.string_types) + + +def test_connect(name): + assert_casts(conn1, name) + + +conn1 = DA_connect() +t1 = threading.Thread(target=test_connect, args=('t1',)) +t1.start() +t2 = threading.Thread(target=test_connect, args=('t2',)) +t2.start() +t1.join() +t2.join() From b4d22f42bd1e14429f3d82a7008371c28cbef959 Mon Sep 17 00:00:00 2001 From: Christian Zagrodnick Date: Tue, 19 Mar 2013 12:40:49 +0100 Subject: [PATCH 3/3] Fixed multi-thread connection initialization for ZPsycopgDA. The connection initialization (transaction isolation, typecasts) was only done for first connection. When there are multiple threads running in parallel, connections where used which had not been initialized correctly. --- NEWS | 5 +++++ ZPsycopgDA/db.py | 36 +++++++++++++++++------------------- ZPsycopgDA/pool.py | 17 ++++++++++------- ZPsycopgDA/test_da.py | 42 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 74 insertions(+), 26 deletions(-) create mode 100644 ZPsycopgDA/test_da.py diff --git a/NEWS b/NEWS index 2dfe49d5..3d1de53e 100644 --- a/NEWS +++ b/NEWS @@ -1,3 +1,8 @@ +What's new in psycopg XXXXX +--------------------------- + +- Fixed multi-thread connection initialization for ZPsycopgDA. + What's new in psycopg 2.4.6 --------------------------- diff --git a/ZPsycopgDA/db.py b/ZPsycopgDA/db.py index 37e6f9d5..c48197de 100644 --- a/ZPsycopgDA/db.py +++ b/ZPsycopgDA/db.py @@ -47,36 +47,34 @@ class DB(TM, dbi_db.DB): self.calls = 0 self.make_mappings() - def getconn(self, init=True): + def getconn(self): # if init is False we are trying to get hold on an already existing # connection, so we avoid to (re)initialize it risking errors. - conn = pool.getconn(self.dsn) - if init: - # use set_session where available as in these versions - # set_isolation_level generates an extra query. - if psycopg2.__version__ >= '2.4.2': - conn.set_session(isolation_level=int(self.tilevel)) - else: - conn.set_isolation_level(int(self.tilevel)) - conn.set_client_encoding(self.encoding) - for tc in self.typecasts: - register_type(tc, conn) + conn = pool.getconn(self.dsn, init=self.init_conn) return conn + def init_conn(self, conn): + # use set_session where available as in these versions + # set_isolation_level generates an extra query. + if psycopg2.__version__ >= '2.4.2': + conn.set_session(isolation_level=int(self.tilevel)) + else: + conn.set_isolation_level(int(self.tilevel)) + conn.set_client_encoding(self.encoding) + for tc in self.typecasts: + register_type(tc, conn) + def putconn(self, close=False): - try: - conn = pool.getconn(self.dsn, False) - except AttributeError: - pass + conn = pool.getconn(self.dsn, create_pool=False, init=self.init_conn) pool.putconn(self.dsn, conn, close) def getcursor(self): - conn = self.getconn(False) + conn = self.getconn() return conn.cursor() def _finish(self, *ignored): try: - conn = self.getconn(False) + conn = self.getconn() conn.commit() self.putconn() except AttributeError: @@ -84,7 +82,7 @@ class DB(TM, dbi_db.DB): def _abort(self, *ignored): try: - conn = self.getconn(False) + conn = self.getconn() conn.rollback() self.putconn() except AttributeError: diff --git a/ZPsycopgDA/pool.py b/ZPsycopgDA/pool.py index b47f46cc..135527c5 100644 --- a/ZPsycopgDA/pool.py +++ b/ZPsycopgDA/pool.py @@ -26,7 +26,7 @@ from psycopg2.pool import PoolError class AbstractConnectionPool(object): """Generic key-based pooling code.""" - def __init__(self, minconn, maxconn, *args, **kwargs): + def __init__(self, minconn, maxconn, init, *args, **kwargs): """Initialize the connection pool. New 'minconn' connections are created immediately calling 'connfunc' @@ -35,6 +35,7 @@ class AbstractConnectionPool(object): """ self.minconn = minconn self.maxconn = maxconn + self.init = init self.closed = False self._args = args @@ -56,6 +57,8 @@ class AbstractConnectionPool(object): self._rused[id(conn)] = key else: self._pool.append(conn) + if self.init: + self.init(conn) return conn def _getkey(self): @@ -125,11 +128,11 @@ class PersistentConnectionPool(AbstractConnectionPool): single connection from the pool. """ - def __init__(self, minconn, maxconn, *args, **kwargs): + def __init__(self, minconn, maxconn, init, *args, **kwargs): """Initialize the threading lock.""" import threading AbstractConnectionPool.__init__( - self, minconn, maxconn, *args, **kwargs) + self, minconn, maxconn, init, *args, **kwargs) self._lock = threading.Lock() # we we'll need the thread module, to determine thread ids, so we @@ -168,12 +171,12 @@ class PersistentConnectionPool(AbstractConnectionPool): _connections_pool = {} _connections_lock = threading.Lock() -def getpool(dsn, create=True): +def getpool(dsn, create=True, init=None): _connections_lock.acquire() try: if not _connections_pool.has_key(dsn) and create: _connections_pool[dsn] = \ - PersistentConnectionPool(4, 200, dsn) + PersistentConnectionPool(4, 200, init, dsn) finally: _connections_lock.release() return _connections_pool[dsn] @@ -186,8 +189,8 @@ def flushpool(dsn): finally: _connections_lock.release() -def getconn(dsn, create=True): - return getpool(dsn, create=create).getconn() +def getconn(dsn, create_pool=True, init=None): + return getpool(dsn, create=create_pool, init=init).getconn() def putconn(dsn, conn, close=False): getpool(dsn).putconn(conn, close=close) diff --git a/ZPsycopgDA/test_da.py b/ZPsycopgDA/test_da.py new file mode 100644 index 00000000..83197671 --- /dev/null +++ b/ZPsycopgDA/test_da.py @@ -0,0 +1,42 @@ +# zopectl run script to test the DA/threading behavior +# +# Usage: bin/zopectl run test_da.py "dbname=xxx" +# +from Products.ZPsycopgDA.DA import ZDATETIME +from Products.ZPsycopgDA.db import DB +import sys +import threading + + +dsn = sys.argv[1] + + +typecasts = [ZDATETIME] + + +def DA_connect(): + db = DB(dsn, tilevel=2, typecasts=typecasts) + db.open() + return db + + +def assert_casts(conn, name): + connection = conn.getcursor().connection + if (connection.string_types == + {1114: ZDATETIME, 1184: ZDATETIME}): + print '%s pass\n' % name + else: + print '%s fail (%s)\n' % (name, connection.string_types) + + +def test_connect(name): + assert_casts(conn1, name) + + +conn1 = DA_connect() +t1 = threading.Thread(target=test_connect, args=('t1',)) +t1.start() +t2 = threading.Thread(target=test_connect, args=('t2',)) +t2.start() +t1.join() +t2.join()