Python3.4, fix most of failing tests with python3.4+postgres9.3

This commit is contained in:
Andriy Sokolovskiy 2014-11-28 16:27:41 +02:00
parent 85ba098cd8
commit 30fa1e7d29
15 changed files with 125 additions and 69 deletions

View File

@ -332,7 +332,7 @@ class NamedTupleCursor(_cursor):
try: try:
from collections import namedtuple from collections import namedtuple
except ImportError, _exc: except ImportError as _exc:
def _make_nt(self): def _make_nt(self):
raise self._exc raise self._exc
else: else:

View File

@ -55,9 +55,9 @@ def test_suite():
import psycopg2 import psycopg2
try: try:
cnn = psycopg2.connect(dsn) cnn = psycopg2.connect(dsn)
except Exception, e: except Exception as e:
print "Failed connection to test db:", e.__class__.__name__, e print("Failed connection to test db:", e.__class__.__name__, e)
print "Please set env vars 'PSYCOPG2_TESTDB*' to valid values." print("Please set env vars 'PSYCOPG2_TESTDB*' to valid values.")
sys.exit(1) sys.exit(1)
else: else:
cnn.close() cnn.close()

View File

@ -30,7 +30,11 @@ from psycopg2 import extensions
import time import time
import select import select
import StringIO
try:
from StringIO import StringIO
except ImportError:
from io import StringIO
from testutils import ConnectingTestCase from testutils import ConnectingTestCase
@ -250,7 +254,7 @@ class AsyncTests(ConnectingTestCase):
# copy should fail # copy should fail
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,
cur.copy_from, cur.copy_from,
StringIO.StringIO("1\n3\n5\n\\.\n"), "table1") StringIO("1\n3\n5\n\\.\n"), "table1")
def test_lobject_while_async(self): def test_lobject_while_async(self):
# large objects should be prohibited # large objects should be prohibited
@ -453,7 +457,7 @@ class AsyncTests(ConnectingTestCase):
try: try:
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True) cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True)
self.wait(cnn) self.wait(cnn)
except psycopg2.Error, e: except psycopg2.Error as e:
self.assertNotEqual(str(e), "asynchronous connection failed", self.assertNotEqual(str(e), "asynchronous connection failed",
"connection error reason lost") "connection error reason lost")
else: else:

View File

@ -60,7 +60,7 @@ class CancelTests(ConnectingTestCase):
conn.rollback() conn.rollback()
cur.execute("select 1") cur.execute("select 1")
self.assertEqual(cur.fetchall(), [(1, )]) self.assertEqual(cur.fetchall(), [(1, )])
except Exception, e: except Exception as e:
errors.append(e) errors.append(e)
raise raise
@ -68,7 +68,7 @@ class CancelTests(ConnectingTestCase):
cur = conn.cursor() cur = conn.cursor()
try: try:
conn.cancel() conn.cancel()
except Exception, e: except Exception as e:
errors.append(e) errors.append(e)
raise raise

View File

@ -72,10 +72,10 @@ class ConnectionTests(ConnectingTestCase):
cur = conn.cursor() cur = conn.cursor()
try: try:
cur.execute("select pg_terminate_backend(pg_backend_pid())") cur.execute("select pg_terminate_backend(pg_backend_pid())")
except psycopg2.OperationalError, e: except psycopg2.OperationalError as e:
if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN: if e.pgcode != psycopg2.errorcodes.ADMIN_SHUTDOWN:
raise raise
except psycopg2.DatabaseError, e: except psycopg2.DatabaseError as e:
# curiously when disconnected in green mode we get a DatabaseError # curiously when disconnected in green mode we get a DatabaseError
# without pgcode. # without pgcode.
if e.pgcode is not None: if e.pgcode is not None:

View File

