psycopg2/tests/test_extras_dictcursor.py
Daniele Varrazzo bc84b6233e Allow non-ascii chars in namedtuple fields
They can be valid chars in Python 3. Or maybe not? In which case Python
will throw an exception, but that's fine.

Fix regression introduced fixing #211
2018-05-18 12:15:50 +01:00

458 lines
16 KiB
Python
Executable File

#!/usr/bin/env python
#
# extras_dictcursor - test if DictCursor extension class works
#
# Copyright (C) 2004-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import time
from datetime import timedelta
import psycopg2
import psycopg2.extras
import unittest
from .testutils import ConnectingTestCase, skip_before_postgres, skip_before_python
class ExtrasDictCursorTests(ConnectingTestCase):
"""Test if DictCursor extension class works."""
def setUp(self):
ConnectingTestCase.setUp(self)
curs = self.conn.cursor()
curs.execute("CREATE TEMPORARY TABLE ExtrasDictCursorTests (foo text)")
curs.execute("INSERT INTO ExtrasDictCursorTests VALUES ('bar')")
self.conn.commit()
def testDictConnCursorArgs(self):
self.conn.close()
self.conn = self.connect(connection_factory=psycopg2.extras.DictConnection)
cur = self.conn.cursor()
self.assert_(isinstance(cur, psycopg2.extras.DictCursor))
self.assertEqual(cur.name, None)
# overridable
cur = self.conn.cursor('foo',
cursor_factory=psycopg2.extras.NamedTupleCursor)
self.assertEqual(cur.name, 'foo')
self.assert_(isinstance(cur, psycopg2.extras.NamedTupleCursor))
def testDictCursorWithPlainCursorFetchOne(self):
self._testWithPlainCursor(lambda curs: curs.fetchone())
def testDictCursorWithPlainCursorFetchMany(self):
self._testWithPlainCursor(lambda curs: curs.fetchmany(100)[0])
def testDictCursorWithPlainCursorFetchManyNoarg(self):
self._testWithPlainCursor(lambda curs: curs.fetchmany()[0])
def testDictCursorWithPlainCursorFetchAll(self):
self._testWithPlainCursor(lambda curs: curs.fetchall()[0])
def testDictCursorWithPlainCursorIter(self):
def getter(curs):
for row in curs:
return row
self._testWithPlainCursor(getter)
def testUpdateRow(self):
row = self._testWithPlainCursor(lambda curs: curs.fetchone())
row['foo'] = 'qux'
self.failUnless(row['foo'] == 'qux')
self.failUnless(row[0] == 'qux')
@skip_before_postgres(8, 0)
def testDictCursorWithPlainCursorIterRowNumber(self):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
self._testIterRowNumber(curs)
def _testWithPlainCursor(self, getter):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
self.failUnless(row[0] == 'bar')
return row
def testDictCursorWithPlainCursorRealFetchOne(self):
self._testWithPlainCursorReal(lambda curs: curs.fetchone())
def testDictCursorWithPlainCursorRealFetchMany(self):
self._testWithPlainCursorReal(lambda curs: curs.fetchmany(100)[0])
def testDictCursorWithPlainCursorRealFetchManyNoarg(self):
self._testWithPlainCursorReal(lambda curs: curs.fetchmany()[0])
def testDictCursorWithPlainCursorRealFetchAll(self):
self._testWithPlainCursorReal(lambda curs: curs.fetchall()[0])
def testDictCursorWithPlainCursorRealIter(self):
def getter(curs):
for row in curs:
return row
self._testWithPlainCursorReal(getter)
@skip_before_postgres(8, 0)
def testDictCursorWithPlainCursorRealIterRowNumber(self):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
self._testIterRowNumber(curs)
def _testWithPlainCursorReal(self, getter):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
def testDictCursorWithNamedCursorFetchOne(self):
self._testWithNamedCursor(lambda curs: curs.fetchone())
def testDictCursorWithNamedCursorFetchMany(self):
self._testWithNamedCursor(lambda curs: curs.fetchmany(100)[0])
def testDictCursorWithNamedCursorFetchManyNoarg(self):
self._testWithNamedCursor(lambda curs: curs.fetchmany()[0])
def testDictCursorWithNamedCursorFetchAll(self):
self._testWithNamedCursor(lambda curs: curs.fetchall()[0])
def testDictCursorWithNamedCursorIter(self):
def getter(curs):
for row in curs:
return row
self._testWithNamedCursor(getter)
@skip_before_postgres(8, 2)
def testDictCursorWithNamedCursorNotGreedy(self):
curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
self._testNamedCursorNotGreedy(curs)
@skip_before_postgres(8, 0)
def testDictCursorWithNamedCursorIterRowNumber(self):
curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
self._testIterRowNumber(curs)
def _testWithNamedCursor(self, getter):
curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
self.failUnless(row[0] == 'bar')
def testDictCursorRealWithNamedCursorFetchOne(self):
self._testWithNamedCursorReal(lambda curs: curs.fetchone())
def testDictCursorRealWithNamedCursorFetchMany(self):
self._testWithNamedCursorReal(lambda curs: curs.fetchmany(100)[0])
def testDictCursorRealWithNamedCursorFetchManyNoarg(self):
self._testWithNamedCursorReal(lambda curs: curs.fetchmany()[0])
def testDictCursorRealWithNamedCursorFetchAll(self):
self._testWithNamedCursorReal(lambda curs: curs.fetchall()[0])
def testDictCursorRealWithNamedCursorIter(self):
def getter(curs):
for row in curs:
return row
self._testWithNamedCursorReal(getter)
@skip_before_postgres(8, 2)
def testDictCursorRealWithNamedCursorNotGreedy(self):
curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
self._testNamedCursorNotGreedy(curs)
@skip_before_postgres(8, 0)
def testDictCursorRealWithNamedCursorIterRowNumber(self):
curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
self._testIterRowNumber(curs)
def _testWithNamedCursorReal(self, getter):
curs = self.conn.cursor('aname',
cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
def _testNamedCursorNotGreedy(self, curs):
curs.itersize = 2
curs.execute("""select clock_timestamp() as ts from generate_series(1,3)""")
recs = []
for t in curs:
time.sleep(0.01)
recs.append(t)
# check that the dataset was not fetched in a single gulp
self.assert_(recs[1]['ts'] - recs[0]['ts'] < timedelta(seconds=0.005))
self.assert_(recs[2]['ts'] - recs[1]['ts'] > timedelta(seconds=0.0099))
def _testIterRowNumber(self, curs):
# Only checking for dataset < itersize:
# see CursorTests.test_iter_named_cursor_rownumber
curs.itersize = 20
curs.execute("""select * from generate_series(1,10)""")
for i, r in enumerate(curs):
self.assertEqual(i + 1, curs.rownumber)
def testPickleDictRow(self):
import pickle
curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
curs.execute("select 10 as a, 20 as b")
r = curs.fetchone()
d = pickle.dumps(r)
r1 = pickle.loads(d)
self.assertEqual(r, r1)
self.assertEqual(r[0], r1[0])
self.assertEqual(r[1], r1[1])
self.assertEqual(r['a'], r1['a'])
self.assertEqual(r['b'], r1['b'])
self.assertEqual(r._index, r1._index)
def testPickleRealDictRow(self):
import pickle
curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("select 10 as a, 20 as b")
r = curs.fetchone()
d = pickle.dumps(r)
r1 = pickle.loads(d)
self.assertEqual(r, r1)
self.assertEqual(r['a'], r1['a'])
self.assertEqual(r['b'], r1['b'])
self.assertEqual(r._column_mapping, r1._column_mapping)
class NamedTupleCursorTest(ConnectingTestCase):
def setUp(self):
ConnectingTestCase.setUp(self)
from psycopg2.extras import NamedTupleConnection
self.conn = self.connect(connection_factory=NamedTupleConnection)
curs = self.conn.cursor()
curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)")
curs.execute("INSERT INTO nttest VALUES (1, 'foo')")
curs.execute("INSERT INTO nttest VALUES (2, 'bar')")
curs.execute("INSERT INTO nttest VALUES (3, 'baz')")
self.conn.commit()
def test_cursor_args(self):
cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.DictCursor)
self.assertEqual(cur.name, 'foo')
self.assert_(isinstance(cur, psycopg2.extras.DictCursor))
def test_fetchone(self):
curs = self.conn.cursor()
curs.execute("select * from nttest order by 1")
t = curs.fetchone()
self.assertEqual(t[0], 1)
self.assertEqual(t.i, 1)
self.assertEqual(t[1], 'foo')
self.assertEqual(t.s, 'foo')
self.assertEqual(curs.rownumber, 1)
self.assertEqual(curs.rowcount, 3)
def test_fetchmany_noarg(self):
curs = self.conn.cursor()
curs.arraysize = 2
curs.execute("select * from nttest order by 1")
res = curs.fetchmany()
self.assertEqual(2, len(res))
self.assertEqual(res[0].i, 1)
self.assertEqual(res[0].s, 'foo')
self.assertEqual(res[1].i, 2)
self.assertEqual(res[1].s, 'bar')
self.assertEqual(curs.rownumber, 2)
self.assertEqual(curs.rowcount, 3)
def test_fetchmany(self):
curs = self.conn.cursor()
curs.execute("select * from nttest order by 1")
res = curs.fetchmany(2)
self.assertEqual(2, len(res))
self.assertEqual(res[0].i, 1)
self.assertEqual(res[0].s, 'foo')
self.assertEqual(res[1].i, 2)
self.assertEqual(res[1].s, 'bar')
self.assertEqual(curs.rownumber, 2)
self.assertEqual(curs.rowcount, 3)
def test_fetchall(self):
curs = self.conn.cursor()
curs.execute("select * from nttest order by 1")
res = curs.fetchall()
self.assertEqual(3, len(res))
self.assertEqual(res[0].i, 1)
self.assertEqual(res[0].s, 'foo')
self.assertEqual(res[1].i, 2)
self.assertEqual(res[1].s, 'bar')
self.assertEqual(res[2].i, 3)
self.assertEqual(res[2].s, 'baz')
self.assertEqual(curs.rownumber, 3)
self.assertEqual(curs.rowcount, 3)
def test_executemany(self):
curs = self.conn.cursor()
curs.executemany("delete from nttest where i = %s",
[(1,), (2,)])
curs.execute("select * from nttest order by 1")
res = curs.fetchall()
self.assertEqual(1, len(res))
self.assertEqual(res[0].i, 3)
self.assertEqual(res[0].s, 'baz')
def test_iter(self):
curs = self.conn.cursor()
curs.execute("select * from nttest order by 1")
i = iter(curs)
self.assertEqual(curs.rownumber, 0)
t = next(i)
self.assertEqual(t.i, 1)
self.assertEqual(t.s, 'foo')
self.assertEqual(curs.rownumber, 1)
self.assertEqual(curs.rowcount, 3)
t = next(i)
self.assertEqual(t.i, 2)
self.assertEqual(t.s, 'bar')
self.assertEqual(curs.rownumber, 2)
self.assertEqual(curs.rowcount, 3)
t = next(i)
self.assertEqual(t.i, 3)
self.assertEqual(t.s, 'baz')
self.assertRaises(StopIteration, next, i)
self.assertEqual(curs.rownumber, 3)
self.assertEqual(curs.rowcount, 3)
def test_record_updated(self):
curs = self.conn.cursor()
curs.execute("select 1 as foo;")
r = curs.fetchone()
self.assertEqual(r.foo, 1)
curs.execute("select 2 as bar;")
r = curs.fetchone()
self.assertEqual(r.bar, 2)
self.assertRaises(AttributeError, getattr, r, 'foo')
def test_no_result_no_surprise(self):
curs = self.conn.cursor()
curs.execute("update nttest set s = s")
self.assertRaises(psycopg2.ProgrammingError, curs.fetchone)
curs.execute("update nttest set s = s")
self.assertRaises(psycopg2.ProgrammingError, curs.fetchall)
def test_bad_col_names(self):
curs = self.conn.cursor()
curs.execute('select 1 as "foo.bar_baz", 2 as "?column?", 3 as "3"')
rv = curs.fetchone()
self.assertEqual(rv.foo_bar_baz, 1)
self.assertEqual(rv.f_column_, 2)
self.assertEqual(rv.f3, 3)
@skip_before_python(3)
def test_nonascii_name(self):
curs = self.conn.cursor()
curs.execute('select 1 as \xe5h\xe9')
rv = curs.fetchone()
self.assertEqual(getattr(rv, '\xe5h\xe9'), 1)
def test_minimal_generation(self):
# Instrument the class to verify it gets called the minimum number of times.
from psycopg2.extras import NamedTupleCursor
f_orig = NamedTupleCursor._make_nt
calls = [0]
def f_patched(self_):
calls[0] += 1
return f_orig(self_)
NamedTupleCursor._make_nt = f_patched
try:
curs = self.conn.cursor()
curs.execute("select * from nttest order by 1")
curs.fetchone()
curs.fetchone()
curs.fetchone()
self.assertEqual(1, calls[0])
curs.execute("select * from nttest order by 1")
curs.fetchone()
curs.fetchall()
self.assertEqual(2, calls[0])
curs.execute("select * from nttest order by 1")
curs.fetchone()
curs.fetchmany(1)
self.assertEqual(3, calls[0])
finally:
NamedTupleCursor._make_nt = f_orig
@skip_before_postgres(8, 0)
def test_named(self):
curs = self.conn.cursor('tmp')
curs.execute("""select i from generate_series(0,9) i""")
recs = []
recs.extend(curs.fetchmany(5))
recs.append(curs.fetchone())
recs.extend(curs.fetchall())
self.assertEqual(list(range(10)), [t.i for t in recs])
def test_named_fetchone(self):
curs = self.conn.cursor('tmp')
curs.execute("""select 42 as i""")
t = curs.fetchone()
self.assertEqual(t.i, 42)
def test_named_fetchmany(self):
curs = self.conn.cursor('tmp')
curs.execute("""select 42 as i""")
recs = curs.fetchmany(10)
self.assertEqual(recs[0].i, 42)
def test_named_fetchall(self):
curs = self.conn.cursor('tmp')
curs.execute("""select 42 as i""")
recs = curs.fetchall()
self.assertEqual(recs[0].i, 42)
@skip_before_postgres(8, 2)
def test_not_greedy(self):
curs = self.conn.cursor('tmp')
curs.itersize = 2
curs.execute("""select clock_timestamp() as ts from generate_series(1,3)""")
recs = []
for t in curs:
time.sleep(0.01)
recs.append(t)
# check that the dataset was not fetched in a single gulp
self.assert_(recs[1].ts - recs[0].ts < timedelta(seconds=0.005))
self.assert_(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099))
@skip_before_postgres(8, 0)
def test_named_rownumber(self):
curs = self.conn.cursor('tmp')
# Only checking for dataset < itersize:
# see CursorTests.test_iter_named_cursor_rownumber
curs.itersize = 4
curs.execute("""select * from generate_series(1,3)""")
for i, t in enumerate(curs):
self.assertEqual(i + 1, curs.rownumber)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()