psycopg2/tests/types_extras.py
Jason Erickson 88cc5a986d Use tests.dsn for the dsn connection string
Using self.conn.dsn as the dsn connection string actually has the password
'x'ed out.  The initial connection replaces the password with 'x' to
obfuscate it.  Using tests.dsn instead of self.conn.dsn ensures that the
correct connection string is used.
2011-02-02 17:14:29 -07:00

548 lines
18 KiB
Python
Executable File

#!/usr/bin/env python
#
# types_extras.py - tests for extras types conversions
#
# Copyright (C) 2008-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
try:
import decimal
except:
pass
import re
import sys
from datetime import date
from testutils import unittest, skip_if_no_uuid
import psycopg2
import psycopg2.extras
import tests
def filter_scs(conn, s):
if conn.get_parameter_status("standard_conforming_strings") == 'off':
return s
else:
return s.replace("E'", "'")
class TypesExtrasTests(unittest.TestCase):
"""Test that all type conversions are working."""
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def execute(self, *args):
curs = self.conn.cursor()
curs.execute(*args)
return curs.fetchone()[0]
@skip_if_no_uuid
def testUUID(self):
import uuid
psycopg2.extras.register_uuid()
u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
# must survive NULL cast to a uuid
s = self.execute("SELECT NULL::uuid AS foo")
self.failUnless(s is None)
@skip_if_no_uuid
def testUUIDARRAY(self):
import uuid
psycopg2.extras.register_uuid()
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
# array with a NULL element
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), None]
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
# must survive NULL cast to a uuid[]
s = self.execute("SELECT NULL::uuid[] AS foo")
self.failUnless(s is None)
# what about empty arrays?
s = self.execute("SELECT '{}'::uuid[] AS foo")
self.failUnless(type(s) == list and len(s) == 0)
def testINET(self):
psycopg2.extras.register_inet()
i = "192.168.1.0/24";
s = self.execute("SELECT %s AS foo", (i,))
self.failUnless(i == s)
# must survive NULL cast to inet
s = self.execute("SELECT NULL::inet AS foo")
self.failUnless(s is None)
def test_inet_conform(self):
from psycopg2.extras import Inet
i = Inet("192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
self.assertEqual(
filter_scs(self.conn, "E'192.168.1.0/24'::inet"),
a.getquoted())
# adapts ok with unicode too
i = Inet(u"192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
self.assertEqual(
filter_scs(self.conn, "E'192.168.1.0/24'::inet"),
a.getquoted())
def test_adapt_fail(self):
class Foo(object): pass
self.assertRaises(psycopg2.ProgrammingError,
psycopg2.extensions.adapt, Foo(), psycopg2.extensions.ISQLQuote, None)
try:
psycopg2.extensions.adapt(Foo(), psycopg2.extensions.ISQLQuote, None)
except psycopg2.ProgrammingError, err:
self.failUnless(str(err) == "can't adapt type 'Foo'")
def skip_if_no_hstore(f):
def skip_if_no_hstore_(self):
from psycopg2.extras import HstoreAdapter
oids = HstoreAdapter.get_oids(self.conn)
if oids is None:
return self.skipTest("hstore not available in test database")
return f(self)
return skip_if_no_hstore_
class HstoreTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def test_adapt_8(self):
if self.conn.server_version >= 90000:
return self.skipTest("skipping dict adaptation with PG pre-9 syntax")
from psycopg2.extras import HstoreAdapter
o = {'a': '1', 'b': "'", 'c': None}
if self.conn.encoding == 'UTF8':
o['d'] = u'\xe0'
a = HstoreAdapter(o)
a.prepare(self.conn)
q = a.getquoted()
self.assert_(q.startswith("(("), q)
self.assert_(q.endswith("))"), q)
ii = q[1:-1].split("||")
ii.sort()
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], filter_scs(self.conn, "(E'a' => E'1')"))
self.assertEqual(ii[1], filter_scs(self.conn, "(E'b' => E'''')"))
self.assertEqual(ii[2], filter_scs(self.conn, "(E'c' => NULL)"))
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], filter_scs(self.conn, "(E'd' => E'%s')" % encc))
def test_adapt_9(self):
if self.conn.server_version < 90000:
return self.skipTest("skipping dict adaptation with PG 9 syntax")
from psycopg2.extras import HstoreAdapter
o = {'a': '1', 'b': "'", 'c': None}
if self.conn.encoding == 'UTF8':
o['d'] = u'\xe0'
a = HstoreAdapter(o)
a.prepare(self.conn)
q = a.getquoted()
m = re.match(r'hstore\(ARRAY\[([^\]]+)\], ARRAY\[([^\]]+)\]\)', q)
self.assert_(m, repr(q))
kk = m.group(1).split(", ")
vv = m.group(2).split(", ")
ii = zip(kk, vv)
ii.sort()
def f(*args):
return tuple([filter_scs(self.conn, s) for s in args])
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], f("E'a'", "E'1'"))
self.assertEqual(ii[1], f("E'b'", "E''''"))
self.assertEqual(ii[2], f("E'c'", "NULL"))
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], f("E'd'", "E'%s'" % encc))
def test_parse(self):
from psycopg2.extras import HstoreAdapter
def ok(s, d):
self.assertEqual(HstoreAdapter.parse(s, None), d)
ok(None, None)
ok('', {})
ok('"a"=>"1", "b"=>"2"', {'a': '1', 'b': '2'})
ok('"a" => "1" ,"b" => "2"', {'a': '1', 'b': '2'})
ok('"a"=>NULL, "b"=>"2"', {'a': None, 'b': '2'})
ok(r'"a"=>"\"", "\""=>"2"', {'a': '"', '"': '2'})
ok('"a"=>"\'", "\'"=>"2"', {'a': "'", "'": '2'})
ok('"a"=>"1", "b"=>NULL', {'a': '1', 'b': None})
ok(r'"a\\"=>"1"', {'a\\': '1'})
ok(r'"a\""=>"1"', {'a"': '1'})
ok(r'"a\\\""=>"1"', {r'a\"': '1'})
ok(r'"a\\\\\""=>"1"', {r'a\\"': '1'})
def ko(s):
self.assertRaises(psycopg2.InterfaceError,
HstoreAdapter.parse, s, None)
ko('a')
ko('"a"')
ko(r'"a\\""=>"1"')
ko(r'"a\\\\""=>"1"')
ko('"a=>"1"')
ko('"a"=>"1", "b"=>NUL')
@skip_if_no_hstore
def test_register_conn(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn)
cur = self.conn.cursor()
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
@skip_if_no_hstore
def test_register_curs(self):
from psycopg2.extras import register_hstore
cur = self.conn.cursor()
register_hstore(cur)
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
@skip_if_no_hstore
def test_register_unicode(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn, unicode=True)
cur = self.conn.cursor()
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {u'a': u'b'})
self.assert_(isinstance(t[2].keys()[0], unicode))
self.assert_(isinstance(t[2].values()[0], unicode))
@skip_if_no_hstore
def test_register_globally(self):
from psycopg2.extras import register_hstore, HstoreAdapter
oids = HstoreAdapter.get_oids(self.conn)
try:
register_hstore(self.conn, globally=True)
conn2 = psycopg2.connect(tests.dsn)
try:
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assert_(isinstance(r[0], dict))
finally:
conn2.close()
finally:
psycopg2.extensions.string_types.pop(oids[0])
# verify the caster is not around anymore
cur = self.conn.cursor()
cur.execute("select 'a => b'::hstore")
r = cur.fetchone()
self.assert_(isinstance(r[0], str))
@skip_if_no_hstore
def test_roundtrip(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn)
cur = self.conn.cursor()
def ok(d):
cur.execute("select %s", (d,))
d1 = cur.fetchone()[0]
self.assertEqual(len(d), len(d1))
for k in d:
self.assert_(k in d1, k)
self.assertEqual(d[k], d1[k])
ok({})
ok({'a': 'b', 'c': None})
ab = map(chr, range(32, 128))
ok(dict(zip(ab, ab)))
ok({''.join(ab): ''.join(ab)})
self.conn.set_client_encoding('latin1')
ab = map(chr, range(1, 256))
ok({''.join(ab): ''.join(ab)})
ok(dict(zip(ab, ab)))
@skip_if_no_hstore
def test_roundtrip_unicode(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn, unicode=True)
cur = self.conn.cursor()
def ok(d):
cur.execute("select %s", (d,))
d1 = cur.fetchone()[0]
self.assertEqual(len(d), len(d1))
for k, v in d1.iteritems():
self.assert_(k in d, k)
self.assertEqual(d[k], v)
self.assert_(isinstance(k, unicode))
self.assert_(v is None or isinstance(v, unicode))
ok({})
ok({'a': 'b', 'c': None, 'd': u'\u20ac', u'\u2603': 'e'})
ab = map(unichr, range(1, 1024))
ok({u''.join(ab): u''.join(ab)})
ok(dict(zip(ab, ab)))
def skip_if_no_composite(f):
def skip_if_no_composite_(self):
if self.conn.server_version < 80000:
return self.skipTest(
"server version %s doesn't support composite types"
% self.conn.server_version)
return f(self)
skip_if_no_composite_.__name__ = f.__name__
return skip_if_no_composite_
class AdaptTypeTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
@skip_if_no_composite
def test_none_in_record(self):
curs = self.conn.cursor()
s = curs.mogrify("SELECT %s;", [(42, None)])
self.assertEqual("SELECT (42, NULL);", s)
curs.execute("SELECT %s;", [(42, None)])
d = curs.fetchone()[0]
self.assertEqual("(42,)", d)
def test_none_fast_path(self):
# the None adapter is not actually invoked in regular adaptation
ext = psycopg2.extensions
class WonkyAdapter(object):
def __init__(self, obj): pass
def getquoted(self): return "NOPE!"
curs = self.conn.cursor()
orig_adapter = ext.adapters[type(None), ext.ISQLQuote]
try:
ext.register_adapter(type(None), WonkyAdapter)
self.assertEqual(ext.adapt(None).getquoted(), "NOPE!")
s = curs.mogrify("SELECT %s;", (None,))
self.assertEqual("SELECT NULL;", s)
finally:
ext.register_adapter(type(None), orig_adapter)
def test_tokenization(self):
from psycopg2.extras import CompositeCaster
def ok(s, v):
self.assertEqual(CompositeCaster.tokenize(s), v)
ok("(,)", [None, None])
ok('(hello,,10.234,2010-11-11)', ['hello', None, '10.234', '2010-11-11'])
ok('(10,"""")', ['10', '"'])
ok('(10,",")', ['10', ','])
ok(r'(10,"\\")', ['10', '\\'])
ok(r'''(10,"\\',""")''', ['10', '''\\',"'''])
ok('(10,"(20,""(30,40)"")")', ['10', '(20,"(30,40)")'])
ok('(10,"(20,""(30,""""(40,50)"""")"")")', ['10', '(20,"(30,""(40,50)"")")'])
ok('(,"(,""(a\nb\tc)"")")', [None, '(,"(a\nb\tc)")'])
ok('(\x01,\x02,\x03,\x04,\x05,\x06,\x07,\x08,"\t","\n","\x0b",'
'"\x0c","\r",\x0e,\x0f,\x10,\x11,\x12,\x13,\x14,\x15,\x16,'
'\x17,\x18,\x19,\x1a,\x1b,\x1c,\x1d,\x1e,\x1f," ",!,"""",#,'
'$,%,&,\',"(",")",*,+,",",-,.,/,0,1,2,3,4,5,6,7,8,9,:,;,<,=,>,?,'
'@,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,[,"\\\\",],'
'^,_,`,a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z,{,|,},'
'~,\x7f)',
map(chr, range(1, 128)))
ok('(,"\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !'
'""#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]'
'^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f")',
[None, ''.join(map(chr, range(1, 128)))])
@skip_if_no_composite
def test_cast_composite(self):
oid = self._create_type("type_isd",
[('anint', 'integer'), ('astring', 'text'), ('adate', 'date')])
t = psycopg2.extras.register_composite("type_isd", self.conn)
self.assertEqual(t.name, 'type_isd')
self.assertEqual(t.oid, oid)
self.assert_(issubclass(t.type, tuple))
self.assertEqual(t.attnames, ['anint', 'astring', 'adate'])
self.assertEqual(t.atttypes, [23,25,1082])
curs = self.conn.cursor()
r = (10, 'hello', date(2011,1,2))
curs.execute("select %s::type_isd;", (r,))
v = curs.fetchone()[0]
self.assert_(isinstance(v, t.type))
self.assertEqual(v[0], 10)
self.assertEqual(v[1], "hello")
self.assertEqual(v[2], date(2011,1,2))
try:
from collections import namedtuple
except ImportError:
pass
else:
self.assert_(t.type is not tuple)
self.assertEqual(v.anint, 10)
self.assertEqual(v.astring, "hello")
self.assertEqual(v.adate, date(2011,1,2))
@skip_if_no_composite
def test_cast_nested(self):
self._create_type("type_is",
[("anint", "integer"), ("astring", "text")])
self._create_type("type_r_dt",
[("adate", "date"), ("apair", "type_is")])
self._create_type("type_r_ft",
[("afloat", "float8"), ("anotherpair", "type_r_dt")])
psycopg2.extras.register_composite("type_is", self.conn)
psycopg2.extras.register_composite("type_r_dt", self.conn)
psycopg2.extras.register_composite("type_r_ft", self.conn)
curs = self.conn.cursor()
r = (0.25, (date(2011,1,2), (42, "hello")))
curs.execute("select %s::type_r_ft;", (r,))
v = curs.fetchone()[0]
self.assertEqual(r, v)
try:
from collections import namedtuple
except ImportError:
pass
else:
self.assertEqual(v.anotherpair.apair.astring, "hello")
@skip_if_no_composite
def test_register_on_cursor(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
curs1 = self.conn.cursor()
curs2 = self.conn.cursor()
psycopg2.extras.register_composite("type_ii", curs1)
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
@skip_if_no_composite
def test_register_on_connection(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = psycopg2.connect(tests.dsn)
conn2 = psycopg2.connect(tests.dsn)
try:
psycopg2.extras.register_composite("type_ii", conn1)
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
finally:
conn1.close()
conn2.close()
@skip_if_no_composite
def test_register_globally(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = psycopg2.connect(tests.dsn)
conn2 = psycopg2.connect(tests.dsn)
try:
t = psycopg2.extras.register_composite("type_ii", conn1, globally=True)
try:
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], (1,2))
finally:
del psycopg2.extensions.string_types[t.oid]
finally:
conn1.close()
conn2.close()
def _create_type(self, name, fields):
curs = self.conn.cursor()
try:
curs.execute("drop type %s cascade;" % name)
except psycopg2.ProgrammingError:
self.conn.rollback()
curs.execute("create type %s as (%s);" % (name,
", ".join(["%s %s" % p for p in fields])))
curs.execute("""\
SELECT t.oid
FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid
WHERE typname = %s and nspname = 'public';
""", (name,))
oid = curs.fetchone()[0]
self.conn.commit()
return oid
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()