From 2cf35b69de22a1f1532e9050d35634cb4fab87d5 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Thu, 15 Dec 2011 20:11:17 +0000 Subject: [PATCH] 'register_composite()' also works with tables Skip dropped and hidden columns when inspecting the schema. --- doc/src/extras.rst | 5 +++-- lib/extras.py | 3 ++- tests/test_types_extras.py | 45 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 50 insertions(+), 3 deletions(-) diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 5f85d616..0511c23f 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -168,8 +168,9 @@ Composite types casting .. versionadded:: 2.4 Using `register_composite()` it is possible to cast a PostgreSQL composite -type (e.g. created with |CREATE TYPE|_ command) into a Python named tuple, or -into a regular tuple if :py:func:`collections.namedtuple` is not found. +type (either created with the |CREATE TYPE|_ command or implicitly defined +after a table row type) into a Python named tuple, or into a regular tuple if +:py:func:`collections.namedtuple` is not found. .. |CREATE TYPE| replace:: :sql:`CREATE TYPE` .. _CREATE TYPE: http://www.postgresql.org/docs/9.0/static/sql-createtype.html diff --git a/lib/extras.py b/lib/extras.py index 965023d8..a6fcbc04 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -913,7 +913,8 @@ SELECT t.oid, %s, attname, atttypid FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid JOIN pg_attribute a ON attrelid = typrelid -WHERE typname = %%s and nspname = %%s +WHERE typname = %%s AND nspname = %%s + AND attnum > 0 AND NOT attisdropped ORDER BY attnum; """ % typarray, (tname, schema)) diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py index b77e7acd..29a935ee 100755 --- a/tests/test_types_extras.py +++ b/tests/test_types_extras.py @@ -658,6 +658,51 @@ class AdaptTypeTestCase(unittest.TestCase): curs.execute("select (1,2)::type_ii") self.assertRaises(psycopg2.DataError, curs.fetchone) + @skip_if_no_composite + @skip_before_postgres(8, 4) + def test_from_tables(self): + curs = self.conn.cursor() + curs.execute("""create table ctest1 ( + id integer primary key, + temp int, + label varchar + );""") + + curs.execute("""alter table ctest1 drop temp;""") + + curs.execute("""create table ctest2 ( + id serial primary key, + label varchar, + test_id integer references ctest1(id) + );""") + + curs.execute("""insert into ctest1 (id, label) values + (1, 'test1'), + (2, 'test2');""") + curs.execute("""insert into ctest2 (label, test_id) values + ('testa', 1), + ('testb', 1), + ('testc', 2), + ('testd', 2);""") + + psycopg2.extras.register_composite("ctest1", curs) + psycopg2.extras.register_composite("ctest2", curs) + + curs.execute(""" + select ctest1, array_agg(ctest2) as test2s + from ( + select ctest1, ctest2 + from ctest1 inner join ctest2 on ctest1.id = ctest2.test_id + order by ctest1.id, ctest2.label + ) x group by ctest1;""") + + r = curs.fetchone() + self.assertEqual(r[0], (1, 'test1')) + self.assertEqual(r[1], [(1, 'testa', 1), (2, 'testb', 1)]) + r = curs.fetchone() + self.assertEqual(r[0], (2, 'test2')) + self.assertEqual(r[1], [(3, 'testc', 2), (4, 'testd', 2)]) + def _create_type(self, name, fields): curs = self.conn.cursor() try: