Implemented named cursors.

This commit is contained in:
Federico Di Gregorio 2005-10-22 06:59:31 +00:00
parent f687f2853e
commit ef3430d24f
9 changed files with 257 additions and 93 deletions

View File

@ -1,3 +1,27 @@
2005-10-22 Federico Di Gregorio <fog@initd.org>
* psycopg/cursor_type.c: added support for named cursors:
- .fetchXXX() methods now execute a FETCH if the cursor is named
- .execute() executes a DECLARE if the cursor is named
- .execute() fails if a named cursor is used in autocommit
- .executemany() can't be called on named cursors
- .scroll() executes a MOVE if the cursor is named
- .close() executes a CLOSE if the cursor is named
Also, a "transaction mark" was added to both the connection and the
cursor and an exception is raised when using a named cursor unless the
two marks correspond.
* psycopg/connection_int.c: snprintf->PyOS_snprintf.
* psycopg/psycopgmodule.c: snprintf->PyOS_snprintf.
* psycopg/cursor_type.c: changed self->query type from C string to
PyObject* to better manage queries in named cursors.
* psycopg/psycopgmodule.c: cleaned up exception names (now the errors
is printed as psycopg2.Error and not as the confusing
psycopg2._psycopg.Error.)
2005-10-20 Federico Di Gregorio <fog@initd.org>
* lib/pool.py: renamed ThreadedConnectionPool to PersistentConnectionPool

6
NEWS
View File

@ -1,3 +1,9 @@
What's new in psycopg 2.0 beta 6
--------------------------------
* Support for named cursors (see examples/fetch.py).
What's new in psycopg 2.0 beta 5
--------------------------------

View File

@ -52,19 +52,28 @@ conn.commit()
# does some nice tricks with the transaction and postgres cursors
# (remember to always commit or rollback before a DECLARE)
#
# we don't need to DECLARE ourselves, psycopg now support named
# cursor (but we leave the code here, comments, as an example of
# what psycopg is doing under the hood)
#curs.execute("DECLARE crs CURSOR FOR SELECT * FROM test_fetch")
#curs.execute("FETCH 10 FROM crs")
#print "First 10 rows:", flatten(curs.fetchall())
#curs.execute("MOVE -5 FROM crs")
#print "Moved back cursor by 5 rows (to row 5.)"
#curs.execute("FETCH 10 FROM crs")
#print "Another 10 rows:", flatten(curs.fetchall())
#curs.execute("FETCH 10 FROM crs")
#print "The remaining rows:", flatten(curs.fetchall())
curs.execute("DECLARE crs CURSOR FOR SELECT * FROM test_fetch")
curs.execute("FETCH 10 FROM crs")
print "First 10 rows:", flatten(curs.fetchall())
curs.execute("MOVE -5 FROM crs")
ncurs = conn.cursor("crs")
ncurs.execute("SELECT * FROM test_fetch")
print "First 10 rows:", flatten(ncurs.fetchmany(10))
ncurs.scroll(-5)
print "Moved back cursor by 5 rows (to row 5.)"
curs.execute("FETCH 10 FROM crs")
print "Another 10 rows:", flatten(curs.fetchall())
curs.execute("FETCH 10 FROM crs")
print "The remaining rows:", flatten(curs.fetchall())
# rollback to close the transaction
print "Another 10 rows:", flatten(ncurs.fetchmany(10))
print "Another one:", list(ncurs.fetchone())
print "The remaining rows:", flatten(ncurs.fetchall())
conn.rollback()
curs.execute("DROP TABLE test_fetch")

View File

@ -48,6 +48,7 @@ typedef struct {
long int closed; /* 2 means connection has been closed */
long int isolation_level; /* isolation level for this connection */
long int mark; /* number of commits/rollbacks done so far */
int status; /* status of the connection */
int protocol; /* protocol version */

View File

@ -191,7 +191,8 @@ conn_commit(connectionObject *self)
pthread_mutex_lock(&self->lock);
res = pq_commit(self);
self->mark++;
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
@ -209,7 +210,8 @@ conn_rollback(connectionObject *self)
pthread_mutex_lock(&self->lock);
res = pq_abort(self);
self->mark++;
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
@ -232,7 +234,8 @@ conn_switch_isolation_level(connectionObject *self, int level)
res = pq_abort(self);
}
self->isolation_level = level;
self->mark++;
Dprintf("conn_switch_isolation_level: switched to level %d", level);
pthread_mutex_unlock(&self->lock);
@ -256,7 +259,7 @@ conn_set_client_encoding(connectionObject *self, char *enc)
pthread_mutex_lock(&self->lock);
/* set encoding, no encoding string is longer than 24 bytes */
snprintf(query, 47, "SET client_encoding = '%s'", enc);
PyOS_snprintf(query, 47, "SET client_encoding = '%s'", enc);
/* abort the current transaction, to set the encoding ouside of
transactions */

