From 5ccd977e2b03bf1874fe35c7f364111158bf0cda Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 21 Jul 2020 22:16:36 +0100 Subject: [PATCH] Cursor tests adapted to CockroachDB Named cursor tests separated to skip all in one go --- tests/test_cursor.py | 454 ++++++++++++++++++++++--------------------- 1 file changed, 231 insertions(+), 223 deletions(-) diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 9bf9ccff..cc783380 100755 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -36,7 +36,7 @@ from decimal import Decimal from weakref import ref from .testutils import (ConnectingTestCase, skip_before_postgres, skip_if_no_getrefcount, slow, skip_if_no_superuser, - skip_if_windows) + skip_if_windows, skip_if_crdb, crdb_version) import psycopg2.extras from psycopg2.compat import text_type @@ -59,7 +59,7 @@ class CursorTests(ConnectingTestCase): def test_executemany_propagate_exceptions(self): conn = self.conn cur = conn.cursor() - cur.execute("create temp table test_exc (data int);") + cur.execute("create table test_exc (data int);") def buggygen(): yield 1 // 0 @@ -177,9 +177,237 @@ class CursorTests(ConnectingTestCase): curs = self.conn.cursor(None) self.assertEqual(curs.name, None) + def test_description_attribs(self): + curs = self.conn.cursor() + curs.execute("""select + 3.14::decimal(10,2) as pi, + 'hello'::text as hi, + '2010-02-18'::date as now; + """) + self.assertEqual(len(curs.description), 3) + for c in curs.description: + self.assertEqual(len(c), 7) # DBAPI happy + for a in ('name', 'type_code', 'display_size', 'internal_size', + 'precision', 'scale', 'null_ok'): + self.assert_(hasattr(c, a), a) + + c = curs.description[0] + self.assertEqual(c.name, 'pi') + self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values) + if crdb_version(self.conn) is None: + self.assert_(c.internal_size > 0) + self.assertEqual(c.precision, 10) + self.assertEqual(c.scale, 2) + + c = curs.description[1] + self.assertEqual(c.name, 'hi') + self.assert_(c.type_code in psycopg2.STRING.values) + self.assert_(c.internal_size < 0) + self.assertEqual(c.precision, None) + self.assertEqual(c.scale, None) + + c = curs.description[2] + self.assertEqual(c.name, 'now') + self.assert_(c.type_code in psycopg2.extensions.DATE.values) + self.assert_(c.internal_size > 0) + self.assertEqual(c.precision, None) + self.assertEqual(c.scale, None) + + @skip_if_crdb + def test_description_extra_attribs(self): + curs = self.conn.cursor() + curs.execute(""" + create table testcol ( + pi decimal(10,2), + hi text) + """) + curs.execute("select oid from pg_class where relname = %s", ('testcol',)) + oid = curs.fetchone()[0] + + curs.execute("insert into testcol values (3.14, 'hello')") + curs.execute("select hi, pi, 42 from testcol") + self.assertEqual(curs.description[0].table_oid, oid) + self.assertEqual(curs.description[0].table_column, 2) + + self.assertEqual(curs.description[1].table_oid, oid) + self.assertEqual(curs.description[1].table_column, 1) + + self.assertEqual(curs.description[2].table_oid, None) + self.assertEqual(curs.description[2].table_column, None) + + def test_description_slice(self): + curs = self.conn.cursor() + curs.execute("select 1::int4 as a") + self.assertEqual(curs.description[0][0:2], ('a', 23)) + + def test_pickle_description(self): + curs = self.conn.cursor() + curs.execute('SELECT 1 AS foo') + description = curs.description + + pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL) + unpickled = pickle.loads(pickled) + + self.assertEqual(description, unpickled) + + def test_bad_subclass(self): + # check that we get an error message instead of a segfault + # for badly written subclasses. + # see https://stackoverflow.com/questions/22019341/ + class StupidCursor(psycopg2.extensions.cursor): + def __init__(self, *args, **kwargs): + # I am stupid so not calling superclass init + pass + + cur = StupidCursor() + self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1') + self.assertRaises(psycopg2.InterfaceError, cur.executemany, + 'select 1', []) + + def test_callproc_badparam(self): + cur = self.conn.cursor() + self.assertRaises(TypeError, cur.callproc, 'lower', 42) + + # It would be inappropriate to test callproc's named parameters in the + # DBAPI2.0 test section because they are a psycopg2 extension. + @skip_before_postgres(9, 0) + @skip_if_crdb + def test_callproc_dict(self): + # This parameter name tests for injection and quote escaping + paramname = ''' + Robert'); drop table "students" -- + '''.strip() + escaped_paramname = '"%s"' % paramname.replace('"', '""') + procname = 'pg_temp.randall' + + cur = self.conn.cursor() + + # Set up the temporary function + cur.execute(''' + CREATE FUNCTION %s(%s INT) + RETURNS INT AS + 'SELECT $1 * $1' + LANGUAGE SQL + ''' % (procname, escaped_paramname)) + + # Make sure callproc works right + cur.callproc(procname, {paramname: 2}) + self.assertEquals(cur.fetchone()[0], 4) + + # Make sure callproc fails right + failing_cases = [ + ({paramname: 2, 'foo': 'bar'}, psycopg2.ProgrammingError), + ({paramname: '2'}, psycopg2.ProgrammingError), + ({paramname: 'two'}, psycopg2.ProgrammingError), + ({u'bj\xc3rn': 2}, psycopg2.ProgrammingError), + ({3: 2}, TypeError), + ({self: 2}, TypeError), + ] + for parameter_sequence, exception in failing_cases: + self.assertRaises(exception, cur.callproc, procname, parameter_sequence) + self.conn.rollback() + + @skip_if_no_superuser + @skip_if_windows + @skip_if_crdb + @skip_before_postgres(8, 4) + def test_external_close_sync(self): + # If a "victim" connection is closed by a "control" connection + # behind psycopg2's back, psycopg2 always handles it correctly: + # raise OperationalError, set conn.closed to 2. This reproduces + # issue #443, a race between control_conn closing victim_conn and + # psycopg2 noticing. + control_conn = self.conn + connect_func = self.connect + + def wait_func(conn): + pass + + self._test_external_close(control_conn, connect_func, wait_func) + + @skip_if_no_superuser + @skip_if_windows + @skip_if_crdb + @skip_before_postgres(8, 4) + def test_external_close_async(self): + # Issue #443 is in the async code too. Since the fix is duplicated, + # so is the test. + control_conn = self.conn + + def connect_func(): + return self.connect(async_=True) + + wait_func = psycopg2.extras.wait_select + self._test_external_close(control_conn, connect_func, wait_func) + + def _test_external_close(self, control_conn, connect_func, wait_func): + # The short sleep before using victim_conn the second time makes it + # much more likely to lose the race and see the bug. Repeating the + # test several times makes it even more likely. + for i in range(10): + victim_conn = connect_func() + wait_func(victim_conn) + + with victim_conn.cursor() as cur: + cur.execute('select pg_backend_pid()') + wait_func(victim_conn) + pid1 = cur.fetchall()[0][0] + + with control_conn.cursor() as cur: + cur.execute('select pg_terminate_backend(%s)', (pid1,)) + + time.sleep(0.001) + + def f(): + with victim_conn.cursor() as cur: + cur.execute('select 1') + wait_func(victim_conn) + + self.assertRaises(psycopg2.OperationalError, f) + + self.assertEqual(victim_conn.closed, 2) + + @skip_before_postgres(8, 2) + def test_rowcount_on_executemany_returning(self): + cur = self.conn.cursor() + cur.execute("create table execmany(id serial primary key, data int)") + cur.executemany( + "insert into execmany (data) values (%s)", + [(i,) for i in range(4)]) + self.assertEqual(cur.rowcount, 4) + + cur.executemany( + "insert into execmany (data) values (%s) returning data", + [(i,) for i in range(5)]) + self.assertEqual(cur.rowcount, 5) + + @skip_before_postgres(9) + def test_pgresult_ptr(self): + curs = self.conn.cursor() + self.assert_(curs.pgresult_ptr is None) + + curs.execute("select 'x'") + self.assert_(curs.pgresult_ptr is not None) + + try: + f = self.libpq.PQcmdStatus + except AttributeError: + pass + else: + f.argtypes = [ctypes.c_void_p] + f.restype = ctypes.c_char_p + status = f(curs.pgresult_ptr) + self.assertEqual(status, b'SELECT 1') + + curs.close() + self.assert_(curs.pgresult_ptr is None) + + +@skip_if_crdb +class NamedCursorTests(ConnectingTestCase): def test_invalid_name(self): curs = self.conn.cursor() - curs.execute("create temp table invname (data int);") + curs.execute("create table invname (data int);") for i in (10, 20, 30): curs.execute("insert into invname values (%s)", (i,)) curs.close() @@ -377,77 +605,6 @@ class CursorTests(ConnectingTestCase): for i, rec in enumerate(curs): self.assertEqual(i + 1, curs.rownumber) - def test_description_attribs(self): - curs = self.conn.cursor() - curs.execute("""select - 3.14::decimal(10,2) as pi, - 'hello'::text as hi, - '2010-02-18'::date as now; - """) - self.assertEqual(len(curs.description), 3) - for c in curs.description: - self.assertEqual(len(c), 7) # DBAPI happy - for a in ('name', 'type_code', 'display_size', 'internal_size', - 'precision', 'scale', 'null_ok'): - self.assert_(hasattr(c, a), a) - - c = curs.description[0] - self.assertEqual(c.name, 'pi') - self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values) - self.assert_(c.internal_size > 0) - self.assertEqual(c.precision, 10) - self.assertEqual(c.scale, 2) - - c = curs.description[1] - self.assertEqual(c.name, 'hi') - self.assert_(c.type_code in psycopg2.STRING.values) - self.assert_(c.internal_size < 0) - self.assertEqual(c.precision, None) - self.assertEqual(c.scale, None) - - c = curs.description[2] - self.assertEqual(c.name, 'now') - self.assert_(c.type_code in psycopg2.extensions.DATE.values) - self.assert_(c.internal_size > 0) - self.assertEqual(c.precision, None) - self.assertEqual(c.scale, None) - - def test_description_extra_attribs(self): - curs = self.conn.cursor() - curs.execute(""" - create table testcol ( - pi decimal(10,2), - hi text) - """) - curs.execute("select oid from pg_class where relname = %s", ('testcol',)) - oid = curs.fetchone()[0] - - curs.execute("insert into testcol values (3.14, 'hello')") - curs.execute("select hi, pi, 42 from testcol") - self.assertEqual(curs.description[0].table_oid, oid) - self.assertEqual(curs.description[0].table_column, 2) - - self.assertEqual(curs.description[1].table_oid, oid) - self.assertEqual(curs.description[1].table_column, 1) - - self.assertEqual(curs.description[2].table_oid, None) - self.assertEqual(curs.description[2].table_column, None) - - def test_description_slice(self): - curs = self.conn.cursor() - curs.execute("select 1::int as a") - self.assertEqual(curs.description[0][0:2], ('a', 23)) - - def test_pickle_description(self): - curs = self.conn.cursor() - curs.execute('SELECT 1 AS foo') - description = curs.description - - pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL) - unpickled = pickle.loads(pickled) - - self.assertEqual(description, unpickled) - @skip_before_postgres(8, 0) def test_named_cursor_stealing(self): # you can use a named cursor to iterate on a refcursor created @@ -527,155 +684,6 @@ class CursorTests(ConnectingTestCase): cur.scroll(9, mode='absolute') self.assertEqual(cur.fetchone(), (9,)) - def test_bad_subclass(self): - # check that we get an error message instead of a segfault - # for badly written subclasses. - # see https://stackoverflow.com/questions/22019341/ - class StupidCursor(psycopg2.extensions.cursor): - def __init__(self, *args, **kwargs): - # I am stupid so not calling superclass init - pass - - cur = StupidCursor() - self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1') - self.assertRaises(psycopg2.InterfaceError, cur.executemany, - 'select 1', []) - - def test_callproc_badparam(self): - cur = self.conn.cursor() - self.assertRaises(TypeError, cur.callproc, 'lower', 42) - - # It would be inappropriate to test callproc's named parameters in the - # DBAPI2.0 test section because they are a psycopg2 extension. - @skip_before_postgres(9, 0) - def test_callproc_dict(self): - # This parameter name tests for injection and quote escaping - paramname = ''' - Robert'); drop table "students" -- - '''.strip() - escaped_paramname = '"%s"' % paramname.replace('"', '""') - procname = 'pg_temp.randall' - - cur = self.conn.cursor() - - # Set up the temporary function - cur.execute(''' - CREATE FUNCTION %s(%s INT) - RETURNS INT AS - 'SELECT $1 * $1' - LANGUAGE SQL - ''' % (procname, escaped_paramname)) - - # Make sure callproc works right - cur.callproc(procname, {paramname: 2}) - self.assertEquals(cur.fetchone()[0], 4) - - # Make sure callproc fails right - failing_cases = [ - ({paramname: 2, 'foo': 'bar'}, psycopg2.ProgrammingError), - ({paramname: '2'}, psycopg2.ProgrammingError), - ({paramname: 'two'}, psycopg2.ProgrammingError), - ({u'bj\xc3rn': 2}, psycopg2.ProgrammingError), - ({3: 2}, TypeError), - ({self: 2}, TypeError), - ] - for parameter_sequence, exception in failing_cases: - self.assertRaises(exception, cur.callproc, procname, parameter_sequence) - self.conn.rollback() - - @skip_if_no_superuser - @skip_if_windows - @skip_before_postgres(8, 4) - def test_external_close_sync(self): - # If a "victim" connection is closed by a "control" connection - # behind psycopg2's back, psycopg2 always handles it correctly: - # raise OperationalError, set conn.closed to 2. This reproduces - # issue #443, a race between control_conn closing victim_conn and - # psycopg2 noticing. - control_conn = self.conn - connect_func = self.connect - - def wait_func(conn): - pass - - self._test_external_close(control_conn, connect_func, wait_func) - - @skip_if_no_superuser - @skip_if_windows - @skip_before_postgres(8, 4) - def test_external_close_async(self): - # Issue #443 is in the async code too. Since the fix is duplicated, - # so is the test. - control_conn = self.conn - - def connect_func(): - return self.connect(async_=True) - - wait_func = psycopg2.extras.wait_select - self._test_external_close(control_conn, connect_func, wait_func) - - def _test_external_close(self, control_conn, connect_func, wait_func): - # The short sleep before using victim_conn the second time makes it - # much more likely to lose the race and see the bug. Repeating the - # test several times makes it even more likely. - for i in range(10): - victim_conn = connect_func() - wait_func(victim_conn) - - with victim_conn.cursor() as cur: - cur.execute('select pg_backend_pid()') - wait_func(victim_conn) - pid1 = cur.fetchall()[0][0] - - with control_conn.cursor() as cur: - cur.execute('select pg_terminate_backend(%s)', (pid1,)) - - time.sleep(0.001) - - def f(): - with victim_conn.cursor() as cur: - cur.execute('select 1') - wait_func(victim_conn) - - self.assertRaises(psycopg2.OperationalError, f) - - self.assertEqual(victim_conn.closed, 2) - - @skip_before_postgres(8, 2) - def test_rowcount_on_executemany_returning(self): - cur = self.conn.cursor() - cur.execute("create table execmany(id serial primary key, data int)") - cur.executemany( - "insert into execmany (data) values (%s)", - [(i,) for i in range(4)]) - self.assertEqual(cur.rowcount, 4) - - cur.executemany( - "insert into execmany (data) values (%s) returning data", - [(i,) for i in range(5)]) - self.assertEqual(cur.rowcount, 5) - - @skip_before_postgres(9) - def test_pgresult_ptr(self): - curs = self.conn.cursor() - self.assert_(curs.pgresult_ptr is None) - - curs.execute("select 'x'") - self.assert_(curs.pgresult_ptr is not None) - - try: - f = self.libpq.PQcmdStatus - except AttributeError: - pass - else: - f.argtypes = [ctypes.c_void_p] - f.restype = ctypes.c_char_p - status = f(curs.pgresult_ptr) - self.assertEqual(status, b'SELECT 1') - - curs.close() - self.assert_(curs.pgresult_ptr is None) - def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)