Merge branch 'cockroachdb-tests'

This commit is contained in:
Daniele Varrazzo 2020-08-18 20:55:13 +01:00
commit b0ddf6ea90
20 changed files with 491 additions and 271 deletions

View File

@ -33,7 +33,8 @@ import psycopg2
import psycopg2.errors import psycopg2.errors
from psycopg2 import extensions as ext from psycopg2 import extensions as ext
from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow from .testutils import (ConnectingTestCase, StringIO, skip_before_postgres,
skip_if_crdb, crdb_version, slow)
class PollableStub(object): class PollableStub(object):
@ -62,6 +63,10 @@ class AsyncTests(ConnectingTestCase):
self.wait(self.conn) self.wait(self.conn)
curs = self.conn.cursor() curs = self.conn.cursor()
if crdb_version(self.sync_conn) is not None:
curs.execute("set experimental_enable_temp_tables = 'on'")
self.wait(curs)
curs.execute(''' curs.execute('''
CREATE TEMPORARY TABLE table1 ( CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY id int PRIMARY KEY
@ -109,7 +114,6 @@ class AsyncTests(ConnectingTestCase):
self.wait(cur) self.wait(cur)
self.assertFalse(self.conn.isexecuting()) self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchall()[0][0], '')
@slow @slow
def test_async_after_async(self): def test_async_after_async(self):
@ -324,6 +328,7 @@ class AsyncTests(ConnectingTestCase):
conn.close() conn.close()
@slow @slow
@skip_if_crdb("flush on write flakey")
def test_flush_on_write(self): def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend # a very large query requires a flush loop to be sent to the backend
curs = self.conn.cursor() curs = self.conn.cursor()
@ -350,6 +355,7 @@ class AsyncTests(ConnectingTestCase):
self.assertEquals(cur.fetchone()[0], 1) self.assertEquals(cur.fetchone()[0], 1)
@slow @slow
@skip_if_crdb("notify")
def test_notify(self): def test_notify(self):
cur = self.conn.cursor() cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor() sync_cur = self.sync_conn.cursor()
@ -394,11 +400,12 @@ class AsyncTests(ConnectingTestCase):
self.assertRaises(psycopg2.IntegrityError, self.wait, cur) self.assertRaises(psycopg2.IntegrityError, self.wait, cur)
cur.execute("insert into table1 values (%s); " cur.execute("insert into table1 values (%s); "
"insert into table1 values (%s)", (2, 2)) "insert into table1 values (%s)", (2, 2))
# this should fail as well # this should fail as well (Postgres behaviour)
self.assertRaises(psycopg2.IntegrityError, self.wait, cur) self.assertRaises(psycopg2.IntegrityError, self.wait, cur)
# but this should work # but this should work
cur.execute("insert into table1 values (%s)", (2, )) if crdb_version(self.sync_conn) is None:
self.wait(cur) cur.execute("insert into table1 values (%s)", (2, ))
self.wait(cur)
# and the cursor should be usable afterwards # and the cursor should be usable afterwards
cur.execute("insert into table1 values (%s)", (3, )) cur.execute("insert into table1 values (%s)", (3, ))
self.wait(cur) self.wait(cur)
@ -426,6 +433,7 @@ class AsyncTests(ConnectingTestCase):
self.wait(cur2) self.wait(cur2)
self.assertEquals(cur2.fetchone()[0], 1) self.assertEquals(cur2.fetchone()[0], 1)
@skip_if_crdb("notice")
def test_notices(self): def test_notices(self):
del self.conn.notices[:] del self.conn.notices[:]
cur = self.conn.cursor() cur = self.conn.cursor()
@ -450,6 +458,7 @@ class AsyncTests(ConnectingTestCase):
self.wait(self.conn) self.wait(self.conn)
self.assertEqual(cur.fetchone(), (42,)) self.assertEqual(cur.fetchone(), (42,))
@skip_if_crdb("copy")
def test_async_connection_error_message(self): def test_async_connection_error_message(self):
try: try:
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async_=True) cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async_=True)
@ -467,6 +476,7 @@ class AsyncTests(ConnectingTestCase):
self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn) self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn)
@slow @slow
@skip_if_crdb("notice")
@skip_before_postgres(9, 0) @skip_before_postgres(9, 0)
def test_non_block_after_notification(self): def test_non_block_after_notification(self):
from select import select from select import select
@ -500,6 +510,7 @@ class AsyncTests(ConnectingTestCase):
def test_poll_noop(self): def test_poll_noop(self):
self.conn.poll() self.conn.poll()
@skip_if_crdb("notify")
@skip_before_postgres(9, 0) @skip_before_postgres(9, 0)
def test_poll_conn_for_notification(self): def test_poll_conn_for_notification(self):
with self.conn.cursor() as cur: with self.conn.cursor() as cur:
@ -522,6 +533,11 @@ class AsyncTests(ConnectingTestCase):
else: else:
self.fail("No notification received") self.fail("No notification received")
def test_close(self):
self.conn.close()
self.assertTrue(self.conn.closed)
self.assertTrue(self.conn.async_)
def test_suite(): def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -34,6 +34,7 @@ from psycopg2 import extras
from .testconfig import dsn from .testconfig import dsn
import unittest import unittest
from .testutils import ConnectingTestCase, skip_before_postgres, slow from .testutils import ConnectingTestCase, skip_before_postgres, slow
from .testutils import skip_if_crdb
class CancelTests(ConnectingTestCase): class CancelTests(ConnectingTestCase):
@ -41,6 +42,8 @@ class CancelTests(ConnectingTestCase):
def setUp(self): def setUp(self):
ConnectingTestCase.setUp(self) ConnectingTestCase.setUp(self)
skip_if_crdb("cancel", self.conn)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute(''' cur.execute('''
CREATE TEMPORARY TABLE table1 ( CREATE TEMPORARY TABLE table1 (
@ -106,11 +109,6 @@ class CancelTests(ConnectingTestCase):
extras.wait_select(async_conn) extras.wait_select(async_conn)
self.assertEqual(cur.fetchall(), [(1, )]) self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
async_conn = psycopg2.connect(dsn, async_=True)
async_conn.close()
self.assertTrue(async_conn.closed)
def test_suite(): def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -44,7 +44,8 @@ from psycopg2 import extensions as ext
from .testutils import ( from .testutils import (
PY2, unittest, skip_if_no_superuser, skip_before_postgres, PY2, unittest, skip_if_no_superuser, skip_before_postgres,
skip_after_postgres, skip_before_libpq, skip_after_libpq, skip_after_postgres, skip_before_libpq, skip_after_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow) ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow,
skip_if_crdb, crdb_version)
from .testconfig import dbhost, dsn, dbname from .testconfig import dbhost, dsn, dbname
@ -74,6 +75,7 @@ class ConnectionTests(ConnectingTestCase):
conn.close() conn.close()
self.assertEqual(curs.closed, True) self.assertEqual(curs.closed, True)
@skip_if_crdb("backend pid")
@skip_before_postgres(8, 4) @skip_before_postgres(8, 4)
@skip_if_no_superuser @skip_if_no_superuser
@skip_if_windows @skip_if_windows
@ -88,6 +90,7 @@ class ConnectionTests(ConnectingTestCase):
conn.close() conn.close()
self.assertEqual(conn.closed, 1) self.assertEqual(conn.closed, 1)
@skip_if_crdb("isolation level")
def test_reset(self): def test_reset(self):
conn = self.conn conn = self.conn
# switch session characteristics # switch session characteristics
@ -111,6 +114,7 @@ class ConnectionTests(ConnectingTestCase):
if self.conn.info.server_version >= 90100: if self.conn.info.server_version >= 90100:
self.assert_(conn.deferrable is None) self.assert_(conn.deferrable is None)
@skip_if_crdb("notice")
def test_notices(self): def test_notices(self):
conn = self.conn conn = self.conn
cur = conn.cursor() cur = conn.cursor()
@ -120,6 +124,7 @@ class ConnectionTests(ConnectingTestCase):
self.assertEqual("CREATE TABLE", cur.statusmessage) self.assertEqual("CREATE TABLE", cur.statusmessage)
self.assert_(conn.notices) self.assert_(conn.notices)
@skip_if_crdb("notice")
def test_notices_consistent_order(self): def test_notices_consistent_order(self):
conn = self.conn conn = self.conn
cur = conn.cursor() cur = conn.cursor()
@ -140,6 +145,7 @@ class ConnectionTests(ConnectingTestCase):
self.assert_('table4' in conn.notices[3]) self.assert_('table4' in conn.notices[3])
@slow @slow
@skip_if_crdb("notice")
def test_notices_limited(self): def test_notices_limited(self):
conn = self.conn conn = self.conn
cur = conn.cursor() cur = conn.cursor()
@ -154,6 +160,7 @@ class ConnectionTests(ConnectingTestCase):
self.assert_('table99' in conn.notices[-1], conn.notices[-1]) self.assert_('table99' in conn.notices[-1], conn.notices[-1])
@slow @slow
@skip_if_crdb("notice")
def test_notices_deque(self): def test_notices_deque(self):
conn = self.conn conn = self.conn
self.conn.notices = deque() self.conn.notices = deque()
@ -184,6 +191,7 @@ class ConnectionTests(ConnectingTestCase):
self.assertEqual(len([n for n in conn.notices if 'CREATE TABLE' in n]), self.assertEqual(len([n for n in conn.notices if 'CREATE TABLE' in n]),
100) 100)
@skip_if_crdb("notice")
def test_notices_noappend(self): def test_notices_noappend(self):
conn = self.conn conn = self.conn
self.conn.notices = None # will make an error swallowes ok self.conn.notices = None # will make an error swallowes ok
@ -230,6 +238,7 @@ class ConnectionTests(ConnectingTestCase):
self.assert_(time.time() - t0 < 7, self.assert_(time.time() - t0 < 7,
"something broken in concurrency") "something broken in concurrency")
@skip_if_crdb("encoding")
def test_encoding_name(self): def test_encoding_name(self):
self.conn.set_client_encoding("EUC_JP") self.conn.set_client_encoding("EUC_JP")
# conn.encoding is 'EUCJP' now. # conn.encoding is 'EUCJP' now.
@ -329,6 +338,7 @@ class ConnectionTests(ConnectingTestCase):
cur = conn.cursor(cursor_factory=None) cur = conn.cursor(cursor_factory=None)
self.assertEqual(type(cur), psycopg2.extras.DictCursor) self.assertEqual(type(cur), psycopg2.extras.DictCursor)
@skip_if_crdb("connect any db")
def test_failed_init_status(self): def test_failed_init_status(self):
class SubConnection(ext.connection): class SubConnection(ext.connection):
def __init__(self, dsn): def __init__(self, dsn):
@ -563,6 +573,12 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn = self.connect() conn = self.connect()
cur = conn.cursor() cur = conn.cursor()
if crdb_version(conn) is not None:
cur.execute("create table if not exists isolevel (id integer)")
cur.execute("truncate isolevel")
conn.commit()
return
try: try:
cur.execute("drop table isolevel;") cur.execute("drop table isolevel;")
except psycopg2.ProgrammingError: except psycopg2.ProgrammingError:
@ -583,6 +599,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn = self.connect() conn = self.connect()
self.assert_(conn.encoding in ext.encodings) self.assert_(conn.encoding in ext.encodings)
@skip_if_crdb("isolation level")
def test_set_isolation_level(self): def test_set_isolation_level(self):
conn = self.connect() conn = self.connect()
curs = conn.cursor() curs = conn.cursor()
@ -630,6 +647,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
curs.execute('show transaction_isolation;') curs.execute('show transaction_isolation;')
self.assertEqual(curs.fetchone()[0], 'serializable') self.assertEqual(curs.fetchone()[0], 'serializable')
@skip_if_crdb("isolation level")
def test_set_isolation_level_default(self): def test_set_isolation_level_default(self):
conn = self.connect() conn = self.connect()
curs = conn.cursor() curs = conn.cursor()
@ -704,6 +722,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
cur1.execute("select count(*) from isolevel;") cur1.execute("select count(*) from isolevel;")
self.assertEqual(1, cur1.fetchone()[0]) self.assertEqual(1, cur1.fetchone()[0])
@skip_if_crdb("isolation level")
def test_isolation_level_read_committed(self): def test_isolation_level_read_committed(self):
cnn1 = self.connect() cnn1 = self.connect()
cnn2 = self.connect() cnn2 = self.connect()
@ -730,6 +749,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
cur1.execute("select count(*) from isolevel;") cur1.execute("select count(*) from isolevel;")
self.assertEqual(2, cur1.fetchone()[0]) self.assertEqual(2, cur1.fetchone()[0])
@skip_if_crdb("isolation level")
def test_isolation_level_serializable(self): def test_isolation_level_serializable(self):
cnn1 = self.connect() cnn1 = self.connect()
cnn2 = self.connect() cnn2 = self.connect()
@ -767,6 +787,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
self.assertRaises(psycopg2.InterfaceError, self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 1) cnn.set_isolation_level, 1)
@skip_if_crdb("isolation level")
def test_setattr_isolation_level_int(self): def test_setattr_isolation_level_int(self):
cur = self.conn.cursor() cur = self.conn.cursor()
self.conn.isolation_level = ext.ISOLATION_LEVEL_SERIALIZABLE self.conn.isolation_level = ext.ISOLATION_LEVEL_SERIALIZABLE
@ -815,6 +836,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
cur.execute("SHOW default_transaction_isolation;") cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], isol) self.assertEqual(cur.fetchone()[0], isol)
@skip_if_crdb("isolation level")
def test_setattr_isolation_level_str(self): def test_setattr_isolation_level_str(self):
cur = self.conn.cursor() cur = self.conn.cursor()
self.conn.isolation_level = "serializable" self.conn.isolation_level = "serializable"
@ -911,6 +933,13 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
def make_test_table(self): def make_test_table(self):
cnn = self.connect() cnn = self.connect()
cur = cnn.cursor() cur = cnn.cursor()
if crdb_version(cnn) is not None:
cur.execute("CREATE TABLE IF NOT EXISTS test_tpc (data text)")
cur.execute("TRUNCATE test_tpc")
cnn.commit()
cnn.close()
return
try: try:
cur.execute("DROP TABLE test_tpc;") cur.execute("DROP TABLE test_tpc;")
except psycopg2.ProgrammingError: except psycopg2.ProgrammingError:
@ -1244,6 +1273,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(None, xid.bqual) self.assertEqual(None, xid.bqual)
@skip_if_crdb("isolation level")
class TransactionControlTests(ConnectingTestCase): class TransactionControlTests(ConnectingTestCase):
def test_closed(self): def test_closed(self):
self.conn.close() self.conn.close()
@ -1672,6 +1702,7 @@ class PasswordLeakTestCase(ConnectingTestCase):
# the password away # the password away
PasswordLeakTestCase.dsn = self.dsn PasswordLeakTestCase.dsn = self.dsn
@skip_if_crdb("connect any db")
def test_leak(self): def test_leak(self):
self.assertRaises(psycopg2.DatabaseError, self.assertRaises(psycopg2.DatabaseError,
self.GrassingConnection, "dbname=nosuch password=whateva") self.GrassingConnection, "dbname=nosuch password=whateva")
@ -1857,6 +1888,7 @@ class TestConnectionInfo(ConnectingTestCase):
self.assert_(self.conn.info.socket >= 0) self.assert_(self.conn.info.socket >= 0)
self.assert_(self.bconn.info.socket < 0) self.assert_(self.bconn.info.socket < 0)
@skip_if_crdb("backend pid")
def test_backend_pid(self): def test_backend_pid(self):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:

