Use PY2, PY3 for conditional code instead of sys.version_info

This commit is contained in:
Daniele Varrazzo 2019-03-16 17:15:16 +00:00
parent f4a2630f1a
commit dfb301b42b
10 changed files with 47 additions and 61 deletions

View File

@ -28,10 +28,10 @@ extensions importing register_json from extras.
# License for more details. # License for more details.
import json import json
import sys
from psycopg2._psycopg import ISQLQuote, QuotedString from psycopg2._psycopg import ISQLQuote, QuotedString
from psycopg2._psycopg import new_type, new_array_type, register_type from psycopg2._psycopg import new_type, new_array_type, register_type
from psycopg2.compat import PY2
# oids from PostgreSQL 9.2 # oids from PostgreSQL 9.2
@ -81,7 +81,7 @@ class Json(object):
qs.prepare(self._conn) qs.prepare(self._conn)
return qs.getquoted() return qs.getquoted()
if sys.version_info < (3,): if PY2:
def __str__(self): def __str__(self):
return self.getquoted() return self.getquoted()
else: else:

View File

@ -26,7 +26,6 @@ and classes until a better place in the distribution is found.
# License for more details. # License for more details.
import os as _os import os as _os
import sys as _sys
import time as _time import time as _time
import re as _re import re as _re
from collections import namedtuple, OrderedDict from collections import namedtuple, OrderedDict
@ -38,7 +37,7 @@ from psycopg2 import extensions as _ext
from .extensions import cursor as _cursor from .extensions import cursor as _cursor
from .extensions import connection as _connection from .extensions import connection as _connection
from .extensions import adapt as _A, quote_ident from .extensions import adapt as _A, quote_ident
from .compat import lru_cache from .compat import PY2, PY3, lru_cache
from psycopg2._psycopg import ( # noqa from psycopg2._psycopg import ( # noqa
REPLICATION_PHYSICAL, REPLICATION_LOGICAL, REPLICATION_PHYSICAL, REPLICATION_LOGICAL,
@ -203,7 +202,7 @@ class DictRow(list):
self[:] = data[0] self[:] = data[0]
self._index = data[1] self._index = data[1]
if _sys.version_info[0] < 3: if PY2:
iterkeys = keys iterkeys = keys
itervalues = values itervalues = values
iteritems = items iteritems = items
@ -291,7 +290,7 @@ class RealDictRow(dict):
def items(self): def items(self):
return ((k, self[k]) for k in self._column_mapping) return ((k, self[k]) for k in self._column_mapping)
if _sys.version_info[0] < 3: if PY2:
iterkeys = keys iterkeys = keys
itervalues = values itervalues = values
iteritems = items iteritems = items
@ -438,7 +437,7 @@ class LoggingConnection(_connection):
def _logtofile(self, msg, curs): def _logtofile(self, msg, curs):
msg = self.filter(msg, curs) msg = self.filter(msg, curs)
if msg: if msg:
if _sys.version_info[0] >= 3 and isinstance(msg, bytes): if PY3 and isinstance(msg, bytes):
msg = msg.decode(_ext.encodings[self.encoding], 'replace') msg = msg.decode(_ext.encodings[self.encoding], 'replace')
self._logobj.write(msg + _os.linesep) self._logobj.write(msg + _os.linesep)
@ -492,7 +491,7 @@ class MinTimeLoggingConnection(LoggingConnection):
def filter(self, msg, curs): def filter(self, msg, curs):
t = (_time.time() - curs.timestamp) * 1000 t = (_time.time() - curs.timestamp) * 1000
if t > self._mintime: if t > self._mintime:
if _sys.version_info[0] >= 3 and isinstance(msg, bytes): if PY3 and isinstance(msg, bytes):
msg = msg.decode(_ext.encodings[self.encoding], 'replace') msg = msg.decode(_ext.encodings[self.encoding], 'replace')
return msg + _os.linesep + " (execution time: %d ms)" % t return msg + _os.linesep + " (execution time: %d ms)" % t
@ -992,7 +991,7 @@ def register_hstore(conn_or_curs, globally=False, unicode=False,
array_oid = tuple([x for x in array_oid if x]) array_oid = tuple([x for x in array_oid if x])
# create and register the typecaster # create and register the typecaster
if _sys.version_info[0] < 3 and unicode: if PY2 and unicode:
cast = HstoreAdapter.parse_unicode cast = HstoreAdapter.parse_unicode
else: else:
cast = HstoreAdapter.parse cast = HstoreAdapter.parse

View File

@ -23,11 +23,10 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details. # License for more details.
import sys
import string import string
from psycopg2 import extensions as ext from psycopg2 import extensions as ext
from psycopg2.compat import string_types from psycopg2.compat import PY3, string_types
_formatter = string.Formatter() _formatter = string.Formatter()
@ -392,7 +391,7 @@ class Literal(Composable):
a.prepare(conn) a.prepare(conn)
rv = a.getquoted() rv = a.getquoted()
if sys.version_info[0] >= 3 and isinstance(rv, bytes): if PY3 and isinstance(rv, bytes):
rv = rv.decode(ext.encodings[conn.encoding]) rv = rv.decode(ext.encodings[conn.encoding])
return rv return rv

View File

@ -71,10 +71,6 @@ import sys
# nothing # nothing
# - Fix bugs in test_setoutputsize_basic and test_setinputsizes # - Fix bugs in test_setoutputsize_basic and test_setinputsizes
# #
def str2bytes(sval):
if sys.version_info < (3,0) and isinstance(sval, str):
sval = sval.decode("latin1")
return sval.encode("latin1")
class DatabaseAPI20Test(unittest.TestCase): class DatabaseAPI20Test(unittest.TestCase):
''' Test a database self.driver for DB API 2.0 compatibility. ''' Test a database self.driver for DB API 2.0 compatibility.
@ -842,8 +838,8 @@ class DatabaseAPI20Test(unittest.TestCase):
# self.assertEqual(str(t1),str(t2)) # self.assertEqual(str(t1),str(t2))
def test_Binary(self): def test_Binary(self):
b = self.driver.Binary(str2bytes('Something')) b = self.driver.Binary(b'Something')
b = self.driver.Binary(str2bytes('')) b = self.driver.Binary(b'')
def test_STRING(self): def test_STRING(self):
self.failUnless(hasattr(self.driver,'STRING'), self.failUnless(hasattr(self.driver,'STRING'),

View File

@ -39,7 +39,7 @@ import psycopg2.extras
from psycopg2 import extensions as ext from psycopg2 import extensions as ext
from .testutils import ( from .testutils import (
unittest, skip_if_no_superuser, skip_before_postgres, PY2, unittest, skip_if_no_superuser, skip_before_postgres,
skip_after_postgres, skip_before_libpq, skip_after_libpq, skip_after_postgres, skip_before_libpq, skip_after_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow) ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
@ -404,7 +404,7 @@ class ParseDsnTestCase(ConnectingTestCase):
def test_unicode_value(self): def test_unicode_value(self):
snowman = u"\u2603" snowman = u"\u2603"
d = ext.parse_dsn('dbname=' + snowman) d = ext.parse_dsn('dbname=' + snowman)
if sys.version_info[0] < 3: if PY2:
self.assertEqual(d['dbname'], snowman.encode('utf8')) self.assertEqual(d['dbname'], snowman.encode('utf8'))
else: else:
self.assertEqual(d['dbname'], snowman) self.assertEqual(d['dbname'], snowman)

View File

@ -32,7 +32,7 @@ from subprocess import Popen, PIPE
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
from .testutils import skip_copy_if_green, TextIOBase from .testutils import skip_copy_if_green, PY2, TextIOBase
from .testconfig import dsn from .testconfig import dsn
@ -130,7 +130,7 @@ class CopyTests(ConnectingTestCase):
self.conn.set_client_encoding('latin1') self.conn.set_client_encoding('latin1')
self._create_temp_table() # the above call closed the xn self._create_temp_table() # the above call closed the xn
if sys.version_info[0] < 3: if PY2:
abin = ''.join(map(chr, range(32, 127) + range(160, 256))) abin = ''.join(map(chr, range(32, 127) + range(160, 256)))
about = abin.decode('latin1').replace('\\', '\\\\') about = abin.decode('latin1').replace('\\', '\\\\')
@ -152,7 +152,7 @@ class CopyTests(ConnectingTestCase):
self.conn.set_client_encoding('latin1') self.conn.set_client_encoding('latin1')
self._create_temp_table() # the above call closed the xn self._create_temp_table() # the above call closed the xn
if sys.version_info[0] < 3: if PY2:
abin = ''.join(map(chr, range(32, 127) + range(160, 255))) abin = ''.join(map(chr, range(32, 127) + range(160, 255)))
about = abin.replace('\\', '\\\\') about = abin.replace('\\', '\\\\')
else: else:
@ -173,7 +173,7 @@ class CopyTests(ConnectingTestCase):
self.conn.set_client_encoding('latin1') self.conn.set_client_encoding('latin1')
self._create_temp_table() # the above call closed the xn self._create_temp_table() # the above call closed the xn
if sys.version_info[0] < 3: if PY2:
abin = ''.join(map(chr, range(32, 127) + range(160, 256))) abin = ''.join(map(chr, range(32, 127) + range(160, 256)))
abin = abin.decode('latin1') abin = abin.decode('latin1')
about = abin.replace('\\', '\\\\') about = abin.replace('\\', '\\\\')

View File

@ -22,10 +22,9 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details. # License for more details.
import sys
from . import testutils from . import testutils
import unittest import unittest
from .testutils import ConnectingTestCase, unichr from .testutils import ConnectingTestCase, unichr, PY2
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
@ -79,14 +78,14 @@ class QuotingTestCase(ConnectingTestCase):
data = b"""some data with \000\013 binary data = b"""some data with \000\013 binary
stuff into, 'quotes' and \\ a backslash too. stuff into, 'quotes' and \\ a backslash too.
""" """
if sys.version_info[0] < 3: if PY2:
data += "".join(map(chr, range(256))) data += "".join(map(chr, range(256)))
else: else:
data += bytes(list(range(256))) data += bytes(list(range(256)))
curs = self.conn.cursor() curs = self.conn.cursor()
curs.execute("SELECT %s::bytea;", (psycopg2.Binary(data),)) curs.execute("SELECT %s::bytea;", (psycopg2.Binary(data),))
if sys.version_info[0] < 3: if PY2:
res = str(curs.fetchone()[0]) res = str(curs.fetchone()[0])
else: else:
res = curs.fetchone()[0].tobytes() res = curs.fetchone()[0].tobytes()
@ -124,7 +123,7 @@ class QuotingTestCase(ConnectingTestCase):
def test_latin1(self): def test_latin1(self):
self.conn.set_client_encoding('LATIN1') self.conn.set_client_encoding('LATIN1')
curs = self.conn.cursor() curs = self.conn.cursor()
if sys.version_info[0] < 3: if PY2:
data = ''.join(map(chr, range(32, 127) + range(160, 256))) data = ''.join(map(chr, range(32, 127) + range(160, 256)))
else: else:
data = bytes(list(range(32, 127)) data = bytes(list(range(32, 127))
@ -137,7 +136,7 @@ class QuotingTestCase(ConnectingTestCase):
self.assert_(not self.conn.notices) self.assert_(not self.conn.notices)
# as unicode # as unicode
if sys.version_info[0] < 3: if PY2:
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn) psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn)
data = data.decode('latin1') data = data.decode('latin1')
@ -149,7 +148,7 @@ class QuotingTestCase(ConnectingTestCase):
def test_koi8(self): def test_koi8(self):
self.conn.set_client_encoding('KOI8') self.conn.set_client_encoding('KOI8')
curs = self.conn.cursor() curs = self.conn.cursor()
if sys.version_info[0] < 3: if PY2:
data = ''.join(map(chr, range(32, 127) + range(128, 256))) data = ''.join(map(chr, range(32, 127) + range(128, 256)))
else: else:
data = bytes(list(range(32, 127)) data = bytes(list(range(32, 127))
@ -162,7 +161,7 @@ class QuotingTestCase(ConnectingTestCase):
self.assert_(not self.conn.notices) self.assert_(not self.conn.notices)
# as unicode # as unicode
if sys.version_info[0] < 3: if PY2:
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn) psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn)
data = data.decode('koi8_r') data = data.decode('koi8_r')
@ -202,7 +201,7 @@ class TestQuotedIdentifier(ConnectingTestCase):
def test_unicode_ident(self): def test_unicode_ident(self):
snowman = u"\u2603" snowman = u"\u2603"
quoted = '"' + snowman + '"' quoted = '"' + snowman + '"'
if sys.version_info[0] < 3: if PY2:
self.assertEqual(quote_ident(snowman, self.conn), quoted.encode('utf8')) self.assertEqual(quote_ident(snowman, self.conn), quoted.encode('utf8'))
else: else:
self.assertEqual(quote_ident(snowman, self.conn), quoted) self.assertEqual(quote_ident(snowman, self.conn), quoted)

View File

@ -28,13 +28,11 @@ import decimal
import datetime import datetime
import platform import platform
import sys
from . import testutils from . import testutils
import unittest import unittest
from .testutils import ConnectingTestCase, long from .testutils import PY2, long, text_type, ConnectingTestCase
import psycopg2 import psycopg2
from psycopg2.compat import text_type
from psycopg2.extensions import AsIs, adapt, register_adapter from psycopg2.extensions import AsIs, adapt, register_adapter
@ -110,7 +108,7 @@ class TypesBasicTests(ConnectingTestCase):
self.failUnless(str(s) == "-inf", "wrong float quoting: " + str(s)) self.failUnless(str(s) == "-inf", "wrong float quoting: " + str(s))
def testBinary(self): def testBinary(self):
if sys.version_info[0] < 3: if PY2:
s = ''.join([chr(x) for x in range(256)]) s = ''.join([chr(x) for x in range(256)])
b = psycopg2.Binary(s) b = psycopg2.Binary(s)
buf = self.execute("SELECT %s::bytea AS foo", (b,)) buf = self.execute("SELECT %s::bytea AS foo", (b,))
@ -128,7 +126,7 @@ class TypesBasicTests(ConnectingTestCase):
def testBinaryEmptyString(self): def testBinaryEmptyString(self):
# test to make sure an empty Binary is converted to an empty string # test to make sure an empty Binary is converted to an empty string
if sys.version_info[0] < 3: if PY2:
b = psycopg2.Binary('') b = psycopg2.Binary('')
self.assertEqual(str(b), "''::bytea") self.assertEqual(str(b), "''::bytea")
else: else:
@ -138,7 +136,7 @@ class TypesBasicTests(ConnectingTestCase):
def testBinaryRoundTrip(self): def testBinaryRoundTrip(self):
# test to make sure buffers returned by psycopg2 are # test to make sure buffers returned by psycopg2 are
# understood by execute: # understood by execute:
if sys.version_info[0] < 3: if PY2:
s = ''.join([chr(x) for x in range(256)]) s = ''.join([chr(x) for x in range(256)])
buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),)) buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),))
buf2 = self.execute("SELECT %s::bytea AS foo", (buf,)) buf2 = self.execute("SELECT %s::bytea AS foo", (buf,))
@ -328,7 +326,7 @@ class TypesBasicTests(ConnectingTestCase):
o1 = bytearray(range(256)) o1 = bytearray(range(256))
o2 = self.execute("select %s;", (o1,)) o2 = self.execute("select %s;", (o1,))
if sys.version_info[0] < 3: if PY2:
self.assertEqual(buffer, type(o2)) self.assertEqual(buffer, type(o2))
else: else:
self.assertEqual(memoryview, type(o2)) self.assertEqual(memoryview, type(o2))
@ -342,7 +340,7 @@ class TypesBasicTests(ConnectingTestCase):
o2 = self.execute("select %s;", (o1,)) o2 = self.execute("select %s;", (o1,))
self.assertEqual(len(o2), 0) self.assertEqual(len(o2), 0)
if sys.version_info[0] < 3: if PY2:
self.assertEqual(buffer, type(o2)) self.assertEqual(buffer, type(o2))
else: else:
self.assertEqual(memoryview, type(o2)) self.assertEqual(memoryview, type(o2))
@ -350,7 +348,7 @@ class TypesBasicTests(ConnectingTestCase):
def testAdaptMemoryview(self): def testAdaptMemoryview(self):
o1 = memoryview(bytearray(range(256))) o1 = memoryview(bytearray(range(256)))
o2 = self.execute("select %s;", (o1,)) o2 = self.execute("select %s;", (o1,))
if sys.version_info[0] < 3: if PY2:
self.assertEqual(buffer, type(o2)) self.assertEqual(buffer, type(o2))
else: else:
self.assertEqual(memoryview, type(o2)) self.assertEqual(memoryview, type(o2))
@ -358,7 +356,7 @@ class TypesBasicTests(ConnectingTestCase):
# Test with an empty buffer # Test with an empty buffer
o1 = memoryview(bytearray([])) o1 = memoryview(bytearray([]))
o2 = self.execute("select %s;", (o1,)) o2 = self.execute("select %s;", (o1,))
if sys.version_info[0] < 3: if PY2:
self.assertEqual(buffer, type(o2)) self.assertEqual(buffer, type(o2))
else: else:
self.assertEqual(memoryview, type(o2)) self.assertEqual(memoryview, type(o2))
@ -513,7 +511,7 @@ class ByteaParserTest(unittest.TestCase):
if rv is None: if rv is None:
return None return None
if sys.version_info[0] < 3: if PY2:
return str(rv) return str(rv)
else: else:
return rv.tobytes() return rv.tobytes()
@ -537,7 +535,7 @@ class ByteaParserTest(unittest.TestCase):
buf = buf.upper() buf = buf.upper()
buf = '\\x' + buf buf = '\\x' + buf
rv = self.cast(buf.encode('utf8')) rv = self.cast(buf.encode('utf8'))
if sys.version_info[0] < 3: if PY2:
self.assertEqual(rv, ''.join(map(chr, range(256)))) self.assertEqual(rv, ''.join(map(chr, range(256))))
else: else:
self.assertEqual(rv, bytes(range(256))) self.assertEqual(rv, bytes(range(256)))
@ -548,7 +546,7 @@ class ByteaParserTest(unittest.TestCase):
def test_full_escaped_octal(self): def test_full_escaped_octal(self):
buf = ''.join(("\\%03o" % i) for i in range(256)) buf = ''.join(("\\%03o" % i) for i in range(256))
rv = self.cast(buf.encode('utf8')) rv = self.cast(buf.encode('utf8'))
if sys.version_info[0] < 3: if PY2:
self.assertEqual(rv, ''.join(map(chr, range(256)))) self.assertEqual(rv, ''.join(map(chr, range(256))))
else: else:
self.assertEqual(rv, bytes(range(256))) self.assertEqual(rv, bytes(range(256)))
@ -559,7 +557,7 @@ class ByteaParserTest(unittest.TestCase):
buf += ''.join('\\' + c for c in string.ascii_letters) buf += ''.join('\\' + c for c in string.ascii_letters)
buf += '\\\\' buf += '\\\\'
rv = self.cast(buf.encode('utf8')) rv = self.cast(buf.encode('utf8'))
if sys.version_info[0] < 3: if PY2:
tgt = ''.join(map(chr, range(32))) \ tgt = ''.join(map(chr, range(32))) \
+ string.ascii_letters * 2 + '\\' + string.ascii_letters * 2 + '\\'
else: else:

View File

@ -15,7 +15,6 @@
# License for more details. # License for more details.
import re import re
import sys
import json import json
import uuid import uuid
import warnings import warnings
@ -25,7 +24,7 @@ from functools import wraps
from pickle import dumps, loads from pickle import dumps, loads
import unittest import unittest
from .testutils import (skip_if_no_uuid, skip_before_postgres, from .testutils import (PY2, text_type, skip_if_no_uuid, skip_before_postgres,
ConnectingTestCase, py3_raises_typeerror, slow, skip_from_python) ConnectingTestCase, py3_raises_typeerror, slow, skip_from_python)
import psycopg2 import psycopg2
@ -301,7 +300,7 @@ class HstoreTestCase(ConnectingTestCase):
ok({''.join(ab): ''.join(ab)}) ok({''.join(ab): ''.join(ab)})
self.conn.set_client_encoding('latin1') self.conn.set_client_encoding('latin1')
if sys.version_info[0] < 3: if PY2:
ab = map(chr, range(32, 127) + range(160, 255)) ab = map(chr, range(32, 127) + range(160, 255))
else: else:
ab = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1') ab = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1')
@ -363,7 +362,7 @@ class HstoreTestCase(ConnectingTestCase):
ds.append({''.join(ab): ''.join(ab)}) ds.append({''.join(ab): ''.join(ab)})
self.conn.set_client_encoding('latin1') self.conn.set_client_encoding('latin1')
if sys.version_info[0] < 3: if PY2:
ab = map(chr, range(32, 127) + range(160, 255)) ab = map(chr, range(32, 127) + range(160, 255))
else: else:
ab = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1') ab = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1')
@ -1352,14 +1351,12 @@ class RangeTestCase(unittest.TestCase):
] ]
results = [] results = []
converter = unicode if sys.version_info < (3, 0) else str
for bounds in ('()', '[]', '(]', '[)'): for bounds in ('()', '[]', '(]', '[)'):
r = Range(0, 4, bounds=bounds) r = Range(0, 4, bounds=bounds)
results.append(converter(r)) results.append(text_type(r))
r = Range(empty=True) r = Range(empty=True)
results.append(converter(r)) results.append(text_type(r))
self.assertEqual(results, expected) self.assertEqual(results, expected)
def test_str_datetime(self): def test_str_datetime(self):
@ -1367,13 +1364,11 @@ class RangeTestCase(unittest.TestCase):
Date-Time ranges should return a human-readable string as well on Date-Time ranges should return a human-readable string as well on
string conversion. string conversion.
''' '''
converter = unicode if sys.version_info < (3, 0) else str
tz = FixedOffsetTimezone(-5 * 60, "EST") tz = FixedOffsetTimezone(-5 * 60, "EST")
r = DateTimeTZRange(datetime(2010, 1, 1, tzinfo=tz), r = DateTimeTZRange(datetime(2010, 1, 1, tzinfo=tz),
datetime(2011, 1, 1, tzinfo=tz)) datetime(2011, 1, 1, tzinfo=tz))
expected = u'[2010-01-01 00:00:00-05:00, 2011-01-01 00:00:00-05:00)' expected = u'[2010-01-01 00:00:00-05:00, 2011-01-01 00:00:00-05:00)'
result = converter(r) result = text_type(r)
self.assertEqual(result, expected) self.assertEqual(result, expected)

View File

@ -36,13 +36,13 @@ from ctypes.util import find_library
import psycopg2 import psycopg2
import psycopg2.errors import psycopg2.errors
import psycopg2.extensions import psycopg2.extensions
from psycopg2.compat import text_type from psycopg2.compat import PY2, PY3, text_type
from .testconfig import green, dsn, repl_dsn from .testconfig import green, dsn, repl_dsn
# Python 2/3 compatibility # Python 2/3 compatibility
if sys.version_info[0] == 2: if PY2:
# Python 2 # Python 2
from StringIO import StringIO from StringIO import StringIO
TextIOBase = object TextIOBase = object
@ -411,7 +411,7 @@ class py3_raises_typeerror(object):
pass pass
def __exit__(self, type, exc, tb): def __exit__(self, type, exc, tb):
if sys.version_info[0] >= 3: if PY3:
assert type is TypeError assert type is TypeError
return True return True