Added support for arrays of hstores

This commit is contained in:
Daniele Varrazzo 2011-09-22 19:53:21 +02:00
parent c4e6d7d982
commit 2f9ceeac64
3 changed files with 84 additions and 6 deletions

1
NEWS
View File

@ -3,6 +3,7 @@ What's new in psycopg 2.4.3
- Added 'new_array_type()' function for easy creation of array
typecasters.
- Added support for arrays of hstores (ticket #66).
- Fixed segfault in case of transaction started with connection lost
(and possibly other events).
- Rollback connections in transaction or in error before putting them

View File

@ -699,7 +699,8 @@ WHERE typname = 'hstore';
return tuple(rv0), tuple(rv1)
def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
def register_hstore(conn_or_curs, globally=False, unicode=False,
oid=None, array_oid=None):
"""Register adapter and typecaster for `!dict`\-\ |hstore| conversions.
:param conn_or_curs: a connection or cursor: the typecaster will be
@ -709,14 +710,18 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
will be `!unicode` instead of `!str`. The option is not available on
Python 3
:param oid: the OID of the |hstore| type if known. If not, it will be
queried on *conn_or_curs*
queried on *conn_or_curs*.
:param array_oid: the OID of the |hstore| array type if known. If not, it
will be queried on *conn_or_curs*.
The connection or cursor passed to the function will be used to query the
database and look for the OID of the |hstore| type (which may be different
across databases). If querying is not desirable (e.g. with
:ref:`asynchronous connections <async-support>`) you may specify it in the
*oid* parameter (it can be found using a query such as :sql:`SELECT
'hstore'::regtype::oid;`).
*oid* parameter, which can be found using a query such as :sql:`SELECT
'hstore'::regtype::oid`. Analogously you can obtain a value for *array_oid*
using a query such as :sql:`SELECT 'hstore[]'::regtype::oid`.
Note that, when passing a dictionary from Python to the database, both
strings and unicode keys and values are supported. Dictionaries returned
@ -730,6 +735,10 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
added the *oid* parameter. If not specified, the typecaster is
installed also if |hstore| is not installed in the :sql:`public`
schema.
.. versionchanged:: 2.4.3
added support for |hstore| array.
"""
if oid is None:
oid = HstoreAdapter.get_oids(conn_or_curs)
@ -738,11 +747,18 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
"hstore type not found in the database. "
"please install it from your 'contrib/hstore.sql' file")
else:
oid = oid[0] # for the moment we don't have a HSTOREARRAY
array_oid = oid[1]
oid = oid[0]
if isinstance(oid, int):
oid = (oid,)
if array_oid is not None:
if isinstance(array_oid, int):
array_oid = (array_oid,)
else:
array_oid = tuple([x for x in array_oid if x])
# create and register the typecaster
if sys.version_info[0] < 3 and unicode:
cast = HstoreAdapter.parse_unicode
@ -753,6 +769,10 @@ def register_hstore(conn_or_curs, globally=False, unicode=False, oid=None):
_ext.register_type(HSTORE, not globally and conn_or_curs or None)
_ext.register_adapter(dict, HstoreAdapter)
if array_oid:
HSTOREARRAY = _ext.new_array_type(array_oid, "HSTOREARRAY", HSTORE)
_ext.register_type(HSTOREARRAY, not globally and conn_or_curs or None)
class CompositeCaster(object):
"""Helps conversion of a PostgreSQL composite type into a Python object.

View File

@ -22,7 +22,7 @@ import re
import sys
from datetime import date
from testutils import unittest, skip_if_no_uuid
from testutils import unittest, skip_if_no_uuid, skip_before_postgres
import psycopg2
import psycopg2.extras
@ -357,6 +357,63 @@ class HstoreTestCase(unittest.TestCase):
finally:
psycopg2.extensions.string_types.pop(oid)
@skip_if_no_hstore
@skip_before_postgres(8, 3)
def test_roundtrip_array(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn)
ds = []
ds.append({})
ds.append({'a': 'b', 'c': None})
ab = map(chr, range(32, 128))
ds.append(dict(zip(ab, ab)))
ds.append({''.join(ab): ''.join(ab)})
self.conn.set_client_encoding('latin1')
if sys.version_info[0] < 3:
ab = map(chr, range(32, 127) + range(160, 255))
else:
ab = bytes(range(32, 127) + range(160, 255)).decode('latin1')
ds.append({''.join(ab): ''.join(ab)})
ds.append(dict(zip(ab, ab)))
cur = self.conn.cursor()
cur.execute("select %s", (ds,))
ds1 = cur.fetchone()[0]
self.assertEqual(ds, ds1)
@skip_if_no_hstore
@skip_before_postgres(8, 3)
def test_array_cast(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn)
cur = self.conn.cursor()
cur.execute("select array['a=>1'::hstore, 'b=>2'::hstore];")
a = cur.fetchone()[0]
self.assertEqual(a, [{'a': '1'}, {'b': '2'}])
@skip_if_no_hstore
def test_array_cast_oid(self):
cur = self.conn.cursor()
cur.execute("select 'hstore'::regtype::oid, 'hstore[]'::regtype::oid")
oid, aoid = cur.fetchone()
from psycopg2.extras import register_hstore
register_hstore(None, globally=True, oid=oid, array_oid=aoid)
try:
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore, '{a=>b}'::hstore[]")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
self.assertEqual(t[3], [{'a': 'b'}])
finally:
psycopg2.extensions.string_types.pop(oid)
psycopg2.extensions.string_types.pop(aoid)
def skip_if_no_composite(f):
def skip_if_no_composite_(self):