Merge branch 'fix-imports'

This commit is contained in:
Daniele Varrazzo 2019-03-16 19:53:45 +00:00
commit 4ace9544ff
13 changed files with 54 additions and 101 deletions

View File

@ -23,15 +23,15 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import gc
import time
import unittest
from .testutils import skip_before_postgres, slow
import warnings
import psycopg2
from psycopg2 import extensions as ext
import time
from .testutils import ConnectingTestCase, StringIO
from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow
class PollableStub(object):
@ -338,7 +338,6 @@ class AsyncTests(ConnectingTestCase):
# on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development,
# but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def test_sync_poll(self):
@ -427,7 +426,6 @@ class AsyncTests(ConnectingTestCase):
self.assert_(self.conn.notices)
def test_async_cursor_gone(self):
import gc
cur = self.conn.cursor()
cur.execute("select 42;")
del cur

View File

@ -35,11 +35,8 @@ from operator import attrgetter
from weakref import ref
import psycopg2
import psycopg2.errorcodes
import psycopg2.extras
from psycopg2 import extensions as ext
from psycopg2 import ProgrammingError
from psycopg2.extensions import Xid
from psycopg2.extras import RealDictConnection
from .testutils import (
unittest, skip_if_no_superuser, skip_before_postgres,
@ -296,7 +293,6 @@ class ConnectionTests(ConnectingTestCase):
self.assert_(not notices, "%d notices raised" % len(notices))
def test_connect_cursor_factory(self):
import psycopg2.extras
conn = self.connect(cursor_factory=psycopg2.extras.DictCursor)
cur = conn.cursor()
cur.execute("select 1 as a")
@ -369,7 +365,7 @@ class ParseDsnTestCase(ConnectingTestCase):
dict(user='tester', password='secret', dbname='test'),
"simple DSN parsed")
self.assertRaises(ProgrammingError, ext.parse_dsn,
self.assertRaises(psycopg2.ProgrammingError, ext.parse_dsn,
"dbname=test 2 user=tester password=secret")
self.assertEqual(
@ -383,7 +379,7 @@ class ParseDsnTestCase(ConnectingTestCase):
try:
# unterminated quote after dbname:
ext.parse_dsn("dbname='test 2 user=tester password=secret")
except ProgrammingError as e:
except psycopg2.ProgrammingError as e:
raised = True
self.assertTrue(str(e).find('secret') < 0,
"DSN was not exposed in error message")
@ -1124,27 +1120,27 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.tpc_rollback(xid)
def test_xid_construction(self):
x1 = Xid(74, 'foo', 'bar')
x1 = ext.Xid(74, 'foo', 'bar')
self.assertEqual(74, x1.format_id)
self.assertEqual('foo', x1.gtrid)
self.assertEqual('bar', x1.bqual)
def test_xid_from_string(self):
x2 = Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
x2 = ext.Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
self.assertEqual(42, x2.format_id)
self.assertEqual('gtrid', x2.gtrid)
self.assertEqual('bqual', x2.bqual)
x3 = Xid.from_string('99_xxx_yyy')
x3 = ext.Xid.from_string('99_xxx_yyy')
self.assertEqual(None, x3.format_id)
self.assertEqual('99_xxx_yyy', x3.gtrid)
self.assertEqual(None, x3.bqual)
def test_xid_to_string(self):
x1 = Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
x1 = ext.Xid.from_string('42_Z3RyaWQ=_YnF1YWw=')
self.assertEqual(str(x1), '42_Z3RyaWQ=_YnF1YWw=')
x2 = Xid.from_string('99_xxx_yyy')
x2 = ext.Xid.from_string('99_xxx_yyy')
self.assertEqual(str(x2), '99_xxx_yyy')
def test_xid_unicode(self):
@ -1180,7 +1176,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertRaises(psycopg2.ProgrammingError, cnn.cancel)
def test_tpc_recover_non_dbapi_connection(self):
cnn = self.connect(connection_factory=RealDictConnection)
cnn = self.connect(connection_factory=psycopg2.extras.RealDictConnection)
cnn.tpc_begin('dict-connection')
cnn.tpc_prepare()
cnn.reset()

View File

@ -22,6 +22,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import io
import sys
import string
import unittest
@ -31,17 +32,11 @@ from subprocess import Popen, PIPE
import psycopg2
import psycopg2.extensions
from .testutils import skip_copy_if_green
from .testutils import skip_copy_if_green, TextIOBase
from .testconfig import dsn
if sys.version_info[0] < 3:
_base = object
else:
from io import TextIOBase as _base
class MinimalRead(_base):
class MinimalRead(TextIOBase):
"""A file wrapper exposing the minimal interface to copy from."""
def __init__(self, f):
self.f = f
@ -53,7 +48,7 @@ class MinimalRead(_base):
return self.f.readline()
class MinimalWrite(_base):
class MinimalWrite(TextIOBase):
"""A file wrapper exposing the minimal interface to copy to."""
def __init__(self, f):
self.f = f
@ -148,7 +143,6 @@ class CopyTests(ConnectingTestCase):
curs.execute('insert into tcopy values (%s, %s)',
(42, abin))
import io
f = io.StringIO()
curs.copy_to(f, 'tcopy', columns=('data',))
f.seek(0)
@ -170,7 +164,6 @@ class CopyTests(ConnectingTestCase):
curs.execute('insert into tcopy values (%s, %s)',
(42, abin))
import io
f = io.BytesIO()
curs.copy_to(f, 'tcopy', columns=('data',))
f.seek(0)
@ -190,7 +183,6 @@ class CopyTests(ConnectingTestCase):
+ list(range(160, 256))).decode('latin1')
about = abin.replace('\\', '\\\\')
import io
f = io.StringIO()
f.write(about)
f.seek(0)
@ -351,7 +343,7 @@ conn.close()
self.assertEqual(0, proc.returncode)
def test_copy_from_propagate_error(self):
class BrokenRead(_base):
class BrokenRead(TextIOBase):
def read(self, size):
return 1 / 0
@ -368,7 +360,7 @@ conn.close()
self.assert_('ZeroDivisionError' in str(e))
def test_copy_to_propagate_error(self):
class BrokenWrite(_base):
class BrokenWrite(TextIOBase):
def write(self, data):
return 1 / 0

View File

@ -22,6 +22,8 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import gc
import sys
import time
import ctypes
import pickle
@ -115,7 +117,6 @@ class CursorTests(ConnectingTestCase):
# more than once from a dict.
cur = self.conn.cursor()
foo = (lambda x: x)('foo') * 10
import sys
nref1 = sys.getrefcount(foo)
cur.mogrify("select %(foo)s, %(foo)s, %(foo)s", {'foo': foo})
nref2 = sys.getrefcount(foo)
@ -168,7 +169,6 @@ class CursorTests(ConnectingTestCase):
curs = self.conn.cursor()
w = ref(curs)
del curs
import gc
gc.collect()
self.assert_(w() is None)

View File

@ -23,14 +23,20 @@
# License for more details.
import math
import pickle
from datetime import date, datetime, time, timedelta
import psycopg2
import psycopg2.tz
from psycopg2.tz import FixedOffsetTimezone, ZERO
import unittest
from .testutils import ConnectingTestCase, skip_before_postgres
try:
from mx.DateTime import Date, Time, DateTime, DateTimeDeltaFrom
except ImportError:
# Tests will be skipped
pass
def total_seconds(d):
"""Return total number of seconds of a timedelta as a float."""
@ -278,7 +284,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(None, dt.tzinfo)
def test_type_roundtrip_datetimetz(self):
tz = psycopg2.tz.FixedOffsetTimezone(8 * 60)
tz = FixedOffsetTimezone(8 * 60)
dt1 = datetime(2010, 5, 3, 10, 20, 30, tzinfo=tz)
dt2 = self._test_type_roundtrip(dt1)
self.assertNotEqual(None, dt2.tzinfo)
@ -289,7 +295,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(None, tm.tzinfo)
def test_type_roundtrip_timetz(self):
tz = psycopg2.tz.FixedOffsetTimezone(8 * 60)
tz = FixedOffsetTimezone(8 * 60)
tm1 = time(10, 20, 30, tzinfo=tz)
tm2 = self._test_type_roundtrip(tm1)
self.assertNotEqual(None, tm2.tzinfo)
@ -487,7 +493,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_time_timezone(self):
# Time zone information is ignored.
from mx.DateTime import Time
expected = Time(13, 30, 29)
self.assertEqual(expected, self.TIME("13:30:29+01", self.curs))
self.assertEqual(expected, self.TIME("13:30:29-01", self.curs))
@ -498,7 +503,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_datetime_timezone(self):
# Time zone information is ignored.
from mx.DateTime import DateTime
expected = DateTime(2007, 1, 1, 13, 30, 29)
self.assertEqual(
expected, self.DATETIME("2007-01-01 13:30:29+01", self.curs))
@ -522,19 +526,16 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(value.second, 5)
def test_adapt_time(self):
from mx.DateTime import Time
value = self.execute('select (%s)::time::text',
[Time(13, 30, 29)])
self.assertEqual(value, '13:30:29')
def test_adapt_datetime(self):
from mx.DateTime import DateTime
value = self.execute('select (%s)::timestamp::text',
[DateTime(2007, 1, 1, 13, 30, 29.123456)])
self.assertEqual(value, '2007-01-01 13:30:29.123456')
def test_adapt_bc_datetime(self):
from mx.DateTime import DateTime
value = self.execute('select (%s)::timestamp::text',
[DateTime(-41, 1, 1, 13, 30, 29.123456)])
# microsecs for BC timestamps look not available in PG < 8.4
@ -544,7 +545,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
'0042-01-01 13:30:29 BC'), value)
def test_adapt_timedelta(self):
from mx.DateTime import DateTimeDeltaFrom
value = self.execute('select extract(epoch from (%s)::interval)',
[DateTimeDeltaFrom(days=42,
seconds=45296.123456)])
@ -553,7 +553,6 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(int(round((value - seconds) * 1000000)), 123456)
def test_adapt_negative_timedelta(self):
from mx.DateTime import DateTimeDeltaFrom
value = self.execute('select extract(epoch from (%s)::interval)',
[DateTimeDeltaFrom(days=-42,
seconds=45296.123456)])
@ -571,35 +570,27 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
self.assertEqual(type(o1[0]), type(o2[0]))
def test_type_roundtrip_date(self):
from mx.DateTime import Date
self._test_type_roundtrip(Date(2010, 5, 3))
def test_type_roundtrip_datetime(self):
from mx.DateTime import DateTime
self._test_type_roundtrip(DateTime(2010, 5, 3, 10, 20, 30))
def test_type_roundtrip_time(self):
from mx.DateTime import Time
self._test_type_roundtrip(Time(10, 20, 30))
def test_type_roundtrip_interval(self):
from mx.DateTime import DateTimeDeltaFrom
self._test_type_roundtrip(DateTimeDeltaFrom(seconds=30))
def test_type_roundtrip_date_array(self):
from mx.DateTime import Date
self._test_type_roundtrip_array(Date(2010, 5, 3))
def test_type_roundtrip_datetime_array(self):
from mx.DateTime import DateTime
self._test_type_roundtrip_array(DateTime(2010, 5, 3, 10, 20, 30))
def test_type_roundtrip_time_array(self):
from mx.DateTime import Time
self._test_type_roundtrip_array(Time(10, 20, 30))
def test_type_roundtrip_interval_array(self):
from mx.DateTime import DateTimeDeltaFrom
self._test_type_roundtrip_array(DateTimeDeltaFrom(seconds=30))
@ -659,8 +650,6 @@ class FixedOffsetTimezoneTests(unittest.TestCase):
def test_pickle(self):
# ticket #135
import pickle
tz11 = FixedOffsetTimezone(60)
tz12 = FixedOffsetTimezone(120)
for proto in [-1, 0, 1, 2]:

