psycopg2/tests/test_extras_dictcursor.py
Daniele Varrazzo b8597dc1d3 Fixed NamedTupleCursor rownumber during iteration.
The correction is similar to the other one for the other subclasses.

Also added tests for rowcount and rownumber during different fetch styles.
Just in case.
2012-02-23 22:58:58 +00:00

453 lines
16 KiB
Python
Executable File

#!/usr/bin/env python
#
# extras_dictcursor - test if DictCursor extension class works
#
# Copyright (C) 2004-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import time
from datetime import timedelta
import psycopg2
import psycopg2.extras
from testutils import unittest, skip_before_postgres, skip_if_no_namedtuple
from testconfig import dsn
class ExtrasDictCursorTests(unittest.TestCase):
    """Test if DictCursor extension class works.

    Exercises psycopg2.extras.DictCursor and RealDictCursor, both as plain
    and as named cursors, through fetchone(), fetchmany(), fetchall() and
    iteration.
    """

    def setUp(self):
        """Connect and create a temporary table holding the row ('bar',)."""
        self.conn = psycopg2.connect(dsn)
        curs = self.conn.cursor()
        curs.execute("CREATE TEMPORARY TABLE ExtrasDictCursorTests (foo text)")
        curs.execute("INSERT INTO ExtrasDictCursorTests VALUES ('bar')")
        self.conn.commit()

    def tearDown(self):
        """Close the connection opened by setUp()."""
        self.conn.close()

    @staticmethod
    def _firstRowByIteration(curs):
        """Return the first row yielded by iterating curs (None if no rows)."""
        for row in curs:
            return row

    # -- DictCursor on a plain cursor ------------------------------------

    def testDictCursorWithPlainCursorFetchOne(self):
        self._testWithPlainCursor(lambda curs: curs.fetchone())

    def testDictCursorWithPlainCursorFetchMany(self):
        self._testWithPlainCursor(lambda curs: curs.fetchmany(100)[0])

    def testDictCursorWithPlainCursorFetchManyNoarg(self):
        self._testWithPlainCursor(lambda curs: curs.fetchmany()[0])

    def testDictCursorWithPlainCursorFetchAll(self):
        self._testWithPlainCursor(lambda curs: curs.fetchall()[0])

    def testDictCursorWithPlainCursorIter(self):
        self._testWithPlainCursor(self._firstRowByIteration)

    def testUpdateRow(self):
        """A fetched row can be updated by key; the index view follows."""
        row = self._testWithPlainCursor(lambda curs: curs.fetchone())
        row['foo'] = 'qux'
        self.assertEqual(row['foo'], 'qux')
        self.assertEqual(row[0], 'qux')

    @skip_before_postgres(8, 0)
    def testDictCursorWithPlainCursorIterRowNumber(self):
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        self._testIterRowNumber(curs)

    def _testWithPlainCursor(self, getter):
        """Fetch one row through getter from a plain DictCursor and check it.

        getter receives the cursor (already executed) and must return a
        single row.  The row is checked by key and by index, then returned
        so that callers can make further assertions on it.
        """
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        curs.execute("SELECT * FROM ExtrasDictCursorTests")
        row = getter(curs)
        self.assertEqual(row['foo'], 'bar')
        self.assertEqual(row[0], 'bar')
        return row

    # -- RealDictCursor on a plain cursor --------------------------------

    def testDictCursorWithPlainCursorRealFetchOne(self):
        self._testWithPlainCursorReal(lambda curs: curs.fetchone())

    def testDictCursorWithPlainCursorRealFetchMany(self):
        self._testWithPlainCursorReal(lambda curs: curs.fetchmany(100)[0])

    def testDictCursorWithPlainCursorRealFetchManyNoarg(self):
        self._testWithPlainCursorReal(lambda curs: curs.fetchmany()[0])

    def testDictCursorWithPlainCursorRealFetchAll(self):
        self._testWithPlainCursorReal(lambda curs: curs.fetchall()[0])

    def testDictCursorWithPlainCursorRealIter(self):
        self._testWithPlainCursorReal(self._firstRowByIteration)

    @skip_before_postgres(8, 0)
    def testDictCursorWithPlainCursorRealIterRowNumber(self):
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        self._testIterRowNumber(curs)

    def _testWithPlainCursorReal(self, getter):
        """Fetch one row through getter from a plain RealDictCursor.

        Only access by key is checked here.  Returns the fetched row.
        """
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        curs.execute("SELECT * FROM ExtrasDictCursorTests")
        row = getter(curs)
        self.assertEqual(row['foo'], 'bar')
        return row

    # -- DictCursor on a named cursor ------------------------------------

    def testDictCursorWithNamedCursorFetchOne(self):
        self._testWithNamedCursor(lambda curs: curs.fetchone())

    def testDictCursorWithNamedCursorFetchMany(self):
        self._testWithNamedCursor(lambda curs: curs.fetchmany(100)[0])

    def testDictCursorWithNamedCursorFetchManyNoarg(self):
        self._testWithNamedCursor(lambda curs: curs.fetchmany()[0])

    def testDictCursorWithNamedCursorFetchAll(self):
        self._testWithNamedCursor(lambda curs: curs.fetchall()[0])

    def testDictCursorWithNamedCursorIter(self):
        self._testWithNamedCursor(self._firstRowByIteration)

    @skip_before_postgres(8, 2)
    def testDictCursorWithNamedCursorNotGreedy(self):
        curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
        self._testNamedCursorNotGreedy(curs)

    @skip_before_postgres(8, 0)
    def testDictCursorWithNamedCursorIterRowNumber(self):
        curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
        self._testIterRowNumber(curs)

    def _testWithNamedCursor(self, getter):
        """Fetch one row through getter from a named DictCursor and check it.

        The row is checked by key and by index.  Returns the fetched row.
        """
        curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.DictCursor)
        curs.execute("SELECT * FROM ExtrasDictCursorTests")
        row = getter(curs)
        self.assertEqual(row['foo'], 'bar')
        self.assertEqual(row[0], 'bar')
        return row

    # -- RealDictCursor on a named cursor --------------------------------

    def testDictCursorRealWithNamedCursorFetchOne(self):
        self._testWithNamedCursorReal(lambda curs: curs.fetchone())

    def testDictCursorRealWithNamedCursorFetchMany(self):
        self._testWithNamedCursorReal(lambda curs: curs.fetchmany(100)[0])

    def testDictCursorRealWithNamedCursorFetchManyNoarg(self):
        self._testWithNamedCursorReal(lambda curs: curs.fetchmany()[0])

    def testDictCursorRealWithNamedCursorFetchAll(self):
        self._testWithNamedCursorReal(lambda curs: curs.fetchall()[0])

    def testDictCursorRealWithNamedCursorIter(self):
        self._testWithNamedCursorReal(self._firstRowByIteration)

    @skip_before_postgres(8, 2)
    def testDictCursorRealWithNamedCursorNotGreedy(self):
        curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
        self._testNamedCursorNotGreedy(curs)

    @skip_before_postgres(8, 0)
    def testDictCursorRealWithNamedCursorIterRowNumber(self):
        curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
        self._testIterRowNumber(curs)

    def _testWithNamedCursorReal(self, getter):
        """Fetch one row through getter from a named RealDictCursor.

        Only access by key is checked here.  Returns the fetched row.
        """
        curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.RealDictCursor)
        curs.execute("SELECT * FROM ExtrasDictCursorTests")
        row = getter(curs)
        self.assertEqual(row['foo'], 'bar')
        return row

    # -- shared checks ---------------------------------------------------

    def _testNamedCursorNotGreedy(self, curs):
        """Check that iterating curs does not fetch every row up front.

        With itersize = 2 and three rows, the client sleeps 10 ms after
        receiving each row.  The first two timestamps are expected to be
        almost identical, while the third must be at least ~10 ms later
        than the second.
        """
        curs.itersize = 2
        curs.execute("""select clock_timestamp() as ts from generate_series(1,3)""")
        recs = []
        for t in curs:
            time.sleep(0.01)
            recs.append(t)

        # check that the dataset was not fetched in a single gulp
        self.assertTrue(recs[1]['ts'] - recs[0]['ts'] < timedelta(seconds=0.005))
        self.assertTrue(recs[2]['ts'] - recs[1]['ts'] > timedelta(seconds=0.0099))

    def _testIterRowNumber(self, curs):
        """Check that curs.rownumber tracks the rows consumed by iteration."""
        # Only checking for dataset < itersize:
        # see CursorTests.test_iter_named_cursor_rownumber
        curs.itersize = 20
        curs.execute("""select * from generate_series(1,10)""")
        for i, _ in enumerate(curs):
            self.assertEqual(i + 1, curs.rownumber)
