Other import moved to top level in tests

This commit is contained in:
Daniele Varrazzo 2019-03-16 18:56:56 +00:00
parent 8cfe176a85
commit b0119fef81
12 changed files with 44 additions and 77 deletions

View File

@ -23,15 +23,15 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details. # License for more details.
import gc
import time
import unittest import unittest
from .testutils import skip_before_postgres, slow import warnings
import psycopg2 import psycopg2
from psycopg2 import extensions as ext from psycopg2 import extensions as ext
import time from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow
from .testutils import ConnectingTestCase, StringIO
class PollableStub(object): class PollableStub(object):
@ -338,7 +338,6 @@ class AsyncTests(ConnectingTestCase):
# on high load on linux: probably because the kernel has more # on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development, # buffers ready. A warning may be useful during development,
# but an error is bad during regression testing. # but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.") warnings.warn("sending a large query didn't trigger block on write.")
def test_sync_poll(self): def test_sync_poll(self):
@ -427,7 +426,6 @@ class AsyncTests(ConnectingTestCase):
self.assert_(self.conn.notices) self.assert_(self.conn.notices)
def test_async_cursor_gone(self): def test_async_cursor_gone(self):
import gc
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select 42;") cur.execute("select 42;")
del cur del cur

View File

@ -35,11 +35,8 @@ from operator import attrgetter
from weakref import ref from weakref import ref
import psycopg2 import psycopg2
import psycopg2.errorcodes import psycopg2.extras
from psycopg2 import extensions as ext from psycopg2 import extensions as ext
from psycopg2 import ProgrammingError
from psycopg2.extensions import Xid
from psycopg2.extras import RealDictConnection
from .testutils import ( from .testutils import (
unittest, skip_if_no_superuser, skip_before_postgres, unittest, skip_if_no_superuser, skip_before_postgres,
@ -296,7 +293,6 @@ class ConnectionTests(ConnectingTestCase):
self.assert_(not notices, "%d notices raised" % len(notices)) self.assert_(not notices, "%d notices raised" % len(notices))
def test_connect_cursor_factory(self): def test_connect_cursor_factory(self):
import psycopg2.extras
conn = self.connect(cursor_factory=psycopg2.extras.DictCursor) conn = self.connect(cursor_factory=psycopg2.extras.DictCursor)
cur = conn.cursor() cur = conn.cursor()
cur.execute("select 1 as a") cur.execute("select 1 as a")
@ -369,7 +365,7 @@ class ParseDsnTestCase(ConnectingTestCase):
dict(user='tester', password='secret', dbname='test'), dict(user='tester', password='secret', dbname='test'),
"simple DSN parsed") "simple DSN parsed")
self.assertRaises(ProgrammingError, ext.parse_dsn, self.assertRaises(psycopg2.ProgrammingError, ext.parse_dsn,
"dbname=test 2 user=tester password=secret") "dbname=test 2 user=tester password=secret")
self.assertEqual( self.assertEqual(
@ -383,7 +379,7 @@ class ParseDsnTestCase(ConnectingTestCase):
try: try:
# unterminated quote after dbname: # unterminated quote after dbname:
ext.parse_dsn("dbname='test 2 user=tester password=secret") ext.parse_dsn("dbname='test 2 user=tester password=secret")
except ProgrammingError as e: except psycopg2.ProgrammingError as e:
raised = True raised = True
self.assertTrue(str(e).find('secret') < 0, self.assertTrue(str(e).find('secret') < 0,
"DSN was not exposed in error message") "DSN was not exposed in error message")
@ -1124,27 +1120,27 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.tpc_rollback(xid) cnn.tpc_rollback(xid)
def test_xid_construction(self): def test_xid_construction(self):
x1 = Xid(74, 'foo', 'bar') x1 = ext.Xid(74, 'foo', 'bar')
self.assertEqual(74, x1.format_id) self.assertEqual(74, x1.format_id)
self.assertEqual('foo', x1.gtrid) self.assertEqual('foo', x1.gtrid)
self.assertEqual('bar', x1.bqual) self.assertEqual('bar', x1.bqual)
def test_xid_from_string(self): def test_xid_from_string(self):
x2 = Xid.from_string('42_Z3RyaWQ=_YnF1YWw=') x2 = ext.Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
self.assertEqual(42, x2.format_id) self.assertEqual(42, x2.format_id)
self.assertEqual('gtrid', x2.gtrid) self.assertEqual('gtrid', x2.gtrid)
self.assertEqual('bqual', x2.bqual) self.assertEqual('bqual', x2.bqual)
x3 = Xid.from_string('99_xxx_yyy') x3 = ext.Xid.from_string('99_xxx_yyy')
self.assertEqual(None, x3.format_id) self.assertEqual(None, x3.format_id)
self.assertEqual('99_xxx_yyy', x3.gtrid) self.assertEqual('99_xxx_yyy', x3.gtrid)
self.assertEqual(None, x3.bqual) self.assertEqual(None, x3.bqual)
def test_xid_to_string(self): def test_xid_to_string(self):
x1 = Xid.from_string('42_Z3RyaWQ=_YnF1YWw=') x1 = ext.Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
self.assertEqual(str(x1), '42_Z3RyaWQ=_YnF1YWw=') self.assertEqual(str(x1), '42_Z3RyaWQ=_YnF1YWw=')
x2 = Xid.from_string('99_xxx_yyy') x2 = ext.Xid.from_string('99_xxx_yyy')
self.assertEqual(str(x2), '99_xxx_yyy') self.assertEqual(str(x2), '99_xxx_yyy')
def test_xid_unicode(self): def test_xid_unicode(self):
@ -1180,7 +1176,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertRaises(psycopg2.ProgrammingError, cnn.cancel) self.assertRaises(psycopg2.ProgrammingError, cnn.cancel)
def test_tpc_recover_non_dbapi_connection(self): def test_tpc_recover_non_dbapi_connection(self):
cnn = self.connect(connection_factory=RealDictConnection) cnn = self.connect(connection_factory=psycopg2.extras.RealDictConnection)
cnn.tpc_begin('dict-connection') cnn.tpc_begin('dict-connection')
cnn.tpc_prepare() cnn.tpc_prepare()
cnn.reset() cnn.reset()

View File

@ -22,6 +22,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details. # License for more details.
import io
import sys import sys
import string import string
import unittest import unittest
@ -142,7 +143,6 @@ class CopyTests(ConnectingTestCase):
curs.execute('insert into tcopy values (%s, %s)', curs.execute('insert into tcopy values (%s, %s)',
(42, abin)) (42, abin))
import io
f = io.StringIO() f = io.StringIO()
curs.copy_to(f, 'tcopy', columns=('data',)) curs.copy_to(f, 'tcopy', columns=('data',))
f.seek(0) f.seek(0)
@ -164,7 +164,6 @@ class CopyTests(ConnectingTestCase):
curs.execute('insert into tcopy values (%s, %s)', curs.execute('insert into tcopy values (%s, %s)',
(42, abin)) (42, abin))
import io
f = io.BytesIO() f = io.BytesIO()
curs.copy_to(f, 'tcopy', columns=('data',)) curs.copy_to(f, 'tcopy', columns=('data',))
f.seek(0) f.seek(0)
@ -184,7 +183,6 @@ class CopyTests(ConnectingTestCase):
+ list(range(160, 256))).decode('latin1') + list(range(160, 256))).decode('latin1')
about = abin.replace('\\', '\\\\') about = abin.replace('\\', '\\\\')
import io
f = io.StringIO() f = io.StringIO()
f.write(about) f.write(about)
f.seek(0) f.seek(0)