View File

@ -25,14 +25,6 @@
import unittest
from .testutils import ConnectingTestCase, slow, reload
try:
reload
except NameError:
try:
from importlib import reload
except ImportError:
from imp import reload
from threading import Thread
from psycopg2 import errorcodes

View File

@ -24,6 +24,7 @@
import select
import unittest
import warnings
import psycopg2
import psycopg2.extensions
import psycopg2.extras
@ -81,7 +82,6 @@ class GreenTestCase(ConnectingTestCase):
# on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development,
# but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self):

View File

@ -23,16 +23,15 @@ import psycopg2
import psycopg2.extras
try:
import ipaddress
import ipaddress as ip
except ImportError:
# Python 2
ipaddress = None
ip = None
@unittest.skipIf(ipaddress is None, "'ipaddress' module not available")
@unittest.skipIf(ip is None, "'ipaddress' module not available")
class NetworkingTestCase(testutils.ConnectingTestCase):
def test_inet_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
@ -51,7 +50,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
@testutils.skip_before_postgres(8, 2)
def test_inet_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
@ -63,7 +61,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assert_(isinstance(l[2], ip.IPv6Interface), l)
def test_inet_adapt(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
@ -74,7 +71,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
def test_cidr_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
@ -93,7 +89,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
@testutils.skip_before_postgres(8, 2)
def test_cidr_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]")
@ -105,7 +100,6 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
self.assert_(isinstance(l[2], ip.IPv6Network), l)
def test_cidr_adapt(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)