class NamedTupleCursorTest(unittest.TestCase):
    """Test the NamedTupleCursor obtained through NamedTupleConnection."""

    def setUp(self):
        """Connect with NamedTupleConnection and create a 3-row temp table.

        If collections.namedtuple cannot be imported, no connection is made
        and self.conn is left as None.
        """
        from psycopg2.extras import NamedTupleConnection

        try:
            # Imported only to probe for availability; the name is unused.
            from collections import namedtuple
        except ImportError:
            self.conn = None
            return

        self.conn = psycopg2.connect(dsn,
            connection_factory=NamedTupleConnection)
        curs = self.conn.cursor()
        curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)")
        curs.execute("INSERT INTO nttest VALUES (1, 'foo')")
        curs.execute("INSERT INTO nttest VALUES (2, 'bar')")
        curs.execute("INSERT INTO nttest VALUES (3, 'baz')")
        self.conn.commit()

    def tearDown(self):
        """Close the connection, if setUp() managed to open one."""
        if self.conn is not None:
            self.conn.close()

    @skip_if_no_namedtuple
    def test_fetchone(self):
        curs = self.conn.cursor()
        curs.execute("select * from nttest order by 1")
        t = curs.fetchone()
        self.assertEqual(t[0], 1)
        self.assertEqual(t.i, 1)
        self.assertEqual(t[1], 'foo')
        self.assertEqual(t.s, 'foo')
        self.assertEqual(curs.rownumber, 1)
        self.assertEqual(curs.rowcount, 3)

    @skip_if_no_namedtuple
    def test_fetchmany_noarg(self):
        """fetchmany() without argument returns arraysize records."""
        curs = self.conn.cursor()
        curs.arraysize = 2
        curs.execute("select * from nttest order by 1")
        res = curs.fetchmany()
        self.assertEqual(2, len(res))
        self.assertEqual(res[0].i, 1)
        self.assertEqual(res[0].s, 'foo')
        self.assertEqual(res[1].i, 2)
        self.assertEqual(res[1].s, 'bar')
        self.assertEqual(curs.rownumber, 2)
        self.assertEqual(curs.rowcount, 3)

    @skip_if_no_namedtuple
    def test_fetchmany(self):
        curs = self.conn.cursor()
        curs.execute("select * from nttest order by 1")
        res = curs.fetchmany(2)
        self.assertEqual(2, len(res))
        self.assertEqual(res[0].i, 1)
        self.assertEqual(res[0].s, 'foo')
        self.assertEqual(res[1].i, 2)
        self.assertEqual(res[1].s, 'bar')
        self.assertEqual(curs.rownumber, 2)
        self.assertEqual(curs.rowcount, 3)

    @skip_if_no_namedtuple
    def test_fetchall(self):
        curs = self.conn.cursor()
        curs.execute("select * from nttest order by 1")
        res = curs.fetchall()
        self.assertEqual(3, len(res))
        self.assertEqual(res[0].i, 1)
        self.assertEqual(res[0].s, 'foo')
        self.assertEqual(res[1].i, 2)
        self.assertEqual(res[1].s, 'bar')
        self.assertEqual(res[2].i, 3)
        self.assertEqual(res[2].s, 'baz')
        self.assertEqual(curs.rownumber, 3)
        self.assertEqual(curs.rowcount, 3)

    @skip_if_no_namedtuple
    def test_executemany(self):
        """Records are still named tuples after an executemany()."""
        curs = self.conn.cursor()
        curs.executemany("delete from nttest where i = %s",
            [(1,), (2,)])
        curs.execute("select * from nttest order by 1")
        res = curs.fetchall()
        self.assertEqual(1, len(res))
        self.assertEqual(res[0].i, 3)
        self.assertEqual(res[0].s, 'baz')

    @skip_if_no_namedtuple
    def test_iter(self):
        """rownumber and rowcount stay correct while iterating the cursor."""
        curs = self.conn.cursor()
        curs.execute("select * from nttest order by 1")
        i = iter(curs)
        self.assertEqual(curs.rownumber, 0)

        # The builtin next() works on both Python 2.6+ and Python 3.
        # NOTE(review): this relies on skip_if_no_namedtuple skipping the
        # test on interpreters older than 2.6 - confirm in testutils.
        t = next(i)
        self.assertEqual(t.i, 1)
        self.assertEqual(t.s, 'foo')
        self.assertEqual(curs.rownumber, 1)
        self.assertEqual(curs.rowcount, 3)

        t = next(i)
        self.assertEqual(t.i, 2)
        self.assertEqual(t.s, 'bar')
        self.assertEqual(curs.rownumber, 2)
        self.assertEqual(curs.rowcount, 3)

        t = next(i)
        self.assertEqual(t.i, 3)
        self.assertEqual(t.s, 'baz')
        self.assertRaises(StopIteration, next, i)
        self.assertEqual(curs.rownumber, 3)
        self.assertEqual(curs.rowcount, 3)

    def test_error_message(self):
        """When namedtuple is unavailable, fetching raises ImportError."""
        try:
            from collections import namedtuple
        except ImportError:
            pass
        else:
            # namedtuple is available: nothing to verify, skip the test
            return

        # an import error somewhere
        from psycopg2.extras import NamedTupleConnection
        try:
            if self.conn is not None:
                self.conn.close()
            self.conn = psycopg2.connect(dsn,
                connection_factory=NamedTupleConnection)
            curs = self.conn.cursor()
            curs.execute("select 1")
            curs.fetchone()
        except ImportError:
            pass
        else:
            self.fail("expecting ImportError")

    @skip_if_no_namedtuple
    def test_record_updated(self):
        """The record type follows the columns of the latest query."""
        curs = self.conn.cursor()
        curs.execute("select 1 as foo;")
        r = curs.fetchone()
        self.assertEqual(r.foo, 1)

        curs.execute("select 2 as bar;")
        r = curs.fetchone()
        self.assertEqual(r.bar, 2)
        self.assertRaises(AttributeError, getattr, r, 'foo')

    @skip_if_no_namedtuple
    def test_no_result_no_surprise(self):
        """Fetching after a statement with no result set raises as usual."""
        curs = self.conn.cursor()
        curs.execute("update nttest set s = s")
        self.assertRaises(psycopg2.ProgrammingError, curs.fetchone)

        curs.execute("update nttest set s = s")
        self.assertRaises(psycopg2.ProgrammingError, curs.fetchall)

    @skip_if_no_namedtuple
    def test_minimal_generation(self):
        """_make_nt is invoked once per execute(), not once per fetch."""
        # Instrument the class to verify it gets called the minimum number of times.
        from psycopg2.extras import NamedTupleCursor
        f_orig = NamedTupleCursor._make_nt
        calls = [0]     # a list, so that the closure below can update it

        def f_patched(self_):
            calls[0] += 1
            return f_orig(self_)

        NamedTupleCursor._make_nt = f_patched

        try:
            curs = self.conn.cursor()
            curs.execute("select * from nttest order by 1")
            curs.fetchone()
            curs.fetchone()
            curs.fetchone()
            self.assertEqual(1, calls[0])

            curs.execute("select * from nttest order by 1")
            curs.fetchone()
            curs.fetchall()
            self.assertEqual(2, calls[0])

            curs.execute("select * from nttest order by 1")
            curs.fetchone()
            curs.fetchmany(1)
            self.assertEqual(3, calls[0])

        finally:
            # Always restore the original method, even if an assertion failed.
            NamedTupleCursor._make_nt = f_orig

    @skip_if_no_namedtuple
    @skip_before_postgres(8, 0)
    def test_named(self):
        """Mixed fetchmany/fetchone/fetchall on a named cursor see all rows."""
        curs = self.conn.cursor('tmp')
        curs.execute("""select i from generate_series(0,9) i""")
        recs = []
        recs.extend(curs.fetchmany(5))
        recs.append(curs.fetchone())
        recs.extend(curs.fetchall())
        # list(...) is required on Python 3, where range() is not a list.
        self.assertEqual(list(range(10)), [t.i for t in recs])

    @skip_if_no_namedtuple
    def test_named_fetchone(self):
        curs = self.conn.cursor('tmp')
        curs.execute("""select 42 as i""")
        t = curs.fetchone()
        self.assertEqual(t.i, 42)

    @skip_if_no_namedtuple
    def test_named_fetchmany(self):
        curs = self.conn.cursor('tmp')
        curs.execute("""select 42 as i""")
        recs = curs.fetchmany(10)
        self.assertEqual(recs[0].i, 42)

    @skip_if_no_namedtuple
    def test_named_fetchall(self):
        curs = self.conn.cursor('tmp')
        curs.execute("""select 42 as i""")
        recs = curs.fetchall()
        self.assertEqual(recs[0].i, 42)

    @skip_if_no_namedtuple
    @skip_before_postgres(8, 2)
    def test_not_greedy(self):
        """Iterating a named cursor does not fetch every row up front.

        With itersize = 2 and three rows, the client sleeps 10 ms after
        receiving each row.  The first two timestamps are expected to be
        almost identical, while the third must be at least ~10 ms later
        than the second.
        """
        curs = self.conn.cursor('tmp')
        curs.itersize = 2
        curs.execute("""select clock_timestamp() as ts from generate_series(1,3)""")
        recs = []
        for t in curs:
            time.sleep(0.01)
            recs.append(t)

        # check that the dataset was not fetched in a single gulp
        self.assertTrue(recs[1].ts - recs[0].ts < timedelta(seconds=0.005))
        self.assertTrue(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099))

    @skip_if_no_namedtuple
    @skip_before_postgres(8, 0)
    def test_named_rownumber(self):
        """rownumber tracks the rows consumed by iterating a named cursor."""
        curs = self.conn.cursor('tmp')
        # Only checking for dataset < itersize:
        # see CursorTests.test_iter_named_cursor_rownumber
        curs.itersize = 4
        curs.execute("""select * from generate_series(1,3)""")
        for i, _ in enumerate(curs):
            self.assertEqual(i + 1, curs.rownumber)
def test_suite():
    """Return a suite loaded from this module by name."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromName(__name__)
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()