diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 59cbb5e3..222696be 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -986,6 +986,9 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int /* don't let pgres = NULL go to pq_fetch() */ if (curs->pgres == NULL) { + if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { + curs->conn->closed = 2; + } pthread_mutex_unlock(&(curs->conn->lock)); Py_BLOCK_THREADS; if (!PyErr_Occurred()) { @@ -1013,6 +1016,9 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int CLEARPGRES(curs->pgres); if (PQsendQuery(curs->conn->pgconn, query) == 0) { + if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { + curs->conn->closed = 2; + } pthread_mutex_unlock(&(curs->conn->lock)); Py_BLOCK_THREADS; PyErr_SetString(OperationalError, diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 7f3bf21c..f9fc66ca 100755 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -29,6 +29,8 @@ import psycopg2.extensions from testutils import (unittest, ConnectingTestCase, skip_before_postgres, skip_if_no_namedtuple, skip_if_no_getrefcount, slow) +import psycopg2.extras + class CursorTests(ConnectingTestCase): @@ -537,6 +539,49 @@ class CursorTests(ConnectingTestCase): self.assertRaises(exception, cur.callproc, procname, parameter_sequence) self.conn.rollback() + def test_external_close_sync(self): + # If a "victim" connection is closed by a "control" connection + # behind psycopg2's back, psycopg2 always handles it correctly: + # raise OperationalError, set conn.closed to 2. This reproduces + # issue #443, a race between control_conn closing victim_conn and + # psycopg2 noticing. + control_conn = self.conn + connect_func = self.connect + wait_func = lambda conn: None + self._test_external_close(control_conn, connect_func, wait_func) + + def test_external_close_async(self): + # Issue #443 is in the async code too. Since the fix is duplicated, + # so is the test. + control_conn = self.conn + connect_func = lambda: self.connect(async=True) + wait_func = psycopg2.extras.wait_select + self._test_external_close(control_conn, connect_func, wait_func) + + def _test_external_close(self, control_conn, connect_func, wait_func): + # The short sleep before using victim_conn the second time makes it + # much more likely to lose the race and see the bug. Repeating the + # test several times makes it even more likely. + for i in range(10): + victim_conn = connect_func() + wait_func(victim_conn) + + with victim_conn.cursor() as cur: + cur.execute('select pg_backend_pid()') + wait_func(victim_conn) + pid1 = cur.fetchall()[0][0] + + with control_conn.cursor() as cur: + cur.execute('select pg_terminate_backend(%s)', (pid1,)) + + time.sleep(0.001) + with self.assertRaises(psycopg2.DatabaseError): + with victim_conn.cursor() as cur: + cur.execute('select 1') + wait_func(victim_conn) + + self.assertEqual(victim_conn.closed, 2) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)