View File

@ -62,9 +62,18 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds)
Dprintf("psyco_conn_cursor: parameters: name = %s", name);
if (factory == NULL) factory = (PyObject *)&cursorType;
obj = PyObject_CallFunction(factory, "O", self);
if (name)
obj = PyObject_CallFunction(factory, "Os", self, name);
else
obj = PyObject_CallFunction(factory, "O", self);
/* TODO: added error checking on obj (cursor) here */
if (obj == NULL) return NULL;
if (PyObject_IsInstance(obj, (PyObject *)&cursorType) == 0) {
PyErr_SetString(PyExc_TypeError,
"cursor factory must be subclass of psycopg2._psycopg.cursor");
Py_DECREF(obj);
return NULL;
}
Dprintf("psyco_conn_cursor: new cursor at %p: refcnt = %d",
obj, obj->ob_refcnt);
@ -259,6 +268,7 @@ connection_setup(connectionObject *self, char *dsn)
self->critical = NULL;
self->async_cursor = NULL;
self->pgconn = NULL;
self->mark = 0;
pthread_mutex_init(&(self->lock), NULL);

View File

@ -46,6 +46,7 @@ typedef struct {
long int columns; /* number of columns fetched from the db */
long int arraysize; /* how many rows should fetchmany() return */
long int row; /* the row counter for fetch*() operations */
long int mark; /* transaction marker, copied from conn */
PyObject *description; /* read-only attribute: sequence of 7-item
sequences.*/
@ -65,9 +66,11 @@ typedef struct {
PyObject *tuple_factory; /* factory for result tuples */
PyObject *tzinfo_factory; /* factory for tzinfo objects */
PyObject *query; /* last query executed */
char *qattr; /* quoting attr, used when quoting strings */
char *notice; /* a notice from the backend */
char *query; /* last query executed */
char *name; /* this cursor name */
PyObject *string_types; /* a set of typecasters for string types */
PyObject *binary_types; /* a set of typecasters for binary types */
@ -83,10 +86,16 @@ if ((self)->closed || ((self)->conn && (self)->conn->closed)) { \
PyErr_SetString(InterfaceError, "cursor already closed"); \
return NULL; }
#define EXC_IF_NO_TUPLES(self) if ((self)->notuples) { \
#define EXC_IF_NO_TUPLES(self) \
if ((self)->notuples && (self)->name == NULL) { \
PyErr_SetString(ProgrammingError, "no results to fetch"); \
return NULL; }
#define EXC_IF_NO_MARK(self) \
if ((self)->mark != (self)->conn->mark) { \
PyErr_SetString(ProgrammingError, "named cursor isn't valid anymore"); \
return NULL; }
#ifdef __cplusplus
}
#endif

View File

@ -14,7 +14,7 @@
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* You should have received a copy of the GNU General Public Likcense
* along with this program; if not, write to the Free Software
* Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/
@ -51,6 +51,14 @@ psyco_curs_close(cursorObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self);
if (self->name != NULL) {
char buffer[128];
EXC_IF_NO_MARK(self);
PyOS_snprintf(buffer, 127, "CLOSE %s", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
}
self->closed = 1;
Dprintf("psyco_curs_close: cursor at %p closed", self);
@ -276,7 +284,7 @@ _psyco_curs_execute(cursorObject *self,
IFCLEARPGRES(self->pgres);
if (self->query) {
free(self->query);
Py_DECREF(self->query);
self->query = NULL;
}
@ -284,25 +292,7 @@ _psyco_curs_execute(cursorObject *self,
/* here we are, and we have a sequence or a dictionary filled with
objects to be substituted (bound variables). we try to be smart and do
the right thing (i.e., what the user expects), so:
1. if the bound variable is None the format string is changed into a %s
(just like now) and the variable substituted with the "NULL" string;
2. if a bound variable has the .sqlquote method, we suppose it is able
to do the required quoting by itself: we call the method and
substitute the result in the sequence/dictionary. if the result of
calling .sqlquote is not a string object or the format string is not
%s we raise an error;
3. if a bound variable does not have the .sqlquote method AND the
format string is %s str() is called on the variable and the result
wrapped in a psycopg.QuotedString object;
4. if the format string is not %s we suppose the object is capable to
format itself accordingly, so we don't touch it.
let's go... */
the right thing (i.e., what the user expects) */
if (vars && vars != Py_None)
{
@ -363,18 +353,33 @@ _psyco_curs_execute(cursorObject *self,
Py_XDECREF(uoperation);
return 0;
}
self->query = strdup(PyString_AS_STRING(fquery));
Dprintf("psyco_curs_execute: cvt->refcnt = %d, fquery->refcnt = %d",
cvt->ob_refcnt, fquery->ob_refcnt);
Py_DECREF(fquery);
if (self->name != NULL) {
self->query = PyString_FromFormat(
"DECLARE %s CURSOR WITHOUT HOLD FOR %s",
self->name, PyString_AS_STRING(fquery));
Py_DECREF(fquery);
}
else {
self->query = fquery;
}
Dprintf("psyco_curs_execute: cvt->refcnt = %d", cvt->ob_refcnt);
Py_DECREF(cvt);
}
else {
self->query = strdup(PyString_AS_STRING(operation));
if (self->name != NULL) {
self->query = PyString_FromFormat(
"DECLARE %s CURSOR WITHOUT HOLD FOR %s",
self->name, PyString_AS_STRING(operation));
}
else {
Py_INCREF(operation);
self->query = operation;
}
}
res = pq_execute(self, self->query, async);
res = pq_execute(self, PyString_AS_STRING(self->query), async);
Dprintf("psyco_curs_execute: res = %d, pgres = %p", res, self->pgres);
@ -396,6 +401,24 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs)
return NULL;
}
if (self->name != NULL) {
if (self->query != Py_None) {
PyErr_SetString(ProgrammingError,
"can't call .execute() on named cursors more than once");
return NULL;
}
if (self->conn->isolation_level == 0) {
PyErr_SetString(ProgrammingError,
"can't use a named cursor outside of transactions");
return NULL;
}
if (self->conn->mark != self->mark) {
PyErr_SetString(ProgrammingError,
"named cursor isn't valid anymore");
return NULL;
}
}
EXC_IF_CURS_CLOSED(self);
if (_psyco_curs_execute(self, operation, vars, async)) {
@ -424,6 +447,12 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs)
}
EXC_IF_CURS_CLOSED(self);
if (self->name != NULL) {
PyErr_SetString(ProgrammingError,
"can't call .executemany() on named cursors");
return NULL;
}
if (!PyIter_Check(vars)) {
vars = iter = PyObject_GetIter(vars);
@ -643,7 +672,6 @@ _psyco_curs_buildrow_with_factory(cursorObject *self, int row)
return _psyco_curs_buildrow_fill(self, res, row, n, 0);
}
PyObject *
psyco_curs_fetchone(cursorObject *self, PyObject *args)
{
@ -655,6 +683,15 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args)
if (_psyco_curs_prefetch(self) < 0) return NULL;
EXC_IF_NO_TUPLES(self);
if (self->name != NULL) {
char buffer[128];
EXC_IF_NO_MARK(self);
PyOS_snprintf(buffer, 127, "FETCH FORWARD 1 FROM %s", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
}
Dprintf("psyco_curs_fetchone: fetching row %ld", self->row);
Dprintf("psyco_curs_fetchone: rowcount = %ld", self->rowcount);
@ -706,6 +743,16 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords)
if (_psyco_curs_prefetch(self) < 0) return NULL;
EXC_IF_NO_TUPLES(self);
if (self->name != NULL) {
char buffer[128];
EXC_IF_NO_MARK(self);
PyOS_snprintf(buffer, 127, "FETCH FORWARD %d FROM %s",
(int)size, self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
}
/* make sure size is not > than the available number of rows */
if (size > self->rowcount - self->row || size < 0) {
size = self->rowcount - self->row;
@ -767,6 +814,15 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args)
EXC_IF_CURS_CLOSED(self);
if (_psyco_curs_prefetch(self) < 0) return NULL;
EXC_IF_NO_TUPLES(self);
if (self->name != NULL) {
char buffer[128];
EXC_IF_NO_MARK(self);
PyOS_snprintf(buffer, 127, "FETCH FORWARD ALL FROM %s", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
}
size = self->rowcount - self->row;
@ -823,6 +879,12 @@ psyco_curs_callproc(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
if (self->name != NULL) {
PyErr_SetString(ProgrammingError,
"can't call .executemany() on named cursors");
return NULL;
}
if(parameters && parameters != Py_None) {
nparameters = PyObject_Length(parameters);
}
@ -932,27 +994,47 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
if (strcmp(mode, "relative") == 0) {
newpos = self->row + value;
} else if ( strcmp( mode, "absolute") == 0) {
newpos = value;
} else {
PyErr_SetString(ProgrammingError,
"scroll mode must be 'relative' or 'absolute'");
return NULL;
}
if (newpos < 0 || newpos >= self->rowcount ) {
PyErr_SetString(PyExc_IndexError,
"scroll destination out of bounds");
return NULL;
/* if the cursor is not named we have the full result set and we can do
our own calculations to scroll; else we just delegate the scrolling
to the MOVE SQL statement */
if (self->name == NULL) {
if (strcmp(mode, "relative") == 0) {
newpos = self->row + value;
} else if (strcmp( mode, "absolute") == 0) {
newpos = value;
} else {
PyErr_SetString(ProgrammingError,
"scroll mode must be 'relative' or 'absolute'");
return NULL;
}
if (newpos < 0 || newpos >= self->rowcount ) {
PyErr_SetString(PyExc_IndexError,
"scroll destination out of bounds");
return NULL;
}
self->row = newpos;
}
self->row = newpos;
else {
char buffer[128];
EXC_IF_NO_MARK(self);
if (strcmp(mode, "absolute") == 0) {
PyOS_snprintf(buffer, 127, "MOVE ABSOLUTE %d FROM %s",
value, self->name);
}
else {
PyOS_snprintf(buffer, 127, "MOVE %d FROM %s", value, self->name);
}
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}
@ -1221,14 +1303,15 @@ static struct PyMemberDef cursorObject_members[] = {
/* DBAPI-2.0 extensions */
{"rownumber", T_LONG, OFFSETOF(row), RO},
{"connection", T_OBJECT, OFFSETOF(conn), RO},
#ifdef PSYCOPG_EXTENSIONS
#ifdef PSYCOPG_EXTENSIONS
{"name", T_STRING, OFFSETOF(name), RO},
{"statusmessage", T_OBJECT, OFFSETOF(pgstatus), RO},
{"query", T_STRING, OFFSETOF(query), RO},
{"query", T_OBJECT, OFFSETOF(query), RO},
{"row_factory", T_OBJECT, OFFSETOF(tuple_factory), 0},
{"tzinfo_factory", T_OBJECT, OFFSETOF(tzinfo_factory), 0},
{"typecaster", T_OBJECT, OFFSETOF(caster), RO},
{"string_types", T_OBJECT, OFFSETOF(string_types), 0},
{"binary_types", T_OBJECT, OFFSETOF(binary_types), 0},
{"binary_types", T_OBJECT, OFFSETOF(binary_types), 0},
#endif
{NULL}
};
@ -1236,16 +1319,29 @@ static struct PyMemberDef cursorObject_members[] = {
/* initialization and finalization methods */
static int
cursor_setup(cursorObject *self, connectionObject *conn)
cursor_setup(cursorObject *self, connectionObject *conn, char *name)
{
Dprintf("cursor_setup: init cursor object at %p, refcnt = %d",
self, ((PyObject *)self)->ob_refcnt);
Dprintf("cursor_setup: init cursor object at %p", self);
Dprintf("cursor_setup: parameters: name = %s, conn = %p", name, conn);
if (name) {
self->name = PyMem_Malloc(strlen(name)+1);
if (self->name == NULL) return 1;
strncpy(self->name, name, strlen(name)+1);
}
/* FIXME: why does this raise an excpetion on the _next_ line of code?
if (PyObject_IsInstance((PyObject*)conn,
(PyObject *)&connectionType) == 0) {
PyErr_SetString(PyExc_TypeError,
"argument 1 must be subclass of psycopg2._psycopg.connection");
return 1;
} */
self->conn = conn;
Py_INCREF((PyObject*)self->conn);
self->closed = 0;
self->mark = conn->mark;
self->pgres = NULL;
self->notuples = 1;
self->arraysize = 1;
@ -1254,7 +1350,6 @@ cursor_setup(cursorObject *self, connectionObject *conn)
self->casts = NULL;
self->notice = NULL;
self->query = NULL;
self->string_types = NULL;
self->binary_types = NULL;
@ -1265,6 +1360,8 @@ cursor_setup(cursorObject *self, connectionObject *conn)
Py_INCREF(Py_None);
self->tuple_factory = Py_None;
Py_INCREF(Py_None);
self->query = Py_None;
Py_INCREF(Py_None);
/* default tzinfo factory */
self->tzinfo_factory = pyPsycopgTzFixedOffsetTimezone;
@ -1280,15 +1377,15 @@ cursor_dealloc(PyObject* obj)
{
cursorObject *self = (cursorObject *)obj;
if (self->query) free(self->query);
Py_DECREF((PyObject*)self->conn);
if (self->name) PyMem_Free(self->name);
Py_XDECREF((PyObject*)self->conn);
Py_XDECREF(self->casts);
Py_XDECREF(self->description);
Py_XDECREF(self->pgstatus);
Py_XDECREF(self->tuple_factory);
Py_XDECREF(self->tzinfo_factory);
Py_XDECREF(self->query);
IFCLEARPGRES(self->pgres);
@ -1301,16 +1398,18 @@ cursor_dealloc(PyObject* obj)
static int
cursor_init(PyObject *obj, PyObject *args, PyObject *kwds)
{
char *name = NULL;
PyObject *conn;
if (!PyArg_ParseTuple(args, "O", &conn))
if (!PyArg_ParseTuple(args, "O|s", &conn, &name))
return -1;
return cursor_setup((cursorObject *)obj, (connectionObject *)conn);
return cursor_setup((cursorObject *)obj, (connectionObject *)conn, name);
}
static PyObject *
cursor_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
{
{
return type->tp_alloc(type, 0);
}

View File

@ -139,7 +139,7 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
}
if (iport > 0)
snprintf(port, 16, "%d", iport);
PyOS_snprintf(port, 16, "%d", iport);
if (dsn == NULL) {
int l = 36; /* len("dbname= user= password= host= port=\0") */
@ -295,26 +295,29 @@ PyObject *Error, *Warning, *InterfaceError, *DatabaseError,
static void
psyco_errors_init(void)
{
Error = PyErr_NewException("psycopg2._psycopg.Error",
PyExc_StandardError, NULL);
Warning = PyErr_NewException("psycopg2._psycopg.Warning",
/* the names of the exceptions here reflect the oranization of the
psycopg2 module and not the fact the the original error objects
live in _psycopg */
Error = PyErr_NewException("psycopg2.Error", PyExc_StandardError, NULL);
Warning = PyErr_NewException("psycopg2.Warning",
PyExc_StandardError,NULL);
InterfaceError = PyErr_NewException("psycopg2._psycopg.InterfaceError",
InterfaceError = PyErr_NewException("psycopg2.InterfaceError",
Error, NULL);
DatabaseError = PyErr_NewException("psycopg2._psycopg.DatabaseError",
DatabaseError = PyErr_NewException("psycopg2.DatabaseError",
Error, NULL);
InternalError = PyErr_NewException("psycopg2._psycopg.InternalError",
InternalError = PyErr_NewException("psycopg2.InternalError",
DatabaseError, NULL);
OperationalError = PyErr_NewException("psycopg2._psycopg.OperationalError",
OperationalError = PyErr_NewException("psycopg2.OperationalError",
DatabaseError, NULL);
ProgrammingError = PyErr_NewException("psycopg2._psycopg.ProgrammingError",
ProgrammingError = PyErr_NewException("psycopg2.ProgrammingError",
DatabaseError, NULL);
IntegrityError = PyErr_NewException("psycopg2._psycopg.IntegrityError",
IntegrityError = PyErr_NewException("psycopg2.IntegrityError",
DatabaseError,NULL);
DataError = PyErr_NewException("psycopg2._psycopg.DataError",
DataError = PyErr_NewException("psycopg2.DataError",
DatabaseError, NULL);
NotSupportedError =
PyErr_NewException("psycopg2._psycopg.NotSupportedError",
NotSupportedError = PyErr_NewException("psycopg2.NotSupportedError",
DatabaseError, NULL);
}