View File

@ -27,7 +27,8 @@ import io
import sys import sys
import string import string
import unittest import unittest
from .testutils import (ConnectingTestCase, skip_before_postgres, slow, StringIO) from .testutils import ConnectingTestCase, skip_before_postgres, slow, StringIO
from .testutils import skip_if_crdb
from itertools import cycle from itertools import cycle
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
@ -66,6 +67,7 @@ class CopyTests(ConnectingTestCase):
self._create_temp_table() self._create_temp_table()
def _create_temp_table(self): def _create_temp_table(self):
skip_if_crdb("copy", self.conn)
curs = self.conn.cursor() curs = self.conn.cursor()
curs.execute(''' curs.execute('''
CREATE TEMPORARY TABLE tcopy ( CREATE TEMPORARY TABLE tcopy (

View File

@ -36,7 +36,7 @@ from decimal import Decimal
from weakref import ref from weakref import ref
from .testutils import (ConnectingTestCase, skip_before_postgres, from .testutils import (ConnectingTestCase, skip_before_postgres,
skip_if_no_getrefcount, slow, skip_if_no_superuser, skip_if_no_getrefcount, slow, skip_if_no_superuser,
skip_if_windows) skip_if_windows, skip_if_crdb, crdb_version)
import psycopg2.extras import psycopg2.extras
from psycopg2.compat import text_type from psycopg2.compat import text_type
@ -59,7 +59,7 @@ class CursorTests(ConnectingTestCase):
def test_executemany_propagate_exceptions(self): def test_executemany_propagate_exceptions(self):
conn = self.conn conn = self.conn
cur = conn.cursor() cur = conn.cursor()
cur.execute("create temp table test_exc (data int);") cur.execute("create table test_exc (data int);")
def buggygen(): def buggygen():
yield 1 // 0 yield 1 // 0
@ -177,9 +177,237 @@ class CursorTests(ConnectingTestCase):
curs = self.conn.cursor(None) curs = self.conn.cursor(None)
self.assertEqual(curs.name, None) self.assertEqual(curs.name, None)
def test_description_attribs(self):
curs = self.conn.cursor()
curs.execute("""select
3.14::decimal(10,2) as pi,
'hello'::text as hi,
'2010-02-18'::date as now;
""")
self.assertEqual(len(curs.description), 3)
for c in curs.description:
self.assertEqual(len(c), 7) # DBAPI happy
for a in ('name', 'type_code', 'display_size', 'internal_size',
'precision', 'scale', 'null_ok'):
self.assert_(hasattr(c, a), a)
c = curs.description[0]
self.assertEqual(c.name, 'pi')
self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values)
if crdb_version(self.conn) is None:
self.assert_(c.internal_size > 0)
self.assertEqual(c.precision, 10)
self.assertEqual(c.scale, 2)
c = curs.description[1]
self.assertEqual(c.name, 'hi')
self.assert_(c.type_code in psycopg2.STRING.values)
self.assert_(c.internal_size < 0)
self.assertEqual(c.precision, None)
self.assertEqual(c.scale, None)
c = curs.description[2]
self.assertEqual(c.name, 'now')
self.assert_(c.type_code in psycopg2.extensions.DATE.values)
self.assert_(c.internal_size > 0)
self.assertEqual(c.precision, None)
self.assertEqual(c.scale, None)
@skip_if_crdb("table oid")
def test_description_extra_attribs(self):
curs = self.conn.cursor()
curs.execute("""
create table testcol (
pi decimal(10,2),
hi text)
""")
curs.execute("select oid from pg_class where relname = %s", ('testcol',))
oid = curs.fetchone()[0]
curs.execute("insert into testcol values (3.14, 'hello')")
curs.execute("select hi, pi, 42 from testcol")
self.assertEqual(curs.description[0].table_oid, oid)
self.assertEqual(curs.description[0].table_column, 2)
self.assertEqual(curs.description[1].table_oid, oid)
self.assertEqual(curs.description[1].table_column, 1)
self.assertEqual(curs.description[2].table_oid, None)
self.assertEqual(curs.description[2].table_column, None)
def test_description_slice(self):
curs = self.conn.cursor()
curs.execute("select 1::int4 as a")
self.assertEqual(curs.description[0][0:2], ('a', 23))
def test_pickle_description(self):
curs = self.conn.cursor()
curs.execute('SELECT 1 AS foo')
description = curs.description
pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
unpickled = pickle.loads(pickled)
self.assertEqual(description, unpickled)
def test_bad_subclass(self):
# check that we get an error message instead of a segfault
# for badly written subclasses.
# see https://stackoverflow.com/questions/22019341/
class StupidCursor(psycopg2.extensions.cursor):
def __init__(self, *args, **kwargs):
# I am stupid so not calling superclass init
pass
cur = StupidCursor()
self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1')
self.assertRaises(psycopg2.InterfaceError, cur.executemany,
'select 1', [])
def test_callproc_badparam(self):
cur = self.conn.cursor()
self.assertRaises(TypeError, cur.callproc, 'lower', 42)
# It would be inappropriate to test callproc's named parameters in the
# DBAPI2.0 test section because they are a psycopg2 extension.
@skip_before_postgres(9, 0)
@skip_if_crdb("stored procedure")
def test_callproc_dict(self):
# This parameter name tests for injection and quote escaping
paramname = '''
Robert'); drop table "students" --
'''.strip()
escaped_paramname = '"%s"' % paramname.replace('"', '""')
procname = 'pg_temp.randall'
cur = self.conn.cursor()
# Set up the temporary function
cur.execute('''
CREATE FUNCTION %s(%s INT)
RETURNS INT AS
'SELECT $1 * $1'
LANGUAGE SQL
''' % (procname, escaped_paramname))
# Make sure callproc works right
cur.callproc(procname, {paramname: 2})
self.assertEquals(cur.fetchone()[0], 4)
# Make sure callproc fails right
failing_cases = [
({paramname: 2, 'foo': 'bar'}, psycopg2.ProgrammingError),
({paramname: '2'}, psycopg2.ProgrammingError),
({paramname: 'two'}, psycopg2.ProgrammingError),
({u'bj\xc3rn': 2}, psycopg2.ProgrammingError),
({3: 2}, TypeError),
({self: 2}, TypeError),
]
for parameter_sequence, exception in failing_cases:
self.assertRaises(exception, cur.callproc, procname, parameter_sequence)
self.conn.rollback()
@skip_if_no_superuser
@skip_if_windows
@skip_if_crdb("backend pid")
@skip_before_postgres(8, 4)
def test_external_close_sync(self):
# If a "victim" connection is closed by a "control" connection
# behind psycopg2's back, psycopg2 always handles it correctly:
# raise OperationalError, set conn.closed to 2. This reproduces
# issue #443, a race between control_conn closing victim_conn and
# psycopg2 noticing.
control_conn = self.conn
connect_func = self.connect
def wait_func(conn):
pass
self._test_external_close(control_conn, connect_func, wait_func)
@skip_if_no_superuser
@skip_if_windows
@skip_if_crdb("backend pid")
@skip_before_postgres(8, 4)
def test_external_close_async(self):
# Issue #443 is in the async code too. Since the fix is duplicated,
# so is the test.
control_conn = self.conn
def connect_func():
return self.connect(async_=True)
wait_func = psycopg2.extras.wait_select
self._test_external_close(control_conn, connect_func, wait_func)
def _test_external_close(self, control_conn, connect_func, wait_func):
# The short sleep before using victim_conn the second time makes it
# much more likely to lose the race and see the bug. Repeating the
# test several times makes it even more likely.
for i in range(10):
victim_conn = connect_func()
wait_func(victim_conn)
with victim_conn.cursor() as cur:
cur.execute('select pg_backend_pid()')
wait_func(victim_conn)
pid1 = cur.fetchall()[0][0]
with control_conn.cursor() as cur:
cur.execute('select pg_terminate_backend(%s)', (pid1,))
time.sleep(0.001)
def f():
with victim_conn.cursor() as cur:
cur.execute('select 1')
wait_func(victim_conn)
self.assertRaises(psycopg2.OperationalError, f)
self.assertEqual(victim_conn.closed, 2)
@skip_before_postgres(8, 2)
def test_rowcount_on_executemany_returning(self):
cur = self.conn.cursor()
cur.execute("create table execmany(id serial primary key, data int)")
cur.executemany(
"insert into execmany (data) values (%s)",
[(i,) for i in range(4)])
self.assertEqual(cur.rowcount, 4)
cur.executemany(
"insert into execmany (data) values (%s) returning data",
[(i,) for i in range(5)])
self.assertEqual(cur.rowcount, 5)
@skip_before_postgres(9)
def test_pgresult_ptr(self):
curs = self.conn.cursor()
self.assert_(curs.pgresult_ptr is None)
curs.execute("select 'x'")
self.assert_(curs.pgresult_ptr is not None)
try:
f = self.libpq.PQcmdStatus
except AttributeError:
pass
else:
f.argtypes = [ctypes.c_void_p]
f.restype = ctypes.c_char_p
status = f(curs.pgresult_ptr)
self.assertEqual(status, b'SELECT 1')
curs.close()
self.assert_(curs.pgresult_ptr is None)
@skip_if_crdb("named cursor")
class NamedCursorTests(ConnectingTestCase):
def test_invalid_name(self): def test_invalid_name(self):
curs = self.conn.cursor() curs = self.conn.cursor()
curs.execute("create temp table invname (data int);") curs.execute("create table invname (data int);")
for i in (10, 20, 30): for i in (10, 20, 30):
curs.execute("insert into invname values (%s)", (i,)) curs.execute("insert into invname values (%s)", (i,))
curs.close() curs.close()
@ -377,77 +605,6 @@ class CursorTests(ConnectingTestCase):
for i, rec in enumerate(curs): for i, rec in enumerate(curs):
self.assertEqual(i + 1, curs.rownumber) self.assertEqual(i + 1, curs.rownumber)
def test_description_attribs(self):
curs = self.conn.cursor()
curs.execute("""select
3.14::decimal(10,2) as pi,
'hello'::text as hi,
'2010-02-18'::date as now;
""")
self.assertEqual(len(curs.description), 3)
for c in curs.description:
self.assertEqual(len(c), 7) # DBAPI happy
for a in ('name', 'type_code', 'display_size', 'internal_size',
'precision', 'scale', 'null_ok'):
self.assert_(hasattr(c, a), a)
c = curs.description[0]
self.assertEqual(c.name, 'pi')
self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values)
self.assert_(c.internal_size > 0)
self.assertEqual(c.precision, 10)
self.assertEqual(c.scale, 2)
c = curs.description[1]
self.assertEqual(c.name, 'hi')
self.assert_(c.type_code in psycopg2.STRING.values)
self.assert_(c.internal_size < 0)
self.assertEqual(c.precision, None)
self.assertEqual(c.scale, None)
c = curs.description[2]
self.assertEqual(c.name, 'now')
self.assert_(c.type_code in psycopg2.extensions.DATE.values)
self.assert_(c.internal_size > 0)
self.assertEqual(c.precision, None)
self.assertEqual(c.scale, None)
def test_description_extra_attribs(self):
curs = self.conn.cursor()
curs.execute("""
create table testcol (
pi decimal(10,2),
hi text)
""")
curs.execute("select oid from pg_class where relname = %s", ('testcol',))
oid = curs.fetchone()[0]
curs.execute("insert into testcol values (3.14, 'hello')")
curs.execute("select hi, pi, 42 from testcol")
self.assertEqual(curs.description[0].table_oid, oid)
self.assertEqual(curs.description[0].table_column, 2)
self.assertEqual(curs.description[1].table_oid, oid)
self.assertEqual(curs.description[1].table_column, 1)
self.assertEqual(curs.description[2].table_oid, None)
self.assertEqual(curs.description[2].table_column, None)
def test_description_slice(self):
curs = self.conn.cursor()
curs.execute("select 1::int as a")
self.assertEqual(curs.description[0][0:2], ('a', 23))
def test_pickle_description(self):
curs = self.conn.cursor()
curs.execute('SELECT 1 AS foo')
description = curs.description
pickled = pickle.dumps(description, pickle.HIGHEST_PROTOCOL)
unpickled = pickle.loads(pickled)
self.assertEqual(description, unpickled)
@skip_before_postgres(8, 0) @skip_before_postgres(8, 0)
def test_named_cursor_stealing(self): def test_named_cursor_stealing(self):
# you can use a named cursor to iterate on a refcursor created # you can use a named cursor to iterate on a refcursor created
@ -527,155 +684,6 @@ class CursorTests(ConnectingTestCase):
cur.scroll(9, mode='absolute') cur.scroll(9, mode='absolute')
self.assertEqual(cur.fetchone(), (9,)) self.assertEqual(cur.fetchone(), (9,))
def test_bad_subclass(self):
# check that we get an error message instead of a segfault
# for badly written subclasses.
# see https://stackoverflow.com/questions/22019341/
class StupidCursor(psycopg2.extensions.cursor):
def __init__(self, *args, **kwargs):
# I am stupid so not calling superclass init
pass
cur = StupidCursor()
self.assertRaises(psycopg2.InterfaceError, cur.execute, 'select 1')
self.assertRaises(psycopg2.InterfaceError, cur.executemany,
'select 1', [])
def test_callproc_badparam(self):
cur = self.conn.cursor()
self.assertRaises(TypeError, cur.callproc, 'lower', 42)
# It would be inappropriate to test callproc's named parameters in the
# DBAPI2.0 test section because they are a psycopg2 extension.
@skip_before_postgres(9, 0)
def test_callproc_dict(self):
# This parameter name tests for injection and quote escaping
paramname = '''
Robert'); drop table "students" --
'''.strip()
escaped_paramname = '"%s"' % paramname.replace('"', '""')
procname = 'pg_temp.randall'
cur = self.conn.cursor()
# Set up the temporary function
cur.execute('''
CREATE FUNCTION %s(%s INT)
RETURNS INT AS
'SELECT $1 * $1'
LANGUAGE SQL
''' % (procname, escaped_paramname))
# Make sure callproc works right
cur.callproc(procname, {paramname: 2})
self.assertEquals(cur.fetchone()[0], 4)
# Make sure callproc fails right
failing_cases = [
({paramname: 2, 'foo': 'bar'}, psycopg2.ProgrammingError),
({paramname: '2'}, psycopg2.ProgrammingError),
({paramname: 'two'}, psycopg2.ProgrammingError),
({u'bj\xc3rn': 2}, psycopg2.ProgrammingError),
({3: 2}, TypeError),
({self: 2}, TypeError),
]
for parameter_sequence, exception in failing_cases:
self.assertRaises(exception, cur.callproc, procname, parameter_sequence)
self.conn.rollback()
@skip_if_no_superuser
@skip_if_windows
@skip_before_postgres(8, 4)
def test_external_close_sync(self):
# If a "victim" connection is closed by a "control" connection
# behind psycopg2's back, psycopg2 always handles it correctly:
# raise OperationalError, set conn.closed to 2. This reproduces
# issue #443, a race between control_conn closing victim_conn and
# psycopg2 noticing.
control_conn = self.conn
connect_func = self.connect
def wait_func(conn):
pass
self._test_external_close(control_conn, connect_func, wait_func)
@skip_if_no_superuser
@skip_if_windows
@skip_before_postgres(8, 4)
def test_external_close_async(self):
# Issue #443 is in the async code too. Since the fix is duplicated,
# so is the test.
control_conn = self.conn
def connect_func():
return self.connect(async_=True)
wait_func = psycopg2.extras.wait_select
self._test_external_close(control_conn, connect_func, wait_func)
def _test_external_close(self, control_conn, connect_func, wait_func):
# The short sleep before using victim_conn the second time makes it
# much more likely to lose the race and see the bug. Repeating the
# test several times makes it even more likely.
for i in range(10):
victim_conn = connect_func()
wait_func(victim_conn)
with victim_conn.cursor() as cur:
cur.execute('select pg_backend_pid()')
wait_func(victim_conn)
pid1 = cur.fetchall()[0][0]
with control_conn.cursor() as cur:
cur.execute('select pg_terminate_backend(%s)', (pid1,))
time.sleep(0.001)
def f():
with victim_conn.cursor() as cur:
cur.execute('select 1')
wait_func(victim_conn)
self.assertRaises(psycopg2.OperationalError, f)
self.assertEqual(victim_conn.closed, 2)
@skip_before_postgres(8, 2)
def test_rowcount_on_executemany_returning(self):
cur = self.conn.cursor()
cur.execute("create table execmany(id serial primary key, data int)")
cur.executemany(
"insert into execmany (data) values (%s)",
[(i,) for i in range(4)])
self.assertEqual(cur.rowcount, 4)
cur.executemany(
"insert into execmany (data) values (%s) returning data",
[(i,) for i in range(5)])
self.assertEqual(cur.rowcount, 5)
@skip_before_postgres(9)
def test_pgresult_ptr(self):
curs = self.conn.cursor()
self.assert_(curs.pgresult_ptr is None)
curs.execute("select 'x'")
self.assert_(curs.pgresult_ptr is not None)
try:
f = self.libpq.PQcmdStatus
except AttributeError:
pass
else:
f.argtypes = [ctypes.c_void_p]
f.restype = ctypes.c_char_p
status = f(curs.pgresult_ptr)
self.assertEqual(status, b'SELECT 1')
curs.close()
self.assert_(curs.pgresult_ptr is None)
def test_suite(): def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -30,7 +30,7 @@ from datetime import date, datetime, time, timedelta
import psycopg2 import psycopg2
from psycopg2.tz import FixedOffsetTimezone, ZERO from psycopg2.tz import FixedOffsetTimezone, ZERO
import unittest import unittest
from .testutils import ConnectingTestCase, skip_before_postgres from .testutils import ConnectingTestCase, skip_before_postgres, skip_if_crdb
try: try:
from mx.DateTime import Date, Time, DateTime, DateTimeDeltaFrom from mx.DateTime import Date, Time, DateTime, DateTimeDeltaFrom
@ -246,6 +246,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
[time(13, 30, 29)]) [time(13, 30, 29)])
self.assertEqual(value, '13:30:29') self.assertEqual(value, '13:30:29')
@skip_if_crdb("cast adds tz")
def test_adapt_datetime(self): def test_adapt_datetime(self):
value = self.execute('select (%s)::timestamp::text', value = self.execute('select (%s)::timestamp::text',
[datetime(2007, 1, 1, 13, 30, 29)]) [datetime(2007, 1, 1, 13, 30, 29)])
@ -386,6 +387,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertRaises(OverflowError, f, '00:00:100000000000000000:00') self.assertRaises(OverflowError, f, '00:00:100000000000000000:00')
self.assertRaises(OverflowError, f, '00:00:00.100000000000000000') self.assertRaises(OverflowError, f, '00:00:00.100000000000000000')
@skip_if_crdb("infinity date")
def test_adapt_infinity_tz(self): def test_adapt_infinity_tz(self):
t = self.execute("select 'infinity'::timestamp") t = self.execute("select 'infinity'::timestamp")
self.assert_(t.tzinfo is None) self.assert_(t.tzinfo is None)
@ -423,6 +425,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
r = cur.fetchone()[0] r = cur.fetchone()[0]
self.assertEqual(r, v, "%s -> %s != %s" % (s, r, v)) self.assertEqual(r, v, "%s -> %s != %s" % (s, r, v))
@skip_if_crdb("interval style")
@skip_before_postgres(8, 4) @skip_before_postgres(8, 4)
def test_interval_iso_8601_not_supported(self): def test_interval_iso_8601_not_supported(self):
# We may end up supporting, but no pressure for it # We may end up supporting, but no pressure for it

