Close the connection on error in callback

Unfortunately PQcancel blocks, so it's no better than PQgetResult.
It has been suggested to use PQreset in a non-blocking way, but this would give
the Python program the burden of handling a connection that is established but
not yet configured, in a place where it does not expect one.
This commit is contained in:
Daniele Varrazzo 2012-10-06 11:58:52 +01:00
parent 2611d62283
commit b61a2a34c4
4 changed files with 70 additions and 62 deletions

3
NEWS
View File

@ -16,6 +16,9 @@ What's new in psycopg 2.4.6
- Dropped GIL release during string adaptation around a function call - Dropped GIL release during string adaptation around a function call
invoking a Python API function, which could cause interpreter crash. invoking a Python API function, which could cause interpreter crash.
Thanks to Manu Cupcic for the report (ticket #110). Thanks to Manu Cupcic for the report (ticket #110).
- Close a green connection if there is an error in the callback.
Maybe a harsh solution but it leaves the program responsive
(ticket #113).
- 'register_hstore()', 'register_composite()', 'tpc_recover()' work with - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with
RealDictConnection and Cursor (ticket #114). RealDictConnection and Cursor (ticket #114).
- connect() raises an exception instead of swallowing keyword arguments - connect() raises an exception instead of swallowing keyword arguments

View File

@ -34,7 +34,7 @@
HIDDEN PyObject *wait_callback = NULL; HIDDEN PyObject *wait_callback = NULL;
static PyObject *have_wait_callback(void); static PyObject *have_wait_callback(void);
static void psyco_panic_cancel(connectionObject *conn); static void green_panic(connectionObject *conn);
/* Register a callback function to block waiting for data. /* Register a callback function to block waiting for data.
* *
@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
conn->async_status = ASYNC_WRITE; conn->async_status = ASYNC_WRITE;
if (0 != psyco_wait(conn)) { if (0 != psyco_wait(conn)) {
psyco_panic_cancel(conn); green_panic(conn);
goto end; goto end;
} }
@ -194,52 +194,19 @@ end:
/* There has been a communication error during query execution. It may have /* There has been a communication error during query execution. It may have
* happened e.g. for a network error or an error in the callback, and we * happened e.g. for a network error or an error in the callback, and we
* cannot tell the two apart. The strategy here to avoid blocking (issue #113) * cannot tell the two apart.
* is to try and cancel the query, waiting for the result in non-blocking way. * Trying to PQcancel or PQgetResult to put the connection back into a working
* If again we receive an error, we raise an error and close the connection. * state doesn't work nice (issue #113): the program blocks and the
* Discard the result of the currenly executed query, blocking. * interpreter won't even respond to SIGINT. PQreset could work async, but the
* python program would then have a connection made but not configured where
* it is probably not designed to handle one. So for the moment we do the kindest
* thing we can: we close the connection. A long-running program should
* already have a way to discard broken connections; a short-lived one would
* benefit from a working ctrl-c.
*/ */
static void static void
psyco_panic_cancel(connectionObject *conn) green_panic(connectionObject *conn)
{ {
PGresult *res; Dprintf("green_panic: closing the connection");
PyObject *etype, *evalue, *etb; conn_close_locked(conn);
char errbuf[256];
/* we should have an exception set. */
PyErr_Fetch(&etype, &evalue, &etb);
if (NULL == etype) {
Dprintf("panic_cancel: called without exception set");
}
/* Try sending the cancel signal */
Dprintf("panic_cancel: sending cancel request");
if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) {
Dprintf("panic_cancel: canceling failed: %s", errbuf);
/* raise a warning: we'll keep the previous error */
PyErr_WarnEx(NULL, errbuf, 1);
goto exit;
}
/* go back in the loop for another attempt at async processing */
/* TODO: should we start on ASYNC_WRITE instead? */
if (0 != psyco_wait(conn)) {
Dprintf("panic_cancel: error after cancel: closing the connection");
PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1);
conn_close_locked(conn);
goto exit;
}
/* we must clear the result or we get "another command is already in
* progress" */
if (NULL != (res = pq_get_last_result(conn))) {
PQclear(res);
}
exit:
/* restore the exception. If no exception was set at function begin, don't
* clobber one that may have been set here. */
if (etype) {
PyErr_Restore(etype, evalue, etb);
}
} }

View File

@ -4,15 +4,20 @@
DSN = 'dbname=test' DSN = 'dbname=test'
# import eventlet.patcher import eventlet.patcher
# eventlet.patcher.monkey_patch() eventlet.patcher.monkey_patch()
import os import os
import signal import signal
from time import sleep
import psycopg2 import psycopg2
from psycopg2 import extensions from psycopg2 import extensions
from eventlet.hubs import trampoline from eventlet.hubs import trampoline
# register a test wait callback that fails if SIGHUP is received
panic = [] panic = []
def wait_cb(conn): def wait_cb(conn):
@ -34,23 +39,43 @@ def wait_cb(conn):
extensions.set_wait_callback(wait_cb) extensions.set_wait_callback(wait_cb)
# SIGHUP handler to inject a fail in the callback
def handler(signum, frame): def handler(signum, frame):
panic.append(True) panic.append(True)
signal.signal(signal.SIGHUP, handler) signal.signal(signal.SIGHUP, handler)
conn = psycopg2.connect(DSN)
curs = conn.cursor()
print "PID", os.getpid()
try:
curs.execute("select pg_sleep(1000)")
except BaseException, e:
print "got exception:", e.__class__.__name__, e
conn.rollback() # Simulate another green thread working
curs.execute("select 1")
print curs.fetchone() def worker():
while 1:
print "I'm working"
sleep(1)
eventlet.spawn(worker)
# You can unplug the network cable etc. here. # You can unplug the network cable etc. here.
# Kill -HUP will raise an exception in the callback. # Kill -HUP will raise an exception in the callback.
print "PID", os.getpid()
conn = psycopg2.connect(DSN)
curs = conn.cursor()
try:
for i in range(1000):
curs.execute("select %s, pg_sleep(1)", (i,))
r = curs.fetchone()
print "selected", r
except BaseException, e:
print "got exception:", e.__class__.__name__, e
if conn.closed:
print "the connection is closed"
else:
conn.rollback()
curs.execute("select 1")
print curs.fetchone()

View File

@ -79,6 +79,9 @@ class GreenTests(unittest.TestCase):
warnings.warn("sending a large query didn't trigger block on write.") warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self): def test_error_in_callback(self):
# behaviour changed after issue #113: if there is an error in the
# callback for the moment we don't have a way to reset the connection
# without blocking (ticket #113) so just close it.
conn = self.conn conn = self.conn
curs = conn.cursor() curs = conn.cursor()
curs.execute("select 1") # have a BEGIN curs.execute("select 1") # have a BEGIN
@ -88,11 +91,21 @@ class GreenTests(unittest.TestCase):
psycopg2.extensions.set_wait_callback(lambda conn: 1//0) psycopg2.extensions.set_wait_callback(lambda conn: 1//0)
self.assertRaises(ZeroDivisionError, curs.execute, "select 2") self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
self.assert_(conn.closed)
def test_dont_freak_out(self):
# if there is an error in a green query, don't freak out and close
# the connection
conn = self.conn
curs = conn.cursor()
self.assertRaises(psycopg2.ProgrammingError,
curs.execute, "select the unselectable")
# check that the connection is left in an usable state # check that the connection is left in an usable state
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select) self.assert_(not conn.closed)
conn.rollback() conn.rollback()
curs.execute("select 2") curs.execute("select 1")
self.assertEqual(2, curs.fetchone()[0]) self.assertEqual(curs.fetchone()[0], 1)
def test_suite(): def test_suite():