mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-22 08:56:34 +03:00
a2ee25ecfe
Rollback connections in transaction or in error. Discard broken connections. Closes ticket #62.
256 lines
8.5 KiB
Python
256 lines
8.5 KiB
Python
"""Connection pooling for psycopg2
|
|
|
|
This module implements thread-safe (and not) connection pools.
|
|
"""
|
|
# psycopg/pool.py - pooling code for psycopg
|
|
#
|
|
# Copyright (C) 2003-2010 Federico Di Gregorio <fog@debian.org>
|
|
#
|
|
# psycopg2 is free software: you can redistribute it and/or modify it
|
|
# under the terms of the GNU Lesser General Public License as published
|
|
# by the Free Software Foundation, either version 3 of the License, or
|
|
# (at your option) any later version.
|
|
#
|
|
# In addition, as a special exception, the copyright holders give
|
|
# permission to link this program with the OpenSSL library (or with
|
|
# modified versions of OpenSSL that use the same license as OpenSSL),
|
|
# and distribute linked combinations including the two.
|
|
#
|
|
# You must obey the GNU Lesser General Public License in all respects for
|
|
# all of the code used other than OpenSSL.
|
|
#
|
|
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
|
|
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
|
# License for more details.
|
|
|
|
import psycopg2
|
|
import psycopg2.extensions as _ext
|
|
|
|
try:
|
|
import logging
|
|
# create logger object for psycopg2 module and sub-modules
|
|
_logger = logging.getLogger("psycopg2")
|
|
def dbg(*args):
|
|
_logger.debug("psycopg2", ' '.join([str(x) for x in args]))
|
|
try:
|
|
import App # does this make sure that we're running in Zope?
|
|
_logger.info("installed. Logging using Python logging module")
|
|
except:
|
|
_logger.debug("installed. Logging using Python logging module")
|
|
|
|
except ImportError:
|
|
from zLOG import LOG, DEBUG, INFO
|
|
def dbg(*args):
|
|
LOG('ZPsycopgDA', DEBUG, "",
|
|
' '.join([str(x) for x in args])+'\n')
|
|
LOG('ZPsycopgDA', INFO, "Installed", "Logging using Zope's zLOG\n")
|
|
|
|
except:
|
|
import sys
|
|
def dbg(*args):
|
|
sys.stderr.write(' '.join(args)+'\n')
|
|
|
|
|
|
class PoolError(psycopg2.Error):
|
|
pass
|
|
|
|
|
|
class AbstractConnectionPool(object):
|
|
"""Generic key-based pooling code."""
|
|
|
|
def __init__(self, minconn, maxconn, *args, **kwargs):
|
|
"""Initialize the connection pool.
|
|
|
|
New 'minconn' connections are created immediately calling 'connfunc'
|
|
with given parameters. The connection pool will support a maximum of
|
|
about 'maxconn' connections.
|
|
"""
|
|
self.minconn = minconn
|
|
self.maxconn = maxconn
|
|
self.closed = False
|
|
|
|
self._args = args
|
|
self._kwargs = kwargs
|
|
|
|
self._pool = []
|
|
self._used = {}
|
|
self._rused = {} # id(conn) -> key map
|
|
self._keys = 0
|
|
|
|
for i in range(self.minconn):
|
|
self._connect()
|
|
|
|
def _connect(self, key=None):
|
|
"""Create a new connection and assign it to 'key' if not None."""
|
|
conn = psycopg2.connect(*self._args, **self._kwargs)
|
|
if key is not None:
|
|
self._used[key] = conn
|
|
self._rused[id(conn)] = key
|
|
else:
|
|
self._pool.append(conn)
|
|
return conn
|
|
|
|
def _getkey(self):
|
|
"""Return a new unique key."""
|
|
self._keys += 1
|
|
return self._keys
|
|
|
|
def _getconn(self, key=None):
|
|
"""Get a free connection and assign it to 'key' if not None."""
|
|
if self.closed: raise PoolError("connection pool is closed")
|
|
if key is None: key = self._getkey()
|
|
|
|
if key in self._used:
|
|
return self._used[key]
|
|
|
|
if self._pool:
|
|
self._used[key] = conn = self._pool.pop()
|
|
self._rused[id(conn)] = key
|
|
return conn
|
|
else:
|
|
if len(self._used) == self.maxconn:
|
|
raise PoolError("connection pool exausted")
|
|
return self._connect(key)
|
|
|
|
def _putconn(self, conn, key=None, close=False):
|
|
"""Put away a connection."""
|
|
if self.closed: raise PoolError("connection pool is closed")
|
|
if key is None: key = self._rused.get(id(conn))
|
|
|
|
if not key:
|
|
raise PoolError("trying to put unkeyed connection")
|
|
|
|
if len(self._pool) < self.minconn and not close:
|
|
# Return the connection into a consistent state before putting
|
|
# it back into the pool
|
|
if not conn.closed:
|
|
status = conn.get_transaction_status()
|
|
if status == _ext.TRANSACTION_STATUS_UNKNOWN:
|
|
# server connection lost
|
|
conn.close()
|
|
elif status != _ext.TRANSACTION_STATUS_IDLE:
|
|
# connection in error or in transaction
|
|
conn.rollback()
|
|
self._pool.append(conn)
|
|
else:
|
|
# regular idle connection
|
|
self._pool.append(conn)
|
|
# If the connection is closed, we just discard it.
|
|
else:
|
|
conn.close()
|
|
|
|
# here we check for the presence of key because it can happen that a
|
|
# thread tries to put back a connection after a call to close
|
|
if not self.closed or key in self._used:
|
|
del self._used[key]
|
|
del self._rused[id(conn)]
|
|
|
|
def _closeall(self):
|
|
"""Close all connections.
|
|
|
|
Note that this can lead to some code fail badly when trying to use
|
|
an already closed connection. If you call .closeall() make sure
|
|
your code can deal with it.
|
|
"""
|
|
if self.closed: raise PoolError("connection pool is closed")
|
|
for conn in self._pool + list(self._used.values()):
|
|
try:
|
|
conn.close()
|
|
except:
|
|
pass
|
|
self.closed = True
|
|
|
|
|
|
class SimpleConnectionPool(AbstractConnectionPool):
|
|
"""A connection pool that can't be shared across different threads."""
|
|
|
|
getconn = AbstractConnectionPool._getconn
|
|
putconn = AbstractConnectionPool._putconn
|
|
closeall = AbstractConnectionPool._closeall
|
|
|
|
|
|
class ThreadedConnectionPool(AbstractConnectionPool):
|
|
"""A connection pool that works with the threading module."""
|
|
|
|
def __init__(self, minconn, maxconn, *args, **kwargs):
|
|
"""Initialize the threading lock."""
|
|
import threading
|
|
AbstractConnectionPool.__init__(
|
|
self, minconn, maxconn, *args, **kwargs)
|
|
self._lock = threading.Lock()
|
|
|
|
def getconn(self, key=None):
|
|
"""Get a free connection and assign it to 'key' if not None."""
|
|
self._lock.acquire()
|
|
try:
|
|
return self._getconn(key)
|
|
finally:
|
|
self._lock.release()
|
|
|
|
def putconn(self, conn=None, key=None, close=False):
|
|
"""Put away an unused connection."""
|
|
self._lock.acquire()
|
|
try:
|
|
self._putconn(conn, key, close)
|
|
finally:
|
|
self._lock.release()
|
|
|
|
def closeall(self):
|
|
"""Close all connections (even the one currently in use.)"""
|
|
self._lock.acquire()
|
|
try:
|
|
self._closeall()
|
|
finally:
|
|
self._lock.release()
|
|
|
|
|
|
class PersistentConnectionPool(AbstractConnectionPool):
|
|
"""A pool that assigns persistent connections to different threads.
|
|
|
|
Note that this connection pool generates by itself the required keys
|
|
using the current thread id. This means that until a thread puts away
|
|
a connection it will always get the same connection object by successive
|
|
`!getconn()` calls. This also means that a thread can't use more than one
|
|
single connection from the pool.
|
|
"""
|
|
|
|
def __init__(self, minconn, maxconn, *args, **kwargs):
|
|
"""Initialize the threading lock."""
|
|
import threading
|
|
AbstractConnectionPool.__init__(
|
|
self, minconn, maxconn, *args, **kwargs)
|
|
self._lock = threading.Lock()
|
|
|
|
# we we'll need the thread module, to determine thread ids, so we
|
|
# import it here and copy it in an instance variable
|
|
import thread
|
|
self.__thread = thread
|
|
|
|
def getconn(self):
|
|
"""Generate thread id and return a connection."""
|
|
key = self.__thread.get_ident()
|
|
self._lock.acquire()
|
|
try:
|
|
return self._getconn(key)
|
|
finally:
|
|
self._lock.release()
|
|
|
|
def putconn(self, conn=None, close=False):
|
|
"""Put away an unused connection."""
|
|
key = self.__thread.get_ident()
|
|
self._lock.acquire()
|
|
try:
|
|
if not conn: conn = self._used[key]
|
|
self._putconn(conn, key, close)
|
|
finally:
|
|
self._lock.release()
|
|
|
|
def closeall(self):
|
|
"""Close all connections (even the one currently in use.)"""
|
|
self._lock.acquire()
|
|
try:
|
|
self._closeall()
|
|
finally:
|
|
self._lock.release()
|