Added a function to advance the state after a read attempt.

Dropped calls to PQisBusy/PQconsumeInput in the connection code.
This commit is contained in:
Daniele Varrazzo 2010-04-22 17:46:00 +01:00
parent e82d2be64b
commit f3b3483df3
3 changed files with 58 additions and 23 deletions

View File

@ -791,6 +791,32 @@ _conn_poll_connecting(connectionObject *self)
} }
/* Advance to the next state after a call to a pq_is_busy* function */
int
_conn_poll_advance_read(connectionObject *self, int busy)
{
int res;
switch (busy) {
case 0: /* result is ready */
res = PSYCO_POLL_OK;
Dprintf("conn_poll: async_status -> ASYNC_DONE");
self->async_status = ASYNC_DONE;
break;
case 1: /* result not ready: fd would block */
res = PSYCO_POLL_READ;
break;
case -1: /* ouch, error */
res = PSYCO_POLL_ERROR;
break;
default:
Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy);
res = PSYCO_POLL_ERROR;
break;
}
return res;
}
/* Poll the connection for the send query/retrieve result phase /* Poll the connection for the send query/retrieve result phase
Advance the async_status (usually going WRITE -> READ -> DONE) but don't Advance the async_status (usually going WRITE -> READ -> DONE) but don't
@ -822,42 +848,29 @@ _conn_poll_query(connectionObject *self)
case ASYNC_READ: case ASYNC_READ:
Dprintf("conn_poll: async_status = ASYNC_READ"); Dprintf("conn_poll: async_status = ASYNC_READ");
if (0 == PQconsumeInput(self->pgconn)) { if (self->async) {
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn)); res = _conn_poll_advance_read(self, pq_is_busy(self));
res = PSYCO_POLL_ERROR;
} }
if (PQisBusy(self->pgconn)) { else {
res = PSYCO_POLL_READ; /* we are a green connection being polled as result of a query.
} else { this means that our caller has the lock and we are being called
/* Reading complete: set the async status so that a spare poll() from the callback. If we tried to acquire the lock now it would
will only look for NOTIFYs */ be a deadlock. */
self->async_status = ASYNC_DONE; res = _conn_poll_advance_read(self, pq_is_busy_locked(self));
res = PSYCO_POLL_OK;
} }
break; break;
case ASYNC_DONE: case ASYNC_DONE:
Dprintf("conn_poll: async_status = ASYNC_DONE"); Dprintf("conn_poll: async_status = ASYNC_DONE");
/* We haven't asked anything: just check for notifications. */ /* We haven't asked anything: just check for notifications. */
switch (pq_is_busy(self)) { res = _conn_poll_advance_read(self, pq_is_busy(self));
case 0: /* will not block */
res = PSYCO_POLL_OK;
break;
case 1: /* will block */
res = PSYCO_POLL_READ;
break;
case -1: /* ouch, error */
break;
default:
Dprintf("conn_poll: unexpected result from pq_is_busy");
break;
}
break; break;
default: default:
Dprintf("conn_poll: in unexpected async status: %d", Dprintf("conn_poll: in unexpected async status: %d",
self->async_status); self->async_status);
res = PSYCO_POLL_ERROR; res = PSYCO_POLL_ERROR;
break;
} }
return res; return res;

View File

@ -643,6 +643,27 @@ pq_is_busy(connectionObject *conn)
return res; return res;
} }
/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock
*
* The function should be called with the lock and holding the GIL.
*/
int
pq_is_busy_locked(connectionObject *conn)
{
Dprintf("pq_is_busy_locked: consuming input");
if (PQconsumeInput(conn->pgconn) == 0) {
Dprintf("pq_is_busy_locked: PQconsumeInput() failed");
PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
return -1;
}
/* We can't call conn_notice_process/conn_notifies_process because
they try to get the lock. We don't need anyway them because at the end of
the loop we are in (async reading) pq_fetch will be called. */
return PQisBusy(conn->pgconn);
}
/* pq_flush - flush output and return connection status /* pq_flush - flush output and return connection status

View File

@ -46,6 +46,7 @@ HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres,
HIDDEN int pq_abort(connectionObject *conn); HIDDEN int pq_abort(connectionObject *conn);
HIDDEN int pq_reset(connectionObject *conn); HIDDEN int pq_reset(connectionObject *conn);
HIDDEN int pq_is_busy(connectionObject *conn); HIDDEN int pq_is_busy(connectionObject *conn);
HIDDEN int pq_is_busy_locked(connectionObject *conn);
HIDDEN int pq_flush(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn);
HIDDEN void pq_clear_async(connectionObject *conn); HIDDEN void pq_clear_async(connectionObject *conn);
HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg, int pyerr); HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg, int pyerr);