Fixed mapping for composite types defined in a schema

This commit is contained in:
Daniele Varrazzo 2011-02-09 02:20:25 +01:00
parent 8ac5f0070a
commit 7a058403ca
2 changed files with 35 additions and 5 deletions

View File

@ -835,15 +835,22 @@ class CompositeCaster(object):
# Store the transaction status of the connection to revert it after use # Store the transaction status of the connection to revert it after use
conn_status = conn.status conn_status = conn.status
# Use the correct schema
if '.' in name:
schema, tname = name.split('.', 1)
else:
tname = name
schema = 'public'
# get the type oid and attributes # get the type oid and attributes
curs.execute("""\ curs.execute("""\
SELECT t.oid, attname, atttypid SELECT t.oid, attname, atttypid
FROM pg_type t FROM pg_type t
JOIN pg_namespace ns ON typnamespace = ns.oid JOIN pg_namespace ns ON typnamespace = ns.oid
JOIN pg_attribute a ON attrelid = typrelid JOIN pg_attribute a ON attrelid = typrelid
WHERE typname = %s and nspname = 'public' WHERE typname = %s and nspname = %s
ORDER BY attnum; ORDER BY attnum;
""", (name, )) """, (tname, schema))
recs = curs.fetchall() recs = curs.fetchall()
@ -859,7 +866,7 @@ ORDER BY attnum;
type_oid = recs[0][0] type_oid = recs[0][0]
type_attrs = [ (r[1], r[2]) for r in recs ] type_attrs = [ (r[1], r[2]) for r in recs ]
return CompositeCaster(name, type_oid, type_attrs) return CompositeCaster(tname, type_oid, type_attrs)
def register_composite(name, conn_or_curs, globally=False): def register_composite(name, conn_or_curs, globally=False):
"""Register a typecaster to convert a composite type into a tuple. """Register a typecaster to convert a composite type into a tuple.

View File

@ -525,6 +525,24 @@ class AdaptTypeTestCase(unittest.TestCase):
conn1.close() conn1.close()
conn2.close() conn2.close()
@skip_if_no_composite
def test_composite_namespace(self):
curs = self.conn.cursor()
curs.execute("""
select nspname from pg_namespace
where nspname = 'typens';
""")
if not curs.fetchone():
curs.execute("create schema typens;")
self.conn.commit()
self._create_type("typens.typens_ii",
[("a", "integer"), ("b", "integer")])
t = psycopg2.extras.register_composite(
"typens.typens_ii", self.conn)
curs.execute("select (4,8)::typens.typens_ii")
self.assertEqual(curs.fetchone()[0], (4,8))
def _create_type(self, name, fields): def _create_type(self, name, fields):
curs = self.conn.cursor() curs = self.conn.cursor()
try: try:
@ -534,11 +552,16 @@ class AdaptTypeTestCase(unittest.TestCase):
curs.execute("create type %s as (%s);" % (name, curs.execute("create type %s as (%s);" % (name,
", ".join(["%s %s" % p for p in fields]))) ", ".join(["%s %s" % p for p in fields])))
if '.' in name:
schema, name = name.split('.')
else:
schema = 'public'
curs.execute("""\ curs.execute("""\
SELECT t.oid SELECT t.oid
FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid
WHERE typname = %s and nspname = 'public'; WHERE typname = %s and nspname = %s;
""", (name,)) """, (name, schema))
oid = curs.fetchone()[0] oid = curs.fetchone()[0]
self.conn.commit() self.conn.commit()
return oid return oid