Reusable parts of the green polling refactored out.

The functions _conn_poll_connecting() and _conn_poll_query() will be
usable by the async connections too.
This commit is contained in:
Daniele Varrazzo 2010-04-22 14:46:19 +01:00
parent df959c20be
commit 5be0fc52ca
3 changed files with 117 additions and 83 deletions

View File

@ -40,9 +40,14 @@ extern "C" {
#define CONN_STATUS_SETUP 0
#define CONN_STATUS_READY 1
#define CONN_STATUS_BEGIN 2
/* async connection building statuses */
#define CONN_STATUS_CONNECTING 20
#define CONN_STATUS_DATESTYLE 21
#define CONN_STATUS_CLIENT_ENCODING 22
/* TODO: REMOVE THOSE */
#define CONN_STATUS_SYNC 3
#define CONN_STATUS_ASYNC 4
/* async connection building statuses */
#define CONN_STATUS_SEND_DATESTYLE 5
#define CONN_STATUS_SENT_DATESTYLE 6
#define CONN_STATUS_GET_DATESTYLE 7

View File

@ -749,6 +749,107 @@ conn_poll_fetch(connectionObject *self)
}
}
/* poll during a connection attempt until the connection has established. */
int
_conn_poll_connecting(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
switch (PQconnectPoll(self->pgconn)) {
case PGRES_POLLING_OK:
res = PSYCO_POLL_OK;
break;
case PGRES_POLLING_READING:
res = PSYCO_POLL_READ;
break;
case PGRES_POLLING_WRITING:
res = PSYCO_POLL_WRITE;
break;
case PGRES_POLLING_FAILED:
case PGRES_POLLING_ACTIVE:
PyErr_SetString(OperationalError, "asynchronous connection failed");
res = PSYCO_POLL_ERROR;
break;
}
return res;
}
/* Poll the connection for the send query/retrieve result phase
Advance the async_status (usually going WRITE -> READ -> DONE) but don't
mess with the connection status. */
int
_conn_poll_query(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
switch (self->async_status) {
case ASYNC_WRITE:
Dprintf("conn_poll: async_status = ASYNC_WRITE");
switch (PQflush(self->pgconn)) {
case 0: /* success */
/* we've finished pushing the query to the server. Let's start
reading the results. */
self->async_status = ASYNC_READ;
res = PSYCO_POLL_READ;
break;
case 1: /* would block */
res = PSYCO_POLL_WRITE;
break;
case -1: /* error */
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
res = PSYCO_POLL_ERROR;
break;
}
break;
case ASYNC_READ:
Dprintf("conn_poll: async_status = ASYNC_READ");
if (0 == PQconsumeInput(self->pgconn)) {
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
res = PSYCO_POLL_ERROR;
}
if (PQisBusy(self->pgconn)) {
res = PSYCO_POLL_READ;
} else {
/* Reading complete: set the async status so that a spare poll()
will only look for NOTIFYs */
self->async_status = ASYNC_DONE;
res = PSYCO_POLL_OK;
}
break;
case ASYNC_DONE:
Dprintf("conn_poll: async_status = ASYNC_DONE");
/* We haven't asked anything: just check for notifications. */
switch (pq_is_busy(self)) {
case 0: /* will not block */
res = PSYCO_POLL_OK;
break;
case 1: /* will block */
res = PSYCO_POLL_READ;
break;
case -1: /* ouch, error */
break;
default:
Dprintf("conn_poll: unexpected result from pq_is_busy");
break;
}
break;
default:
Dprintf("conn_poll: in unexpected async status: %d",
self->async_status);
res = PSYCO_POLL_ERROR;
}
return res;
}
/* conn_poll_green - poll a *sync* connection with external wait */
PyObject *
@ -759,91 +860,19 @@ conn_poll_green(connectionObject *self)
switch (self->status) {
case CONN_STATUS_SETUP:
Dprintf("conn_poll: status = CONN_STATUS_SETUP");
self->status = CONN_STATUS_ASYNC;
self->status = CONN_STATUS_CONNECTING;
res = PSYCO_POLL_WRITE;
break;
case CONN_STATUS_ASYNC:
Dprintf("conn_poll: status = CONN_STATUS_ASYNC");
switch (PQconnectPoll(self->pgconn)) {
case PGRES_POLLING_OK:
res = PSYCO_POLL_OK;
break;
case PGRES_POLLING_READING:
res = PSYCO_POLL_READ;
break;
case PGRES_POLLING_WRITING:
res = PSYCO_POLL_WRITE;
break;
case PGRES_POLLING_FAILED:
case PGRES_POLLING_ACTIVE:
res = PSYCO_POLL_ERROR;
break;
}
case CONN_STATUS_CONNECTING:
Dprintf("conn_poll: status = CONN_STATUS_CONNECTING");
res = _conn_poll_connecting(self);
break;
case CONN_STATUS_READY:
case CONN_STATUS_BEGIN:
Dprintf("conn_poll: status = CONN_STATUS_READY/BEGIN");
switch (self->async_status) {
case ASYNC_READ:
Dprintf("conn_poll: async_status = ASYNC_READ");
if (0 == PQconsumeInput(self->pgconn)) {
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
res = PSYCO_POLL_ERROR;
}
if (PQisBusy(self->pgconn)) {
res = PSYCO_POLL_READ;
} else {
/* Reading complete: set the async status so that a spare poll()
will only look for NOTIFYs */
self->async_status = ASYNC_DONE;
res = PSYCO_POLL_OK;
}
break;
case ASYNC_WRITE:
Dprintf("conn_poll: async_status = ASYNC_WRITE");
switch (PQflush(self->pgconn)) {
case 0: /* success */
/* we've finished pushing the query to the server. Let's start
reading the results. */
self->async_status = ASYNC_READ;
res = PSYCO_POLL_READ;
break;
case 1: /* would block */
res = PSYCO_POLL_WRITE;
break;
case -1: /* error */
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
res = PSYCO_POLL_ERROR;
break;
}
break;
case ASYNC_DONE:
Dprintf("conn_poll: async_status = ASYNC_DONE");
/* We haven't asked anything: just check for notifications. */
switch (pq_is_busy(self)) {
case 0: /* will not block */
res = PSYCO_POLL_OK;
break;
case 1: /* will block */
res = PSYCO_POLL_READ;
break;
case -1: /* ouch, error */
break;
default:
Dprintf("conn_poll: unexpected result from pq_is_busy");
break;
}
break;
default:
Dprintf("conn_poll: in unexpected async status: %d",
self->async_status);
res = PSYCO_POLL_ERROR;
}
res = _conn_poll_query(self);
break;
default:

View File

@ -426,9 +426,13 @@ psyco_conn_poll_async(connectionObject *self)
case CONN_STATUS_SETUP:
/* according to libpq documentation the user should start by waiting
for the socket to become writable */
self->status = CONN_STATUS_ASYNC;
self->status = CONN_STATUS_CONNECTING;
return PyInt_FromLong(PSYCO_POLL_WRITE);
case CONN_STATUS_CONNECTING:
/* this means we are in the middle of a PQconnectPoll loop */
break;
case CONN_STATUS_SEND_DATESTYLE:
case CONN_STATUS_SENT_DATESTYLE:
case CONN_STATUS_SEND_CLIENT_ENCODING:
@ -442,10 +446,6 @@ psyco_conn_poll_async(connectionObject *self)
/* these mean that we are waiting for the results of the queries */
return conn_poll_connect_fetch(self);
case CONN_STATUS_ASYNC:
/* this means we are in the middle of a PQconnectPoll loop */
break;
case CONN_STATUS_READY:
case CONN_STATUS_BEGIN:
/* The connection is ready, but we might be in an asynchronous query,