poll implementation for async, sync and green connection unified.

This commit is contained in:
Daniele Varrazzo 2010-04-22 19:59:00 +01:00
parent 0da4befe78
commit cb40342afa
6 changed files with 187 additions and 488 deletions

View File

@ -45,16 +45,6 @@ extern "C" {
#define CONN_STATUS_DATESTYLE 21
#define CONN_STATUS_CLIENT_ENCODING 22
/* TODO: REMOVE THOSE */
#define CONN_STATUS_SYNC 3
#define CONN_STATUS_ASYNC 4
#define CONN_STATUS_SEND_DATESTYLE 5
#define CONN_STATUS_SENT_DATESTYLE 6
#define CONN_STATUS_GET_DATESTYLE 7
#define CONN_STATUS_SEND_CLIENT_ENCODING 8
#define CONN_STATUS_SENT_CLIENT_ENCODING 9
#define CONN_STATUS_GET_CLIENT_ENCODING 10
/* async query execution status */
#define ASYNC_DONE 0
#define ASYNC_READ 1
@ -137,12 +127,7 @@ HIDDEN int conn_commit(connectionObject *self);
HIDDEN int conn_rollback(connectionObject *self);
HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
HIDDEN PyObject *conn_poll_connect_send(connectionObject *self);
HIDDEN PyObject *conn_poll_connect_fetch(connectionObject *self);
HIDDEN PyObject *conn_poll_ready(connectionObject *self);
HIDDEN PyObject *conn_poll_send(connectionObject *self);
HIDDEN PyObject *conn_poll_fetch(connectionObject *self);
HIDDEN PyObject *conn_poll_green(connectionObject *self);
HIDDEN int conn_poll(connectionObject *self);
/* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \

View File

@ -208,7 +208,6 @@ conn_get_encoding(PGresult *pgres)
for (i=0 ; i < strlen(tmp) ; i++)
encoding[i] = toupper(tmp[i]);
encoding[i] = '\0';
CLEARPGRES(pgres);
return encoding;
}
@ -258,10 +257,18 @@ conn_setup(connectionObject *self, PGconn *pgconn)
{
PGresult *pgres;
self->equote = conn_get_standard_conforming_strings(pgconn);
self->server_version = conn_get_server_version(pgconn);
self->protocol = conn_get_protocol_version(self->pgconn);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS;
if (pq_set_non_blocking(self, 1, 1) != 0) {
return -1;
}
if (!psyco_green()) {
Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, psyco_datestyle);
@ -305,6 +312,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
Py_BLOCK_THREADS;
return -1;
}
CLEARPGRES(pgres);
if (!psyco_green()) {
Py_UNBLOCK_THREADS;
@ -334,8 +342,8 @@ conn_setup(connectionObject *self, PGconn *pgconn)
/* conn_connect - execute a connection to the database */
int
conn_sync_connect(connectionObject *self)
static int
_conn_sync_connect(connectionObject *self)
{
PGconn *pgconn;
PyObject *wait_rv;
@ -382,19 +390,11 @@ conn_sync_connect(connectionObject *self)
}
}
self->equote = conn_get_standard_conforming_strings(pgconn);
self->server_version = conn_get_server_version(pgconn);
self->protocol = conn_get_protocol_version(self->pgconn);
/* From here the connection is considered ready: with the new status,
* poll() will use PQisBusy instead of PQconnectPoll.
*/
self->status = CONN_STATUS_READY;
if (pq_set_non_blocking(self, 1, 1) != 0) {
return -1;
}
if (conn_setup(self, self->pgconn) == -1) {
return -1;
}
@ -403,7 +403,7 @@ conn_sync_connect(connectionObject *self)
}
static int
conn_async_connect(connectionObject *self)
_conn_async_connect(connectionObject *self)
{
PGconn *pgconn;
@ -426,7 +426,10 @@ conn_async_connect(connectionObject *self)
PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self);
self->status = CONN_STATUS_SETUP;
/* The connection will be completed banging on poll():
* First with _conn_poll_connecting() that will finish connection,
* then with _conn_poll_setup_async() that will do the same job
* of setup_async(). */
return 0;
}
@ -436,340 +439,23 @@ conn_connect(connectionObject *self, long int async)
{
if (async == 1) {
Dprintf("con_connect: connecting in ASYNC mode");
return conn_async_connect(self);
return _conn_async_connect(self);
}
else {
Dprintf("con_connect: connecting in SYNC mode");
return conn_sync_connect(self);
return _conn_sync_connect(self);
}
}
/* conn_poll_connect_send - handle connection polling when flushing output
during asynchronous connection attempt. */
PyObject *
conn_poll_connect_send(connectionObject *self)
{
const char *query = NULL;
int next_status;
int ret;
Dprintf("conn_poll_connect_send: status %d", self->status);
switch (self->status) {
case CONN_STATUS_SEND_DATESTYLE:
/* set the datestyle */
query = psyco_datestyle;
next_status = CONN_STATUS_SENT_DATESTYLE;
break;
case CONN_STATUS_SEND_CLIENT_ENCODING:
/* get the client_encoding */
query = psyco_client_encoding;
next_status = CONN_STATUS_SENT_CLIENT_ENCODING;
break;
case CONN_STATUS_SENT_DATESTYLE:
case CONN_STATUS_SENT_CLIENT_ENCODING:
/* the query has only been partially sent */
query = NULL;
next_status = self->status;
break;
default:
/* unexpected state, error out */
PyErr_Format(OperationalError,
"unexpected state: %d", self->status);
return NULL;
}
Dprintf("conn_poll_connect_send: sending query %-.200s", query);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->lock));
if (query != NULL) {
if (PQsendQuery(self->pgconn, query) != 1) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError,
PQerrorMessage(self->pgconn));
return NULL;
}
}
if (PQflush(self->pgconn) == 0) {
/* the query got fully sent to the server */
Dprintf("conn_poll_connect_send: query got flushed immediately");
/* the return value will be POLL_READ */
ret = PSYCO_POLL_READ;
/* advance the next status, since we skip over the "waiting for the
query to be sent" status */
switch (next_status) {
case CONN_STATUS_SENT_DATESTYLE:
next_status = CONN_STATUS_GET_DATESTYLE;
break;
case CONN_STATUS_SENT_CLIENT_ENCODING:
next_status = CONN_STATUS_GET_CLIENT_ENCODING;
break;
}
}
else {
/* query did not get sent completely, tell the client to wait for the
socket to become writable */
ret = PSYCO_POLL_WRITE;
}
self->status = next_status;
Dprintf("conn_poll_connect_send: next status is %d, returning %d",
self->status, ret);
pthread_mutex_unlock(&(self->lock));
Py_END_ALLOW_THREADS;
return PyInt_FromLong(ret);
}
/* conn_poll_connect_fetch - handle connection polling when reading result
during asynchronous connection attempt. */
PyObject *
conn_poll_connect_fetch(connectionObject *self)
{
PGresult *pgres;
int is_busy;
int next_status;
int ret;
Dprintf("conn_poll_connect_fetch: status %d", self->status);
/* consume the input */
is_busy = pq_is_busy(self);
if (is_busy == -1) {
/* there was an error, raise the exception */
return NULL;
}
else if (is_busy == 1) {
/* the connection is busy, tell the user to wait more */
Dprintf("conn_poll_connect_fetch: connection busy, returning %d",
PSYCO_POLL_READ);
return PyInt_FromLong(PSYCO_POLL_READ);
}
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->lock));
/* connection no longer busy, process input */
pgres = PQgetResult(self->pgconn);
/* do the rest while holding the GIL, we won't be calling into any
blocking API */
pthread_mutex_unlock(&(self->lock));
Py_END_ALLOW_THREADS;
Dprintf("conn_poll_connect_fetch: got result %p", pgres);
/* we expect COMMAND_OK (from SET) or TUPLES_OK (from SHOW) */
if (pgres == NULL || (PQresultStatus(pgres) != PGRES_COMMAND_OK &&
PQresultStatus(pgres) != PGRES_TUPLES_OK)) {
PyErr_SetString(OperationalError, "can't issue "
"initial connection queries");
IFCLEARPGRES(pgres);
return NULL;
}
if (self->status == CONN_STATUS_GET_DATESTYLE) {
/* got the result from SET DATESTYLE*/
Dprintf("conn_poll_connect_fetch: datestyle set");
next_status = CONN_STATUS_SEND_CLIENT_ENCODING;
}
else if (self->status == CONN_STATUS_GET_CLIENT_ENCODING) {
/* got the client_encoding */
self->encoding = conn_get_encoding(pgres);
if (self->encoding == NULL) {
return NULL;
}
Dprintf("conn_poll_connect_fetch: got client_encoding %s", self->encoding);
/* since this is the last step, set the other instance variables now */
self->equote = conn_get_standard_conforming_strings(self->pgconn);
self->protocol = conn_get_protocol_version(self->pgconn);
self->server_version = conn_get_server_version(self->pgconn);
/*
* asynchronous connections always use isolation level 0, the user is
* expected to manage the transactions himself, by sending
* (asynchronously) BEGIN and COMMIT statements.
*/
self->isolation_level = 0;
/* FIXME: this is a bug: the above queries were sent to the server
with a blocking connection */
if (pq_set_non_blocking(self, 1, 1) != 0) {
return NULL;
}
/* next status is going to READY */
next_status = CONN_STATUS_READY;
}
else {
/* unexpected state, error out */
PyErr_Format(OperationalError,
"unexpected state: %d", self->status);
return NULL;
}
/* clear any leftover result, there should be none, but the protocol
requires calling PQgetResult until you get a NULL */
pq_clear_async(self);
self->status = next_status;
/* if the curent status is READY it means we got the result of the
last initialization query, so we return POLL_OK, otherwise we need to
send another query, so return POLL_WRITE */
ret = self->status == CONN_STATUS_READY ? PSYCO_POLL_OK : PSYCO_POLL_WRITE;
Dprintf("conn_poll_connect_fetch: next status is %d, returning %d",
self->status, ret);
return PyInt_FromLong(ret);
}
/* conn_poll_ready - handle connection polling when it is already open */
PyObject *
conn_poll_ready(connectionObject *self)
{
int is_busy;
/* if there is an asynchronous query underway, poll it */
if (self->async_cursor != NULL) {
if (self->async_status == ASYNC_WRITE) {
return conn_poll_send(self);
}
else {
/* this gets called both for ASYNC_READ and ASYNC_DONE, because
even if the async query is complete, we still might want to
check for NOTIFYs */
return conn_poll_fetch(self);
}
}
/* otherwise just check for NOTIFYs */
is_busy = pq_is_busy(self);
if (is_busy == -1) {
/* there was an error, raise the exception */
return NULL;
}
else if (is_busy == 1) {
/* the connection is busy, tell the user to wait more */
Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ);
return PyInt_FromLong(PSYCO_POLL_READ);
}
else {
/* connection is idle */
Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK);
return PyInt_FromLong(PSYCO_POLL_OK);
}
}
/* conn_poll_send - poll the connection when flushing data to the backend */
PyObject *
conn_poll_send(connectionObject *self)
{
int res;
/* flush queued output to the server */
res = pq_flush(self);
if (res == 1) {
/* some data still waiting to be flushed */
Dprintf("conn_poll_send: returning %d", PSYCO_POLL_WRITE);
return PyInt_FromLong(PSYCO_POLL_WRITE);
}
else if (res == 0) {
/* all data flushed, start waiting for results */
Dprintf("conn_poll_send: returning %d", PSYCO_POLL_READ);
self->async_status = ASYNC_READ;
return PyInt_FromLong(PSYCO_POLL_READ);
}
else {
/* unexpected result */
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
return NULL;
}
}
/* conn_poll_fetch - poll the connection when reading results from the backend
*
* If self_curs is available, use it to store the result of the last query.
* Also unlink it when finished.
*/
PyObject *
conn_poll_fetch(connectionObject *self)
{
int is_busy;
int last_result;
/* consume the input */
is_busy = pq_is_busy(self);
if (is_busy == -1) {
/* there was an error, raise the exception */
return NULL;
}
else if (is_busy == 1) {
/* the connection is busy, tell the user to wait more */
Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_READ);
return PyInt_FromLong(PSYCO_POLL_READ);
}
/* try to fetch the data only if this was a poll following a read
request; else just return POLL_OK to the user: this is necessary
because of asynchronous NOTIFYs that can be sent by the backend
even if the user didn't asked for them */
if (self->async_status == ASYNC_READ && self->async_cursor) {
cursorObject *curs = (cursorObject *)self->async_cursor;
IFCLEARPGRES(curs->pgres);
curs->pgres = pq_get_last_result(self);
/* fetch the tuples (if there are any) and build the result. We don't
* care if pq_fetch return 0 or 1, but if there was an error, we want to
* signal it to the caller. */
last_result = pq_fetch(curs) == -1 ? -1 : 0;
/* We have finished with our async_cursor */
Py_XDECREF(self->async_cursor);
self->async_cursor = NULL;
}
else {
last_result = 0;
}
if (last_result == 0) {
Dprintf("conn_poll_fetch: returning %d", PSYCO_POLL_OK);
/* self->async_status cannot be ASYNC_WRITE here, because we
never execute curs_poll_fetch in ASYNC_WRITE state, so we can
safely set it to ASYNC_DONE because we either fetched the result or
there is no result to fetch */
self->async_status = ASYNC_DONE;
return PyInt_FromLong(PSYCO_POLL_OK);
}
else if (last_result == 1) {
Dprintf("conn_poll_fetch: got result, but data remaining, "
"returning %d", PSYCO_POLL_READ);
return PyInt_FromLong(PSYCO_POLL_READ);
}
else {
return NULL;
}
}
/* poll during a connection attempt until the connection has established. */
int
static int
_conn_poll_connecting(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
Dprintf("conn_poll: poll connecting");
switch (PQconnectPoll(self->pgconn)) {
case PGRES_POLLING_OK:
res = PSYCO_POLL_OK;
@ -792,11 +478,13 @@ _conn_poll_connecting(connectionObject *self)
/* Advance to the next state after an attempt of flushing output */
int
static int
_conn_poll_advance_write(connectionObject *self, int flush)
{
int res;
Dprintf("conn_poll: poll writing");
switch (flush) {
case 0: /* success */
/* we've finished pushing the query to the server. Let's start
@ -821,11 +509,12 @@ _conn_poll_advance_write(connectionObject *self, int flush)
}
/* Advance to the next state after a call to a pq_is_busy* function */
int
static int
_conn_poll_advance_read(connectionObject *self, int busy)
{
int res;
Dprintf("conn_poll: poll reading");
switch (busy) {
case 0: /* result is ready */
res = PSYCO_POLL_OK;
@ -850,7 +539,8 @@ _conn_poll_advance_read(connectionObject *self, int busy)
Advance the async_status (usually going WRITE -> READ -> DONE) but don't
mess with the connection status. */
int
static int
_conn_poll_query(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
@ -891,30 +581,148 @@ _conn_poll_query(connectionObject *self)
return res;
}
/* conn_poll_green - poll a *sync* connection with external wait */
PyObject *
conn_poll_green(connectionObject *self)
/* Advance to the next state during an async connection setup
*
* If the connection is green, this is performed by the regular
* sync code so the queries are sent by conn_setup() while in
* CONN_STATUS_READY state.
*/
static int
_conn_poll_setup_async(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
PGresult *pgres;
switch (self->status) {
case CONN_STATUS_CONNECTING:
/* Set the connection to nonblocking now. */
if (pq_set_non_blocking(self, 1, 1) != 0) {
break;
}
self->equote = conn_get_standard_conforming_strings(self->pgconn);
self->protocol = conn_get_protocol_version(self->pgconn);
self->server_version = conn_get_server_version(self->pgconn);
/* asynchronous connections always use isolation level 0, the user is
* expected to manage the transactions himself, by sending
* (asynchronously) BEGIN and COMMIT statements.
*/
self->isolation_level = 0;
Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
self->status = CONN_STATUS_DATESTYLE;
if (0 == pq_send_query(self, psyco_datestyle)) {
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
break;
}
Dprintf("conn_poll: async_status -> ASYNC_WRITE");
self->async_status = ASYNC_WRITE;
res = PSYCO_POLL_WRITE;
break;
case CONN_STATUS_DATESTYLE:
res = _conn_poll_query(self);
if (res == PSYCO_POLL_OK) {
res = PSYCO_POLL_ERROR;
pgres = pq_get_last_result(self);
if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
PyErr_SetString(OperationalError, "can't set datestyle to ISO");
break;
}
CLEARPGRES(pgres);
Dprintf("conn_poll: status -> CONN_STATUS_CLIENT_ENCODING");
self->status = CONN_STATUS_CLIENT_ENCODING;
if (0 == pq_send_query(self, psyco_client_encoding)) {
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
break;
}
Dprintf("conn_poll: async_status -> ASYNC_WRITE");
self->async_status = ASYNC_WRITE;
res = PSYCO_POLL_WRITE;
}
break;
case CONN_STATUS_CLIENT_ENCODING:
res = _conn_poll_query(self);
if (res == PSYCO_POLL_OK) {
res = PSYCO_POLL_ERROR;
pgres = pq_get_last_result(self);
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
PyErr_SetString(OperationalError, "can't fetch client_encoding");
break;
}
/* conn_get_encoding returns a malloc'd string */
self->encoding = conn_get_encoding(pgres);
CLEARPGRES(pgres);
if (self->encoding == NULL) { break; }
Dprintf("conn_poll: status -> CONN_STATUS_READY");
self->status = CONN_STATUS_READY;
res = PSYCO_POLL_OK;
}
break;
}
return res;
}
/* conn_poll - Main polling switch
*
* The function is called in all the states and connection types and invokes
* the right "next step".
*/
int
conn_poll(connectionObject *self)
{
int res = PSYCO_POLL_ERROR;
Dprintf("conn_poll: status = %d", self->status);
switch (self->status) {
case CONN_STATUS_SETUP:
Dprintf("conn_poll: status = CONN_STATUS_SETUP");
Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING");
self->status = CONN_STATUS_CONNECTING;
res = PSYCO_POLL_WRITE;
break;
case CONN_STATUS_CONNECTING:
Dprintf("conn_poll: status = CONN_STATUS_CONNECTING");
res = _conn_poll_connecting(self);
if (res == PSYCO_POLL_OK && self->async) {
res = _conn_poll_setup_async(self);
}
break;
case CONN_STATUS_DATESTYLE:
case CONN_STATUS_CLIENT_ENCODING:
res = _conn_poll_setup_async(self);
break;
case CONN_STATUS_READY:
case CONN_STATUS_BEGIN:
Dprintf("conn_poll: status = CONN_STATUS_READY/BEGIN");
res = _conn_poll_query(self);
if (res == PSYCO_POLL_OK && self->async_cursor) {
/* An async query has just finished: parse the tuple in the
* target cursor. */
cursorObject *curs = (cursorObject *)self->async_cursor;
IFCLEARPGRES(curs->pgres);
curs->pgres = pq_get_last_result(self);
/* fetch the tuples (if there are any) and build the result. We
* don't care if pq_fetch return 0 or 1, but if there was an error,
* we want to signal it to the caller. */
if (pq_fetch(curs) == -1) {
res = PSYCO_POLL_ERROR;
}
/* We have finished with our async_cursor */
Py_XDECREF(self->async_cursor);
self->async_cursor = NULL;
}
break;
default:
@ -922,12 +730,7 @@ conn_poll_green(connectionObject *self)
res = PSYCO_POLL_ERROR;
}
if (!(res == PSYCO_POLL_ERROR && PyErr_Occurred())) {
return PyInt_FromLong(res);
} else {
/* There is an error and an exception is already in place */
return NULL;
}
return res;
}
/* conn_close - do anything needed to shut down the connection */

View File

@ -408,129 +408,19 @@ psyco_conn_get_exception(PyObject *self, void *closure)
return exception;
}
#define psyco_conn_poll_doc \
"poll() -- return POLL_OK if the operation has finished, " \
"POLL_READ if the application should be waiting " \
"for the socket to be readable or POLL_WRITE " \
"if the socket should be writable."
static PyObject *
psyco_conn_poll_async(connectionObject *self)
{
PostgresPollingStatusType poll_status;
Dprintf("conn_poll: polling with status %d", self->status);
switch (self->status) {
case CONN_STATUS_SETUP:
/* according to libpq documentation the user should start by waiting
for the socket to become writable */
self->status = CONN_STATUS_CONNECTING;
return PyInt_FromLong(PSYCO_POLL_WRITE);
case CONN_STATUS_CONNECTING:
/* this means we are in the middle of a PQconnectPoll loop */
break;
case CONN_STATUS_SEND_DATESTYLE:
case CONN_STATUS_SENT_DATESTYLE:
case CONN_STATUS_SEND_CLIENT_ENCODING:
case CONN_STATUS_SENT_CLIENT_ENCODING:
/* these mean that we need to wait for the socket to become writable
to send the rest of our query */
return conn_poll_connect_send(self);
case CONN_STATUS_GET_DATESTYLE:
case CONN_STATUS_GET_CLIENT_ENCODING:
/* these mean that we are waiting for the results of the queries */
return conn_poll_connect_fetch(self);
case CONN_STATUS_READY:
case CONN_STATUS_BEGIN:
/* The connection is ready, but we might be in an asynchronous query,
or we just might want to check for NOTIFYs. For synchronous
connections the status might be BEGIN, not READY. */
return conn_poll_ready(self);
default:
/* everything else is an error */
PyErr_SetString(OperationalError,
"not in asynchronous connection attempt");
return NULL;
}
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
poll_status = PQconnectPoll(self->pgconn);
if (poll_status == PGRES_POLLING_READING) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
Dprintf("conn_poll: returing POLL_READ");
return PyInt_FromLong(PSYCO_POLL_READ);
}
if (poll_status == PGRES_POLLING_WRITING) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
Dprintf("conn_poll: returing POLL_WRITE");
return PyInt_FromLong(PSYCO_POLL_WRITE);
}
if (poll_status == PGRES_POLLING_FAILED) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
return NULL;
}
/* the only other thing that PQconnectPoll can return is PGRES_POLLING_OK,
but make sure */
if (poll_status != PGRES_POLLING_OK) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
PyErr_Format(OperationalError,
"unexpected result from PQconnectPoll: %d", poll_status);
return NULL;
}
Dprintf("conn_poll: got POLL_OK");
/* the connection is built, but we want to do a few other things before we
let the user use it */
self->equote = conn_get_standard_conforming_strings(self->pgconn);
Dprintf("conn_poll: got standard_conforming_strings");
/*
* Here is the tricky part, we need to figure the datestyle,
* client_encoding and isolevel, all using nonblocking calls. To do that
* we will keep telling the user to poll, while we are waiting for our
* asynchronous queries to complete.
*/
pthread_mutex_unlock(&(self->lock));
Py_END_ALLOW_THREADS;
/* the next operation the client will do is send a query, so ask him to
wait for a writable condition */
self->status = CONN_STATUS_SEND_DATESTYLE;
Dprintf("conn_poll: connection is built, retrning %d",
PSYCO_POLL_WRITE);
return PyInt_FromLong(PSYCO_POLL_WRITE);
}
static PyObject *
psyco_conn_poll(connectionObject *self)
{
int res;
EXC_IF_CONN_CLOSED(self);
if (self->async) {
return psyco_conn_poll_async(self);
res = conn_poll(self);
if (res != PSYCO_POLL_ERROR || !PyErr_Occurred()) {
return PyInt_FromLong(res);
} else {
return conn_poll_green(self);
/* There is an error and an exception is already in place */
return NULL;
}
}

View File

@ -147,14 +147,13 @@ psyco_exec_green(connectionObject *conn, const char *command)
PGresult *result = NULL;
PyObject *cb, *pyrv;
Dprintf("psyco_exec_green: executing query async");
if (!(cb = have_wait_callback())) {
goto end;
}
/* Send the query asynchronously */
Dprintf("psyco_exec_green: sending query async");
if (0 == PQsendQuery(conn->pgconn, command)) {
Dprintf("psyco_exec_green: PQsendQuery returned 0");
if (0 == pq_send_query(conn, command)) {
goto clear;
}

View File

@ -803,6 +803,27 @@ pq_execute(cursorObject *curs, const char *query, int async)
return 1-async;
}
/* send an async query to the backend.
*
* Return 1 if command succeeded, else 0.
*
* The function should be called helding the connection lock and the GIL.
*/
int
pq_send_query(connectionObject *conn, const char *query)
{
int rv;
Dprintf("pq_send_query: sending ASYNC query:");
Dprintf(" %-.200s", query);
if (0 == (rv = PQsendQuery(conn->pgconn, query))) {
Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn));
}
return rv;
}
/* Return the last result available on the connection.
*
* The function will block will block only if a command is active and the

View File

@ -38,6 +38,7 @@
HIDDEN PGresult *pq_get_last_result(connectionObject *conn);
HIDDEN int pq_fetch(cursorObject *curs);
HIDDEN int pq_execute(cursorObject *curs, const char *query, int async);
HIDDEN int pq_send_query(connectionObject *conn, const char *query);
HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres,
char **error, PyThreadState **tstate);
HIDDEN int pq_commit(connectionObject *conn);