View File

@ -27,13 +27,15 @@ import psycopg2.extras
from psycopg2.extras import NamedTupleConnection, NamedTupleCursor from psycopg2.extras import NamedTupleConnection, NamedTupleCursor
from .testutils import ConnectingTestCase, skip_before_postgres, \ from .testutils import ConnectingTestCase, skip_before_postgres, \
skip_before_python, skip_from_python skip_before_python, skip_from_python, crdb_version, skip_if_crdb
class _DictCursorBase(ConnectingTestCase): class _DictCursorBase(ConnectingTestCase):
def setUp(self): def setUp(self):
ConnectingTestCase.setUp(self) ConnectingTestCase.setUp(self)
curs = self.conn.cursor() curs = self.conn.cursor()
if crdb_version(self.conn) is not None:
curs.execute("SET experimental_enable_temp_tables = 'on'")
curs.execute("CREATE TEMPORARY TABLE ExtrasDictCursorTests (foo text)") curs.execute("CREATE TEMPORARY TABLE ExtrasDictCursorTests (foo text)")
curs.execute("INSERT INTO ExtrasDictCursorTests VALUES ('bar')") curs.execute("INSERT INTO ExtrasDictCursorTests VALUES ('bar')")
self.conn.commit() self.conn.commit()
@ -62,6 +64,7 @@ class _DictCursorBase(ConnectingTestCase):
class ExtrasDictCursorTests(_DictCursorBase): class ExtrasDictCursorTests(_DictCursorBase):
"""Test if DictCursor extension class works.""" """Test if DictCursor extension class works."""
@skip_if_crdb("named cursor")
def testDictConnCursorArgs(self): def testDictConnCursorArgs(self):
self.conn.close() self.conn.close()
self.conn = self.connect(connection_factory=psycopg2.extras.DictConnection) self.conn = self.connect(connection_factory=psycopg2.extras.DictConnection)
@ -129,16 +132,19 @@ class ExtrasDictCursorTests(_DictCursorBase):
return row return row
self._testWithNamedCursor(getter) self._testWithNamedCursor(getter)
@skip_if_crdb("named cursor")
@skip_before_postgres(8, 2) @skip_before_postgres(8, 2)
def testDictCursorWithNamedCursorNotGreedy(self): def testDictCursorWithNamedCursorNotGreedy(self):
curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor) curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
self._testNamedCursorNotGreedy(curs) self._testNamedCursorNotGreedy(curs)
@skip_if_crdb("named cursor")
@skip_before_postgres(8, 0) @skip_before_postgres(8, 0)
def testDictCursorWithNamedCursorIterRowNumber(self): def testDictCursorWithNamedCursorIterRowNumber(self):
curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor) curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
self._testIterRowNumber(curs) self._testIterRowNumber(curs)
@skip_if_crdb("named cursor")
def _testWithNamedCursor(self, getter): def _testWithNamedCursor(self, getter):
curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.DictCursor) curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests") curs.execute("SELECT * FROM ExtrasDictCursorTests")
@ -314,16 +320,19 @@ class ExtrasDictCursorRealTests(_DictCursorBase):
return row return row
self._testWithNamedCursorReal(getter) self._testWithNamedCursorReal(getter)
@skip_if_crdb("named cursor")
@skip_before_postgres(8, 2) @skip_before_postgres(8, 2)
def testDictCursorRealWithNamedCursorNotGreedy(self): def testDictCursorRealWithNamedCursorNotGreedy(self):
curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor) curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
self._testNamedCursorNotGreedy(curs) self._testNamedCursorNotGreedy(curs)
@skip_if_crdb("named cursor")
@skip_before_postgres(8, 0) @skip_before_postgres(8, 0)
def testDictCursorRealWithNamedCursorIterRowNumber(self): def testDictCursorRealWithNamedCursorIterRowNumber(self):
curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor) curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
self._testIterRowNumber(curs) self._testIterRowNumber(curs)
@skip_if_crdb("named cursor")
def _testWithNamedCursorReal(self, getter): def _testWithNamedCursorReal(self, getter):
curs = self.conn.cursor('aname', curs = self.conn.cursor('aname',
cursor_factory=psycopg2.extras.RealDictCursor) cursor_factory=psycopg2.extras.RealDictCursor)
@ -429,12 +438,15 @@ class NamedTupleCursorTest(ConnectingTestCase):
self.conn = self.connect(connection_factory=NamedTupleConnection) self.conn = self.connect(connection_factory=NamedTupleConnection)
curs = self.conn.cursor() curs = self.conn.cursor()
if crdb_version(self.conn) is not None:
curs.execute("SET experimental_enable_temp_tables = 'on'")
curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)") curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)")
curs.execute("INSERT INTO nttest VALUES (1, 'foo')") curs.execute("INSERT INTO nttest VALUES (1, 'foo')")
curs.execute("INSERT INTO nttest VALUES (2, 'bar')") curs.execute("INSERT INTO nttest VALUES (2, 'bar')")
curs.execute("INSERT INTO nttest VALUES (3, 'baz')") curs.execute("INSERT INTO nttest VALUES (3, 'baz')")
self.conn.commit() self.conn.commit()
@skip_if_crdb("named cursor")
def test_cursor_args(self): def test_cursor_args(self):
cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.DictCursor) cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.DictCursor)
self.assertEqual(cur.name, 'foo') self.assertEqual(cur.name, 'foo')
@ -592,6 +604,7 @@ class NamedTupleCursorTest(ConnectingTestCase):
finally: finally:
NamedTupleCursor._make_nt = f_orig NamedTupleCursor._make_nt = f_orig
@skip_if_crdb("named cursor")
@skip_before_postgres(8, 0) @skip_before_postgres(8, 0)
def test_named(self): def test_named(self):
curs = self.conn.cursor('tmp') curs = self.conn.cursor('tmp')
@ -602,24 +615,28 @@ class NamedTupleCursorTest(ConnectingTestCase):
recs.extend(curs.fetchall()) recs.extend(curs.fetchall())
self.assertEqual(list(range(10)), [t.i for t in recs]) self.assertEqual(list(range(10)), [t.i for t in recs])
@skip_if_crdb("named cursor")
def test_named_fetchone(self): def test_named_fetchone(self):
curs = self.conn.cursor('tmp') curs = self.conn.cursor('tmp')
curs.execute("""select 42 as i""") curs.execute("""select 42 as i""")
t = curs.fetchone() t = curs.fetchone()
self.assertEqual(t.i, 42) self.assertEqual(t.i, 42)
@skip_if_crdb("named cursor")
def test_named_fetchmany(self): def test_named_fetchmany(self):
curs = self.conn.cursor('tmp') curs = self.conn.cursor('tmp')
curs.execute("""select 42 as i""") curs.execute("""select 42 as i""")
recs = curs.fetchmany(10) recs = curs.fetchmany(10)
self.assertEqual(recs[0].i, 42) self.assertEqual(recs[0].i, 42)
@skip_if_crdb("named cursor")
def test_named_fetchall(self): def test_named_fetchall(self):
curs = self.conn.cursor('tmp') curs = self.conn.cursor('tmp')
curs.execute("""select 42 as i""") curs.execute("""select 42 as i""")
recs = curs.fetchall() recs = curs.fetchall()
self.assertEqual(recs[0].i, 42) self.assertEqual(recs[0].i, 42)
@skip_if_crdb("named cursor")
@skip_before_postgres(8, 2) @skip_before_postgres(8, 2)
def test_not_greedy(self): def test_not_greedy(self):
curs = self.conn.cursor('tmp') curs = self.conn.cursor('tmp')
@ -634,6 +651,7 @@ class NamedTupleCursorTest(ConnectingTestCase):
self.assert_(recs[1].ts - recs[0].ts < timedelta(seconds=0.005)) self.assert_(recs[1].ts - recs[0].ts < timedelta(seconds=0.005))
self.assert_(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099)) self.assert_(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099))
@skip_if_crdb("named cursor")
@skip_before_postgres(8, 0) @skip_before_postgres(8, 0)
def test_named_rownumber(self): def test_named_rownumber(self):
curs = self.conn.cursor('tmp') curs = self.conn.cursor('tmp')