View File

@ -22,6 +22,8 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details. # License for more details.
import gc
import sys
import time import time
import ctypes import ctypes
import pickle import pickle
@ -115,7 +117,6 @@ class CursorTests(ConnectingTestCase):
# more than once from a dict. # more than once from a dict.
cur = self.conn.cursor() cur = self.conn.cursor()
foo = (lambda x: x)('foo') * 10 foo = (lambda x: x)('foo') * 10
import sys
nref1 = sys.getrefcount(foo) nref1 = sys.getrefcount(foo)
cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo}) cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo})
nref2 = sys.getrefcount(foo) nref2 = sys.getrefcount(foo)
@ -168,7 +169,6 @@ class CursorTests(ConnectingTestCase):
curs = self.conn.cursor() curs = self.conn.cursor()
w = ref(curs) w = ref(curs)
del curs del curs
import gc
gc.collect() gc.collect()
self.assert_(w() is None) self.assert_(w() is None)

View File

@ -23,14 +23,20 @@
# License for more details. # License for more details.
import math import math
import pickle
from datetime import date, datetime, time, timedelta from datetime import date, datetime, time, timedelta
import psycopg2 import psycopg2
import psycopg2.tz
from psycopg2.tz import FixedOffsetTimezone, ZERO from psycopg2.tz import FixedOffsetTimezone, ZERO
import unittest import unittest
from .testutils import ConnectingTestCase, skip_before_postgres from .testutils import ConnectingTestCase, skip_before_postgres
try:
from mx.DateTime import Date, Time, DateTime, DateTimeDeltaFrom
except ImportError:
# Tests will be skipped
pass
def total_seconds(d): def total_seconds(d):
"""Return total number of seconds of a timedelta as a float.""" """Return total number of seconds of a timedelta as a float."""
@ -278,7 +284,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(None, dt.tzinfo) self.assertEqual(None, dt.tzinfo)
def test_type_roundtrip_datetimetz(self): def test_type_roundtrip_datetimetz(self):
tz = psycopg2.tz.FixedOffsetTimezone(8 * 60) tz = FixedOffsetTimezone(8 * 60)
dt1 = datetime(2010, 5, 3, 10, 20, 30, tzinfo=tz) dt1 = datetime(2010, 5, 3, 10, 20, 30, tzinfo=tz)
dt2 = self._test_type_roundtrip(dt1) dt2 = self._test_type_roundtrip(dt1)
self.assertNotEqual(None, dt2.tzinfo) self.assertNotEqual(None, dt2.tzinfo)
@ -289,7 +295,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(None, tm.tzinfo) self.assertEqual(None, tm.tzinfo)
def test_type_roundtrip_timetz(self): def test_type_roundtrip_timetz(self):
tz = psycopg2.tz.FixedOffsetTimezone(8 * 60) tz = FixedOffsetTimezone(8 * 60)
tm1 = time(10, 20, 30, tzinfo=tz) tm1 = time(10, 20, 30, tzinfo=tz)
tm2 = self._test_type_roundtrip(tm1) tm2 = self._test_type_roundtrip(tm1)
self.assertNotEqual(None, tm2.tzinfo) self.assertNotEqual(None, tm2.tzinfo)
@ -487,7 +493,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_time_timezone(self): def test_parse_time_timezone(self):
# Time zone information is ignored. # Time zone information is ignored.
from mx.DateTime import Time
expected = Time(13, 30, 29) expected = Time(13, 30, 29)
self.assertEqual(expected, self.TIME("13:30:29+01", self.curs)) self.assertEqual(expected, self.TIME("13:30:29+01", self.curs))
self.assertEqual(expected, self.TIME("13:30:29-01", self.curs)) self.assertEqual(expected, self.TIME("13:30:29-01", self.curs))
@ -498,7 +503,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_datetime_timezone(self): def test_parse_datetime_timezone(self):
# Time zone information is ignored. # Time zone information is ignored.
from mx.DateTime import DateTime
expected = DateTime(2007, 1, 1, 13, 30, 29) expected = DateTime(2007, 1, 1, 13, 30, 29)
self.assertEqual( self.assertEqual(
expected, self.DATETIME("2007-01-01 13:30:29+01", self.curs)) expected, self.DATETIME("2007-01-01 13:30:29+01", self.curs))
@ -522,19 +526,16 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(value.second, 5) self.assertEqual(value.second, 5)
def test_adapt_time(self): def test_adapt_time(self):
from mx.DateTime import Time
value = self.execute('select (%s)::time::text', value = self.execute('select (%s)::time::text',
[Time(13, 30, 29)]) [Time(13, 30, 29)])
self.assertEqual(value, '13:30:29') self.assertEqual(value, '13:30:29')
def test_adapt_datetime(self): def test_adapt_datetime(self):
from mx.DateTime import DateTime
value = self.execute('select (%s)::timestamp::text', value = self.execute('select (%s)::timestamp::text',
[DateTime(2007, 1, 1, 13, 30, 29.123456)]) [DateTime(2007, 1, 1, 13, 30, 29.123456)])
self.assertEqual(value, '2007-01-01 13:30:29.123456') self.assertEqual(value, '2007-01-01 13:30:29.123456')
def test_adapt_bc_datetime(self): def test_adapt_bc_datetime(self):
from mx.DateTime import DateTime
value = self.execute('select (%s)::timestamp::text', value = self.execute('select (%s)::timestamp::text',
[DateTime(-41, 1, 1, 13, 30, 29.123456)]) [DateTime(-41, 1, 1, 13, 30, 29.123456)])
# microsecs for BC timestamps look not available in PG < 8.4 # microsecs for BC timestamps look not available in PG < 8.4
@ -544,7 +545,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
'0042-01-01 13:30:29 BC'), value) '0042-01-01 13:30:29 BC'), value)
def test_adapt_timedelta(self): def test_adapt_timedelta(self):
from mx.DateTime import DateTimeDeltaFrom
value = self.execute('select extract(epoch from (%s)::interval)', value = self.execute('select extract(epoch from (%s)::interval)',
[DateTimeDeltaFrom(days=42, [DateTimeDeltaFrom(days=42,
seconds=45296.123456)]) seconds=45296.123456)])
@ -553,7 +553,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(int(round((value - seconds) * 1000000)), 123456) self.assertEqual(int(round((value - seconds) * 1000000)), 123456)
def test_adapt_negative_timedelta(self): def test_adapt_negative_timedelta(self):
from mx.DateTime import DateTimeDeltaFrom
value = self.execute('select extract(epoch from (%s)::interval)', value = self.execute('select extract(epoch from (%s)::interval)',
[DateTimeDeltaFrom(days=-42, [DateTimeDeltaFrom(days=-42,
seconds=45296.123456)]) seconds=45296.123456)])
@ -571,35 +570,27 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(type(o1[0]), type(o2[0])) self.assertEqual(type(o1[0]), type(o2[0]))
def test_type_roundtrip_date(self): def test_type_roundtrip_date(self):
from mx.DateTime import Date
self._test_type_roundtrip(Date(2010, 5, 3)) self._test_type_roundtrip(Date(2010, 5, 3))
def test_type_roundtrip_datetime(self): def test_type_roundtrip_datetime(self):
from mx.DateTime import DateTime
self._test_type_roundtrip(DateTime(2010, 5, 3, 10, 20, 30)) self._test_type_roundtrip(DateTime(2010, 5, 3, 10, 20, 30))
def test_type_roundtrip_time(self): def test_type_roundtrip_time(self):
from mx.DateTime import Time
self._test_type_roundtrip(Time(10, 20, 30)) self._test_type_roundtrip(Time(10, 20, 30))
def test_type_roundtrip_interval(self): def test_type_roundtrip_interval(self):
from mx.DateTime import DateTimeDeltaFrom
self._test_type_roundtrip(DateTimeDeltaFrom(seconds=30)) self._test_type_roundtrip(DateTimeDeltaFrom(seconds=30))
def test_type_roundtrip_date_array(self): def test_type_roundtrip_date_array(self):
from mx.DateTime import Date
self._test_type_roundtrip_array(Date(2010, 5, 3)) self._test_type_roundtrip_array(Date(2010, 5, 3))
def test_type_roundtrip_datetime_array(self): def test_type_roundtrip_datetime_array(self):
from mx.DateTime import DateTime
self._test_type_roundtrip_array(DateTime(2010, 5, 3, 10, 20, 30)) self._test_type_roundtrip_array(DateTime(2010, 5, 3, 10, 20, 30))
def test_type_roundtrip_time_array(self): def test_type_roundtrip_time_array(self):
from mx.DateTime import Time
self._test_type_roundtrip_array(Time(10, 20, 30)) self._test_type_roundtrip_array(Time(10, 20, 30))
def test_type_roundtrip_interval_array(self): def test_type_roundtrip_interval_array(self):
from mx.DateTime import DateTimeDeltaFrom
self._test_type_roundtrip_array(DateTimeDeltaFrom(seconds=30)) self._test_type_roundtrip_array(DateTimeDeltaFrom(seconds=30))
@ -659,8 +650,6 @@ class FixedOffsetTimezoneTests(unittest.TestCase):
def test_pickle(self): def test_pickle(self):
# ticket #135 # ticket #135
import pickle
tz11 = FixedOffsetTimezone(60) tz11 = FixedOffsetTimezone(60)
tz12 = FixedOffsetTimezone(120) tz12 = FixedOffsetTimezone(120)
for proto in [-1, 0, 1, 2]: for proto in [-1, 0, 1, 2]:

