From dfb301b42be462f5a4f66b12a401753ab441543c Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 16 Mar 2019 17:15:16 +0000 Subject: [PATCH] Use PY2, PY3 for conditional code instead of sys.version_info --- lib/_json.py | 4 ++-- lib/extras.py | 13 ++++++------- lib/sql.py | 5 ++--- tests/dbapi20.py | 8 ++------ tests/test_connection.py | 4 ++-- tests/test_copy.py | 8 ++++---- tests/test_quote.py | 17 ++++++++--------- tests/test_types_basic.py | 26 ++++++++++++-------------- tests/test_types_extras.py | 17 ++++++----------- tests/testutils.py | 6 +++--- 10 files changed, 47 insertions(+), 61 deletions(-) diff --git a/lib/_json.py b/lib/_json.py index 79af8562..880084d7 100644 --- a/lib/_json.py +++ b/lib/_json.py @@ -28,10 +28,10 @@ extensions importing register_json from extras. # License for more details. import json -import sys from psycopg2._psycopg import ISQLQuote, QuotedString from psycopg2._psycopg import new_type, new_array_type, register_type +from psycopg2.compat import PY2 # oids from PostgreSQL 9.2 @@ -81,7 +81,7 @@ class Json(object): qs.prepare(self._conn) return qs.getquoted() - if sys.version_info < (3,): + if PY2: def __str__(self): return self.getquoted() else: diff --git a/lib/extras.py b/lib/extras.py index 0f471a19..ad31e8ed 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -26,7 +26,6 @@ and classes until a better place in the distribution is found. # License for more details. import os as _os -import sys as _sys import time as _time import re as _re from collections import namedtuple, OrderedDict @@ -38,7 +37,7 @@ from psycopg2 import extensions as _ext from .extensions import cursor as _cursor from .extensions import connection as _connection from .extensions import adapt as _A, quote_ident -from .compat import lru_cache +from .compat import PY2, PY3, lru_cache from psycopg2._psycopg import ( # noqa REPLICATION_PHYSICAL, REPLICATION_LOGICAL, @@ -203,7 +202,7 @@ class DictRow(list): self[:] = data[0] self._index = data[1] - if _sys.version_info[0] < 3: + if PY2: iterkeys = keys itervalues = values iteritems = items @@ -291,7 +290,7 @@ class RealDictRow(dict): def items(self): return ((k, self[k]) for k in self._column_mapping) - if _sys.version_info[0] < 3: + if PY2: iterkeys = keys itervalues = values iteritems = items @@ -438,7 +437,7 @@ class LoggingConnection(_connection): def _logtofile(self, msg, curs): msg = self.filter(msg, curs) if msg: - if _sys.version_info[0] >= 3 and isinstance(msg, bytes): + if PY3 and isinstance(msg, bytes): msg = msg.decode(_ext.encodings[self.encoding], 'replace') self._logobj.write(msg + _os.linesep) @@ -492,7 +491,7 @@ class MinTimeLoggingConnection(LoggingConnection): def filter(self, msg, curs): t = (_time.time() - curs.timestamp) * 1000 if t > self._mintime: - if _sys.version_info[0] >= 3 and isinstance(msg, bytes): + if PY3 and isinstance(msg, bytes): msg = msg.decode(_ext.encodings[self.encoding], 'replace') return msg + _os.linesep + " (execution time: %d ms)" % t @@ -992,7 +991,7 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, array_oid = tuple([x for x in array_oid if x]) # create and register the typecaster - if _sys.version_info[0] < 3 and unicode: + if PY2 and unicode: cast = HstoreAdapter.parse_unicode else: cast = HstoreAdapter.parse diff --git a/lib/sql.py b/lib/sql.py index f7d2c7f3..93aa8c3e 100644 --- a/lib/sql.py +++ b/lib/sql.py @@ -23,11 +23,10 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. -import sys import string from psycopg2 import extensions as ext -from psycopg2.compat import string_types +from psycopg2.compat import PY3, string_types _formatter = string.Formatter() @@ -392,7 +391,7 @@ class Literal(Composable): a.prepare(conn) rv = a.getquoted() - if sys.version_info[0] >= 3 and isinstance(rv, bytes): + if PY3 and isinstance(rv, bytes): rv = rv.decode(ext.encodings[conn.encoding]) return rv diff --git a/tests/dbapi20.py b/tests/dbapi20.py index 212d0544..fe89bb0e 100644 --- a/tests/dbapi20.py +++ b/tests/dbapi20.py @@ -71,10 +71,6 @@ import sys # nothing # - Fix bugs in test_setoutputsize_basic and test_setinputsizes # -def str2bytes(sval): - if sys.version_info < (3,0) and isinstance(sval, str): - sval = sval.decode("latin1") - return sval.encode("latin1") class DatabaseAPI20Test(unittest.TestCase): ''' Test a database self.driver for DB API 2.0 compatibility. @@ -842,8 +838,8 @@ class DatabaseAPI20Test(unittest.TestCase): # self.assertEqual(str(t1),str(t2)) def test_Binary(self): - b = self.driver.Binary(str2bytes('Something')) - b = self.driver.Binary(str2bytes('')) + b = self.driver.Binary(b'Something') + b = self.driver.Binary(b'') def test_STRING(self): self.failUnless(hasattr(self.driver,'STRING'), diff --git a/tests/test_connection.py b/tests/test_connection.py index 0ad7d37e..77f3b29d 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -39,7 +39,7 @@ import psycopg2.extras from psycopg2 import extensions as ext from .testutils import ( - unittest, skip_if_no_superuser, skip_before_postgres, + PY2, unittest, skip_if_no_superuser, skip_before_postgres, skip_after_postgres, skip_before_libpq, skip_after_libpq, ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow) @@ -404,7 +404,7 @@ class ParseDsnTestCase(ConnectingTestCase): def test_unicode_value(self): snowman = u"\u2603" d = ext.parse_dsn('dbname=' + snowman) - if sys.version_info[0] < 3: + if PY2: self.assertEqual(d['dbname'], snowman.encode('utf8')) else: self.assertEqual(d['dbname'], snowman) diff --git a/tests/test_copy.py b/tests/test_copy.py index 05513f5a..b0490f54 100755 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -32,7 +32,7 @@ from subprocess import Popen, PIPE import psycopg2 import psycopg2.extensions -from .testutils import skip_copy_if_green, TextIOBase +from .testutils import skip_copy_if_green, PY2, TextIOBase from .testconfig import dsn @@ -130,7 +130,7 @@ class CopyTests(ConnectingTestCase): self.conn.set_client_encoding('latin1') self._create_temp_table() # the above call closed the xn - if sys.version_info[0] < 3: + if PY2: abin = ''.join(map(chr, range(32, 127) + range(160, 256))) about = abin.decode('latin1').replace('\\', '\\\\') @@ -152,7 +152,7 @@ class CopyTests(ConnectingTestCase): self.conn.set_client_encoding('latin1') self._create_temp_table() # the above call closed the xn - if sys.version_info[0] < 3: + if PY2: abin = ''.join(map(chr, range(32, 127) + range(160, 255))) about = abin.replace('\\', '\\\\') else: @@ -173,7 +173,7 @@ class CopyTests(ConnectingTestCase): self.conn.set_client_encoding('latin1') self._create_temp_table() # the above call closed the xn - if sys.version_info[0] < 3: + if PY2: abin = ''.join(map(chr, range(32, 127) + range(160, 256))) abin = abin.decode('latin1') about = abin.replace('\\', '\\\\') diff --git a/tests/test_quote.py b/tests/test_quote.py index fe0b9cd8..cbba08c9 100755 --- a/tests/test_quote.py +++ b/tests/test_quote.py @@ -22,10 +22,9 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. -import sys from . import testutils import unittest -from .testutils import ConnectingTestCase, unichr +from .testutils import ConnectingTestCase, unichr, PY2 import psycopg2 import psycopg2.extensions @@ -79,14 +78,14 @@ class QuotingTestCase(ConnectingTestCase): data = b"""some data with \000\013 binary stuff into, 'quotes' and \\ a backslash too. """ - if sys.version_info[0] < 3: + if PY2: data += "".join(map(chr, range(256))) else: data += bytes(list(range(256))) curs = self.conn.cursor() curs.execute("SELECT %s::bytea;", (psycopg2.Binary(data),)) - if sys.version_info[0] < 3: + if PY2: res = str(curs.fetchone()[0]) else: res = curs.fetchone()[0].tobytes() @@ -124,7 +123,7 @@ class QuotingTestCase(ConnectingTestCase): def test_latin1(self): self.conn.set_client_encoding('LATIN1') curs = self.conn.cursor() - if sys.version_info[0] < 3: + if PY2: data = ''.join(map(chr, range(32, 127) + range(160, 256))) else: data = bytes(list(range(32, 127)) @@ -137,7 +136,7 @@ class QuotingTestCase(ConnectingTestCase): self.assert_(not self.conn.notices) # as unicode - if sys.version_info[0] < 3: + if PY2: psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn) data = data.decode('latin1') @@ -149,7 +148,7 @@ class QuotingTestCase(ConnectingTestCase): def test_koi8(self): self.conn.set_client_encoding('KOI8') curs = self.conn.cursor() - if sys.version_info[0] < 3: + if PY2: data = ''.join(map(chr, range(32, 127) + range(128, 256))) else: data = bytes(list(range(32, 127)) @@ -162,7 +161,7 @@ class QuotingTestCase(ConnectingTestCase): self.assert_(not self.conn.notices) # as unicode - if sys.version_info[0] < 3: + if PY2: psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, self.conn) data = data.decode('koi8_r') @@ -202,7 +201,7 @@ class TestQuotedIdentifier(ConnectingTestCase): def test_unicode_ident(self): snowman = u"\u2603" quoted = '"' + snowman + '"' - if sys.version_info[0] < 3: + if PY2: self.assertEqual(quote_ident(snowman, self.conn), quoted.encode('utf8')) else: self.assertEqual(quote_ident(snowman, self.conn), quoted) diff --git a/tests/test_types_basic.py b/tests/test_types_basic.py index 13d733fc..d0c0c919 100755 --- a/tests/test_types_basic.py +++ b/tests/test_types_basic.py @@ -28,13 +28,11 @@ import decimal import datetime import platform -import sys from . import testutils import unittest -from .testutils import ConnectingTestCase, long +from .testutils import PY2, long, text_type, ConnectingTestCase import psycopg2 -from psycopg2.compat import text_type from psycopg2.extensions import AsIs, adapt, register_adapter @@ -110,7 +108,7 @@ class TypesBasicTests(ConnectingTestCase): self.failUnless(str(s) == "-inf", "wrong float quoting: " + str(s)) def testBinary(self): - if sys.version_info[0] < 3: + if PY2: s = ''.join([chr(x) for x in range(256)]) b = psycopg2.Binary(s) buf = self.execute("SELECT %s::bytea AS foo", (b,)) @@ -128,7 +126,7 @@ class TypesBasicTests(ConnectingTestCase): def testBinaryEmptyString(self): # test to make sure an empty Binary is converted to an empty string - if sys.version_info[0] < 3: + if PY2: b = psycopg2.Binary('') self.assertEqual(str(b), "''::bytea") else: @@ -138,7 +136,7 @@ class TypesBasicTests(ConnectingTestCase): def testBinaryRoundTrip(self): # test to make sure buffers returned by psycopg2 are # understood by execute: - if sys.version_info[0] < 3: + if PY2: s = ''.join([chr(x) for x in range(256)]) buf = self.execute("SELECT %s::bytea AS foo", (psycopg2.Binary(s),)) buf2 = self.execute("SELECT %s::bytea AS foo", (buf,)) @@ -328,7 +326,7 @@ class TypesBasicTests(ConnectingTestCase): o1 = bytearray(range(256)) o2 = self.execute("select %s;", (o1,)) - if sys.version_info[0] < 3: + if PY2: self.assertEqual(buffer, type(o2)) else: self.assertEqual(memoryview, type(o2)) @@ -342,7 +340,7 @@ class TypesBasicTests(ConnectingTestCase): o2 = self.execute("select %s;", (o1,)) self.assertEqual(len(o2), 0) - if sys.version_info[0] < 3: + if PY2: self.assertEqual(buffer, type(o2)) else: self.assertEqual(memoryview, type(o2)) @@ -350,7 +348,7 @@ class TypesBasicTests(ConnectingTestCase): def testAdaptMemoryview(self): o1 = memoryview(bytearray(range(256))) o2 = self.execute("select %s;", (o1,)) - if sys.version_info[0] < 3: + if PY2: self.assertEqual(buffer, type(o2)) else: self.assertEqual(memoryview, type(o2)) @@ -358,7 +356,7 @@ class TypesBasicTests(ConnectingTestCase): # Test with an empty buffer o1 = memoryview(bytearray([])) o2 = self.execute("select %s;", (o1,)) - if sys.version_info[0] < 3: + if PY2: self.assertEqual(buffer, type(o2)) else: self.assertEqual(memoryview, type(o2)) @@ -513,7 +511,7 @@ class ByteaParserTest(unittest.TestCase): if rv is None: return None - if sys.version_info[0] < 3: + if PY2: return str(rv) else: return rv.tobytes() @@ -537,7 +535,7 @@ class ByteaParserTest(unittest.TestCase): buf = buf.upper() buf = '\\x' + buf rv = self.cast(buf.encode('utf8')) - if sys.version_info[0] < 3: + if PY2: self.assertEqual(rv, ''.join(map(chr, range(256)))) else: self.assertEqual(rv, bytes(range(256))) @@ -548,7 +546,7 @@ class ByteaParserTest(unittest.TestCase): def test_full_escaped_octal(self): buf = ''.join(("\\%03o" % i) for i in range(256)) rv = self.cast(buf.encode('utf8')) - if sys.version_info[0] < 3: + if PY2: self.assertEqual(rv, ''.join(map(chr, range(256)))) else: self.assertEqual(rv, bytes(range(256))) @@ -559,7 +557,7 @@ class ByteaParserTest(unittest.TestCase): buf += ''.join('\\' + c for c in string.ascii_letters) buf += '\\\\' rv = self.cast(buf.encode('utf8')) - if sys.version_info[0] < 3: + if PY2: tgt = ''.join(map(chr, range(32))) \ + string.ascii_letters * 2 + '\\' else: diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py index 66d4ddf0..2b24ce25 100755 --- a/tests/test_types_extras.py +++ b/tests/test_types_extras.py @@ -15,7 +15,6 @@ # License for more details. import re -import sys import json import uuid import warnings @@ -25,7 +24,7 @@ from functools import wraps from pickle import dumps, loads import unittest -from .testutils import (skip_if_no_uuid, skip_before_postgres, +from .testutils import (PY2, text_type, skip_if_no_uuid, skip_before_postgres, ConnectingTestCase, py3_raises_typeerror, slow, skip_from_python) import psycopg2 @@ -301,7 +300,7 @@ class HstoreTestCase(ConnectingTestCase): ok({''.join(ab): ''.join(ab)}) self.conn.set_client_encoding('latin1') - if sys.version_info[0] < 3: + if PY2: ab = map(chr, range(32, 127) + range(160, 255)) else: ab = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1') @@ -363,7 +362,7 @@ class HstoreTestCase(ConnectingTestCase): ds.append({''.join(ab): ''.join(ab)}) self.conn.set_client_encoding('latin1') - if sys.version_info[0] < 3: + if PY2: ab = map(chr, range(32, 127) + range(160, 255)) else: ab = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1') @@ -1352,14 +1351,12 @@ class RangeTestCase(unittest.TestCase): ] results = [] - converter = unicode if sys.version_info < (3, 0) else str - for bounds in ('()', '[]', '(]', '[)'): r = Range(0, 4, bounds=bounds) - results.append(converter(r)) + results.append(text_type(r)) r = Range(empty=True) - results.append(converter(r)) + results.append(text_type(r)) self.assertEqual(results, expected) def test_str_datetime(self): @@ -1367,13 +1364,11 @@ class RangeTestCase(unittest.TestCase): Date-Time ranges should return a human-readable string as well on string conversion. ''' - - converter = unicode if sys.version_info < (3, 0) else str tz = FixedOffsetTimezone(-5 * 60, "EST") r = DateTimeTZRange(datetime(2010, 1, 1, tzinfo=tz), datetime(2011, 1, 1, tzinfo=tz)) expected = u'[2010-01-01 00:00:00-05:00, 2011-01-01 00:00:00-05:00)' - result = converter(r) + result = text_type(r) self.assertEqual(result, expected) diff --git a/tests/testutils.py b/tests/testutils.py index 959b3dda..843bfdfb 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -36,13 +36,13 @@ from ctypes.util import find_library import psycopg2 import psycopg2.errors import psycopg2.extensions -from psycopg2.compat import text_type +from psycopg2.compat import PY2, PY3, text_type from .testconfig import green, dsn, repl_dsn # Python 2/3 compatibility -if sys.version_info[0] == 2: +if PY2: # Python 2 from StringIO import StringIO TextIOBase = object @@ -411,7 +411,7 @@ class py3_raises_typeerror(object): pass def __exit__(self, type, exc, tb): - if sys.version_info[0] >= 3: + if PY3: assert type is TypeError return True