mirror of
https://github.com/psycopg/psycopg2.git
synced 2025-01-31 09:24:07 +03:00
Cleanup of skipping of testing methods on certain Py/PG versions
This commit is contained in:
parent
e4a84b9ce9
commit
c96ba553da
|
@ -23,7 +23,7 @@
|
||||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||||
# License for more details.
|
# License for more details.
|
||||||
|
|
||||||
from testutils import unittest, skip_if_no_pg_sleep
|
from testutils import unittest, skip_before_postgres
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
from psycopg2 import extensions
|
from psycopg2 import extensions
|
||||||
|
@ -113,7 +113,7 @@ class AsyncTests(unittest.TestCase):
|
||||||
self.assertFalse(self.conn.isexecuting())
|
self.assertFalse(self.conn.isexecuting())
|
||||||
self.assertEquals(cur.fetchone()[0], "a")
|
self.assertEquals(cur.fetchone()[0], "a")
|
||||||
|
|
||||||
@skip_if_no_pg_sleep('conn')
|
@skip_before_postgres(8, 2)
|
||||||
def test_async_callproc(self):
|
def test_async_callproc(self):
|
||||||
cur = self.conn.cursor()
|
cur = self.conn.cursor()
|
||||||
cur.callproc("pg_sleep", (0.1, ))
|
cur.callproc("pg_sleep", (0.1, ))
|
||||||
|
|
|
@ -31,7 +31,7 @@ import psycopg2.extensions
|
||||||
from psycopg2 import extras
|
from psycopg2 import extras
|
||||||
|
|
||||||
from testconfig import dsn
|
from testconfig import dsn
|
||||||
from testutils import unittest, skip_if_no_pg_sleep
|
from testutils import unittest, skip_before_postgres
|
||||||
|
|
||||||
class CancelTests(unittest.TestCase):
|
class CancelTests(unittest.TestCase):
|
||||||
|
|
||||||
|
@ -50,7 +50,7 @@ class CancelTests(unittest.TestCase):
|
||||||
def test_empty_cancel(self):
|
def test_empty_cancel(self):
|
||||||
self.conn.cancel()
|
self.conn.cancel()
|
||||||
|
|
||||||
@skip_if_no_pg_sleep('conn')
|
@skip_before_postgres(8, 2)
|
||||||
def test_cancel(self):
|
def test_cancel(self):
|
||||||
errors = []
|
errors = []
|
||||||
|
|
||||||
|
@ -86,7 +86,7 @@ class CancelTests(unittest.TestCase):
|
||||||
|
|
||||||
self.assertEqual(errors, [])
|
self.assertEqual(errors, [])
|
||||||
|
|
||||||
@skip_if_no_pg_sleep('conn')
|
@skip_before_postgres(8, 2)
|
||||||
def test_async_cancel(self):
|
def test_async_cancel(self):
|
||||||
async_conn = psycopg2.connect(dsn, async=True)
|
async_conn = psycopg2.connect(dsn, async=True)
|
||||||
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
|
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
|
||||||
|
|
|
@ -24,7 +24,7 @@
|
||||||
|
|
||||||
import time
|
import time
|
||||||
import threading
|
import threading
|
||||||
from testutils import unittest, decorate_all_tests, skip_if_no_pg_sleep
|
from testutils import unittest, decorate_all_tests, skip_before_postgres
|
||||||
from operator import attrgetter
|
from operator import attrgetter
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
|
@ -114,7 +114,7 @@ class ConnectionTests(unittest.TestCase):
|
||||||
self.assertRaises(psycopg2.NotSupportedError,
|
self.assertRaises(psycopg2.NotSupportedError,
|
||||||
cnn.xid, 42, "foo", "bar")
|
cnn.xid, 42, "foo", "bar")
|
||||||
|
|
||||||
@skip_if_no_pg_sleep('conn')
|
@skip_before_postgres(8, 2)
|
||||||
def test_concurrent_execution(self):
|
def test_concurrent_execution(self):
|
||||||
def slave():
|
def slave():
|
||||||
cnn = psycopg2.connect(dsn)
|
cnn = psycopg2.connect(dsn)
|
||||||
|
|
|
@ -27,7 +27,7 @@ import psycopg2
|
||||||
import psycopg2.extensions
|
import psycopg2.extensions
|
||||||
from psycopg2.extensions import b
|
from psycopg2.extensions import b
|
||||||
from testconfig import dsn
|
from testconfig import dsn
|
||||||
from testutils import unittest, skip_if_no_pg_sleep
|
from testutils import unittest, skip_before_postgres
|
||||||
|
|
||||||
class CursorTests(unittest.TestCase):
|
class CursorTests(unittest.TestCase):
|
||||||
|
|
||||||
|
@ -130,7 +130,7 @@ class CursorTests(unittest.TestCase):
|
||||||
del curs
|
del curs
|
||||||
self.assert_(w() is None)
|
self.assert_(w() is None)
|
||||||
|
|
||||||
@skip_if_no_pg_sleep('conn')
|
@skip_before_postgres(8, 2)
|
||||||
def test_iter_named_cursor_efficient(self):
|
def test_iter_named_cursor_efficient(self):
|
||||||
curs = self.conn.cursor('tmp')
|
curs = self.conn.cursor('tmp')
|
||||||
# if these records are fetched in the same roundtrip their
|
# if these records are fetched in the same roundtrip their
|
||||||
|
@ -144,6 +144,7 @@ class CursorTests(unittest.TestCase):
|
||||||
"named cursor records fetched in 2 roundtrips (delta: %s)"
|
"named cursor records fetched in 2 roundtrips (delta: %s)"
|
||||||
% (t2 - t1))
|
% (t2 - t1))
|
||||||
|
|
||||||
|
@skip_before_postgres(8, 0)
|
||||||
def test_iter_named_cursor_default_arraysize(self):
|
def test_iter_named_cursor_default_arraysize(self):
|
||||||
curs = self.conn.cursor('tmp')
|
curs = self.conn.cursor('tmp')
|
||||||
curs.execute('select generate_series(1,50)')
|
curs.execute('select generate_series(1,50)')
|
||||||
|
@ -151,6 +152,7 @@ class CursorTests(unittest.TestCase):
|
||||||
# everything swallowed in one gulp
|
# everything swallowed in one gulp
|
||||||
self.assertEqual(rv, [(i,i) for i in range(1,51)])
|
self.assertEqual(rv, [(i,i) for i in range(1,51)])
|
||||||
|
|
||||||
|
@skip_before_postgres(8, 0)
|
||||||
def test_iter_named_cursor_arraysize(self):
|
def test_iter_named_cursor_arraysize(self):
|
||||||
curs = self.conn.cursor('tmp')
|
curs = self.conn.cursor('tmp')
|
||||||
curs.arraysize = 30
|
curs.arraysize = 30
|
||||||
|
|
|
@ -23,7 +23,7 @@
|
||||||
# License for more details.
|
# License for more details.
|
||||||
|
|
||||||
import threading
|
import threading
|
||||||
from testutils import unittest, skip_if_no_pg_sleep
|
from testutils import unittest, skip_before_postgres
|
||||||
|
|
||||||
import psycopg2
|
import psycopg2
|
||||||
from psycopg2.extensions import (
|
from psycopg2.extensions import (
|
||||||
|
@ -236,7 +236,7 @@ class QueryCancellationTests(unittest.TestCase):
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
self.conn.close()
|
self.conn.close()
|
||||||
|
|
||||||
@skip_if_no_pg_sleep('conn')
|
@skip_before_postgres(8, 2)
|
||||||
def test_statement_timeout(self):
|
def test_statement_timeout(self):
|
||||||
curs = self.conn.cursor()
|
curs = self.conn.cursor()
|
||||||
# Set a low statement timeout, then sleep for a longer period.
|
# Set a low statement timeout, then sleep for a longer period.
|
||||||
|
|
|
@ -102,31 +102,6 @@ def skip_if_no_uuid(f):
|
||||||
return skip_if_no_uuid_
|
return skip_if_no_uuid_
|
||||||
|
|
||||||
|
|
||||||
def skip_if_no_pg_sleep(name):
|
|
||||||
"""Decorator to skip a test if pg_sleep is not supported by the server.
|
|
||||||
|
|
||||||
Pass it the name of an attribute containing a connection or of a method
|
|
||||||
returning a connection.
|
|
||||||
"""
|
|
||||||
def skip_if_no_pg_sleep_(f):
|
|
||||||
def skip_if_no_pg_sleep__(self):
|
|
||||||
cnn = getattr(self, name)
|
|
||||||
if callable(cnn):
|
|
||||||
cnn = cnn()
|
|
||||||
|
|
||||||
if cnn.server_version < 80200:
|
|
||||||
return self.skipTest(
|
|
||||||
"server version %s doesn't support pg_sleep"
|
|
||||||
% cnn.server_version)
|
|
||||||
|
|
||||||
return f(self)
|
|
||||||
|
|
||||||
skip_if_no_pg_sleep__.__name__ = f.__name__
|
|
||||||
return skip_if_no_pg_sleep__
|
|
||||||
|
|
||||||
return skip_if_no_pg_sleep_
|
|
||||||
|
|
||||||
|
|
||||||
def skip_if_tpc_disabled(f):
|
def skip_if_tpc_disabled(f):
|
||||||
"""Skip a test if the server has tpc support disabled."""
|
"""Skip a test if the server has tpc support disabled."""
|
||||||
def skip_if_tpc_disabled_(self):
|
def skip_if_tpc_disabled_(self):
|
||||||
|
@ -165,25 +140,60 @@ def skip_if_no_iobase(f):
|
||||||
return skip_if_no_iobase_
|
return skip_if_no_iobase_
|
||||||
|
|
||||||
|
|
||||||
def skip_on_python2(f):
|
def skip_before_postgres(*ver):
|
||||||
"""Skip a test on Python 3 and following."""
|
"""Skip a test on PostgreSQL before a certain version."""
|
||||||
def skip_on_python2_(self):
|
ver = ver + (0,) * (3 - len(ver))
|
||||||
if sys.version_info[0] < 3:
|
def skip_before_postgres_(f):
|
||||||
return self.skipTest("skipped because Python 2")
|
def skip_before_postgres__(self):
|
||||||
else:
|
if self.conn.server_version < int("%d%02d%02d" % ver):
|
||||||
return f(self)
|
return self.skipTest("skipped because PostgreSQL %s"
|
||||||
|
% self.conn.server_version)
|
||||||
|
else:
|
||||||
|
return f(self)
|
||||||
|
|
||||||
return skip_on_python2_
|
return skip_before_postgres__
|
||||||
|
return skip_before_postgres_
|
||||||
|
|
||||||
def skip_on_python3(f):
|
def skip_after_postgres(*ver):
|
||||||
"""Skip a test on Python 3 and following."""
|
"""Skip a test on PostgreSQL after (including) a certain version."""
|
||||||
def skip_on_python3_(self):
|
ver = ver + (0,) * (3 - len(ver))
|
||||||
if sys.version_info[0] >= 3:
|
def skip_after_postgres_(f):
|
||||||
return self.skipTest("skipped because Python 3")
|
def skip_after_postgres__(self):
|
||||||
else:
|
if self.conn.server_version >= int("%d%02d%02d" % ver):
|
||||||
return f(self)
|
return self.skipTest("skipped because PostgreSQL %s"
|
||||||
|
% self.conn.server_version)
|
||||||
|
else:
|
||||||
|
return f(self)
|
||||||
|
|
||||||
|
return skip_after_postgres__
|
||||||
|
return skip_after_postgres_
|
||||||
|
|
||||||
|
def skip_before_python(*ver):
|
||||||
|
"""Skip a test on Python before a certain version."""
|
||||||
|
def skip_before_python_(f):
|
||||||
|
def skip_before_python__(self):
|
||||||
|
if sys.version_info[:len(ver)] < ver:
|
||||||
|
return self.skipTest("skipped because Python %s"
|
||||||
|
% ".".join(map(str, sys.version_info[:len(ver)])))
|
||||||
|
else:
|
||||||
|
return f(self)
|
||||||
|
|
||||||
|
return skip_before_python__
|
||||||
|
return skip_before_python_
|
||||||
|
|
||||||
|
def skip_from_python(*ver):
|
||||||
|
"""Skip a test on Python after (including) a certain version."""
|
||||||
|
def skip_from_python_(f):
|
||||||
|
def skip_from_python__(self):
|
||||||
|
if sys.version_info[:len(ver)] >= ver:
|
||||||
|
return self.skipTest("skipped because Python %s"
|
||||||
|
% ".".join(map(str, sys.version_info[:len(ver)])))
|
||||||
|
else:
|
||||||
|
return f(self)
|
||||||
|
|
||||||
|
return skip_from_python__
|
||||||
|
return skip_from_python_
|
||||||
|
|
||||||
return skip_on_python3_
|
|
||||||
|
|
||||||
def script_to_py3(script):
|
def script_to_py3(script):
|
||||||
"""Convert a script to Python3 syntax if required."""
|
"""Convert a script to Python3 syntax if required."""
|
||||||
|
|
|
@ -171,7 +171,7 @@ class TypesBasicTests(unittest.TestCase):
|
||||||
curs.execute("select col from array_test where id = 2")
|
curs.execute("select col from array_test where id = 2")
|
||||||
self.assertEqual(curs.fetchone()[0], [])
|
self.assertEqual(curs.fetchone()[0], [])
|
||||||
|
|
||||||
@testutils.skip_on_python3
|
@testutils.skip_from_python(3)
|
||||||
def testTypeRoundtripBuffer(self):
|
def testTypeRoundtripBuffer(self):
|
||||||
o1 = buffer("".join(map(chr, range(256))))
|
o1 = buffer("".join(map(chr, range(256))))
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
|
@ -182,14 +182,14 @@ class TypesBasicTests(unittest.TestCase):
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
self.assertEqual(type(o1), type(o2))
|
self.assertEqual(type(o1), type(o2))
|
||||||
|
|
||||||
@testutils.skip_on_python3
|
@testutils.skip_from_python(3)
|
||||||
def testTypeRoundtripBufferArray(self):
|
def testTypeRoundtripBufferArray(self):
|
||||||
o1 = buffer("".join(map(chr, range(256))))
|
o1 = buffer("".join(map(chr, range(256))))
|
||||||
o1 = [o1]
|
o1 = [o1]
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
self.assertEqual(type(o1[0]), type(o2[0]))
|
self.assertEqual(type(o1[0]), type(o2[0]))
|
||||||
|
|
||||||
@testutils.skip_on_python2
|
@testutils.skip_before_python(3)
|
||||||
def testTypeRoundtripBytes(self):
|
def testTypeRoundtripBytes(self):
|
||||||
o1 = bytes(range(256))
|
o1 = bytes(range(256))
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
|
@ -200,14 +200,14 @@ class TypesBasicTests(unittest.TestCase):
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
self.assertEqual(memoryview, type(o2))
|
self.assertEqual(memoryview, type(o2))
|
||||||
|
|
||||||
@testutils.skip_on_python2
|
@testutils.skip_before_python(3)
|
||||||
def testTypeRoundtripBytesArray(self):
|
def testTypeRoundtripBytesArray(self):
|
||||||
o1 = bytes(range(256))
|
o1 = bytes(range(256))
|
||||||
o1 = [o1]
|
o1 = [o1]
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
self.assertEqual(memoryview, type(o2[0]))
|
self.assertEqual(memoryview, type(o2[0]))
|
||||||
|
|
||||||
@testutils.skip_on_python2
|
@testutils.skip_before_python(3)
|
||||||
def testAdaptBytearray(self):
|
def testAdaptBytearray(self):
|
||||||
o1 = bytearray(range(256))
|
o1 = bytearray(range(256))
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
|
@ -218,7 +218,7 @@ class TypesBasicTests(unittest.TestCase):
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
self.assertEqual(memoryview, type(o2))
|
self.assertEqual(memoryview, type(o2))
|
||||||
|
|
||||||
@testutils.skip_on_python2
|
@testutils.skip_before_python(3)
|
||||||
def testAdaptMemoryview(self):
|
def testAdaptMemoryview(self):
|
||||||
o1 = memoryview(bytes(range(256)))
|
o1 = memoryview(bytes(range(256)))
|
||||||
o2 = self.execute("select %s;", (o1,))
|
o2 = self.execute("select %s;", (o1,))
|
||||||
|
@ -253,7 +253,7 @@ class AdaptSubclassTest(unittest.TestCase):
|
||||||
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
|
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
|
||||||
del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote]
|
del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote]
|
||||||
|
|
||||||
@testutils.skip_on_python3
|
@testutils.skip_from_python(3)
|
||||||
def test_no_mro_no_joy(self):
|
def test_no_mro_no_joy(self):
|
||||||
from psycopg2.extensions import adapt, register_adapter, AsIs
|
from psycopg2.extensions import adapt, register_adapter, AsIs
|
||||||
|
|
||||||
|
@ -267,7 +267,7 @@ class AdaptSubclassTest(unittest.TestCase):
|
||||||
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
|
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
|
||||||
|
|
||||||
|
|
||||||
@testutils.skip_on_python2
|
@testutils.skip_before_python(3)
|
||||||
def test_adapt_subtype_3(self):
|
def test_adapt_subtype_3(self):
|
||||||
from psycopg2.extensions import adapt, register_adapter, AsIs
|
from psycopg2.extensions import adapt, register_adapter, AsIs
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user