View File

@ -24,6 +24,7 @@
import select import select
import unittest import unittest
import warnings
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
import psycopg2.extras import psycopg2.extras
@ -81,7 +82,6 @@ class GreenTestCase(ConnectingTestCase):
# on high load on linux: probably because the kernel has more # on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development, # buffers ready. A warning may be useful during development,
# but an error is bad during regression testing. # but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.") warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self): def test_error_in_callback(self):

View File

@ -23,16 +23,15 @@ import psycopg2
import psycopg2.extras import psycopg2.extras
try: try:
import ipaddress import ipaddress as ip
except ImportError: except ImportError:
# Python 2 # Python 2
ipaddress = None ip = None
@unittest.skipIf(ipaddress is None, "'ipaddress' module not available") @unittest.skipIf(ip is None, "'ipaddress' module not available")
class NetworkingTestCase(testutils.ConnectingTestCase): class NetworkingTestCase(testutils.ConnectingTestCase):
def test_inet_cast(self): def test_inet_cast(self):
import ipaddress as ip
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur) psycopg2.extras.register_ipaddress(cur)
@ -51,7 +50,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
@testutils.skip_before_postgres(8, 2) @testutils.skip_before_postgres(8, 2)
def test_inet_array_cast(self): def test_inet_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur) psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]") cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
@ -63,7 +61,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assert_(isinstance(l[2], ip.IPv6Interface), l) self.assert_(isinstance(l[2], ip.IPv6Interface), l)
def test_inet_adapt(self): def test_inet_adapt(self):
import ipaddress as ip
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur) psycopg2.extras.register_ipaddress(cur)
@ -74,7 +71,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128') self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
def test_cidr_cast(self): def test_cidr_cast(self):
import ipaddress as ip
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur) psycopg2.extras.register_ipaddress(cur)
@ -93,7 +89,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
@testutils.skip_before_postgres(8, 2) @testutils.skip_before_postgres(8, 2)
def test_cidr_array_cast(self): def test_cidr_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur) psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]") cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]")
@ -105,7 +100,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assert_(isinstance(l[2], ip.IPv6Network), l) self.assert_(isinstance(l[2], ip.IPv6Network), l)
def test_cidr_adapt(self): def test_cidr_adapt(self):
import ipaddress as ip
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur) psycopg2.extras.register_ipaddress(cur)

View File

@ -25,6 +25,7 @@
import gc import gc
import os import os
import sys import sys
import pickle
from subprocess import Popen from subprocess import Popen
from weakref import ref from weakref import ref
@ -285,7 +286,6 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(e.diag.severity_nonlocalized, 'ERROR') self.assertEqual(e.diag.severity_nonlocalized, 'ERROR')
def test_pickle(self): def test_pickle(self):
import pickle
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.execute("select * from nonexist") cur.execute("select * from nonexist")
@ -300,7 +300,6 @@ class ExceptionsTestCase(ConnectingTestCase):
def test_pickle_connection_error(self): def test_pickle_connection_error(self):
# segfaults on psycopg 2.5.1 - see ticket #170 # segfaults on psycopg 2.5.1 - see ticket #170
import pickle
try: try:
psycopg2.connect('dbname=nosuchdatabasemate') psycopg2.connect('dbname=nosuchdatabasemate')
except psycopg2.Error as exc: except psycopg2.Error as exc:

View File

@ -22,6 +22,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details. # License for more details.
import time
from select import select from select import select
import psycopg2 import psycopg2
@ -47,7 +48,6 @@ class ReplicationTestCase(ConnectingTestCase):
# first close all connections, as they might keep the slot(s) active # first close all connections, as they might keep the slot(s) active
super(ReplicationTestCase, self).tearDown() super(ReplicationTestCase, self).tearDown()
import time
time.sleep(0.025) # sometimes the slot is still active, wait a little time.sleep(0.025) # sometimes the slot is still active, wait a little
if self._slots: if self._slots:

