diff --git a/NEWS b/NEWS index e36c7e3a..d04a62c7 100644 --- a/NEWS +++ b/NEWS @@ -4,6 +4,8 @@ Current release What's new in psycopg 2.6.2 ^^^^^^^^^^^^^^^^^^^^^^^^^^^ +- Fixed inconsistent state in externally closed connections + (:tickets:`#263, #311, #443`). - Report the server response status on errors (such as :ticket:`#281`). - Raise `!NotSupportedError` on unhandled server response status (:ticket:`#352`). diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index eb862d3d..26c1e0bb 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -167,8 +167,10 @@ pq_raise(connectionObject *conn, cursorObject *curs, PGresult **pgres) /* if the connection has somehow been broken, we mark the connection object as closed but requiring cleanup */ - if (conn->pgconn != NULL && PQstatus(conn->pgconn) == CONNECTION_BAD) + if (conn->pgconn != NULL && PQstatus(conn->pgconn) == CONNECTION_BAD) { conn->closed = 2; + exc = OperationalError; + } if (pgres == NULL && curs != NULL) pgres = &curs->pgres; @@ -202,9 +204,9 @@ pq_raise(connectionObject *conn, cursorObject *curs, PGresult **pgres) if (code != NULL) { exc = exception_from_sqlstate(code); } - else { - /* Fallback if there is no exception code (reported happening e.g. - * when the connection is closed). */ + else if (exc == NULL) { + /* Fallback if there is no exception code (unless we already + determined that the connection was closed). */ exc = DatabaseError; } @@ -934,6 +936,9 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int /* don't let pgres = NULL go to pq_fetch() */ if (curs->pgres == NULL) { + if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { + curs->conn->closed = 2; + } pthread_mutex_unlock(&(curs->conn->lock)); Py_BLOCK_THREADS; if (!PyErr_Occurred()) { @@ -961,6 +966,9 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int CLEARPGRES(curs->pgres); if (PQsendQuery(curs->conn->pgconn, query) == 0) { + if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { + curs->conn->closed = 2; + } pthread_mutex_unlock(&(curs->conn->lock)); Py_BLOCK_THREADS; PyErr_SetString(OperationalError, diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 970cc37d..1f2c94c2 100755 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -22,10 +22,13 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +from __future__ import with_statement + import time import pickle import psycopg2 import psycopg2.extensions +import psycopg2.extras from psycopg2.extensions import b from testutils import unittest, ConnectingTestCase, skip_before_postgres from testutils import skip_if_no_namedtuple, skip_if_no_getrefcount @@ -490,6 +493,52 @@ class CursorTests(ConnectingTestCase): cur = self.conn.cursor() self.assertRaises(TypeError, cur.callproc, 'lower', 42) + @skip_before_postgres(8, 4) + def test_external_close_sync(self): + # If a "victim" connection is closed by a "control" connection + # behind psycopg2's back, psycopg2 always handles it correctly: + # raise OperationalError, set conn.closed to 2. This reproduces + # issue #443, a race between control_conn closing victim_conn and + # psycopg2 noticing. + control_conn = self.conn + connect_func = self.connect + wait_func = lambda conn: None + self._test_external_close(control_conn, connect_func, wait_func) + + @skip_before_postgres(8, 4) + def test_external_close_async(self): + # Issue #443 is in the async code too. Since the fix is duplicated, + # so is the test. + control_conn = self.conn + connect_func = lambda: self.connect(async=True) + wait_func = psycopg2.extras.wait_select + self._test_external_close(control_conn, connect_func, wait_func) + + def _test_external_close(self, control_conn, connect_func, wait_func): + # The short sleep before using victim_conn the second time makes it + # much more likely to lose the race and see the bug. Repeating the + # test several times makes it even more likely. + for i in range(10): + victim_conn = connect_func() + wait_func(victim_conn) + + with victim_conn.cursor() as cur: + cur.execute('select pg_backend_pid()') + wait_func(victim_conn) + pid1 = cur.fetchall()[0][0] + + with control_conn.cursor() as cur: + cur.execute('select pg_terminate_backend(%s)', (pid1,)) + + def f(): + with victim_conn.cursor() as cur: + cur.execute('select 1') + wait_func(victim_conn) + + time.sleep(0.001) + self.assertRaises(psycopg2.OperationalError, f) + self.assertEqual(victim_conn.closed, 2) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)