diff --git a/psycopg/green.c b/psycopg/green.c index 874c264d..a49f818a 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -28,9 +28,12 @@ #include "psycopg/python.h" #include "psycopg/psycopg.h" #include "psycopg/green.h" +#include "psycopg/connection.h" HIDDEN PyObject *wait_callback = NULL; +PyObject *have_wait_callback(void); + /* Register a callback function to block waiting for data. * * The function is exported by the _psycopg module. @@ -64,6 +67,26 @@ psyco_green() #endif } +/* Return the wait callback if available. + * + * If not available, set a Python exception and return. + * + * The function returns a new reference: decref after use. + */ +PyObject * +have_wait_callback() +{ + PyObject *cb; + + cb = wait_callback; + if (!cb) { + PyErr_SetString(OperationalError, "wait callback not available"); + return NULL; + } + Py_INCREF(cb); + return cb; +} + /* Block waiting for data available in an async connection. * * This function assumes `wait_callback` to be available: @@ -79,16 +102,71 @@ psyco_wait(PyObject *conn, PyObject *curs) PyObject *cb; Dprintf("psyco_wait"); - cb = wait_callback; - if (!cb) { - PyErr_SetString(OperationalError, "wait callback not available"); + if (!(cb = have_wait_callback())) { return NULL; } - Py_INCREF(cb); rv = PyObject_CallFunctionObjArgs(cb, conn, curs, NULL); Py_DECREF(cb); return rv; } +/* Replacement for PQexec using the user-provided wait function. + * + * The function should be called helding the connection lock, and + * the GIL because some Python code is expected to be called. + * + * If PGresult is NULL, there may have been either a libpq error + * or an exception raised by Python code: before raising an exception + * check if there is already one using `PyErr_Occurred()` */ +PGresult * +psyco_exec_green(connectionObject *conn, const char *command) +{ + PGconn *pgconn = conn->pgconn; + PGresult *result = NULL, *res; + PyObject *cb; + + if (!(cb = have_wait_callback())) { + goto end; + } + + /* Send the query asynchronously */ + Dprintf("psyco_exec_green: sending query async"); + if (0 == PQsendQuery(pgconn, command)) { + /* TODO: not handling the case of block during send */ + Dprintf("psyco_exec_green: PQsendQuery returned 0"); + goto clear; + } + + /* Loop reading data using the user-provided wait function */ + conn->async_status = ASYNC_READ; + PyObject *pyrv; + + pyrv = PyObject_CallFunctionObjArgs(cb, conn, NULL, NULL); + if (!pyrv) { + Dprintf("psyco_exec_green: error in callback"); + goto clear; + } + Py_DECREF(pyrv); + + /* Now we can read the data without fear of blocking. + * Read until PQgetResult gives a NULL */ + while (NULL != (res = PQgetResult(pgconn))) { + if (result) { + /* TODO too bad: we are discarding results from all the queries + * except the last. We could have populated `nextset()` with it + * but it would be an incompatible change (apps currently issue + * groups of queries expecting to receive the last result: they + * would start receiving the first instead). */ + PQclear(result); + } + result = res; + } + +clear: + Py_DECREF(cb); +end: + return result; +} + diff --git a/psycopg/green.h b/psycopg/green.h index 28999272..3c09b327 100644 --- a/psycopg/green.h +++ b/psycopg/green.h @@ -26,7 +26,8 @@ #ifndef PSYCOPG_GREEN_H #define PSYCOPG_GREEN_H 1 -struct PyObject; +#include +#include "psycopg/connection.h" #ifdef __cplusplus extern "C" { @@ -50,6 +51,7 @@ extern "C" { HIDDEN PyObject *psyco_set_wait_callback(PyObject *self, PyObject *obj); HIDDEN int psyco_green(void); HIDDEN PyObject *psyco_wait(PyObject *conn, PyObject *curs); +HIDDEN PGresult *psyco_exec_green(connectionObject *conn, const char *command); #ifdef __cplusplus }