Added PostgreSQL composite types typecaster to Python tuples.

This commit is contained in:
Daniele Varrazzo 2011-01-02 00:34:13 +01:00
parent 159cda3688
commit 7ac0bdd661
5 changed files with 362 additions and 1 deletions

View File

@ -1,6 +1,13 @@
What's new in psycopg 2.3.3
---------------------------
* New features:
- Added `register_composite()` function to cast PostgreSQL composite types
into Python tuples/namedtuples.
* Bug fixes:
- Fixed adaptation of None in composite types (ticket #26). Bug report by
Karsten Hilbert.

View File

@ -157,6 +157,55 @@ can be enabled using the `register_hstore()` function.
.. index::
pair: Composite types; Data types
pair: tuple; Adaptation
pair: namedtuple; Adaptation
Composite types casting
^^^^^^^^^^^^^^^^^^^^^^^
.. versionadded:: 2.3.3
Using `register_composite()` it is possible to cast a PostgreSQL composite
type (e.g. created with |CREATE TYPE|_ command) into a Python named tuple, or
into a regular tuple if `!collections.namedtuple()` is not found.
.. |CREATE TYPE| replace:: :sql:`CREATE TYPE`
.. _CREATE TYPE: http://www.postgresql.org/docs/9.0/static/sql-createtype.html
.. doctest::
>>> cur.execute("CREATE TYPE card AS (value int, suit text);")
>>> psycopg2.extras.register_composite('card', cur)
<psycopg2.extras.CompositeCaster object at 0x...>
>>> cur.execute("select (8, 'hearts')::card")
>>> cur.fetchone()[0]
card(value=8, suit='hearts')
Nested composite types are handled as expected, but the type of the composite
components must be registered as well.
.. doctest::
>>> cur.execute("CREATE TYPE card_back AS (face card, back text);")
>>> psycopg2.extras.register_composite('card_back', cur)
<psycopg2.extras.CompositeCaster object at 0x...>
>>> cur.execute("select ((8, 'hearts'), 'blue')::card_back")
>>> cur.fetchone()[0]
card_back(face=card(value=8, suit='hearts'), back='blue')
Adaptation from Python tuples to composite types is automatic instead and
requires no adapter registration.
.. autofunction:: register_composite
.. autoclass:: CompositeCaster
.. index::
pair: UUID; Data types

View File

@ -280,7 +280,7 @@ the SQL string that would be sent to the database.
single: IN operator
- Python tuples are converted in a syntax suitable for the SQL :sql:`IN`
operator::
operator and to represent a composite type::
>>> cur.mogrify("SELECT %s IN %s;", (10, (10, 20, 30)))
'SELECT 10 IN (10, 20, 30);'
@ -290,6 +290,10 @@ the SQL string that would be sent to the database.
SQL doesn't allow an empty list in the IN operator, so your code should
guard against empty tuples.
If you want PostgreSQL composite types to be converted into a Python
tuple/namedtuple you can use the `~psycopg2.extras.register_composite()`
function.
.. versionadded:: 2.0.6
the tuple :sql:`IN` adaptation.
@ -298,6 +302,10 @@ the SQL string that would be sent to the database.
was necessary to import the `~psycopg2.extensions` module to have it
registered.
.. versionchanged:: 2.3
named tuples are adapted like regular tuples and can thus be used to
represent composite types.
- Python dictionaries are converted into the |hstore|_ data type. See
`~psycopg2.extras.register_hstore()` for further details.
@ -307,6 +315,7 @@ the SQL string that would be sent to the database.
.. versionadded:: 2.3
the :sql:`hstore` adaptation.
.. index::
single: Unicode

View File

@ -731,4 +731,146 @@ def register_hstore(conn_or_curs, globally=False, unicode=False):
_ext.register_adapter(dict, HstoreAdapter)
class CompositeCaster(object):
"""Helps conversion of a PostgreSQL composite type into a Python object.
The class is usually created by the `register_composite()` function.
.. attribute:: name
The name of the PostgreSQL type.
.. attribute:: oid
The oid of the PostgreSQL type.
.. attribute:: type
The type of the Python objects returned. If `!collections.namedtuple()`
is available, it is a named tuple with attributes equal to the type
components. Otherwise it is just the `tuple` object.
.. attribute:: attnames
List of component names of the type to be casted.
.. attribute:: atttypes
List of component type oids of the type to be casted.
"""
def __init__(self, name, oid, attrs):
self.name = name
self.oid = oid
self.attnames = [ a[0] for a in attrs ]
self.atttypes = [ a[1] for a in attrs ]
self.type = self._create_type(name, self.attnames)
self.typecaster = _ext.new_type((oid,), name, self.parse)
def parse(self, s, curs):
if s is None:
return None
tokens = self.tokenize(s)
if len(tokens) != len(self.atttypes):
raise psycopg2.DataError(
"expecting %d components for the type %s, %d found instead",
(len(self.atttypes), self.name, len(self.tokens)))
attrs = [ curs.cast(oid, token)
for oid, token in zip(self.atttypes, tokens) ]
return self.type(*attrs)
_re_tokenize = regex.compile(r"""
\(? ([,\)]) # an empty token, representing NULL
| \(? " ((?: [^"] | "")*) " [,)] # or a quoted string
| \(? ([^",\)]+) [,\)] # or an unquoted string
""", regex.VERBOSE)
_re_undouble = regex.compile(r'(["\\])\1')
@classmethod
def tokenize(self, s):
rv = []
for m in self._re_tokenize.finditer(s):
if m is None:
raise psycopg2.InterfaceError("can't parse type: %r", s)
if m.group(1):
rv.append(None)
elif m.group(2):
rv.append(self._re_undouble.sub(r"\1", m.group(2)))
else:
rv.append(m.group(3))
return rv
def _create_type(self, name, attnames):
try:
from collections import namedtuple
except ImportError:
return tuple
else:
return namedtuple(name, attnames)
@classmethod
def _from_db(self, name, conn_or_curs):
"""Return a `CompositeCaster` instance for the type *name*.
Raise `ProgrammingError` if the type is not found.
"""
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn_or_curs
else:
conn = conn_or_curs
curs = conn_or_curs.cursor()
# Store the transaction status of the connection to revert it after use
conn_status = conn.status
# get the type oid and attributes
curs.execute("""\
SELECT t.oid, attname, atttypid
FROM pg_type t
JOIN pg_namespace ns ON typnamespace = ns.oid
JOIN pg_attribute a ON attrelid = typrelid
WHERE typname = %s and nspname = 'public';
""", (name, ))
recs = curs.fetchall()
# revert the status of the connection as before the command
if (conn_status != _ext.STATUS_IN_TRANSACTION
and conn.isolation_level != _ext.ISOLATION_LEVEL_AUTOCOMMIT):
conn.rollback()
if not recs:
raise psycopg2.ProgrammingError(
"PostgreSQL type '%s' not found" % name)
type_oid = recs[0][0]
type_attrs = [ (r[1], r[2]) for r in recs ]
return CompositeCaster(name, type_oid, type_attrs)
def register_composite(name, conn_or_curs, globally=False):
"""Register a typecaster to convert a composite type into a tuple.
:param name: the name of a PostgreSQL composite type, e.g. created using
the |CREATE TYPE|_ command
:param conn_or_curs: a connection or cursor used to find the type oid and
components; the typecaster is registered in a scope limited to this
object, unless *globally* is set to `True`
:param globally: if `False` (default) register the typecaster only on
*conn_or_curs*, otherwise register it globally
:return: the registered `CompositeCaster` instance responsible for the
conversion
"""
caster = CompositeCaster._from_db(name, conn_or_curs)
_ext.register_type(caster.typecaster, not globally and conn_or_curs or None)
return caster
__all__ = filter(lambda k: not k.startswith('_'), locals().keys())

View File

@ -20,6 +20,8 @@ except:
pass
import re
import sys
from datetime import date
from testutils import unittest
import psycopg2
@ -385,6 +387,158 @@ class AdaptTypeTestCase(unittest.TestCase):
finally:
ext.register_adapter(type(None), orig_adapter)
def test_tokenization(self):
from psycopg2.extras import CompositeCaster
def ok(s, v):
self.assertEqual(CompositeCaster.tokenize(s), v)
ok("(,)", [None, None])
ok('(hello,,10.234,2010-11-11)', ['hello', None, '10.234', '2010-11-11'])
ok('(10,"""")', ['10', '"'])
ok('(10,",")', ['10', ','])
ok(r'(10,"\\")', ['10', '\\'])
ok(r'''(10,"\\',""")''', ['10', '''\\',"'''])
ok('(10,"(20,""(30,40)"")")', ['10', '(20,"(30,40)")'])
ok('(10,"(20,""(30,""""(40,50)"""")"")")', ['10', '(20,"(30,""(40,50)"")")'])
ok('(,"(,""(a\nb\tc)"")")', [None, '(,"(a\nb\tc)")'])
ok('(\x01,\x02,\x03,\x04,\x05,\x06,\x07,\x08,"\t","\n","\x0b",'
'"\x0c","\r",\x0e,\x0f,\x10,\x11,\x12,\x13,\x14,\x15,\x16,'
'\x17,\x18,\x19,\x1a,\x1b,\x1c,\x1d,\x1e,\x1f," ",!,"""",#,'
'$,%,&,\',"(",")",*,+,",",-,.,/,0,1,2,3,4,5,6,7,8,9,:,;,<,=,>,?,'
'@,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,[,"\\\\",],'
'^,_,`,a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z,{,|,},'
'~,\x7f)',
map(chr, range(1, 128)))
ok('(,"\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !'
'""#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]'
'^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f")',
[None, ''.join(map(chr, range(1, 128)))])
def test_cast_composite(self):
oid = self._create_type("type_isd",
[('anint', 'integer'), ('astring', 'text'), ('adate', 'date')])
t = psycopg2.extras.register_composite("type_isd", self.conn)
self.assertEqual(t.name, 'type_isd')
self.assertEqual(t.oid, oid)
self.assert_(issubclass(t.type, tuple))
self.assertEqual(t.attnames, ['anint', 'astring', 'adate'])
self.assertEqual(t.atttypes, [23,25,1082])
curs = self.conn.cursor()
r = (10, 'hello', date(2011,1,2))
curs.execute("select %s::type_isd;", (r,))
v = curs.fetchone()[0]
self.assert_(isinstance(v, t.type))
self.assertEqual(v[0], 10)
self.assertEqual(v[1], "hello")
self.assertEqual(v[2], date(2011,1,2))
try:
from collections import namedtuple
except ImportError:
pass
else:
self.assert_(t.type is not tuple)
self.assertEqual(v.anint, 10)
self.assertEqual(v.astring, "hello")
self.assertEqual(v.adate, date(2011,1,2))
def test_cast_nested(self):
self._create_type("type_is",
[("anint", "integer"), ("astring", "text")])
self._create_type("type_r_dt",
[("adate", "date"), ("apair", "type_is")])
self._create_type("type_r_ft",
[("afloat", "float8"), ("anotherpair", "type_r_dt")])
psycopg2.extras.register_composite("type_is", self.conn)
psycopg2.extras.register_composite("type_r_dt", self.conn)
psycopg2.extras.register_composite("type_r_ft", self.conn)
curs = self.conn.cursor()
r = (0.25, (date(2011,1,2), (42, "hello")))
curs.execute("select %s::type_r_ft;", (r,))
v = curs.fetchone()[0]
self.assertEqual(r, v)
try:
from collections import namedtuple
except ImportError:
pass
else:
self.assertEqual(v.anotherpair.apair.astring, "hello")
def test_register_on_cursor(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
curs1 = self.conn.cursor()
curs2 = self.conn.cursor()
psycopg2.extras.register_composite("type_ii", curs1)
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
def test_register_on_connection(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = psycopg2.connect(self.conn.dsn)
conn2 = psycopg2.connect(self.conn.dsn)
try:
psycopg2.extras.register_composite("type_ii", conn1)
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
finally:
conn1.close()
conn2.close()
def test_register_globally(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = psycopg2.connect(self.conn.dsn)
conn2 = psycopg2.connect(self.conn.dsn)
try:
t = psycopg2.extras.register_composite("type_ii", conn1, globally=True)
try:
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], (1,2))
finally:
del psycopg2.extensions.string_types[t.oid]
finally:
conn1.close()
conn2.close()
def _create_type(self, name, fields):
curs = self.conn.cursor()
try:
curs.execute("drop type %s cascade;" % name)
except psycopg2.ProgrammingError:
self.conn.rollback()
curs.execute("create type %s as (%s);" % (name,
", ".join(["%s %s" % p for p in fields])))
curs.execute("""\
SELECT t.oid
FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid
WHERE typname = %s and nspname = 'public';
""", (name,))
oid = curs.fetchone()[0]
self.conn.commit()
return oid
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)