clean up the pool tests

This commit is contained in:
Changaco 2019-09-23 11:15:10 +02:00
parent 6e2ddd26c5
commit 11dc2691f4

View File

@ -23,37 +23,37 @@
import unittest
import psycopg2
import psycopg2.pool as psycopg_pool
import psycopg2.extensions as _ext
from .testutils import ConnectingTestCase
import psycopg2.pool
from .testconfig import dsn, dbname
from .testutils import ConnectingTestCase
class PoolTests(ConnectingTestCase):
def test_caching_pool_get_conn(self):
def test_get_conn(self):
"""Test the call to getconn. Should just return an open connection."""
lifetime = 30
pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=lifetime)
pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
conn = pool.getconn()
#make sure we got an open connection
# Make sure we got an open connection
self.assertFalse(conn.closed)
#Try again. We should get an error, since we only allowed one connection
# Try again. We should get an error, since we only allowed one connection.
self.assertRaises(psycopg2.pool.PoolError, pool.getconn)
# Put the connection back, the return time should be set.
# Put the connection back, the return time should be set
pool.putconn(conn)
self.assertIn(id(conn), pool._return_times)
# Get the connection back.
# Get the connection back
new_conn = pool.getconn()
self.assertIs(new_conn, conn)
def test_caching_pool_prune(self):
def test_prune(self):
"""Test the prune function to make sure it closes conenctions and removes them from the pool"""
pool = psycopg_pool.SimpleConnectionPool(0, 3, dsn, idle_timeout=30)
pool = psycopg2.pool.SimpleConnectionPool(0, 3, dsn, idle_timeout=30)
# Get a connection that we use, so it can't be pruned.
sticky_conn = pool.getconn()
@ -62,13 +62,13 @@ class PoolTests(ConnectingTestCase):
self.assertFalse(sticky_conn.closed)
self.assertFalse(id(sticky_conn) in pool._return_times)
# create a second connection that is put back into the pool, available to be pruned.
# Create a second connection that is put back into the pool, available to be pruned
conn = pool.getconn()
# create a third connection that is put back into the pool, but won't be expired
# Create a third connection that is put back into the pool, but won't be expired
new_conn = pool.getconn()
# Put the connections back in the pool.
# Put the connections back in the pool
pool.putconn(conn)
pool.putconn(new_conn)
@ -86,14 +86,14 @@ class PoolTests(ConnectingTestCase):
self.assertNotEqual(conn, sticky_conn)
self.assertNotEqual(new_conn, conn)
#Make the connections expire a minute ago (but not new_con)
# Make the connections expire a minute ago (but not new_con)
pool._return_times[id(conn)] -= 60
pool._return_times[id(sticky_conn)] = pool._return_times[id(conn)]
#prune connections
# Prune connections
pool._prune()
# make sure the unused expired connection is gone and closed,
# Make sure the unused expired connection is gone and closed,
# but the used connection isn't
self.assertFalse(conn in pool._pool)
self.assertTrue(conn.closed)
@ -106,15 +106,15 @@ class PoolTests(ConnectingTestCase):
self.assertFalse(new_conn.closed)
self.assertTrue(id(new_conn) in pool._return_times)
def test_caching_pool_prune_below_min(self):
pool = psycopg_pool.SimpleConnectionPool(1, 1, dsn, idle_timeout=30)
def test_prune_below_min(self):
pool = psycopg2.pool.SimpleConnectionPool(1, 1, dsn, idle_timeout=30)
conn = pool.getconn()
self.assertFalse(conn in pool._pool)
#nothing in pool, since we only allow one connection here
# Nothing in pool, since we only allow one connection here
self.assertEqual(len(pool._pool), 0)
pool.putconn(conn, close= True)
pool.putconn(conn, close=True)
# This connection should *not* be in the pool, since we said close
self.assertFalse(conn in pool._pool)
@ -122,56 +122,55 @@ class PoolTests(ConnectingTestCase):
# But we should still have *a* connection available
self.assertEqual(len(pool._pool), 1)
def test_caching_pool_putconn_normal(self):
pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
def test_putconn_normal(self):
pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
conn = pool.getconn()
self.assertFalse(conn in pool._pool)
pool.putconn(conn)
self.assertTrue(conn in pool._pool)
def test_caching_pool_putconn_closecon(self):
pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
def test_putconn_closecon(self):
pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
conn = pool.getconn()
self.assertFalse(conn in pool._pool)
pool.putconn(conn, close = True)
pool.putconn(conn, close=True)
self.assertFalse(conn in pool._pool)
self.assertFalse(id(conn) in pool._return_times)
def test_caching_pool_getconn_expired(self):
pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
def test_getconn_expired(self):
pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
conn = pool.getconn()
#expire the connection
# Expire the connection
pool.putconn(conn)
pool._return_times[id(conn)] -= 60
#connection should be discarded
# Connection should be discarded
new_conn = pool.getconn()
self.assertIsNot(new_conn, conn)
self.assertFalse(conn in pool._pool)
self.assertFalse(id(conn) in pool._return_times)
self.assertTrue(conn.closed)
def test_caching_pool_putconn_unkeyed(self):
pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
def test_putconn_unkeyed(self):
pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
#Test put with missing key
# Test put with missing key
conn = pool.getconn()
del pool._rused[id(conn)]
self.assertRaises(psycopg_pool.PoolError, pool.putconn, conn)
self.assertRaises(psycopg2.pool.PoolError, pool.putconn, conn)
def test_caching_pool_putconn_errorState(self):
pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
def test_putconn_errorState(self):
pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
conn = pool.getconn()
#Get connection into transaction state
# Get connection into transaction state
cursor = conn.cursor()
try:
cursor.execute("INSERT INTO test (id) VALUES (1)") #table does not exist, error
except:
cursor.execute("INSERT INTO nonexistent (id) VALUES (1)")
except psycopg2.ProgrammingError:
pass
self.assertNotEqual(conn.get_transaction_status(),
@ -179,16 +178,16 @@ class PoolTests(ConnectingTestCase):
pool.putconn(conn)
# make sure we got back into the pool and are now showing idle
# Make sure we got back into the pool and are now showing idle
self.assertEqual(conn.get_transaction_status(),
_ext.TRANSACTION_STATUS_IDLE)
self.assertTrue(conn in pool._pool)
def test_caching_pool_putconn_closed(self):
pool = psycopg_pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
def test_putconn_closed(self):
pool = psycopg2.pool.SimpleConnectionPool(0, 1, dsn, idle_timeout=30)
conn = pool.getconn()
# The connection should be open and shouldn't have a return time.
# The connection should be open and shouldn't have a return time
self.assertFalse(conn.closed)
self.assertFalse(id(conn) in pool._return_times)
@ -199,12 +198,12 @@ class PoolTests(ConnectingTestCase):
pool.putconn(conn)
# the connection should have been discarded
# The connection should have been discarded
self.assertFalse(conn in pool._pool)
self.assertFalse(id(conn) in pool._return_times)
def test_caching_pool_caching(self):
pool = psycopg_pool.SimpleConnectionPool(0, 10, dsn, idle_timeout=30)
def test_caching(self):
pool = psycopg2.pool.SimpleConnectionPool(0, 10, dsn, idle_timeout=30)
# Get a connection to use to check the number of connections
check_conn = pool.getconn()
@ -215,10 +214,10 @@ class PoolTests(ConnectingTestCase):
SQL = "SELECT numbackends from pg_stat_database WHERE datname=%s"
check_cursor.execute(SQL, (dbname, ))
#not trying to test anything yet, so hopefully this always works :)
# Not trying to test anything yet, so hopefully this always works :)
starting_conns = check_cursor.fetchone()[0]
#Get a couple more connections
# Get a couple more connections
conn2 = pool.getconn()
conn3 = pool.getconn()
@ -230,7 +229,7 @@ class PoolTests(ConnectingTestCase):
self.assertEqual(total_cons, starting_conns + 2)
#Put the connections back in the pool and verify they don't close
# Put the connections back in the pool and verify they don't close
pool.putconn(conn2)
pool.putconn(conn3)
@ -250,13 +249,13 @@ class PoolTests(ConnectingTestCase):
self.assertEqual(total_cons_after_get, total_cons)
def test_caching_pool_closeall(self):
pool = psycopg_pool.SimpleConnectionPool(0, 10, dsn, idle_timeout=30)
def test_closeall(self):
pool = psycopg2.pool.SimpleConnectionPool(0, 10, dsn, idle_timeout=30)
conn1 = pool.getconn()
conn2 = pool.getconn()
pool.putconn(conn2)
self.assertEqual(len(pool._pool), 1) #1 in use, 1 put back
self.assertEqual(len(pool._pool), 1) # 1 in use, 1 put back
self.assertEqual(len(pool._used), 1) # and we have one used connection
# Both connections should be open at this point
@ -277,7 +276,7 @@ class PoolTests(ConnectingTestCase):
# To maintain consistancy with existing code, closeall doesn't mess with the _return_times dict either
# self.assertEqual(len(pool._return_times), 0)
#We should get an error if we try to put conn1 back now
# We should get an error if we try to put conn1 back now
self.assertRaises(psycopg2.pool.PoolError, pool.putconn, conn1)