diff --git a/tests/test_async.py b/tests/test_async.py index c2bbf4ce..5aad8d39 100644 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -33,7 +33,7 @@ class AsyncTests(unittest.TestCase): CREATE TEMPORARY TABLE table1 ( id int PRIMARY KEY )''') - self.conn.commit() + self.wait_for_query(curs) def tearDown(self): self.sync_conn.close() @@ -48,18 +48,19 @@ class AsyncTests(unittest.TestCase): select.select([], [cur.fileno()], []) state = cur.poll() - def test_wrong_execution_type(self): + def test_connection_setup(self): cur = self.conn.cursor() sync_cur = self.sync_conn.cursor() - self.assertRaises(psycopg2.ProgrammingError, cur.execute, - "select 'a'", async=False) - self.assertRaises(psycopg2.ProgrammingError, sync_cur.execute, - "select 'a'", async=True) + self.assertEquals(self.conn.issync(), False) + self.assertEquals(self.sync_conn.issync(), True) - # but this should work anyway - sync_cur.execute("select 'a'", async=False) - cur.execute("select 'a'", async=True) + # the async connection should be in isolevel 0 + self.assertEquals(self.conn.isolation_level, 0) + + def test_async_named_cursor(self): + self.assertRaises(psycopg2.ProgrammingError, + self.conn.cursor, "name") def test_async_select(self): cur = self.conn.cursor() @@ -75,7 +76,7 @@ class AsyncTests(unittest.TestCase): def test_async_callproc(self): cur = self.conn.cursor() try: - cur.callproc("pg_sleep", (0.1, ), True) + cur.callproc("pg_sleep", (0.1, )) except psycopg2.ProgrammingError: # PG <8.1 did not have pg_sleep return @@ -91,7 +92,14 @@ class AsyncTests(unittest.TestCase): cur.execute("insert into table1 values (1)") - # an async execute after an async one blocks and waits for completion + # an async execute after an async one raises an exception + self.assertRaises(psycopg2.ProgrammingError, + cur.execute, "select * from table1") + # same for callproc + self.assertRaises(psycopg2.ProgrammingError, + cur.callproc, "version") + # but after you've waited it should be good + self.wait_for_query(cur) cur.execute("select * from table1") self.wait_for_query(cur) @@ -109,7 +117,11 @@ class AsyncTests(unittest.TestCase): cur = self.conn.cursor() cur.execute("select 'a'") - # a fetch after an asynchronous query blocks and waits for completion + # a fetch after an asynchronous query should raise an error + self.assertRaises(psycopg2.ProgrammingError, + cur.fetchall) + # but after waiting it should work + self.wait_for_query(cur) self.assertEquals(cur.fetchall()[0][0], "a") def test_rollback_while_async(self): @@ -117,42 +129,38 @@ class AsyncTests(unittest.TestCase): cur.execute("select 'a'") - # a rollback blocks and should leave the connection in a workable state - self.conn.rollback() - self.assertFalse(self.conn.executing()) - - # try a sync cursor first - sync_cur = self.sync_conn.cursor() - sync_cur.execute("select 'b'") - self.assertEquals(sync_cur.fetchone()[0], "b") - - # now try the async cursor - cur.execute("select 'c'") - self.wait_for_query(cur) - self.assertEquals(cur.fetchmany()[0][0], "c") + # a rollback should not work in asynchronous mode + self.assertRaises(psycopg2.ProgrammingError, self.conn.rollback) def test_commit_while_async(self): cur = self.conn.cursor() + cur.execute("begin") + self.wait_for_query(cur) + cur.execute("insert into table1 values (1)") - # a commit blocks - self.conn.commit() - self.assertFalse(self.conn.executing()) + # a commit should not work in asynchronous mode + self.assertRaises(psycopg2.ProgrammingError, self.conn.commit) + self.assertTrue(self.conn.executing()) + + # but a manual commit should + self.wait_for_query(cur) + cur.execute("commit") + self.wait_for_query(cur) cur.execute("select * from table1") self.wait_for_query(cur) self.assertEquals(cur.fetchall()[0][0], 1) cur.execute("delete from table1") - self.conn.commit() + self.wait_for_query(cur) cur.execute("select * from table1") self.wait_for_query(cur) self.assertEquals(cur.fetchone(), None) def test_set_parameters_while_async(self): - prev_encoding = self.conn.encoding cur = self.conn.cursor() cur.execute("select 'c'") @@ -163,37 +171,36 @@ class AsyncTests(unittest.TestCase): extensions.TRANSACTION_STATUS_ACTIVE) self.assertTrue(self.conn.executing()) - # this issues a ROLLBACK internally - self.conn.set_client_encoding("LATIN1") + # setting connection encoding should fail + self.assertRaises(psycopg2.ProgrammingError, + self.conn.set_client_encoding, "LATIN1") - self.assertFalse(self.conn.executing()) - self.assertEquals(self.conn.encoding, "LATIN1") - - self.conn.set_client_encoding(prev_encoding) + # same for transaction isolation + self.assertRaises(psycopg2.ProgrammingError, + self.conn.set_isolation_level, 1) def test_reset_while_async(self): - prev_encoding = self.conn.encoding - # pick something different than the current encoding - new_encoding = (prev_encoding == "LATIN1") and "UTF8" or "LATIN1" - - self.conn.set_client_encoding(new_encoding) - cur = self.conn.cursor() cur.execute("select 'c'") self.assertTrue(self.conn.executing()) - self.conn.reset() - self.assertFalse(self.conn.executing()) - self.assertEquals(self.conn.encoding, prev_encoding) + # a reset should fail + self.assertRaises(psycopg2.ProgrammingError, self.conn.reset) def test_async_iter(self): cur = self.conn.cursor() + cur.execute("begin") + self.wait_for_query(cur) cur.execute("insert into table1 values (1), (2), (3)") self.wait_for_query(cur) cur.execute("select id from table1 order by id") - # iteration just blocks + # iteration fails if a query is underway + self.assertRaises(psycopg2.ProgrammingError, list, cur) + + # but after it's done it should work + self.wait_for_query(cur) self.assertEquals(list(cur), [(1, ), (2, ), (3, )]) self.assertFalse(self.conn.executing()) @@ -201,11 +208,15 @@ class AsyncTests(unittest.TestCase): cur = self.conn.cursor() cur.execute("select 'a'") - # copy just blocks - cur.copy_from(StringIO.StringIO("1\n3\n5\n\\.\n"), "table1") + # copy should fail + self.assertRaises(psycopg2.ProgrammingError, + cur.copy_from, + StringIO.StringIO("1\n3\n5\n\\.\n"), "table1") - cur.execute("select * from table1 order by id") - self.assertEquals(cur.fetchall(), [(1, ), (3, ), (5, )]) + def test_lobject_while_async(self): + # large objects should be prohibited + self.assertRaises(psycopg2.ProgrammingError, + self.conn.lobject) def test_async_executemany(self): cur = self.conn.cursor() @@ -219,13 +230,18 @@ class AsyncTests(unittest.TestCase): self.wait_for_query(cur) cur.execute("select id from table1 order by id") - # scroll blocks, but should work + # scroll should fail if a query is underway + self.assertRaises(psycopg2.ProgrammingError, cur.scroll, 1) + self.assertTrue(self.conn.executing()) + + # but after it's done it should work + self.wait_for_query(cur) cur.scroll(1) - self.assertFalse(self.conn.executing()) self.assertEquals(cur.fetchall(), [(2, ), (3, )]) cur = self.conn.cursor() cur.execute("select id from table1 order by id") + self.wait_for_query(cur) cur2 = self.conn.cursor() self.assertRaises(psycopg2.ProgrammingError, cur2.scroll, 1) @@ -234,19 +250,29 @@ class AsyncTests(unittest.TestCase): cur = self.conn.cursor() cur.execute("select id from table1 order by id") + self.wait_for_query(cur) + cur.scroll(2) + cur.scroll(-1) + self.assertEquals(cur.fetchall(), [(2, ), (3, )]) + + def test_scroll(self): + cur = self.sync_conn.cursor() + cur.execute("create table table1 (id int)") + cur.execute("insert into table1 values (1), (2), (3)") + cur.execute("select id from table1 order by id") cur.scroll(2) cur.scroll(-1) self.assertEquals(cur.fetchall(), [(2, ), (3, )]) def test_async_dont_read_all(self): cur = self.conn.cursor() - cur.execute("select 'a'; select 'b'") + cur.execute("select repeat('a', 10000); select repeat('b', 10000)") # fetch the result self.wait_for_query(cur) # it should be the result of the second query - self.assertEquals(cur.fetchone()[0][0], "b") + self.assertEquals(cur.fetchone()[0], "b" * 10000) def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)