diff --git a/lib/pool.py b/lib/pool.py index 4a5f0b5c..b90267d9 100644 --- a/lib/pool.py +++ b/lib/pool.py @@ -184,3 +184,163 @@ class ThreadedConnectionPool(AbstractConnectionPool): self._closeall() finally: self._lock.release() + + +class CachingConnectionPool(AbstractConnectionPool): + """A connection pool that works with the threading module and caches connections""" + + # A dictionary to hold connection ID's and when they should be removed from the pool + # Keys are id(connection) and vlaues are expiration time + # Storing the expiration time on the connection itself might be preferable, if possible. + from collections import OrderedDict + _expirations = OrderedDict() + + def __init__(self, minconn, maxconn, lifetime = 3600, *args, **kwargs): + """Initialize the threading lock.""" + import threading + from datetime import datetime, timedelta + AbstractConnectionPool.__init__( + self, minconn, maxconn, *args, **kwargs) + self._lock = threading.Lock() + self._lifetime = lifetime + + def _connect(self, key=None): + """Create a new connection, assign it to 'key' if not None, + And assign an expiration time""" + conn = psycopg2.connect(*self._args, **self._kwargs) + if key is not None: + self._used[key] = conn + self._rused[id(conn)] = key + else: + self._pool.append(conn) + + #Add expiration time + self._expirations[id(conn)] = datetime.now() + timedelta(seconds = self._lifetime) + return conn + + # Override the _putconn function to put the connection back into the pool even if we are over minconn, and to run the _prune command. + def _putconn(self, conn, key=None, close=False): + """Put away a connection.""" + if self.closed: + raise PoolError("connection pool is closed") + if key is None: + key = self._rused.get(id(conn)) + + if not key: + raise PoolError("trying to put unkeyed connection") + + if len(self._pool) < self.maxconn and not close: + # Return the connection into a consistent state before putting + # it back into the pool + if not conn.closed: + status = conn.get_transaction_status() + if status == _ext.TRANSACTION_STATUS_UNKNOWN: + # server connection lost + conn.close() + try: + del self._expirations[id(conn)] + except KeyError: + pass + elif status != _ext.TRANSACTION_STATUS_IDLE: + # connection in error or in transaction + conn.rollback() + self._pool.append(conn) + else: + # regular idle connection + self._pool.append(conn) + # If the connection is closed, we just discard it. + else: + try: + del self._expirations[id(conn)] + except KeyError: + pass + else: + conn.close() + #remove this connection from the expiration list + try: + del self._expirations[id(conn)] + except KeyError: + pass #not in the expiration list for some reason, can't remove it. + + # here we check for the presence of key because it can happen that a + # thread tries to put back a connection after a call to close + if not self.closed or key in self._used: + del self._used[key] + del self._rused[id(conn)] + + # remove any expired connections from the pool + self._prune() + + + def getconn(self, key=None): + """Get a free connection and assign it to 'key' if not None.""" + self._lock.acquire() + try: + return self._getconn(key) + finally: + self._lock.release() + + def putconn(self, conn=None, key=None, close=False): + """Put away an unused connection.""" + self._lock.acquire() + try: + self._putconn(conn, key, close) + finally: + self._lock.release() + + def closeall(self): + """Close all connections (even the one currently in use.)""" + self._lock.acquire() + try: + self._closeall() + finally: + self._lock.release() + + def _prune(self): + """Remove any expired connections from the connection pool.""" + junk_expirations = [] + for obj_id, exp_time in self._expirations.items(): + # _expirations is an ordered dict, so results should be in chronological order + if exp_time > datetime.now(): + break; + + del_idx = None + #find index of connection in _pool. May not be there if connection is in use + for index, conn in enumerate(self._pool): + if id(conn) == obj_id: + conn.close() + junk_expirations.append(obj_id) + del_idx = index + break + else: + # See if this connection is used. If not, we need to remove + # the reference to it. + for conn in self._used.values(): + if id(conn) == obj_id: + break #found it, so just move on. Don't expire the + # connection till we are done with it. + else: + # This connection doesn't exist any more, so get rid + # of the reference to the expiration. + # Can't delete here because we'd be changing the item + # we are itterating over. + junk_expirations.append(obj_id) + + # Delete connection from pool if expired + if del_idx is not None: + del self._pool[del_idx] + + + # Remove any junk expirations + for item in junk_expirations: + try: + del self._expirations[item] + except KeyError: + pass #expiration doesn't exist?? + + # Make sure we still have at least minconn connections + # Connections may be available or used + total_conns = len(self._pool) + len(self._used) + if total_conns < self.minconn: + for i in range(self.minconn - total_conns): + self._connect()