mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-30 04:33:45 +03:00
Attempt to fix issue #113.
If the network is down, trying to read blocking will hang the process hard (ctrl-c not working). Send a cancel signal instead (as suggested in http://archives.postgresql.org/pgsql-hackers/2012-07/msg00903.php) and go back into a green polling: this should allow a further error (e.g. another ctrl-c) to break the loop. In this case we cannot assume anything about the state of the connection, so we close it.
This commit is contained in:
parent
2137db89d4
commit
53b1c70f3a
|
@ -34,7 +34,7 @@
|
||||||
HIDDEN PyObject *wait_callback = NULL;
|
HIDDEN PyObject *wait_callback = NULL;
|
||||||
|
|
||||||
static PyObject *have_wait_callback(void);
|
static PyObject *have_wait_callback(void);
|
||||||
static void psyco_clear_result_blocking(connectionObject *conn);
|
static void psyco_panic_cancel(connectionObject *conn);
|
||||||
|
|
||||||
/* Register a callback function to block waiting for data.
|
/* Register a callback function to block waiting for data.
|
||||||
*
|
*
|
||||||
|
@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
|
||||||
conn->async_status = ASYNC_WRITE;
|
conn->async_status = ASYNC_WRITE;
|
||||||
|
|
||||||
if (0 != psyco_wait(conn)) {
|
if (0 != psyco_wait(conn)) {
|
||||||
psyco_clear_result_blocking(conn);
|
psyco_panic_cancel(conn);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -192,22 +192,47 @@ end:
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/* Discard the result of the currenly executed query, blocking.
|
/* There has been a communication error during query execution. It may have
|
||||||
*
|
* happened e.g. for a network error or an error in the callback, and we
|
||||||
* This function doesn't honour the wait callback: it can be used in case of
|
* cannot tell the two apart. The strategy here to avoid blocking (issue #113)
|
||||||
* emergency if the callback fails in order to put the connection back into a
|
* is to try and cancel the query, waiting for the result in non-blocking way.
|
||||||
* consistent state.
|
* If again we receive an error, we raise an error and close the connection.
|
||||||
*
|
* Discard the result of the currenly executed query, blocking.
|
||||||
* If any command was issued before clearing the result, libpq would fail with
|
|
||||||
* the error "another command is already in progress".
|
|
||||||
*/
|
*/
|
||||||
static void
|
static void
|
||||||
psyco_clear_result_blocking(connectionObject *conn)
|
psyco_panic_cancel(connectionObject *conn)
|
||||||
{
|
{
|
||||||
PGresult *res;
|
PyObject *etype, *evalue, *etb;
|
||||||
|
char errbuf[256];
|
||||||
|
|
||||||
Dprintf("psyco_clear_result_blocking");
|
/* we should have an exception set. */
|
||||||
while (NULL != (res = PQgetResult(conn->pgconn))) {
|
PyErr_Fetch(&etype, &evalue, &etb);
|
||||||
PQclear(res);
|
if (NULL == etype) {
|
||||||
|
Dprintf("panic_cancel: called without exception set");
|
||||||
|
}
|
||||||
|
|
||||||
|
/* Try sending the cancel signal */
|
||||||
|
Dprintf("panic_cancel: sending cancel request");
|
||||||
|
if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) {
|
||||||
|
Dprintf("panic_cancel: canceling failed: %s", errbuf);
|
||||||
|
/* raise a warning: we'll keep the previous error */
|
||||||
|
PyErr_WarnEx(NULL, errbuf, 1);
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* go back in the loop for another attempt at async processing */
|
||||||
|
/* TODO: should we start on ASYNC_WRITE instead? */
|
||||||
|
if (0 != psyco_wait(conn)) {
|
||||||
|
Dprintf("panic_cancel: error after cancel: closing the connection");
|
||||||
|
PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1);
|
||||||
|
conn_close_locked(conn);
|
||||||
|
goto exit;
|
||||||
|
}
|
||||||
|
|
||||||
|
exit:
|
||||||
|
/* restore the exception. If no exception was set at function begin, don't
|
||||||
|
* clobber one that may have been set here. */
|
||||||
|
if (etype) {
|
||||||
|
PyErr_Restore(etype, evalue, etb);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
48
sandbox/test_green_error.py
Normal file
48
sandbox/test_green_error.py
Normal file
|
@ -0,0 +1,48 @@
|
||||||
|
#!/usr/bin/env python
|
||||||
|
"""Test for issue #113: test with error during green processing
|
||||||
|
"""
|
||||||
|
|
||||||
|
DSN = 'dbname=test'
|
||||||
|
|
||||||
|
# import eventlet.patcher
|
||||||
|
# eventlet.patcher.monkey_patch()
|
||||||
|
|
||||||
|
import os
|
||||||
|
import signal
|
||||||
|
import psycopg2
|
||||||
|
from psycopg2 import extensions
|
||||||
|
from eventlet.hubs import trampoline
|
||||||
|
|
||||||
|
panic = []
|
||||||
|
|
||||||
|
def wait_cb(conn):
|
||||||
|
"""A wait callback useful to allow eventlet to work with Psycopg."""
|
||||||
|
while 1:
|
||||||
|
if panic:
|
||||||
|
raise Exception('whatever')
|
||||||
|
|
||||||
|
state = conn.poll()
|
||||||
|
if state == extensions.POLL_OK:
|
||||||
|
break
|
||||||
|
elif state == extensions.POLL_READ:
|
||||||
|
trampoline(conn.fileno(), read=True)
|
||||||
|
elif state == extensions.POLL_WRITE:
|
||||||
|
trampoline(conn.fileno(), write=True)
|
||||||
|
else:
|
||||||
|
raise psycopg2.OperationalError(
|
||||||
|
"Bad result from poll: %r" % state)
|
||||||
|
|
||||||
|
extensions.set_wait_callback(wait_cb)
|
||||||
|
|
||||||
|
def handler(signum, frame):
|
||||||
|
panic.append(True)
|
||||||
|
|
||||||
|
signal.signal(signal.SIGHUP, handler)
|
||||||
|
|
||||||
|
conn = psycopg2.connect(DSN)
|
||||||
|
curs = conn.cursor()
|
||||||
|
print "PID", os.getpid()
|
||||||
|
curs.execute("select pg_sleep(1000)")
|
||||||
|
|
||||||
|
# You can unplug the network cable etc. here.
|
||||||
|
# Kill -HUP will raise an exception in the callback.
|
Loading…
Reference in New Issue
Block a user