View File

@ -22,8 +22,10 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details. # License for more details.
import string
import ctypes import ctypes
import decimal import decimal
import datetime
import platform import platform
import sys import sys
@ -155,7 +157,6 @@ class TypesBasicTests(ConnectingTestCase):
def testEmptyArrayRegression(self): def testEmptyArrayRegression(self):
# ticket #42 # ticket #42
import datetime
curs = self.conn.cursor() curs = self.conn.cursor()
curs.execute( curs.execute(
"create table array_test " "create table array_test "
@ -478,9 +479,6 @@ class AdaptSubclassTest(unittest.TestCase):
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote] del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
def test_conform_subclass_precedence(self): def test_conform_subclass_precedence(self):
import psycopg2.extensions as ext
class foo(tuple): class foo(tuple):
def __conform__(self, proto): def __conform__(self, proto):
return self return self
@ -488,7 +486,7 @@ class AdaptSubclassTest(unittest.TestCase):
def getquoted(self): def getquoted(self):
return 'bar' return 'bar'
self.assertEqual(ext.adapt(foo((1, 2, 3))).getquoted(), 'bar') self.assertEqual(adapt(foo((1, 2, 3))).getquoted(), 'bar')
@unittest.skipIf( @unittest.skipIf(
@ -556,7 +554,6 @@ class ByteaParserTest(unittest.TestCase):
self.assertEqual(rv, bytes(range(256))) self.assertEqual(rv, bytes(range(256)))
def test_escaped_mixed(self): def test_escaped_mixed(self):
import string
buf = ''.join(("\\%03o" % i) for i in range(32)) buf = ''.join(("\\%03o" % i) for i in range(32))
buf += string.ascii_letters buf += string.ascii_letters
buf += ''.join('\\' + c for c in string.ascii_letters) buf += ''.join('\\' + c for c in string.ascii_letters)

View File

@ -16,6 +16,8 @@
import re import re
import sys import sys
import json
import uuid
import warnings import warnings
from decimal import Decimal from decimal import Decimal
from datetime import date, datetime from datetime import date, datetime
@ -32,7 +34,7 @@ import psycopg2.extensions as ext
from psycopg2._json import _get_json_oids from psycopg2._json import _get_json_oids
from psycopg2.extras import ( from psycopg2.extras import (
CompositeCaster, DateRange, DateTimeRange, DateTimeTZRange, HstoreAdapter, CompositeCaster, DateRange, DateTimeRange, DateTimeTZRange, HstoreAdapter,
Inet, Json, NumericRange, Range, RealDictConnection, json, Inet, Json, NumericRange, Range, RealDictConnection,
register_composite, register_hstore, register_range, register_composite, register_hstore, register_range,
) )
from psycopg2.tz import FixedOffsetTimezone from psycopg2.tz import FixedOffsetTimezone
@ -48,7 +50,6 @@ class TypesExtrasTests(ConnectingTestCase):
@skip_if_no_uuid @skip_if_no_uuid
def testUUID(self): def testUUID(self):
import uuid
psycopg2.extras.register_uuid() psycopg2.extras.register_uuid()
u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350') u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
s = self.execute("SELECT %s AS foo", (u,)) s = self.execute("SELECT %s AS foo", (u,))
@ -59,7 +60,6 @@ class TypesExtrasTests(ConnectingTestCase):
@skip_if_no_uuid @skip_if_no_uuid
def testUUIDARRAY(self): def testUUIDARRAY(self):
import uuid
psycopg2.extras.register_uuid() psycopg2.extras.register_uuid()
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'),
uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')] uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
@ -1019,7 +1019,6 @@ def skip_if_no_jsonb_type(f):
class JsonbTestCase(ConnectingTestCase): class JsonbTestCase(ConnectingTestCase):
@staticmethod @staticmethod
def myloads(s): def myloads(s):
import json
rv = json.loads(s) rv = json.loads(s)
rv['test'] = 1 rv['test'] = 1
return rv return rv

View File

@ -32,11 +32,12 @@ import platform
import unittest import unittest
from functools import wraps from functools import wraps
from ctypes.util import find_library from ctypes.util import find_library
from .testconfig import dsn, repl_dsn
from psycopg2 import ProgrammingError import psycopg2
import psycopg2.extensions
from psycopg2.compat import text_type from psycopg2.compat import text_type
from .testconfig import green from .testconfig import green, dsn, repl_dsn
# Python 2/3 compatibility # Python 2/3 compatibility
@ -119,7 +120,6 @@ class ConnectingTestCase(unittest.TestCase):
conninfo = kwargs.pop('dsn') conninfo = kwargs.pop('dsn')
else: else:
conninfo = dsn conninfo = dsn
import psycopg2
conn = psycopg2.connect(conninfo, **kwargs) conn = psycopg2.connect(conninfo, **kwargs)
self._conns.append(conn) self._conns.append(conn)
return conn return conn
@ -138,7 +138,6 @@ class ConnectingTestCase(unittest.TestCase):
if 'dsn' not in kwargs: if 'dsn' not in kwargs:
kwargs['dsn'] = repl_dsn kwargs['dsn'] = repl_dsn
import psycopg2
try: try:
conn = self.connect(**kwargs) conn = self.connect(**kwargs)
if conn.async_ == 1: if conn.async_ == 1:
@ -167,7 +166,6 @@ class ConnectingTestCase(unittest.TestCase):
# for use with async connections only # for use with async connections only
def wait(self, cur_or_conn): def wait(self, cur_or_conn):
import psycopg2.extensions
pollable = cur_or_conn pollable = cur_or_conn
if not hasattr(pollable, 'poll'): if not hasattr(pollable, 'poll'):
pollable = cur_or_conn.connection pollable = cur_or_conn.connection
@ -224,7 +222,7 @@ def decorate_all_tests(obj, *decorators):
@decorate_all_tests @decorate_all_tests
def skip_if_no_uuid(f): def skip_if_no_uuid(f):
"""Decorator to skip a test if uuid is not supported by Py/PG.""" """Decorator to skip a test if uuid is not supported by PG."""
@wraps(f) @wraps(f)
def skip_if_no_uuid_(self): def skip_if_no_uuid_(self):
try: try:
@ -251,7 +249,7 @@ def skip_if_tpc_disabled(f):
cur = cnn.cursor() cur = cnn.cursor()
try: try:
cur.execute("SHOW max_prepared_transactions;") cur.execute("SHOW max_prepared_transactions;")
except ProgrammingError: except psycopg2.ProgrammingError:
return self.skipTest( return self.skipTest(
"server too old: two phase transactions not supported.") "server too old: two phase transactions not supported.")
else: else:
@ -309,7 +307,6 @@ def skip_after_postgres(*ver):
def libpq_version(): def libpq_version():
import psycopg2
v = psycopg2.__libpq_version__ v = psycopg2.__libpq_version__
if v >= 90100: if v >= 90100:
v = min(v, psycopg2.extensions.libpq_version()) v = min(v, psycopg2.extensions.libpq_version())
@ -375,7 +372,7 @@ def skip_if_no_superuser(f):
def skip_if_no_superuser_(self): def skip_if_no_superuser_(self):
try: try:
return f(self) return f(self)
except ProgrammingError as e: except psycopg2.ProgrammingError as e:
import psycopg2.errorcodes import psycopg2.errorcodes
if e.pgcode == psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE: if e.pgcode == psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE:
self.skipTest("skipped because not superuser") self.skipTest("skipped because not superuser")