From 1f6ffbba0f24537648c51384a72b72bff27499b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Urba=C5=84ski?= Date: Fri, 26 Mar 2010 03:49:24 +0100 Subject: [PATCH] Block and clear the result of asynchronous queries where necessary Remove the big loop in pq_fetch with the select() call, among others. Code paths that don't support asynchronous queries should now be adequately guarded. --- psycopg/connection_int.c | 2 -- psycopg/cursor_type.c | 18 ++++---------- psycopg/pqpath.c | 54 +++------------------------------------- 3 files changed, 9 insertions(+), 65 deletions(-) diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 6a0b0f7b..8c619ecc 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -413,8 +413,6 @@ conn_set_client_encoding(connectionObject *self, const char *enc) issue any query to the backend */ if (strcmp(self->encoding, enc) == 0) return 0; - /* TODO: check for async query here and raise error if necessary */ - Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 7a92ee24..0609ca31 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -67,6 +67,7 @@ psyco_curs_close(cursorObject *self, PyObject *args) } self->closed = 1; + pq_clear_async(self->conn); Dprintf("psyco_curs_close: cursor at %p closed", self); Py_INCREF(Py_None); @@ -313,19 +314,6 @@ _psyco_curs_execute(cursorObject *self, int res = 0; PyObject *fquery, *cvt = NULL; - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(self->conn->lock)); - if (self->conn->async_cursor != NULL - && self->conn->async_cursor != (PyObject*)self) { - pthread_mutex_unlock(&(self->conn->lock)); - Py_BLOCK_THREADS; - psyco_set_error(ProgrammingError, (PyObject*)self, - "asynchronous query already in execution", NULL, NULL); - return 0; - } - pthread_mutex_unlock(&(self->conn->lock)); - Py_END_ALLOW_THREADS; - operation = _psyco_curs_validate_sql_basic(self, operation); /* Any failure from here forward should 'goto fail' rather than 'return 0' @@ -1084,6 +1072,10 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_CURS_CLOSED(self); + if (self->conn->async == 1) { + if (_psyco_curs_prefetch(self) < 0) return NULL; + } + /* if the cursor is not named we have the full result set and we can do our own calculations to scroll; else we just delegate the scrolling to the MOVE SQL statement */ diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 78b81d23..3f4ad4ec 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -680,6 +680,10 @@ pq_execute(cursorObject *curs, const char *query, int async) Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(curs->conn->lock)); + /* FIXME: we should first try to cancel the query, otherwise we will block + until it completes AND until we get the result back */ + pq_clear_async(curs->conn); + if (pq_begin_locked(curs->conn, &pgres, &error) < 0) { pthread_mutex_unlock(&(curs->conn->lock)); Py_BLOCK_THREADS; @@ -704,15 +708,9 @@ pq_execute(cursorObject *curs, const char *query, int async) } else if (async == 1) { - /* first of all, let see if the previous query has already ended, if - not what should we do? just block and discard data or execute - another query? */ - pq_clear_async(curs->conn); - Dprintf("pq_execute: executing ASYNC query:"); Dprintf(" %-.200s", query); - /* then we can go on and send a new query without fear */ IFCLEARPGRES(curs->pgres); if (PQsendQuery(curs->conn->pgconn, query) == 0) { pthread_mutex_unlock(&(curs->conn->lock)); @@ -1129,50 +1127,6 @@ pq_fetch(cursorObject *curs) /* even if we fail, we remove any information about the previous query */ curs_reset(curs); - /* we check the result from the previous execute; if the result is not - already there, we need to consume some input and go to sleep until we - get something edible to eat */ - if (!curs->pgres) { - - Dprintf("pq_fetch: no data: entering polling loop"); - - while (pq_is_busy(curs->conn) > 0) { - fd_set rfds; - struct timeval tv; - int sval, sock; - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(curs->conn->lock)); - - sock = PQsocket(curs->conn->pgconn); - FD_ZERO(&rfds); - FD_SET(sock, &rfds); - - /* set a default timeout of 5 seconds - TODO: make use of the timeout, maybe allowing the user to - make a non-blocking (timeouted) call to fetchXXX */ - tv.tv_sec = 5; - tv.tv_usec = 0; - - Dprintf("pq_fetch: entering PDflush() loop"); - while (PQflush(curs->conn->pgconn) != 0); - sval = select(sock+1, &rfds, NULL, NULL, &tv); - - pthread_mutex_unlock(&(curs->conn->lock)); - Py_END_ALLOW_THREADS; - } - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(curs->conn->lock)); - - Dprintf("pq_fetch: data is probably ready"); - IFCLEARPGRES(curs->pgres); - curs->pgres = PQgetResult(curs->conn->pgconn); - - pthread_mutex_unlock(&(curs->conn->lock)); - Py_END_ALLOW_THREADS; - } - /* check for PGRES_FATAL_ERROR result */ /* FIXME: I am not sure we need to check for critical error here. if (curs->pgres == NULL) {