diff --git a/Makefile b/Makefile index fb0c145c..16298d60 100644 --- a/Makefile +++ b/Makefile @@ -71,7 +71,7 @@ docs-txt: doc/psycopg2.txt sdist: $(SDIST) runtests: package - PSYCOPG2_TESTDB=$(TESTDB) PYTHONPATH=$(BUILD_DIR):. $(PYTHON) tests/__init__.py --verbose + PSYCOPG2_TESTDB=$(TESTDB) PYTHONPATH=$(BUILD_DIR):.:$(PYTHONPATH) $(PYTHON) tests/__init__.py --verbose # The environment is currently required to build the documentation. diff --git a/tests/__init__.py b/tests/__init__.py index 1c8f6f2b..23193413 100755 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -1,7 +1,7 @@ #!/usr/bin/env python import os -import unittest +from testutils import unittest dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test') dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None) @@ -59,14 +59,8 @@ def test_suite(): suite.addTest(test_transaction.test_suite()) suite.addTest(types_basic.test_suite()) suite.addTest(types_extras.test_suite()) - - if not green: - suite.addTest(test_lobject.test_suite()) - suite.addTest(test_copy.test_suite()) - else: - import warnings - warnings.warn("copy/lobjects not implemented in green mode: skipping tests") - + suite.addTest(test_lobject.test_suite()) + suite.addTest(test_copy.test_suite()) suite.addTest(test_notify.test_suite()) suite.addTest(test_async.test_suite()) suite.addTest(test_green.test_suite()) diff --git a/tests/extras_dictcursor.py b/tests/extras_dictcursor.py index 6bc5cad0..a0583c54 100644 --- a/tests/extras_dictcursor.py +++ b/tests/extras_dictcursor.py @@ -16,7 +16,7 @@ import psycopg2 import psycopg2.extras -import unittest +from testutils import unittest import tests @@ -111,8 +111,7 @@ def if_has_namedtuple(f): try: from collections import namedtuple except ImportError: - import warnings - warnings.warn("collections.namedtuple not available") + return self.skipTest("collections.namedtuple not available") else: return f(self) @@ -133,8 +132,9 @@ class NamedTupleCursorTest(unittest.TestCase): connection_factory=NamedTupleConnection) curs = self.conn.cursor() curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)") - curs.execute( - "INSERT INTO nttest VALUES (1, 'foo'), (2, 'bar'), (3, 'baz')") + curs.execute("INSERT INTO nttest VALUES (1, 'foo')") + curs.execute("INSERT INTO nttest VALUES (2, 'bar')") + curs.execute("INSERT INTO nttest VALUES (3, 'baz')") self.conn.commit() @if_has_namedtuple diff --git a/tests/test_async.py b/tests/test_async.py index 9187cec9..c839604d 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -import unittest +from testutils import unittest import psycopg2 from psycopg2 import extensions @@ -98,14 +98,13 @@ class AsyncTests(unittest.TestCase): cur = self.conn.cursor() try: cur.callproc("pg_sleep", (0.1, )) - except psycopg2.ProgrammingError: - # PG <8.1 did not have pg_sleep - return - self.assertTrue(self.conn.isexecuting()) + self.assertTrue(self.conn.isexecuting()) - self.wait(cur) - self.assertFalse(self.conn.isexecuting()) - self.assertEquals(cur.fetchall()[0][0], '') + self.wait(cur) + self.assertFalse(self.conn.isexecuting()) + self.assertEquals(cur.fetchall()[0][0], '') + except psycopg2.ProgrammingError: + return self.skipTest("PG < 8.1 did not have pg_sleep") def test_async_after_async(self): cur = self.conn.cursor() @@ -213,7 +212,11 @@ class AsyncTests(unittest.TestCase): cur.execute("begin") self.wait(cur) - cur.execute("insert into table1 values (1), (2), (3)") + cur.execute(""" + insert into table1 values (1); + insert into table1 values (2); + insert into table1 values (3); + """) self.wait(cur) cur.execute("select id from table1 order by id") @@ -247,7 +250,11 @@ class AsyncTests(unittest.TestCase): def test_async_scroll(self): cur = self.conn.cursor() - cur.execute("insert into table1 values (1), (2), (3)") + cur.execute(""" + insert into table1 values (1); + insert into table1 values (2); + insert into table1 values (3); + """) self.wait(cur) cur.execute("select id from table1 order by id") @@ -279,7 +286,11 @@ class AsyncTests(unittest.TestCase): def test_scroll(self): cur = self.sync_conn.cursor() cur.execute("create table table1 (id int)") - cur.execute("insert into table1 values (1), (2), (3)") + cur.execute(""" + insert into table1 values (1); + insert into table1 values (2); + insert into table1 values (3); + """) cur.execute("select id from table1 order by id") cur.scroll(2) cur.scroll(-1) diff --git a/tests/test_connection.py b/tests/test_connection.py index 36ce6406..400ddd62 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,6 +1,6 @@ #!/usr/bin/env python -import unittest +from testutils import unittest, decorate_all_tests from operator import attrgetter import psycopg2 @@ -82,13 +82,24 @@ class ConnectionTests(unittest.TestCase): conn = self.connect() self.assert_(conn.protocol_version in (2,3), conn.protocol_version) + def test_tpc_unsupported(self): + cnn = self.connect() + if cnn.server_version >= 80100: + return self.skipTest("tpc is supported") + + self.assertRaises(psycopg2.NotSupportedError, + cnn.xid, 42, "foo", "bar") + class IsolationLevelsTestCase(unittest.TestCase): def setUp(self): conn = self.connect() cur = conn.cursor() - cur.execute("drop table if exists isolevel;") + try: + cur.execute("drop table isolevel;") + except psycopg2.ProgrammingError: + conn.rollback() cur.execute("create table isolevel (id integer);") conn.commit() conn.close() @@ -244,18 +255,16 @@ def skip_if_tpc_disabled(f): try: cur.execute("SHOW max_prepared_transactions;") except psycopg2.ProgrammingError: - # Server version too old: let's die a different death - mtp = 1 + return self.skipTest( + "server too old: two phase transactions not supported.") else: mtp = int(cur.fetchone()[0]) cnn.close() if not mtp: - import warnings - warnings.warn( + return self.skipTest( "server not configured for two phase transactions. " "set max_prepared_transactions to > 0 to run the test") - return return f(self) skip_if_tpc_disabled_.__name__ = f.__name__ @@ -274,9 +283,15 @@ class ConnectionTwoPhaseTests(unittest.TestCase): cnn = self.connect() cnn.set_isolation_level(0) cur = cnn.cursor() - cur.execute( - "select gid from pg_prepared_xacts where database = %s", - (tests.dbname,)) + try: + cur.execute( + "select gid from pg_prepared_xacts where database = %s", + (tests.dbname,)) + except psycopg2.ProgrammingError: + cnn.rollback() + cnn.close() + return + gids = [ r[0] for r in cur ] for gid in gids: cur.execute("rollback prepared %s;", (gid,)) @@ -285,7 +300,10 @@ class ConnectionTwoPhaseTests(unittest.TestCase): def make_test_table(self): cnn = self.connect() cur = cnn.cursor() - cur.execute("DROP TABLE IF EXISTS test_tpc;") + try: + cur.execute("DROP TABLE test_tpc;") + except psycopg2.ProgrammingError: + cnn.rollback() cur.execute("CREATE TABLE test_tpc (data text);") cnn.commit() cnn.close() @@ -314,7 +332,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): def connect(self): return psycopg2.connect(tests.dsn) - @skip_if_tpc_disabled def test_tpc_commit(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") @@ -356,7 +373,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): self.assertEqual(0, self.count_xacts()) self.assertEqual(1, self.count_test_records()) - @skip_if_tpc_disabled def test_tpc_commit_recovered(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") @@ -383,7 +399,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): self.assertEqual(0, self.count_xacts()) self.assertEqual(1, self.count_test_records()) - @skip_if_tpc_disabled def test_tpc_rollback(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") @@ -425,7 +440,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_test_records()) - @skip_if_tpc_disabled def test_tpc_rollback_recovered(self): cnn = self.connect() xid = cnn.xid(1, "gtrid", "bqual") @@ -464,7 +478,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): xns = cnn.tpc_recover() self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) - @skip_if_tpc_disabled def test_recovered_xids(self): # insert a few test xns cnn = self.connect() @@ -495,7 +508,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): self.assertEqual(xid.owner, owner) self.assertEqual(xid.database, database) - @skip_if_tpc_disabled def test_xid_encoding(self): cnn = self.connect() xid = cnn.xid(42, "gtrid", "bqual") @@ -508,7 +520,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): (tests.dbname,)) self.assertEqual('42_Z3RyaWQ=_YnF1YWw=', cur.fetchone()[0]) - @skip_if_tpc_disabled def test_xid_roundtrip(self): for fid, gtrid, bqual in [ (0, "", ""), @@ -532,7 +543,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): cnn.tpc_rollback(xid) - @skip_if_tpc_disabled def test_unparsed_roundtrip(self): for tid in [ '', @@ -585,7 +595,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): x2 = Xid.from_string('99_xxx_yyy') self.assertEqual(str(x2), '99_xxx_yyy') - @skip_if_tpc_disabled def test_xid_unicode(self): cnn = self.connect() x1 = cnn.xid(10, u'uni', u'code') @@ -598,7 +607,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase): self.assertEqual('uni', xid.gtrid) self.assertEqual('code', xid.bqual) - @skip_if_tpc_disabled def test_xid_unicode_unparsed(self): # We don't expect people shooting snowmen as transaction ids, # so if something explodes in an encode error I don't mind. @@ -615,6 +623,8 @@ class ConnectionTwoPhaseTests(unittest.TestCase): self.assertEqual('transaction-id', xid.gtrid) self.assertEqual(None, xid.bqual) +decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_copy.py b/tests/test_copy.py index 19fabe1b..b0d9965c 100644 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -1,7 +1,7 @@ #!/usr/bin/env python import os import string -import unittest +from testutils import unittest, decorate_all_tests from cStringIO import StringIO from itertools import cycle, izip @@ -9,6 +9,15 @@ import psycopg2 import psycopg2.extensions import tests +def skip_if_green(f): + def skip_if_green_(self): + if tests.green: + return self.skipTest("copy in async mode currently not supported") + else: + return f(self) + + return skip_if_green_ + class MinimalRead(object): """A file wrapper exposing the minimal interface to copy from.""" @@ -29,6 +38,7 @@ class MinimalWrite(object): def write(self, data): return self.f.write(data) + class CopyTests(unittest.TestCase): def setUp(self): @@ -124,6 +134,9 @@ class CopyTests(unittest.TestCase): self.assertEqual(ntests, len(string.letters)) +decorate_all_tests(CopyTests, skip_if_green) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_lobject.py b/tests/test_lobject.py index f4a684fd..88e18d6f 100644 --- a/tests/test_lobject.py +++ b/tests/test_lobject.py @@ -2,16 +2,33 @@ import os import shutil import tempfile -import unittest -import warnings +from testutils import unittest, decorate_all_tests import psycopg2 import psycopg2.extensions import tests +def skip_if_no_lo(f): + def skip_if_no_lo_(self): + if self.conn.server_version < 80100: + return self.skipTest("large objects only supported from PG 8.1") + else: + return f(self) -class LargeObjectTests(unittest.TestCase): + return skip_if_no_lo_ +def skip_if_green(f): + def skip_if_green_(self): + if tests.green: + return self.skipTest("libpq doesn't support LO in async mode") + else: + return f(self) + + return skip_if_green_ + + +class LargeObjectMixin(object): + # doesn't derive from TestCase to avoid repeating tests twice. def setUp(self): self.conn = psycopg2.connect(tests.dsn) self.lo_oid = None @@ -30,6 +47,8 @@ class LargeObjectTests(unittest.TestCase): lo.unlink() self.conn.close() + +class LargeObjectTests(LargeObjectMixin, unittest.TestCase): def test_create(self): lo = self.conn.lobject() self.assertNotEqual(lo, None) @@ -261,30 +280,25 @@ class LargeObjectTests(unittest.TestCase): self.assertTrue(os.path.exists(filename)) self.assertEqual(open(filename, "rb").read(), "some data") +decorate_all_tests(LargeObjectTests, skip_if_no_lo) +decorate_all_tests(LargeObjectTests, skip_if_green) -class LargeObjectTruncateTests(LargeObjectTests): - skip = None +def skip_if_no_truncate(f): + def skip_if_no_truncate_(self): + if self.conn.server_version < 80300: + return self.skipTest( + "the server doesn't support large object truncate") - def setUp(self): - LargeObjectTests.setUp(self) + if not hasattr(psycopg2.extensions.lobject, 'truncate'): + return self.skipTest( + "psycopg2 has been built against a libpq " + "without large object truncate support.") - if self.skip is None: - self.skip = False - if self.conn.server_version < 80300: - warnings.warn("Large object truncate tests skipped, " - "the server does not support them") - self.skip = True - - if not hasattr(psycopg2.extensions.lobject, 'truncate'): - warnings.warn("Large object truncate tests skipped, " - "psycopg2 has been built against an old library") - self.skip = True + return f(self) +class LargeObjectTruncateTests(LargeObjectMixin, unittest.TestCase): def test_truncate(self): - if self.skip: - return - lo = self.conn.lobject() lo.write("some data") lo.close() @@ -308,23 +322,22 @@ class LargeObjectTruncateTests(LargeObjectTests): self.assertEqual(lo.read(), "") def test_truncate_after_close(self): - if self.skip: - return - lo = self.conn.lobject() lo.close() self.assertRaises(psycopg2.InterfaceError, lo.truncate) def test_truncate_after_commit(self): - if self.skip: - return - lo = self.conn.lobject() self.lo_oid = lo.oid self.conn.commit() self.assertRaises(psycopg2.ProgrammingError, lo.truncate) +decorate_all_tests(LargeObjectTruncateTests, skip_if_no_lo) +decorate_all_tests(LargeObjectTruncateTests, skip_if_green) +decorate_all_tests(LargeObjectTruncateTests, skip_if_no_truncate) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_notify.py b/tests/test_notify.py index 8dd8c709..75dae1ee 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -1,6 +1,5 @@ #!/usr/bin/env python -import unittest -import warnings +from testutils import unittest import psycopg2 from psycopg2 import extensions @@ -127,9 +126,8 @@ conn.close() def test_notify_payload(self): if self.conn.server_version < 90000: - warnings.warn("server version %s doesn't support notify payload: skipping test" + return self.skipTest("server version %s doesn't support notify payload" % self.conn.server_version) - return self.autocommit(self.conn) self.listen('foo') pid = int(self.notify('foo', payload="Hello, world!").communicate()[0]) diff --git a/tests/test_psycopg2_dbapi20.py b/tests/test_psycopg2_dbapi20.py index 1d83f066..93fdfa60 100755 --- a/tests/test_psycopg2_dbapi20.py +++ b/tests/test_psycopg2_dbapi20.py @@ -2,7 +2,7 @@ import dbapi20 import dbapi20_tpc from test_connection import skip_if_tpc_disabled -import unittest +from testutils import unittest, decorate_all_tests import psycopg2 import tests @@ -23,19 +23,14 @@ class Psycopg2Tests(dbapi20.DatabaseAPI20Test): pass -class Psycopg2TPCTests(dbapi20_tpc.TwoPhaseCommitTests): +class Psycopg2TPCTests(dbapi20_tpc.TwoPhaseCommitTests, unittest.TestCase): driver = psycopg2 def connect(self): return psycopg2.connect(dsn=tests.dsn) - @skip_if_tpc_disabled - def test_tpc_commit_with_prepare(self): - super(Psycopg2TPCTests, self).test_tpc_commit_with_prepare() +decorate_all_tests(Psycopg2TPCTests, skip_if_tpc_disabled) - @skip_if_tpc_disabled - def test_tpc_rollback_with_prepare(self): - super(Psycopg2TPCTests, self).test_tpc_rollback_with_prepare() def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_quote.py b/tests/test_quote.py index 6da10fa5..95c5d7a5 100755 --- a/tests/test_quote.py +++ b/tests/test_quote.py @@ -1,6 +1,5 @@ #!/usr/bin/env python -import unittest -import warnings +from testutils import unittest import psycopg2 import psycopg2.extensions @@ -61,9 +60,9 @@ class QuotingTestCase(unittest.TestCase): curs.execute("SHOW server_encoding") server_encoding = curs.fetchone()[0] if server_encoding != "UTF8": - warnings.warn("Unicode test skipped since server encoding is %s" - % server_encoding) - return + return self.skipTest( + "Unicode test skipped since server encoding is %s" + % server_encoding) data = u"""some data with \t chars to escape into, 'quotes', \u20ac euro sign and \\ a backslash too. diff --git a/tests/test_transaction.py b/tests/test_transaction.py index 030bf175..3ee5ee7c 100755 --- a/tests/test_transaction.py +++ b/tests/test_transaction.py @@ -1,6 +1,6 @@ #!/usr/bin/env python import threading -import unittest +from testutils import unittest import psycopg2 from psycopg2.extensions import ( @@ -215,8 +215,11 @@ class QueryCancellationTests(unittest.TestCase): curs = self.conn.cursor() # Set a low statement timeout, then sleep for a longer period. curs.execute('SET statement_timeout TO 10') - self.assertRaises(psycopg2.extensions.QueryCanceledError, - curs.execute, 'SELECT pg_sleep(50)') + try: + self.assertRaises(psycopg2.extensions.QueryCanceledError, + curs.execute, 'SELECT pg_sleep(50)') + except psycopg2.ProgrammingError: + return self.skipTest("pg_sleep not available") def test_suite(): diff --git a/tests/testutils.py b/tests/testutils.py new file mode 100644 index 00000000..c047827c --- /dev/null +++ b/tests/testutils.py @@ -0,0 +1,46 @@ +# Utility module for psycopg2 testing. +# +# Copyright (C) 2010 Daniele Varrazzo + +# Use unittest2 if available. Otherwise mock a skip facility with warnings. + +try: + import unittest2 + unittest = unittest2 +except ImportError: + import unittest + unittest2 = None + +if hasattr(unittest, 'skipIf'): + from unittest2 import skip, skipIf + +else: + import warnings + + def skipIf(cond, msg): + def skipIf_(f): + def skipIf__(self): + if cond: + warnings.warn(msg) + return + else: + return f(self) + return skipIf__ + return skipIf_ + + def skip(msg): + return skipIf(True, msg) + + def skipTest(self, msg): + warnings.warn(msg) + return + + unittest.TestCase.skipTest = skipTest + + +def decorate_all_tests(cls, decorator): + """Apply *decorator* to all the tests defined in the TestCase *cls*.""" + for n in dir(cls): + if n.startswith('test'): + setattr(cls, n, decorator(getattr(cls, n))) + diff --git a/tests/types_basic.py b/tests/types_basic.py index aee4754f..8e6f1993 100755 --- a/tests/types_basic.py +++ b/tests/types_basic.py @@ -27,7 +27,7 @@ try: except: pass import sys -import unittest +from testutils import unittest import psycopg2 import tests @@ -78,14 +78,14 @@ class TypesBasicTests(unittest.TestCase): s = self.execute("SELECT %s AS foo", (decimal.Decimal("-infinity"),)) self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s)) self.failUnless(type(s) == decimal.Decimal, "wrong decimal conversion: " + repr(s)) + else: + return self.skipTest("decimal not available") def testFloat(self): try: float("nan") except ValueError: - import warnings - warnings.warn("nan not available on this platform") - return + return self.skipTest("nan not available on this platform") s = self.execute("SELECT %s AS foo", (float("nan"),)) self.failUnless(str(s) == "nan", "wrong float quoting: " + str(s)) diff --git a/tests/types_extras.py b/tests/types_extras.py index dc5a585f..062289e3 100644 --- a/tests/types_extras.py +++ b/tests/types_extras.py @@ -20,14 +20,35 @@ except: pass import re import sys -import unittest -import warnings +from testutils import unittest import psycopg2 import psycopg2.extras import tests +def skip_if_no_uuid(f): + def skip_if_no_uuid_(self): + try: + cur = self.conn.cursor() + cur.execute("select typname from pg_type where typname = 'uuid'") + has = cur.fetchone() + finally: + self.conn.rollback() + + if has: + return f(self) + else: + return self.skipTest("uuid type not available") + + return skip_if_no_uuid_ + +def filter_scs(conn, s): + if conn.get_parameter_status("standard_conforming_strings") == 'off': + return s + else: + return s.replace("E'", "'") + class TypesExtrasTests(unittest.TestCase): """Test that all type conversions are working.""" @@ -39,12 +60,10 @@ class TypesExtrasTests(unittest.TestCase): curs.execute(*args) return curs.fetchone()[0] + @skip_if_no_uuid def testUUID(self): - try: - import uuid - psycopg2.extras.register_uuid() - except: - return + import uuid + psycopg2.extras.register_uuid() u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350') s = self.execute("SELECT %s AS foo", (u,)) self.failUnless(u == s) @@ -52,12 +71,10 @@ class TypesExtrasTests(unittest.TestCase): s = self.execute("SELECT NULL::uuid AS foo") self.failUnless(s is None) + @skip_if_no_uuid def testUUIDARRAY(self): - try: - import uuid - psycopg2.extras.register_uuid() - except: - return + import uuid + psycopg2.extras.register_uuid() u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')] s = self.execute("SELECT %s AS foo", (u,)) self.failUnless(u == s) @@ -86,13 +103,17 @@ class TypesExtrasTests(unittest.TestCase): i = Inet("192.168.1.0/24") a = psycopg2.extensions.adapt(i) a.prepare(self.conn) - self.assertEqual("E'192.168.1.0/24'::inet", a.getquoted()) + self.assertEqual( + filter_scs(self.conn, "E'192.168.1.0/24'::inet"), + a.getquoted()) # adapts ok with unicode too i = Inet(u"192.168.1.0/24") a = psycopg2.extensions.adapt(i) a.prepare(self.conn) - self.assertEqual("E'192.168.1.0/24'::inet", a.getquoted()) + self.assertEqual( + filter_scs(self.conn, "E'192.168.1.0/24'::inet"), + a.getquoted()) def test_adapt_fail(self): class Foo(object): pass @@ -109,8 +130,7 @@ def skip_if_no_hstore(f): from psycopg2.extras import HstoreAdapter oids = HstoreAdapter.get_oids(self.conn) if oids is None: - warnings.warn("hstore not available in test database: skipping test") - return + return self.skipTest("hstore not available in test database") return f(self) return skip_if_no_hstore_ @@ -121,8 +141,7 @@ class HstoreTestCase(unittest.TestCase): def test_adapt_8(self): if self.conn.server_version >= 90000: - warnings.warn("skipping dict adaptation with PG pre-9 syntax") - return + return self.skipTest("skipping dict adaptation with PG pre-9 syntax") from psycopg2.extras import HstoreAdapter @@ -136,16 +155,15 @@ class HstoreTestCase(unittest.TestCase): ii = q[1:-1].split("||") ii.sort() - self.assertEqual(ii[0], "(E'a' => E'1')") - self.assertEqual(ii[1], "(E'b' => E'''')") - self.assertEqual(ii[2], "(E'c' => NULL)") + self.assertEqual(ii[0], filter_scs(self.conn, "(E'a' => E'1')")) + self.assertEqual(ii[1], filter_scs(self.conn, "(E'b' => E'''')")) + self.assertEqual(ii[2], filter_scs(self.conn, "(E'c' => NULL)")) encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding]) - self.assertEqual(ii[3], "(E'd' => E'%s')" % encc) + self.assertEqual(ii[3], filter_scs(self.conn, "(E'd' => E'%s')" % encc)) def test_adapt_9(self): if self.conn.server_version < 90000: - warnings.warn("skipping dict adaptation with PG 9 syntax") - return + return self.skipTest("skipping dict adaptation with PG 9 syntax") from psycopg2.extras import HstoreAdapter