diff --git a/NEWS b/NEWS index af5988b7..b5ec2b5e 100644 --- a/NEWS +++ b/NEWS @@ -7,6 +7,9 @@ What's new in psycopg 2.4.6 - Dropped GIL release during string adaptation around a function call invoking a Python API function, which could cause interpreter crash. Thanks to Manu Cupcic for the report (ticket #110). + - Close a green connection if there is an error in the callback. + Maybe a harsh solution but it leaves the program responsive + (ticket #113). - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with RealDictConnection and Cursor (ticket #114). - connect() raises an exception instead of swallowing keyword arguments diff --git a/psycopg/connection.h b/psycopg/connection.h index 9647ffd2..01cc6a44 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -141,6 +141,7 @@ HIDDEN void conn_notifies_process(connectionObject *self); RAISES_NEG HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn); HIDDEN int conn_connect(connectionObject *self, long int async); HIDDEN void conn_close(connectionObject *self); +HIDDEN void conn_close_locked(connectionObject *self); RAISES_NEG HIDDEN int conn_commit(connectionObject *self); RAISES_NEG HIDDEN int conn_rollback(connectionObject *self); RAISES_NEG HIDDEN int conn_set_session(connectionObject *self, const char *isolevel, diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 8c8fed47..a93c233a 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -922,12 +922,24 @@ conn_close(connectionObject *self) return; } - /* sets this connection as closed even for other threads; also note that - we need to check the value of pgconn, because we get called even when - the connection fails! */ + /* sets this connection as closed even for other threads; */ Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); + conn_close_locked(self); + + pthread_mutex_unlock(&self->lock); + Py_END_ALLOW_THREADS; +} + +/* conn_close_locked - shut down the connection with the lock already taken */ + +void conn_close_locked(connectionObject *self) +{ + if (self->closed) { + return; + } + /* We used to call pq_abort_locked here, but the idea of issuing a * rollback on close/GC has been considered inappropriate. * @@ -937,9 +949,10 @@ conn_close(connectionObject *self) * transaction though: to avoid these problems the transaction should be * closed only in status CONN_STATUS_READY. */ - self->closed = 1; + /* we need to check the value of pgconn, because we get called even when + * the connection fails! */ if (self->pgconn) { PQfinish(self->pgconn); self->pgconn = NULL; @@ -947,9 +960,6 @@ conn_close(connectionObject *self) PQfreeCancel(self->cancel); self->cancel = NULL; } - - pthread_mutex_unlock(&self->lock); - Py_END_ALLOW_THREADS; } /* conn_commit - commit on a connection */ diff --git a/psycopg/green.c b/psycopg/green.c index 65578f51..3ffa810b 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -34,7 +34,7 @@ HIDDEN PyObject *wait_callback = NULL; static PyObject *have_wait_callback(void); -static void psyco_clear_result_blocking(connectionObject *conn); +static void green_panic(connectionObject *conn); /* Register a callback function to block waiting for data. * @@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command) conn->async_status = ASYNC_WRITE; if (0 != psyco_wait(conn)) { - psyco_clear_result_blocking(conn); + green_panic(conn); goto end; } @@ -192,22 +192,21 @@ end: } -/* Discard the result of the currenly executed query, blocking. - * - * This function doesn't honour the wait callback: it can be used in case of - * emergency if the callback fails in order to put the connection back into a - * consistent state. - * - * If any command was issued before clearing the result, libpq would fail with - * the error "another command is already in progress". +/* There has been a communication error during query execution. It may have + * happened e.g. for a network error or an error in the callback, and we + * cannot tell the two apart. + * Trying to PQcancel or PQgetResult to put the connection back into a working + * state doesn't work nice (issue #113): the program blocks and the + * interpreter won't even respond to SIGINT. PQreset could work async, but the + * python program would have then a connection made but not configured where + * it is probably not designed to handled. So for the moment we do the kindest + * thing we can: we close the connection. A long-running program should + * already have a way to discard broken connections; a short-lived one would + * benefit of working ctrl-c. */ static void -psyco_clear_result_blocking(connectionObject *conn) +green_panic(connectionObject *conn) { - PGresult *res; - - Dprintf("psyco_clear_result_blocking"); - while (NULL != (res = PQgetResult(conn->pgconn))) { - PQclear(res); - } + Dprintf("green_panic: closing the connection"); + conn_close_locked(conn); } diff --git a/psycopg/python.h b/psycopg/python.h index 6d87fa5b..f6d6be0a 100644 --- a/psycopg/python.h +++ b/psycopg/python.h @@ -35,6 +35,11 @@ # error "psycopg requires Python >= 2.4" #endif +#if PY_VERSION_HEX < 0x02050000 +/* Function missing in Py 2.4 */ +#define PyErr_WarnEx(cat,msg,lvl) PyErr_Warn(cat,msg) +#endif + #if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN) typedef int Py_ssize_t; #define PY_SSIZE_T_MIN INT_MIN diff --git a/sandbox/test_green_error.py b/sandbox/test_green_error.py new file mode 100644 index 00000000..7477382a --- /dev/null +++ b/sandbox/test_green_error.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +"""Test for issue #113: test with error during green processing +""" + +DSN = 'dbname=test' + +import eventlet.patcher +eventlet.patcher.monkey_patch() + +import os +import signal +from time import sleep + +import psycopg2 +from psycopg2 import extensions +from eventlet.hubs import trampoline + + +# register a test wait callback that fails if SIGHUP is received + +panic = [] + +def wait_cb(conn): + """A wait callback useful to allow eventlet to work with Psycopg.""" + while 1: + if panic: + raise Exception('whatever') + + state = conn.poll() + if state == extensions.POLL_OK: + break + elif state == extensions.POLL_READ: + trampoline(conn.fileno(), read=True) + elif state == extensions.POLL_WRITE: + trampoline(conn.fileno(), write=True) + else: + raise psycopg2.OperationalError( + "Bad result from poll: %r" % state) + +extensions.set_wait_callback(wait_cb) + + +# SIGHUP handler to inject a fail in the callback + +def handler(signum, frame): + panic.append(True) + +signal.signal(signal.SIGHUP, handler) + + +# Simulate another green thread working + +def worker(): + while 1: + print "I'm working" + sleep(1) + +eventlet.spawn(worker) + + +# You can unplug the network cable etc. here. +# Kill -HUP will raise an exception in the callback. + +print "PID", os.getpid() +conn = psycopg2.connect(DSN) +curs = conn.cursor() +try: + for i in range(1000): + curs.execute("select %s, pg_sleep(1)", (i,)) + r = curs.fetchone() + print "selected", r + +except BaseException, e: + print "got exception:", e.__class__.__name__, e + +if conn.closed: + print "the connection is closed" +else: + conn.rollback() + curs.execute("select 1") + print curs.fetchone() diff --git a/tests/test_green.py b/tests/test_green.py index d641d183..e0cd57de 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -79,6 +79,9 @@ class GreenTests(unittest.TestCase): warnings.warn("sending a large query didn't trigger block on write.") def test_error_in_callback(self): + # behaviour changed after issue #113: if there is an error in the + # callback for the moment we don't have a way to reset the connection + # without blocking (ticket #113) so just close it. conn = self.conn curs = conn.cursor() curs.execute("select 1") # have a BEGIN @@ -88,11 +91,21 @@ class GreenTests(unittest.TestCase): psycopg2.extensions.set_wait_callback(lambda conn: 1//0) self.assertRaises(ZeroDivisionError, curs.execute, "select 2") + self.assert_(conn.closed) + + def test_dont_freak_out(self): + # if there is an error in a green query, don't freak out and close + # the connection + conn = self.conn + curs = conn.cursor() + self.assertRaises(psycopg2.ProgrammingError, + curs.execute, "select the unselectable") + # check that the connection is left in an usable state - psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select) + self.assert_(not conn.closed) conn.rollback() - curs.execute("select 2") - self.assertEqual(2, curs.fetchone()[0]) + curs.execute("select 1") + self.assertEqual(curs.fetchone()[0], 1) def test_suite():