View File

@ -25,6 +25,7 @@
import gc
import os
import sys
import pickle
from subprocess import Popen
from weakref import ref
@ -285,7 +286,6 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(e.diag.severity_nonlocalized, 'ERROR')
def test_pickle(self):
import pickle
cur = self.conn.cursor()
try:
cur.execute("select * from nonexist")
@ -300,7 +300,6 @@ class ExceptionsTestCase(ConnectingTestCase):
def test_pickle_connection_error(self):
# segfaults on psycopg 2.5.1 - see ticket #170
import pickle
try:
psycopg2.connect('dbname=nosuchdatabasemate')
except psycopg2.Error as exc:

View File

@ -22,6 +22,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import time
from select import select
import psycopg2
@ -47,7 +48,6 @@ class ReplicationTestCase(ConnectingTestCase):
# first close all connections, as they might keep the slot(s) active
super(ReplicationTestCase, self).tearDown()
import time
time.sleep(0.025) # sometimes the slot is still active, wait a little
if self._slots:

View File

@ -22,8 +22,10 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import string
import ctypes
import decimal
import datetime
import platform
import sys
@ -155,7 +157,6 @@ class TypesBasicTests(ConnectingTestCase):
def testEmptyArrayRegression(self):
# ticket #42
import datetime
curs = self.conn.cursor()
curs.execute(
"create table array_test "
@ -478,9 +479,6 @@ class AdaptSubclassTest(unittest.TestCase):
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
def test_conform_subclass_precedence(self):
import psycopg2.extensions as ext
class foo(tuple):
def __conform__(self, proto):
return self
@ -488,7 +486,7 @@ class AdaptSubclassTest(unittest.TestCase):
def getquoted(self):
return 'bar'
self.assertEqual(ext.adapt(foo((1, 2, 3))).getquoted(), 'bar')
self.assertEqual(adapt(foo((1, 2, 3))).getquoted(), 'bar')
@unittest.skipIf(
@ -556,7 +554,6 @@ class ByteaParserTest(unittest.TestCase):
self.assertEqual(rv, bytes(range(256)))
def test_escaped_mixed(self):
import string
buf = ''.join(("\\%03o" % i) for i in range(32))
buf += string.ascii_letters
buf += ''.join('\\' + c for c in string.ascii_letters)

View File

@ -16,6 +16,8 @@
import re
import sys
import json
import uuid
import warnings
from decimal import Decimal
from datetime import date, datetime
@ -32,7 +34,7 @@ import psycopg2.extensions as ext
from psycopg2._json import _get_json_oids
from psycopg2.extras import (
CompositeCaster, DateRange, DateTimeRange, DateTimeTZRange, HstoreAdapter,
Inet, Json, NumericRange, Range, RealDictConnection, json,
Inet, Json, NumericRange, Range, RealDictConnection,
register_composite, register_hstore, register_range,
)
from psycopg2.tz import FixedOffsetTimezone
@ -48,7 +50,6 @@ class TypesExtrasTests(ConnectingTestCase):
@skip_if_no_uuid
def testUUID(self):
import uuid
psycopg2.extras.register_uuid()
u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
s = self.execute("SELECT %s AS foo", (u,))
@ -59,7 +60,6 @@ class TypesExtrasTests(ConnectingTestCase):
@skip_if_no_uuid
def testUUIDARRAY(self):
import uuid
psycopg2.extras.register_uuid()
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'),
uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
@ -1019,7 +1019,6 @@ def skip_if_no_jsonb_type(f):
class JsonbTestCase(ConnectingTestCase):
@staticmethod
def myloads(s):
import json
rv = json.loads(s)
rv['test'] = 1
return rv

View File

@ -32,23 +32,28 @@ import platform
import unittest
from functools import wraps
from ctypes.util import find_library
from .testconfig import dsn, repl_dsn
from psycopg2 import ProgrammingError
import psycopg2
import psycopg2.errors
import psycopg2.extensions
from psycopg2.compat import text_type
from .testconfig import green
from .testconfig import green, dsn, repl_dsn
# Python 2/3 compatibility
if sys.version_info[0] == 2:
# Python 2
from StringIO import StringIO
TextIOBase = object
long = long
reload = reload
unichr = unichr
else:
# Python 3
from io import StringIO # noqa
from io import TextIOBase # noqa
from importlib import reload # noqa
long = int
unichr = chr
@ -116,7 +121,6 @@ class ConnectingTestCase(unittest.TestCase):
conninfo = kwargs.pop('dsn')
else:
conninfo = dsn
import psycopg2
conn = psycopg2.connect(conninfo, **kwargs)
self._conns.append(conn)
return conn
@ -135,7 +139,6 @@ class ConnectingTestCase(unittest.TestCase):
if 'dsn' not in kwargs:
kwargs['dsn'] = repl_dsn
import psycopg2
try:
conn = self.connect(**kwargs)
if conn.async_ == 1:
@ -164,7 +167,6 @@ class ConnectingTestCase(unittest.TestCase):
# for use with async connections only
def wait(self, cur_or_conn):
import psycopg2.extensions
pollable = cur_or_conn
if not hasattr(pollable, 'poll'):
pollable = cur_or_conn.connection
@ -221,7 +223,7 @@ def decorate_all_tests(obj, *decorators):
@decorate_all_tests
def skip_if_no_uuid(f):
"""Decorator to skip a test if uuid is not supported by Py/PG."""
"""Decorator to skip a test if uuid is not supported by PG."""
@wraps(f)
def skip_if_no_uuid_(self):
try:
@ -248,7 +250,7 @@ def skip_if_tpc_disabled(f):
cur = cnn.cursor()
try:
cur.execute("SHOW max_prepared_transactions;")
except ProgrammingError:
except psycopg2.ProgrammingError:
return self.skipTest(
"server too old: two phase transactions not supported.")
else:
@ -306,7 +308,6 @@ def skip_after_postgres(*ver):
def libpq_version():
import psycopg2
v = psycopg2.__libpq_version__
if v >= 90100:
v = min(v, psycopg2.extensions.libpq_version())
@ -372,12 +373,8 @@ def skip_if_no_superuser(f):
def skip_if_no_superuser_(self):
try:
return f(self)
except ProgrammingError as e:
import psycopg2.errorcodes
if e.pgcode == psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE:
self.skipTest("skipped because not superuser")
else:
raise
except psycopg2.errors.InsufficientPrivilege:
self.skipTest("skipped because not superuser")
return skip_if_no_superuser_