@ -26,8 +26,11 @@ import sys
import string import string
from testutils import unittest, ConnectingTestCase, decorate_all_tests from testutils import unittest, ConnectingTestCase, decorate_all_tests
from testutils import skip_if_no_iobase, skip_before_postgres from testutils import skip_if_no_iobase, skip_before_postgres
from cStringIO import StringIO try:
from itertools import cycle, izip from cStringIO import StringIO
except ImportError:
from io import StringIO
from itertools import cycle
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
import psycopg2 import psycopg2
@ -38,8 +41,10 @@ from testconfig import dsn
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
_base = object _base = object
from itertools import izip
else: else:
from io import TextIOBase as _base from io import TextIOBase as _base
izip = zip
class MinimalRead(_base): class MinimalRead(_base):
"""A file wrapper exposing the minimal interface to copy from.""" """A file wrapper exposing the minimal interface to copy from."""
@ -94,7 +99,7 @@ class CopyTests(ConnectingTestCase):
def test_copy_from_cols(self): def test_copy_from_cols(self):
curs = self.conn.cursor() curs = self.conn.cursor()
f = StringIO() f = StringIO()
for i in xrange(10): for i in range(10):
f.write("%s\n" % (i,)) f.write("%s\n" % (i,))
f.seek(0) f.seek(0)
@ -106,7 +111,7 @@ class CopyTests(ConnectingTestCase):
def test_copy_from_cols_err(self): def test_copy_from_cols_err(self):
curs = self.conn.cursor() curs = self.conn.cursor()
f = StringIO() f = StringIO()
for i in xrange(10): for i in range(10):
f.write("%s\n" % (i,)) f.write("%s\n" % (i,))
f.seek(0) f.seek(0)
@ -135,7 +140,7 @@ class CopyTests(ConnectingTestCase):
about = abin.decode('latin1').replace('\\', '\\\\') about = abin.decode('latin1').replace('\\', '\\\\')
else: else:
abin = bytes(range(32, 127) + range(160, 256)).decode('latin1') abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
about = abin.replace('\\', '\\\\') about = abin.replace('\\', '\\\\')
curs = self.conn.cursor() curs = self.conn.cursor()
@ -157,7 +162,7 @@ class CopyTests(ConnectingTestCase):
abin = ''.join(map(chr, range(32, 127) + range(160, 255))) abin = ''.join(map(chr, range(32, 127) + range(160, 255)))
about = abin.replace('\\', '\\\\') about = abin.replace('\\', '\\\\')
else: else:
abin = bytes(range(32, 127) + range(160, 255)).decode('latin1') abin = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1')
about = abin.replace('\\', '\\\\').encode('latin1') about = abin.replace('\\', '\\\\').encode('latin1')
curs = self.conn.cursor() curs = self.conn.cursor()
@ -179,9 +184,8 @@ class CopyTests(ConnectingTestCase):
abin = ''.join(map(chr, range(32, 127) + range(160, 256))) abin = ''.join(map(chr, range(32, 127) + range(160, 256)))
abin = abin.decode('latin1') abin = abin.decode('latin1')
about = abin.replace('\\', '\\\\') about = abin.replace('\\', '\\\\')
else: else:
abin = bytes(range(32, 127) + range(160, 256)).decode('latin1') abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
about = abin.replace('\\', '\\\\') about = abin.replace('\\', '\\\\')
import io import io
@ -219,7 +223,7 @@ class CopyTests(ConnectingTestCase):
def _copy_from(self, curs, nrecs, srec, copykw): def _copy_from(self, curs, nrecs, srec, copykw):
f = StringIO() f = StringIO()
for i, c in izip(xrange(nrecs), cycle(string.ascii_letters)): for i, c in izip(range(nrecs), cycle(string.ascii_letters)):
l = c * srec l = c * srec
f.write("%s\t%s\n" % (i,l)) f.write("%s\t%s\n" % (i,l))

View File

