mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-29 04:13:43 +03:00
Cursor tests adapted to CockroachDB
Named cursor tests separated to skip all in one go
This commit is contained in:
parent
9380f2a721
commit
5ccd977e2b
|
@ -36,7 +36,7 @@ from decimal import Decimal
|
||||||
from weakref import ref
|
from weakref import ref
|
||||||
from .testutils import (ConnectingTestCase, skip_before_postgres,
|
from .testutils import (ConnectingTestCase, skip_before_postgres,
|
||||||
skip_if_no_getrefcount, slow, skip_if_no_superuser,
|
skip_if_no_getrefcount, slow, skip_if_no_superuser,
|
||||||
skip_if_windows)
|
skip_if_windows, skip_if_crdb, crdb_version)
|
||||||
|
|
||||||
import psycopg2.extras
|
import psycopg2.extras
|
||||||
from psycopg2.compat import text_type
|
from psycopg2.compat import text_type
|
||||||
|
@ -59,7 +59,7 @@ class CursorTests(ConnectingTestCase):
|
||||||
def test_executemany_propagate_exceptions(self):
|
def test_executemany_propagate_exceptions(self):
|
||||||
conn = self.conn
|
conn = self.conn
|
||||||
cur = conn.cursor()
|
cur = conn.cursor()
|
||||||
cur.execute("create temp table test_exc (data int);")
|
cur.execute("create table test_exc (data int);")
|
||||||
|
|
||||||
def buggygen():
|
def buggygen():
|
||||||
yield 1 // 0
|
yield 1 // 0
|
||||||
|
@ -177,9 +177,237 @@ class CursorTests(ConnectingTestCase):
|
||||||
curs = self.conn.cursor(None)
|
curs = self.conn.cursor(None)
|
||||||
self.assertEqual(curs.name, None)
|
self.assertEqual(curs.name, None)
|
||||||
|
|
||||||
|
def test_description_attribs(self):
|
||||||
|
curs = self.conn.cursor()
|
||||||
|
curs.execute("""select
|
||||||
|
3.14::decimal(10,2) as pi,
|
||||||
|
'hello'::text as hi,
|
||||||
|
'2010-02-18'::date as now;
|
||||||
|
""")
|
||||||
|
self.assertEqual(len(curs.description), 3)
|
||||||
|
for c in curs.description:
|
||||||
|
self.assertEqual(len(c), 7) # DBAPI happy
|
||||||
|
for a in ('name', 'type_code', 'display_size', 'internal_size',
|
||||||
|
'precision', 'scale', 'null_ok'):
|
||||||
|
self.assert_(hasattr(c, a), a)
|
||||||
|
|
||||||
|
c = curs.description[0]
|
||||||
|
self.assertEqual(c.name, 'pi')
|
||||||
|
self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values)
|
||||||
|
if crdb_version(self.conn) is None:
|
||||||
|
self.assert_(c.internal_size > 0)
|
||||||
|
self.assertEqual(c.precision, 10)
|
||||||
|
self.assertEqual(c.scale, 2)
|
||||||
|
|
||||||
|
c = curs.description[1]
|
||||||
|
self.assertEqual(c.name, 'hi')
|
||||||
|
self.assert_(c.type_code in psycopg2.STRING.values)
|
||||||
|
self.assert_(c.internal_size < 0)
|
||||||
|
self.assertEqual(c.precision, None)
|
||||||
|
self.assertEqual(c.scale, None)
|
||||||
|
|
||||||
|
c = curs.description[2]
|
||||||
|
self.assertEqual(c.name, 'now')
|
||||||
|
self.assert_(c.type_code in psycopg2.extensions.DATE.values)
|
||||||
|
self.assert_(c.internal_size > 0)
|
||||||
|
self.assertEqual(c.precision, None)
|
||||||
|
self.assertEqual(c.scale, None)
|
||||||
|
|
||||||
|
@skip_if_crdb
|
||||||
|
def test_description_extra_attribs(self):
|
||||||
|
curs = self.conn.cursor()
|
||||||
|
curs.execute("""
|
||||||
|
create table testcol (
|
||||||
|
pi decimal(10,2),
|
||||||
|
hi text)
|
||||||
|
""")
|
||||||
|
curs.execute("select oid from pg_class where relname = %s", ('testcol',))
|
||||||
|
oid = curs.fetchone()[0]
|
||||||
|
|
||||||
|
curs.execute("insert into testcol values (3.14, 'hello')")
|
||||||
|
curs.execute("select hi, pi, 42 from testcol")
|
||||||
|
self.assertEqual(curs.description[0].table_oid, oid)
|
||||||
|
self.assertEqual(curs.description[0].table_column, 2)
|
||||||
|
|
||||||
|
self.assertEqual(curs.description[1].table_oid, oid)
|
||||||
|
self.assertEqual(curs.description[1].table_column, 1)
|
||||||
|
|
||||||
|
self.assertEqual(curs.description[2].table_oid, None)
|
||||||
|
self.assertEqual(curs.description[2].table_column, None)
|
||||||
|
|
||||||
|
def test_description_slice(self):
|
||||||
|
curs = self.conn.cursor()
|
||||||
|
curs.execute("select 1::int4 as a")
|
||||||
|
self.assertEqual(curs.description[0][0:2], ('a', 23))
|
||||||
|
|
||||||
|
def test_pickle_description(self):
|
||||||
|
curs = self.conn.cursor()
|
||||||
|
curs.execute('SELECT 1 AS foo')
|
||||||
|
description = curs.description
|
||||||
|
|
||||||
|
pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
|
||||||
|
unpickled = pickle.loads(pickled)
|
||||||
|
|
||||||
|
self.assertEqual(description, unpickled)
|
||||||
|
|
||||||
|
def test_bad_subclass(self):
|
||||||
|
# check that we get an error message instead of a segfault
|
||||||
|
# for badly written subclasses.
|
||||||
|
# see https://stackoverflow.com/questions/22019341/
|
||||||
|
class StupidCursor(psycopg2.extensions.cursor):
|
||||||
|
def __init__(self, *args, **kwargs):
|
||||||
|
# I am stupid so not calling superclass init
|
||||||
|
pass
|
||||||
|
|
||||||
|
cur = StupidCursor()
|
||||||
|
self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1')
|
||||||
|
self.assertRaises(psycopg2.InterfaceError, cur.executemany,
|
||||||
|
'select 1', [])
|
||||||
|
|
||||||
|
def test_callproc_badparam(self):
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
self.assertRaises(TypeError, cur.callproc, 'lower', 42)
|
||||||
|
|
||||||
|
# It would be inappropriate to test callproc's named parameters in the
|
||||||
|
# DBAPI2.0 test section because they are a psycopg2 extension.
|
||||||
|
@skip_before_postgres(9, 0)
|
||||||
|
@skip_if_crdb
|
||||||
|
def test_callproc_dict(self):
|
||||||
|
# This parameter name tests for injection and quote escaping
|
||||||
|
paramname = '''
|
||||||
|
Robert'); drop table "students" --
|
||||||
|
'''.strip()
|
||||||
|
escaped_paramname = '"%s"' % paramname.replace('"', '""')
|
||||||
|
procname = 'pg_temp.randall'
|
||||||
|
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
|
||||||
|
# Set up the temporary function
|
||||||
|
cur.execute('''
|
||||||
|
CREATE FUNCTION %s(%s INT)
|
||||||
|
RETURNS INT AS
|
||||||
|
'SELECT $1 * $1'
|
||||||
|
LANGUAGE SQL
|
||||||
|
''' % (procname, escaped_paramname))
|
||||||
|
|
||||||
|
# Make sure callproc works right
|
||||||
|
cur.callproc(procname, {paramname: 2})
|
||||||
|
self.assertEquals(cur.fetchone()[0], 4)
|
||||||
|
|
||||||
|
# Make sure callproc fails right
|
||||||
|
failing_cases = [
|
||||||
|
({paramname: 2, 'foo': 'bar'}, psycopg2.ProgrammingError),
|
||||||
|
({paramname: '2'}, psycopg2.ProgrammingError),
|
||||||
|
({paramname: 'two'}, psycopg2.ProgrammingError),
|
||||||
|
({u'bj\xc3rn': 2}, psycopg2.ProgrammingError),
|
||||||
|
({3: 2}, TypeError),
|
||||||
|
({self: 2}, TypeError),
|
||||||
|
]
|
||||||
|
for parameter_sequence, exception in failing_cases:
|
||||||
|
self.assertRaises(exception, cur.callproc, procname, parameter_sequence)
|
||||||
|
self.conn.rollback()
|
||||||
|
|
||||||
|
@skip_if_no_superuser
|
||||||
|
@skip_if_windows
|
||||||
|
@skip_if_crdb
|
||||||
|
@skip_before_postgres(8, 4)
|
||||||
|
def test_external_close_sync(self):
|
||||||
|
# If a "victim" connection is closed by a "control" connection
|
||||||
|
# behind psycopg2's back, psycopg2 always handles it correctly:
|
||||||
|
# raise OperationalError, set conn.closed to 2. This reproduces
|
||||||
|
# issue #443, a race between control_conn closing victim_conn and
|
||||||
|
# psycopg2 noticing.
|
||||||
|
control_conn = self.conn
|
||||||
|
connect_func = self.connect
|
||||||
|
|
||||||
|
def wait_func(conn):
|
||||||
|
pass
|
||||||
|
|
||||||
|
self._test_external_close(control_conn, connect_func, wait_func)
|
||||||
|
|
||||||
|
@skip_if_no_superuser
|
||||||
|
@skip_if_windows
|
||||||
|
@skip_if_crdb
|
||||||
|
@skip_before_postgres(8, 4)
|
||||||
|
def test_external_close_async(self):
|
||||||
|
# Issue #443 is in the async code too. Since the fix is duplicated,
|
||||||
|
# so is the test.
|
||||||
|
control_conn = self.conn
|
||||||
|
|
||||||
|
def connect_func():
|
||||||
|
return self.connect(async_=True)
|
||||||
|
|
||||||
|
wait_func = psycopg2.extras.wait_select
|
||||||
|
self._test_external_close(control_conn, connect_func, wait_func)
|
||||||
|
|
||||||
|
def _test_external_close(self, control_conn, connect_func, wait_func):
|
||||||
|
# The short sleep before using victim_conn the second time makes it
|
||||||
|
# much more likely to lose the race and see the bug. Repeating the
|
||||||
|
# test several times makes it even more likely.
|
||||||
|
for i in range(10):
|
||||||
|
victim_conn = connect_func()
|
||||||
|
wait_func(victim_conn)
|
||||||
|
|
||||||
|
with victim_conn.cursor() as cur:
|
||||||
|
cur.execute('select pg_backend_pid()')
|
||||||
|
wait_func(victim_conn)
|
||||||
|
pid1 = cur.fetchall()[0][0]
|
||||||
|
|
||||||
|
with control_conn.cursor() as cur:
|
||||||
|
cur.execute('select pg_terminate_backend(%s)', (pid1,))
|
||||||
|
|
||||||
|
time.sleep(0.001)
|
||||||
|
|
||||||
|
def f():
|
||||||
|
with victim_conn.cursor() as cur:
|
||||||
|
cur.execute('select 1')
|
||||||
|
wait_func(victim_conn)
|
||||||
|
|
||||||
|
self.assertRaises(psycopg2.OperationalError, f)
|
||||||
|
|
||||||
|
self.assertEqual(victim_conn.closed, 2)
|
||||||
|
|
||||||
|
@skip_before_postgres(8, 2)
|
||||||
|
def test_rowcount_on_executemany_returning(self):
|
||||||
|
cur = self.conn.cursor()
|
||||||
|
cur.execute("create table execmany(id serial primary key, data int)")
|
||||||
|
cur.executemany(
|
||||||
|
"insert into execmany (data) values (%s)",
|
||||||
|
[(i,) for i in range(4)])
|
||||||
|
self.assertEqual(cur.rowcount, 4)
|
||||||
|
|
||||||
|
cur.executemany(
|
||||||
|
"insert into execmany (data) values (%s) returning data",
|
||||||
|
[(i,) for i in range(5)])
|
||||||
|
self.assertEqual(cur.rowcount, 5)
|
||||||
|
|
||||||
|
@skip_before_postgres(9)
|
||||||
|
def test_pgresult_ptr(self):
|
||||||
|
curs = self.conn.cursor()
|
||||||
|
self.assert_(curs.pgresult_ptr is None)
|
||||||
|
|
||||||
|
curs.execute("select 'x'")
|
||||||
|
self.assert_(curs.pgresult_ptr is not None)
|
||||||
|
|
||||||
|
try:
|
||||||
|
f = self.libpq.PQcmdStatus
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
else:
|
||||||
|
f.argtypes = [ctypes.c_void_p]
|
||||||
|
f.restype = ctypes.c_char_p
|
||||||
|
status = f(curs.pgresult_ptr)
|
||||||
|
self.assertEqual(status, b'SELECT 1')
|
||||||
|
|
||||||
|
curs.close()
|
||||||
|
self.assert_(curs.pgresult_ptr is None)
|
||||||
|
|
||||||
|
|
||||||
|
@skip_if_crdb
|
||||||
|
class NamedCursorTests(ConnectingTestCase):
|
||||||
def test_invalid_name(self):
|
def test_invalid_name(self):
|
||||||
curs = self.conn.cursor()
|
curs = self.conn.cursor()
|
||||||
curs.execute("create temp table invname (data int);")
|
curs.execute("create table invname (data int);")
|
||||||
for i in (10, 20, 30):
|
for i in (10, 20, 30):
|
||||||
curs.execute("insert into invname values (%s)", (i,))
|
curs.execute("insert into invname values (%s)", (i,))
|
||||||
curs.close()
|
curs.close()
|
||||||
|
@ -377,77 +605,6 @@ class CursorTests(ConnectingTestCase):
|
||||||
for i, rec in enumerate(curs):
|
for i, rec in enumerate(curs):
|
||||||
self.assertEqual(i + 1, curs.rownumber)
|
self.assertEqual(i + 1, curs.rownumber)
|
||||||
|
|
||||||
def test_description_attribs(self):
|
|
||||||
curs = self.conn.cursor()
|
|
||||||
curs.execute("""select
|
|
||||||
3.14::decimal(10,2) as pi,
|
|
||||||
'hello'::text as hi,
|
|
||||||
'2010-02-18'::date as now;
|
|
||||||
""")
|
|
||||||
self.assertEqual(len(curs.description), 3)
|
|
||||||
for c in curs.description:
|
|
||||||
self.assertEqual(len(c), 7) # DBAPI happy
|
|
||||||
for a in ('name', 'type_code', 'display_size', 'internal_size',
|
|
||||||
'precision', 'scale', 'null_ok'):
|
|
||||||
self.assert_(hasattr(c, a), a)
|
|
||||||
|
|
||||||
c = curs.description[0]
|
|
||||||
self.assertEqual(c.name, 'pi')
|
|
||||||
self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values)
|
|
||||||
self.assert_(c.internal_size > 0)
|
|
||||||
self.assertEqual(c.precision, 10)
|
|
||||||
self.assertEqual(c.scale, 2)
|
|
||||||
|
|
||||||
c = curs.description[1]
|
|
||||||
self.assertEqual(c.name, 'hi')
|
|
||||||
self.assert_(c.type_code in psycopg2.STRING.values)
|
|
||||||
self.assert_(c.internal_size < 0)
|
|
||||||
self.assertEqual(c.precision, None)
|
|
||||||
self.assertEqual(c.scale, None)
|
|
||||||
|
|
||||||
c = curs.description[2]
|
|
||||||
self.assertEqual(c.name, 'now')
|
|
||||||
self.assert_(c.type_code in psycopg2.extensions.DATE.values)
|
|
||||||
self.assert_(c.internal_size > 0)
|
|
||||||
self.assertEqual(c.precision, None)
|
|
||||||
self.assertEqual(c.scale, None)
|
|
||||||
|
|
||||||
def test_description_extra_attribs(self):
|
|
||||||
curs = self.conn.cursor()
|
|
||||||
curs.execute("""
|
|
||||||
create table testcol (
|
|
||||||
pi decimal(10,2),
|
|
||||||
hi text)
|
|
||||||
""")
|
|
||||||
curs.execute("select oid from pg_class where relname = %s", ('testcol',))
|
|
||||||
oid = curs.fetchone()[0]
|
|
||||||
|
|
||||||
curs.execute("insert into testcol values (3.14, 'hello')")
|
|
||||||
curs.execute("select hi, pi, 42 from testcol")
|
|
||||||
self.assertEqual(curs.description[0].table_oid, oid)
|
|
||||||
self.assertEqual(curs.description[0].table_column, 2)
|
|
||||||
|
|
||||||
self.assertEqual(curs.description[1].table_oid, oid)
|
|
||||||
self.assertEqual(curs.description[1].table_column, 1)
|
|
||||||
|
|
||||||
self.assertEqual(curs.description[2].table_oid, None)
|
|
||||||
self.assertEqual(curs.description[2].table_column, None)
|
|
||||||
|
|
||||||
def test_description_slice(self):
|
|
||||||
curs = self.conn.cursor()
|
|
||||||
curs.execute("select 1::int as a")
|
|
||||||
self.assertEqual(curs.description[0][0:2], ('a', 23))
|
|
||||||
|
|
||||||
def test_pickle_description(self):
|
|
||||||
curs = self.conn.cursor()
|
|
||||||
curs.execute('SELECT 1 AS foo')
|
|
||||||
description = curs.description
|
|
||||||
|
|
||||||
pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
|
|
||||||
unpickled = pickle.loads(pickled)
|
|
||||||
|
|
||||||
self.assertEqual(description, unpickled)
|
|
||||||
|
|
||||||
@skip_before_postgres(8, 0)
|
@skip_before_postgres(8, 0)
|
||||||
def test_named_cursor_stealing(self):
|
def test_named_cursor_stealing(self):
|
||||||
# you can use a named cursor to iterate on a refcursor created
|
# you can use a named cursor to iterate on a refcursor created
|
||||||
|
@ -527,155 +684,6 @@ class CursorTests(ConnectingTestCase):
|
||||||
cur.scroll(9, mode='absolute')
|
cur.scroll(9, mode='absolute')
|
||||||
self.assertEqual(cur.fetchone(), (9,))
|
self.assertEqual(cur.fetchone(), (9,))
|
||||||
|
|
||||||
def test_bad_subclass(self):
|
|
||||||
# check that we get an error message instead of a segfault
|
|
||||||
# for badly written subclasses.
|
|
||||||
# see https://stackoverflow.com/questions/22019341/
|
|
||||||
class StupidCursor(psycopg2.extensions.cursor):
|
|
||||||
def __init__(self, *args, **kwargs):
|
|
||||||
# I am stupid so not calling superclass init
|
|
||||||
pass
|
|
||||||
|
|
||||||
cur = StupidCursor()
|
|
||||||
self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1')
|
|
||||||
self.assertRaises(psycopg2.InterfaceError, cur.executemany,
|
|
||||||
'select 1', [])
|
|
||||||
|
|
||||||
def test_callproc_badparam(self):
|
|
||||||
cur = self.conn.cursor()
|
|
||||||
self.assertRaises(TypeError, cur.callproc, 'lower', 42)
|
|
||||||
|
|
||||||
# It would be inappropriate to test callproc's named parameters in the
|
|
||||||
# DBAPI2.0 test section because they are a psycopg2 extension.
|
|
||||||
@skip_before_postgres(9, 0)
|
|
||||||
def test_callproc_dict(self):
|
|
||||||
# This parameter name tests for injection and quote escaping
|
|
||||||
paramname = '''
|
|
||||||
Robert'); drop table "students" --
|
|
||||||
'''.strip()
|
|
||||||
escaped_paramname = '"%s"' % paramname.replace('"', '""')
|
|
||||||
procname = 'pg_temp.randall'
|
|
||||||
|
|
||||||
cur = self.conn.cursor()
|
|
||||||
|
|
||||||
# Set up the temporary function
|
|
||||||
cur.execute('''
|
|
||||||
CREATE FUNCTION %s(%s INT)
|
|
||||||
RETURNS INT AS
|
|
||||||
'SELECT $1 * $1'
|
|
||||||
LANGUAGE SQL
|
|
||||||
''' % (procname, escaped_paramname))
|
|
||||||
|
|
||||||
# Make sure callproc works right
|
|
||||||
cur.callproc(procname, {paramname: 2})
|
|
||||||
self.assertEquals(cur.fetchone()[0], 4)
|
|
||||||
|
|
||||||
# Make sure callproc fails right
|
|
||||||
failing_cases = [
|
|
||||||
({paramname: 2, 'foo': 'bar'}, psycopg2.ProgrammingError),
|
|
||||||
({paramname: '2'}, psycopg2.ProgrammingError),
|
|
||||||
({paramname: 'two'}, psycopg2.ProgrammingError),
|
|
||||||
({u'bj\xc3rn': 2}, psycopg2.ProgrammingError),
|
|
||||||
({3: 2}, TypeError),
|
|
||||||
({self: 2}, TypeError),
|
|
||||||
]
|
|
||||||
for parameter_sequence, exception in failing_cases:
|
|
||||||
self.assertRaises(exception, cur.callproc, procname, parameter_sequence)
|
|
||||||
self.conn.rollback()
|
|
||||||
|
|
||||||
@skip_if_no_superuser
|
|
||||||
@skip_if_windows
|
|
||||||
@skip_before_postgres(8, 4)
|
|
||||||
def test_external_close_sync(self):
|
|
||||||
# If a "victim" connection is closed by a "control" connection
|
|
||||||
# behind psycopg2's back, psycopg2 always handles it correctly:
|
|
||||||
# raise OperationalError, set conn.closed to 2. This reproduces
|
|
||||||
# issue #443, a race between control_conn closing victim_conn and
|
|
||||||
# psycopg2 noticing.
|
|
||||||
control_conn = self.conn
|
|
||||||
connect_func = self.connect
|
|
||||||
|
|
||||||
def wait_func(conn):
|
|
||||||
pass
|
|
||||||
|
|
||||||
self._test_external_close(control_conn, connect_func, wait_func)
|
|
||||||
|
|
||||||
@skip_if_no_superuser
|
|
||||||
@skip_if_windows
|
|
||||||
@skip_before_postgres(8, 4)
|
|
||||||
def test_external_close_async(self):
|
|
||||||
# Issue #443 is in the async code too. Since the fix is duplicated,
|
|
||||||
# so is the test.
|
|
||||||
control_conn = self.conn
|
|
||||||
|
|
||||||
def connect_func():
|
|
||||||
return self.connect(async_=True)
|
|
||||||
|
|
||||||
wait_func = psycopg2.extras.wait_select
|
|
||||||
self._test_external_close(control_conn, connect_func, wait_func)
|
|
||||||
|
|
||||||
def _test_external_close(self, control_conn, connect_func, wait_func):
|
|
||||||
# The short sleep before using victim_conn the second time makes it
|
|
||||||
# much more likely to lose the race and see the bug. Repeating the
|
|
||||||
# test several times makes it even more likely.
|
|
||||||
for i in range(10):
|
|
||||||
victim_conn = connect_func()
|
|
||||||
wait_func(victim_conn)
|
|
||||||
|
|
||||||
with victim_conn.cursor() as cur:
|
|
||||||
cur.execute('select pg_backend_pid()')
|
|
||||||
wait_func(victim_conn)
|
|
||||||
pid1 = cur.fetchall()[0][0]
|
|
||||||
|
|
||||||
with control_conn.cursor() as cur:
|
|
||||||
cur.execute('select pg_terminate_backend(%s)', (pid1,))
|
|
||||||
|
|
||||||
time.sleep(0.001)
|
|
||||||
|
|
||||||
def f():
|
|
||||||
with victim_conn.cursor() as cur:
|
|
||||||
cur.execute('select 1')
|
|
||||||
wait_func(victim_conn)
|
|
||||||
|
|
||||||
self.assertRaises(psycopg2.OperationalError, f)
|
|
||||||
|
|
||||||
self.assertEqual(victim_conn.closed, 2)
|
|
||||||
|
|
||||||
@skip_before_postgres(8, 2)
|
|
||||||
def test_rowcount_on_executemany_returning(self):
|
|
||||||
cur = self.conn.cursor()
|
|
||||||
cur.execute("create table execmany(id serial primary key, data int)")
|
|
||||||
cur.executemany(
|
|
||||||
"insert into execmany (data) values (%s)",
|
|
||||||
[(i,) for i in range(4)])
|
|
||||||
self.assertEqual(cur.rowcount, 4)
|
|
||||||
|
|
||||||
cur.executemany(
|
|
||||||
"insert into execmany (data) values (%s) returning data",
|
|
||||||
[(i,) for i in range(5)])
|
|
||||||
self.assertEqual(cur.rowcount, 5)
|
|
||||||
|
|
||||||
@skip_before_postgres(9)
|
|
||||||
def test_pgresult_ptr(self):
|
|
||||||
curs = self.conn.cursor()
|
|
||||||
self.assert_(curs.pgresult_ptr is None)
|
|
||||||
|
|
||||||
curs.execute("select 'x'")
|
|
||||||
self.assert_(curs.pgresult_ptr is not None)
|
|
||||||
|
|
||||||
try:
|
|
||||||
f = self.libpq.PQcmdStatus
|
|
||||||
except AttributeError:
|
|
||||||
pass
|
|
||||||
else:
|
|
||||||
f.argtypes = [ctypes.c_void_p]
|
|
||||||
f.restype = ctypes.c_char_p
|
|
||||||
status = f(curs.pgresult_ptr)
|
|
||||||
self.assertEqual(status, b'SELECT 1')
|
|
||||||
|
|
||||||
curs.close()
|
|
||||||
self.assert_(curs.pgresult_ptr is None)
|
|
||||||
|
|
||||||
|
|
||||||
def test_suite():
|
def test_suite():
|
||||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||||
|
|
Loading…
Reference in New Issue
Block a user