mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-11 11:36:37 +03:00
05c28cce78
namedtuple is available on all Python versions supported by psycopg2. It was first introduced in Python 2.6. Can remove all workarounds and special documentation.
442 lines
16 KiB
Python
Executable File
442 lines
16 KiB
Python
Executable File
#!/usr/bin/env python
#
# extras_dictcursor - test if DictCursor extension class works
#
# Copyright (C) 2004-2010 Federico Di Gregorio <fog@debian.org>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
|
|
|
|
import time
|
|
from datetime import timedelta
|
|
import psycopg2
|
|
import psycopg2.extras
|
|
from testutils import unittest, ConnectingTestCase, skip_before_postgres
|
|
|
|
|
|
class ExtrasDictCursorTests(ConnectingTestCase):
    """Test if DictCursor extension class works."""

    def setUp(self):
        """Create a one-row temporary table shared by the tests below."""
        ConnectingTestCase.setUp(self)
        curs = self.conn.cursor()
        curs.execute("CREATE TEMPORARY TABLE ExtrasDictCursorTests (foo text)")
        curs.execute("INSERT INTO ExtrasDictCursorTests VALUES ('bar')")
        self.conn.commit()

    def testDictConnCursorArgs(self):
        """A DictConnection yields DictCursors unless a factory is given."""
        self.conn.close()
        self.conn = self.connect(
            connection_factory=psycopg2.extras.DictConnection)
        cur = self.conn.cursor()
        self.assertTrue(isinstance(cur, psycopg2.extras.DictCursor))
        self.assertEqual(cur.name, None)
        # overridable
        cur = self.conn.cursor(
            'foo', cursor_factory=psycopg2.extras.NamedTupleCursor)
        self.assertEqual(cur.name, 'foo')
        self.assertTrue(isinstance(cur, psycopg2.extras.NamedTupleCursor))

    def testDictCursorWithPlainCursorFetchOne(self):
        self._testWithPlainCursor(lambda curs: curs.fetchone())

    def testDictCursorWithPlainCursorFetchMany(self):
        self._testWithPlainCursor(lambda curs: curs.fetchmany(100)[0])

    def testDictCursorWithPlainCursorFetchManyNoarg(self):
        self._testWithPlainCursor(lambda curs: curs.fetchmany()[0])

    def testDictCursorWithPlainCursorFetchAll(self):
        self._testWithPlainCursor(lambda curs: curs.fetchall()[0])

    def testDictCursorWithPlainCursorIter(self):
        def getter(curs):
            # Return the first row produced by iterating the cursor.
            for row in curs:
                return row
        self._testWithPlainCursor(getter)

    def testUpdateRow(self):
        """Assigning by column name is visible by name and by position."""
        row = self._testWithPlainCursor(lambda curs: curs.fetchone())
        row['foo'] = 'qux'
        self.assertEqual(row['foo'], 'qux')
        self.assertEqual(row[0], 'qux')

    @skip_before_postgres(8, 0)
    def testDictCursorWithPlainCursorIterRowNumber(self):
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        self._testIterRowNumber(curs)

    def _testWithPlainCursor(self, getter):
        """Fetch the fixture row through an unnamed DictCursor.

        *getter* receives the executed cursor and must return one row.
        The row is checked by column name and by index, then returned so
        callers can make further assertions on it.
        """
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        curs.execute("SELECT * FROM ExtrasDictCursorTests")
        row = getter(curs)
        self.assertEqual(row['foo'], 'bar')
        self.assertEqual(row[0], 'bar')
        return row

    def testDictCursorWithPlainCursorRealFetchOne(self):
        self._testWithPlainCursorReal(lambda curs: curs.fetchone())

    def testDictCursorWithPlainCursorRealFetchMany(self):
        self._testWithPlainCursorReal(lambda curs: curs.fetchmany(100)[0])

    def testDictCursorWithPlainCursorRealFetchManyNoarg(self):
        self._testWithPlainCursorReal(lambda curs: curs.fetchmany()[0])

    def testDictCursorWithPlainCursorRealFetchAll(self):
        self._testWithPlainCursorReal(lambda curs: curs.fetchall()[0])

    def testDictCursorWithPlainCursorRealIter(self):
        def getter(curs):
            # Return the first row produced by iterating the cursor.
            for row in curs:
                return row
        self._testWithPlainCursorReal(getter)

    @skip_before_postgres(8, 0)
    def testDictCursorWithPlainCursorRealIterRowNumber(self):
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        self._testIterRowNumber(curs)

    def _testWithPlainCursorReal(self, getter):
        """Fetch the fixture row through an unnamed RealDictCursor.

        *getter* receives the executed cursor and must return one row;
        only access by column name is checked.
        """
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        curs.execute("SELECT * FROM ExtrasDictCursorTests")
        row = getter(curs)
        self.assertEqual(row['foo'], 'bar')

    def testDictCursorWithNamedCursorFetchOne(self):
        self._testWithNamedCursor(lambda curs: curs.fetchone())

    def testDictCursorWithNamedCursorFetchMany(self):
        self._testWithNamedCursor(lambda curs: curs.fetchmany(100)[0])

    def testDictCursorWithNamedCursorFetchManyNoarg(self):
        self._testWithNamedCursor(lambda curs: curs.fetchmany()[0])

    def testDictCursorWithNamedCursorFetchAll(self):
        self._testWithNamedCursor(lambda curs: curs.fetchall()[0])

    def testDictCursorWithNamedCursorIter(self):
        def getter(curs):
            # Return the first row produced by iterating the cursor.
            for row in curs:
                return row
        self._testWithNamedCursor(getter)

    @skip_before_postgres(8, 2)
    def testDictCursorWithNamedCursorNotGreedy(self):
        curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
        self._testNamedCursorNotGreedy(curs)

    @skip_before_postgres(8, 0)
    def testDictCursorWithNamedCursorIterRowNumber(self):
        curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.DictCursor)
        self._testIterRowNumber(curs)

    def _testWithNamedCursor(self, getter):
        """Fetch the fixture row through a named DictCursor.

        *getter* receives the executed cursor and must return one row,
        which is checked by column name and by index.
        """
        curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.DictCursor)
        curs.execute("SELECT * FROM ExtrasDictCursorTests")
        row = getter(curs)
        self.assertEqual(row['foo'], 'bar')
        self.assertEqual(row[0], 'bar')

    def testDictCursorRealWithNamedCursorFetchOne(self):
        self._testWithNamedCursorReal(lambda curs: curs.fetchone())

    def testDictCursorRealWithNamedCursorFetchMany(self):
        self._testWithNamedCursorReal(lambda curs: curs.fetchmany(100)[0])

    def testDictCursorRealWithNamedCursorFetchManyNoarg(self):
        self._testWithNamedCursorReal(lambda curs: curs.fetchmany()[0])

    def testDictCursorRealWithNamedCursorFetchAll(self):
        self._testWithNamedCursorReal(lambda curs: curs.fetchall()[0])

    def testDictCursorRealWithNamedCursorIter(self):
        def getter(curs):
            # Return the first row produced by iterating the cursor.
            for row in curs:
                return row
        self._testWithNamedCursorReal(getter)

    @skip_before_postgres(8, 2)
    def testDictCursorRealWithNamedCursorNotGreedy(self):
        curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
        self._testNamedCursorNotGreedy(curs)

    @skip_before_postgres(8, 0)
    def testDictCursorRealWithNamedCursorIterRowNumber(self):
        curs = self.conn.cursor('tmp', cursor_factory=psycopg2.extras.RealDictCursor)
        self._testIterRowNumber(curs)

    def _testWithNamedCursorReal(self, getter):
        """Fetch the fixture row through a named RealDictCursor.

        *getter* receives the executed cursor and must return one row;
        only access by column name is checked.
        """
        curs = self.conn.cursor(
            'aname', cursor_factory=psycopg2.extras.RealDictCursor)
        curs.execute("SELECT * FROM ExtrasDictCursorTests")
        row = getter(curs)
        self.assertEqual(row['foo'], 'bar')

    def _testNamedCursorNotGreedy(self, curs):
        """Check that iterating *curs* does not fetch all rows at once.

        Three server-side timestamps are read with ``itersize = 2`` while
        sleeping 10 ms after each row is received.
        """
        curs.itersize = 2
        curs.execute("""select clock_timestamp() as ts from generate_series(1,3)""")
        recs = []
        for t in curs:
            time.sleep(0.01)
            recs.append(t)

        # check that the dataset was not fetched in a single gulp:
        # the first two timestamps are close together, the third one was
        # generated only after the client-side sleeps.
        self.assertTrue(recs[1]['ts'] - recs[0]['ts'] < timedelta(seconds=0.005))
        self.assertTrue(recs[2]['ts'] - recs[1]['ts'] > timedelta(seconds=0.0099))

    def _testIterRowNumber(self, curs):
        """Check that ``curs.rownumber`` tracks rows consumed by iteration."""
        # Only checking for dataset < itersize:
        # see CursorTests.test_iter_named_cursor_rownumber
        curs.itersize = 20
        curs.execute("""select * from generate_series(1,10)""")
        for i, r in enumerate(curs):
            self.assertEqual(i + 1, curs.rownumber)

    def testPickleDictRow(self):
        """A DictCursor row survives a pickle round trip unchanged."""
        import pickle
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
        curs.execute("select 10 as a, 20 as b")
        r = curs.fetchone()
        d = pickle.dumps(r)
        r1 = pickle.loads(d)
        self.assertEqual(r, r1)
        self.assertEqual(r[0], r1[0])
        self.assertEqual(r[1], r1[1])
        self.assertEqual(r['a'], r1['a'])
        self.assertEqual(r['b'], r1['b'])
        self.assertEqual(r._index, r1._index)

    def testPickleRealDictRow(self):
        """A RealDictCursor row survives a pickle round trip unchanged."""
        import pickle
        curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
        curs.execute("select 10 as a, 20 as b")
        r = curs.fetchone()
        d = pickle.dumps(r)
        r1 = pickle.loads(d)
        self.assertEqual(r, r1)
        self.assertEqual(r['a'], r1['a'])
        self.assertEqual(r['b'], r1['b'])
        self.assertEqual(r._column_mapping, r1._column_mapping)
