mirror of
https://github.com/psycopg/psycopg2.git
synced 2025-03-03 07:45:45 +03:00
Block and clear the result of asynchronous queries where necessary
Remove the big loop in pq_fetch with the select() call, among others. Code paths that don't support asynchronous queries should now be adequately guarded.
This commit is contained in:
parent
3d2c315049
commit
1f6ffbba0f
|
@ -413,8 +413,6 @@ conn_set_client_encoding(connectionObject *self, const char *enc)
|
|||
issue any query to the backend */
|
||||
if (strcmp(self->encoding, enc) == 0) return 0;
|
||||
|
||||
/* TODO: check for async query here and raise error if necessary */
|
||||
|
||||
Py_BEGIN_ALLOW_THREADS;
|
||||
pthread_mutex_lock(&self->lock);
|
||||
|
||||
|
|
|
@ -67,6 +67,7 @@ psyco_curs_close(cursorObject *self, PyObject *args)
|
|||
}
|
||||
|
||||
self->closed = 1;
|
||||
pq_clear_async(self->conn);
|
||||
Dprintf("psyco_curs_close: cursor at %p closed", self);
|
||||
|
||||
Py_INCREF(Py_None);
|
||||
|
@ -313,19 +314,6 @@ _psyco_curs_execute(cursorObject *self,
|
|||
int res = 0;
|
||||
PyObject *fquery, *cvt = NULL;
|
||||
|
||||
Py_BEGIN_ALLOW_THREADS;
|
||||
pthread_mutex_lock(&(self->conn->lock));
|
||||
if (self->conn->async_cursor != NULL
|
||||
&& self->conn->async_cursor != (PyObject*)self) {
|
||||
pthread_mutex_unlock(&(self->conn->lock));
|
||||
Py_BLOCK_THREADS;
|
||||
psyco_set_error(ProgrammingError, (PyObject*)self,
|
||||
"asynchronous query already in execution", NULL, NULL);
|
||||
return 0;
|
||||
}
|
||||
pthread_mutex_unlock(&(self->conn->lock));
|
||||
Py_END_ALLOW_THREADS;
|
||||
|
||||
operation = _psyco_curs_validate_sql_basic(self, operation);
|
||||
|
||||
/* Any failure from here forward should 'goto fail' rather than 'return 0'
|
||||
|
@ -1084,6 +1072,10 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
|
|||
|
||||
EXC_IF_CURS_CLOSED(self);
|
||||
|
||||
if (self->conn->async == 1) {
|
||||
if (_psyco_curs_prefetch(self) < 0) return NULL;
|
||||
}
|
||||
|
||||
/* if the cursor is not named we have the full result set and we can do
|
||||
our own calculations to scroll; else we just delegate the scrolling
|
||||
to the MOVE SQL statement */
|
||||
|
|
|
@ -680,6 +680,10 @@ pq_execute(cursorObject *curs, const char *query, int async)
|
|||
Py_BEGIN_ALLOW_THREADS;
|
||||
pthread_mutex_lock(&(curs->conn->lock));
|
||||
|
||||
/* FIXME: we should first try to cancel the query, otherwise we will block
|
||||
until it completes AND until we get the result back */
|
||||
pq_clear_async(curs->conn);
|
||||
|
||||
if (pq_begin_locked(curs->conn, &pgres, &error) < 0) {
|
||||
pthread_mutex_unlock(&(curs->conn->lock));
|
||||
Py_BLOCK_THREADS;
|
||||
|
@ -704,15 +708,9 @@ pq_execute(cursorObject *curs, const char *query, int async)
|
|||
}
|
||||
|
||||
else if (async == 1) {
|
||||
/* first of all, let see if the previous query has already ended, if
|
||||
not what should we do? just block and discard data or execute
|
||||
another query? */
|
||||
pq_clear_async(curs->conn);
|
||||
|
||||
Dprintf("pq_execute: executing ASYNC query:");
|
||||
Dprintf(" %-.200s", query);
|
||||
|
||||
/* then we can go on and send a new query without fear */
|
||||
IFCLEARPGRES(curs->pgres);
|
||||
if (PQsendQuery(curs->conn->pgconn, query) == 0) {
|
||||
pthread_mutex_unlock(&(curs->conn->lock));
|
||||
|
@ -1129,50 +1127,6 @@ pq_fetch(cursorObject *curs)
|
|||
/* even if we fail, we remove any information about the previous query */
|
||||
curs_reset(curs);
|
||||
|
||||
/* we check the result from the previous execute; if the result is not
|
||||
already there, we need to consume some input and go to sleep until we
|
||||
get something edible to eat */
|
||||
if (!curs->pgres) {
|
||||
|
||||
Dprintf("pq_fetch: no data: entering polling loop");
|
||||
|
||||
while (pq_is_busy(curs->conn) > 0) {
|
||||
fd_set rfds;
|
||||
struct timeval tv;
|
||||
int sval, sock;
|
||||
|
||||
Py_BEGIN_ALLOW_THREADS;
|
||||
pthread_mutex_lock(&(curs->conn->lock));
|
||||
|
||||
sock = PQsocket(curs->conn->pgconn);
|
||||
FD_ZERO(&rfds);
|
||||
FD_SET(sock, &rfds);
|
||||
|
||||
/* set a default timeout of 5 seconds
|
||||
TODO: make use of the timeout, maybe allowing the user to
|
||||
make a non-blocking (timeouted) call to fetchXXX */
|
||||
tv.tv_sec = 5;
|
||||
tv.tv_usec = 0;
|
||||
|
||||
Dprintf("pq_fetch: entering PDflush() loop");
|
||||
while (PQflush(curs->conn->pgconn) != 0);
|
||||
sval = select(sock+1, &rfds, NULL, NULL, &tv);
|
||||
|
||||
pthread_mutex_unlock(&(curs->conn->lock));
|
||||
Py_END_ALLOW_THREADS;
|
||||
}
|
||||
|
||||
Py_BEGIN_ALLOW_THREADS;
|
||||
pthread_mutex_lock(&(curs->conn->lock));
|
||||
|
||||
Dprintf("pq_fetch: data is probably ready");
|
||||
IFCLEARPGRES(curs->pgres);
|
||||
curs->pgres = PQgetResult(curs->conn->pgconn);
|
||||
|
||||
pthread_mutex_unlock(&(curs->conn->lock));
|
||||
Py_END_ALLOW_THREADS;
|
||||
}
|
||||
|
||||
/* check for PGRES_FATAL_ERROR result */
|
||||
/* FIXME: I am not sure we need to check for critical error here.
|
||||
if (curs->pgres == NULL) {
|
||||
|
|
Loading…
Reference in New Issue
Block a user