From 6d1b3b21e6e7828811121fb70c56a5338aa56026 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 6 Oct 2012 01:03:12 +0100 Subject: [PATCH 1/5] Added function conn_close_locked() --- psycopg/connection.h | 1 + psycopg/connection_int.c | 24 +++++++++++++++++------- 2 files changed, 18 insertions(+), 7 deletions(-) diff --git a/psycopg/connection.h b/psycopg/connection.h index 9647ffd2..01cc6a44 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -141,6 +141,7 @@ HIDDEN void conn_notifies_process(connectionObject *self); RAISES_NEG HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn); HIDDEN int conn_connect(connectionObject *self, long int async); HIDDEN void conn_close(connectionObject *self); +HIDDEN void conn_close_locked(connectionObject *self); RAISES_NEG HIDDEN int conn_commit(connectionObject *self); RAISES_NEG HIDDEN int conn_rollback(connectionObject *self); RAISES_NEG HIDDEN int conn_set_session(connectionObject *self, const char *isolevel, diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 8c8fed47..a93c233a 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -922,12 +922,24 @@ conn_close(connectionObject *self) return; } - /* sets this connection as closed even for other threads; also note that - we need to check the value of pgconn, because we get called even when - the connection fails! */ + /* sets this connection as closed even for other threads; */ Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); + conn_close_locked(self); + + pthread_mutex_unlock(&self->lock); + Py_END_ALLOW_THREADS; +} + +/* conn_close_locked - shut down the connection with the lock already taken */ + +void conn_close_locked(connectionObject *self) +{ + if (self->closed) { + return; + } + /* We used to call pq_abort_locked here, but the idea of issuing a * rollback on close/GC has been considered inappropriate. * @@ -937,9 +949,10 @@ conn_close(connectionObject *self) * transaction though: to avoid these problems the transaction should be * closed only in status CONN_STATUS_READY. */ - self->closed = 1; + /* we need to check the value of pgconn, because we get called even when + * the connection fails! */ if (self->pgconn) { PQfinish(self->pgconn); self->pgconn = NULL; @@ -947,9 +960,6 @@ conn_close(connectionObject *self) PQfreeCancel(self->cancel); self->cancel = NULL; } - - pthread_mutex_unlock(&self->lock); - Py_END_ALLOW_THREADS; } /* conn_commit - commit on a connection */ From fa032f09fb5bdbbb0f804b346978525939261c97 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 6 Oct 2012 01:10:41 +0100 Subject: [PATCH 2/5] Attempt to fix issue #113. If the network is down, trying to read blocking will hang the process hard (ctrl-c not working). Send a cancel signal instead (as suggested in http://archives.postgresql.org/pgsql-hackers/2012-07/msg00903.php) and go back into a green polling: this should allow a further error (e.g. another ctrl-c) to break the loop. In this case we cannot assume anything about the state of the connection, so we close it. --- psycopg/green.c | 55 +++++++++++++++++++++++++++---------- sandbox/test_green_error.py | 48 ++++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 15 deletions(-) create mode 100644 sandbox/test_green_error.py diff --git a/psycopg/green.c b/psycopg/green.c index 65578f51..8e5ec1be 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -34,7 +34,7 @@ HIDDEN PyObject *wait_callback = NULL; static PyObject *have_wait_callback(void); -static void psyco_clear_result_blocking(connectionObject *conn); +static void psyco_panic_cancel(connectionObject *conn); /* Register a callback function to block waiting for data. * @@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command) conn->async_status = ASYNC_WRITE; if (0 != psyco_wait(conn)) { - psyco_clear_result_blocking(conn); + psyco_panic_cancel(conn); goto end; } @@ -192,22 +192,47 @@ end: } -/* Discard the result of the currenly executed query, blocking. - * - * This function doesn't honour the wait callback: it can be used in case of - * emergency if the callback fails in order to put the connection back into a - * consistent state. - * - * If any command was issued before clearing the result, libpq would fail with - * the error "another command is already in progress". +/* There has been a communication error during query execution. It may have + * happened e.g. for a network error or an error in the callback, and we + * cannot tell the two apart. The strategy here to avoid blocking (issue #113) + * is to try and cancel the query, waiting for the result in non-blocking way. + * If again we receive an error, we raise an error and close the connection. + * Discard the result of the currenly executed query, blocking. */ static void -psyco_clear_result_blocking(connectionObject *conn) +psyco_panic_cancel(connectionObject *conn) { - PGresult *res; + PyObject *etype, *evalue, *etb; + char errbuf[256]; - Dprintf("psyco_clear_result_blocking"); - while (NULL != (res = PQgetResult(conn->pgconn))) { - PQclear(res); + /* we should have an exception set. */ + PyErr_Fetch(&etype, &evalue, &etb); + if (NULL == etype) { + Dprintf("panic_cancel: called without exception set"); + } + + /* Try sending the cancel signal */ + Dprintf("panic_cancel: sending cancel request"); + if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) { + Dprintf("panic_cancel: canceling failed: %s", errbuf); + /* raise a warning: we'll keep the previous error */ + PyErr_WarnEx(NULL, errbuf, 1); + goto exit; + } + + /* go back in the loop for another attempt at async processing */ + /* TODO: should we start on ASYNC_WRITE instead? */ + if (0 != psyco_wait(conn)) { + Dprintf("panic_cancel: error after cancel: closing the connection"); + PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1); + conn_close_locked(conn); + goto exit; + } + +exit: + /* restore the exception. If no exception was set at function begin, don't + * clobber one that may have been set here. */ + if (etype) { + PyErr_Restore(etype, evalue, etb); } } diff --git a/sandbox/test_green_error.py b/sandbox/test_green_error.py new file mode 100644 index 00000000..23a247b5 --- /dev/null +++ b/sandbox/test_green_error.py @@ -0,0 +1,48 @@ +#!/usr/bin/env python +"""Test for issue #113: test with error during green processing +""" + +DSN = 'dbname=test' + +# import eventlet.patcher +# eventlet.patcher.monkey_patch() + +import os +import signal +import psycopg2 +from psycopg2 import extensions +from eventlet.hubs import trampoline + +panic = [] + +def wait_cb(conn): + """A wait callback useful to allow eventlet to work with Psycopg.""" + while 1: + if panic: + raise Exception('whatever') + + state = conn.poll() + if state == extensions.POLL_OK: + break + elif state == extensions.POLL_READ: + trampoline(conn.fileno(), read=True) + elif state == extensions.POLL_WRITE: + trampoline(conn.fileno(), write=True) + else: + raise psycopg2.OperationalError( + "Bad result from poll: %r" % state) + +extensions.set_wait_callback(wait_cb) + +def handler(signum, frame): + panic.append(True) + +signal.signal(signal.SIGHUP, handler) + +conn = psycopg2.connect(DSN) +curs = conn.cursor() +print "PID", os.getpid() +curs.execute("select pg_sleep(1000)") + +# You can unplug the network cable etc. here. +# Kill -HUP will raise an exception in the callback. From 6b6aded90b622a8119f3ad30aa164cd39f6e0a0f Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 6 Oct 2012 01:16:57 +0100 Subject: [PATCH 3/5] Added compatibility PyErr_WarnEx macro for Python 2.4 --- psycopg/python.h | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/psycopg/python.h b/psycopg/python.h index 6d87fa5b..f6d6be0a 100644 --- a/psycopg/python.h +++ b/psycopg/python.h @@ -35,6 +35,11 @@ # error "psycopg requires Python >= 2.4" #endif +#if PY_VERSION_HEX < 0x02050000 +/* Function missing in Py 2.4 */ +#define PyErr_WarnEx(cat,msg,lvl) PyErr_Warn(cat,msg) +#endif + #if PY_VERSION_HEX < 0x02050000 && !defined(PY_SSIZE_T_MIN) typedef int Py_ssize_t; #define PY_SSIZE_T_MIN INT_MIN From 7632e1ae46d0f65b6302f02d0b6a5c236b84c0fe Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 6 Oct 2012 01:45:24 +0100 Subject: [PATCH 4/5] Get the result from the connection after the green panic Otherwise the connection won't be usable in case we manage to put it back on track (libpq reports "another command is already in progress") --- psycopg/green.c | 7 +++++++ sandbox/test_green_error.py | 10 +++++++++- 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/psycopg/green.c b/psycopg/green.c index 8e5ec1be..2f35d9f6 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -202,6 +202,7 @@ end: static void psyco_panic_cancel(connectionObject *conn) { + PGresult *res; PyObject *etype, *evalue, *etb; char errbuf[256]; @@ -229,6 +230,12 @@ psyco_panic_cancel(connectionObject *conn) goto exit; } + /* we must clear the result or we get "another command is already in + * progress" */ + if (NULL != (res = pq_get_last_result(conn))) { + PQclear(res); + } + exit: /* restore the exception. If no exception was set at function begin, don't * clobber one that may have been set here. */ diff --git a/sandbox/test_green_error.py b/sandbox/test_green_error.py index 23a247b5..7d17cf74 100644 --- a/sandbox/test_green_error.py +++ b/sandbox/test_green_error.py @@ -42,7 +42,15 @@ signal.signal(signal.SIGHUP, handler) conn = psycopg2.connect(DSN) curs = conn.cursor() print "PID", os.getpid() -curs.execute("select pg_sleep(1000)") +try: + curs.execute("select pg_sleep(1000)") +except BaseException, e: + print "got exception:", e.__class__.__name__, e + +conn.rollback() +curs.execute("select 1") +print curs.fetchone() + # You can unplug the network cable etc. here. # Kill -HUP will raise an exception in the callback. From 58d048198f88b882ce7530b5eb2dec672c36c8a0 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sat, 6 Oct 2012 11:58:52 +0100 Subject: [PATCH 5/5] Close the connection on error in callback Unfortunately PQcancel blocks, so it's not better than PQgetResult. It has been suggested to use PQreset in non-blocking way but this would give the Python program the burden of handling a connection done but not configured in an unexpected place. --- NEWS | 3 ++ psycopg/green.c | 61 +++++++++---------------------------- sandbox/test_green_error.py | 49 +++++++++++++++++++++-------- tests/test_green.py | 19 ++++++++++-- 4 files changed, 70 insertions(+), 62 deletions(-) diff --git a/NEWS b/NEWS index af5988b7..b5ec2b5e 100644 --- a/NEWS +++ b/NEWS @@ -7,6 +7,9 @@ What's new in psycopg 2.4.6 - Dropped GIL release during string adaptation around a function call invoking a Python API function, which could cause interpreter crash. Thanks to Manu Cupcic for the report (ticket #110). + - Close a green connection if there is an error in the callback. + Maybe a harsh solution but it leaves the program responsive + (ticket #113). - 'register_hstore()', 'register_composite()', 'tpc_recover()' work with RealDictConnection and Cursor (ticket #114). - connect() raises an exception instead of swallowing keyword arguments diff --git a/psycopg/green.c b/psycopg/green.c index 2f35d9f6..3ffa810b 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -34,7 +34,7 @@ HIDDEN PyObject *wait_callback = NULL; static PyObject *have_wait_callback(void); -static void psyco_panic_cancel(connectionObject *conn); +static void green_panic(connectionObject *conn); /* Register a callback function to block waiting for data. * @@ -178,7 +178,7 @@ psyco_exec_green(connectionObject *conn, const char *command) conn->async_status = ASYNC_WRITE; if (0 != psyco_wait(conn)) { - psyco_panic_cancel(conn); + green_panic(conn); goto end; } @@ -194,52 +194,19 @@ end: /* There has been a communication error during query execution. It may have * happened e.g. for a network error or an error in the callback, and we - * cannot tell the two apart. The strategy here to avoid blocking (issue #113) - * is to try and cancel the query, waiting for the result in non-blocking way. - * If again we receive an error, we raise an error and close the connection. - * Discard the result of the currenly executed query, blocking. + * cannot tell the two apart. + * Trying to PQcancel or PQgetResult to put the connection back into a working + * state doesn't work nice (issue #113): the program blocks and the + * interpreter won't even respond to SIGINT. PQreset could work async, but the + * python program would have then a connection made but not configured where + * it is probably not designed to handled. So for the moment we do the kindest + * thing we can: we close the connection. A long-running program should + * already have a way to discard broken connections; a short-lived one would + * benefit of working ctrl-c. */ static void -psyco_panic_cancel(connectionObject *conn) +green_panic(connectionObject *conn) { - PGresult *res; - PyObject *etype, *evalue, *etb; - char errbuf[256]; - - /* we should have an exception set. */ - PyErr_Fetch(&etype, &evalue, &etb); - if (NULL == etype) { - Dprintf("panic_cancel: called without exception set"); - } - - /* Try sending the cancel signal */ - Dprintf("panic_cancel: sending cancel request"); - if (PQcancel(conn->cancel, errbuf, sizeof(errbuf)) == 0) { - Dprintf("panic_cancel: canceling failed: %s", errbuf); - /* raise a warning: we'll keep the previous error */ - PyErr_WarnEx(NULL, errbuf, 1); - goto exit; - } - - /* go back in the loop for another attempt at async processing */ - /* TODO: should we start on ASYNC_WRITE instead? */ - if (0 != psyco_wait(conn)) { - Dprintf("panic_cancel: error after cancel: closing the connection"); - PyErr_WarnEx(NULL, "async cancel failed: closing the connection", 1); - conn_close_locked(conn); - goto exit; - } - - /* we must clear the result or we get "another command is already in - * progress" */ - if (NULL != (res = pq_get_last_result(conn))) { - PQclear(res); - } - -exit: - /* restore the exception. If no exception was set at function begin, don't - * clobber one that may have been set here. */ - if (etype) { - PyErr_Restore(etype, evalue, etb); - } + Dprintf("green_panic: closing the connection"); + conn_close_locked(conn); } diff --git a/sandbox/test_green_error.py b/sandbox/test_green_error.py index 7d17cf74..7477382a 100644 --- a/sandbox/test_green_error.py +++ b/sandbox/test_green_error.py @@ -4,15 +4,20 @@ DSN = 'dbname=test' -# import eventlet.patcher -# eventlet.patcher.monkey_patch() +import eventlet.patcher +eventlet.patcher.monkey_patch() import os import signal +from time import sleep + import psycopg2 from psycopg2 import extensions from eventlet.hubs import trampoline + +# register a test wait callback that fails if SIGHUP is received + panic = [] def wait_cb(conn): @@ -34,23 +39,43 @@ def wait_cb(conn): extensions.set_wait_callback(wait_cb) + +# SIGHUP handler to inject a fail in the callback + def handler(signum, frame): panic.append(True) signal.signal(signal.SIGHUP, handler) -conn = psycopg2.connect(DSN) -curs = conn.cursor() -print "PID", os.getpid() -try: - curs.execute("select pg_sleep(1000)") -except BaseException, e: - print "got exception:", e.__class__.__name__, e -conn.rollback() -curs.execute("select 1") -print curs.fetchone() +# Simulate another green thread working + +def worker(): + while 1: + print "I'm working" + sleep(1) + +eventlet.spawn(worker) # You can unplug the network cable etc. here. # Kill -HUP will raise an exception in the callback. + +print "PID", os.getpid() +conn = psycopg2.connect(DSN) +curs = conn.cursor() +try: + for i in range(1000): + curs.execute("select %s, pg_sleep(1)", (i,)) + r = curs.fetchone() + print "selected", r + +except BaseException, e: + print "got exception:", e.__class__.__name__, e + +if conn.closed: + print "the connection is closed" +else: + conn.rollback() + curs.execute("select 1") + print curs.fetchone() diff --git a/tests/test_green.py b/tests/test_green.py index d641d183..e0cd57de 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -79,6 +79,9 @@ class GreenTests(unittest.TestCase): warnings.warn("sending a large query didn't trigger block on write.") def test_error_in_callback(self): + # behaviour changed after issue #113: if there is an error in the + # callback for the moment we don't have a way to reset the connection + # without blocking (ticket #113) so just close it. conn = self.conn curs = conn.cursor() curs.execute("select 1") # have a BEGIN @@ -88,11 +91,21 @@ class GreenTests(unittest.TestCase): psycopg2.extensions.set_wait_callback(lambda conn: 1//0) self.assertRaises(ZeroDivisionError, curs.execute, "select 2") + self.assert_(conn.closed) + + def test_dont_freak_out(self): + # if there is an error in a green query, don't freak out and close + # the connection + conn = self.conn + curs = conn.cursor() + self.assertRaises(psycopg2.ProgrammingError, + curs.execute, "select the unselectable") + # check that the connection is left in an usable state - psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select) + self.assert_(not conn.closed) conn.rollback() - curs.execute("select 2") - self.assertEqual(2, curs.fetchone()[0]) + curs.execute("select 1") + self.assertEqual(curs.fetchone()[0], 1) def test_suite():