Dropped use of b() "macro" and 2to3 fixer

Just use the b"" strings syntax supported from python 2.6.
This commit is contained in:
Daniele Varrazzo 2016-08-15 01:55:57 +01:00
parent 3b41c3a6f3
commit 78649f8e90
10 changed files with 84 additions and 124 deletions

View File

@ -27,7 +27,7 @@
import re import re
from psycopg2._psycopg import ProgrammingError, InterfaceError from psycopg2._psycopg import ProgrammingError, InterfaceError
from psycopg2.extensions import ISQLQuote, adapt, register_adapter, b from psycopg2.extensions import ISQLQuote, adapt, register_adapter
from psycopg2.extensions import new_type, new_array_type, register_type from psycopg2.extensions import new_type, new_array_type, register_type
class Range(object): class Range(object):
@ -240,7 +240,7 @@ class RangeAdapter(object):
r = self.adapted r = self.adapted
if r.isempty: if r.isempty:
return b("'empty'::" + self.name) return b"'empty'::" + self.name.encode('utf8')
if r.lower is not None: if r.lower is not None:
a = adapt(r.lower) a = adapt(r.lower)
@ -248,7 +248,7 @@ class RangeAdapter(object):
a.prepare(self._conn) a.prepare(self._conn)
lower = a.getquoted() lower = a.getquoted()
else: else:
lower = b('NULL') lower = b'NULL'
if r.upper is not None: if r.upper is not None:
a = adapt(r.upper) a = adapt(r.upper)
@ -256,10 +256,10 @@ class RangeAdapter(object):
a.prepare(self._conn) a.prepare(self._conn)
upper = a.getquoted() upper = a.getquoted()
else: else:
upper = b('NULL') upper = b'NULL'
return b(self.name + '(') + lower + b(', ') + upper \ return self.name.encode('utf8') + b'(' + lower + b', ' + upper \
+ b(", '%s')" % r._bounds) + b", '" + r._bounds.encode('utf8') + b"')"
class RangeCaster(object): class RangeCaster(object):
@ -459,7 +459,7 @@ class NumberRangeAdapter(RangeAdapter):
def getquoted(self): def getquoted(self):
r = self.adapted r = self.adapted
if r.isempty: if r.isempty:
return b("'empty'") return b"'empty'"
if not r.lower_inf: if not r.lower_inf:
# not exactly: we are relying that none of these object is really # not exactly: we are relying that none of these object is really

View File

@ -103,15 +103,6 @@ TRANSACTION_STATUS_INERROR = 3
TRANSACTION_STATUS_UNKNOWN = 4 TRANSACTION_STATUS_UNKNOWN = 4
# Return bytes from a string
if _sys.version_info[0] < 3:
def b(s):
return s
else:
def b(s):
return s.encode('utf8')
def register_adapter(typ, callable): def register_adapter(typ, callable):
"""Register 'callable' as an ISQLQuote adapter for type 'typ'.""" """Register 'callable' as an ISQLQuote adapter for type 'typ'."""
adapters[(typ, ISQLQuote)] = callable adapters[(typ, ISQLQuote)] = callable
@ -136,7 +127,7 @@ class SQL_IN(object):
if hasattr(obj, 'prepare'): if hasattr(obj, 'prepare'):
obj.prepare(self._conn) obj.prepare(self._conn)
qobjs = [o.getquoted() for o in pobjs] qobjs = [o.getquoted() for o in pobjs]
return b('(') + b(', ').join(qobjs) + b(')') return b'(' + b', '.join(qobjs) + b')'
def __str__(self): def __str__(self):
return str(self.getquoted()) return str(self.getquoted())
@ -151,7 +142,7 @@ class NoneAdapter(object):
def __init__(self, obj): def __init__(self, obj):
pass pass
def getquoted(self, _null=b("NULL")): def getquoted(self, _null=b"NULL"):
return _null return _null

View File

