mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-10 19:16:34 +03:00
Full flake8 3.5 cleanup
This commit is contained in:
parent
1bb3d5cfe2
commit
05f9e231a0
|
@ -72,8 +72,8 @@ _ext.register_adapter(type(None), _ext.NoneAdapter)
|
|||
# Register the Decimal adapter here instead of in the C layer.
|
||||
# This way a new class is registered for each sub-interpreter.
|
||||
# See ticket #52
|
||||
from decimal import Decimal
|
||||
from psycopg2._psycopg import Decimal as Adapter
|
||||
from decimal import Decimal # noqa
|
||||
from psycopg2._psycopg import Decimal as Adapter # noqa
|
||||
_ext.register_adapter(Decimal, Adapter)
|
||||
del Decimal, Adapter
|
||||
|
||||
|
|
|
@ -506,10 +506,10 @@ class NumberRangeAdapter(RangeAdapter):
|
|||
return ("'%s%s,%s%s'" % (
|
||||
r._bounds[0], lower, upper, r._bounds[1])).encode('ascii')
|
||||
|
||||
|
||||
# TODO: probably won't work with infs, nans and other tricky cases.
|
||||
register_adapter(NumericRange, NumberRangeAdapter)
|
||||
|
||||
|
||||
# Register globally typecasters and adapters for builtin range types.
|
||||
|
||||
# note: the adapter is registered more than once, but this is harmless.
|
||||
|
|
|
@ -31,10 +31,7 @@ import time as _time
|
|||
import re as _re
|
||||
from collections import namedtuple, OrderedDict
|
||||
|
||||
try:
|
||||
import logging as _logging
|
||||
except:
|
||||
_logging = None
|
||||
import logging as _logging
|
||||
|
||||
import psycopg2
|
||||
from psycopg2 import extensions as _ext
|
||||
|
@ -189,7 +186,7 @@ class DictRow(list):
|
|||
def get(self, x, default=None):
|
||||
try:
|
||||
return self[x]
|
||||
except:
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
def copy(self):
|
||||
|
|
|
@ -138,7 +138,7 @@ class AbstractConnectionPool(object):
|
|||
for conn in self._pool + list(self._used.values()):
|
||||
try:
|
||||
conn.close()
|
||||
except:
|
||||
except Exception:
|
||||
pass
|
||||
self.closed = True
|
||||
|
||||
|
|
|
@ -132,6 +132,7 @@ class LocalTimezone(datetime.tzinfo):
|
|||
tt = time.localtime(stamp)
|
||||
return tt.tm_isdst > 0
|
||||
|
||||
|
||||
LOCAL = LocalTimezone()
|
||||
|
||||
# TODO: pre-generate some interesting time zones?
|
||||
|
|
|
@ -460,5 +460,6 @@ class AsyncTests(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -218,5 +218,6 @@ class AsyncReplicationTest(ReplicationTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -42,5 +42,6 @@ class DateTimeAllocationBugTestCase(unittest.TestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -114,5 +114,6 @@ class CancelTests(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -1845,5 +1845,6 @@ class TestConnectionInfo(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -140,7 +140,8 @@ class CopyTests(ConnectingTestCase):
|
|||
about = abin.decode('latin1').replace('\\', '\\\\')
|
||||
|
||||
else:
|
||||
abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
|
||||
abin = bytes(list(range(32, 127))
|
||||
+ list(range(160, 256))).decode('latin1')
|
||||
about = abin.replace('\\', '\\\\')
|
||||
|
||||
curs = self.conn.cursor()
|
||||
|
@ -161,7 +162,8 @@ class CopyTests(ConnectingTestCase):
|
|||
abin = ''.join(map(chr, range(32, 127) + range(160, 255)))
|
||||
about = abin.replace('\\', '\\\\')
|
||||
else:
|
||||
abin = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1')
|
||||
abin = bytes(list(range(32, 127))
|
||||
+ list(range(160, 255))).decode('latin1')
|
||||
about = abin.replace('\\', '\\\\').encode('latin1')
|
||||
|
||||
curs = self.conn.cursor()
|
||||
|
@ -184,7 +186,8 @@ class CopyTests(ConnectingTestCase):
|
|||
about = abin.replace('\\', '\\\\')
|
||||
|
||||
else:
|
||||
abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
|
||||
abin = bytes(list(range(32, 127))
|
||||
+ list(range(160, 256))).decode('latin1')
|
||||
about = abin.replace('\\', '\\\\')
|
||||
|
||||
import io
|
||||
|
@ -381,5 +384,6 @@ decorate_all_tests(CopyTests, skip_copy_if_green)
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -589,7 +589,10 @@ class CursorTests(ConnectingTestCase):
|
|||
# psycopg2 noticing.
|
||||
control_conn = self.conn
|
||||
connect_func = self.connect
|
||||
wait_func = lambda conn: None
|
||||
|
||||
def wait_func(conn):
|
||||
pass
|
||||
|
||||
self._test_external_close(control_conn, connect_func, wait_func)
|
||||
|
||||
@skip_if_no_superuser
|
||||
|
@ -599,7 +602,10 @@ class CursorTests(ConnectingTestCase):
|
|||
# Issue #443 is in the async code too. Since the fix is duplicated,
|
||||
# so is the test.
|
||||
control_conn = self.conn
|
||||
connect_func = lambda: self.connect(async_=True)
|
||||
|
||||
def connect_func():
|
||||
return self.connect(async_=True)
|
||||
|
||||
wait_func = psycopg2.extras.wait_select
|
||||
self._test_external_close(control_conn, connect_func, wait_func)
|
||||
|
||||
|
@ -648,5 +654,6 @@ class CursorTests(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -713,5 +713,6 @@ class FixedOffsetTimezoneTests(unittest.TestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -68,5 +68,6 @@ class ErrocodeTests(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -66,5 +66,6 @@ class ErrorsTests(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -582,5 +582,6 @@ class NamedTupleCursorTest(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -238,5 +238,6 @@ testutils.decorate_all_tests(TestExecuteValues,
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -206,5 +206,6 @@ class CallbackErrorTestCase(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -125,6 +125,7 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
|
|||
cur.execute("select %s", [ip.ip_network('::ffff:102:300/128')])
|
||||
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
|
||||
|
||||
|
||||
testutils.decorate_all_tests(NetworkingTestCase, skip_if_no_ipaddress)
|
||||
|
||||
|
||||
|
|
|
@ -44,6 +44,7 @@ def skip_if_no_lo(f):
|
|||
|
||||
return skip_if_no_lo_
|
||||
|
||||
|
||||
skip_lo_if_green = skip_if_green("libpq doesn't support LO in async mode")
|
||||
|
||||
|
||||
|
@ -397,6 +398,7 @@ class LargeObjectTests(LargeObjectTestCase):
|
|||
lo = self.conn.lobject(lobject_factory=lobject_subclass)
|
||||
self.assert_(isinstance(lo, lobject_subclass))
|
||||
|
||||
|
||||
decorate_all_tests(LargeObjectTests, skip_if_no_lo, skip_lo_if_green)
|
||||
|
||||
|
||||
|
@ -453,6 +455,7 @@ class LargeObjectTruncateTests(LargeObjectTestCase):
|
|||
|
||||
self.assertRaises(psycopg2.ProgrammingError, lo.truncate)
|
||||
|
||||
|
||||
decorate_all_tests(LargeObjectTruncateTests,
|
||||
skip_if_no_lo, skip_lo_if_green, skip_if_no_truncate)
|
||||
|
||||
|
@ -491,6 +494,7 @@ class LargeObject64Tests(LargeObjectTestCase):
|
|||
self.assertEqual(lo.seek(length, 0), length)
|
||||
self.assertEqual(lo.tell(), length)
|
||||
|
||||
|
||||
decorate_all_tests(LargeObject64Tests,
|
||||
skip_if_no_lo, skip_lo_if_green, skip_if_no_truncate, skip_if_no_lo64)
|
||||
|
||||
|
@ -522,6 +526,7 @@ class LargeObjectNot64Tests(LargeObjectTestCase):
|
|||
(OverflowError, psycopg2.InterfaceError, psycopg2.NotSupportedError),
|
||||
lo.truncate, length)
|
||||
|
||||
|
||||
decorate_all_tests(LargeObjectNot64Tests,
|
||||
skip_if_no_lo, skip_lo_if_green, skip_if_no_truncate, skip_if_lo64)
|
||||
|
||||
|
@ -529,5 +534,6 @@ decorate_all_tests(LargeObjectNot64Tests,
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -348,5 +348,6 @@ class TestVersionDiscovery(unittest.TestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -37,7 +37,7 @@ class Psycopg2Tests(dbapi20.DatabaseAPI20Test):
|
|||
connect_args = ()
|
||||
connect_kw_args = {'dsn': dsn}
|
||||
|
||||
lower_func = 'lower' # For stored procedure test
|
||||
lower_func = 'lower' # For stored procedure test
|
||||
|
||||
def test_callproc(self):
|
||||
# Until DBAPI 2.0 compliance, callproc should return None or it's just
|
||||
|
@ -50,16 +50,14 @@ class Psycopg2Tests(dbapi20.DatabaseAPI20Test):
|
|||
con = self._connect()
|
||||
try:
|
||||
cur = con.cursor()
|
||||
if self.lower_func and hasattr(cur,'callproc'):
|
||||
cur.callproc(self.lower_func,('FOO',))
|
||||
if self.lower_func and hasattr(cur, 'callproc'):
|
||||
cur.callproc(self.lower_func, ('FOO',))
|
||||
r = cur.fetchall()
|
||||
self.assertEqual(len(r),1,'callproc produced no result set')
|
||||
self.assertEqual(len(r[0]),1,
|
||||
'callproc produced invalid result set'
|
||||
)
|
||||
self.assertEqual(r[0][0],'foo',
|
||||
'callproc produced invalid results'
|
||||
)
|
||||
self.assertEqual(len(r), 1, 'callproc produced no result set')
|
||||
self.assertEqual(len(r[0]), 1,
|
||||
'callproc produced invalid result set')
|
||||
self.assertEqual(r[0][0], 'foo',
|
||||
'callproc produced invalid results')
|
||||
finally:
|
||||
con.close()
|
||||
|
||||
|
@ -78,11 +76,13 @@ class Psycopg2TPCTests(dbapi20_tpc.TwoPhaseCommitTests, unittest.TestCase):
|
|||
def connect(self):
|
||||
return psycopg2.connect(dsn=dsn)
|
||||
|
||||
|
||||
decorate_all_tests(Psycopg2TPCTests, skip_if_tpc_disabled)
|
||||
|
||||
|
||||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
@ -126,7 +126,8 @@ class QuotingTestCase(ConnectingTestCase):
|
|||
if sys.version_info[0] < 3:
|
||||
data = ''.join(map(chr, range(32, 127) + range(160, 256)))
|
||||
else:
|
||||
data = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
|
||||
data = bytes(list(range(32, 127))
|
||||
+ list(range(160, 256))).decode('latin1')
|
||||
|
||||
# as string
|
||||
curs.execute("SELECT %s::text;", (data,))
|
||||
|
@ -150,7 +151,8 @@ class QuotingTestCase(ConnectingTestCase):
|
|||
if sys.version_info[0] < 3:
|
||||
data = ''.join(map(chr, range(32, 127) + range(128, 256)))
|
||||
else:
|
||||
data = bytes(list(range(32, 127)) + list(range(128, 256))).decode('koi8_r')
|
||||
data = bytes(list(range(32, 127))
|
||||
+ list(range(128, 256))).decode('koi8_r')
|
||||
|
||||
# as string
|
||||
curs.execute("SELECT %s::text;", (data,))
|
||||
|
@ -252,5 +254,6 @@ class TestStringAdapter(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -412,5 +412,6 @@ class ValuesTest(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -249,5 +249,6 @@ class QueryCancellationTests(ConnectingTestCase):
|
|||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -168,8 +168,8 @@ class TypesBasicTests(ConnectingTestCase):
|
|||
self.assertEqual(curs.fetchone()[0], [])
|
||||
|
||||
# issue #788 (test commented out until issue fixed)
|
||||
#curs.execute("select null = any(%s)", ([[]], ))
|
||||
#self.assertFalse(curs.fetchone()[0])
|
||||
# curs.execute("select null = any(%s)", ([[]], ))
|
||||
# self.assertFalse(curs.fetchone()[0])
|
||||
|
||||
def testEmptyArrayNoCast(self):
|
||||
s = self.execute("SELECT '{}' AS foo")
|
||||
|
@ -232,9 +232,9 @@ class TypesBasicTests(ConnectingTestCase):
|
|||
curs.execute("insert into na (textaa) values (%s)", ([['a', None]],))
|
||||
curs.execute("insert into na (textaa) values (%s)", ([[None, None]],))
|
||||
|
||||
curs.execute("insert into na (intaa) values (%s)", ([[None]],))
|
||||
curs.execute("insert into na (intaa) values (%s)", ([[None]],))
|
||||
curs.execute("insert into na (intaa) values (%s)", ([[42, None]],))
|
||||
curs.execute("insert into na (intaa) values (%s)", ([[None, None]],))
|
||||
curs.execute("insert into na (intaa) values (%s)", ([[None, None]],))
|
||||
|
||||
curs.execute("insert into na (boolaa) values (%s)", ([[None]],))
|
||||
curs.execute("insert into na (boolaa) values (%s)", ([[True, None]],))
|
||||
|
@ -554,11 +554,13 @@ def skip_if_cant_cast(f):
|
|||
|
||||
return skip_if_cant_cast_
|
||||
|
||||
|
||||
decorate_all_tests(ByteaParserTest, skip_if_cant_cast)
|
||||
|
||||
|
||||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -123,6 +123,7 @@ class TypesExtrasTests(ConnectingTestCase):
|
|||
s = self.execute("""SELECT '{"(1,2)","(3,4)"}' AS foo""")
|
||||
self.failUnless(s == """{"(1,2)","(3,4)"}""")
|
||||
|
||||
|
||||
def skip_if_no_hstore(f):
|
||||
@wraps(f)
|
||||
def skip_if_no_hstore_(self):
|
||||
|
@ -1137,6 +1138,7 @@ class JsonbTestCase(ConnectingTestCase):
|
|||
curs.execute("""select NULL::jsonb[]""")
|
||||
self.assertEqual(curs.fetchone()[0], None)
|
||||
|
||||
|
||||
decorate_all_tests(JsonbTestCase, skip_if_no_jsonb_type)
|
||||
|
||||
|
||||
|
@ -1425,7 +1427,7 @@ class RangeTestCase(unittest.TestCase):
|
|||
from datetime import datetime
|
||||
from psycopg2.tz import FixedOffsetTimezone
|
||||
converter = unicode if sys.version_info < (3, 0) else str
|
||||
tz = FixedOffsetTimezone(-5*60, "EST")
|
||||
tz = FixedOffsetTimezone(-5 * 60, "EST")
|
||||
r = DateTimeTZRange(datetime(2010, 1, 1, tzinfo=tz),
|
||||
datetime(2011, 1, 1, tzinfo=tz))
|
||||
expected = u'[2010-01-01 00:00:00-05:00, 2011-01-01 00:00:00-05:00)'
|
||||
|
@ -1763,6 +1765,7 @@ class RangeCasterTestCase(ConnectingTestCase):
|
|||
for r in [ra1, ra2, rars2, rars3]:
|
||||
del ext.adapters[r.range, ext.ISQLQuote]
|
||||
|
||||
|
||||
decorate_all_tests(RangeCasterTestCase, skip_if_no_range)
|
||||
|
||||
|
||||
|
|
|
@ -217,12 +217,13 @@ class WithCursorTestCase(WithTestCase):
|
|||
|
||||
@skip_before_postgres(8, 2)
|
||||
def test_named_with_noop(self):
|
||||
with self.conn.cursor('named') as cur:
|
||||
with self.conn.cursor('named'):
|
||||
pass
|
||||
|
||||
|
||||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
|
|
@ -42,8 +42,8 @@ if sys.version_info[0] == 2:
|
|||
unichr = unichr
|
||||
else:
|
||||
# Python 3
|
||||
from io import StringIO
|
||||
from importlib import reload
|
||||
from io import StringIO # noqa
|
||||
from importlib import reload # noqa
|
||||
long = int
|
||||
unichr = chr
|
||||
|
||||
|
@ -64,6 +64,7 @@ def assertDsnEqual(self, dsn1, dsn2, msg=None):
|
|||
"""Check that two conninfo string have the same content"""
|
||||
self.assertEqual(set(dsn1.split()), set(dsn2.split()), msg)
|
||||
|
||||
|
||||
unittest.TestCase.assertDsnEqual = assertDsnEqual
|
||||
|
||||
|
||||
|
@ -363,6 +364,7 @@ def skip_if_green(reason):
|
|||
return skip_if_green__
|
||||
return skip_if_green_
|
||||
|
||||
|
||||
skip_copy_if_green = skip_if_green("copy in async mode currently not supported")
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user