@ -324,14 +324,22 @@ class CursorTests(ConnectingTestCase):
@skip_before_postgres(8, 2) @skip_before_postgres(8, 2)
def test_iter_named_cursor_efficient(self): def test_iter_named_cursor_efficient(self):
import sys
curs = self.conn.cursor('tmp') curs = self.conn.cursor('tmp')
# if these records are fetched in the same roundtrip their # if these records are fetched in the same roundtrip their
# timestamp will not be influenced by the pause in Python world. # timestamp will not be influenced by the pause in Python world.
curs.execute("""select clock_timestamp() from generate_series(1,2)""") curs.execute("""select clock_timestamp() from generate_series(1,2)""")
i = iter(curs) i = iter(curs)
if sys.version[0] == '2':
t1 = (i.next())[0] # the brackets work around a 2to3 bug t1 = (i.next())[0] # the brackets work around a 2to3 bug
time.sleep(0.2) time.sleep(0.2)
t2 = (i.next())[0] t2 = (i.next())[0]
else:
t1 = (next(i))[0] # the brackets work around a 2to3 bug
time.sleep(0.2)
t2 = (next(i))[0]
self.assert_((t2 - t1).microseconds * 1e-6 < 0.1, self.assert_((t2 - t1).microseconds * 1e-6 < 0.1,
"named cursor records fetched in 2 roundtrips (delta: %s)" "named cursor records fetched in 2 roundtrips (delta: %s)"
% (t2 - t1)) % (t2 - t1))

View File

@ -15,7 +15,9 @@
# License for more details. # License for more details.
import time import time
import sys
from datetime import timedelta from datetime import timedelta
import psycopg2 import psycopg2
import psycopg2.extras import psycopg2.extras
from testutils import unittest, ConnectingTestCase, skip_before_postgres from testutils import unittest, ConnectingTestCase, skip_before_postgres
@ -325,22 +327,37 @@ class NamedTupleCursorTest(ConnectingTestCase):
i = iter(curs) i = iter(curs)
self.assertEqual(curs.rownumber, 0) self.assertEqual(curs.rownumber, 0)
if sys.version[0] == '2':
t = i.next() t = i.next()
else:
t = next(i)
self.assertEqual(t.i, 1) self.assertEqual(t.i, 1)
self.assertEqual(t.s, 'foo') self.assertEqual(t.s, 'foo')
self.assertEqual(curs.rownumber, 1) self.assertEqual(curs.rownumber, 1)
self.assertEqual(curs.rowcount, 3) self.assertEqual(curs.rowcount, 3)
if sys.version[0] == '2':
t = i.next() t = i.next()
else:
t = next(i)
self.assertEqual(t.i, 2) self.assertEqual(t.i, 2)
self.assertEqual(t.s, 'bar') self.assertEqual(t.s, 'bar')
self.assertEqual(curs.rownumber, 2) self.assertEqual(curs.rownumber, 2)
self.assertEqual(curs.rowcount, 3) self.assertEqual(curs.rowcount, 3)
if sys.version[0] == '2':
t = i.next() t = i.next()
else:
t = next(i)
self.assertEqual(t.i, 3) self.assertEqual(t.i, 3)
self.assertEqual(t.s, 'baz') self.assertEqual(t.s, 'baz')
if sys.version[0] == '2':
self.assertRaises(StopIteration, i.next) self.assertRaises(StopIteration, i.next)
else:
self.assertRaises(StopIteration, i.__next__)
self.assertEqual(curs.rownumber, 3) self.assertEqual(curs.rownumber, 3)
self.assertEqual(curs.rowcount, 3) self.assertEqual(curs.rowcount, 3)
@ -426,7 +443,7 @@ class NamedTupleCursorTest(ConnectingTestCase):
recs.extend(curs.fetchmany(5)) recs.extend(curs.fetchmany(5))
recs.append(curs.fetchone()) recs.append(curs.fetchone())
recs.extend(curs.fetchall()) recs.extend(curs.fetchall())
self.assertEqual(range(10), [t.i for t in recs]) self.assertEqual(list(range(10)), [t.i for t in recs])
@skip_if_no_namedtuple @skip_if_no_namedtuple
def test_named_fetchone(self): def test_named_fetchone(self):

View File

