Added 'psyco_exec_green()' to replace 'PQexec' using the user-provided block function.

This commit is contained in:
Daniele Varrazzo 2010-04-03 22:29:08 +01:00
parent 6dd6bee217
commit 442b3b2f6c
2 changed files with 85 additions and 5 deletions

View File

@ -28,9 +28,12 @@
#include "psycopg/python.h" #include "psycopg/python.h"
#include "psycopg/psycopg.h" #include "psycopg/psycopg.h"
#include "psycopg/green.h" #include "psycopg/green.h"
#include "psycopg/connection.h"
HIDDEN PyObject *wait_callback = NULL; HIDDEN PyObject *wait_callback = NULL;
PyObject *have_wait_callback(void);
/* Register a callback function to block waiting for data. /* Register a callback function to block waiting for data.
* *
* The function is exported by the _psycopg module. * The function is exported by the _psycopg module.
@ -64,6 +67,26 @@ psyco_green()
#endif #endif
} }
/* Return the wait callback if available.
*
* If not available, set a Python exception and return.
*
* The function returns a new reference: decref after use.
*/
PyObject *
have_wait_callback()
{
PyObject *cb;
cb = wait_callback;
if (!cb) {
PyErr_SetString(OperationalError, "wait callback not available");
return NULL;
}
Py_INCREF(cb);
return cb;
}
/* Block waiting for data available in an async connection. /* Block waiting for data available in an async connection.
* *
* This function assumes `wait_callback` to be available: * This function assumes `wait_callback` to be available:
@ -79,16 +102,71 @@ psyco_wait(PyObject *conn, PyObject *curs)
PyObject *cb; PyObject *cb;
Dprintf("psyco_wait"); Dprintf("psyco_wait");
cb = wait_callback; if (!(cb = have_wait_callback())) {
if (!cb) {
PyErr_SetString(OperationalError, "wait callback not available");
return NULL; return NULL;
} }
Py_INCREF(cb);
rv = PyObject_CallFunctionObjArgs(cb, conn, curs, NULL); rv = PyObject_CallFunctionObjArgs(cb, conn, curs, NULL);
Py_DECREF(cb); Py_DECREF(cb);
return rv; return rv;
} }
/* Replacement for PQexec using the user-provided wait function.
*
* The function should be called helding the connection lock, and
* the GIL because some Python code is expected to be called.
*
* If PGresult is NULL, there may have been either a libpq error
* or an exception raised by Python code: before raising an exception
* check if there is already one using `PyErr_Occurred()` */
PGresult *
psyco_exec_green(connectionObject *conn, const char *command)
{
PGconn *pgconn = conn->pgconn;
PGresult *result = NULL, *res;
PyObject *cb;
if (!(cb = have_wait_callback())) {
goto end;
}
/* Send the query asynchronously */
Dprintf("psyco_exec_green: sending query async");
if (0 == PQsendQuery(pgconn, command)) {
/* TODO: not handling the case of block during send */
Dprintf("psyco_exec_green: PQsendQuery returned 0");
goto clear;
}
/* Loop reading data using the user-provided wait function */
conn->async_status = ASYNC_READ;
PyObject *pyrv;
pyrv = PyObject_CallFunctionObjArgs(cb, conn, NULL, NULL);
if (!pyrv) {
Dprintf("psyco_exec_green: error in callback");
goto clear;
}
Py_DECREF(pyrv);
/* Now we can read the data without fear of blocking.
* Read until PQgetResult gives a NULL */
while (NULL != (res = PQgetResult(pgconn))) {
if (result) {
/* TODO too bad: we are discarding results from all the queries
* except the last. We could have populated `nextset()` with it
* but it would be an incompatible change (apps currently issue
* groups of queries expecting to receive the last result: they
* would start receiving the first instead). */
PQclear(result);
}
result = res;
}
clear:
Py_DECREF(cb);
end:
return result;
}

View File

@ -26,7 +26,8 @@
#ifndef PSYCOPG_GREEN_H #ifndef PSYCOPG_GREEN_H
#define PSYCOPG_GREEN_H 1 #define PSYCOPG_GREEN_H 1
struct PyObject; #include <libpq-fe.h>
#include "psycopg/connection.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -50,6 +51,7 @@ extern "C" {
HIDDEN PyObject *psyco_set_wait_callback(PyObject *self, PyObject *obj); HIDDEN PyObject *psyco_set_wait_callback(PyObject *self, PyObject *obj);
HIDDEN int psyco_green(void); HIDDEN int psyco_green(void);
HIDDEN PyObject *psyco_wait(PyObject *conn, PyObject *curs); HIDDEN PyObject *psyco_wait(PyObject *conn, PyObject *curs);
HIDDEN PGresult *psyco_exec_green(connectionObject *conn, const char *command);
#ifdef __cplusplus #ifdef __cplusplus
} }