View File

@ -33,6 +33,7 @@ import psycopg2.extras
from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE
from .testutils import ConnectingTestCase, skip_before_postgres, slow from .testutils import ConnectingTestCase, skip_before_postgres, slow
from .testutils import skip_if_crdb
class ConnectionStub(object): class ConnectionStub(object):
@ -67,6 +68,7 @@ class GreenTestCase(ConnectingTestCase):
return stub return stub
@slow @slow
@skip_if_crdb("flush on write flakey")
def test_flush_on_write(self): def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend # a very large query requires a flush loop to be sent to the backend
conn = self.conn conn = self.conn
@ -122,8 +124,9 @@ class GreenTestCase(ConnectingTestCase):
cur.execute, "copy (select 1) to stdout") cur.execute, "copy (select 1) to stdout")
@slow @slow
@skip_if_crdb("notice")
@skip_before_postgres(9, 0) @skip_before_postgres(9, 0)
def test_non_block_after_notification(self): def test_non_block_after_notice(self):
def wait(conn): def wait(conn):
while 1: while 1:
state = conn.poll() state = conn.poll()
@ -216,6 +219,7 @@ class CallbackErrorTestCase(ConnectingTestCase):
self.fail("you should have had a success or an error by now") self.fail("you should have had a success or an error by now")
@skip_if_crdb("named cursor")
def test_errors_named_cursor(self): def test_errors_named_cursor(self):
for i in range(100): for i in range(100):
self.to_error = None self.to_error = None

