diff --git a/tests/test_pool.py b/tests/test_pool.py index f22ddbf0..ca160123 100644 --- a/tests/test_pool.py +++ b/tests/test_pool.py @@ -23,37 +23,37 @@ import unittest import psycopg2 -import psycopg2.pool as psycopg_pool import psycopg2.extensions as _ext - -from .testutils import ConnectingTestCase +import psycopg2.pool from .testconfig import dsn, dbname +from .testutils import ConnectingTestCase + class PoolTests(ConnectingTestCase): - def test_caching_pool_get_conn(self): + + def test_get_conn(self): """Test the call to getconn. Should just return an open connection.""" - lifetime = 30 - pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=lifetime) + pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) conn = pool.getconn() - #make sure we got an open connection + # Make sure we got an open connection self.assertFalse(conn.closed) - #Try again. We should get an error, since we only allowed one connection + # Try again. We should get an error, since we only allowed one connection. self.assertRaises(psycopg2.pool.PoolError, pool.getconn) - # Put the connection back, the return time should be set. + # Put the connection back, the return time should be set pool.putconn(conn) self.assertIn(id(conn), pool._return_times) - # Get the connection back. + # Get the connection back new_conn = pool.getconn() self.assertIs(new_conn, conn) - def test_caching_pool_prune(self): + def test_prune(self): """Test the prune function to make sure it closes conenctions and removes them from the pool""" - pool = psycopg_pool.SimpleConnectionPool(0, 3, dsn, idle_timeout=30) + pool = psycopg2.pool.SimpleConnectionPool(0, 3, dsn, idle_timeout=30) # Get a connection that we use, so it can't be pruned. sticky_conn = pool.getconn() @@ -62,13 +62,13 @@ class PoolTests(ConnectingTestCase): self.assertFalse(sticky_conn.closed) self.assertFalse(id(sticky_conn) in pool._return_times) - # create a second connection that is put back into the pool, available to be pruned. + # Create a second connection that is put back into the pool, available to be pruned conn = pool.getconn() - # create a third connection that is put back into the pool, but won't be expired + # Create a third connection that is put back into the pool, but won't be expired new_conn = pool.getconn() - # Put the connections back in the pool. + # Put the connections back in the pool pool.putconn(conn) pool.putconn(new_conn) @@ -86,14 +86,14 @@ class PoolTests(ConnectingTestCase): self.assertNotEqual(conn, sticky_conn) self.assertNotEqual(new_conn, conn) - #Make the connections expire a minute ago (but not new_con) + # Make the connections expire a minute ago (but not new_con) pool._return_times[id(conn)] -= 60 pool._return_times[id(sticky_conn)] = pool._return_times[id(conn)] - #prune connections + # Prune connections pool._prune() - # make sure the unused expired connection is gone and closed, + # Make sure the unused expired connection is gone and closed, # but the used connection isn't self.assertFalse(conn in pool._pool) self.assertTrue(conn.closed) @@ -106,15 +106,15 @@ class PoolTests(ConnectingTestCase): self.assertFalse(new_conn.closed) self.assertTrue(id(new_conn) in pool._return_times) - def test_caching_pool_prune_below_min(self): - pool = psycopg_pool.SimpleConnectionPool(1, 1, dsn, idle_timeout=30) + def test_prune_below_min(self): + pool = psycopg2.pool.SimpleConnectionPool(1, 1, dsn, idle_timeout=30) conn = pool.getconn() self.assertFalse(conn in pool._pool) - #nothing in pool, since we only allow one connection here + # Nothing in pool, since we only allow one connection here self.assertEqual(len(pool._pool), 0) - pool.putconn(conn, close= True) + pool.putconn(conn, close=True) # This connection should *not* be in the pool, since we said close self.assertFalse(conn in pool._pool) @@ -122,56 +122,55 @@ class PoolTests(ConnectingTestCase): # But we should still have *a* connection available self.assertEqual(len(pool._pool), 1) - - def test_caching_pool_putconn_normal(self): - pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) + def test_putconn_normal(self): + pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) conn = pool.getconn() self.assertFalse(conn in pool._pool) pool.putconn(conn) self.assertTrue(conn in pool._pool) - def test_caching_pool_putconn_closecon(self): - pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) + def test_putconn_closecon(self): + pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) conn = pool.getconn() self.assertFalse(conn in pool._pool) - pool.putconn(conn, close = True) + pool.putconn(conn, close=True) self.assertFalse(conn in pool._pool) self.assertFalse(id(conn) in pool._return_times) - def test_caching_pool_getconn_expired(self): - pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) + def test_getconn_expired(self): + pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) conn = pool.getconn() - #expire the connection + # Expire the connection pool.putconn(conn) pool._return_times[id(conn)] -= 60 - #connection should be discarded + # Connection should be discarded new_conn = pool.getconn() self.assertIsNot(new_conn, conn) self.assertFalse(conn in pool._pool) self.assertFalse(id(conn) in pool._return_times) self.assertTrue(conn.closed) - def test_caching_pool_putconn_unkeyed(self): - pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) + def test_putconn_unkeyed(self): + pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) - #Test put with missing key + # Test put with missing key conn = pool.getconn() del pool._rused[id(conn)] - self.assertRaises(psycopg_pool.PoolError, pool.putconn, conn) + self.assertRaises(psycopg2.pool.PoolError, pool.putconn, conn) - def test_caching_pool_putconn_errorState(self): - pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) + def test_putconn_errorState(self): + pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) conn = pool.getconn() - #Get connection into transaction state + # Get connection into transaction state cursor = conn.cursor() try: - cursor.execute("INSERT INTO test (id) VALUES (1)") #table does not exist, error - except: + cursor.execute("INSERT INTO nonexistent (id) VALUES (1)") + except psycopg2.ProgrammingError: pass self.assertNotEqual(conn.get_transaction_status(), @@ -179,16 +178,16 @@ class PoolTests(ConnectingTestCase): pool.putconn(conn) - # make sure we got back into the pool and are now showing idle + # Make sure we got back into the pool and are now showing idle self.assertEqual(conn.get_transaction_status(), _ext.TRANSACTION_STATUS_IDLE) self.assertTrue(conn in pool._pool) - def test_caching_pool_putconn_closed(self): - pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) + def test_putconn_closed(self): + pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30) conn = pool.getconn() - # The connection should be open and shouldn't have a return time. + # The connection should be open and shouldn't have a return time self.assertFalse(conn.closed) self.assertFalse(id(conn) in pool._return_times) @@ -199,12 +198,12 @@ class PoolTests(ConnectingTestCase): pool.putconn(conn) - # the connection should have been discarded + # The connection should have been discarded self.assertFalse(conn in pool._pool) self.assertFalse(id(conn) in pool._return_times) - def test_caching_pool_caching(self): - pool = psycopg_pool.SimpleConnectionPool(0, 10, dsn, idle_timeout=30) + def test_caching(self): + pool = psycopg2.pool.SimpleConnectionPool(0, 10, dsn, idle_timeout=30) # Get a connection to use to check the number of connections check_conn = pool.getconn() @@ -215,10 +214,10 @@ class PoolTests(ConnectingTestCase): SQL = "SELECT numbackends from pg_stat_database WHERE datname=%s" check_cursor.execute(SQL, (dbname, )) - #not trying to test anything yet, so hopefully this always works :) + # Not trying to test anything yet, so hopefully this always works :) starting_conns = check_cursor.fetchone()[0] - #Get a couple more connections + # Get a couple more connections conn2 = pool.getconn() conn3 = pool.getconn() @@ -230,7 +229,7 @@ class PoolTests(ConnectingTestCase): self.assertEqual(total_cons, starting_conns + 2) - #Put the connections back in the pool and verify they don't close + # Put the connections back in the pool and verify they don't close pool.putconn(conn2) pool.putconn(conn3) @@ -250,13 +249,13 @@ class PoolTests(ConnectingTestCase): self.assertEqual(total_cons_after_get, total_cons) - def test_caching_pool_closeall(self): - pool = psycopg_pool.SimpleConnectionPool(0, 10, dsn, idle_timeout=30) + def test_closeall(self): + pool = psycopg2.pool.SimpleConnectionPool(0, 10, dsn, idle_timeout=30) conn1 = pool.getconn() conn2 = pool.getconn() pool.putconn(conn2) - self.assertEqual(len(pool._pool), 1) #1 in use, 1 put back + self.assertEqual(len(pool._pool), 1) # 1 in use, 1 put back self.assertEqual(len(pool._used), 1) # and we have one used connection # Both connections should be open at this point @@ -277,7 +276,7 @@ class PoolTests(ConnectingTestCase): # To maintain consistancy with existing code, closeall doesn't mess with the _return_times dict either # self.assertEqual(len(pool._return_times), 0) - #We should get an error if we try to put conn1 back now + # We should get an error if we try to put conn1 back now self.assertRaises(psycopg2.pool.PoolError, pool.putconn, conn1)