@ -40,7 +40,6 @@ from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection from psycopg2.extensions import connection as _connection
from psycopg2.extensions import adapt as _A, quote_ident from psycopg2.extensions import adapt as _A, quote_ident
from psycopg2.extensions import b
from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL from psycopg2._psycopg import REPLICATION_PHYSICAL, REPLICATION_LOGICAL
from psycopg2._psycopg import ReplicationConnection as _replicationConnection from psycopg2._psycopg import ReplicationConnection as _replicationConnection
from psycopg2._psycopg import ReplicationCursor as _replicationCursor from psycopg2._psycopg import ReplicationCursor as _replicationCursor
@ -575,7 +574,7 @@ class UUID_adapter(object):
return self return self
def getquoted(self): def getquoted(self):
return b("'%s'::uuid" % self._uuid) return ("'%s'::uuid" % self._uuid).encode('utf8')
def __str__(self): def __str__(self):
return "'%s'::uuid" % self._uuid return "'%s'::uuid" % self._uuid
@ -635,7 +634,7 @@ class Inet(object):
obj = _A(self.addr) obj = _A(self.addr)
if hasattr(obj, 'prepare'): if hasattr(obj, 'prepare'):
obj.prepare(self._conn) obj.prepare(self._conn)
return obj.getquoted() + b("::inet") return obj.getquoted() + b"::inet"
def __conform__(self, proto): def __conform__(self, proto):
if proto is _ext.ISQLQuote: if proto is _ext.ISQLQuote:
@ -742,7 +741,7 @@ class HstoreAdapter(object):
def _getquoted_8(self): def _getquoted_8(self):
"""Use the operators available in PG pre-9.0.""" """Use the operators available in PG pre-9.0."""
if not self.wrapped: if not self.wrapped:
return b("''::hstore") return b"''::hstore"
adapt = _ext.adapt adapt = _ext.adapt
rv = [] rv = []
@ -756,23 +755,23 @@ class HstoreAdapter(object):
v.prepare(self.conn) v.prepare(self.conn)
v = v.getquoted() v = v.getquoted()
else: else:
v = b('NULL') v = b'NULL'
# XXX this b'ing is painfully inefficient! # XXX this b'ing is painfully inefficient!
rv.append(b("(") + k + b(" => ") + v + b(")")) rv.append(b"(" + k + b" => " + v + b")")
return b("(") + b('||').join(rv) + b(")") return b"(" + b'||'.join(rv) + b")"
def _getquoted_9(self): def _getquoted_9(self):
"""Use the hstore(text[], text[]) function.""" """Use the hstore(text[], text[]) function."""
if not self.wrapped: if not self.wrapped:
return b("''::hstore") return b"''::hstore"
k = _ext.adapt(self.wrapped.keys()) k = _ext.adapt(self.wrapped.keys())
k.prepare(self.conn) k.prepare(self.conn)
v = _ext.adapt(self.wrapped.values()) v = _ext.adapt(self.wrapped.values())
v.prepare(self.conn) v.prepare(self.conn)
return b("hstore(") + k.getquoted() + b(", ") + v.getquoted() + b(")") return b"hstore(" + k.getquoted() + b", " + v.getquoted() + b")"
getquoted = _getquoted_9 getquoted = _getquoted_9

View File

@ -1,20 +0,0 @@
"""Fixer to change b('string') into b'string'."""
# Author: Daniele Varrazzo
import token
from lib2to3 import fixer_base
from lib2to3.pytree import Leaf
class FixB(fixer_base.BaseFix):
PATTERN = """
power< wrapper='b' trailer< '(' arg=[any] ')' > rest=any* >
"""
def transform(self, node, results):
arg = results['arg']
wrapper = results["wrapper"]
if len(arg) == 1 and arg[0].type == token.STRING:
b = Leaf(token.STRING, 'b' + arg[0].value, prefix=wrapper.prefix)
node.children = [ b ] + results['rest']

View File

@ -75,10 +75,6 @@ else:
# workaround subclass for ticket #153 # workaround subclass for ticket #153
pass pass
# Configure distutils to run our custom 2to3 fixers as well
from lib2to3.refactor import get_fixers_from_package
build_py.fixer_names = get_fixers_from_package('lib2to3.fixes') \
+ [ 'fix_b' ]
sys.path.insert(0, 'scripts') sys.path.insert(0, 'scripts')
try: try:

View File

