Change the API for asynchronous queries to use cursor.poll()

The isread() API was not safe, because the query might have not been
sent fully to the server after calling execute(). To make the async
API complete, a similar mechanism to async connections must be used.

The cursor now has a poll() method that you would use identically to
the poll() method of the connection class.
This commit is contained in:
Jan Urbański 2010-03-26 04:01:33 +01:00 committed by Federico Di Gregorio
parent 9b259a8a54
commit 91ef0e09ed
5 changed files with 91 additions and 35 deletions

View File

@ -52,6 +52,10 @@ extern "C" {
#define CONN_STATUS_SENT_TRANSACTION_ISOLATION 12
#define CONN_STATUS_GET_TRANSACTION_ISOLATION 13
/* async query execution status */
#define ASYNC_READ 1
#define ASYNC_WRITE 2
/* polling result, try to keep in sync with PostgresPollingStatusType from
libpq-fe.h */
#define PSYCO_POLL_READ 1
@ -97,6 +101,7 @@ typedef struct {
PGconn *pgconn; /* the postgresql connection */
PyObject *async_cursor; /* a cursor executing an asynchronous query */
int async_status; /* asynchronous execution status */
/* notice processing */
PyObject *notice_list;

View File

@ -87,6 +87,8 @@ typedef struct {
/* C-callable functions in cursor_int.c and cursor_ext.c */
HIDDEN void curs_reset(cursorObject *self);
HIDDEN void curs_get_last_result(cursorObject *self);
HIDDEN PyObject *curs_poll_send(cursorObject *self);
HIDDEN PyObject *curs_poll_fetch(cursorObject *self);
/* exception-raising macros */
#define EXC_IF_CURS_CLOSED(self) \

View File

@ -79,3 +79,56 @@ curs_get_last_result(cursorObject *self) {
Py_END_ALLOW_THREADS;
self->needsfetch = 1;
}
/* curs_poll_send - handle cursor polling when flushing output */
PyObject *
curs_poll_send(cursorObject *self)
{
int res;
/* flush queued output to the server */
res = pq_flush(self->conn);
if (res == 1) {
/* some data still waiting to be flushed */
Dprintf("cur_poll_send: returning %d", PSYCO_POLL_WRITE);
return PyInt_FromLong(PSYCO_POLL_WRITE);
}
else if (res == 0) {
/* all data flushed, start waiting for results */
Dprintf("cur_poll_send: returning %d", PSYCO_POLL_READ);
return PyInt_FromLong(PSYCO_POLL_READ);
}
else {
/* unexpected result */
PyErr_SetString(OperationalError, PQerrorMessage(self->conn->pgconn));
return NULL;
}
}
/* curs_poll_fetch - handle cursor polling when reading result */
PyObject *
curs_poll_fetch(cursorObject *self)
{
int is_busy;
/* consume the input */
is_busy = pq_is_busy(self->conn);
if (is_busy == -1) {
/* there was an error, raise the exception */
return NULL;
}
else if (is_busy == 1) {
/* the connection is busy, tell the user to wait more */
Dprintf("cur_poll_fetch: returning %d", PSYCO_POLL_READ);
return PyInt_FromLong(PSYCO_POLL_READ);
}
/* all data has arrived */
curs_get_last_result(self);
Dprintf("cur_poll_fetch: returning %d", PSYCO_POLL_OK);
return PyInt_FromLong(PSYCO_POLL_OK);
}

View File

@ -1497,18 +1497,14 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
"fileno() -> int -- Return file descriptor associated to database connection."
static PyObject *
psyco_curs_fileno(cursorObject *self, PyObject *args)
psyco_curs_fileno(cursorObject *self)
{
long int socket;
if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self);
/* note how we call PQflush() to make sure the user will use
select() in the safe way! */
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock));
PQflush(self->conn->pgconn);
socket = (long int)PQsocket(self->conn->pgconn);
pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS;
@ -1516,43 +1512,31 @@ psyco_curs_fileno(cursorObject *self, PyObject *args)
return PyInt_FromLong(socket);
}
/* extension: isready - return true if data from async execute is ready */
/* extension: poll - return true if data from async execute is ready */
#define psyco_curs_isready_doc \
"isready() -> bool -- Return True if data is ready after an async query."
#define psyco_curs_poll_doc \
"poll() -- return POLL_OK if the query has been fully processed, " \
"POLL_READ if the query has been sent and the application should be " \
"waiting for the result to arrive or POLL_WRITE is the query is still " \
"being sent."
static PyObject *
psyco_curs_isready(cursorObject *self, PyObject *args)
psyco_curs_poll(cursorObject *self)
{
int res;
if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self);
/* pq_is_busy does its own locking, we don't need anything special but if
the cursor is ready we need to fetch the result and free the connection
for the next query. if -1 is returned we raise an exception. */
Dprintf("curs_poll: polling with status %d", self->conn->async_status);
res = pq_is_busy(self->conn);
if (res == 1) {
Py_INCREF(Py_False);
return Py_False;
if (self->conn->async_status == ASYNC_WRITE) {
return curs_poll_send(self);
}
else if (res == -1) {
return NULL;
else if (self->conn->async_status == ASYNC_READ) {
return curs_poll_fetch(self);
}
else {
IFCLEARPGRES(self->pgres);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock));
self->pgres = PQgetResult(self->conn->pgconn);
self->conn->async_cursor = NULL;
pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS;
self->needsfetch = 1;
Py_INCREF(Py_True);
return Py_True;
PyErr_Format(OperationalError, "unexpected execution status: %d",
self->conn->async_status);
return NULL;
}
}
@ -1634,10 +1618,10 @@ static struct PyMethodDef cursorObject_methods[] = {
#ifdef PSYCOPG_EXTENSIONS
{"mogrify", (PyCFunction)psyco_curs_mogrify,
METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc},
{"poll", (PyCFunction)psyco_curs_poll,
METH_VARARGS, psyco_curs_poll_doc},
{"fileno", (PyCFunction)psyco_curs_fileno,
METH_VARARGS, psyco_curs_fileno_doc},
{"isready", (PyCFunction)psyco_curs_isready,
METH_VARARGS, psyco_curs_isready_doc},
METH_NOARGS, psyco_curs_fileno_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc},
{"copy_to", (PyCFunction)psyco_curs_copy_to,

View File

@ -661,6 +661,7 @@ pq_execute(cursorObject *curs, const char *query, int async)
{
PGresult *pgres = NULL;
char *error = NULL;
int async_status = ASYNC_WRITE;
/* if the status of the connection is critical raise an exception and
definitely close the connection */
@ -720,6 +721,16 @@ pq_execute(cursorObject *curs, const char *query, int async)
return -1;
}
Dprintf("pq_execute: async query sent to backend");
if (PQflush(curs->conn->pgconn) == 0) {
/* the query got fully sent to the server */
Dprintf("pq_execute: query got flushed immediately");
/* the async status will be ASYNC_READ */
async_status = ASYNC_READ;
}
else {
async_status = ASYNC_WRITE;
}
}
pthread_mutex_unlock(&(curs->conn->lock));
@ -734,6 +745,7 @@ pq_execute(cursorObject *curs, const char *query, int async)
if (pq_fetch(curs) == -1) return -1;
}
else {
curs->conn->async_status = async_status;
curs->conn->async_cursor = (PyObject*)curs;
}