Merge branch 'fix-886'

This commit is contained in:
Daniele Varrazzo 2019-04-06 20:09:47 +01:00
commit 5c4b8a3d1f
3 changed files with 62 additions and 44 deletions

1
NEWS
View File

@ -4,6 +4,7 @@ Current release
What's new in psycopg 2.8.1
---------------------------
- Fixed `RealDictRow` modifiability (:ticket:`#886`).
- Fixed "there's no async cursor" error polling a connection with no cursor
(:ticket:`#887`).

View File

@ -253,56 +253,39 @@ class RealDictCursor(DictCursorBase):
self._query_executed = False
class RealDictRow(dict):
class RealDictRow(OrderedDict):
"""A `!dict` subclass representing a data record."""
__slots__ = ('_column_mapping',)
def __init__(self, *args, **kwargs):
if args and isinstance(args[0], _cursor):
cursor = args[0]
args = args[1:]
else:
cursor = None
def __init__(self, cursor):
super(RealDictRow, self).__init__()
# Required for named cursors
if cursor.description and not cursor.column_mapping:
cursor._build_index()
super(RealDictRow, self).__init__(*args, **kwargs)
self._column_mapping = cursor.column_mapping
if cursor is not None:
# Required for named cursors
if cursor.description and not cursor.column_mapping:
cursor._build_index()
def __setitem__(self, name, value):
if type(name) == int:
name = self._column_mapping[name]
super(RealDictRow, self).__setitem__(name, value)
# Store the cols mapping in the dict itself until the row is fully
# populated, so we don't need to add attributes to the class
# (hence keeping its maintenance, special pickle support, etc.)
self[RealDictRow] = cursor.column_mapping
def __getstate__(self):
return self.copy(), self._column_mapping[:]
def __setitem__(self, key, value):
if RealDictRow in self:
# We are in the row building phase
mapping = self[RealDictRow]
super(RealDictRow, self).__setitem__(mapping[key], value)
if len(self) == len(mapping) + 1:
# Row building finished
del self[RealDictRow]
return
def __setstate__(self, data):
self.update(data[0])
self._column_mapping = data[1]
def __iter__(self):
return iter(self._column_mapping)
def keys(self):
return iter(self._column_mapping)
def values(self):
return (self[k] for k in self._column_mapping)
def items(self):
return ((k, self[k]) for k in self._column_mapping)
if PY2:
iterkeys = keys
itervalues = values
iteritems = items
def keys(self):
return list(self.iterkeys())
def values(self):
return list(self.itervalues())
def items(self):
return list(self.iteritems())
super(RealDictRow, self).__setitem__(key, value)
class NamedTupleConnection(_connection):

View File

@ -221,6 +221,12 @@ class ExtrasDictCursorTests(_DictCursorBase):
class ExtrasDictCursorRealTests(_DictCursorBase):
def testRealMeansReal(self):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = curs.fetchone()
self.assert_(isinstance(row, dict))
def testDictCursorWithPlainCursorRealFetchOne(self):
self._testWithPlainCursorReal(lambda curs: curs.fetchone())
@ -259,7 +265,6 @@ class ExtrasDictCursorRealTests(_DictCursorBase):
self.assertEqual(r, r1)
self.assertEqual(r['a'], r1['a'])
self.assertEqual(r['b'], r1['b'])
self.assertEqual(r._column_mapping, r1._column_mapping)
def testDictCursorRealWithNamedCursorFetchOne(self):
self._testWithNamedCursorReal(lambda curs: curs.fetchone())
@ -358,6 +363,35 @@ class ExtrasDictCursorRealTests(_DictCursorBase):
self.assertEqual(list(r1.itervalues()), list(r.itervalues()))
self.assertEqual(list(r1.iteritems()), list(r.iteritems()))
def test_pop(self):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("select 1 as a, 2 as b, 3 as c")
r = curs.fetchone()
self.assertEqual(r.pop('b'), 2)
self.assertEqual(list(r), ['a', 'c'])
self.assertEqual(list(r.keys()), ['a', 'c'])
self.assertEqual(list(r.values()), [1, 3])
self.assertEqual(list(r.items()), [('a', 1), ('c', 3)])
self.assertEqual(r.pop('b', None), None)
self.assertRaises(KeyError, r.pop, 'b')
def test_mod(self):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("select 1 as a, 2 as b, 3 as c")
r = curs.fetchone()
r['d'] = 4
self.assertEqual(list(r), ['a', 'b', 'c', 'd'])
self.assertEqual(list(r.keys()), ['a', 'b', 'c', 'd'])
self.assertEqual(list(r.values()), [1, 2, 3, 4])
self.assertEqual(list(
r.items()), [('a', 1), ('b', 2), ('c', 3), ('d', 4)])
assert r['a'] == 1
assert r['b'] == 2
assert r['c'] == 3
assert r['d'] == 4
class NamedTupleCursorTest(ConnectingTestCase):
def setUp(self):