Attempt to fix issue #113.

If the network is down, trying to read blocking will hang the process hard
(ctrl-c not working). Send a cancel signal instead (as suggested in
http://archives.postgresql.org/pgsql-hackers/2012-07/msg00903.php) and go
back into a green polling: this should allow a further error (e.g. another
ctrl-c) to break the loop. In this case we cannot assume anything about
the state of the connection, so we close it.
This commit is contained in:
Daniele Varrazzo 2012-10-06 01:10:41 +01:00
parent 6d1b3b21e6
commit fa032f09fb
2 changed files with 88 additions and 15 deletions

View File

@ -34,7 +34,7 @@
HIDDEN PyObject *wait_callback = NULL;
static PyObject *have_wait_callback(void);
static void psyco_clear_result_blocking(connectionObject *conn);
static void psyco_panic_cancel(connectionObject *conn);
/* Register a callback function to block waiting for data.
*
@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
conn->async_status = ASYNC_WRITE;
if (0 != psyco_wait(conn)) {
psyco_clear_result_blocking(conn);
psyco_panic_cancel(conn);
goto end;
}
@ -192,22 +192,47 @@ end:
}
/* Discard the result of the currenly executed query, blocking.
*
* This function doesn't honour the wait callback: it can be used in case of
* emergency if the callback fails in order to put the connection back into a
* consistent state.
*
* If any command was issued before clearing the result, libpq would fail with
* the error "another command is already in progress".
/* There has been a communication error during query execution. It may have
* happened e.g. for a network error or an error in the callback, and we
* cannot tell the two apart. The strategy here to avoid blocking (issue #113)
* is to try and cancel the query, waiting for the result in non-blocking way.
* If again we receive an error, we raise an error and close the connection.
* Discard the result of the currenly executed query, blocking.
*/
static void
psyco_clear_result_blocking(connectionObject *conn)
psyco_panic_cancel(connectionObject *conn)
{
PGresult *res;
PyObject *etype, *evalue, *etb;
char errbuf[256];
Dprintf("psyco_clear_result_blocking");
while (NULL != (res = PQgetResult(conn->pgconn))) {
PQclear(res);
/* we should have an exception set. */
PyErr_Fetch(&etype, &evalue, &etb);
if (NULL == etype) {
Dprintf("panic_cancel: called without exception set");
}
/* Try sending the cancel signal */
Dprintf("panic_cancel: sending cancel request");
if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) {
Dprintf("panic_cancel: canceling failed: %s", errbuf);
/* raise a warning: we'll keep the previous error */
PyErr_WarnEx(NULL, errbuf, 1);
goto exit;
}
/* go back in the loop for another attempt at async processing */
/* TODO: should we start on ASYNC_WRITE instead? */
if (0 != psyco_wait(conn)) {
Dprintf("panic_cancel: error after cancel: closing the connection");
PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1);
conn_close_locked(conn);
goto exit;
}
exit:
/* restore the exception. If no exception was set at function begin, don't
* clobber one that may have been set here. */
if (etype) {
PyErr_Restore(etype, evalue, etb);
}
}

View File

@ -0,0 +1,48 @@
#!/usr/bin/env python
"""Test for issue #113: test with error during green processing
"""
DSN = 'dbname=test'
# import eventlet.patcher
# eventlet.patcher.monkey_patch()
import os
import signal
import psycopg2
from psycopg2 import extensions
from eventlet.hubs import trampoline
panic = []
def wait_cb(conn):
"""A wait callback useful to allow eventlet to work with Psycopg."""
while 1:
if panic:
raise Exception('whatever')
state = conn.poll()
if state == extensions.POLL_OK:
break
elif state == extensions.POLL_READ:
trampoline(conn.fileno(), read=True)
elif state == extensions.POLL_WRITE:
trampoline(conn.fileno(), write=True)
else:
raise psycopg2.OperationalError(
"Bad result from poll: %r" % state)
extensions.set_wait_callback(wait_cb)
def handler(signum, frame):
panic.append(True)
signal.signal(signal.SIGHUP, handler)
conn = psycopg2.connect(DSN)
curs = conn.cursor()
print "PID", os.getpid()
curs.execute("select pg_sleep(1000)")
# You can unplug the network cable etc. here.
# Kill -HUP will raise an exception in the callback.