@ -26,7 +26,6 @@ import time
import pickle import pickle
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
from psycopg2.extensions import b
from testutils import unittest, ConnectingTestCase, skip_before_postgres from testutils import unittest, ConnectingTestCase, skip_before_postgres
from testutils import skip_if_no_namedtuple, skip_if_no_getrefcount from testutils import skip_if_no_namedtuple, skip_if_no_getrefcount
@ -63,28 +62,28 @@ class CursorTests(ConnectingTestCase):
# unicode query containing only ascii data # unicode query containing only ascii data
cur.execute(u"SELECT 'foo';") cur.execute(u"SELECT 'foo';")
self.assertEqual('foo', cur.fetchone()[0]) self.assertEqual('foo', cur.fetchone()[0])
self.assertEqual(b("SELECT 'foo';"), cur.mogrify(u"SELECT 'foo';")) self.assertEqual(b"SELECT 'foo';", cur.mogrify(u"SELECT 'foo';"))
conn.set_client_encoding('UTF8') conn.set_client_encoding('UTF8')
snowman = u"\u2603" snowman = u"\u2603"
# unicode query with non-ascii data # unicode query with non-ascii data
cur.execute(u"SELECT '%s';" % snowman) cur.execute(u"SELECT '%s';" % snowman)
self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0])) self.assertEqual(snowman.encode('utf8'), cur.fetchone()[0].encode('utf8'))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'), self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify(u"SELECT '%s';" % snowman).replace(b("E'"), b("'"))) cur.mogrify(u"SELECT '%s';" % snowman).replace(b"E'", b"'"))
# unicode args # unicode args
cur.execute("SELECT %s;", (snowman,)) cur.execute("SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0])) self.assertEqual(snowman.encode("utf-8"), cur.fetchone()[0].encode('utf8'))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'), self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify("SELECT %s;", (snowman,)).replace(b("E'"), b("'"))) cur.mogrify("SELECT %s;", (snowman,)).replace(b"E'", b"'"))
# unicode query and args # unicode query and args
cur.execute(u"SELECT %s;", (snowman,)) cur.execute(u"SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0])) self.assertEqual(snowman.encode("utf-8"), cur.fetchone()[0].encode('utf8'))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'), self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify(u"SELECT %s;", (snowman,)).replace(b("E'"), b("'"))) cur.mogrify(u"SELECT %s;", (snowman,)).replace(b"E'", b"'"))
def test_mogrify_decimal_explodes(self): def test_mogrify_decimal_explodes(self):
# issue #7: explodes on windows with python 2.5 and psycopg 2.2.2 # issue #7: explodes on windows with python 2.5 and psycopg 2.2.2
@ -95,7 +94,7 @@ class CursorTests(ConnectingTestCase):
conn = self.conn conn = self.conn
cur = conn.cursor() cur = conn.cursor()
self.assertEqual(b('SELECT 10.3;'), self.assertEqual(b'SELECT 10.3;',
cur.mogrify("SELECT %s;", (Decimal("10.3"),))) cur.mogrify("SELECT %s;", (Decimal("10.3"),)))
@skip_if_no_getrefcount @skip_if_no_getrefcount

View File

@ -29,7 +29,6 @@ from functools import wraps
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
from psycopg2.extensions import b
from testutils import unittest, decorate_all_tests, skip_if_tpc_disabled from testutils import unittest, decorate_all_tests, skip_if_tpc_disabled
from testutils import ConnectingTestCase, skip_if_green from testutils import ConnectingTestCase, skip_if_green
@ -99,7 +98,7 @@ class LargeObjectTests(LargeObjectTestCase):
lo = self.conn.lobject() lo = self.conn.lobject()
lo2 = self.conn.lobject(lo.oid, "w") lo2 = self.conn.lobject(lo.oid, "w")
self.assertEqual(lo2.mode[0], "w") self.assertEqual(lo2.mode[0], "w")
lo2.write(b("some data")) lo2.write(b"some data")
def test_open_mode_n(self): def test_open_mode_n(self):
# Openning an object in mode "n" gives us a closed lobject. # Openning an object in mode "n" gives us a closed lobject.
@ -136,7 +135,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.tmpdir = tempfile.mkdtemp() self.tmpdir = tempfile.mkdtemp()
filename = os.path.join(self.tmpdir, "data.txt") filename = os.path.join(self.tmpdir, "data.txt")
fp = open(filename, "wb") fp = open(filename, "wb")
fp.write(b("some data")) fp.write(b"some data")
fp.close() fp.close()
lo = self.conn.lobject(0, "r", 0, filename) lo = self.conn.lobject(0, "r", 0, filename)
@ -150,7 +149,7 @@ class LargeObjectTests(LargeObjectTestCase):
def test_write(self): def test_write(self):
lo = self.conn.lobject() lo = self.conn.lobject()
self.assertEqual(lo.write(b("some data")), len("some data")) self.assertEqual(lo.write(b"some data"), len("some data"))
def test_write_large(self): def test_write_large(self):
lo = self.conn.lobject() lo = self.conn.lobject()
@ -159,7 +158,7 @@ class LargeObjectTests(LargeObjectTestCase):
def test_read(self): def test_read(self):
lo = self.conn.lobject() lo = self.conn.lobject()
length = lo.write(b("some data")) length = lo.write(b"some data")
lo.close() lo.close()
lo = self.conn.lobject(lo.oid) lo = self.conn.lobject(lo.oid)
@ -170,14 +169,14 @@ class LargeObjectTests(LargeObjectTestCase):
def test_read_binary(self): def test_read_binary(self):
lo = self.conn.lobject() lo = self.conn.lobject()
length = lo.write(b("some data")) length = lo.write(b"some data")
lo.close() lo.close()
lo = self.conn.lobject(lo.oid, "rb") lo = self.conn.lobject(lo.oid, "rb")
x = lo.read(4) x = lo.read(4)
self.assertEqual(type(x), type(b(''))) self.assertEqual(type(x), type(b''))
self.assertEqual(x, b("some")) self.assertEqual(x, b"some")
self.assertEqual(lo.read(), b(" data")) self.assertEqual(lo.read(), b" data")
def test_read_text(self): def test_read_text(self):
lo = self.conn.lobject() lo = self.conn.lobject()
@ -206,7 +205,7 @@ class LargeObjectTests(LargeObjectTestCase):
def test_seek_tell(self): def test_seek_tell(self):
lo = self.conn.lobject() lo = self.conn.lobject()
length = lo.write(b("some data")) length = lo.write(b"some data")
self.assertEqual(lo.tell(), length) self.assertEqual(lo.tell(), length)
lo.close() lo.close()
lo = self.conn.lobject(lo.oid) lo = self.conn.lobject(lo.oid)
@ -236,7 +235,7 @@ class LargeObjectTests(LargeObjectTestCase):
def test_export(self): def test_export(self):
lo = self.conn.lobject() lo = self.conn.lobject()
lo.write(b("some data")) lo.write(b"some data")
self.tmpdir = tempfile.mkdtemp() self.tmpdir = tempfile.mkdtemp()
filename = os.path.join(self.tmpdir, "data.txt") filename = os.path.join(self.tmpdir, "data.txt")
@ -244,7 +243,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.assertTrue(os.path.exists(filename)) self.assertTrue(os.path.exists(filename))
f = open(filename, "rb") f = open(filename, "rb")
try: try:
self.assertEqual(f.read(), b("some data")) self.assertEqual(f.read(), b"some data")
finally: finally:
f.close() f.close()
@ -256,7 +255,7 @@ class LargeObjectTests(LargeObjectTestCase):
def test_write_after_close(self): def test_write_after_close(self):
lo = self.conn.lobject() lo = self.conn.lobject()
lo.close() lo.close()
self.assertRaises(psycopg2.InterfaceError, lo.write, b("some data")) self.assertRaises(psycopg2.InterfaceError, lo.write, b"some data")
def test_read_after_close(self): def test_read_after_close(self):
lo = self.conn.lobject() lo = self.conn.lobject()
@ -281,7 +280,7 @@ class LargeObjectTests(LargeObjectTestCase):
def test_export_after_close(self): def test_export_after_close(self):
lo = self.conn.lobject() lo = self.conn.lobject()
lo.write(b("some data")) lo.write(b"some data")
lo.close() lo.close()
self.tmpdir = tempfile.mkdtemp() self.tmpdir = tempfile.mkdtemp()
@ -290,7 +289,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.assertTrue(os.path.exists(filename)) self.assertTrue(os.path.exists(filename))
f = open(filename, "rb") f = open(filename, "rb")
try: try:
self.assertEqual(f.read(), b("some data")) self.assertEqual(f.read(), b"some data")
finally: finally:
f.close() f.close()
@ -307,7 +306,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.lo_oid = lo.oid self.lo_oid = lo.oid
self.conn.commit() self.conn.commit()
self.assertRaises(psycopg2.ProgrammingError, lo.write, b("some data")) self.assertRaises(psycopg2.ProgrammingError, lo.write, b"some data")
def test_read_after_commit(self): def test_read_after_commit(self):
lo = self.conn.lobject() lo = self.conn.lobject()
@ -340,7 +339,7 @@ class LargeObjectTests(LargeObjectTestCase):
def test_export_after_commit(self): def test_export_after_commit(self):
lo = self.conn.lobject() lo = self.conn.lobject()
lo.write(b("some data")) lo.write(b"some data")
self.conn.commit() self.conn.commit()
self.tmpdir = tempfile.mkdtemp() self.tmpdir = tempfile.mkdtemp()
@ -349,7 +348,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.assertTrue(os.path.exists(filename)) self.assertTrue(os.path.exists(filename))
f = open(filename, "rb") f = open(filename, "rb")
try: try:
self.assertEqual(f.read(), b("some data")) self.assertEqual(f.read(), b"some data")
finally: finally:
f.close() f.close()

View File

@ -28,7 +28,6 @@ from testutils import unittest, ConnectingTestCase
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
from psycopg2.extensions import b
class QuotingTestCase(ConnectingTestCase): class QuotingTestCase(ConnectingTestCase):
@ -69,12 +68,13 @@ class QuotingTestCase(ConnectingTestCase):
with self.assertRaises(ValueError) as e: with self.assertRaises(ValueError) as e:
curs.execute("SELECT %s", (data,)) curs.execute("SELECT %s", (data,))
self.assertEquals(e.exception.message, 'A string literal cannot contain NUL (0x00) characters.') self.assertEquals(str(e.exception),
'A string literal cannot contain NUL (0x00) characters.')
def test_binary(self): def test_binary(self):
data = b("""some data with \000\013 binary data = b"""some data with \000\013 binary
stuff into, 'quotes' and \\ a backslash too. stuff into, 'quotes' and \\ a backslash too.
""") """
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
data += "".join(map(chr, range(256))) data += "".join(map(chr, range(256)))
else: else:
@ -87,7 +87,7 @@ class QuotingTestCase(ConnectingTestCase):
else: else:
res = curs.fetchone()[0].tobytes() res = curs.fetchone()[0].tobytes()
if res[0] in (b('x'), ord(b('x'))) and self.conn.server_version >= 90000: if res[0] in (b'x', ord(b'x')) and self.conn.server_version >= 90000:
return self.skipTest( return self.skipTest(
"bytea broken with server >= 9.0, libpq < 9") "bytea broken with server >= 9.0, libpq < 9")
@ -199,7 +199,7 @@ class TestStringAdapter(ConnectingTestCase):
from psycopg2.extensions import adapt from psycopg2.extensions import adapt
a = adapt("hello") a = adapt("hello")
self.assertEqual(a.encoding, 'latin1') self.assertEqual(a.encoding, 'latin1')
self.assertEqual(a.getquoted(), b("'hello'")) self.assertEqual(a.getquoted(), b"'hello'")
# NOTE: we can't really test an encoding different from utf8, because # NOTE: we can't really test an encoding different from utf8, because
# when encoding without connection the libpq will use parameters from # when encoding without connection the libpq will use parameters from
@ -222,7 +222,7 @@ class TestStringAdapter(ConnectingTestCase):
a = adapt(snowman) a = adapt(snowman)
a.encoding = 'utf8' a.encoding = 'utf8'
self.assertEqual(a.encoding, 'utf8') self.assertEqual(a.encoding, 'utf8')
self.assertEqual(a.getquoted(), b("'\xe2\x98\x83'")) self.assertEqual(a.getquoted(), b"'\xe2\x98\x83'")
def test_connection_wins_anyway(self): def test_connection_wins_anyway(self):
from psycopg2.extensions import adapt from psycopg2.extensions import adapt
@ -234,7 +234,7 @@ class TestStringAdapter(ConnectingTestCase):
a.prepare(self.conn) a.prepare(self.conn)
self.assertEqual(a.encoding, 'utf_8') self.assertEqual(a.encoding, 'utf_8')
self.assertEqual(a.getquoted(), b("'\xe2\x98\x83'")) self.assertEqual(a.getquoted(), b"'\xe2\x98\x83'")
@testutils.skip_before_python(3) @testutils.skip_before_python(3)
def test_adapt_bytes(self): def test_adapt_bytes(self):
@ -242,7 +242,7 @@ class TestStringAdapter(ConnectingTestCase):
self.conn.set_client_encoding('utf8') self.conn.set_client_encoding('utf8')
a = psycopg2.extensions.QuotedString(snowman.encode('utf8')) a = psycopg2.extensions.QuotedString(snowman.encode('utf8'))
a.prepare(self.conn) a.prepare(self.conn)
self.assertEqual(a.getquoted(), b("'\xe2\x98\x83'")) self.assertEqual(a.getquoted(), b"'\xe2\x98\x83'")
def test_suite(): def test_suite():

View File

@ -30,7 +30,6 @@ import testutils
from testutils import unittest, ConnectingTestCase, decorate_all_tests from testutils import unittest, ConnectingTestCase, decorate_all_tests
import psycopg2 import psycopg2
from psycopg2.extensions import b
class TypesBasicTests(ConnectingTestCase): class TypesBasicTests(ConnectingTestCase):
@ -190,7 +189,7 @@ class TypesBasicTests(ConnectingTestCase):
ss = ['', '{', '{}}', '{' * 20 + '}' * 20] ss = ['', '{', '{}}', '{' * 20 + '}' * 20]
for s in ss: for s in ss:
self.assertRaises(psycopg2.DataError, self.assertRaises(psycopg2.DataError,
psycopg2.extensions.STRINGARRAY, b(s), curs) psycopg2.extensions.STRINGARRAY, s.encode('utf8'), curs)
@testutils.skip_before_postgres(8, 2) @testutils.skip_before_postgres(8, 2)
def testArrayOfNulls(self): def testArrayOfNulls(self):
@ -309,9 +308,9 @@ class TypesBasicTests(ConnectingTestCase):
def testByteaHexCheckFalsePositive(self): def testByteaHexCheckFalsePositive(self):
# the check \x -> x to detect bad bytea decode # the check \x -> x to detect bad bytea decode
# may be fooled if the first char is really an 'x' # may be fooled if the first char is really an 'x'
o1 = psycopg2.Binary(b('x')) o1 = psycopg2.Binary(b'x')
o2 = self.execute("SELECT %s::bytea AS foo", (o1,)) o2 = self.execute("SELECT %s::bytea AS foo", (o1,))
self.assertEqual(b('x'), o2[0]) self.assertEqual(b'x', o2[0])
def testNegNumber(self): def testNegNumber(self):
d1 = self.execute("select -%s;", (decimal.Decimal('-1.0'),)) d1 = self.execute("select -%s;", (decimal.Decimal('-1.0'),))
@ -362,7 +361,7 @@ class AdaptSubclassTest(unittest.TestCase):
register_adapter(A, lambda a: AsIs("a")) register_adapter(A, lambda a: AsIs("a"))
register_adapter(B, lambda b: AsIs("b")) register_adapter(B, lambda b: AsIs("b"))
try: try:
self.assertEqual(b('b'), adapt(C()).getquoted()) self.assertEqual(b'b', adapt(C()).getquoted())
finally: finally:
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote] del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote] del psycopg2.extensions.adapters[B, psycopg2.extensions.ISQLQuote]
@ -389,7 +388,7 @@ class AdaptSubclassTest(unittest.TestCase):
register_adapter(A, lambda a: AsIs("a")) register_adapter(A, lambda a: AsIs("a"))
try: try:
self.assertEqual(b("a"), adapt(B()).getquoted()) self.assertEqual(b"a", adapt(B()).getquoted())
finally: finally:
del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote] del psycopg2.extensions.adapters[A, psycopg2.extensions.ISQLQuote]
@ -434,19 +433,19 @@ class ByteaParserTest(unittest.TestCase):
self.assertEqual(rv, None) self.assertEqual(rv, None)
def test_blank(self): def test_blank(self):
rv = self.cast(b('')) rv = self.cast(b'')
self.assertEqual(rv, b('')) self.assertEqual(rv, b'')
def test_blank_hex(self): def test_blank_hex(self):
# Reported as problematic in ticket #48 # Reported as problematic in ticket #48
rv = self.cast(b('\\x')) rv = self.cast(b'\\x')
self.assertEqual(rv, b('')) self.assertEqual(rv, b'')
def test_full_hex(self, upper=False): def test_full_hex(self, upper=False):
buf = ''.join(("%02x" % i) for i in range(256)) buf = ''.join(("%02x" % i) for i in range(256))
if upper: buf = buf.upper() if upper: buf = buf.upper()
buf = '\\x' + buf buf = '\\x' + buf
rv = self.cast(b(buf)) rv = self.cast(buf.encode('utf8'))
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
self.assertEqual(rv, ''.join(map(chr, range(256)))) self.assertEqual(rv, ''.join(map(chr, range(256))))
else: else:
@ -457,7 +456,7 @@ class ByteaParserTest(unittest.TestCase):
def test_full_escaped_octal(self): def test_full_escaped_octal(self):
buf = ''.join(("\\%03o" % i) for i in range(256)) buf = ''.join(("\\%03o" % i) for i in range(256))
rv = self.cast(b(buf)) rv = self.cast(buf.encode('utf8'))
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
self.assertEqual(rv, ''.join(map(chr, range(256)))) self.assertEqual(rv, ''.join(map(chr, range(256))))
else: else:
@ -469,7 +468,7 @@ class ByteaParserTest(unittest.TestCase):
buf += string.ascii_letters buf += string.ascii_letters
buf += ''.join('\\' + c for c in string.ascii_letters) buf += ''.join('\\' + c for c in string.ascii_letters)
buf += '\\\\' buf += '\\\\'
rv = self.cast(b(buf)) rv = self.cast(buf.encode('utf8'))
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
tgt = ''.join(map(chr, range(32))) \ tgt = ''.join(map(chr, range(32))) \
+ string.ascii_letters * 2 + '\\' + string.ascii_letters * 2 + '\\'

View File

@ -29,14 +29,13 @@ from testutils import py3_raises_typeerror
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
import psycopg2.extensions as ext import psycopg2.extensions as ext
from psycopg2.extensions import b
def filter_scs(conn, s): def filter_scs(conn, s):
if conn.get_parameter_status("standard_conforming_strings") == 'off': if conn.get_parameter_status("standard_conforming_strings") == 'off':
return s return s
else: else:
return s.replace(b("E'"), b("'")) return s.replace(b"E'", b"'")
class TypesExtrasTests(ConnectingTestCase): class TypesExtrasTests(ConnectingTestCase):
"""Test that all type conversions are working.""" """Test that all type conversions are working."""
@ -99,7 +98,7 @@ class TypesExtrasTests(ConnectingTestCase):
a = psycopg2.extensions.adapt(i) a = psycopg2.extensions.adapt(i)
a.prepare(self.conn) a.prepare(self.conn)
self.assertEqual( self.assertEqual(
filter_scs(self.conn, b("E'192.168.1.0/24'::inet")), filter_scs(self.conn, b"E'192.168.1.0/24'::inet"),
a.getquoted()) a.getquoted())
# adapts ok with unicode too # adapts ok with unicode too
@ -107,7 +106,7 @@ class TypesExtrasTests(ConnectingTestCase):
a = psycopg2.extensions.adapt(i) a = psycopg2.extensions.adapt(i)
a.prepare(self.conn) a.prepare(self.conn)
self.assertEqual( self.assertEqual(
filter_scs(self.conn, b("E'192.168.1.0/24'::inet")), filter_scs(self.conn, b"E'192.168.1.0/24'::inet"),
a.getquoted()) a.getquoted())
def test_adapt_fail(self): def test_adapt_fail(self):
@ -146,17 +145,17 @@ class HstoreTestCase(ConnectingTestCase):
a.prepare(self.conn) a.prepare(self.conn)
q = a.getquoted() q = a.getquoted()
self.assert_(q.startswith(b("((")), q) self.assert_(q.startswith(b"(("), q)
ii = q[1:-1].split(b("||")) ii = q[1:-1].split(b"||")
ii.sort() ii.sort()
self.assertEqual(len(ii), len(o)) self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], filter_scs(self.conn, b("(E'a' => E'1')"))) self.assertEqual(ii[0], filter_scs(self.conn, b"(E'a' => E'1')"))
self.assertEqual(ii[1], filter_scs(self.conn, b("(E'b' => E'''')"))) self.assertEqual(ii[1], filter_scs(self.conn, b"(E'b' => E'''')"))
self.assertEqual(ii[2], filter_scs(self.conn, b("(E'c' => NULL)"))) self.assertEqual(ii[2], filter_scs(self.conn, b"(E'c' => NULL)"))
if 'd' in o: if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding]) encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], filter_scs(self.conn, b("(E'd' => E'") + encc + b("')"))) self.assertEqual(ii[3], filter_scs(self.conn, b"(E'd' => E'" + encc + b"')"))
def test_adapt_9(self): def test_adapt_9(self):
if self.conn.server_version < 90000: if self.conn.server_version < 90000:
@ -172,11 +171,11 @@ class HstoreTestCase(ConnectingTestCase):
a.prepare(self.conn) a.prepare(self.conn)
q = a.getquoted() q = a.getquoted()
m = re.match(b(r'hstore\(ARRAY\[([^\]]+)\], ARRAY\[([^\]]+)\]\)'), q) m = re.match(br'hstore\(ARRAY\[([^\]]+)\], ARRAY\[([^\]]+)\]\)', q)
self.assert_(m, repr(q)) self.assert_(m, repr(q))
kk = m.group(1).split(b(", ")) kk = m.group(1).split(b", ")
vv = m.group(2).split(b(", ")) vv = m.group(2).split(b", ")
ii = zip(kk, vv) ii = zip(kk, vv)
ii.sort() ii.sort()
@ -184,12 +183,12 @@ class HstoreTestCase(ConnectingTestCase):
return tuple([filter_scs(self.conn, s) for s in args]) return tuple([filter_scs(self.conn, s) for s in args])
self.assertEqual(len(ii), len(o)) self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], f(b("E'a'"), b("E'1'"))) self.assertEqual(ii[0], f(b"E'a'", b"E'1'"))
self.assertEqual(ii[1], f(b("E'b'"), b("E''''"))) self.assertEqual(ii[1], f(b"E'b'", b"E''''"))
self.assertEqual(ii[2], f(b("E'c'"), b("NULL"))) self.assertEqual(ii[2], f(b"E'c'", b"NULL"))
if 'd' in o: if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding]) encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], f(b("E'd'"), b("E'") + encc + b("'"))) self.assertEqual(ii[3], f(b"E'd'", b"E'" + encc + b"'"))
def test_parse(self): def test_parse(self):
from psycopg2.extras import HstoreAdapter from psycopg2.extras import HstoreAdapter
@ -455,7 +454,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
def test_none_in_record(self): def test_none_in_record(self):
curs = self.conn.cursor() curs = self.conn.cursor()
s = curs.mogrify("SELECT %s;", [(42, None)]) s = curs.mogrify("SELECT %s;", [(42, None)])
self.assertEqual(b("SELECT (42, NULL);"), s) self.assertEqual(b"SELECT (42, NULL);", s)
curs.execute("SELECT %s;", [(42, None)]) curs.execute("SELECT %s;", [(42, None)])
d = curs.fetchone()[0] d = curs.fetchone()[0]
self.assertEqual("(42,)", d) self.assertEqual("(42,)", d)
@ -475,7 +474,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
self.assertEqual(ext.adapt(None).getquoted(), "NOPE!") self.assertEqual(ext.adapt(None).getquoted(), "NOPE!")
s = curs.mogrify("SELECT %s;", (None,)) s = curs.mogrify("SELECT %s;", (None,))
self.assertEqual(b("SELECT NULL;"), s) self.assertEqual(b"SELECT NULL;", s)
finally: finally:
ext.register_adapter(type(None), orig_adapter) ext.register_adapter(type(None), orig_adapter)
@ -892,7 +891,7 @@ class JsonTestCase(ConnectingTestCase):
obj = Decimal('123.45') obj = Decimal('123.45')
dumps = lambda obj: json.dumps(obj, cls=DecimalEncoder) dumps = lambda obj: json.dumps(obj, cls=DecimalEncoder)
self.assertEqual(curs.mogrify("%s", (Json(obj, dumps=dumps),)), self.assertEqual(curs.mogrify("%s", (Json(obj, dumps=dumps),)),
b("'123.45'")) b"'123.45'")
@skip_if_no_json_module @skip_if_no_json_module
def test_adapt_subclass(self): def test_adapt_subclass(self):
@ -910,8 +909,7 @@ class JsonTestCase(ConnectingTestCase):
curs = self.conn.cursor() curs = self.conn.cursor()
obj = Decimal('123.45') obj = Decimal('123.45')
self.assertEqual(curs.mogrify("%s", (MyJson(obj),)), self.assertEqual(curs.mogrify("%s", (MyJson(obj),)), b"'123.45'")
b("'123.45'"))
@skip_if_no_json_module @skip_if_no_json_module
def test_register_on_dict(self): def test_register_on_dict(self):
@ -921,8 +919,7 @@ class JsonTestCase(ConnectingTestCase):
try: try:
curs = self.conn.cursor() curs = self.conn.cursor()
obj = {'a': 123} obj = {'a': 123}
self.assertEqual(curs.mogrify("%s", (obj,)), self.assertEqual(curs.mogrify("%s", (obj,)), b"""'{"a": 123}'""")
b("""'{"a": 123}'"""))
finally: finally:
del psycopg2.extensions.adapters[dict, ext.ISQLQuote] del psycopg2.extensions.adapters[dict, ext.ISQLQuote]