mirror of
synced 2025-03-03 07:45:45 +03:00
Merge branch 'json' into devel
This commit is contained in:
@ -1,6 +1,7 @@
What's new in psycopg 2.4.6
- Added JSON adaptation.
- Added support for backward scrollable cursors. Thanks to Jon Nelson
for the initial patch (ticket #108).
- Added a simple way to customize casting of composite types into Python
@ -128,6 +128,94 @@ Additional data types
.. _adapt-json:
.. index::
pair: JSON; Data types
pair: JSON; Adaptation
JSON_ adaptation
.. versionadded:: 2.4.6
Psycopg can adapt Python objects to and from the PostgreSQL |pgjson|_ type.
With PostgreSQL 9.2 adaptation is available out-of-the-box. To use JSON data
with previous database versions (either with the `9.1 json extension`__, but
even if you want to convert text fields to JSON) you can use
.. __: http://people.planetpostgresql.org/andrew/index.php?/archives/255-JSON-for-PG-9.2-...-and-now-for-9.1!.html
The Python library used to convert Python objects to JSON depends on the
language version: with Python 2.6 and following the :py:mod:`json` module from
the standard library is used; with previous versions the `simplejson`_ module
is used if available. Note that the last `!simplejson` version supporting
Python 2.4 is the 2.0.9.
.. _JSON: http://www.json.org/
.. |pgjson| replace:: :sql:`json`
.. _pgjson: http://www.postgresql.org/docs/current/static/datatype-json.html
.. _simplejson: http://pypi.python.org/pypi/simplejson/
In order to pass a Python object to the database as query argument you can use
the `Json` adapter::
curs.execute("insert into mytable (jsondata) values (%s)",
[Json({'a': 100})])
Reading from the database, |pgjson| values will be automatically converted to
Python objects.
.. note::
You can use `~psycopg2.extensions.register_adapter()` to adapt any Python
dictionary to JSON, either registering `Json` or any subclass or factory
creating a compatible adapter::
psycopg2.extensions.register_adapter(dict, psycopg2.extras.Json)
This setting is global though, so it is not compatible with similar
adapters such as the one registered by `register_hstore()`. Any other
object supported by JSON can be registered the same way, but this will
clobber the default adaptation rule, so be careful to unwanted side
If you want to customize the adaptation from Python to PostgreSQL you can
either provide a custom `!dumps()` function to `!Json`::
curs.execute("insert into mytable (jsondata) values (%s)",
[Json({'a': 100}, dumps=simplejson.dumps)])
or you can subclass it overriding the `~Json.dumps()` method::
class MyJson(Json):
def dumps(self, obj):
return simplejson.dumps(obj)
curs.execute("insert into mytable (jsondata) values (%s)",
[MyJson({'a': 100})])
Customizing the conversion from PostgreSQL to Python can be done passing a
custom `!loads()` function to `register_json()` (or `register_default_json()`
for PostgreSQL 9.2). For example, if you want to convert the float values
from :sql:`json` into :py:class:`~decimal.Decimal` you can use::
loads = lambda x: json.loads(x, parse_float=Decimal)
psycopg2.extras.register_json(conn, loads=loads)
.. autoclass:: Json
.. automethod:: dumps
.. autofunction:: register_json
.. autofunction:: register_default_json
.. _adapt-hstore:
.. index::
Normal file
Normal file
@ -0,0 +1,194 @@
"""Implementation of the JSON adaptation objects
This module exists to avoid a circular import problem: pyscopg2.extras depends
on psycopg2.extension, so I can't create the default JSON typecasters in
extensions importing register_json from extras.
# psycopg/_json.py - Implementation of the JSON adaptation objects
# Copyright (C) 2012 Daniele Varrazzo <daniele.varrazzo@gmail.com>
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# License for more details.
import sys
from psycopg2._psycopg import ISQLQuote, QuotedString
from psycopg2._psycopg import new_type, new_array_type, register_type
# import the best json implementation available
if sys.version_info[:2] >= (2,6):
import json
import simplejson as json
except ImportError:
json = None
# oids from PostgreSQL 9.2
JSON_OID = 114
class Json(object):
An `~psycopg2.extensions.ISQLQuote` wrapper to adapt a Python object to
:sql:`json` data type.
`!Json` can be used to wrap any object supported by the provided *dumps*
function. If none is provided, the standard :py:func:`json.dumps()` is
used (`!simplejson` for Python < 2.6;
`~psycopg2.extensions.ISQLQuote.getquoted()` will raise `!ImportError` if
the module is not available).
def __init__(self, adapted, dumps=None):
self.adapted = adapted
if dumps is not None:
self._dumps = dumps
elif json is not None:
self._dumps = json.dumps
self._dumps = None
def __conform__(self, proto):
if proto is ISQLQuote:
return self
def dumps(self, obj):
"""Serialize *obj* in JSON format.
The default is to call `!json.dumps()` or the *dumps* function
provided in the constructor. You can override this method to create a
customized JSON wrapper.
dumps = self._dumps
if dumps is not None:
return dumps(obj)
raise ImportError(
"json module not available: "
"you should provide a dumps function")
def getquoted(self):
s = self.dumps(self.adapted)
return QuotedString(s).getquoted()
def register_json(conn_or_curs=None, globally=False, loads=None,
oid=None, array_oid=None):
"""Create and register typecasters converting :sql:`json` type to Python objects.
:param conn_or_curs: a connection or cursor used to find the :sql:`json`
and :sql:`json[]` oids; the typecasters are registered in a scope
limited to this object, unless *globally* is set to `!True`. It can be
`!None` if the oids are provided
:param globally: if `!False` register the typecasters only on
*conn_or_curs*, otherwise register them globally
:param loads: the function used to parse the data into a Python object. If
`!None` use `!json.loads()`, where `!json` is the module chosen
according to the Python version (see above)
:param oid: the OID of the :sql:`json` type if known; If not, it will be
queried on *conn_or_curs*
:param array_oid: the OID of the :sql:`json[]` array type if known;
if not, it will be queried on *conn_or_curs*
The connection or cursor passed to the function will be used to query the
database and look for the OID of the :sql:`json` type. No query is
performed if *oid* and *array_oid* are provided. Raise
`~psycopg2.ProgrammingError` if the type is not found.
if oid is None:
oid, array_oid = _get_json_oids(conn_or_curs)
JSON, JSONARRAY = _create_json_typecasters(oid, array_oid, loads)
register_type(JSON, not globally and conn_or_curs or None)
if JSONARRAY is not None:
register_type(JSONARRAY, not globally and conn_or_curs or None)
def register_default_json(conn_or_curs=None, globally=False, loads=None):
Create and register :sql:`json` typecasters for PostgreSQL 9.2 and following.
Since PostgreSQL 9.2 :sql:`json` is a builtin type, hence its oid is known
and fixed. This function allows specifying a customized *loads* function
for the default :sql:`json` type without querying the database.
All the parameters have the same meaning of `register_json()`.
return register_json(conn_or_curs=conn_or_curs, globally=globally,
loads=loads, oid=JSON_OID, array_oid=JSONARRAY_OID)
def _create_json_typecasters(oid, array_oid, loads=None):
"""Create typecasters for json data type."""
if loads is None:
if json is None:
raise ImportError("no json module available")
loads = json.loads
def typecast_json(s, cur):
if s is None:
return None
return loads(s)
JSON = new_type((oid, ), 'JSON', typecast_json)
if array_oid is not None:
JSONARRAY = new_array_type((array_oid, ), "JSONARRAY", JSON)
def _get_json_oids(conn_or_curs):
# lazy imports
from psycopg2.extensions import STATUS_IN_TRANSACTION
from psycopg2.extras import _solve_conn_curs
conn, curs = _solve_conn_curs(conn_or_curs)
# Store the transaction status of the connection to revert it after use
conn_status = conn.status
# column typarray not available before PG 8.3
typarray = conn.server_version >= 80300 and "typarray" or "NULL"
# get the oid for the hstore
"SELECT t.oid, %s FROM pg_type t WHERE t.typname = 'json';"
% typarray)
r = curs.fetchone()
# revert the status of the connection as before the command
if (conn_status != STATUS_IN_TRANSACTION and not conn.autocommit):
if not r:
raise conn.ProgrammingError("json data type not found")
return r
@ -151,6 +151,17 @@ class NoneAdapter(object):
return _null
# Create default json typecasters for PostgreSQL 9.2 oids
from psycopg2._json import register_default_json
JSON, JSONARRAY = register_default_json()
except ImportError:
del register_default_json
# Add the "cleaned" version of the encodings to the key.
# When the encoding is set its name is cleaned up from - and _ and turned
# uppercase, so an encoding not respecting these rules wouldn't be found in the
@ -573,6 +573,9 @@ def wait_select(conn):
def _solve_conn_curs(conn_or_curs):
"""Return the connection and a DBAPI cursor from a connection or cursor."""
if conn_or_curs is None:
raise psycopg2.ProgrammingError("no connection or cursor provided")
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn.cursor(cursor_factory=_cursor)
@ -946,4 +949,8 @@ def register_composite(name, conn_or_curs, globally=False, factory=None):
return caster
# expose the json adaptation stuff into the module
from psycopg2._json import json, Json, register_json, register_default_json
__all__ = filter(lambda k: not k.startswith('_'), locals().keys())
@ -14,12 +14,9 @@
# License for more details.
import decimal
import re
import sys
from decimal import Decimal
from datetime import date
from testutils import unittest, skip_if_no_uuid, skip_before_postgres
@ -811,6 +808,253 @@ class AdaptTypeTestCase(unittest.TestCase):
return oid
def skip_if_json_module(f):
"""Skip a test if no Python json module is available"""
def skip_if_json_module_(self):
if psycopg2.extras.json is not None:
return self.skipTest("json module is available")
return f(self)
return skip_if_json_module_
def skip_if_no_json_module(f):
"""Skip a test if no Python json module is available"""
def skip_if_no_json_module_(self):
if psycopg2.extras.json is None:
return self.skipTest("json module not available")
return f(self)
return skip_if_no_json_module_
def skip_if_no_json_type(f):
"""Skip a test if PostgreSQL json type is not available"""
def skip_if_no_json_type_(self):
curs = self.conn.cursor()
curs.execute("select oid from pg_type where typname = 'json'")
if not curs.fetchone():
return self.skipTest("json not available in test database")
return f(self)
return skip_if_no_json_type_
class JsonTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(dsn)
def tearDown(self):
def test_module_not_available(self):
from psycopg2.extras import Json
self.assertRaises(ImportError, Json(None).getquoted)
def test_customizable_with_module_not_available(self):
from psycopg2.extras import Json
class MyJson(Json):
def dumps(self, obj):
assert obj is None
return "hi"
self.assertEqual(MyJson(None).getquoted(), "'hi'")
def test_adapt(self):
from psycopg2.extras import json, Json
objs = [None, "te'xt", 123, 123.45,
u'\xe0\u20ac', ['a', 100], {'a': 100} ]
curs = self.conn.cursor()
for obj in enumerate(objs):
self.assertEqual(curs.mogrify("%s", (Json(obj),)),
def test_adapt_dumps(self):
from psycopg2.extras import json, Json
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
return json.JSONEncoder.default(self, obj)
curs = self.conn.cursor()
obj = Decimal('123.45')
dumps = lambda obj: json.dumps(obj, cls=DecimalEncoder)
self.assertEqual(curs.mogrify("%s", (Json(obj, dumps=dumps),)),
def test_adapt_subclass(self):
from psycopg2.extras import json, Json
class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, Decimal):
return float(obj)
return json.JSONEncoder.default(self, obj)
class MyJson(Json):
def dumps(self, obj):
return json.dumps(obj, cls=DecimalEncoder)
curs = self.conn.cursor()
obj = Decimal('123.45')
self.assertEqual(curs.mogrify("%s", (MyJson(obj),)),
def test_register_on_dict(self):
from psycopg2.extras import Json
psycopg2.extensions.register_adapter(dict, Json)
curs = self.conn.cursor()
obj = {'a': 123}
self.assertEqual(curs.mogrify("%s", (obj,)),
b("""'{"a": 123}'"""))
del psycopg2.extensions.adapters[dict, psycopg2.extensions.ISQLQuote]
def test_type_not_available(self):
curs = self.conn.cursor()
curs.execute("select oid from pg_type where typname = 'json'")
if curs.fetchone():
return self.skipTest("json available in test database")
psycopg2.extras.register_json, self.conn)
@skip_before_postgres(9, 2)
def test_default_cast(self):
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
curs.execute("""select array['{"a": 100.0, "b": null}']::json[]""")
self.assertEqual(curs.fetchone()[0], [{'a': 100.0, 'b': None}])
def test_register_on_connection(self):
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
def test_register_on_cursor(self):
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
def test_register_globally(self):
old = psycopg2.extensions.string_types.get(114)
olda = psycopg2.extensions.string_types.get(199)
new, newa = psycopg2.extras.register_json(self.conn, globally=True)
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
self.assertEqual(curs.fetchone()[0], {'a': 100.0, 'b': None})
if old:
if olda:
def test_loads(self):
json = psycopg2.extras.json
loads = lambda x: json.loads(x, parse_float=Decimal)
psycopg2.extras.register_json(self.conn, loads=loads)
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
def test_no_conn_curs(self):
from psycopg2._json import _get_json_oids
oid, array_oid = _get_json_oids(self.conn)
old = psycopg2.extensions.string_types.get(114)
olda = psycopg2.extensions.string_types.get(199)
loads = lambda x: psycopg2.extras.json.loads(x, parse_float=Decimal)
new, newa = psycopg2.extras.register_json(
loads=loads, oid=oid, array_oid=array_oid)
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
if old:
if olda:
@skip_before_postgres(9, 2)
def test_register_default(self):
curs = self.conn.cursor()
loads = lambda x: psycopg2.extras.json.loads(x, parse_float=Decimal)
psycopg2.extras.register_default_json(curs, loads=loads)
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
curs.execute("""select array['{"a": 100.0, "b": null}']::json[]""")
data = curs.fetchone()[0]
self.assert_(isinstance(data[0]['a'], Decimal))
self.assertEqual(data[0]['a'], Decimal('100.0'))
def test_null(self):
curs = self.conn.cursor()
curs.execute("""select NULL::json""")
self.assertEqual(curs.fetchone()[0], None)
curs.execute("""select NULL::json[]""")
self.assertEqual(curs.fetchone()[0], None)
def test_no_array_oid(self):
curs = self.conn.cursor()
t1, t2 = psycopg2.extras.register_json(curs, oid=25)
self.assertEqual(t1.values[0], 25)
self.assertEqual(t2, None)
curs.execute("""select '{"a": 100.0, "b": null}'::text""")
data = curs.fetchone()[0]
self.assertEqual(data['a'], 100)
self.assertEqual(data['b'], None)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
Reference in New Issue
Block a user