|
|
|
|
|
|
class NamedTupleCursorTest(ConnectingTestCase):
    """Test that NamedTupleCursor returns rows as named tuples."""

    def setUp(self):
        """Connect with NamedTupleConnection and create a three-row table."""
        ConnectingTestCase.setUp(self)
        from psycopg2.extras import NamedTupleConnection

        self.conn = self.connect(connection_factory=NamedTupleConnection)
        curs = self.conn.cursor()
        curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)")
        curs.execute("INSERT INTO nttest VALUES (1, 'foo')")
        curs.execute("INSERT INTO nttest VALUES (2, 'bar')")
        curs.execute("INSERT INTO nttest VALUES (3, 'baz')")
        self.conn.commit()

    def test_cursor_args(self):
        """An explicit cursor_factory overrides the connection default."""
        cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.DictCursor)
        self.assertEqual(cur.name, 'foo')
        self.assertTrue(isinstance(cur, psycopg2.extras.DictCursor))

    def test_fetchone(self):
        curs = self.conn.cursor()
        curs.execute("select * from nttest order by 1")
        t = curs.fetchone()
        self.assertEqual(t[0], 1)
        self.assertEqual(t.i, 1)
        self.assertEqual(t[1], 'foo')
        self.assertEqual(t.s, 'foo')
        self.assertEqual(curs.rownumber, 1)
        self.assertEqual(curs.rowcount, 3)

    def test_fetchmany_noarg(self):
        curs = self.conn.cursor()
        curs.arraysize = 2
        curs.execute("select * from nttest order by 1")
        res = curs.fetchmany()
        self.assertEqual(2, len(res))
        self.assertEqual(res[0].i, 1)
        self.assertEqual(res[0].s, 'foo')
        self.assertEqual(res[1].i, 2)
        self.assertEqual(res[1].s, 'bar')
        self.assertEqual(curs.rownumber, 2)
        self.assertEqual(curs.rowcount, 3)

    def test_fetchmany(self):
        curs = self.conn.cursor()
        curs.execute("select * from nttest order by 1")
        res = curs.fetchmany(2)
        self.assertEqual(2, len(res))
        self.assertEqual(res[0].i, 1)
        self.assertEqual(res[0].s, 'foo')
        self.assertEqual(res[1].i, 2)
        self.assertEqual(res[1].s, 'bar')
        self.assertEqual(curs.rownumber, 2)
        self.assertEqual(curs.rowcount, 3)

    def test_fetchall(self):
        curs = self.conn.cursor()
        curs.execute("select * from nttest order by 1")
        res = curs.fetchall()
        self.assertEqual(3, len(res))
        self.assertEqual(res[0].i, 1)
        self.assertEqual(res[0].s, 'foo')
        self.assertEqual(res[1].i, 2)
        self.assertEqual(res[1].s, 'bar')
        self.assertEqual(res[2].i, 3)
        self.assertEqual(res[2].s, 'baz')
        self.assertEqual(curs.rownumber, 3)
        self.assertEqual(curs.rowcount, 3)

    def test_executemany(self):
        curs = self.conn.cursor()
        curs.executemany("delete from nttest where i = %s",
            [(1,), (2,)])
        curs.execute("select * from nttest order by 1")
        res = curs.fetchall()
        self.assertEqual(1, len(res))
        self.assertEqual(res[0].i, 3)
        self.assertEqual(res[0].s, 'baz')

    def test_iter(self):
        """Step through the cursor by hand, checking rownumber each time."""
        curs = self.conn.cursor()
        curs.execute("select * from nttest order by 1")
        it = iter(curs)
        self.assertEqual(curs.rownumber, 0)

        # next() builtin instead of the Python-2-only it.next() method.
        t = next(it)
        self.assertEqual(t.i, 1)
        self.assertEqual(t.s, 'foo')
        self.assertEqual(curs.rownumber, 1)
        self.assertEqual(curs.rowcount, 3)

        t = next(it)
        self.assertEqual(t.i, 2)
        self.assertEqual(t.s, 'bar')
        self.assertEqual(curs.rownumber, 2)
        self.assertEqual(curs.rowcount, 3)

        t = next(it)
        self.assertEqual(t.i, 3)
        self.assertEqual(t.s, 'baz')
        self.assertRaises(StopIteration, next, it)
        self.assertEqual(curs.rownumber, 3)
        self.assertEqual(curs.rowcount, 3)

    def test_record_updated(self):
        """The record type follows the columns of the latest query."""
        curs = self.conn.cursor()
        curs.execute("select 1 as foo;")
        r = curs.fetchone()
        self.assertEqual(r.foo, 1)

        curs.execute("select 2 as bar;")
        r = curs.fetchone()
        self.assertEqual(r.bar, 2)
        self.assertRaises(AttributeError, getattr, r, 'foo')

    def test_no_result_no_surprise(self):
        """Fetching after a statement without results raises ProgrammingError."""
        curs = self.conn.cursor()
        curs.execute("update nttest set s = s")
        self.assertRaises(psycopg2.ProgrammingError, curs.fetchone)

        curs.execute("update nttest set s = s")
        self.assertRaises(psycopg2.ProgrammingError, curs.fetchall)

    def test_minimal_generation(self):
        # Instrument the class to verify it gets called the minimum number of times.
        from psycopg2.extras import NamedTupleCursor
        f_orig = NamedTupleCursor._make_nt
        calls = [0]  # one-element list so the closure can mutate the count

        def f_patched(self_):
            calls[0] += 1
            return f_orig(self_)

        NamedTupleCursor._make_nt = f_patched

        try:
            curs = self.conn.cursor()
            curs.execute("select * from nttest order by 1")
            curs.fetchone()
            curs.fetchone()
            curs.fetchone()
            self.assertEqual(1, calls[0])

            curs.execute("select * from nttest order by 1")
            curs.fetchone()
            curs.fetchall()
            self.assertEqual(2, calls[0])

            curs.execute("select * from nttest order by 1")
            curs.fetchone()
            curs.fetchmany(1)
            self.assertEqual(3, calls[0])

        finally:
            # Always restore the real method so other tests are unaffected.
            NamedTupleCursor._make_nt = f_orig

    @skip_before_postgres(8, 0)
    def test_named(self):
        curs = self.conn.cursor('tmp')
        curs.execute("""select i from generate_series(0,9) i""")
        recs = []
        recs.extend(curs.fetchmany(5))
        recs.append(curs.fetchone())
        recs.extend(curs.fetchall())
        # list(...) so the comparison also holds where range() is lazy.
        self.assertEqual(list(range(10)), [t.i for t in recs])

    def test_named_fetchone(self):
        curs = self.conn.cursor('tmp')
        curs.execute("""select 42 as i""")
        t = curs.fetchone()
        self.assertEqual(t.i, 42)

    def test_named_fetchmany(self):
        curs = self.conn.cursor('tmp')
        curs.execute("""select 42 as i""")
        recs = curs.fetchmany(10)
        self.assertEqual(recs[0].i, 42)

    def test_named_fetchall(self):
        curs = self.conn.cursor('tmp')
        curs.execute("""select 42 as i""")
        recs = curs.fetchall()
        self.assertEqual(recs[0].i, 42)

    @skip_before_postgres(8, 2)
    def test_not_greedy(self):
        """Iterating a named cursor does not fetch all rows at once.

        Three server-side timestamps are read with ``itersize = 2`` while
        sleeping 10 ms after each row is received.
        """
        curs = self.conn.cursor('tmp')
        curs.itersize = 2
        curs.execute("""select clock_timestamp() as ts from generate_series(1,3)""")
        recs = []
        for t in curs:
            time.sleep(0.01)
            recs.append(t)

        # check that the dataset was not fetched in a single gulp
        self.assertTrue(recs[1].ts - recs[0].ts < timedelta(seconds=0.005))
        self.assertTrue(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099))

    @skip_before_postgres(8, 0)
    def test_named_rownumber(self):
        curs = self.conn.cursor('tmp')
        # Only checking for dataset < itersize:
        # see CursorTests.test_iter_named_cursor_rownumber
        curs.itersize = 4
        curs.execute("""select * from generate_series(1,3)""")
        for i, t in enumerate(curs):
            self.assertEqual(i + 1, curs.rownumber)
|
|
|
|
|
|
def test_suite():
    """Return a suite with every test the default loader finds in this module."""
    loader = unittest.TestLoader()
    return loader.loadTestsFromName(__name__)
|
|
|
|
if __name__ == "__main__":
    # Run the tests in this module when the file is executed as a script.
    unittest.main()
|