Added json typecaster

This commit is contained in:
Daniele Varrazzo 2012-09-19 04:26:35 +01:00
parent b8e7f02256
commit 024f0dbada
5 changed files with 301 additions and 55 deletions

View File

@ -153,6 +153,8 @@ versions the `simplejson`_ module is be used if available. Note that the last
.. autoclass:: Json
.. autofunction:: register_json
.. _adapt-hstore:

187
lib/_json.py Normal file
View File

@ -0,0 +1,187 @@
"""Implementation of the JSON adaptation objects
This module exists to avoid a circular import problem: pyscopg2.extras depends
on psycopg2.extension, so I can't create the default JSON typecasters in
extensions importing register_json from extras.
"""
# psycopg/extras.py - miscellaneous extra goodies for psycopg
#
# Copyright (C) 2012 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import sys
from psycopg2._psycopg import ISQLQuote, QuotedString
from psycopg2._psycopg import new_type, new_array_type, register_type
# import the best json implementation available
if sys.version_info[:2] >= (2,6):
import json
else:
try:
import simplejson as json
except ImportError:
json = None
class Json(object):
"""A wrapper to adapt a Python object to :sql:`json` data type.
`!Json` can be used to wrap any object supported by the underlying
`!json` module. Raise `!ImportError` if no module is available.
Any keyword argument will be passed to the underlying
:py:func:`json.dumps()` function, allowing extension and customization. ::
curs.execute("insert into mytable (jsondata) values (%s)",
(Json({'a': 100}),))
.. note::
You can use `~psycopg2.extensions.register_adapter()` to adapt Python
dictionaries to JSON::
psycopg2.extensions.register_adapter(dict,
psycopg2.extras.Json)
This setting is global though, so it is not compatible with the use of
`register_hstore()`. Any other object supported by the `!json` library
used by Psycopg can be registered the same way, but this will clobber
the default adaptation rule, so be careful to unwanted side effects.
"""
def __init__(self, adapted, **kwargs):
self.adapted = adapted
self.kwargs = kwargs
def __conform__(self, proto):
if proto is ISQLQuote:
return self
def getquoted(self):
s = json.dumps(self.adapted, **self.kwargs)
return QuotedString(s).getquoted()
# clobber the above class if json is not available
if json is None:
class Json(Json):
def __init__(self, adapted):
raise ImportError("no json module available")
def register_json(conn_or_curs, globally=False, loads=None,
oid=None, array_oid=None):
"""Create and register typecasters converting :sql:`json` type to Python objects.
:param conn_or_curs: a connection or cursor used to find the :sql:`json`
and :sql:`json[]` oids; the typecasters are registered in a scope
limited to this object, unless *globally* is set to `!True`. It can be
`!None` if the oids are provided
:param globally: if `!False` register the typecasters only on
*conn_or_curs*, otherwise register them globally
:param loads: the function used to parse the data into a Python object. If
`!None` use `!json.loads()`, where `!json` is the module chosen
according to the Python version (see above)
:param oid: the OID of the :sql:`json` type if known; If not, it will be
queried on *conn_or_curs*
:param array_oid: the OID of the :sql:`json[]` array type if known;
if not, it will be queried on *conn_or_curs*
Using the function is required to convert :sql:`json` data in PostgreSQL
versions before 9.2. Since 9.2 the oids are hardcoded so a default
typecaster is already registered. The :sql:`json` type is available as
`extension for PostgreSQL 9.1`__.
.. __: http://people.planetpostgresql.org/andrew/index.php?/archives/255-JSON-for-PG-9.2-...-and-now-for-9.1!.html
Another use of the function is to adapt :sql:`json` using a customized
load function. For example, if you want to convert the float values in the
:sql:`json` into :py:class:`~decimal.Decimal` you can use::
loads = lambda x: json.loads(x, parse_float=Decimal)
psycopg2.extras.register_json(conn, loads=loads)
The connection or cursor passed to the function will be used to query the
database and look for the OID of the :sql:`json` type. No query is
performed if *oid* and *array_oid* are provided. Raise
`~psycopg2.ProgrammingError` if the type is not found.
"""
if oid is None:
oid, array_oid = _get_json_oids(conn_or_curs)
JSON, JSONARRAY = create_json_typecasters(oid, array_oid, loads)
register_type(JSON, not globally and conn_or_curs or None)
if JSONARRAY is not None:
register_type(JSONARRAY, not globally and conn_or_curs or None)
return JSON, JSONARRAY
def create_json_typecasters(oid, array_oid, loads=None):
"""Create typecasters for json data type."""
if loads is None:
if json is None:
raise ImportError("no json module available")
else:
loads = json.loads
def typecast_json(s, cur):
return loads(s)
JSON = new_type((oid, ), 'JSON', typecast_json)
JSONARRAY = new_array_type((array_oid, ), "JSONARRAY", JSON)
return JSON, JSONARRAY
def _get_json_oids(conn_or_curs):
# lazy imports
from psycopg2.extensions import STATUS_IN_TRANSACTION
from psycopg2.extras import _solve_conn_curs
conn, curs = _solve_conn_curs(conn_or_curs)
# Store the transaction status of the connection to revert it after use
conn_status = conn.status
# column typarray not available before PG 8.3
typarray = conn.server_version >= 80300 and "typarray" or "NULL"
# get the oid for the hstore
curs.execute(
"SELECT t.oid, %s FROM pg_type t WHERE t.typname = 'json';"
% typarray)
r = curs.fetchone()
# revert the status of the connection as before the command
if (conn_status != STATUS_IN_TRANSACTION and not conn.autocommit):
conn.rollback()
if not r:
raise conn.ProgrammingError("json data type not found")
return r

