Add inital CachingConnectionPool class

This commit is contained in:
Israel Brewster 2017-06-08 09:14:44 -08:00 committed by Changaco
parent f08019e356
commit 74238d52e7

View File

@ -184,3 +184,163 @@ class ThreadedConnectionPool(AbstractConnectionPool):
self._closeall()
finally:
self._lock.release()
class CachingConnectionPool(AbstractConnectionPool):
"""A connection pool that works with the threading module and caches connections"""
# A dictionary to hold connection ID's and when they should be removed from the pool
# Keys are id(connection) and vlaues are expiration time
# Storing the expiration time on the connection itself might be preferable, if possible.
from collections import OrderedDict
_expirations = OrderedDict()
def __init__(self, minconn, maxconn, lifetime = 3600, *args, **kwargs):
"""Initialize the threading lock."""
import threading
from datetime import datetime, timedelta
AbstractConnectionPool.__init__(
self, minconn, maxconn, *args, **kwargs)
self._lock = threading.Lock()
self._lifetime = lifetime
def _connect(self, key=None):
"""Create a new connection, assign it to 'key' if not None,
And assign an expiration time"""
conn = psycopg2.connect(*self._args, **self._kwargs)
if key is not None:
self._used[key] = conn
self._rused[id(conn)] = key
else:
self._pool.append(conn)
#Add expiration time
self._expirations[id(conn)] = datetime.now() + timedelta(seconds = self._lifetime)
return conn
# Override the _putconn function to put the connection back into the pool even if we are over minconn, and to run the _prune command.
def _putconn(self, conn, key=None, close=False):
"""Put away a connection."""
if self.closed:
raise PoolError("connection pool is closed")
if key is None:
key = self._rused.get(id(conn))
if not key:
raise PoolError("trying to put unkeyed connection")
if len(self._pool) < self.maxconn and not close:
# Return the connection into a consistent state before putting
# it back into the pool
if not conn.closed:
status = conn.get_transaction_status()
if status == _ext.TRANSACTION_STATUS_UNKNOWN:
# server connection lost
conn.close()
try:
del self._expirations[id(conn)]
except KeyError:
pass
elif status != _ext.TRANSACTION_STATUS_IDLE:
# connection in error or in transaction
conn.rollback()
self._pool.append(conn)
else:
# regular idle connection
self._pool.append(conn)
# If the connection is closed, we just discard it.
else:
try:
del self._expirations[id(conn)]
except KeyError:
pass
else:
conn.close()
#remove this connection from the expiration list
try:
del self._expirations[id(conn)]
except KeyError:
pass #not in the expiration list for some reason, can't remove it.
# here we check for the presence of key because it can happen that a
# thread tries to put back a connection after a call to close
if not self.closed or key in self._used:
del self._used[key]
del self._rused[id(conn)]
# remove any expired connections from the pool
self._prune()
def getconn(self, key=None):
"""Get a free connection and assign it to 'key' if not None."""
self._lock.acquire()
try:
return self._getconn(key)
finally:
self._lock.release()
def putconn(self, conn=None, key=None, close=False):
"""Put away an unused connection."""
self._lock.acquire()
try:
self._putconn(conn, key, close)
finally:
self._lock.release()
def closeall(self):
"""Close all connections (even the one currently in use.)"""
self._lock.acquire()
try:
self._closeall()
finally:
self._lock.release()
def _prune(self):
"""Remove any expired connections from the connection pool."""
junk_expirations = []
for obj_id, exp_time in self._expirations.items():
# _expirations is an ordered dict, so results should be in chronological order
if exp_time > datetime.now():
break;
del_idx = None
#find index of connection in _pool. May not be there if connection is in use
for index, conn in enumerate(self._pool):
if id(conn) == obj_id:
conn.close()
junk_expirations.append(obj_id)
del_idx = index
break
else:
# See if this connection is used. If not, we need to remove
# the reference to it.
for conn in self._used.values():
if id(conn) == obj_id:
break #found it, so just move on. Don't expire the
# connection till we are done with it.
else:
# This connection doesn't exist any more, so get rid
# of the reference to the expiration.
# Can't delete here because we'd be changing the item
# we are itterating over.
junk_expirations.append(obj_id)
# Delete connection from pool if expired
if del_idx is not None:
del self._pool[del_idx]
# Remove any junk expirations
for item in junk_expirations:
try:
del self._expirations[item]
except KeyError:
pass #expiration doesn't exist??
# Make sure we still have at least minconn connections
# Connections may be available or used
total_conns = len(self._pool) + len(self._used)
if total_conns < self.minconn:
for i in range(self.minconn - total_conns):
self._connect()