From aaba4dcf87431079203c900fbec8ca2a519392b9 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 16 Mar 2019 18:41:59 +0000 Subject: [PATCH 1/4] TextIOBase moved to tests compat imports --- tests/test_copy.py | 16 +++++----------- tests/testutils.py | 3 +++ 2 files changed, 8 insertions(+), 11 deletions(-) diff --git a/tests/test_copy.py b/tests/test_copy.py index e800aed0..639c1691 100755 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -31,17 +31,11 @@ from subprocess import Popen, PIPE import psycopg2 import psycopg2.extensions -from .testutils import skip_copy_if_green +from .testutils import skip_copy_if_green, TextIOBase from .testconfig import dsn -if sys.version_info[0] < 3: - _base = object -else: - from io import TextIOBase as _base - - -class MinimalRead(_base): +class MinimalRead(TextIOBase): """A file wrapper exposing the minimal interface to copy from.""" def __init__(self, f): self.f = f @@ -53,7 +47,7 @@ class MinimalRead(_base): return self.f.readline() -class MinimalWrite(_base): +class MinimalWrite(TextIOBase): """A file wrapper exposing the minimal interface to copy to.""" def __init__(self, f): self.f = f @@ -351,7 +345,7 @@ conn.close() self.assertEqual(0, proc.returncode) def test_copy_from_propagate_error(self): - class BrokenRead(_base): + class BrokenRead(TextIOBase): def read(self, size): return 1 / 0 @@ -368,7 +362,7 @@ conn.close() self.assert_('ZeroDivisionError' in str(e)) def test_copy_to_propagate_error(self): - class BrokenWrite(_base): + class BrokenWrite(TextIOBase): def write(self, data): return 1 / 0 diff --git a/tests/testutils.py b/tests/testutils.py index 8dab1b63..24b99e52 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -43,12 +43,15 @@ from .testconfig import green if sys.version_info[0] == 2: # Python 2 from StringIO import StringIO + TextIOBase = object long = long reload = reload unichr = unichr + else: # Python 3 from io import StringIO # noqa + from io import TextIOBase # noqa from importlib import reload # noqa long = int unichr = chr From 8cfe176a8501be37a925a364e510b32db43e82cc Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 16 Mar 2019 16:53:38 +0000 Subject: [PATCH 2/4] Dropped repeated conditional import of reload in test --- tests/test_errcodes.py | 8 -------- 1 file changed, 8 deletions(-) diff --git a/tests/test_errcodes.py b/tests/test_errcodes.py index c510f720..676d12d2 100755 --- a/tests/test_errcodes.py +++ b/tests/test_errcodes.py @@ -25,14 +25,6 @@ import unittest from .testutils import ConnectingTestCase, slow, reload -try: - reload -except NameError: - try: - from importlib import reload - except ImportError: - from imp import reload - from threading import Thread from psycopg2 import errorcodes From b0119fef816daf4f47191359dc3fa609f9783e2c Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 16 Mar 2019 18:56:56 +0000 Subject: [PATCH 3/4] Other import moved to top level in tests --- tests/test_async.py | 10 ++++------ tests/test_connection.py | 22 +++++++++------------- tests/test_copy.py | 4 +--- tests/test_cursor.py | 4 ++-- tests/test_dates.py | 29 +++++++++-------------------- tests/test_green.py | 2 +- tests/test_ipaddress.py | 12 +++--------- tests/test_module.py | 3 +-- tests/test_replication.py | 2 +- tests/test_types_basic.py | 9 +++------ tests/test_types_extras.py | 7 +++---- tests/testutils.py | 17 +++++++---------- 12 files changed, 44 insertions(+), 77 deletions(-) diff --git a/tests/test_async.py b/tests/test_async.py index 894d8100..057deac1 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -23,15 +23,15 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import gc +import time import unittest -from .testutils import skip_before_postgres, slow +import warnings import psycopg2 from psycopg2 import extensions as ext -import time - -from .testutils import ConnectingTestCase, StringIO +from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow class PollableStub(object): @@ -338,7 +338,6 @@ class AsyncTests(ConnectingTestCase): # on high load on linux: probably because the kernel has more # buffers ready. A warning may be useful during development, # but an error is bad during regression testing. - import warnings warnings.warn("sending a large query didn't trigger block on write.") def test_sync_poll(self): @@ -427,7 +426,6 @@ class AsyncTests(ConnectingTestCase): self.assert_(self.conn.notices) def test_async_cursor_gone(self): - import gc cur = self.conn.cursor() cur.execute("select 42;") del cur diff --git a/tests/test_connection.py b/tests/test_connection.py index 1c35e8ba..0ad7d37e 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -35,11 +35,8 @@ from operator import attrgetter from weakref import ref import psycopg2 -import psycopg2.errorcodes +import psycopg2.extras from psycopg2 import extensions as ext -from psycopg2 import ProgrammingError -from psycopg2.extensions import Xid -from psycopg2.extras import RealDictConnection from .testutils import ( unittest, skip_if_no_superuser, skip_before_postgres, @@ -296,7 +293,6 @@ class ConnectionTests(ConnectingTestCase): self.assert_(not notices, "%d notices raised" % len(notices)) def test_connect_cursor_factory(self): - import psycopg2.extras conn = self.connect(cursor_factory=psycopg2.extras.DictCursor) cur = conn.cursor() cur.execute("select 1 as a") @@ -369,7 +365,7 @@ class ParseDsnTestCase(ConnectingTestCase): dict(user='tester', password='secret', dbname='test'), "simple DSN parsed") - self.assertRaises(ProgrammingError, ext.parse_dsn, + self.assertRaises(psycopg2.ProgrammingError, ext.parse_dsn, "dbname=test 2 user=tester password=secret") self.assertEqual( @@ -383,7 +379,7 @@ class ParseDsnTestCase(ConnectingTestCase): try: # unterminated quote after dbname: ext.parse_dsn("dbname='test 2 user=tester password=secret") - except ProgrammingError as e: + except psycopg2.ProgrammingError as e: raised = True self.assertTrue(str(e).find('secret') < 0, "DSN was not exposed in error message") @@ -1124,27 +1120,27 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): cnn.tpc_rollback(xid) def test_xid_construction(self): - x1 = Xid(74, 'foo', 'bar') + x1 = ext.Xid(74, 'foo', 'bar') self.assertEqual(74, x1.format_id) self.assertEqual('foo', x1.gtrid) self.assertEqual('bar', x1.bqual) def test_xid_from_string(self): - x2 = Xid.from_string('42_Z3RyaWQ=_YnF1YWw=') + x2 = ext.Xid.from_string('42_Z3RyaWQ=_YnF1YWw=') self.assertEqual(42, x2.format_id) self.assertEqual('gtrid', x2.gtrid) self.assertEqual('bqual', x2.bqual) - x3 = Xid.from_string('99_xxx_yyy') + x3 = ext.Xid.from_string('99_xxx_yyy') self.assertEqual(None, x3.format_id) self.assertEqual('99_xxx_yyy', x3.gtrid) self.assertEqual(None, x3.bqual) def test_xid_to_string(self): - x1 = Xid.from_string('42_Z3RyaWQ=_YnF1YWw=') + x1 = ext.Xid.from_string('42_Z3RyaWQ=_YnF1YWw=') self.assertEqual(str(x1), '42_Z3RyaWQ=_YnF1YWw=') - x2 = Xid.from_string('99_xxx_yyy') + x2 = ext.Xid.from_string('99_xxx_yyy') self.assertEqual(str(x2), '99_xxx_yyy') def test_xid_unicode(self): @@ -1180,7 +1176,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase): self.assertRaises(psycopg2.ProgrammingError, cnn.cancel) def test_tpc_recover_non_dbapi_connection(self): - cnn = self.connect(connection_factory=RealDictConnection) + cnn = self.connect(connection_factory=psycopg2.extras.RealDictConnection) cnn.tpc_begin('dict-connection') cnn.tpc_prepare() cnn.reset() diff --git a/tests/test_copy.py b/tests/test_copy.py index 639c1691..05513f5a 100755 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -22,6 +22,7 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import io import sys import string import unittest @@ -142,7 +143,6 @@ class CopyTests(ConnectingTestCase): curs.execute('insert into tcopy values (%s, %s)', (42, abin)) - import io f = io.StringIO() curs.copy_to(f, 'tcopy', columns=('data',)) f.seek(0) @@ -164,7 +164,6 @@ class CopyTests(ConnectingTestCase): curs.execute('insert into tcopy values (%s, %s)', (42, abin)) - import io f = io.BytesIO() curs.copy_to(f, 'tcopy', columns=('data',)) f.seek(0) @@ -184,7 +183,6 @@ class CopyTests(ConnectingTestCase): + list(range(160, 256))).decode('latin1') about = abin.replace('\\', '\\\\') - import io f = io.StringIO() f.write(about) f.seek(0) diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 675699ca..47b41e63 100755 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -22,6 +22,8 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import gc +import sys import time import ctypes import pickle @@ -115,7 +117,6 @@ class CursorTests(ConnectingTestCase): # more than once from a dict. cur = self.conn.cursor() foo = (lambda x: x)('foo') * 10 - import sys nref1 = sys.getrefcount(foo) cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo}) nref2 = sys.getrefcount(foo) @@ -168,7 +169,6 @@ class CursorTests(ConnectingTestCase): curs = self.conn.cursor() w = ref(curs) del curs - import gc gc.collect() self.assert_(w() is None) diff --git a/tests/test_dates.py b/tests/test_dates.py index 0c1c5046..58298223 100755 --- a/tests/test_dates.py +++ b/tests/test_dates.py @@ -23,14 +23,20 @@ # License for more details. import math +import pickle from datetime import date, datetime, time, timedelta import psycopg2 -import psycopg2.tz from psycopg2.tz import FixedOffsetTimezone, ZERO import unittest from .testutils import ConnectingTestCase, skip_before_postgres +try: + from mx.DateTime import Date, Time, DateTime, DateTimeDeltaFrom +except ImportError: + # Tests will be skipped + pass + def total_seconds(d): """Return total number of seconds of a timedelta as a float.""" @@ -278,7 +284,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): self.assertEqual(None, dt.tzinfo) def test_type_roundtrip_datetimetz(self): - tz = psycopg2.tz.FixedOffsetTimezone(8 * 60) + tz = FixedOffsetTimezone(8 * 60) dt1 = datetime(2010, 5, 3, 10, 20, 30, tzinfo=tz) dt2 = self._test_type_roundtrip(dt1) self.assertNotEqual(None, dt2.tzinfo) @@ -289,7 +295,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): self.assertEqual(None, tm.tzinfo) def test_type_roundtrip_timetz(self): - tz = psycopg2.tz.FixedOffsetTimezone(8 * 60) + tz = FixedOffsetTimezone(8 * 60) tm1 = time(10, 20, 30, tzinfo=tz) tm2 = self._test_type_roundtrip(tm1) self.assertNotEqual(None, tm2.tzinfo) @@ -487,7 +493,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): def test_parse_time_timezone(self): # Time zone information is ignored. - from mx.DateTime import Time expected = Time(13, 30, 29) self.assertEqual(expected, self.TIME("13:30:29+01", self.curs)) self.assertEqual(expected, self.TIME("13:30:29-01", self.curs)) @@ -498,7 +503,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): def test_parse_datetime_timezone(self): # Time zone information is ignored. - from mx.DateTime import DateTime expected = DateTime(2007, 1, 1, 13, 30, 29) self.assertEqual( expected, self.DATETIME("2007-01-01 13:30:29+01", self.curs)) @@ -522,19 +526,16 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): self.assertEqual(value.second, 5) def test_adapt_time(self): - from mx.DateTime import Time value = self.execute('select (%s)::time::text', [Time(13, 30, 29)]) self.assertEqual(value, '13:30:29') def test_adapt_datetime(self): - from mx.DateTime import DateTime value = self.execute('select (%s)::timestamp::text', [DateTime(2007, 1, 1, 13, 30, 29.123456)]) self.assertEqual(value, '2007-01-01 13:30:29.123456') def test_adapt_bc_datetime(self): - from mx.DateTime import DateTime value = self.execute('select (%s)::timestamp::text', [DateTime(-41, 1, 1, 13, 30, 29.123456)]) # microsecs for BC timestamps look not available in PG < 8.4 @@ -544,7 +545,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): '0042-01-01 13:30:29 BC'), value) def test_adapt_timedelta(self): - from mx.DateTime import DateTimeDeltaFrom value = self.execute('select extract(epoch from (%s)::interval)', [DateTimeDeltaFrom(days=42, seconds=45296.123456)]) @@ -553,7 +553,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): self.assertEqual(int(round((value - seconds) * 1000000)), 123456) def test_adapt_negative_timedelta(self): - from mx.DateTime import DateTimeDeltaFrom value = self.execute('select extract(epoch from (%s)::interval)', [DateTimeDeltaFrom(days=-42, seconds=45296.123456)]) @@ -571,35 +570,27 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): self.assertEqual(type(o1[0]), type(o2[0])) def test_type_roundtrip_date(self): - from mx.DateTime import Date self._test_type_roundtrip(Date(2010, 5, 3)) def test_type_roundtrip_datetime(self): - from mx.DateTime import DateTime self._test_type_roundtrip(DateTime(2010, 5, 3, 10, 20, 30)) def test_type_roundtrip_time(self): - from mx.DateTime import Time self._test_type_roundtrip(Time(10, 20, 30)) def test_type_roundtrip_interval(self): - from mx.DateTime import DateTimeDeltaFrom self._test_type_roundtrip(DateTimeDeltaFrom(seconds=30)) def test_type_roundtrip_date_array(self): - from mx.DateTime import Date self._test_type_roundtrip_array(Date(2010, 5, 3)) def test_type_roundtrip_datetime_array(self): - from mx.DateTime import DateTime self._test_type_roundtrip_array(DateTime(2010, 5, 3, 10, 20, 30)) def test_type_roundtrip_time_array(self): - from mx.DateTime import Time self._test_type_roundtrip_array(Time(10, 20, 30)) def test_type_roundtrip_interval_array(self): - from mx.DateTime import DateTimeDeltaFrom self._test_type_roundtrip_array(DateTimeDeltaFrom(seconds=30)) @@ -659,8 +650,6 @@ class FixedOffsetTimezoneTests(unittest.TestCase): def test_pickle(self): # ticket #135 - import pickle - tz11 = FixedOffsetTimezone(60) tz12 = FixedOffsetTimezone(120) for proto in [-1, 0, 1, 2]: diff --git a/tests/test_green.py b/tests/test_green.py index 0f45efbb..76ee1c83 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -24,6 +24,7 @@ import select import unittest +import warnings import psycopg2 import psycopg2.extensions import psycopg2.extras @@ -81,7 +82,6 @@ class GreenTestCase(ConnectingTestCase): # on high load on linux: probably because the kernel has more # buffers ready. A warning may be useful during development, # but an error is bad during regression testing. - import warnings warnings.warn("sending a large query didn't trigger block on write.") def test_error_in_callback(self): diff --git a/tests/test_ipaddress.py b/tests/test_ipaddress.py index ec67866b..bf394211 100755 --- a/tests/test_ipaddress.py +++ b/tests/test_ipaddress.py @@ -23,16 +23,15 @@ import psycopg2 import psycopg2.extras try: - import ipaddress + import ipaddress as ip except ImportError: # Python 2 - ipaddress = None + ip = None -@unittest.skipIf(ipaddress is None, "'ipaddress' module not available") +@unittest.skipIf(ip is None, "'ipaddress' module not available") class NetworkingTestCase(testutils.ConnectingTestCase): def test_inet_cast(self): - import ipaddress as ip cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) @@ -51,7 +50,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase): @testutils.skip_before_postgres(8, 2) def test_inet_array_cast(self): - import ipaddress as ip cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]") @@ -63,7 +61,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase): self.assert_(isinstance(l[2], ip.IPv6Interface), l) def test_inet_adapt(self): - import ipaddress as ip cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) @@ -74,7 +71,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase): self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128') def test_cidr_cast(self): - import ipaddress as ip cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) @@ -93,7 +89,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase): @testutils.skip_before_postgres(8, 2) def test_cidr_array_cast(self): - import ipaddress as ip cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]") @@ -105,7 +100,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase): self.assert_(isinstance(l[2], ip.IPv6Network), l) def test_cidr_adapt(self): - import ipaddress as ip cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) diff --git a/tests/test_module.py b/tests/test_module.py index c9b9c6b3..fd8ae46a 100755 --- a/tests/test_module.py +++ b/tests/test_module.py @@ -25,6 +25,7 @@ import gc import os import sys +import pickle from subprocess import Popen from weakref import ref @@ -285,7 +286,6 @@ class ExceptionsTestCase(ConnectingTestCase): self.assertEqual(e.diag.severity_nonlocalized, 'ERROR') def test_pickle(self): - import pickle cur = self.conn.cursor() try: cur.execute("select * from nonexist") @@ -300,7 +300,6 @@ class ExceptionsTestCase(ConnectingTestCase): def test_pickle_connection_error(self): # segfaults on psycopg 2.5.1 - see ticket #170 - import pickle try: psycopg2.connect('dbname=nosuchdatabasemate') except psycopg2.Error as exc: diff --git a/tests/test_replication.py b/tests/test_replication.py index 7cb43272..697d53cf 100755 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -22,6 +22,7 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import time from select import select import psycopg2 @@ -47,7 +48,6 @@ class ReplicationTestCase(ConnectingTestCase): # first close all connections, as they might keep the slot(s) active super(ReplicationTestCase, self).tearDown() - import time time.sleep(0.025) # sometimes the slot is still active, wait a little if self._slots: diff --git a/tests/test_types_basic.py b/tests/test_types_basic.py index ff1dba03..13d733fc 100755 --- a/tests/test_types_basic.py +++ b/tests/test_types_basic.py @@ -22,8 +22,10 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import string import ctypes import decimal +import datetime import platform import sys @@ -155,7 +157,6 @@ class TypesBasicTests(ConnectingTestCase): def testEmptyArrayRegression(self): # ticket #42 - import datetime curs = self.conn.cursor() curs.execute( "create table array_test " @@ -478,9 +479,6 @@ class AdaptSubclassTest(unittest.TestCase): del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote] def test_conform_subclass_precedence(self): - - import psycopg2.extensions as ext - class foo(tuple): def __conform__(self, proto): return self @@ -488,7 +486,7 @@ class AdaptSubclassTest(unittest.TestCase): def getquoted(self): return 'bar' - self.assertEqual(ext.adapt(foo((1, 2, 3))).getquoted(), 'bar') + self.assertEqual(adapt(foo((1, 2, 3))).getquoted(), 'bar') @unittest.skipIf( @@ -556,7 +554,6 @@ class ByteaParserTest(unittest.TestCase): self.assertEqual(rv, bytes(range(256))) def test_escaped_mixed(self): - import string buf = ''.join(("\\%03o" % i) for i in range(32)) buf += string.ascii_letters buf += ''.join('\\' + c for c in string.ascii_letters) diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py index a10d4760..66d4ddf0 100755 --- a/tests/test_types_extras.py +++ b/tests/test_types_extras.py @@ -16,6 +16,8 @@ import re import sys +import json +import uuid import warnings from decimal import Decimal from datetime import date, datetime @@ -32,7 +34,7 @@ import psycopg2.extensions as ext from psycopg2._json import _get_json_oids from psycopg2.extras import ( CompositeCaster, DateRange, DateTimeRange, DateTimeTZRange, HstoreAdapter, - Inet, Json, NumericRange, Range, RealDictConnection, json, + Inet, Json, NumericRange, Range, RealDictConnection, register_composite, register_hstore, register_range, ) from psycopg2.tz import FixedOffsetTimezone @@ -48,7 +50,6 @@ class TypesExtrasTests(ConnectingTestCase): @skip_if_no_uuid def testUUID(self): - import uuid psycopg2.extras.register_uuid() u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350') s = self.execute("SELECT %s AS foo", (u,)) @@ -59,7 +60,6 @@ class TypesExtrasTests(ConnectingTestCase): @skip_if_no_uuid def testUUIDARRAY(self): - import uuid psycopg2.extras.register_uuid() u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')] @@ -1019,7 +1019,6 @@ def skip_if_no_jsonb_type(f): class JsonbTestCase(ConnectingTestCase): @staticmethod def myloads(s): - import json rv = json.loads(s) rv['test'] = 1 return rv diff --git a/tests/testutils.py b/tests/testutils.py index 24b99e52..5038f394 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -32,11 +32,12 @@ import platform import unittest from functools import wraps from ctypes.util import find_library -from .testconfig import dsn, repl_dsn -from psycopg2 import ProgrammingError + +import psycopg2 +import psycopg2.extensions from psycopg2.compat import text_type -from .testconfig import green +from .testconfig import green, dsn, repl_dsn # Python 2/3 compatibility @@ -119,7 +120,6 @@ class ConnectingTestCase(unittest.TestCase): conninfo = kwargs.pop('dsn') else: conninfo = dsn - import psycopg2 conn = psycopg2.connect(conninfo, **kwargs) self._conns.append(conn) return conn @@ -138,7 +138,6 @@ class ConnectingTestCase(unittest.TestCase): if 'dsn' not in kwargs: kwargs['dsn'] = repl_dsn - import psycopg2 try: conn = self.connect(**kwargs) if conn.async_ == 1: @@ -167,7 +166,6 @@ class ConnectingTestCase(unittest.TestCase): # for use with async connections only def wait(self, cur_or_conn): - import psycopg2.extensions pollable = cur_or_conn if not hasattr(pollable, 'poll'): pollable = cur_or_conn.connection @@ -224,7 +222,7 @@ def decorate_all_tests(obj, *decorators): @decorate_all_tests def skip_if_no_uuid(f): - """Decorator to skip a test if uuid is not supported by Py/PG.""" + """Decorator to skip a test if uuid is not supported by PG.""" @wraps(f) def skip_if_no_uuid_(self): try: @@ -251,7 +249,7 @@ def skip_if_tpc_disabled(f): cur = cnn.cursor() try: cur.execute("SHOW max_prepared_transactions;") - except ProgrammingError: + except psycopg2.ProgrammingError: return self.skipTest( "server too old: two phase transactions not supported.") else: @@ -309,7 +307,6 @@ def skip_after_postgres(*ver): def libpq_version(): - import psycopg2 v = psycopg2.__libpq_version__ if v >= 90100: v = min(v, psycopg2.extensions.libpq_version()) @@ -375,7 +372,7 @@ def skip_if_no_superuser(f): def skip_if_no_superuser_(self): try: return f(self) - except ProgrammingError as e: + except psycopg2.ProgrammingError as e: import psycopg2.errorcodes if e.pgcode == psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE: self.skipTest("skipped because not superuser") From e8135ee2cf511aa855c62fc074f414617666df43 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 16 Mar 2019 19:07:27 +0000 Subject: [PATCH 4/4] Use errors module to catch a specific postgres error --- tests/testutils.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/testutils.py b/tests/testutils.py index 5038f394..959b3dda 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -34,6 +34,7 @@ from functools import wraps from ctypes.util import find_library import psycopg2 +import psycopg2.errors import psycopg2.extensions from psycopg2.compat import text_type @@ -372,12 +373,8 @@ def skip_if_no_superuser(f): def skip_if_no_superuser_(self): try: return f(self) - except psycopg2.ProgrammingError as e: - import psycopg2.errorcodes - if e.pgcode == psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE: - self.skipTest("skipped because not superuser") - else: - raise + except psycopg2.errors.InsufficientPrivilege: + self.skipTest("skipped because not superuser") return skip_if_no_superuser_