View File

@ -150,6 +150,20 @@ class NoneAdapter(object):
return _null
# Create default json typecasters for PostgreSQL 9.2 oids
from psycopg2._json import create_json_typecasters
try:
JSON, JSONARRAY = create_json_typecasters(114, 199)
except ImportError:
pass
else:
register_type(JSON)
register_type(JSONARRAY)
del create_json_typecasters
# Add the "cleaned" version of the encodings to the key.
# When the encoding is set its name is cleaned up from - and _ and turned
# uppercase, so an encoding not respecting these rules wouldn't be found in the

View File

@ -573,6 +573,9 @@ def wait_select(conn):
def _solve_conn_curs(conn_or_curs):
"""Return the connection and a DBAPI cursor from a connection or cursor."""
if conn_or_curs is None:
raise psycopg2.ProgrammingError("no connection or cursor provided")
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn.cursor(cursor_factory=_cursor)
@ -742,7 +745,6 @@ def register_hstore(conn_or_curs, globally=False, unicode=False,
'hstore'::regtype::oid`. Analogously you can obtain a value for *array_oid*
using a query such as :sql:`SELECT 'hstore[]'::regtype::oid`.
Note that, when passing a dictionary from Python to the database, both
strings and unicode keys and values are supported. Dictionaries returned
from the database have keys/values according to the *unicode* parameter.
@ -967,59 +969,8 @@ def register_composite(name, conn_or_curs, globally=False):
return caster
# import the best json implementation available
if sys.version_info[:2] >= (2,6):
import json
else:
try:
import simplejson as json
except ImportError:
json = None
class Json(object):
"""A wrapper to adapt a Python object to :sql:`json` data type.
`!Json` can be used to wrap any object supported by the underlying
`!json` module. Any keyword argument will be passed to the
underlying :py:func:`json.dumps()` function, allowing extension and
customization. ::
curs.execute("insert into mytable (jsondata) values (%s)",
(Json({'a': 100}),))
.. note::
You can use `~psycopg2.extensions.register_adapter()` to adapt Python
dictionaries to JSON::
psycopg2.extensions.register_adapter(dict,
psycopg2.extras.Json)
This setting is global though, so it is not compatible with the use of
`register_hstore()`. Any other object supported by the `!json` library
used by Psycopg can be registered the same way, but this will clobber
the default adaptation rule, so be careful to unwanted side effects.
"""
def __init__(self, adapted, **kwargs):
self.adapted = adapted
self.kwargs = kwargs
def __conform__(self, proto):
if proto is _ext.ISQLQuote:
return self
def getquoted(self):
s = json.dumps(self.adapted, **self.kwargs)
return _ext.QuotedString(s).getquoted()
# clobber the above class if json is not available
if json is None:
class Json(Json):
def __init__(self, adapted):
raise ImportError("no json module available")
# expose the json adaptation stuff into the module
from psycopg2._json import json, Json, register_json
__all__ = filter(lambda k: not k.startswith('_'), locals().keys())

View File

@ -859,6 +859,98 @@ class JsonTestCase(unittest.TestCase):
del psycopg2.extensions.adapters[dict, psycopg2.extensions.ISQLQuote]
def test_type_not_available(self):
curs = self.conn.cursor()
curs.execute("select oid from pg_type where typname = 'json'")
if curs.fetchone():
return self.skipTest("json available in test database")
self.assertRaises(psycopg2.ProgrammingError,
psycopg2.extras.register_json, self.conn)
@skip_if_no_json_module
@skip_before_postgres(9, 2)
def test_default_cast(self):
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
curs.execute("""select array['{"a": 100.0, "b": null}']::json[]""")
self.assertEqual(curs.fetchone()[0], [{'a': 100.0, 'b': None}])
@skip_if_no_json_module
@skip_if_no_json_type
def test_register_on_connection(self):
psycopg2.extras.register_json(self.conn)
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
@skip_if_no_json_module
@skip_if_no_json_type
def test_register_on_cursor(self):
curs = self.conn.cursor()
psycopg2.extras.register_json(curs)
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
@skip_if_no_json_module
@skip_if_no_json_type
def test_register_globally(self):
old = psycopg2.extensions.string_types.get(114)
olda = psycopg2.extensions.string_types.get(199)
try:
new, newa = psycopg2.extras.register_json(self.conn, globally=True)
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
finally:
psycopg2.extensions.string_types.pop(new.values[0])
psycopg2.extensions.string_types.pop(newa.values[0])
if old:
psycopg2.extensions.register_type(old)
if olda:
psycopg2.extensions.register_type(olda)
@skip_if_no_json_module
@skip_if_no_json_type
def test_loads(self):
json = psycopg2.extras.json
loads = lambda x: json.loads(x, parse_float=Decimal)
psycopg2.extras.register_json(self.conn, loads=loads)
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
@skip_if_no_json_module
@skip_if_no_json_type
def test_no_conn_curs(self):
from psycopg2._json import _get_json_oids
oid, array_oid = _get_json_oids(self.conn)
old = psycopg2.extensions.string_types.get(114)
olda = psycopg2.extensions.string_types.get(199)
loads = lambda x: psycopg2.extras.json.loads(x, parse_float=Decimal)
try:
new, newa = psycopg2.extras.register_json(None,
loads=loads, oid=oid, array_oid=array_oid)
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
finally:
psycopg2.extensions.string_types.pop(new.values[0])
psycopg2.extensions.string_types.pop(newa.values[0])
if old:
psycopg2.extensions.register_type(old)
if olda:
psycopg2.extensions.register_type(olda)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)