First try at curs.withhold implementation

This commit is contained in:
Federico Di Gregorio 2011-07-05 10:28:34 +02:00
parent cb1d163f4f
commit 2f6336ea78
4 changed files with 74 additions and 11 deletions

View File

@ -42,6 +42,7 @@ struct cursorObject {
int closed:1; /* 1 if the cursor is closed */
int notuples:1; /* 1 if the command was not a SELECT query */
int withhold:1; /* 1 if the cursor is named and uses WITH HOLD */
long int rowcount; /* number of rows affected by last execute */
long int columns; /* number of columns fetched from the db */
@ -99,7 +100,7 @@ if ((self)->notuples && (self)->name == NULL) { \
return NULL; }
#define EXC_IF_NO_MARK(self) \
if ((self)->mark != (self)->conn->mark) { \
if ((self)->mark != (self)->conn->mark && (self)->withhold == 0) { \
PyErr_SetString(ProgrammingError, "named cursor isn't valid anymore"); \
return NULL; }

View File

@ -391,8 +391,10 @@ _psyco_curs_execute(cursorObject *self,
if (self->name != NULL) {
self->query = Bytes_FromFormat(
"DECLARE \"%s\" CURSOR WITHOUT HOLD FOR %s",
self->name, Bytes_AS_STRING(fquery));
"DECLARE \"%s\" CURSOR %s HOLD FOR %s",
self->name,
self->withhold ? "WITH" : "WITHOUT",
Bytes_AS_STRING(fquery));
Py_DECREF(fquery);
}
else {
@ -402,8 +404,10 @@ _psyco_curs_execute(cursorObject *self,
else {
if (self->name != NULL) {
self->query = Bytes_FromFormat(
"DECLARE \"%s\" CURSOR WITHOUT HOLD FOR %s",
self->name, Bytes_AS_STRING(operation));
"DECLARE \"%s\" CURSOR %s HOLD FOR %s",
self->name,
self->withhold ? "WITH" : "WITHOUT",
Bytes_AS_STRING(operation));
}
else {
/* Transfer reference ownership of the str in operation to
@ -461,11 +465,7 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs)
"can't use a named cursor outside of transactions", NULL, NULL);
return NULL;
}
if (self->conn->mark != self->mark) {
psyco_set_error(ProgrammingError, self,
"named cursor isn't valid anymore", NULL, NULL);
return NULL;
}
EXC_IF_NO_MARK(self);
}
EXC_IF_CURS_CLOSED(self);
@ -1519,7 +1519,7 @@ exit:
return res;
}
/* extension: closed - return true if cursor is closed*/
/* extension: closed - return true if cursor is closed */
#define psyco_curs_closed_doc \
"True if cursor is closed, False if cursor is open"
@ -1535,6 +1535,39 @@ psyco_curs_get_closed(cursorObject *self, void *closure)
return closed;
}
/* extension: withhold - get or set "WITH HOLD" for named cursors */
#define psyco_curs_withhold_doc \
"Set or return cursor use of WITH HOLD"
static PyObject *
psyco_curs_withhold_get(cursorObject *self)
{
PyObject *ret;
ret = self->withhold ? Py_True : Py_False;
Py_INCREF(ret);
return ret;
}
static int
psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue)
{
int value;
if (self->name == NULL) {
PyErr_SetString(ProgrammingError,
"trying to set .withhold on unnamed cursor");
return -1;
}
if ((value = PyObject_IsTrue(pyvalue)) == -1)
return -1;
self->withhold = value;
return 0;
}
#endif
@ -1657,6 +1690,10 @@ static struct PyGetSetDef cursorObject_getsets[] = {
#ifdef PSYCOPG_EXTENSIONS
{ "closed", (getter)psyco_curs_get_closed, NULL,
psyco_curs_closed_doc, NULL },
{ "withhold",
(getter)psyco_curs_withhold_get,
(setter)psyco_curs_withhold_set,
psyco_curs_withhold_doc, NULL },
#endif
{NULL}
};
@ -1686,6 +1723,7 @@ cursor_setup(cursorObject *self, connectionObject *conn, const char *name)
self->conn = conn;
self->closed = 0;
self->withhold = 0;
self->mark = conn->mark;
self->pgres = NULL;
self->notuples = 1;

View File

@ -202,6 +202,9 @@
<None Include="tests\dbapi20_tpc.py" />
<None Include="tests\test_cursor.py" />
<None Include="NEWS" />
<None Include="tests\test_cancel.py" />
<None Include="tests\testconfig.py" />
<None Include="tests\testutils.py" />
</ItemGroup>
<ItemGroup>
<Compile Include="psycopg\adapter_asis.c" />

View File

@ -157,6 +157,27 @@ class CursorTests(unittest.TestCase):
curs = self.conn.cursor(r'1-2-3 \ "test"')
curs.execute("select data from invname order by data")
self.assertEqual(curs.fetchall(), [(10,), (20,), (30,)])
def test_withhold(self):
curs = self.conn.cursor()
curs.execute("drop table if exists withhold")
curs.execute("create table withhold (data int)")
for i in (10, 20, 30):
curs.execute("insert into withhold values (%s)", (i,))
curs.close()
curs = self.conn.cursor("W")
self.assertEqual(curs.withhold, False);
curs.withhold = True
self.assertEqual(curs.withhold, True);
curs.execute("select data from withhold order by data")
self.conn.commit()
self.assertEqual(curs.fetchall(), [(10,), (20,), (30,)])
curs = self.conn.cursor()
curs.execute("drop table withhold")
self.conn.commit()
@skip_before_postgres(8, 2)
def test_iter_named_cursor_efficient(self):