Merge branch 'python2' into python3

Conflicts:
	tests/bug_gc.py
	tests/types_extras.py
This commit is contained in:
Daniele Varrazzo 2011-01-18 02:11:58 +00:00
commit 9f90f049ab
3 changed files with 61 additions and 30 deletions

View File

@ -30,15 +30,23 @@ import gc
from testconfig import dsn
from testutils import skip_if_no_uuid
class StolenReferenceTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(dsn)
def tearDown(self):
self.conn.close()
@skip_if_no_uuid
def test_stolen_reference_bug(self):
def fish(val, cur):
gc.collect()
return 42
conn = psycopg2.connect(dsn)
UUID = psycopg2.extensions.new_type((2950,), "UUID", fish)
psycopg2.extensions.register_type(UUID, conn)
curs = conn.cursor()
psycopg2.extensions.register_type(UUID, self.conn)
curs = self.conn.cursor()
curs.execute("select 'b5219e01-19ab-4994-b71e-149225dc51e4'::uuid")
curs.fetchone()

View File

@ -78,6 +78,29 @@ def decorate_all_tests(cls, decorator):
setattr(cls, n, decorator(getattr(cls, n)))
def skip_if_no_uuid(f):
"""Decorator to skip a test if uuid is not supported by Py/PG."""
def skip_if_no_uuid_(self):
try:
import uuid
except ImportError:
return self.skipTest("uuid not available in this Python version")
try:
cur = self.conn.cursor()
cur.execute("select typname from pg_type where typname = 'uuid'")
has = cur.fetchone()
finally:
self.conn.rollback()
if has:
return f(self)
else:
return self.skipTest("uuid type not available on the server")
return skip_if_no_uuid_
def skip_if_no_pg_sleep(name):
"""Decorator to skip a test if pg_sleep is not supported by the server.
@ -90,7 +113,7 @@ def skip_if_no_pg_sleep(name):
if callable(cnn):
cnn = cnn()
if cnn.server_version < 80100:
if cnn.server_version < 80200:
return self.skipTest(
"server version %s doesn't support pg_sleep"
% cnn.server_version)

View File

@ -22,7 +22,7 @@ import re
import sys
from datetime import date
from testutils import unittest
from testutils import unittest, skip_if_no_uuid
import psycopg2
import psycopg2.extras
@ -31,27 +31,6 @@ from psycopg2.extensions import b
from testconfig import dsn
def skip_if_no_uuid(f):
def skip_if_no_uuid_(self):
try:
import uuid
except ImportError:
return self.skipTest("uuid not available in this Python version")
try:
cur = self.conn.cursor()
cur.execute("select typname from pg_type where typname = 'uuid'")
has = cur.fetchone()
finally:
self.conn.rollback()
if has:
return f(self)
else:
return self.skipTest("uuid type not available on the server")
return skip_if_no_uuid_
def filter_scs(conn, s):
if conn.get_parameter_status("standard_conforming_strings") == 'off':
return s
@ -202,13 +181,16 @@ class HstoreTestCase(unittest.TestCase):
ii = zip(kk, vv)
ii.sort()
def f(*args):
return tuple([filter_scs(self.conn, s) for s in args])
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], (b("E'a'"), b("E'1'")))
self.assertEqual(ii[1], (b("E'b'"), b("E''''")))
self.assertEqual(ii[2], (b("E'c'"), b("NULL")))
self.assertEqual(ii[0], f(b("E'a'"), b("E'1'")))
self.assertEqual(ii[1], f(b("E'b'"), b("E''''")))
self.assertEqual(ii[2], f(b("E'c'"), b("NULL")))
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], (b("E'd'"), b("E'") + encc + b("'")))
self.assertEqual(ii[3], f(b("E'd'"), b("E'") + encc + b("'")))
def test_parse(self):
from psycopg2.extras import HstoreAdapter
@ -356,6 +338,18 @@ class HstoreTestCase(unittest.TestCase):
ok(dict(zip(ab, ab)))
def skip_if_no_composite(f):
def skip_if_no_composite_(self):
if self.conn.server_version < 80000:
return self.skipTest(
"server version %s doesn't support composite types"
% self.conn.server_version)
return f(self)
skip_if_no_composite_.__name__ = f.__name__
return skip_if_no_composite_
class AdaptTypeTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(dsn)
@ -363,6 +357,7 @@ class AdaptTypeTestCase(unittest.TestCase):
def tearDown(self):
self.conn.close()
@skip_if_no_composite
def test_none_in_record(self):
curs = self.conn.cursor()
s = curs.mogrify("SELECT %s;", [(42, None)])
@ -420,6 +415,7 @@ class AdaptTypeTestCase(unittest.TestCase):
'^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f")',
[None, ''.join(map(chr, range(1, 128)))])
@skip_if_no_composite
def test_cast_composite(self):
oid = self._create_type("type_isd",
[('anint', 'integer'), ('astring', 'text'), ('adate', 'date')])
@ -450,6 +446,7 @@ class AdaptTypeTestCase(unittest.TestCase):
self.assertEqual(v.astring, "hello")
self.assertEqual(v.adate, date(2011,1,2))
@skip_if_no_composite
def test_cast_nested(self):
self._create_type("type_is",
[("anint", "integer"), ("astring", "text")])
@ -476,6 +473,7 @@ class AdaptTypeTestCase(unittest.TestCase):
else:
self.assertEqual(v.anotherpair.apair.astring, "hello")
@skip_if_no_composite
def test_register_on_cursor(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
@ -487,6 +485,7 @@ class AdaptTypeTestCase(unittest.TestCase):
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
@skip_if_no_composite
def test_register_on_connection(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
@ -504,6 +503,7 @@ class AdaptTypeTestCase(unittest.TestCase):
conn1.close()
conn2.close()
@skip_if_no_composite
def test_register_globally(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])