1
1
mirror of https://github.com/psycopg/psycopg2.git synced 2025-02-26 05:30:33 +03:00

Try to restore the connection state after a wait callback error.

This commit is contained in:
Daniele Varrazzo 2010-04-05 00:52:50 +01:00
parent a54932ee9c
commit 7a06c0455b
2 changed files with 39 additions and 0 deletions

View File

@ -33,6 +33,7 @@
HIDDEN PyObject *wait_callback = NULL;
PyObject *have_wait_callback(void);
void psyco_clear_result_blocking(connectionObject *conn);
/* Register a callback function to block waiting for data.
*
@ -163,6 +164,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
pyrv = PyObject_CallFunctionObjArgs(cb, conn, NULL);
if (!pyrv) {
Dprintf("psyco_exec_green: error in callback sending query");
psyco_clear_result_blocking(conn);
goto clear;
}
Py_DECREF(pyrv);
@ -173,6 +175,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
pyrv = PyObject_CallFunctionObjArgs(cb, conn, NULL);
if (!pyrv) {
Dprintf("psyco_exec_green: error in callback reading result");
psyco_clear_result_blocking(conn);
goto clear;
}
Py_DECREF(pyrv);
@ -197,3 +200,23 @@ end:
return result;
}
/* Discard the result of the currenly executed query, blocking.
*
* This function doesn't honour the wait callback: it can be used in case of
* emergency if the callback fails in order to put the connection back into a
* consistent state.
*
* If any command was issued before clearing the result, libpq would fail with
* the error "another command is already in progress".
*/
void
psyco_clear_result_blocking(connectionObject *conn)
{
PGresult *res;
Dprintf("psyco_clear_result_blocking");
while (NULL != (res = PQgetResult(conn->pgconn))) {
PQclear(res);
}
}

View File

@ -52,6 +52,22 @@ class GreenTests(unittest.TestCase):
self.fail("sending a large query didn't trigger block on write.")
def test_error_in_callback(self):
conn = self.connect()
curs = conn.cursor()
curs.execute("select 1") # have a BEGIN
curs.fetchone()
# now try to do something that will fail in the callback
psycopg2.extensions.set_wait_callback(lambda conn: 1/0)
self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
# check that the connection is left in an usable state
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
conn.rollback()
curs.execute("select 2")
self.assertEqual(2, curs.fetchone()[0])
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)