Fixed register_hstore and register_composite with non-dbapi objects

Closed ticket #114.
This commit is contained in:
Daniele Varrazzo 2012-08-14 23:26:17 +01:00
parent 8666693512
commit 21d323d2c8
3 changed files with 64 additions and 12 deletions

2
NEWS
View File

@ -7,6 +7,8 @@ What's new in psycopg 2.4.6
- Dropped GIL release during string adaptation around a function call - Dropped GIL release during string adaptation around a function call
invoking a Python API function, which could cause interpreter crash. invoking a Python API function, which could cause interpreter crash.
Thanks to Manu Cupcic for the report (ticket #110). Thanks to Manu Cupcic for the report (ticket #110).
- 'register_hstore()', 'register_composite()' work with RealDictConnection
and Cursor. Thanks to Adrian Klaver for the report (ticket #114).
- connection.reset() implemented using DISCARD ALL on server versions - connection.reset() implemented using DISCARD ALL on server versions
supporting it. supporting it.

View File

@ -571,6 +571,18 @@ def wait_select(conn):
raise OperationalError("bad state from poll: %s" % state) raise OperationalError("bad state from poll: %s" % state)
def _solve_conn_curs(conn_or_curs):
"""Return the connection and a DBAPI cursor from a connection or cursor."""
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn.cursor(cursor_factory=_cursor)
else:
conn = conn_or_curs
curs = conn.cursor(cursor_factory=_cursor)
return conn, curs
class HstoreAdapter(object): class HstoreAdapter(object):
"""Adapt a Python dict to the hstore syntax.""" """Adapt a Python dict to the hstore syntax."""
def __init__(self, wrapped): def __init__(self, wrapped):
@ -679,12 +691,7 @@ class HstoreAdapter(object):
def get_oids(self, conn_or_curs): def get_oids(self, conn_or_curs):
"""Return the lists of OID of the hstore and hstore[] types. """Return the lists of OID of the hstore and hstore[] types.
""" """
if hasattr(conn_or_curs, 'execute'): conn, curs = _solve_conn_curs(conn_or_curs)
conn = conn_or_curs.connection
curs = conn_or_curs
else:
conn = conn_or_curs
curs = conn_or_curs.cursor()
# Store the transaction status of the connection to revert it after use # Store the transaction status of the connection to revert it after use
conn_status = conn.status conn_status = conn.status
@ -890,12 +897,7 @@ class CompositeCaster(object):
Raise `ProgrammingError` if the type is not found. Raise `ProgrammingError` if the type is not found.
""" """
if hasattr(conn_or_curs, 'execute'): conn, curs = _solve_conn_curs(conn_or_curs)
conn = conn_or_curs.connection
curs = conn_or_curs
else:
conn = conn_or_curs
curs = conn_or_curs.cursor()
# Store the transaction status of the connection to revert it after use # Store the transaction status of the connection to revert it after use
conn_status = conn.status conn_status = conn.status

View File

@ -424,6 +424,30 @@ class HstoreTestCase(unittest.TestCase):
psycopg2.extensions.string_types.pop(oid) psycopg2.extensions.string_types.pop(oid)
psycopg2.extensions.string_types.pop(aoid) psycopg2.extensions.string_types.pop(aoid)
@skip_if_no_hstore
def test_non_dbapi_connection(self):
from psycopg2.extras import RealDictConnection
from psycopg2.extras import register_hstore
conn = psycopg2.connect(dsn, connection_factory=RealDictConnection)
try:
register_hstore(conn)
curs = conn.cursor()
curs.execute("select ''::hstore x")
self.assertEqual(curs.fetchone()['x'], {})
finally:
conn.close()
conn = psycopg2.connect(dsn, connection_factory=RealDictConnection)
try:
curs = conn.cursor()
register_hstore(curs)
curs.execute("select ''::hstore x")
self.assertEqual(curs.fetchone()['x'], {})
finally:
conn.close()
def skip_if_no_composite(f): def skip_if_no_composite(f):
def skip_if_no_composite_(self): def skip_if_no_composite_(self):
if self.conn.server_version < 80000: if self.conn.server_version < 80000:
@ -712,6 +736,30 @@ class AdaptTypeTestCase(unittest.TestCase):
self.assertEqual(r[0], (2, 'test2')) self.assertEqual(r[0], (2, 'test2'))
self.assertEqual(r[1], [(3, 'testc', 2), (4, 'testd', 2)]) self.assertEqual(r[1], [(3, 'testc', 2), (4, 'testd', 2)])
@skip_if_no_hstore
def test_non_dbapi_connection(self):
from psycopg2.extras import RealDictConnection
from psycopg2.extras import register_composite
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn = psycopg2.connect(dsn, connection_factory=RealDictConnection)
try:
register_composite('type_ii', conn)
curs = conn.cursor()
curs.execute("select '(1,2)'::type_ii x")
self.assertEqual(curs.fetchone()['x'], (1,2))
finally:
conn.close()
conn = psycopg2.connect(dsn, connection_factory=RealDictConnection)
try:
curs = conn.cursor()
register_composite('type_ii', conn)
curs.execute("select '(1,2)'::type_ii x")
self.assertEqual(curs.fetchone()['x'], (1,2))
finally:
conn.close()
def _create_type(self, name, fields): def _create_type(self, name, fields):
curs = self.conn.cursor() curs = self.conn.cursor()
try: try: