More careful connections handling during tests.

This commit is contained in:
Daniele Varrazzo 2010-11-28 16:00:32 +00:00
parent 598b9424d2
commit 361786f4a8
7 changed files with 81 additions and 41 deletions

View File

@ -137,6 +137,10 @@ class NamedTupleCursorTest(unittest.TestCase):
curs.execute("INSERT INTO nttest VALUES (3, 'baz')")
self.conn.commit()
def tearDown(self):
if self.conn is not None:
self.conn.close()
@if_has_namedtuple
def test_fetchone(self):
curs = self.conn.cursor()
@ -194,6 +198,8 @@ class NamedTupleCursorTest(unittest.TestCase):
# an import error somewhere
from psycopg2.extras import NamedTupleConnection
try:
if self.conn is not None:
self.conn.close()
self.conn = psycopg2.connect(tests.dsn,
connection_factory=NamedTupleConnection)
curs = self.conn.cursor()

View File

@ -11,17 +11,21 @@ import tests
class ConnectionTests(unittest.TestCase):
def connect(self):
return psycopg2.connect(tests.dsn)
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
if not self.conn.closed:
self.conn.close()
def test_closed_attribute(self):
conn = self.connect()
conn = self.conn
self.assertEqual(conn.closed, False)
conn.close()
self.assertEqual(conn.closed, True)
def test_cursor_closed_attribute(self):
conn = self.connect()
conn = self.conn
curs = conn.cursor()
self.assertEqual(curs.closed, False)
curs.close()
@ -33,7 +37,7 @@ class ConnectionTests(unittest.TestCase):
self.assertEqual(curs.closed, True)
def test_reset(self):
conn = self.connect()
conn = self.conn
# switch isolation level, then reset
level = conn.isolation_level
conn.set_isolation_level(0)
@ -43,15 +47,14 @@ class ConnectionTests(unittest.TestCase):
self.assertEqual(conn.isolation_level, level)
def test_notices(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
cur.execute("create temp table chatty (id serial primary key);")
self.assertEqual("CREATE TABLE", cur.statusmessage)
self.assert_(conn.notices)
conn.close()
def test_notices_consistent_order(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
cur.execute("create temp table table1 (id serial); create temp table table2 (id serial);")
cur.execute("create temp table table3 (id serial); create temp table table4 (id serial);")
@ -60,10 +63,9 @@ class ConnectionTests(unittest.TestCase):
self.assert_('table2' in conn.notices[1])
self.assert_('table3' in conn.notices[2])
self.assert_('table4' in conn.notices[3])
conn.close()
def test_notices_limited(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
for i in range(0, 100, 10):
sql = " ".join(["create temp table table%d (id serial);" % j for j in range(i, i+10)])
@ -74,36 +76,33 @@ class ConnectionTests(unittest.TestCase):
self.assert_('table51' in conn.notices[1], conn.notices[1])
self.assert_('table98' in conn.notices[-2], conn.notices[-2])
self.assert_('table99' in conn.notices[-1], conn.notices[-1])
conn.close()
def test_server_version(self):
conn = self.connect()
self.assert_(conn.server_version)
self.assert_(self.conn.server_version)
def test_protocol_version(self):
conn = self.connect()
self.assert_(conn.protocol_version in (2,3), conn.protocol_version)
self.assert_(self.conn.protocol_version in (2,3),
self.conn.protocol_version)
def test_tpc_unsupported(self):
cnn = self.connect()
cnn = self.conn
if cnn.server_version >= 80100:
return self.skipTest("tpc is supported")
self.assertRaises(psycopg2.NotSupportedError,
cnn.xid, 42, "foo", "bar")
@skip_if_no_pg_sleep('connect')
@skip_if_no_pg_sleep('conn')
def test_concurrent_execution(self):
def slave(cnn):
def slave():
cnn = psycopg2.connect(tests.dsn)
cur = cnn.cursor()
cur.execute("select pg_sleep(2)")
cur.close()
cnn.close()
cnn1 = self.connect()
cnn2 = self.connect()
t1 = threading.Thread(target=slave, args=(cnn1,))
t2 = threading.Thread(target=slave, args=(cnn2,))
t1 = threading.Thread(target=slave)
t2 = threading.Thread(target=slave)
t0 = time.time()
t1.start()
t2.start()
@ -116,6 +115,7 @@ class ConnectionTests(unittest.TestCase):
class IsolationLevelsTestCase(unittest.TestCase):
def setUp(self):
self._conns = []
conn = self.connect()
cur = conn.cursor()
try:
@ -126,8 +126,16 @@ class IsolationLevelsTestCase(unittest.TestCase):
conn.commit()
conn.close()
def tearDown(self):
# close the connections used in the test
for conn in self._conns:
if not conn.closed:
conn.close()
def connect(self):
return psycopg2.connect(tests.dsn)
conn = psycopg2.connect(tests.dsn)
self._conns.append(conn)
return conn
def test_isolation_level(self):
conn = self.connect()
@ -292,14 +300,23 @@ def skip_if_tpc_disabled(f):
skip_if_tpc_disabled_.__name__ = f.__name__
return skip_if_tpc_disabled_
class ConnectionTwoPhaseTests(unittest.TestCase):
def setUp(self):
self._conns = []
self.make_test_table()
self.clear_test_xacts()
def tearDown(self):
self.clear_test_xacts()
# close the connections used in the test
for conn in self._conns:
if not conn.closed:
conn.close()
def clear_test_xacts(self):
"""Rollback all the prepared transaction in the testing db."""
cnn = self.connect()
@ -352,7 +369,9 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
return rv
def connect(self):
return psycopg2.connect(tests.dsn)
conn = psycopg2.connect(tests.dsn)
self._conns.append(conn)
return conn
def test_tpc_commit(self):
cnn = self.connect()

View File

@ -7,11 +7,14 @@ import tests
class CursorTests(unittest.TestCase):
def connect(self):
return psycopg2.connect(tests.dsn)
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def test_executemany_propagate_exceptions(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
cur.execute("create temp table test_exc (data int);")
def buggygen():
@ -19,10 +22,9 @@ class CursorTests(unittest.TestCase):
self.assertRaises(ZeroDivisionError,
cur.executemany, "insert into test_exc values (%s)", buggygen())
cur.close()
conn.close()
def test_mogrify_unicode(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
# test consistency between execute and mogrify.
@ -60,7 +62,7 @@ class CursorTests(unittest.TestCase):
except:
return
conn = self.connect()
conn = self.conn
cur = conn.cursor()
self.assertEqual('SELECT 10.3;',
cur.mogrify("SELECT %s;", (Decimal("10.3"),)))

View File

@ -21,14 +21,13 @@ class ConnectionStub(object):
return rv
class GreenTests(unittest.TestCase):
def connect(self):
return psycopg2.connect(tests.dsn)
def setUp(self):
self._cb = psycopg2.extensions.get_wait_callback()
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
psycopg2.extensions.set_wait_callback(self._cb)
def set_stub_wait_callback(self, conn):
@ -39,7 +38,7 @@ class GreenTests(unittest.TestCase):
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
conn = self.connect()
conn = self.conn
stub = self.set_stub_wait_callback(conn)
curs = conn.cursor()
for mb in 1, 5, 10, 20, 50:
@ -58,7 +57,7 @@ class GreenTests(unittest.TestCase):
warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self):
conn = self.connect()
conn = self.conn
curs = conn.cursor()
curs.execute("select 1") # have a BEGIN
curs.fetchone()

View File

@ -211,6 +211,9 @@ class QueryCancellationTests(unittest.TestCase):
self.conn = psycopg2.connect(tests.dsn)
self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
def tearDown(self):
self.conn.close()
@skip_if_no_pg_sleep('conn')
def test_statement_timeout(self):
curs = self.conn.cursor()

View File

@ -39,6 +39,9 @@ class TypesBasicTests(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def execute(self, *args):
curs = self.conn.cursor()
curs.execute(*args)

View File

@ -60,6 +60,9 @@ class TypesExtrasTests(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def execute(self, *args):
curs = self.conn.cursor()
curs.execute(*args)
@ -144,6 +147,9 @@ class HstoreTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def test_adapt_8(self):
if self.conn.server_version >= 90000:
return self.skipTest("skipping dict adaptation with PG pre-9 syntax")
@ -277,10 +283,12 @@ class HstoreTestCase(unittest.TestCase):
try:
register_hstore(self.conn, globally=True)
conn2 = psycopg2.connect(self.conn.dsn)
try:
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assert_(isinstance(r[0], dict))
finally:
conn2.close()
finally:
psycopg2.extensions.string_types.pop(oids[0])