@ -145,7 +145,7 @@ class ExceptionsTestCase(ConnectingTestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.execute("select * from nonexist") cur.execute("select * from nonexist")
except psycopg2.Error, exc: except psycopg2.Error as exc:
e = exc e = exc
self.assertEqual(e.pgcode, '42P01') self.assertEqual(e.pgcode, '42P01')
@ -156,7 +156,7 @@ class ExceptionsTestCase(ConnectingTestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.execute("select * from nonexist") cur.execute("select * from nonexist")
except psycopg2.Error, exc: except psycopg2.Error as exc:
e = exc e = exc
diag = e.diag diag = e.diag
@ -175,7 +175,7 @@ class ExceptionsTestCase(ConnectingTestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.execute("select * from nonexist") cur.execute("select * from nonexist")
except psycopg2.Error, exc: except psycopg2.Error as exc:
e = exc e = exc
self.assertEqual(e.diag.sqlstate, '42P01') self.assertEqual(e.diag.sqlstate, '42P01')
@ -189,7 +189,7 @@ class ExceptionsTestCase(ConnectingTestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.execute("select * from nonexist") cur.execute("select * from nonexist")
except psycopg2.Error, exc: except psycopg2.Error as exc:
return cur, exc return cur, exc
cur, e = tmp() cur, e = tmp()
@ -208,12 +208,16 @@ class ExceptionsTestCase(ConnectingTestCase):
@skip_copy_if_green @skip_copy_if_green
def test_diagnostics_copy(self): def test_diagnostics_copy(self):
try:
from StringIO import StringIO from StringIO import StringIO
except ImportError:
from io import StringIO
f = StringIO() f = StringIO()
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.copy_to(f, 'nonexist') cur.copy_to(f, 'nonexist')
except psycopg2.Error, exc: except psycopg2.Error as exc:
diag = exc.diag diag = exc.diag
self.assertEqual(diag.sqlstate, '42P01') self.assertEqual(diag.sqlstate, '42P01')
@ -222,14 +226,14 @@ class ExceptionsTestCase(ConnectingTestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.execute("l'acqua e' poca e 'a papera nun galleggia") cur.execute("l'acqua e' poca e 'a papera nun galleggia")
except Exception, exc: except Exception as exc:
diag1 = exc.diag diag1 = exc.diag
self.conn.rollback() self.conn.rollback()
try: try:
cur.execute("select level from water where ducks > 1") cur.execute("select level from water where ducks > 1")
except psycopg2.Error, exc: except psycopg2.Error as exc:
diag2 = exc.diag diag2 = exc.diag
self.assertEqual(diag1.sqlstate, '42601') self.assertEqual(diag1.sqlstate, '42601')
@ -246,7 +250,7 @@ class ExceptionsTestCase(ConnectingTestCase):
cur.execute("insert into test_deferred values (1,2)") cur.execute("insert into test_deferred values (1,2)")
try: try:
self.conn.commit() self.conn.commit()
except psycopg2.Error, exc: except psycopg2.Error as exc:
e = exc e = exc
self.assertEqual(e.diag.sqlstate, '23503') self.assertEqual(e.diag.sqlstate, '23503')
@ -259,7 +263,7 @@ class ExceptionsTestCase(ConnectingTestCase):
)""") )""")
try: try:
cur.execute("insert into test_exc values(2)") cur.execute("insert into test_exc values(2)")
except psycopg2.Error, exc: except psycopg2.Error as exc:
e = exc e = exc
self.assertEqual(e.pgcode, '23514') self.assertEqual(e.pgcode, '23514')
self.assertEqual(e.diag.schema_name[:7], "pg_temp") self.assertEqual(e.diag.schema_name[:7], "pg_temp")
@ -274,7 +278,7 @@ class ExceptionsTestCase(ConnectingTestCase):
cur = self.conn.cursor() cur = self.conn.cursor()
try: try:
cur.execute("select * from nonexist") cur.execute("select * from nonexist")
except psycopg2.Error, exc: except psycopg2.Error as exc:
e = exc e = exc
e1 = pickle.loads(pickle.dumps(e)) e1 = pickle.loads(pickle.dumps(e))
@ -289,7 +293,7 @@ class ExceptionsTestCase(ConnectingTestCase):
import pickle import pickle
try: try:
psycopg2.connect('dbname=nosuchdatabasemate') psycopg2.connect('dbname=nosuchdatabasemate')
except psycopg2.Error, exc: except psycopg2.Error as exc:
e = exc e = exc
e1 = pickle.loads(pickle.dumps(e)) e1 = pickle.loads(pickle.dumps(e))

View File

@ -95,7 +95,11 @@ class QuotingTestCase(ConnectingTestCase):
data = u"""some data with \t chars data = u"""some data with \t chars
to escape into, 'quotes', \u20ac euro sign and \\ a backslash too. to escape into, 'quotes', \u20ac euro sign and \\ a backslash too.
""" """
data += u"".join(map(unichr, [ u for u in range(1,65536) if sys.version[0] == '3':
chrtype = chr
else:
chrtype = unichr
data += u"".join(map(chrtype, [ u for u in range(1,65536)
if not 0xD800 <= u <= 0xDFFF ])) # surrogate area if not 0xD800 <= u <= 0xDFFF ])) # surrogate area
self.conn.set_client_encoding('UNICODE') self.conn.set_client_encoding('UNICODE')
@ -112,7 +116,7 @@ class QuotingTestCase(ConnectingTestCase):
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
data = ''.join(map(chr, range(32, 127) + range(160, 256))) data = ''.join(map(chr, range(32, 127) + range(160, 256)))
else: else:
data = bytes(range(32, 127) + range(160, 256)).decode('latin1') data = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
# as string # as string
curs.execute("SELECT %s::text;", (data,)) curs.execute("SELECT %s::text;", (data,))
@ -136,7 +140,7 @@ class QuotingTestCase(ConnectingTestCase):
if sys.version_info[0] < 3: if sys.version_info[0] < 3:
data = ''.join(map(chr, range(32, 127) + range(128, 256))) data = ''.join(map(chr, range(32, 127) + range(128, 256)))
else: else:
data = bytes(range(32, 127) + range(128, 256)).decode('koi8_r') data = bytes(list(range(32, 127)) + list(range(128, 256))).decode('koi8_r')
# as string # as string
curs.execute("SELECT %s::text;", (data,)) curs.execute("SELECT %s::text;", (data,))

View File

@ -143,7 +143,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
step1.set() step1.set()
step2.wait() step2.wait()
curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
except psycopg2.DatabaseError, exc: except psycopg2.DatabaseError as exc:
self.thread1_error = exc self.thread1_error = exc
step1.set() step1.set()
conn.close() conn.close()
@ -155,7 +155,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE")
step2.set() step2.set()
curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE")
except psycopg2.DatabaseError, exc: except psycopg2.DatabaseError as exc:
self.thread2_error = exc self.thread2_error = exc
step2.set() step2.set()
conn.close() conn.close()
@ -191,7 +191,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
step2.wait() step2.wait()
curs.execute("UPDATE table1 SET name='task1' WHERE id = 1") curs.execute("UPDATE table1 SET name='task1' WHERE id = 1")
conn.commit() conn.commit()
except psycopg2.DatabaseError, exc: except psycopg2.DatabaseError as exc:
self.thread1_error = exc self.thread1_error = exc
step1.set() step1.set()
conn.close() conn.close()
@ -202,7 +202,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
step1.wait() step1.wait()
curs.execute("UPDATE table1 SET name='task2' WHERE id = 1") curs.execute("UPDATE table1 SET name='task2' WHERE id = 1")
conn.commit() conn.commit()
except psycopg2.DatabaseError, exc: except psycopg2.DatabaseError as exc:
self.thread2_error = exc self.thread2_error = exc
step2.set() step2.set()
conn.close() conn.close()

View File

@ -54,8 +54,11 @@ class TypesBasicTests(ConnectingTestCase):
def testNumber(self): def testNumber(self):
s = self.execute("SELECT %s AS foo", (1971,)) s = self.execute("SELECT %s AS foo", (1971,))
self.failUnless(s == 1971, "wrong integer quoting: " + str(s)) self.failUnless(s == 1971, "wrong integer quoting: " + str(s))
s = self.execute("SELECT %s AS foo", (1971L,))
self.failUnless(s == 1971L, "wrong integer quoting: " + str(s)) @testutils.skip_from_python(3)
def testLongNumber(self):
s = self.execute("SELECT %s AS foo", (long(1971),))
self.failUnless(s == long(1971), "wrong integer quoting: " + str(s))
def testBoolean(self): def testBoolean(self):
x = self.execute("SELECT %s as foo", (False,)) x = self.execute("SELECT %s as foo", (False,))
@ -285,7 +288,8 @@ class TypesBasicTests(ConnectingTestCase):
self.assertEqual(1, f1) self.assertEqual(1, f1)
i1 = self.execute("select -%s;", (-1,)) i1 = self.execute("select -%s;", (-1,))
self.assertEqual(1, i1) self.assertEqual(1, i1)
l1 = self.execute("select -%s;", (-1L,)) if sys.version[0] == 2:
l1 = self.execute("select -%s;", (long(-1),))
self.assertEqual(1, l1) self.assertEqual(1, l1)
def testGenericArray(self): def testGenericArray(self):
@ -365,7 +369,7 @@ class ByteaParserTest(unittest.TestCase):
def setUp(self): def setUp(self):
try: try:
self._cast = self._import_cast() self._cast = self._import_cast()
except Exception, e: except Exception as e:
self._cast = None self._cast = None
self._exc = e self._exc = e

View File

@ -115,7 +115,7 @@ class TypesExtrasTests(ConnectingTestCase):
psycopg2.extensions.adapt, Foo(), ext.ISQLQuote, None) psycopg2.extensions.adapt, Foo(), ext.ISQLQuote, None)
try: try:
psycopg2.extensions.adapt(Foo(), ext.ISQLQuote, None) psycopg2.extensions.adapt(Foo(), ext.ISQLQuote, None)
except psycopg2.ProgrammingError, err: except psycopg2.ProgrammingError as err:
self.failUnless(str(err) == "can't adapt type 'Foo'") self.failUnless(str(err) == "can't adapt type 'Foo'")
@ -176,7 +176,7 @@ class HstoreTestCase(ConnectingTestCase):
kk = m.group(1).split(b(", ")) kk = m.group(1).split(b(", "))
vv = m.group(2).split(b(", ")) vv = m.group(2).split(b(", "))
ii = zip(kk, vv) ii = list(zip(kk, vv))
ii.sort() ii.sort()
def f(*args): def f(*args):
@ -255,6 +255,10 @@ class HstoreTestCase(ConnectingTestCase):
self.assert_(t[0] is None) self.assert_(t[0] is None)
self.assertEqual(t[1], {}) self.assertEqual(t[1], {})
self.assertEqual(t[2], {u'a': u'b'}) self.assertEqual(t[2], {u'a': u'b'})
if sys.version[0] == '3':
self.assert_(isinstance(t[2].keys()[0], str))
self.assert_(isinstance(t[2].values()[0], str))
else:
self.assert_(isinstance(t[2].keys()[0], unicode)) self.assert_(isinstance(t[2].keys()[0], unicode))
self.assert_(isinstance(t[2].values()[0], unicode)) self.assert_(isinstance(t[2].values()[0], unicode))
@ -315,6 +319,11 @@ class HstoreTestCase(ConnectingTestCase):
@skip_if_no_hstore @skip_if_no_hstore
def test_roundtrip_unicode(self): def test_roundtrip_unicode(self):
from psycopg2.extras import register_hstore from psycopg2.extras import register_hstore
if sys.version[0] == '3':
chrtype = chr
else:
chrtype = unichr
register_hstore(self.conn, unicode=True) register_hstore(self.conn, unicode=True)
cur = self.conn.cursor() cur = self.conn.cursor()
@ -325,13 +334,15 @@ class HstoreTestCase(ConnectingTestCase):
for k, v in d1.iteritems(): for k, v in d1.iteritems():
self.assert_(k in d, k) self.assert_(k in d, k)
self.assertEqual(d[k], v) self.assertEqual(d[k], v)
if sys.version[0] == '3':
self.assert_(isinstance(k, str))
self.assert_(v is None or isinstance(v, str))
else:
self.assert_(isinstance(k, unicode)) self.assert_(isinstance(k, unicode))
self.assert_(v is None or isinstance(v, unicode)) self.assert_(v is None or isinstance(v, unicode))
ok({}) ok({})
ok({'a': 'b', 'c': None, 'd': u'\u20ac', u'\u2603': 'e'}) ok({'a': 'b', 'c': None, 'd': u'\u20ac', u'\u2603': 'e'})
ab = map(chrtype, range(1, 1024))
ab = map(unichr, range(1, 1024))
ok({u''.join(ab): u''.join(ab)}) ok({u''.join(ab): u''.join(ab)})
ok(dict(zip(ab, ab))) ok(dict(zip(ab, ab)))
@ -501,7 +512,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
'@,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,[,"\\\\",],' '@,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,[,"\\\\",],'
'^,_,`,a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z,{,|,},' '^,_,`,a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z,{,|,},'
'~,\x7f)', '~,\x7f)',
map(chr, range(1, 128))) list(map(chr, range(1, 128))))
ok('(,"\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f' ok('(,"\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !' '\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !'
'""#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]' '""#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]'
@ -1577,7 +1588,7 @@ class RangeCasterTestCase(ConnectingTestCase):
from psycopg2.tz import FixedOffsetTimezone from psycopg2.tz import FixedOffsetTimezone
cur = self.conn.cursor() cur = self.conn.cursor()
d1 = date(2012, 01, 01) d1 = date(2012, 1, 1)
d2 = date(2012, 12, 31) d2 = date(2012, 12, 31)
r = DateRange(d1, d2) r = DateRange(d1, d2)
cur.execute("select %s", (r,)) cur.execute("select %s", (r,))
@ -1650,8 +1661,8 @@ class RangeCasterTestCase(ConnectingTestCase):
bounds = [ '[)', '(]', '()', '[]' ] bounds = [ '[)', '(]', '()', '[]' ]
ranges = [ TextRange(low, up, bounds[i % 4]) ranges = [ TextRange(low, up, bounds[i % 4])
for i, (low, up) in enumerate(zip( for i, (low, up) in enumerate(zip(
[None] + map(chr, range(1, 128)), [None] + list(map(chr, range(1, 128))),
map(chr, range(1,128)) + [None], list(map(chr, range(1,128))) + [None],
))] ))]
ranges.append(TextRange()) ranges.append(TextRange())
ranges.append(TextRange(empty=True)) ranges.append(TextRange(empty=True))

View File

@ -207,7 +207,7 @@ class WithCursorTestCase(WithTestCase):
with self.conn as conn: with self.conn as conn:
with conn.cursor('named') as cur: with conn.cursor('named') as cur:
cur.execute("select 1/0") cur.execute("select 1/0")
except psycopg2.DataError, e: except psycopg2.DataError as e:
self.assertEqual(e.pgcode, '22012') self.assertEqual(e.pgcode, '22012')
else: else:
self.fail("where is my exception?") self.fail("where is my exception?")

View File

@ -97,7 +97,7 @@ class ConnectingTestCase(unittest.TestCase):
def connect(self, **kwargs): def connect(self, **kwargs):
try: try:
self._conns self._conns
except AttributeError, e: except AttributeError as e:
raise AttributeError( raise AttributeError(
"%s (did you remember calling ConnectingTestCase.setUp()?)" "%s (did you remember calling ConnectingTestCase.setUp()?)"
% e) % e)
@ -270,7 +270,7 @@ def skip_if_no_superuser(f):
from psycopg2 import ProgrammingError from psycopg2 import ProgrammingError
try: try:
return f(self) return f(self)
except ProgrammingError, e: except ProgrammingError as e:
import psycopg2.errorcodes import psycopg2.errorcodes
if e.pgcode == psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE: if e.pgcode == psycopg2.errorcodes.INSUFFICIENT_PRIVILEGE:
self.skipTest("skipped because not superuser") self.skipTest("skipped because not superuser")