psycopg2/tests/types_extras.py

548 lines
18 KiB
Python

#!/usr/bin/env python
#
# types_extras.py - tests for extras types conversions
#
# Copyright (C) 2008-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
try:
import decimal
except:
pass
import re
import sys
from datetime import date
from testutils import unittest
import psycopg2
import psycopg2.extras
import tests
def skip_if_no_uuid(f):
def skip_if_no_uuid_(self):
try:
import uuid
except ImportError:
return self.skipTest("uuid not available in this Python version")
try:
cur = self.conn.cursor()
cur.execute("select typname from pg_type where typname = 'uuid'")
has = cur.fetchone()
finally:
self.conn.rollback()
if has:
return f(self)
else:
return self.skipTest("uuid type not available on the server")
return skip_if_no_uuid_
def filter_scs(conn, s):
if conn.get_parameter_status("standard_conforming_strings") == 'off':
return s
else:
return s.replace("E'", "'")
class TypesExtrasTests(unittest.TestCase):
"""Test that all type conversions are working."""
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def execute(self, *args):
curs = self.conn.cursor()
curs.execute(*args)
return curs.fetchone()[0]
@skip_if_no_uuid
def testUUID(self):
import uuid
psycopg2.extras.register_uuid()
u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
# must survive NULL cast to a uuid
s = self.execute("SELECT NULL::uuid AS foo")
self.failUnless(s is None)
@skip_if_no_uuid
def testUUIDARRAY(self):
import uuid
psycopg2.extras.register_uuid()
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
# array with a NULL element
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), None]
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
# must survive NULL cast to a uuid[]
s = self.execute("SELECT NULL::uuid[] AS foo")
self.failUnless(s is None)
# what about empty arrays?
s = self.execute("SELECT '{}'::uuid[] AS foo")
self.failUnless(type(s) == list and len(s) == 0)
def testINET(self):
psycopg2.extras.register_inet()
i = "192.168.1.0/24";
s = self.execute("SELECT %s AS foo", (i,))
self.failUnless(i == s)
# must survive NULL cast to inet
s = self.execute("SELECT NULL::inet AS foo")
self.failUnless(s is None)
def test_inet_conform(self):
from psycopg2.extras import Inet
i = Inet("192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
self.assertEqual(
filter_scs(self.conn, "E'192.168.1.0/24'::inet"),
a.getquoted())
# adapts ok with unicode too
i = Inet(u"192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
self.assertEqual(
filter_scs(self.conn, "E'192.168.1.0/24'::inet"),
a.getquoted())
def test_adapt_fail(self):
class Foo(object): pass
self.assertRaises(psycopg2.ProgrammingError,
psycopg2.extensions.adapt, Foo(), psycopg2.extensions.ISQLQuote, None)
try:
psycopg2.extensions.adapt(Foo(), psycopg2.extensions.ISQLQuote, None)
except psycopg2.ProgrammingError, err:
self.failUnless(str(err) == "can't adapt type 'Foo'")
def skip_if_no_hstore(f):
def skip_if_no_hstore_(self):
from psycopg2.extras import HstoreAdapter
oids = HstoreAdapter.get_oids(self.conn)
if oids is None:
return self.skipTest("hstore not available in test database")
return f(self)
return skip_if_no_hstore_
class HstoreTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def test_adapt_8(self):
if self.conn.server_version >= 90000:
return self.skipTest("skipping dict adaptation with PG pre-9 syntax")
from psycopg2.extras import HstoreAdapter
o = {'a': '1', 'b': "'", 'c': None}
if self.conn.encoding == 'UTF8':
o['d'] = u'\xe0'
a = HstoreAdapter(o)
a.prepare(self.conn)
q = a.getquoted()
self.assert_(q.startswith("(("), q)
self.assert_(q.endswith("))"), q)
ii = q[1:-1].split("||")
ii.sort()
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], filter_scs(self.conn, "(E'a' => E'1')"))
self.assertEqual(ii[1], filter_scs(self.conn, "(E'b' => E'''')"))
self.assertEqual(ii[2], filter_scs(self.conn, "(E'c' => NULL)"))
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], filter_scs(self.conn, "(E'd' => E'%s')" % encc))
def test_adapt_9(self):
if self.conn.server_version < 90000:
return self.skipTest("skipping dict adaptation with PG 9 syntax")
from psycopg2.extras import HstoreAdapter
o = {'a': '1', 'b': "'", 'c': None}
if self.conn.encoding == 'UTF8':
o['d'] = u'\xe0'
a = HstoreAdapter(o)
a.prepare(self.conn)
q = a.getquoted()
m = re.match(r'hstore\(ARRAY\[([^\]]+)\], ARRAY\[([^\]]+)\]\)', q)
self.assert_(m, repr(q))
kk = m.group(1).split(", ")
vv = m.group(2).split(", ")
ii = zip(kk, vv)
ii.sort()
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], ("E'a'", "E'1'"))
self.assertEqual(ii[1], ("E'b'", "E''''"))
self.assertEqual(ii[2], ("E'c'", "NULL"))
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], ("E'd'", "E'%s'" % encc))
def test_parse(self):
from psycopg2.extras import HstoreAdapter
def ok(s, d):
self.assertEqual(HstoreAdapter.parse(s, None), d)
ok(None, None)
ok('', {})
ok('"a"=>"1", "b"=>"2"', {'a': '1', 'b': '2'})
ok('"a" => "1" ,"b" => "2"', {'a': '1', 'b': '2'})
ok('"a"=>NULL, "b"=>"2"', {'a': None, 'b': '2'})
ok(r'"a"=>"\"", "\""=>"2"', {'a': '"', '"': '2'})
ok('"a"=>"\'", "\'"=>"2"', {'a': "'", "'": '2'})
ok('"a"=>"1", "b"=>NULL', {'a': '1', 'b': None})
ok(r'"a\\"=>"1"', {'a\\': '1'})
ok(r'"a\""=>"1"', {'a"': '1'})
ok(r'"a\\\""=>"1"', {r'a\"': '1'})
ok(r'"a\\\\\""=>"1"', {r'a\\"': '1'})
def ko(s):
self.assertRaises(psycopg2.InterfaceError,
HstoreAdapter.parse, s, None)
ko('a')
ko('"a"')
ko(r'"a\\""=>"1"')
ko(r'"a\\\\""=>"1"')
ko('"a=>"1"')
ko('"a"=>"1", "b"=>NUL')
@skip_if_no_hstore
def test_register_conn(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn)
cur = self.conn.cursor()
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
@skip_if_no_hstore
def test_register_curs(self):
from psycopg2.extras import register_hstore
cur = self.conn.cursor()
register_hstore(cur)
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
@skip_if_no_hstore
def test_register_unicode(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn, unicode=True)
cur = self.conn.cursor()
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {u'a': u'b'})
self.assert_(isinstance(t[2].keys()[0], unicode))
self.assert_(isinstance(t[2].values()[0], unicode))
@skip_if_no_hstore
def test_register_globally(self):
from psycopg2.extras import register_hstore, HstoreAdapter
oids = HstoreAdapter.get_oids(self.conn)
try:
register_hstore(self.conn, globally=True)
conn2 = psycopg2.connect(self.conn.dsn)
try:
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assert_(isinstance(r[0], dict))
finally:
conn2.close()
finally:
psycopg2.extensions.string_types.pop(oids[0])
# verify the caster is not around anymore
cur = self.conn.cursor()
cur.execute("select 'a => b'::hstore")
r = cur.fetchone()
self.assert_(isinstance(r[0], str))
@skip_if_no_hstore
def test_roundtrip(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn)
cur = self.conn.cursor()
def ok(d):
cur.execute("select %s", (d,))
d1 = cur.fetchone()[0]
self.assertEqual(len(d), len(d1))
for k in d:
self.assert_(k in d1, k)
self.assertEqual(d[k], d1[k])
ok({})
ok({'a': 'b', 'c': None})
ab = map(chr, range(32, 128))
ok(dict(zip(ab, ab)))
ok({''.join(ab): ''.join(ab)})
self.conn.set_client_encoding('latin1')
ab = map(chr, range(1, 256))
ok({''.join(ab): ''.join(ab)})
ok(dict(zip(ab, ab)))
@skip_if_no_hstore
def test_roundtrip_unicode(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn, unicode=True)
cur = self.conn.cursor()
def ok(d):
cur.execute("select %s", (d,))
d1 = cur.fetchone()[0]
self.assertEqual(len(d), len(d1))
for k, v in d1.iteritems():
self.assert_(k in d, k)
self.assertEqual(d[k], v)
self.assert_(isinstance(k, unicode))
self.assert_(v is None or isinstance(v, unicode))
ok({})
ok({'a': 'b', 'c': None, 'd': u'\u20ac', u'\u2603': 'e'})
ab = map(unichr, range(1, 1024))
ok({u''.join(ab): u''.join(ab)})
ok(dict(zip(ab, ab)))
class AdaptTypeTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def test_none_in_record(self):
curs = self.conn.cursor()
s = curs.mogrify("SELECT %s;", [(42, None)])
self.assertEqual("SELECT (42, NULL);", s)
curs.execute("SELECT %s;", [(42, None)])
d = curs.fetchone()[0]
self.assertEqual("(42,)", d)
def test_none_fast_path(self):
# the None adapter is not actually invoked in regular adaptation
ext = psycopg2.extensions
class WonkyAdapter(object):
def __init__(self, obj): pass
def getquoted(self): return "NOPE!"
curs = self.conn.cursor()
orig_adapter = ext.adapters[type(None), ext.ISQLQuote]
try:
ext.register_adapter(type(None), WonkyAdapter)
self.assertEqual(ext.adapt(None).getquoted(), "NOPE!")
s = curs.mogrify("SELECT %s;", (None,))
self.assertEqual("SELECT NULL;", s)
finally:
ext.register_adapter(type(None), orig_adapter)
def test_tokenization(self):
from psycopg2.extras import CompositeCaster
def ok(s, v):
self.assertEqual(CompositeCaster.tokenize(s), v)
ok("(,)", [None, None])
ok('(hello,,10.234,2010-11-11)', ['hello', None, '10.234', '2010-11-11'])
ok('(10,"""")', ['10', '"'])
ok('(10,",")', ['10', ','])
ok(r'(10,"\\")', ['10', '\\'])
ok(r'''(10,"\\',""")''', ['10', '''\\',"'''])
ok('(10,"(20,""(30,40)"")")', ['10', '(20,"(30,40)")'])
ok('(10,"(20,""(30,""""(40,50)"""")"")")', ['10', '(20,"(30,""(40,50)"")")'])
ok('(,"(,""(a\nb\tc)"")")', [None, '(,"(a\nb\tc)")'])
ok('(\x01,\x02,\x03,\x04,\x05,\x06,\x07,\x08,"\t","\n","\x0b",'
'"\x0c","\r",\x0e,\x0f,\x10,\x11,\x12,\x13,\x14,\x15,\x16,'
'\x17,\x18,\x19,\x1a,\x1b,\x1c,\x1d,\x1e,\x1f," ",!,"""",#,'
'$,%,&,\',"(",")",*,+,",",-,.,/,0,1,2,3,4,5,6,7,8,9,:,;,<,=,>,?,'
'@,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,[,"\\\\",],'
'^,_,`,a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z,{,|,},'
'~,\x7f)',
map(chr, range(1, 128)))
ok('(,"\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !'
'""#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]'
'^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f")',
[None, ''.join(map(chr, range(1, 128)))])
def test_cast_composite(self):
oid = self._create_type("type_isd",
[('anint', 'integer'), ('astring', 'text'), ('adate', 'date')])
t = psycopg2.extras.register_composite("type_isd", self.conn)
self.assertEqual(t.name, 'type_isd')
self.assertEqual(t.oid, oid)
self.assert_(issubclass(t.type, tuple))
self.assertEqual(t.attnames, ['anint', 'astring', 'adate'])
self.assertEqual(t.atttypes, [23,25,1082])
curs = self.conn.cursor()
r = (10, 'hello', date(2011,1,2))
curs.execute("select %s::type_isd;", (r,))
v = curs.fetchone()[0]
self.assert_(isinstance(v, t.type))
self.assertEqual(v[0], 10)
self.assertEqual(v[1], "hello")
self.assertEqual(v[2], date(2011,1,2))
try:
from collections import namedtuple
except ImportError:
pass
else:
self.assert_(t.type is not tuple)
self.assertEqual(v.anint, 10)
self.assertEqual(v.astring, "hello")
self.assertEqual(v.adate, date(2011,1,2))
def test_cast_nested(self):
self._create_type("type_is",
[("anint", "integer"), ("astring", "text")])
self._create_type("type_r_dt",
[("adate", "date"), ("apair", "type_is")])
self._create_type("type_r_ft",
[("afloat", "float8"), ("anotherpair", "type_r_dt")])
psycopg2.extras.register_composite("type_is", self.conn)
psycopg2.extras.register_composite("type_r_dt", self.conn)
psycopg2.extras.register_composite("type_r_ft", self.conn)
curs = self.conn.cursor()
r = (0.25, (date(2011,1,2), (42, "hello")))
curs.execute("select %s::type_r_ft;", (r,))
v = curs.fetchone()[0]
self.assertEqual(r, v)
try:
from collections import namedtuple
except ImportError:
pass
else:
self.assertEqual(v.anotherpair.apair.astring, "hello")
def test_register_on_cursor(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
curs1 = self.conn.cursor()
curs2 = self.conn.cursor()
psycopg2.extras.register_composite("type_ii", curs1)
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
def test_register_on_connection(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = psycopg2.connect(self.conn.dsn)
conn2 = psycopg2.connect(self.conn.dsn)
try:
psycopg2.extras.register_composite("type_ii", conn1)
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
finally:
conn1.close()
conn2.close()
def test_register_globally(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = psycopg2.connect(self.conn.dsn)
conn2 = psycopg2.connect(self.conn.dsn)
try:
t = psycopg2.extras.register_composite("type_ii", conn1, globally=True)
try:
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], (1,2))
finally:
del psycopg2.extensions.string_types[t.oid]
finally:
conn1.close()
conn2.close()
def _create_type(self, name, fields):
curs = self.conn.cursor()
try:
curs.execute("drop type %s cascade;" % name)
except psycopg2.ProgrammingError:
self.conn.rollback()
curs.execute("create type %s as (%s);" % (name,
", ".join(["%s %s" % p for p in fields])))
curs.execute("""\
SELECT t.oid
FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid
WHERE typname = %s and nspname = 'public';
""", (name,))
oid = curs.fetchone()[0]
self.conn.commit()
return oid
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()