Adapt the tests to recent changes

Some methods were forbidden in asynchronous mode, the isolation level
of an asynchronous connection is not always 0 and these changes
influenced expected test results.
This commit is contained in:
Jan Urbański 2010-03-31 02:00:52 +02:00 committed by Federico Di Gregorio
parent 31f60be000
commit e15bc9da05

View File

@ -33,7 +33,7 @@ class AsyncTests(unittest.TestCase):
CREATE TEMPORARY TABLE table1 ( CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY id int PRIMARY KEY
)''') )''')
self.conn.commit() self.wait_for_query(curs)
def tearDown(self): def tearDown(self):
self.sync_conn.close() self.sync_conn.close()
@ -48,18 +48,19 @@ class AsyncTests(unittest.TestCase):
select.select([], [cur.fileno()], []) select.select([], [cur.fileno()], [])
state = cur.poll() state = cur.poll()
def test_wrong_execution_type(self): def test_connection_setup(self):
cur = self.conn.cursor() cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor() sync_cur = self.sync_conn.cursor()
self.assertRaises(psycopg2.ProgrammingError, cur.execute, self.assertEquals(self.conn.issync(), False)
"select 'a'", async=False) self.assertEquals(self.sync_conn.issync(), True)
self.assertRaises(psycopg2.ProgrammingError, sync_cur.execute,
"select 'a'", async=True)
# but this should work anyway # the async connection should be in isolevel 0
sync_cur.execute("select 'a'", async=False) self.assertEquals(self.conn.isolation_level, 0)
cur.execute("select 'a'", async=True)
def test_async_named_cursor(self):
self.assertRaises(psycopg2.ProgrammingError,
self.conn.cursor, "name")
def test_async_select(self): def test_async_select(self):
cur = self.conn.cursor() cur = self.conn.cursor()
@ -75,7 +76,7 @@ class AsyncTests(unittest.TestCase):
def test_async_callproc(self): def test_async_callproc(self):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.callproc("pg_sleep", (0.1, ), True) cur.callproc("pg_sleep", (0.1, ))
except psycopg2.ProgrammingError: except psycopg2.ProgrammingError:
# PG <8.1 did not have pg_sleep # PG <8.1 did not have pg_sleep
return return
@ -91,7 +92,14 @@ class AsyncTests(unittest.TestCase):
cur.execute("insert into table1 values (1)") cur.execute("insert into table1 values (1)")
# an async execute after an async one blocks and waits for completion # an async execute after an async one raises an exception
self.assertRaises(psycopg2.ProgrammingError,
cur.execute, "select * from table1")
# same for callproc
self.assertRaises(psycopg2.ProgrammingError,
cur.callproc, "version")
# but after you've waited it should be good
self.wait_for_query(cur)
cur.execute("select * from table1") cur.execute("select * from table1")
self.wait_for_query(cur) self.wait_for_query(cur)
@ -109,7 +117,11 @@ class AsyncTests(unittest.TestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select 'a'") cur.execute("select 'a'")
# a fetch after an asynchronous query blocks and waits for completion # a fetch after an asynchronous query should raise an error
self.assertRaises(psycopg2.ProgrammingError,
cur.fetchall)
# but after waiting it should work
self.wait_for_query(cur)
self.assertEquals(cur.fetchall()[0][0], "a") self.assertEquals(cur.fetchall()[0][0], "a")
def test_rollback_while_async(self): def test_rollback_while_async(self):
@ -117,42 +129,38 @@ class AsyncTests(unittest.TestCase):
cur.execute("select 'a'") cur.execute("select 'a'")
# a rollback blocks and should leave the connection in a workable state # a rollback should not work in asynchronous mode
self.conn.rollback() self.assertRaises(psycopg2.ProgrammingError, self.conn.rollback)
self.assertFalse(self.conn.executing())
# try a sync cursor first
sync_cur = self.sync_conn.cursor()
sync_cur.execute("select 'b'")
self.assertEquals(sync_cur.fetchone()[0], "b")
# now try the async cursor
cur.execute("select 'c'")
self.wait_for_query(cur)
self.assertEquals(cur.fetchmany()[0][0], "c")
def test_commit_while_async(self): def test_commit_while_async(self):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("begin")
self.wait_for_query(cur)
cur.execute("insert into table1 values (1)") cur.execute("insert into table1 values (1)")
# a commit blocks # a commit should not work in asynchronous mode
self.conn.commit() self.assertRaises(psycopg2.ProgrammingError, self.conn.commit)
self.assertFalse(self.conn.executing()) self.assertTrue(self.conn.executing())
# but a manual commit should
self.wait_for_query(cur)
cur.execute("commit")
self.wait_for_query(cur)
cur.execute("select * from table1") cur.execute("select * from table1")
self.wait_for_query(cur) self.wait_for_query(cur)
self.assertEquals(cur.fetchall()[0][0], 1) self.assertEquals(cur.fetchall()[0][0], 1)
cur.execute("delete from table1") cur.execute("delete from table1")
self.conn.commit() self.wait_for_query(cur)
cur.execute("select * from table1") cur.execute("select * from table1")
self.wait_for_query(cur) self.wait_for_query(cur)
self.assertEquals(cur.fetchone(), None) self.assertEquals(cur.fetchone(), None)
def test_set_parameters_while_async(self): def test_set_parameters_while_async(self):
prev_encoding = self.conn.encoding
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select 'c'") cur.execute("select 'c'")
@ -163,37 +171,36 @@ class AsyncTests(unittest.TestCase):
extensions.TRANSACTION_STATUS_ACTIVE) extensions.TRANSACTION_STATUS_ACTIVE)
self.assertTrue(self.conn.executing()) self.assertTrue(self.conn.executing())
# this issues a ROLLBACK internally # setting connection encoding should fail
self.conn.set_client_encoding("LATIN1") self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_client_encoding, "LATIN1")
self.assertFalse(self.conn.executing()) # same for transaction isolation
self.assertEquals(self.conn.encoding, "LATIN1") self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_isolation_level, 1)
self.conn.set_client_encoding(prev_encoding)
def test_reset_while_async(self): def test_reset_while_async(self):
prev_encoding = self.conn.encoding
# pick something different than the current encoding
new_encoding = (prev_encoding == "LATIN1") and "UTF8" or "LATIN1"
self.conn.set_client_encoding(new_encoding)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select 'c'") cur.execute("select 'c'")
self.assertTrue(self.conn.executing()) self.assertTrue(self.conn.executing())
self.conn.reset() # a reset should fail
self.assertFalse(self.conn.executing()) self.assertRaises(psycopg2.ProgrammingError, self.conn.reset)
self.assertEquals(self.conn.encoding, prev_encoding)
def test_async_iter(self): def test_async_iter(self):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("begin")
self.wait_for_query(cur)
cur.execute("insert into table1 values (1), (2), (3)") cur.execute("insert into table1 values (1), (2), (3)")
self.wait_for_query(cur) self.wait_for_query(cur)
cur.execute("select id from table1 order by id") cur.execute("select id from table1 order by id")
# iteration just blocks # iteration fails if a query is underway
self.assertRaises(psycopg2.ProgrammingError, list, cur)
# but after it's done it should work
self.wait_for_query(cur)
self.assertEquals(list(cur), [(1, ), (2, ), (3, )]) self.assertEquals(list(cur), [(1, ), (2, ), (3, )])
self.assertFalse(self.conn.executing()) self.assertFalse(self.conn.executing())
@ -201,11 +208,15 @@ class AsyncTests(unittest.TestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select 'a'") cur.execute("select 'a'")
# copy just blocks # copy should fail
cur.copy_from(StringIO.StringIO("1\n3\n5\n\\.\n"), "table1") self.assertRaises(psycopg2.ProgrammingError,
cur.copy_from,
StringIO.StringIO("1\n3\n5\n\\.\n"), "table1")
cur.execute("select * from table1 order by id") def test_lobject_while_async(self):
self.assertEquals(cur.fetchall(), [(1, ), (3, ), (5, )]) # large objects should be prohibited
self.assertRaises(psycopg2.ProgrammingError,
self.conn.lobject)
def test_async_executemany(self): def test_async_executemany(self):
cur = self.conn.cursor() cur = self.conn.cursor()
@ -219,13 +230,18 @@ class AsyncTests(unittest.TestCase):
self.wait_for_query(cur) self.wait_for_query(cur)
cur.execute("select id from table1 order by id") cur.execute("select id from table1 order by id")
# scroll blocks, but should work # scroll should fail if a query is underway
self.assertRaises(psycopg2.ProgrammingError, cur.scroll, 1)
self.assertTrue(self.conn.executing())
# but after it's done it should work
self.wait_for_query(cur)
cur.scroll(1) cur.scroll(1)
self.assertFalse(self.conn.executing())
self.assertEquals(cur.fetchall(), [(2, ), (3, )]) self.assertEquals(cur.fetchall(), [(2, ), (3, )])
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select id from table1 order by id") cur.execute("select id from table1 order by id")
self.wait_for_query(cur)
cur2 = self.conn.cursor() cur2 = self.conn.cursor()
self.assertRaises(psycopg2.ProgrammingError, cur2.scroll, 1) self.assertRaises(psycopg2.ProgrammingError, cur2.scroll, 1)
@ -234,19 +250,29 @@ class AsyncTests(unittest.TestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select id from table1 order by id") cur.execute("select id from table1 order by id")
self.wait_for_query(cur)
cur.scroll(2)
cur.scroll(-1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
def test_scroll(self):
cur = self.sync_conn.cursor()
cur.execute("create table table1 (id int)")
cur.execute("insert into table1 values (1), (2), (3)")
cur.execute("select id from table1 order by id")
cur.scroll(2) cur.scroll(2)
cur.scroll(-1) cur.scroll(-1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )]) self.assertEquals(cur.fetchall(), [(2, ), (3, )])
def test_async_dont_read_all(self): def test_async_dont_read_all(self):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select 'a'; select 'b'") cur.execute("select repeat('a', 10000); select repeat('b', 10000)")
# fetch the result # fetch the result
self.wait_for_query(cur) self.wait_for_query(cur)
# it should be the result of the second query # it should be the result of the second query
self.assertEquals(cur.fetchone()[0][0], "b") self.assertEquals(cur.fetchone()[0], "b" * 10000)
def test_suite(): def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)