The hstore typecast can be registered globally.

This commit is contained in:
Daniele Varrazzo 2010-09-27 01:46:54 +01:00
parent bb41acd1da
commit d5bf400cb4
2 changed files with 62 additions and 28 deletions

View File

@ -581,11 +581,47 @@ class HstoreAdapter(object):
parse_unicode = classmethod(parse_unicode)
def register_hstore(conn_or_curs, unicode=False):
def get_oids(self, conn_or_curs):
"""Return the oid of the hstore and hstore[] types.
Return None if hstore is not available.
"""
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn_or_curs
else:
conn = conn_or_curs
curs = conn_or_curs.cursor()
# Store the transaction status of the connection to revert it after use
conn_status = conn.status
# get the oid for the hstore
curs.execute("""\
SELECT t.oid, typarray
FROM pg_type t JOIN pg_namespace ns
ON typnamespace = ns.oid
WHERE typname = 'hstore' and nspname = 'public';
""")
oids = curs.fetchone()
# revert the status of the connection as before the command
if (conn_status != _ext.STATUS_IN_TRANSACTION
and conn.isolation_level != _ext.ISOLATION_LEVEL_AUTOCOMMIT):
conn.rollback()
return oids
get_oids = classmethod(get_oids)
def register_hstore(conn_or_curs, globally=False, unicode=False):
"""Register adapter/typecaster for dict/hstore reading/writing.
The adapter must be registered on a connection or cursor as the hstore
oid is different in every database.
The function must receive a connection or cursor as the :sql:`hstore` oid
is different in every database. The typecaster will be registered only on
the connection or cursor passed as argument. If your application uses a
single database you can pass *globally*=True to have hstore registered on
all the connections.
Raise `~psycopg2.ProgrammingError` if hstore is not installed in the
target database.
@ -593,30 +629,7 @@ def register_hstore(conn_or_curs, unicode=False):
By default the returned dicts have string keys and values: use
*unicode*=True to return `unicode` objects instead.
"""
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn_or_curs
else:
conn = conn_or_curs
curs = conn_or_curs.cursor()
# Store the transaction status of the connection to revert it after use
conn_status = conn.status
# get the oid for the hstore
curs.execute("""\
SELECT t.oid, typarray
FROM pg_type t JOIN pg_namespace ns
ON typnamespace = ns.oid
WHERE typname = 'hstore' and nspname = 'public';
""")
oids = curs.fetchone()
# revert the status of the connection as before the command
if (conn_status != _ext.STATUS_IN_TRANSACTION
and conn.isolation_level != _ext.ISOLATION_LEVEL_AUTOCOMMIT):
conn.rollback()
oids = HstoreAdapter.get_oids(conn_or_curs)
if oids is None:
raise psycopg2.ProgrammingError(
"hstore type not found in the database. "
@ -629,7 +642,7 @@ WHERE typname = 'hstore' and nspname = 'public';
cast = HstoreAdapter.parse
HSTORE = _ext.new_type((oids[0],), "HSTORE", cast)
_ext.register_type(HSTORE, conn_or_curs)
_ext.register_type(HSTORE, not globally and conn_or_curs or None)
_ext.register_adapter(dict, HstoreAdapter)

View File

@ -222,6 +222,27 @@ class HstoreTestCase(unittest.TestCase):
self.assert_(isinstance(t[2].keys()[0], unicode))
self.assert_(isinstance(t[2].values()[0], unicode))
def test_register_globally(self):
from psycopg2.extras import register_hstore, HstoreAdapter
oids = HstoreAdapter.get_oids(self.conn)
try:
register_hstore(self.conn, globally=True)
conn2 = psycopg2.connect(self.conn.dsn)
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assert_(isinstance(r[0], dict))
conn2.close()
finally:
psycopg2.extensions.string_types.pop(oids[0])
# verify the caster is not around anymore
cur = self.conn.cursor()
cur.execute("select 'a => b'::hstore")
r = cur.fetchone()
self.assert_(isinstance(r[0], str))
def test_roundtrip(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn)