Deal consistently with E'' quotes in tests

This commit is contained in:
Daniele Varrazzo 2017-02-06 18:43:39 +00:00
parent d23fe42873
commit 3873c6c09f
5 changed files with 41 additions and 41 deletions

View File

@ -79,20 +79,20 @@ class CursorTests(ConnectingTestCase):
# unicode query with non-ascii data # unicode query with non-ascii data
cur.execute(u"SELECT '%s';" % snowman) cur.execute(u"SELECT '%s';" % snowman)
self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0])) self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0]))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'), self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify(u"SELECT '%s';" % snowman).replace(b"E'", b"'")) cur.mogrify(u"SELECT '%s';" % snowman))
# unicode args # unicode args
cur.execute("SELECT %s;", (snowman,)) cur.execute("SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0])) self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'), self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify("SELECT %s;", (snowman,)).replace(b"E'", b"'")) cur.mogrify("SELECT %s;", (snowman,)))
# unicode query and args # unicode query and args
cur.execute(u"SELECT %s;", (snowman,)) cur.execute(u"SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0])) self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'), self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify(u"SELECT %s;", (snowman,)).replace(b"E'", b"'")) cur.mogrify(u"SELECT %s;", (snowman,)))
def test_mogrify_decimal_explodes(self): def test_mogrify_decimal_explodes(self):
# issue #7: explodes on windows with python 2.5 and psycopg 2.2.2 # issue #7: explodes on windows with python 2.5 and psycopg 2.2.2

View File

@ -236,7 +236,7 @@ class TestStringAdapter(ConnectingTestCase):
a.prepare(self.conn) a.prepare(self.conn)
self.assertEqual(a.encoding, 'utf_8') self.assertEqual(a.encoding, 'utf_8')
self.assertEqual(a.getquoted(), b"'\xe2\x98\x83'") self.assertQuotedEqual(a.getquoted(), b"'\xe2\x98\x83'")
@testutils.skip_before_python(3) @testutils.skip_before_python(3)
def test_adapt_bytes(self): def test_adapt_bytes(self):
@ -244,7 +244,7 @@ class TestStringAdapter(ConnectingTestCase):
self.conn.set_client_encoding('utf8') self.conn.set_client_encoding('utf8')
a = psycopg2.extensions.QuotedString(snowman.encode('utf8')) a = psycopg2.extensions.QuotedString(snowman.encode('utf8'))
a.prepare(self.conn) a.prepare(self.conn)
self.assertEqual(a.getquoted(), b"'\xe2\x98\x83'") self.assertQuotedEqual(a.getquoted(), b"'\xe2\x98\x83'")
def test_suite(): def test_suite():

View File

@ -200,9 +200,7 @@ class LiteralTests(ConnectingTestCase):
def test_repr(self): def test_repr(self):
self.assertEqual(repr(sql.Literal("foo")), "Literal('foo')") self.assertEqual(repr(sql.Literal("foo")), "Literal('foo')")
self.assertEqual(str(sql.Literal("foo")), "Literal('foo')") self.assertEqual(str(sql.Literal("foo")), "Literal('foo')")
self.assertEqual( self.assertQuotedEqual(sql.Literal("foo").as_string(self.conn), "'foo'")
sql.Literal("foo").as_string(self.conn).replace("E'", "'"),
"'foo'")
self.assertEqual(sql.Literal(42).as_string(self.conn), "42") self.assertEqual(sql.Literal(42).as_string(self.conn), "42")
self.assertEqual( self.assertEqual(
sql.Literal(dt.date(2017, 1, 1)).as_string(self.conn), sql.Literal(dt.date(2017, 1, 1)).as_string(self.conn),
@ -302,24 +300,24 @@ class ComposedTest(ConnectingTestCase):
obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")]) obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")])
obj = obj.join(", ") obj = obj.join(", ")
self.assert_(isinstance(obj, sql.Composed)) self.assert_(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "'foo', \"b'ar\"") self.assertQuotedEqual(obj.as_string(self.conn), "'foo', \"b'ar\"")
def test_sum(self): def test_sum(self):
obj = sql.Composed([sql.SQL("foo ")]) obj = sql.Composed([sql.SQL("foo ")])
obj = obj + sql.Literal("bar") obj = obj + sql.Literal("bar")
self.assert_(isinstance(obj, sql.Composed)) self.assert_(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foo 'bar'") self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
def test_sum_inplace(self): def test_sum_inplace(self):
obj = sql.Composed([sql.SQL("foo ")]) obj = sql.Composed([sql.SQL("foo ")])
obj += sql.Literal("bar") obj += sql.Literal("bar")
self.assert_(isinstance(obj, sql.Composed)) self.assert_(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foo 'bar'") self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
obj = sql.Composed([sql.SQL("foo ")]) obj = sql.Composed([sql.SQL("foo ")])
obj += sql.Composed([sql.Literal("bar")]) obj += sql.Composed([sql.Literal("bar")])
self.assert_(isinstance(obj, sql.Composed)) self.assert_(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foo 'bar'") self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
def test_iter(self): def test_iter(self):
obj = sql.Composed([sql.SQL("foo"), sql.SQL('bar')]) obj = sql.Composed([sql.SQL("foo"), sql.SQL('bar')])

View File

@ -31,13 +31,6 @@ import psycopg2.extras
import psycopg2.extensions as ext import psycopg2.extensions as ext
def filter_scs(conn, s):
if conn.get_parameter_status("standard_conforming_strings") == 'off':
return s
else:
return s.replace(b"E'", b"'")
class TypesExtrasTests(ConnectingTestCase): class TypesExtrasTests(ConnectingTestCase):
"""Test that all type conversions are working.""" """Test that all type conversions are working."""
@ -105,17 +98,13 @@ class TypesExtrasTests(ConnectingTestCase):
i = Inet("192.168.1.0/24") i = Inet("192.168.1.0/24")
a = psycopg2.extensions.adapt(i) a = psycopg2.extensions.adapt(i)
a.prepare(self.conn) a.prepare(self.conn)
self.assertEqual( self.assertQuotedEqual(a.getquoted(), b"'192.168.1.0/24'::inet")
filter_scs(self.conn, b"E'192.168.1.0/24'::inet"),
a.getquoted())
# adapts ok with unicode too # adapts ok with unicode too
i = Inet(u"192.168.1.0/24") i = Inet(u"192.168.1.0/24")
a = psycopg2.extensions.adapt(i) a = psycopg2.extensions.adapt(i)
a.prepare(self.conn) a.prepare(self.conn)
self.assertEqual( self.assertQuotedEqual(a.getquoted(), b"'192.168.1.0/24'::inet")
filter_scs(self.conn, b"E'192.168.1.0/24'::inet"),
a.getquoted())
def test_adapt_fail(self): def test_adapt_fail(self):
class Foo(object): class Foo(object):
@ -160,13 +149,12 @@ class HstoreTestCase(ConnectingTestCase):
ii.sort() ii.sort()
self.assertEqual(len(ii), len(o)) self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], filter_scs(self.conn, b"(E'a' => E'1')")) self.assertQuotedEqual(ii[0], b"('a' => '1')")
self.assertEqual(ii[1], filter_scs(self.conn, b"(E'b' => E'''')")) self.assertQuotedEqual(ii[1], b"('b' => '''')")
self.assertEqual(ii[2], filter_scs(self.conn, b"(E'c' => NULL)")) self.assertQuotedEqual(ii[2], b"('c' => NULL)")
if 'd' in o: if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding]) encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], self.assertQuotedEqual(ii[3], b"('d' => '" + encc + b"')")
filter_scs(self.conn, b"(E'd' => E'" + encc + b"')"))
def test_adapt_9(self): def test_adapt_9(self):
if self.conn.server_version < 90000: if self.conn.server_version < 90000:
@ -190,16 +178,17 @@ class HstoreTestCase(ConnectingTestCase):
ii = zip(kk, vv) ii = zip(kk, vv)
ii.sort() ii.sort()
def f(*args):
return tuple([filter_scs(self.conn, s) for s in args])
self.assertEqual(len(ii), len(o)) self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], f(b"E'a'", b"E'1'")) self.assertQuotedEqual(ii[0][0], b"'a'")
self.assertEqual(ii[1], f(b"E'b'", b"E''''")) self.assertQuotedEqual(ii[0][1], b"'1'")
self.assertEqual(ii[2], f(b"E'c'", b"NULL")) self.assertQuotedEqual(ii[1][0], b"'b'")
self.assertQuotedEqual(ii[1][1], b"''''")
self.assertQuotedEqual(ii[2][0], b"'c'")
self.assertQuotedEqual(ii[2][1], b"NULL")
if 'd' in o: if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding]) encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], f(b"E'd'", b"E'" + encc + b"'")) self.assertQuotedEqual(ii[3][0], b"'d'")
self.assertQuotedEqual(ii[3][1], b"'" + encc + b"'")
def test_parse(self): def test_parse(self):
from psycopg2.extras import HstoreAdapter from psycopg2.extras import HstoreAdapter

View File

@ -24,10 +24,11 @@
# Use unittest2 if available. Otherwise mock a skip facility with warnings. # Use unittest2 if available. Otherwise mock a skip facility with warnings.
import re
import os import os
import platform
import sys import sys
import select import select
import platform
from functools import wraps from functools import wraps
from testconfig import dsn, repl_dsn from testconfig import dsn, repl_dsn
@ -107,6 +108,18 @@ class ConnectingTestCase(unittest.TestCase):
if not conn.closed: if not conn.closed:
conn.close() conn.close()
def assertQuotedEqual(self, first, second, msg=None):
"""Compare two quoted strings disregarding eventual E'' quotes"""
def f(s):
if isinstance(s, unicode):
return re.sub(r"\bE'", "'", s)
elif isinstance(first, bytes):
return re.sub(br"\bE'", b"'", s)
else:
return s
return self.assertEqual(f(first), f(second), msg)
def connect(self, **kwargs): def connect(self, **kwargs):
try: try:
self._conns self._conns