Mirror of https://github.com/psycopg/psycopg2.git
Synced 2024-11-11 03:26:37 +03:00

Merge branch 'zope-fixes' into devel

Commit 170636d46d

NEWS (3 lines changed)
@@ -27,7 +27,8 @@ What's new in psycopg 2.4.6
   (ticket #113).
 - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with
   RealDictConnection and Cursor (ticket #114).
-- Fixed broken pool for Zope (tickets #123, #125).
+- Fixed broken pool for Zope and connections re-init across ZSQL methods
+  in the same request (tickets #123, #125, #142).
 - connect() raises an exception instead of swallowing keyword arguments
   when a connection string is specified as well (ticket #131).
 - Discard any result produced by 'executemany()' (ticket #133).
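As background for the ticket #114 entry kept above, a minimal sketch of the now-working combination, assuming a reachable DSN and the hstore extension installed on the server (neither is part of this diff):

    # Minimal sketch, not from the patch: register_hstore() against a
    # RealDictConnection (the combination fixed by ticket #114).
    import psycopg2
    import psycopg2.extras

    # hypothetical DSN; adjust for your environment
    conn = psycopg2.connect("dbname=test",
                            connection_factory=psycopg2.extras.RealDictConnection)
    psycopg2.extras.register_hstore(conn)   # needs the hstore extension on the server

    cur = conn.cursor()
    cur.execute("SELECT 'a=>1, b=>2'::hstore AS h")
    print(cur.fetchone()['h'])   # rows come back as dicts: {'a': '1', 'b': '2'}
    conn.close()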
@@ -16,7 +16,7 @@
 # their work without bothering about the module dependencies.

-ALLOWED_PSYCOPG_VERSIONS = ('2.4-beta1', '2.4-beta2', '2.4', '2.4.1', '2.4.2', '2.4.3', '2.4.4', '2.4.5')
+ALLOWED_PSYCOPG_VERSIONS = ('2.4', '2.4.1', '2.4.4', '2.4.5', '2.4.6')

 import sys
 import time
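The tuple above is presumably consumed by a version guard elsewhere in ZPsycopgDA; that guard is not part of this hunk, so the following is only a hedged sketch of that kind of check:

    # Hedged sketch of the kind of guard ALLOWED_PSYCOPG_VERSIONS feeds; the
    # real check lives elsewhere in ZPsycopgDA and is not shown in this diff.
    import psycopg2

    ALLOWED_PSYCOPG_VERSIONS = ('2.4', '2.4.1', '2.4.4', '2.4.5', '2.4.6')

    # psycopg2.__version__ looks like "2.4.6 (dt dec pq3 ext)"; keep the number only.
    version = psycopg2.__version__.split(' ')[0]
    if version not in ALLOWED_PSYCOPG_VERSIONS:
        raise ImportError("psycopg2 version %s is not supported" % version)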
@@ -32,7 +32,7 @@ from psycopg2 import NUMBER, STRING, ROWID, DATETIME
 # the DB object, managing all the real query work

 class DB(TM, dbi_db.DB):

     _p_oid = _p_changed = _registered = None

     def __init__(self, dsn, tilevel, typecasts, enc='utf-8'):
@@ -46,13 +46,18 @@ class DB(TM, dbi_db.DB):
         self.failures = 0
         self.calls = 0
         self.make_mappings()

     def getconn(self, init=True):
         # if init is False we are trying to get hold on an already existing
         # connection, so we avoid to (re)initialize it risking errors.
         conn = pool.getconn(self.dsn)
         if init:
-            conn.set_isolation_level(int(self.tilevel))
+            # use set_session where available as in these versions
+            # set_isolation_level generates an extra query.
+            if psycopg2.__version__ >= '2.4.2':
+                conn.set_session(isolation_level=int(self.tilevel))
+            else:
+                conn.set_isolation_level(int(self.tilevel))
             conn.set_client_encoding(self.encoding)
             for tc in self.typecasts:
                 register_type(tc, conn)
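The version check added above avoids the extra SET query that set_isolation_level() issues on psycopg2 >= 2.4.2; outside of Zope the same guard looks like this (a minimal sketch, the DSN is hypothetical):

    # Minimal sketch of the guard used above: prefer set_session() on
    # psycopg2 >= 2.4.2, fall back to set_isolation_level() on older releases.
    import psycopg2
    from psycopg2 import extensions

    def set_tilevel(conn, tilevel=extensions.ISOLATION_LEVEL_READ_COMMITTED):
        if psycopg2.__version__ >= '2.4.2':   # same string comparison as the patch
            conn.set_session(isolation_level=int(tilevel))
        else:
            conn.set_isolation_level(int(tilevel))

    conn = psycopg2.connect("dbname=test")    # hypothetical DSN
    set_tilevel(conn, extensions.ISOLATION_LEVEL_SERIALIZABLE)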
@@ -64,9 +69,9 @@ class DB(TM, dbi_db.DB):
         except AttributeError:
             pass
         pool.putconn(self.dsn, conn, close)

     def getcursor(self):
-        conn = self.getconn()
+        conn = self.getconn(False)
         return conn.cursor()

     def _finish(self, *ignored):
@@ -76,7 +81,7 @@ class DB(TM, dbi_db.DB):
             self.putconn()
         except AttributeError:
             pass

     def _abort(self, *ignored):
         try:
             conn = self.getconn(False)
@@ -90,7 +95,7 @@ class DB(TM, dbi_db.DB):
         # then get and immediately release a connection
         self.getconn()
         self.putconn()

     def close(self):
         # FIXME: if this connection is closed we flush all the pool associated
         # with the current DSN; does this makes sense?
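Putting the db.py hunks above together: the first getconn() of a request initialises the pooled connection, later calls pass init=False to reuse it untouched, and the transaction hooks hand it back to the pool. A condensed, non-authoritative sketch of that flow; the commit/rollback bodies and the encoding call are assumptions standing in for the real setup, not text from this diff:

    # Condensed sketch of the request flow in ZPsycopgDA.db.DB after this patch:
    # initialise once, reuse via init=False, release when the transaction ends.
    class RequestFlow(object):
        def __init__(self, pool, dsn):
            self.pool, self.dsn = pool, dsn

        def getconn(self, init=True):
            conn = self.pool.getconn(self.dsn)
            if init:                              # only the first grab per request
                conn.set_client_encoding('utf-8') # stand-in for the real setup
            return conn

        def getcursor(self):
            return self.getconn(False).cursor()   # reuse the configured connection

        def _finish(self, *ignored):              # transaction commit hook (assumed body)
            conn = self.getconn(False)
            conn.commit()
            self.pool.putconn(self.dsn, conn, False)

        def _abort(self, *ignored):               # transaction abort hook (assumed body)
            conn = self.getconn(False)
            conn.rollback()
            self.pool.putconn(self.dsn, conn, False)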
@@ -78,8 +78,10 @@ or DSN for short) is a string... (TODO: finish docs)
       </td>
       <td align="left" valign="top">
         <select name="tilevel:int">
+          <option value="4">Read uncommitted</option>
           <option value="1">Read committed</option>
-          <option value="2" selected="YES">Serializable</option>
+          <option value="2" selected="YES">Repeatable read</option>
+          <option value="3">Serializable</option>
         </select>
       </td>
     </tr>
@@ -44,11 +44,17 @@
       </td>
       <td align="left" valign="top">
         <select name="tilevel:int">
+          <option value="4"
+            <dtml-if expr="tilevel==4">selected="YES"</dtml-if>>
+            Read uncommitted</option>
           <option value="1"
             <dtml-if expr="tilevel==1">selected="YES"</dtml-if>>
             Read committed</option>
           <option value="2"
             <dtml-if expr="tilevel==2">selected="YES"</dtml-if>>
-            Serializable</option>
+            Repeatable read</option>
+          <option value="3"
+            <dtml-if expr="tilevel==3">selected="YES"</dtml-if>>
+            Serializable</option>
         </select>
       </td>
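The two form hunks above bring the option values in line with psycopg2's isolation-level constants, the four-level set introduced around psycopg2 2.4.2 alongside set_session(). A quick check of the mapping as defined in psycopg2.extensions:

    # The tilevel values submitted by the forms map onto these psycopg2 constants.
    from psycopg2 import extensions

    levels = {
        extensions.ISOLATION_LEVEL_READ_UNCOMMITTED: "Read uncommitted",   # 4
        extensions.ISOLATION_LEVEL_READ_COMMITTED:   "Read committed",     # 1
        extensions.ISOLATION_LEVEL_REPEATABLE_READ:  "Repeatable read",    # 2
        extensions.ISOLATION_LEVEL_SERIALIZABLE:     "Serializable",       # 3
    }
    for value, label in sorted(levels.items()):
        print("%d -> %s" % (value, label))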
@@ -19,7 +19,151 @@
 # ZPsycopgDA code in db.py.

 import threading
-import psycopg2.pool
+import psycopg2
+from psycopg2.pool import PoolError
+
+
+class AbstractConnectionPool(object):
+    """Generic key-based pooling code."""
+
+    def __init__(self, minconn, maxconn, *args, **kwargs):
+        """Initialize the connection pool.
+
+        New 'minconn' connections are created immediately calling 'connfunc'
+        with given parameters. The connection pool will support a maximum of
+        about 'maxconn' connections.
+        """
+        self.minconn = minconn
+        self.maxconn = maxconn
+        self.closed = False
+
+        self._args = args
+        self._kwargs = kwargs
+
+        self._pool = []
+        self._used = {}
+        self._rused = {}    # id(conn) -> key map
+        self._keys = 0
+
+        for i in range(self.minconn):
+            self._connect()
+
+    def _connect(self, key=None):
+        """Create a new connection and assign it to 'key' if not None."""
+        conn = psycopg2.connect(*self._args, **self._kwargs)
+        if key is not None:
+            self._used[key] = conn
+            self._rused[id(conn)] = key
+        else:
+            self._pool.append(conn)
+        return conn
+
+    def _getkey(self):
+        """Return a new unique key."""
+        self._keys += 1
+        return self._keys
+
+    def _getconn(self, key=None):
+        """Get a free connection and assign it to 'key' if not None."""
+        if self.closed: raise PoolError("connection pool is closed")
+        if key is None: key = self._getkey()
+
+        if key in self._used:
+            return self._used[key]
+
+        if self._pool:
+            self._used[key] = conn = self._pool.pop()
+            self._rused[id(conn)] = key
+            return conn
+        else:
+            if len(self._used) == self.maxconn:
+                raise PoolError("connection pool exausted")
+            return self._connect(key)
+
+    def _putconn(self, conn, key=None, close=False):
+        """Put away a connection."""
+        if self.closed: raise PoolError("connection pool is closed")
+        if key is None: key = self._rused[id(conn)]
+
+        if not key:
+            raise PoolError("trying to put unkeyed connection")
+
+        if len(self._pool) < self.minconn and not close:
+            self._pool.append(conn)
+        else:
+            conn.close()
+
+        # here we check for the presence of key because it can happen that a
+        # thread tries to put back a connection after a call to close
+        if not self.closed or key in self._used:
+            del self._used[key]
+            del self._rused[id(conn)]
+
+    def _closeall(self):
+        """Close all connections.
+
+        Note that this can lead to some code fail badly when trying to use
+        an already closed connection. If you call .closeall() make sure
+        your code can deal with it.
+        """
+        if self.closed: raise PoolError("connection pool is closed")
+        for conn in self._pool + list(self._used.values()):
+            try:
+                conn.close()
+            except:
+                pass
+        self.closed = True
+
+
+class PersistentConnectionPool(AbstractConnectionPool):
+    """A pool that assigns persistent connections to different threads.
+
+    Note that this connection pool generates by itself the required keys
+    using the current thread id. This means that until a thread puts away
+    a connection it will always get the same connection object by successive
+    `!getconn()` calls. This also means that a thread can't use more than one
+    single connection from the pool.
+    """
+
+    def __init__(self, minconn, maxconn, *args, **kwargs):
+        """Initialize the threading lock."""
+        import threading
+        AbstractConnectionPool.__init__(
+            self, minconn, maxconn, *args, **kwargs)
+        self._lock = threading.Lock()
+
+        # we we'll need the thread module, to determine thread ids, so we
+        # import it here and copy it in an instance variable
+        import thread
+        self.__thread = thread
+
+    def getconn(self):
+        """Generate thread id and return a connection."""
+        key = self.__thread.get_ident()
+        self._lock.acquire()
+        try:
+            return self._getconn(key)
+        finally:
+            self._lock.release()
+
+    def putconn(self, conn=None, close=False):
+        """Put away an unused connection."""
+        key = self.__thread.get_ident()
+        self._lock.acquire()
+        try:
+            if not conn: conn = self._used[key]
+            self._putconn(conn, key, close)
+        finally:
+            self._lock.release()
+
+    def closeall(self):
+        """Close all connections (even the one currently in use.)"""
+        self._lock.acquire()
+        try:
+            self._closeall()
+        finally:
+            self._lock.release()
+
+
 _connections_pool = {}
 _connections_lock = threading.Lock()
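To make the thread-keyed behaviour described in the PersistentConnectionPool docstring concrete, a minimal usage sketch (hypothetical DSN; written against psycopg2.pool's own PersistentConnectionPool, which the class added above mirrors):

    # Minimal sketch (hypothetical DSN): each thread gets its own persistent
    # connection, and repeated getconn() calls in a thread return the same object.
    import threading
    from psycopg2.pool import PersistentConnectionPool

    pool = PersistentConnectionPool(2, 10, "dbname=test")   # minconn=2, maxconn=10

    def worker():
        conn1 = pool.getconn()
        conn2 = pool.getconn()
        assert conn1 is conn2        # same object until this thread calls putconn()
        cur = conn1.cursor()
        cur.execute("SELECT 1")
        conn1.commit()
        pool.putconn()               # the key is derived from the calling thread id

    threads = [threading.Thread(target=worker) for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    pool.closeall()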
@@ -29,7 +173,7 @@ def getpool(dsn, create=True):
     try:
         if not _connections_pool.has_key(dsn) and create:
             _connections_pool[dsn] = \
-                psycopg2.pool.PersistentConnectionPool(4, 200, dsn)
+                PersistentConnectionPool(4, 200, dsn)
     finally:
         _connections_lock.release()
     return _connections_pool[dsn]
@@ -41,7 +185,7 @@ def flushpool(dsn):
         del _connections_pool[dsn]
     finally:
         _connections_lock.release()


 def getconn(dsn, create=True):
     return getpool(dsn, create=create).getconn()
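For orientation, a hedged sketch of how db.py drives this module-level API, following the calls visible in the hunks above (the import path and DSN are assumptions):

    # Hedged sketch of ZPsycopgDA.pool's module-level API as used by db.py:
    # one PersistentConnectionPool per DSN, created lazily by getpool().
    import ZPsycopgDA.pool as pool      # import path assumed; DSN is hypothetical

    dsn = "dbname=test"
    conn = pool.getconn(dsn)            # creates the per-DSN pool on first use
    cur = conn.cursor()
    cur.execute("SELECT 1")
    conn.commit()
    pool.putconn(dsn, conn, False)      # signature as called from db.py above
    pool.flushpool(dsn)                 # close and drop every connection for this DSN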
@@ -217,6 +217,10 @@ class PersistentConnectionPool(AbstractConnectionPool):

     def __init__(self, minconn, maxconn, *args, **kwargs):
         """Initialize the threading lock."""
+        import warnings
+        warnings.warn("deprecated: use ZPsycopgDA.pool implementation",
+                      DeprecationWarning)
+
         import threading
         AbstractConnectionPool.__init__(
             self, minconn, maxconn, *args, **kwargs)
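Since many setups hide DeprecationWarning by default, a quick way to check whether code from that era still constructs the now-deprecated psycopg2.pool.PersistentConnectionPool (DSN hypothetical, no server required for the warning itself):

    # Make the DeprecationWarning added above visible while testing.
    import warnings
    import psycopg2
    import psycopg2.pool

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        try:
            psycopg2.pool.PersistentConnectionPool(1, 5, "dbname=test")  # hypothetical DSN
        except psycopg2.OperationalError:
            pass    # no server around; the warning is emitted before connecting
        for w in caught:
            print("%s: %s" % (w.category.__name__, w.message))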