View File

@ -71,6 +71,7 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')]) cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128') self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
@testutils.skip_if_crdb("cidr")
def test_cidr_cast(self): def test_cidr_cast(self):
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur) psycopg2.extras.register_ipaddress(cur)
@ -88,6 +89,7 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assert_(isinstance(obj, ip.IPv6Network), repr(obj)) self.assert_(isinstance(obj, ip.IPv6Network), repr(obj))
self.assertEquals(obj, ip.ip_network('::ffff:102:300/128')) self.assertEquals(obj, ip.ip_network('::ffff:102:300/128'))
@testutils.skip_if_crdb("cidr")
@testutils.skip_before_postgres(8, 2) @testutils.skip_before_postgres(8, 2)
def test_cidr_array_cast(self): def test_cidr_array_cast(self):
cur = self.conn.cursor() cur = self.conn.cursor()

View File

@ -32,13 +32,14 @@ import psycopg2
import psycopg2.extensions import psycopg2.extensions
import unittest import unittest
from .testutils import (decorate_all_tests, skip_if_tpc_disabled, from .testutils import (decorate_all_tests, skip_if_tpc_disabled,
skip_before_postgres, ConnectingTestCase, skip_if_green, slow) skip_before_postgres, ConnectingTestCase, skip_if_green, skip_if_crdb, slow)
skip_if_no_lo = skip_before_postgres(8, 1, def skip_if_no_lo(f):
"large objects only supported from PG 8.1") f = skip_before_postgres(8, 1, "large objects only supported from PG 8.1")(f)
f = skip_if_green("libpq doesn't support LO in async mode")(f)
skip_lo_if_green = skip_if_green("libpq doesn't support LO in async mode") f = skip_if_crdb("large objects")(f)
return f
class LargeObjectTestCase(ConnectingTestCase): class LargeObjectTestCase(ConnectingTestCase):
@ -67,7 +68,6 @@ class LargeObjectTestCase(ConnectingTestCase):
@skip_if_no_lo @skip_if_no_lo
@skip_lo_if_green
class LargeObjectTests(LargeObjectTestCase): class LargeObjectTests(LargeObjectTestCase):
def test_create(self): def test_create(self):
lo = self.conn.lobject() lo = self.conn.lobject()
@ -413,7 +413,6 @@ def skip_if_no_truncate(f):
@skip_if_no_lo @skip_if_no_lo
@skip_lo_if_green
@skip_if_no_truncate @skip_if_no_truncate
class LargeObjectTruncateTests(LargeObjectTestCase): class LargeObjectTruncateTests(LargeObjectTestCase):
def test_truncate(self): def test_truncate(self):
@ -478,7 +477,6 @@ def skip_if_no_lo64(f):
@skip_if_no_lo @skip_if_no_lo
@skip_lo_if_green
@skip_if_no_truncate @skip_if_no_truncate
@skip_if_no_lo64 @skip_if_no_lo64
class LargeObject64Tests(LargeObjectTestCase): class LargeObject64Tests(LargeObjectTestCase):
@ -506,7 +504,6 @@ def skip_if_lo64(f):
@skip_if_no_lo @skip_if_no_lo
@skip_lo_if_green
@skip_if_no_truncate @skip_if_no_truncate
@skip_if_lo64 @skip_if_lo64
class LargeObjectNot64Tests(LargeObjectTestCase): class LargeObjectNot64Tests(LargeObjectTestCase):

