psycopg2/tests/test_async.py

456 lines
15 KiB
Python
Raw Normal View History

2010-03-26 06:03:54 +03:00
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# test_async.py - unit test for asynchronous API
#
# Copyright (C) 2010-2011 Jan Urbański <wulczer@wulczer.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
from testutils import unittest, skip_if_no_pg_sleep
2010-03-26 06:03:54 +03:00
import psycopg2
from psycopg2 import extensions
import time
2010-03-26 06:03:54 +03:00
import select
import StringIO
from testconfig import dsn
2010-03-26 06:03:54 +03:00
class PollableStub(object):
"""A 'pollable' wrapper allowing analysis of the `poll()` calls."""
def __init__(self, pollable):
self.pollable = pollable
self.polls = []
def fileno(self):
return self.pollable.fileno()
def poll(self):
rv = self.pollable.poll()
self.polls.append(rv)
return rv
2010-03-26 06:03:54 +03:00
class AsyncTests(unittest.TestCase):
def setUp(self):
self.sync_conn = psycopg2.connect(dsn)
self.conn = psycopg2.connect(dsn, async=True)
2010-03-26 06:03:54 +03:00
self.wait(self.conn)
2010-03-26 06:03:54 +03:00
curs = self.conn.cursor()
curs.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.wait(curs)
2010-03-26 06:03:54 +03:00
def tearDown(self):
self.sync_conn.close()
self.conn.close()
def wait(self, cur_or_conn):
pollable = cur_or_conn
if not hasattr(pollable, 'poll'):
pollable = cur_or_conn.connection
while True:
state = pollable.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_READ:
select.select([pollable], [], [])
2010-03-26 06:03:54 +03:00
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [pollable], [])
else:
raise Exception("Unexpected result from poll: %r", state)
2010-03-26 06:03:54 +03:00
def test_connection_setup(self):
2010-03-26 06:03:54 +03:00
cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor()
self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async)
2010-03-26 06:03:54 +03:00
# the async connection should be in isolevel 0
self.assertEquals(self.conn.isolation_level, 0)
# check other properties to be found on the connection
self.assert_(self.conn.server_version)
self.assert_(self.conn.protocol_version in (2,3))
self.assert_(self.conn.encoding in psycopg2.extensions.encodings)
def test_async_named_cursor(self):
self.assertRaises(psycopg2.ProgrammingError,
self.conn.cursor, "name")
2010-03-26 06:03:54 +03:00
def test_async_select(self):
cur = self.conn.cursor()
self.assertFalse(self.conn.isexecuting())
2010-03-26 06:03:54 +03:00
cur.execute("select 'a'")
self.assertTrue(self.conn.isexecuting())
2010-03-26 06:03:54 +03:00
self.wait(cur)
2010-03-26 06:03:54 +03:00
self.assertFalse(self.conn.isexecuting())
2010-03-26 06:03:54 +03:00
self.assertEquals(cur.fetchone()[0], "a")
@skip_if_no_pg_sleep('conn')
2010-03-26 06:03:54 +03:00
def test_async_callproc(self):
cur = self.conn.cursor()
cur.callproc("pg_sleep", (0.1, ))
self.assertTrue(self.conn.isexecuting())
self.wait(cur)
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchall()[0][0], '')
2010-03-26 06:03:54 +03:00
def test_async_after_async(self):
cur = self.conn.cursor()
cur2 = self.conn.cursor()
cur.execute("insert into table1 values (1)")
# an async execute after an async one raises an exception
self.assertRaises(psycopg2.ProgrammingError,
cur.execute, "select * from table1")
# same for callproc
self.assertRaises(psycopg2.ProgrammingError,
cur.callproc, "version")
# but after you've waited it should be good
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur.execute("select * from table1")
self.wait(cur)
2010-03-26 06:03:54 +03:00
self.assertEquals(cur.fetchall()[0][0], 1)
cur.execute("delete from table1")
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur.execute("select * from table1")
self.wait(cur)
2010-03-26 06:03:54 +03:00
self.assertEquals(cur.fetchone(), None)
def test_fetch_after_async(self):
cur = self.conn.cursor()
cur.execute("select 'a'")
# a fetch after an asynchronous query should raise an error
self.assertRaises(psycopg2.ProgrammingError,
cur.fetchall)
# but after waiting it should work
self.wait(cur)
2010-03-26 06:03:54 +03:00
self.assertEquals(cur.fetchall()[0][0], "a")
def test_rollback_while_async(self):
cur = self.conn.cursor()
cur.execute("select 'a'")
# a rollback should not work in asynchronous mode
self.assertRaises(psycopg2.ProgrammingError, self.conn.rollback)
2010-03-26 06:03:54 +03:00
def test_commit_while_async(self):
cur = self.conn.cursor()
cur.execute("begin")
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur.execute("insert into table1 values (1)")
# a commit should not work in asynchronous mode
self.assertRaises(psycopg2.ProgrammingError, self.conn.commit)
self.assertTrue(self.conn.isexecuting())
# but a manual commit should
self.wait(cur)
cur.execute("commit")
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur.execute("select * from table1")
self.wait(cur)
2010-03-26 06:03:54 +03:00
self.assertEquals(cur.fetchall()[0][0], 1)
cur.execute("delete from table1")
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur.execute("select * from table1")
self.wait(cur)
2010-03-26 06:03:54 +03:00
self.assertEquals(cur.fetchone(), None)
def test_set_parameters_while_async(self):
cur = self.conn.cursor()
cur.execute("select 'c'")
self.assertTrue(self.conn.isexecuting())
2010-03-26 06:03:54 +03:00
# getting transaction status works
self.assertEquals(self.conn.get_transaction_status(),
extensions.TRANSACTION_STATUS_ACTIVE)
self.assertTrue(self.conn.isexecuting())
2010-03-26 06:03:54 +03:00
# setting connection encoding should fail
self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_client_encoding, "LATIN1")
2010-03-26 06:03:54 +03:00
# same for transaction isolation
self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_isolation_level, 1)
2010-03-26 06:03:54 +03:00
def test_reset_while_async(self):
cur = self.conn.cursor()
cur.execute("select 'c'")
self.assertTrue(self.conn.isexecuting())
2010-03-26 06:03:54 +03:00
# a reset should fail
self.assertRaises(psycopg2.ProgrammingError, self.conn.reset)
2010-03-26 06:03:54 +03:00
def test_async_iter(self):
cur = self.conn.cursor()
cur.execute("begin")
self.wait(cur)
cur.execute("""
insert into table1 values (1);
insert into table1 values (2);
insert into table1 values (3);
""")
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur.execute("select id from table1 order by id")
# iteration fails if a query is underway
self.assertRaises(psycopg2.ProgrammingError, list, cur)
# but after it's done it should work
self.wait(cur)
2010-03-26 06:03:54 +03:00
self.assertEquals(list(cur), [(1, ), (2, ), (3, )])
self.assertFalse(self.conn.isexecuting())
2010-03-26 06:03:54 +03:00
def test_copy_while_async(self):
cur = self.conn.cursor()
cur.execute("select 'a'")
# copy should fail
self.assertRaises(psycopg2.ProgrammingError,
cur.copy_from,
StringIO.StringIO("1\n3\n5\n\\.\n"), "table1")
2010-03-26 06:03:54 +03:00
def test_lobject_while_async(self):
# large objects should be prohibited
self.assertRaises(psycopg2.ProgrammingError,
self.conn.lobject)
2010-03-26 06:03:54 +03:00
def test_async_executemany(self):
cur = self.conn.cursor()
self.assertRaises(
psycopg2.ProgrammingError,
cur.executemany, "insert into table1 values (%s)", [1, 2, 3])
def test_async_scroll(self):
cur = self.conn.cursor()
cur.execute("""
insert into table1 values (1);
insert into table1 values (2);
insert into table1 values (3);
""")
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur.execute("select id from table1 order by id")
# scroll should fail if a query is underway
self.assertRaises(psycopg2.ProgrammingError, cur.scroll, 1)
self.assertTrue(self.conn.isexecuting())
# but after it's done it should work
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur.scroll(1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
cur = self.conn.cursor()
cur.execute("select id from table1 order by id")
self.wait(cur)
2010-03-26 06:03:54 +03:00
cur2 = self.conn.cursor()
self.assertRaises(psycopg2.ProgrammingError, cur2.scroll, 1)
self.assertRaises(psycopg2.ProgrammingError, cur.scroll, 4)
cur = self.conn.cursor()
cur.execute("select id from table1 order by id")
self.wait(cur)
cur.scroll(2)
cur.scroll(-1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
def test_scroll(self):
cur = self.sync_conn.cursor()
cur.execute("create table table1 (id int)")
cur.execute("""
insert into table1 values (1);
insert into table1 values (2);
insert into table1 values (3);
""")
cur.execute("select id from table1 order by id")
2010-03-26 06:03:54 +03:00
cur.scroll(2)
cur.scroll(-1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
def test_async_dont_read_all(self):
cur = self.conn.cursor()
cur.execute("select repeat('a', 10000); select repeat('b', 10000)")
2010-03-26 06:03:54 +03:00
# fetch the result
self.wait(cur)
2010-03-26 06:03:54 +03:00
# it should be the result of the second query
self.assertEquals(cur.fetchone()[0], "b" * 10000)
2010-03-26 06:03:54 +03:00
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
def __init__(self, dsn, async=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
conn = psycopg2.connect(dsn, connection_factory=MyConn, async=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async)
conn.close()
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
curs = self.conn.cursor()
for mb in 1, 5, 10, 20, 50:
size = mb * 1024 * 1024
stub = PollableStub(self.conn)
curs.execute("select %s;", ('x' * size,))
self.wait(stub)
self.assertEqual(size, len(curs.fetchone()[0]))
if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1:
return
# This is more a testing glitch than an error: it happens
# on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development,
# but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def test_sync_poll(self):
cur = self.sync_conn.cursor()
cur.execute("select 1")
# polling with a sync query works
cur.connection.poll()
self.assertEquals(cur.fetchone()[0], 1)
def test_notify(self):
cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor()
sync_cur.execute("listen test_notify")
self.sync_conn.commit()
cur.execute("notify test_notify")
self.wait(cur)
self.assertEquals(self.sync_conn.notifies, [])
pid = self.conn.get_backend_pid()
for _ in range(5):
self.wait(self.sync_conn)
if not self.sync_conn.notifies:
time.sleep(0.5)
continue
self.assertEquals(len(self.sync_conn.notifies), 1)
self.assertEquals(self.sync_conn.notifies.pop(),
(pid, "test_notify"))
return
self.fail("No NOTIFY in 2.5 seconds")
def test_async_fetch_wrong_cursor(self):
cur1 = self.conn.cursor()
cur2 = self.conn.cursor()
cur1.execute("select 1")
self.wait(cur1)
self.assertFalse(self.conn.isexecuting())
# fetching from a cursor with no results is an error
self.assertRaises(psycopg2.ProgrammingError, cur2.fetchone)
# fetching from the correct cursor works
self.assertEquals(cur1.fetchone()[0], 1)
def test_error(self):
cur = self.conn.cursor()
cur.execute("insert into table1 values (%s)", (1, ))
self.wait(cur)
cur.execute("insert into table1 values (%s)", (1, ))
# this should fail
self.assertRaises(psycopg2.IntegrityError, self.wait, cur)
cur.execute("insert into table1 values (%s); "
"insert into table1 values (%s)", (2, 2))
# this should fail as well
self.assertRaises(psycopg2.IntegrityError, self.wait, cur)
# but this should work
cur.execute("insert into table1 values (%s)", (2, ))
self.wait(cur)
# and the cursor should be usable afterwards
cur.execute("insert into table1 values (%s)", (3, ))
self.wait(cur)
cur.execute("select * from table1 order by id")
self.wait(cur)
self.assertEquals(cur.fetchall(), [(1, ), (2, ), (3, )])
cur.execute("delete from table1")
self.wait(cur)
def test_error_two_cursors(self):
cur = self.conn.cursor()
cur2 = self.conn.cursor()
cur.execute("select * from no_such_table")
self.assertRaises(psycopg2.ProgrammingError, self.wait, cur)
cur2.execute("select 1")
self.wait(cur2)
self.assertEquals(cur2.fetchone()[0], 1)
def test_notices(self):
del self.conn.notices[:]
cur = self.conn.cursor()
cur.execute("create temp table chatty (id serial primary key);")
self.wait(cur)
self.assertEqual("CREATE TABLE", cur.statusmessage)
self.assert_(self.conn.notices)
def test_async_cursor_gone(self):
cur = self.conn.cursor()
cur.execute("select 42;");
del cur
self.assertRaises(psycopg2.InterfaceError, self.wait, self.conn)
# The connection is still usable
cur = self.conn.cursor()
cur.execute("select 42;");
self.wait(self.conn)
self.assertEqual(cur.fetchone(), (42,))
2010-03-26 06:03:54 +03:00
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
2010-04-11 03:06:54 +04:00
if __name__ == "__main__":
unittest.main()