2010-07-09 18:23:02 +04:00
|
|
|
#!/usr/bin/env python
|
|
|
|
|
|
|
|
import unittest
|
|
|
|
import psycopg2
|
|
|
|
import psycopg2.extensions
|
2010-12-29 05:47:29 +03:00
|
|
|
from psycopg2.extensions import b
|
2010-12-21 07:58:38 +03:00
|
|
|
from testconfig import dsn
|
2010-07-09 18:23:02 +04:00
|
|
|
|
|
|
|
class CursorTests(unittest.TestCase):
|
|
|
|
|
2010-11-28 19:00:32 +03:00
|
|
|
def setUp(self):
|
2010-12-21 07:58:38 +03:00
|
|
|
self.conn = psycopg2.connect(dsn)
|
2010-11-28 19:00:32 +03:00
|
|
|
|
|
|
|
def tearDown(self):
|
|
|
|
self.conn.close()
|
2010-07-09 18:23:02 +04:00
|
|
|
|
|
|
|
def test_executemany_propagate_exceptions(self):
|
2010-11-28 19:00:32 +03:00
|
|
|
conn = self.conn
|
2010-07-09 18:23:02 +04:00
|
|
|
cur = conn.cursor()
|
|
|
|
cur.execute("create temp table test_exc (data int);")
|
|
|
|
def buggygen():
|
2010-12-12 19:45:21 +03:00
|
|
|
yield 1//0
|
2010-07-09 18:23:02 +04:00
|
|
|
self.assertRaises(ZeroDivisionError,
|
|
|
|
cur.executemany, "insert into test_exc values (%s)", buggygen())
|
|
|
|
cur.close()
|
|
|
|
|
2010-10-05 06:13:44 +04:00
|
|
|
def test_mogrify_unicode(self):
|
2010-11-28 19:00:32 +03:00
|
|
|
conn = self.conn
|
2010-10-05 06:13:44 +04:00
|
|
|
cur = conn.cursor()
|
|
|
|
|
|
|
|
# test consistency between execute and mogrify.
|
|
|
|
|
|
|
|
# unicode query containing only ascii data
|
|
|
|
cur.execute(u"SELECT 'foo';")
|
|
|
|
self.assertEqual('foo', cur.fetchone()[0])
|
2010-12-29 05:47:29 +03:00
|
|
|
self.assertEqual(b("SELECT 'foo';"), cur.mogrify(u"SELECT 'foo';"))
|
2010-10-05 06:13:44 +04:00
|
|
|
|
|
|
|
conn.set_client_encoding('UTF8')
|
|
|
|
snowman = u"\u2603"
|
|
|
|
|
|
|
|
# unicode query with non-ascii data
|
|
|
|
cur.execute(u"SELECT '%s';" % snowman)
|
2010-12-29 05:47:29 +03:00
|
|
|
self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0]))
|
|
|
|
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
|
|
|
|
cur.mogrify(u"SELECT '%s';" % snowman).replace(b("E'"), b("'")))
|
2010-10-05 06:13:44 +04:00
|
|
|
|
|
|
|
# unicode args
|
|
|
|
cur.execute("SELECT %s;", (snowman,))
|
2010-12-29 05:47:29 +03:00
|
|
|
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
|
|
|
|
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
|
|
|
|
cur.mogrify("SELECT %s;", (snowman,)).replace(b("E'"), b("'")))
|
2010-10-05 06:13:44 +04:00
|
|
|
|
|
|
|
# unicode query and args
|
|
|
|
cur.execute(u"SELECT %s;", (snowman,))
|
2010-12-29 05:47:29 +03:00
|
|
|
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
|
|
|
|
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
|
|
|
|
cur.mogrify(u"SELECT %s;", (snowman,)).replace(b("E'"), b("'")))
|
2010-07-09 18:23:02 +04:00
|
|
|
|
2010-11-06 05:24:28 +03:00
|
|
|
def test_mogrify_decimal_explodes(self):
|
|
|
|
# issue #7: explodes on windows with python 2.5 and psycopg 2.2.2
|
|
|
|
try:
|
|
|
|
from decimal import Decimal
|
|
|
|
except:
|
|
|
|
return
|
|
|
|
|
2010-11-28 19:00:32 +03:00
|
|
|
conn = self.conn
|
2010-11-06 05:24:28 +03:00
|
|
|
cur = conn.cursor()
|
2010-12-29 05:47:29 +03:00
|
|
|
self.assertEqual(b('SELECT 10.3;'),
|
2010-11-06 05:24:28 +03:00
|
|
|
cur.mogrify("SELECT %s;", (Decimal("10.3"),)))
|
|
|
|
|
2011-01-02 00:55:10 +03:00
|
|
|
def test_cast(self):
|
|
|
|
curs = self.conn.cursor()
|
|
|
|
|
|
|
|
self.assertEqual(42, curs.cast(20, '42'))
|
|
|
|
self.assertAlmostEqual(3.14, curs.cast(700, '3.14'))
|
|
|
|
|
|
|
|
try:
|
|
|
|
from decimal import Decimal
|
|
|
|
except ImportError:
|
|
|
|
self.assertAlmostEqual(123.45, curs.cast(1700, '123.45'))
|
|
|
|
else:
|
|
|
|
self.assertEqual(Decimal('123.45'), curs.cast(1700, '123.45'))
|
|
|
|
|
|
|
|
from datetime import date
|
|
|
|
self.assertEqual(date(2011,1,2), curs.cast(1082, '2011-01-02'))
|
|
|
|
self.assertEqual("who am i?", curs.cast(705, 'who am i?')) # unknown
|
|
|
|
|
|
|
|
def test_cast_specificity(self):
|
|
|
|
curs = self.conn.cursor()
|
|
|
|
self.assertEqual("foo", curs.cast(705, 'foo'))
|
|
|
|
|
|
|
|
D = psycopg2.extensions.new_type((705,), "DOUBLING", lambda v, c: v * 2)
|
|
|
|
psycopg2.extensions.register_type(D, self.conn)
|
|
|
|
self.assertEqual("foofoo", curs.cast(705, 'foo'))
|
|
|
|
|
|
|
|
T = psycopg2.extensions.new_type((705,), "TREBLING", lambda v, c: v * 3)
|
|
|
|
psycopg2.extensions.register_type(T, curs)
|
|
|
|
self.assertEqual("foofoofoo", curs.cast(705, 'foo'))
|
|
|
|
|
|
|
|
curs2 = self.conn.cursor()
|
|
|
|
self.assertEqual("foofoo", curs2.cast(705, 'foo'))
|
|
|
|
|
2010-11-06 05:24:28 +03:00
|
|
|
|
2010-07-09 18:23:02 +04:00
|
|
|
def test_suite():
|
|
|
|
return unittest.TestLoader().loadTestsFromName(__name__)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
unittest.main()
|