mirror of
				https://github.com/psycopg/psycopg2.git
				synced 2025-10-26 05:21:03 +03:00 
			
		
		
		
	Added specific pool implementation for ZPsycopgDA
The implementation is based on psycopg 2.4, which should be less broken (zope-wise) of the current one. Instantiating psycopg2.pool.PersistentConnectionPool now raises a warning. This should fix ticket #123, #125. The issue of the reset on set_client_encoding() is still present but that's always been there and I'm no good at fixing it.
This commit is contained in:
		
							parent
							
								
									fb24777200
								
							
						
					
					
						commit
						bf45539585
					
				
							
								
								
									
										1
									
								
								NEWS
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								NEWS
									
									
									
									
									
								
							|  | @ -12,6 +12,7 @@ What's new in psycopg 2.4.6 | ||||||
|     (ticket #113). |     (ticket #113). | ||||||
|   - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with |   - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with | ||||||
|     RealDictConnection and Cursor (ticket #114). |     RealDictConnection and Cursor (ticket #114). | ||||||
|  |   - Fixed broken pool for Zope (tickets #123, #125). | ||||||
|   - connect() raises an exception instead of swallowing keyword arguments |   - connect() raises an exception instead of swallowing keyword arguments | ||||||
|     when a connection string is specified as well (ticket #131). |     when a connection string is specified as well (ticket #131). | ||||||
|   - Discard any result produced by 'executemany()' (ticket #133). |   - Discard any result produced by 'executemany()' (ticket #133). | ||||||
|  |  | ||||||
|  | @ -19,7 +19,151 @@ | ||||||
| # ZPsycopgDA code in db.py. | # ZPsycopgDA code in db.py. | ||||||
| 
 | 
 | ||||||
| import threading | import threading | ||||||
| import psycopg2.pool | import psycopg2 | ||||||
|  | from psycopg2.pool import PoolError | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class AbstractConnectionPool(object): | ||||||
|  |     """Generic key-based pooling code.""" | ||||||
|  | 
 | ||||||
|  |     def __init__(self, minconn, maxconn, *args, **kwargs): | ||||||
|  |         """Initialize the connection pool. | ||||||
|  | 
 | ||||||
|  |         New 'minconn' connections are created immediately calling 'connfunc' | ||||||
|  |         with given parameters. The connection pool will support a maximum of | ||||||
|  |         about 'maxconn' connections. | ||||||
|  |         """ | ||||||
|  |         self.minconn = minconn | ||||||
|  |         self.maxconn = maxconn | ||||||
|  |         self.closed = False | ||||||
|  | 
 | ||||||
|  |         self._args = args | ||||||
|  |         self._kwargs = kwargs | ||||||
|  | 
 | ||||||
|  |         self._pool = [] | ||||||
|  |         self._used = {} | ||||||
|  |         self._rused = {} # id(conn) -> key map | ||||||
|  |         self._keys = 0 | ||||||
|  | 
 | ||||||
|  |         for i in range(self.minconn): | ||||||
|  |             self._connect() | ||||||
|  | 
 | ||||||
|  |     def _connect(self, key=None): | ||||||
|  |         """Create a new connection and assign it to 'key' if not None.""" | ||||||
|  |         conn = psycopg2.connect(*self._args, **self._kwargs) | ||||||
|  |         if key is not None: | ||||||
|  |             self._used[key] = conn | ||||||
|  |             self._rused[id(conn)] = key | ||||||
|  |         else: | ||||||
|  |             self._pool.append(conn) | ||||||
|  |         return conn | ||||||
|  | 
 | ||||||
|  |     def _getkey(self): | ||||||
|  |         """Return a new unique key.""" | ||||||
|  |         self._keys += 1 | ||||||
|  |         return self._keys | ||||||
|  | 
 | ||||||
|  |     def _getconn(self, key=None): | ||||||
|  |         """Get a free connection and assign it to 'key' if not None.""" | ||||||
|  |         if self.closed: raise PoolError("connection pool is closed") | ||||||
|  |         if key is None: key = self._getkey() | ||||||
|  | 
 | ||||||
|  |         if key in self._used: | ||||||
|  |             return self._used[key] | ||||||
|  | 
 | ||||||
|  |         if self._pool: | ||||||
|  |             self._used[key] = conn = self._pool.pop() | ||||||
|  |             self._rused[id(conn)] = key | ||||||
|  |             return conn | ||||||
|  |         else: | ||||||
|  |             if len(self._used) == self.maxconn: | ||||||
|  |                 raise PoolError("connection pool exausted") | ||||||
|  |             return self._connect(key) | ||||||
|  | 
 | ||||||
|  |     def _putconn(self, conn, key=None, close=False): | ||||||
|  |         """Put away a connection.""" | ||||||
|  |         if self.closed: raise PoolError("connection pool is closed") | ||||||
|  |         if key is None: key = self._rused[id(conn)] | ||||||
|  | 
 | ||||||
|  |         if not key: | ||||||
|  |             raise PoolError("trying to put unkeyed connection") | ||||||
|  | 
 | ||||||
|  |         if len(self._pool) < self.minconn and not close: | ||||||
|  |             self._pool.append(conn) | ||||||
|  |         else: | ||||||
|  |             conn.close() | ||||||
|  | 
 | ||||||
|  |         # here we check for the presence of key because it can happen that a | ||||||
|  |         # thread tries to put back a connection after a call to close | ||||||
|  |         if not self.closed or key in self._used: | ||||||
|  |             del self._used[key] | ||||||
|  |             del self._rused[id(conn)] | ||||||
|  | 
 | ||||||
|  |     def _closeall(self): | ||||||
|  |         """Close all connections. | ||||||
|  | 
 | ||||||
|  |         Note that this can lead to some code fail badly when trying to use | ||||||
|  |         an already closed connection. If you call .closeall() make sure | ||||||
|  |         your code can deal with it. | ||||||
|  |         """ | ||||||
|  |         if self.closed: raise PoolError("connection pool is closed") | ||||||
|  |         for conn in self._pool + list(self._used.values()): | ||||||
|  |             try: | ||||||
|  |                 conn.close() | ||||||
|  |             except: | ||||||
|  |                 pass | ||||||
|  |         self.closed = True | ||||||
|  | 
 | ||||||
|  | 
 | ||||||
|  | class PersistentConnectionPool(AbstractConnectionPool): | ||||||
|  |     """A pool that assigns persistent connections to different threads. | ||||||
|  | 
 | ||||||
|  |     Note that this connection pool generates by itself the required keys | ||||||
|  |     using the current thread id.  This means that until a thread puts away | ||||||
|  |     a connection it will always get the same connection object by successive | ||||||
|  |     `!getconn()` calls. This also means that a thread can't use more than one | ||||||
|  |     single connection from the pool. | ||||||
|  |     """ | ||||||
|  | 
 | ||||||
|  |     def __init__(self, minconn, maxconn, *args, **kwargs): | ||||||
|  |         """Initialize the threading lock.""" | ||||||
|  |         import threading | ||||||
|  |         AbstractConnectionPool.__init__( | ||||||
|  |             self, minconn, maxconn, *args, **kwargs) | ||||||
|  |         self._lock = threading.Lock() | ||||||
|  | 
 | ||||||
|  |         # we we'll need the thread module, to determine thread ids, so we | ||||||
|  |         # import it here and copy it in an instance variable | ||||||
|  |         import thread | ||||||
|  |         self.__thread = thread | ||||||
|  | 
 | ||||||
|  |     def getconn(self): | ||||||
|  |         """Generate thread id and return a connection.""" | ||||||
|  |         key = self.__thread.get_ident() | ||||||
|  |         self._lock.acquire() | ||||||
|  |         try: | ||||||
|  |             return self._getconn(key) | ||||||
|  |         finally: | ||||||
|  |             self._lock.release() | ||||||
|  | 
 | ||||||
|  |     def putconn(self, conn=None, close=False): | ||||||
|  |         """Put away an unused connection.""" | ||||||
|  |         key = self.__thread.get_ident() | ||||||
|  |         self._lock.acquire() | ||||||
|  |         try: | ||||||
|  |             if not conn: conn = self._used[key] | ||||||
|  |             self._putconn(conn, key, close) | ||||||
|  |         finally: | ||||||
|  |             self._lock.release() | ||||||
|  | 
 | ||||||
|  |     def closeall(self): | ||||||
|  |         """Close all connections (even the one currently in use.)""" | ||||||
|  |         self._lock.acquire() | ||||||
|  |         try: | ||||||
|  |             self._closeall() | ||||||
|  |         finally: | ||||||
|  |             self._lock.release() | ||||||
|  | 
 | ||||||
| 
 | 
 | ||||||
| _connections_pool = {} | _connections_pool = {} | ||||||
| _connections_lock = threading.Lock() | _connections_lock = threading.Lock() | ||||||
|  | @ -29,7 +173,7 @@ def getpool(dsn, create=True): | ||||||
|     try: |     try: | ||||||
|         if not _connections_pool.has_key(dsn) and create: |         if not _connections_pool.has_key(dsn) and create: | ||||||
|             _connections_pool[dsn] = \ |             _connections_pool[dsn] = \ | ||||||
|                 psycopg2.pool.PersistentConnectionPool(4, 200, dsn) |                 PersistentConnectionPool(4, 200, dsn) | ||||||
|     finally: |     finally: | ||||||
|         _connections_lock.release() |         _connections_lock.release() | ||||||
|     return _connections_pool[dsn] |     return _connections_pool[dsn] | ||||||
|  | @ -41,7 +185,7 @@ def flushpool(dsn): | ||||||
|         del _connections_pool[dsn] |         del _connections_pool[dsn] | ||||||
|     finally: |     finally: | ||||||
|         _connections_lock.release() |         _connections_lock.release() | ||||||
|          | 
 | ||||||
| def getconn(dsn, create=True): | def getconn(dsn, create=True): | ||||||
|     return getpool(dsn, create=create).getconn() |     return getpool(dsn, create=create).getconn() | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -217,6 +217,10 @@ class PersistentConnectionPool(AbstractConnectionPool): | ||||||
| 
 | 
 | ||||||
|     def __init__(self, minconn, maxconn, *args, **kwargs): |     def __init__(self, minconn, maxconn, *args, **kwargs): | ||||||
|         """Initialize the threading lock.""" |         """Initialize the threading lock.""" | ||||||
|  |         import warnings | ||||||
|  |         warnings.warn("deprecated: use ZPsycopgDA.pool implementation", | ||||||
|  |             DeprecationWarning) | ||||||
|  | 
 | ||||||
|         import threading |         import threading | ||||||
|         AbstractConnectionPool.__init__( |         AbstractConnectionPool.__init__( | ||||||
|             self, minconn, maxconn, *args, **kwargs) |             self, minconn, maxconn, *args, **kwargs) | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue
	
	Block a user