diff --git a/NEWS b/NEWS index 4478e58a..2e5477e8 100644 --- a/NEWS +++ b/NEWS @@ -16,6 +16,9 @@ What's new in psycopg 2.4.6 - Dropped GIL release during string adaptation around a function call invoking a Python API function, which could cause interpreter crash. Thanks to Manu Cupcic for the report (ticket #110). + - Close a green connection if there is an error in the callback. + Maybe a harsh solution but it leaves the program responsive + (ticket #113). - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with RealDictConnection and Cursor (ticket #114). - connect() raises an exception instead of swallowing keyword arguments diff --git a/psycopg/green.c b/psycopg/green.c index 2f35d9f6..3ffa810b 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -34,7 +34,7 @@ HIDDEN PyObject *wait_callback = NULL; static PyObject *have_wait_callback(void); -static void psyco_panic_cancel(connectionObject *conn); +static void green_panic(connectionObject *conn); /* Register a callback function to block waiting for data. * @@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command) conn->async_status = ASYNC_WRITE; if (0 != psyco_wait(conn)) { - psyco_panic_cancel(conn); + green_panic(conn); goto end; } @@ -194,52 +194,19 @@ end: /* There has been a communication error during query execution. It may have * happened e.g. for a network error or an error in the callback, and we - * cannot tell the two apart. The strategy here to avoid blocking (issue #113) - * is to try and cancel the query, waiting for the result in non-blocking way. - * If again we receive an error, we raise an error and close the connection. - * Discard the result of the currenly executed query, blocking. + * cannot tell the two apart. + * Trying to PQcancel or PQgetResult to put the connection back into a working + * state doesn't work nice (issue #113): the program blocks and the + * interpreter won't even respond to SIGINT. PQreset could work async, but the + * python program would have then a connection made but not configured where + * it is probably not designed to handled. So for the moment we do the kindest + * thing we can: we close the connection. A long-running program should + * already have a way to discard broken connections; a short-lived one would + * benefit of working ctrl-c. */ static void -psyco_panic_cancel(connectionObject *conn) +green_panic(connectionObject *conn) { - PGresult *res; - PyObject *etype, *evalue, *etb; - char errbuf[256]; - - /* we should have an exception set. */ - PyErr_Fetch(&etype, &evalue, &etb); - if (NULL == etype) { - Dprintf("panic_cancel: called without exception set"); - } - - /* Try sending the cancel signal */ - Dprintf("panic_cancel: sending cancel request"); - if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) { - Dprintf("panic_cancel: canceling failed: %s", errbuf); - /* raise a warning: we'll keep the previous error */ - PyErr_WarnEx(NULL, errbuf, 1); - goto exit; - } - - /* go back in the loop for another attempt at async processing */ - /* TODO: should we start on ASYNC_WRITE instead? */ - if (0 != psyco_wait(conn)) { - Dprintf("panic_cancel: error after cancel: closing the connection"); - PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1); - conn_close_locked(conn); - goto exit; - } - - /* we must clear the result or we get "another command is already in - * progress" */ - if (NULL != (res = pq_get_last_result(conn))) { - PQclear(res); - } - -exit: - /* restore the exception. If no exception was set at function begin, don't - * clobber one that may have been set here. */ - if (etype) { - PyErr_Restore(etype, evalue, etb); - } + Dprintf("green_panic: closing the connection"); + conn_close_locked(conn); } diff --git a/sandbox/test_green_error.py b/sandbox/test_green_error.py index 7d17cf74..7477382a 100644 --- a/sandbox/test_green_error.py +++ b/sandbox/test_green_error.py @@ -4,15 +4,20 @@ DSN = 'dbname=test' -# import eventlet.patcher -# eventlet.patcher.monkey_patch() +import eventlet.patcher +eventlet.patcher.monkey_patch() import os import signal +from time import sleep + import psycopg2 from psycopg2 import extensions from eventlet.hubs import trampoline + +# register a test wait callback that fails if SIGHUP is received + panic = [] def wait_cb(conn): @@ -34,23 +39,43 @@ def wait_cb(conn): extensions.set_wait_callback(wait_cb) + +# SIGHUP handler to inject a fail in the callback + def handler(signum, frame): panic.append(True) signal.signal(signal.SIGHUP, handler) -conn = psycopg2.connect(DSN) -curs = conn.cursor() -print "PID", os.getpid() -try: - curs.execute("select pg_sleep(1000)") -except BaseException, e: - print "got exception:", e.__class__.__name__, e -conn.rollback() -curs.execute("select 1") -print curs.fetchone() +# Simulate another green thread working + +def worker(): + while 1: + print "I'm working" + sleep(1) + +eventlet.spawn(worker) # You can unplug the network cable etc. here. # Kill -HUP will raise an exception in the callback. + +print "PID", os.getpid() +conn = psycopg2.connect(DSN) +curs = conn.cursor() +try: + for i in range(1000): + curs.execute("select %s, pg_sleep(1)", (i,)) + r = curs.fetchone() + print "selected", r + +except BaseException, e: + print "got exception:", e.__class__.__name__, e + +if conn.closed: + print "the connection is closed" +else: + conn.rollback() + curs.execute("select 1") + print curs.fetchone() diff --git a/tests/test_green.py b/tests/test_green.py index d641d183..e0cd57de 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -79,6 +79,9 @@ class GreenTests(unittest.TestCase): warnings.warn("sending a large query didn't trigger block on write.") def test_error_in_callback(self): + # behaviour changed after issue #113: if there is an error in the + # callback for the moment we don't have a way to reset the connection + # without blocking (ticket #113) so just close it. conn = self.conn curs = conn.cursor() curs.execute("select 1") # have a BEGIN @@ -88,11 +91,21 @@ class GreenTests(unittest.TestCase): psycopg2.extensions.set_wait_callback(lambda conn: 1//0) self.assertRaises(ZeroDivisionError, curs.execute, "select 2") + self.assert_(conn.closed) + + def test_dont_freak_out(self): + # if there is an error in a green query, don't freak out and close + # the connection + conn = self.conn + curs = conn.cursor() + self.assertRaises(psycopg2.ProgrammingError, + curs.execute, "select the unselectable") + # check that the connection is left in an usable state - psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select) + self.assert_(not conn.closed) conn.rollback() - curs.execute("select 2") - self.assertEqual(2, curs.fetchone()[0]) + curs.execute("select 1") + self.assertEqual(curs.fetchone()[0], 1) def test_suite():