From d915cb12a8541af9477dde7546f0c4a76156fe60 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 20 Apr 2010 13:31:19 +0100 Subject: [PATCH] Functions to poll in ready state moved on the connection. --- psycopg/connection.h | 2 + psycopg/connection_int.c | 84 +++++++++++++++++++++++++++++++++++++++- psycopg/cursor.h | 2 - psycopg/cursor_int.c | 76 ------------------------------------ 4 files changed, 84 insertions(+), 80 deletions(-) diff --git a/psycopg/connection.h b/psycopg/connection.h index 4f2a2b2e..df15c14a 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -135,6 +135,8 @@ HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc); HIDDEN PyObject *conn_poll_connect_send(connectionObject *self); HIDDEN PyObject *conn_poll_connect_fetch(connectionObject *self); HIDDEN PyObject *conn_poll_ready(connectionObject *self); +HIDDEN PyObject *conn_poll_send(connectionObject *self); +HIDDEN PyObject *conn_poll_fetch(connectionObject *self); HIDDEN PyObject *conn_poll_green(connectionObject *self); /* exception-raising macros */ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 99c51ff9..bde9a2ad 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -649,13 +649,13 @@ conn_poll_ready(connectionObject *self) /* if there is an asynchronous query underway, poll it */ if (self->async_cursor != NULL) { if (self->async_status == ASYNC_WRITE) { - return curs_poll_send((cursorObject *) self->async_cursor); + return conn_poll_send(self); } else { /* this gets called both for ASYNC_READ and ASYNC_DONE, because even if the async query is complete, we still might want to check for NOTIFYs */ - return curs_poll_fetch((cursorObject *) self->async_cursor); + return conn_poll_fetch(self); } } @@ -677,6 +677,86 @@ conn_poll_ready(connectionObject *self) } } +/* conn_poll_send - poll the connection when flushing data to the backend */ + +PyObject * +conn_poll_send(connectionObject *self) +{ + int res; + + /* flush queued output to the server */ + res = pq_flush(self); + + if (res == 1) { + /* some data still waiting to be flushed */ + Dprintf("conn_poll_send: returning %d", PSYCO_POLL_WRITE); + return PyInt_FromLong(PSYCO_POLL_WRITE); + } + else if (res == 0) { + /* all data flushed, start waiting for results */ + Dprintf("conn_poll_send: returning %d", PSYCO_POLL_READ); + self->async_status = ASYNC_READ; + return PyInt_FromLong(PSYCO_POLL_READ); + } + else { + /* unexpected result */ + PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); + return NULL; + } +} + +/* conn_poll_fetch - poll the connection when reading results from the backend + * + * Assume self->async_cursor is not null: use such cursor to store results. + */ + +PyObject * +conn_poll_fetch(connectionObject *self) +{ + int is_busy; + int last_result; + + /* consume the input */ + is_busy = pq_is_busy(self); + if (is_busy == -1) { + /* there was an error, raise the exception */ + return NULL; + } + else if (is_busy == 1) { + /* the connection is busy, tell the user to wait more */ + Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_READ); + return PyInt_FromLong(PSYCO_POLL_READ); + } + + /* try to fetch the data only if this was a poll following a read + request; else just return POLL_OK to the user: this is necessary + because of asynchronous NOTIFYs that can be sent by the backend + even if the user didn't asked for them */ + + if (self->async_status == ASYNC_READ) + last_result = curs_get_last_result((cursorObject *)self->async_cursor); + else + last_result = 0; + + if (last_result == 0) { + Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_OK); + /* self->async_status cannot be ASYNC_WRITE here, because we + never execute curs_poll_fetch in ASYNC_WRITE state, so we can + safely set it to ASYNC_DONE because we either fetched the result or + there is no result to fetch */ + self->async_status = ASYNC_DONE; + return PyInt_FromLong(PSYCO_POLL_OK); + } + else if (last_result == 1) { + Dprintf("conn_poll_fetch: got result, but data remaining, " + "returning %d", PSYCO_POLL_READ); + return PyInt_FromLong(PSYCO_POLL_READ); + } + else { + return NULL; + } +} + /* conn_poll_green - poll a *sync* connection with external wait */ PyObject * diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 613ed9d3..27a8e420 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -86,8 +86,6 @@ typedef struct { /* C-callable functions in cursor_int.c and cursor_ext.c */ HIDDEN void curs_reset(cursorObject *self); HIDDEN int curs_get_last_result(cursorObject *self); -HIDDEN PyObject *curs_poll_send(cursorObject *self); -HIDDEN PyObject *curs_poll_fetch(cursorObject *self); /* exception-raising macros */ #define EXC_IF_CURS_CLOSED(self) \ diff --git a/psycopg/cursor_int.c b/psycopg/cursor_int.c index 1069aaee..8f60ca7b 100644 --- a/psycopg/cursor_int.c +++ b/psycopg/cursor_int.c @@ -96,79 +96,3 @@ curs_get_last_result(cursorObject *self) { return pq_fetch(self) == -1 ? -1 : 0; } -/* curs_poll_send - handle cursor polling when flushing output */ - -PyObject * -curs_poll_send(cursorObject *self) -{ - int res; - - /* flush queued output to the server */ - res = pq_flush(self->conn); - - if (res == 1) { - /* some data still waiting to be flushed */ - Dprintf("cur_poll_send: returning %d", PSYCO_POLL_WRITE); - return PyInt_FromLong(PSYCO_POLL_WRITE); - } - else if (res == 0) { - /* all data flushed, start waiting for results */ - Dprintf("cur_poll_send: returning %d", PSYCO_POLL_READ); - self->conn->async_status = ASYNC_READ; - return PyInt_FromLong(PSYCO_POLL_READ); - } - else { - /* unexpected result */ - PyErr_SetString(OperationalError, PQerrorMessage(self->conn->pgconn)); - return NULL; - } -} - -/* curs_poll_fetch - handle cursor polling when reading result */ - -PyObject * -curs_poll_fetch(cursorObject *self) -{ - int is_busy; - int last_result; - - /* consume the input */ - is_busy = pq_is_busy(self->conn); - if (is_busy == -1) { - /* there was an error, raise the exception */ - return NULL; - } - else if (is_busy == 1) { - /* the connection is busy, tell the user to wait more */ - Dprintf("cur_poll_fetch: returning %d", PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - - /* try to fetch the data only if this was a poll following a read - request; else just return POLL_OK to the user: this is necessary - because of asynchronous NOTIFYs that can be sent by the backend - even if the user didn't asked for them */ - - if (self->conn->async_status == ASYNC_READ) - last_result = curs_get_last_result(self); - else - last_result = 0; - - if (last_result == 0) { - Dprintf("cur_poll_fetch: returning %d", PSYCO_POLL_OK); - /* self->conn->async_status cannot be ASYNC_WRITE here, because we - never execute curs_poll_fetch in ASYNC_WRITE state, so we can - safely set it to ASYNC_DONE because we either fetched the result or - there is no result to fetch */ - self->conn->async_status = ASYNC_DONE; - return PyInt_FromLong(PSYCO_POLL_OK); - } - else if (last_result == 1) { - Dprintf("cur_poll_fetch: got result, but data remaining, " - "returning %d", PSYCO_POLL_READ); - return PyInt_FromLong(PSYCO_POLL_READ); - } - else { - return NULL; - } -}