View File

@ -32,7 +32,7 @@ from weakref import ref
import unittest import unittest
from .testutils import (skip_before_postgres, from .testutils import (skip_before_postgres,
ConnectingTestCase, skip_copy_if_green, slow, StringIO) ConnectingTestCase, skip_copy_if_green, skip_if_crdb, slow, StringIO)
import psycopg2 import psycopg2
@ -216,6 +216,7 @@ class ExceptionsTestCase(ConnectingTestCase):
gc.collect() gc.collect()
assert(w() is None) assert(w() is None)
@skip_if_crdb("copy")
@skip_copy_if_green @skip_copy_if_green
def test_diagnostics_copy(self): def test_diagnostics_copy(self):
f = StringIO() f = StringIO()
@ -244,6 +245,7 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(diag1.sqlstate, '42601') self.assertEqual(diag1.sqlstate, '42601')
self.assertEqual(diag2.sqlstate, '42P01') self.assertEqual(diag2.sqlstate, '42P01')
@skip_if_crdb("deferrable")
def test_diagnostics_from_commit(self): def test_diagnostics_from_commit(self):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute(""" cur.execute("""
@ -259,6 +261,7 @@ class ExceptionsTestCase(ConnectingTestCase):
e = exc e = exc
self.assertEqual(e.diag.sqlstate, '23503') self.assertEqual(e.diag.sqlstate, '23503')
@skip_if_crdb("diagnostic")
@skip_before_postgres(9, 3) @skip_before_postgres(9, 3)
def test_9_3_diagnostics(self): def test_9_3_diagnostics(self):
cur = self.conn.cursor() cur = self.conn.cursor()
@ -299,6 +302,7 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(e.pgcode, e1.pgcode) self.assertEqual(e.pgcode, e1.pgcode)
self.assert_(e1.cursor is None) self.assert_(e1.cursor is None)
@skip_if_crdb("connect any db")
def test_pickle_connection_error(self): def test_pickle_connection_error(self):
# segfaults on psycopg 2.5.1 - see ticket #170 # segfaults on psycopg 2.5.1 - see ticket #170
try: try:

View File

@ -29,7 +29,7 @@ from collections import deque
import psycopg2 import psycopg2
from psycopg2 import extensions from psycopg2 import extensions
from psycopg2.extensions import Notify from psycopg2.extensions import Notify
from .testutils import ConnectingTestCase, slow from .testutils import ConnectingTestCase, skip_if_crdb, slow
from .testconfig import dsn from .testconfig import dsn
import sys import sys
@ -38,6 +38,7 @@ import select
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
@skip_if_crdb("notify")
class NotifiesTests(ConnectingTestCase): class NotifiesTests(ConnectingTestCase):
def autocommit(self, conn): def autocommit(self, conn):

View File

@ -25,7 +25,7 @@
from . import testutils from . import testutils
import unittest import unittest
from .testutils import ConnectingTestCase, unichr, PY2 from .testutils import ConnectingTestCase, skip_if_crdb, unichr, PY2
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
@ -121,6 +121,7 @@ class QuotingTestCase(ConnectingTestCase):
self.assertEqual(res, data) self.assertEqual(res, data)
self.assert_(not self.conn.notices) self.assert_(not self.conn.notices)
@skip_if_crdb("encoding")
def test_latin1(self): def test_latin1(self):
self.conn.set_client_encoding('LATIN1') self.conn.set_client_encoding('LATIN1')
curs = self.conn.cursor() curs = self.conn.cursor()
@ -146,6 +147,7 @@ class QuotingTestCase(ConnectingTestCase):
self.assertEqual(res, data) self.assertEqual(res, data)
self.assert_(not self.conn.notices) self.assert_(not self.conn.notices)
@skip_if_crdb("encoding")
def test_koi8(self): def test_koi8(self):
self.conn.set_client_encoding('KOI8') self.conn.set_client_encoding('KOI8')
curs = self.conn.cursor() curs = self.conn.cursor()

View File

@ -26,7 +26,8 @@
import datetime as dt import datetime as dt
import unittest import unittest
from .testutils import ( from .testutils import (
ConnectingTestCase, skip_before_postgres, skip_copy_if_green, StringIO) ConnectingTestCase, skip_before_postgres, skip_copy_if_green, StringIO,
skip_if_crdb)
import psycopg2 import psycopg2
from psycopg2 import sql from psycopg2 import sql
@ -151,6 +152,7 @@ class SqlFormatTests(ConnectingTestCase):
self.assertEqual(cur.fetchall(), self.assertEqual(cur.fetchall(),
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')]) [(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
@skip_if_crdb("copy")
@skip_copy_if_green @skip_copy_if_green
@skip_before_postgres(8, 2) @skip_before_postgres(8, 2)
def test_copy(self): def test_copy(self):

View File

@ -26,6 +26,7 @@
import threading import threading
import unittest import unittest
from .testutils import ConnectingTestCase, skip_before_postgres, slow from .testutils import ConnectingTestCase, skip_before_postgres, slow
from .testutils import skip_if_crdb
import psycopg2 import psycopg2
from psycopg2.extensions import ( from psycopg2.extensions import (
@ -36,6 +37,7 @@ class TransactionTests(ConnectingTestCase):
def setUp(self): def setUp(self):
ConnectingTestCase.setUp(self) ConnectingTestCase.setUp(self)
skip_if_crdb("isolation level", self.conn)
self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
curs = self.conn.cursor() curs = self.conn.cursor()
curs.execute(''' curs.execute('''
@ -102,6 +104,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
def setUp(self): def setUp(self):
ConnectingTestCase.setUp(self) ConnectingTestCase.setUp(self)
skip_if_crdb("isolation level", self.conn)
curs = self.conn.cursor() curs = self.conn.cursor()
# Drop table if it already exists # Drop table if it already exists

View File

@ -32,6 +32,7 @@ import platform
from . import testutils from . import testutils
import unittest import unittest
from .testutils import PY2, long, text_type, ConnectingTestCase, restore_types from .testutils import PY2, long, text_type, ConnectingTestCase, restore_types
from .testutils import skip_if_crdb
import psycopg2 import psycopg2
from psycopg2.extensions import AsIs, adapt, register_adapter from psycopg2.extensions import AsIs, adapt, register_adapter
@ -148,12 +149,14 @@ class TypesBasicTests(ConnectingTestCase):
buf2 = self.execute("SELECT %s::bytea AS foo", (buf,)) buf2 = self.execute("SELECT %s::bytea AS foo", (buf,))
self.assertEqual(s, buf2.tobytes()) self.assertEqual(s, buf2.tobytes())
@skip_if_crdb("nested array")
def testArray(self): def testArray(self):
s = self.execute("SELECT %s AS foo", ([[1, 2], [3, 4]],)) s = self.execute("SELECT %s AS foo", ([[1, 2], [3, 4]],))
self.failUnlessEqual(s, [[1, 2], [3, 4]]) self.failUnlessEqual(s, [[1, 2], [3, 4]])
s = self.execute("SELECT %s AS foo", (['one', 'two', 'three'],)) s = self.execute("SELECT %s AS foo", (['one', 'two', 'three'],))
self.failUnlessEqual(s, ['one', 'two', 'three']) self.failUnlessEqual(s, ['one', 'two', 'three'])
@skip_if_crdb("nested array")
def testEmptyArrayRegression(self): def testEmptyArrayRegression(self):
# ticket #42 # ticket #42
curs = self.conn.cursor() curs = self.conn.cursor()
@ -170,6 +173,7 @@ class TypesBasicTests(ConnectingTestCase):
curs.execute("select col from array_test where id = 2") curs.execute("select col from array_test where id = 2")
self.assertEqual(curs.fetchone()[0], []) self.assertEqual(curs.fetchone()[0], [])
@skip_if_crdb("nested array")
@testutils.skip_before_postgres(8, 4) @testutils.skip_before_postgres(8, 4)
def testNestedEmptyArray(self): def testNestedEmptyArray(self):
# issue #788 # issue #788
@ -235,6 +239,7 @@ class TypesBasicTests(ConnectingTestCase):
self.assert_(isinstance(x[0], bytes)) self.assert_(isinstance(x[0], bytes))
self.assertEqual(x, [b'a', b'b', b'c']) self.assertEqual(x, [b'a', b'b', b'c'])
@skip_if_crdb("nested array")
@testutils.skip_before_postgres(8, 2) @testutils.skip_before_postgres(8, 2)
def testArrayOfNulls(self): def testArrayOfNulls(self):
curs = self.conn.cursor() curs = self.conn.cursor()
@ -271,6 +276,7 @@ class TypesBasicTests(ConnectingTestCase):
curs.execute("insert into na (boolaa) values (%s)", ([[True, None]],)) curs.execute("insert into na (boolaa) values (%s)", ([[True, None]],))
curs.execute("insert into na (boolaa) values (%s)", ([[None, None]],)) curs.execute("insert into na (boolaa) values (%s)", ([[None, None]],))
@skip_if_crdb("nested array")
@testutils.skip_before_postgres(8, 2) @testutils.skip_before_postgres(8, 2)
def testNestedArrays(self): def testNestedArrays(self):
curs = self.conn.cursor() curs = self.conn.cursor()
@ -400,6 +406,7 @@ class TypesBasicTests(ConnectingTestCase):
a = self.execute("select '{1, 2, NULL}'::int4[]") a = self.execute("select '{1, 2, NULL}'::int4[]")
self.assertEqual(a, [2, 4, 'nada']) self.assertEqual(a, [2, 4, 'nada'])
@skip_if_crdb("cidr")
@testutils.skip_before_postgres(8, 2) @testutils.skip_before_postgres(8, 2)
def testNetworkArray(self): def testNetworkArray(self):
# we don't know these types, but we know their arrays # we don't know these types, but we know their arrays

View File

@ -27,7 +27,7 @@ from pickle import dumps, loads
import unittest import unittest
from .testutils import (PY2, text_type, skip_if_no_uuid, skip_before_postgres, from .testutils import (PY2, text_type, skip_if_no_uuid, skip_before_postgres,
ConnectingTestCase, py3_raises_typeerror, slow, skip_from_python, ConnectingTestCase, py3_raises_typeerror, slow, skip_from_python,
restore_types) restore_types, skip_if_crdb, crdb_version)
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
@ -134,6 +134,7 @@ class TypesExtrasTests(ConnectingTestCase):
def skip_if_no_hstore(f): def skip_if_no_hstore(f):
@wraps(f) @wraps(f)
@skip_if_crdb("hstore")
def skip_if_no_hstore_(self): def skip_if_no_hstore_(self):
oids = HstoreAdapter.get_oids(self.conn) oids = HstoreAdapter.get_oids(self.conn)
if oids is None or not oids[0]: if oids is None or not oids[0]:
@ -417,6 +418,7 @@ class HstoreTestCase(ConnectingTestCase):
def skip_if_no_composite(f): def skip_if_no_composite(f):
@wraps(f) @wraps(f)
@skip_if_crdb("composite")
def skip_if_no_composite_(self): def skip_if_no_composite_(self):
if self.conn.info.server_version < 80000: if self.conn.info.server_version < 80000:
return self.skipTest( return self.skipTest(
@ -786,6 +788,7 @@ def skip_if_no_json_type(f):
return skip_if_no_json_type_ return skip_if_no_json_type_
@skip_if_crdb("json")
class JsonTestCase(ConnectingTestCase): class JsonTestCase(ConnectingTestCase):
def test_adapt(self): def test_adapt(self):
objs = [None, "te'xt", 123, 123.45, objs = [None, "te'xt", 123, 123.45,
@ -990,8 +993,9 @@ class JsonbTestCase(ConnectingTestCase):
curs.execute("""select '{"a": 100.0, "b": null}'::jsonb""") curs.execute("""select '{"a": 100.0, "b": null}'::jsonb""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None}) self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
curs.execute("""select array['{"a": 100.0, "b": null}']::jsonb[]""") if crdb_version(self.conn) is None:
self.assertEqual(curs.fetchone()[0], [{'a': 100.0, 'b': None}]) curs.execute("""select array['{"a": 100.0, "b": null}']::jsonb[]""")
self.assertEqual(curs.fetchone()[0], [{'a': 100.0, 'b': None}])
def test_register_on_connection(self): def test_register_on_connection(self):
psycopg2.extras.register_json(self.conn, loads=self.myloads, name='jsonb') psycopg2.extras.register_json(self.conn, loads=self.myloads, name='jsonb')
@ -1025,11 +1029,12 @@ class JsonbTestCase(ConnectingTestCase):
data = curs.fetchone()[0] data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal)) self.assert_(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0')) self.assertEqual(data['a'], Decimal('100.0'))
# sure we are not manling json too? # sure we are not mangling json too?
curs.execute("""select '{"a": 100.0, "b": null}'::json""") if crdb_version(self.conn) is None:
data = curs.fetchone()[0] curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assert_(isinstance(data['a'], float)) data = curs.fetchone()[0]
self.assertEqual(data['a'], 100.0) self.assert_(isinstance(data['a'], float))
self.assertEqual(data['a'], 100.0)
def test_register_default(self): def test_register_default(self):
curs = self.conn.cursor() curs = self.conn.cursor()
@ -1044,17 +1049,19 @@ class JsonbTestCase(ConnectingTestCase):
self.assert_(isinstance(data['a'], Decimal)) self.assert_(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0')) self.assertEqual(data['a'], Decimal('100.0'))
curs.execute("""select array['{"a": 100.0, "b": null}']::jsonb[]""") if crdb_version(self.conn) is None:
data = curs.fetchone()[0] curs.execute("""select array['{"a": 100.0, "b": null}']::jsonb[]""")
self.assert_(isinstance(data[0]['a'], Decimal)) data = curs.fetchone()[0]
self.assertEqual(data[0]['a'], Decimal('100.0')) self.assert_(isinstance(data[0]['a'], Decimal))
self.assertEqual(data[0]['a'], Decimal('100.0'))
def test_null(self): def test_null(self):
curs = self.conn.cursor() curs = self.conn.cursor()
curs.execute("""select NULL::jsonb""") curs.execute("""select NULL::jsonb""")
self.assertEqual(curs.fetchone()[0], None) self.assertEqual(curs.fetchone()[0], None)
curs.execute("""select NULL::jsonb[]""") if crdb_version(self.conn) is None:
self.assertEqual(curs.fetchone()[0], None) curs.execute("""select NULL::jsonb[]""")
self.assertEqual(curs.fetchone()[0], None)
class RangeTestCase(unittest.TestCase): class RangeTestCase(unittest.TestCase):
@ -1325,6 +1332,7 @@ class RangeTestCase(unittest.TestCase):
self.assertEqual(result, expected) self.assertEqual(result, expected)
@skip_if_crdb("range")
@skip_before_postgres(9, 2, "range not supported before postgres 9.2") @skip_before_postgres(9, 2, "range not supported before postgres 9.2")
class RangeCasterTestCase(ConnectingTestCase): class RangeCasterTestCase(ConnectingTestCase):

View File

@ -27,7 +27,7 @@ import psycopg2
import psycopg2.extensions as ext import psycopg2.extensions as ext
import unittest import unittest
from .testutils import ConnectingTestCase, skip_before_postgres from .testutils import ConnectingTestCase, skip_before_postgres, skip_if_crdb
class WithTestCase(ConnectingTestCase): class WithTestCase(ConnectingTestCase):
@ -203,6 +203,7 @@ class WithCursorTestCase(WithTestCase):
self.assert_(curs.closed) self.assert_(curs.closed)
self.assert_(closes) self.assert_(closes)
@skip_if_crdb("named cursor")
def test_exception_swallow(self): def test_exception_swallow(self):
# bug #262: __exit__ calls cur.close() that hides the exception # bug #262: __exit__ calls cur.close() that hides the exception
# with another error. # with another error.
@ -216,6 +217,7 @@ class WithCursorTestCase(WithTestCase):
else: else:
self.fail("where is my exception?") self.fail("where is my exception?")
@skip_if_crdb("named cursor")
@skip_before_postgres(8, 2) @skip_before_postgres(8, 2)
def test_named_with_noop(self): def test_named_with_noop(self):
with self.conn.cursor('named'): with self.conn.cursor('named'):

View File

@ -3,10 +3,10 @@
import os import os
dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test') dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test')
dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None) dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', os.environ.get('PGHOST'))
dbport = os.environ.get('PSYCOPG2_TESTDB_PORT', None) dbport = os.environ.get('PSYCOPG2_TESTDB_PORT', os.environ.get('PGPORT'))
dbuser = os.environ.get('PSYCOPG2_TESTDB_USER', None) dbuser = os.environ.get('PSYCOPG2_TESTDB_USER', os.environ.get('PGUSER'))
dbpass = os.environ.get('PSYCOPG2_TESTDB_PASSWORD', None) dbpass = os.environ.get('PSYCOPG2_TESTDB_PASSWORD', os.environ.get('PGPASSWORD'))
# Check if we want to test psycopg's green path. # Check if we want to test psycopg's green path.
green = os.environ.get('PSYCOPG2_TEST_GREEN', None) green = os.environ.get('PSYCOPG2_TEST_GREEN', None)

View File

@ -29,6 +29,7 @@ import sys
import types import types
import ctypes import ctypes
import select import select
import operator
import platform import platform
import unittest import unittest
from functools import wraps from functools import wraps
@ -37,7 +38,7 @@ from ctypes.util import find_library
import psycopg2 import psycopg2
import psycopg2.errors import psycopg2.errors
import psycopg2.extensions import psycopg2.extensions
from psycopg2.compat import PY2, PY3, text_type from psycopg2.compat import PY2, PY3, string_types, text_type
from .testconfig import green, dsn, repl_dsn from .testconfig import green, dsn, repl_dsn
@ -248,6 +249,8 @@ def skip_if_tpc_disabled(f):
@wraps(f) @wraps(f)
def skip_if_tpc_disabled_(self): def skip_if_tpc_disabled_(self):
cnn = self.connect() cnn = self.connect()
skip_if_crdb("2-phase commit", cnn)
cur = cnn.cursor() cur = cnn.cursor()
try: try:
cur.execute("SHOW max_prepared_transactions;") cur.execute("SHOW max_prepared_transactions;")
@ -407,6 +410,114 @@ def skip_if_windows(cls):
return decorator(cls) return decorator(cls)
def crdb_version(conn, __crdb_version=[]):
"""
Return the CockroachDB version if that's the db being tested, else None.
Return the number as an integer similar to PQserverVersion: return
v20.1.3 as 200103.
Assume all the connections are on the same db: return a cached result on
following calls.
"""
if __crdb_version:
return __crdb_version[0]
sver = conn.info.parameter_status("crdb_version")
if sver is None:
__crdb_version.append(None)
else:
m = re.search(r"\bv(\d+)\.(\d+)\.(\d+)", sver)
if not m:
raise ValueError(
"can't parse CockroachDB version from %s" % sver)
ver = int(m.group(1)) * 10000 + int(m.group(2)) * 100 + int(m.group(3))
__crdb_version.append(ver)
return __crdb_version[0]
def skip_if_crdb(reason, conn=None, version=None):
"""Skip a test or test class if we are testing against CockroachDB.
Can be used as a decorator for tests function or classes:
@skip_if_crdb("my reason")
class SomeUnitTest(UnitTest):
# ...
Or as a normal function if the *conn* argument is passed.
If *version* is specified it should be a string such as ">= 20.1", "< 20",
"== 20.1.3": the test will be skipped only if the version matches.
"""
if not isinstance(reason, string_types):
raise TypeError("reason should be a string, got %r instead" % reason)
if conn is not None:
ver = crdb_version(conn)
if ver is not None and _crdb_match_version(ver, version):
if reason in crdb_reasons:
reason = (
"%s (https://github.com/cockroachdb/cockroach/issues/%s)"
% (reason, crdb_reasons[reason]))
raise unittest.SkipTest(
"not supported on CockroachDB %s: %s" % (ver, reason))
@decorate_all_tests
def skip_if_crdb_(f):
@wraps(f)
def skip_if_crdb__(self, *args, **kwargs):
skip_if_crdb(reason, conn=self.connect(), version=version)
return f(self, *args, **kwargs)
return skip_if_crdb__
return skip_if_crdb_
# mapping from reason description to ticket number
crdb_reasons = {
"2-phase commit": 22329,
"backend pid": 35897,
"cancel": 41335,
"cast adds tz": 51692,
"cidr": 18846,
"composite": 27792,
"copy": 41608,
"deferrable": 48307,
"encoding": 35882,
"hstore": 41284,
"infinity date": 41564,
"interval style": 35807,
"large objects": 243,
"named cursor": 41412,
"nested array": 32552,
"notify": 41522,
"range": 41282,
"stored procedure": 1751,
}
def _crdb_match_version(version, pattern):
if pattern is None:
return True
m = re.match(r'^(>|>=|<|<=|==|!=)\s*(\d+)(?:\.(\d+))?(?:\.(\d+))?$', pattern)
if m is None:
raise ValueError(
"bad crdb version pattern %r: should be 'OP MAJOR[.MINOR[.BUGFIX]]'"
% pattern)
ops = {'>': 'gt', '>=': 'ge', '<': 'lt', '<=': 'le', '==': 'eq', '!=': 'ne'}
op = getattr(operator, ops[m.group(1)])
ref = int(m.group(2)) * 10000 + int(m.group(3) or 0) * 100 + int(m.group(4) or 0)
return op(version, ref)
class py3_raises_typeerror(object): class py3_raises_typeerror(object):
def __enter__(self): def __enter__(self):
pass pass