diff --git a/psycopg/green.c b/psycopg/green.c index 65578f51..8e5ec1be 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -34,7 +34,7 @@ HIDDEN PyObject *wait_callback = NULL; static PyObject *have_wait_callback(void); -static void psyco_clear_result_blocking(connectionObject *conn); +static void psyco_panic_cancel(connectionObject *conn); /* Register a callback function to block waiting for data. * @@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command) conn->async_status = ASYNC_WRITE; if (0 != psyco_wait(conn)) { - psyco_clear_result_blocking(conn); + psyco_panic_cancel(conn); goto end; } @@ -192,22 +192,47 @@ end: } -/* Discard the result of the currenly executed query, blocking. - * - * This function doesn't honour the wait callback: it can be used in case of - * emergency if the callback fails in order to put the connection back into a - * consistent state. - * - * If any command was issued before clearing the result, libpq would fail with - * the error "another command is already in progress". +/* There has been a communication error during query execution. It may have + * happened e.g. for a network error or an error in the callback, and we + * cannot tell the two apart. The strategy here to avoid blocking (issue #113) + * is to try and cancel the query, waiting for the result in non-blocking way. + * If again we receive an error, we raise an error and close the connection. + * Discard the result of the currenly executed query, blocking. */ static void -psyco_clear_result_blocking(connectionObject *conn) +psyco_panic_cancel(connectionObject *conn) { - PGresult *res; + PyObject *etype, *evalue, *etb; + char errbuf[256]; - Dprintf("psyco_clear_result_blocking"); - while (NULL != (res = PQgetResult(conn->pgconn))) { - PQclear(res); + /* we should have an exception set. */ + PyErr_Fetch(&etype, &evalue, &etb); + if (NULL == etype) { + Dprintf("panic_cancel: called without exception set"); + } + + /* Try sending the cancel signal */ + Dprintf("panic_cancel: sending cancel request"); + if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) { + Dprintf("panic_cancel: canceling failed: %s", errbuf); + /* raise a warning: we'll keep the previous error */ + PyErr_WarnEx(NULL, errbuf, 1); + goto exit; + } + + /* go back in the loop for another attempt at async processing */ + /* TODO: should we start on ASYNC_WRITE instead? */ + if (0 != psyco_wait(conn)) { + Dprintf("panic_cancel: error after cancel: closing the connection"); + PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1); + conn_close_locked(conn); + goto exit; + } + +exit: + /* restore the exception. If no exception was set at function begin, don't + * clobber one that may have been set here. */ + if (etype) { + PyErr_Restore(etype, evalue, etb); } } diff --git a/sandbox/test_green_error.py b/sandbox/test_green_error.py new file mode 100644 index 00000000..23a247b5 --- /dev/null +++ b/sandbox/test_green_error.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +"""Test for issue #113: test with error during green processing +""" + +DSN = 'dbname=test' + +# import eventlet.patcher +# eventlet.patcher.monkey_patch() + +import os +import signal +import psycopg2 +from psycopg2 import extensions +from eventlet.hubs import trampoline + +panic = [] + +def wait_cb(conn): + """A wait callback useful to allow eventlet to work with Psycopg.""" + while 1: + if panic: + raise Exception('whatever') + + state = conn.poll() + if state == extensions.POLL_OK: + break + elif state == extensions.POLL_READ: + trampoline(conn.fileno(), read=True) + elif state == extensions.POLL_WRITE: + trampoline(conn.fileno(), write=True) + else: + raise psycopg2.OperationalError( + "Bad result from poll: %r" % state) + +extensions.set_wait_callback(wait_cb) + +def handler(signum, frame): + panic.append(True) + +signal.signal(signal.SIGHUP, handler) + +conn = psycopg2.connect(DSN) +curs = conn.cursor() +print "PID", os.getpid() +curs.execute("select pg_sleep(1000)") + +# You can unplug the network cable etc. here. +# Kill -HUP will raise an exception in the callback.