From c4ebc0f702dfc3bae92ff3ffc01b9d6ed5d2fbde Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Urba=C5=84ski?= Date: Sun, 11 Apr 2010 20:25:17 +0200 Subject: [PATCH] Handle errors in asynchronous queries. Do it by keeping the reference to the last PGresult in the cursor and calling pq_fetch() before ending the asynchronous execution. This takes care of handling the possible error state of the PGresult and also allows the removal of the needsfetch flag, since now after execution ends the results are already fetched and parsed. --- psycopg/cursor.h | 1 - psycopg/cursor_int.c | 10 ++++++---- psycopg/cursor_type.c | 7 +------ tests/test_async.py | 23 +++++++++++++++++++++++ 4 files changed, 30 insertions(+), 11 deletions(-) diff --git a/psycopg/cursor.h b/psycopg/cursor.h index a0a3a822..68a087ac 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -46,7 +46,6 @@ typedef struct { int closed:1; /* 1 if the cursor is closed */ int notuples:1; /* 1 if the command was not a SELECT query */ - int needsfetch:1; /* 1 if a call to pq_fetch is pending */ long int rowcount; /* number of rows affected by last execute */ long int columns; /* number of columns fetched from the db */ diff --git a/psycopg/cursor_int.c b/psycopg/cursor_int.c index 32f3b72d..1069aaee 100644 --- a/psycopg/cursor_int.c +++ b/psycopg/cursor_int.c @@ -67,7 +67,6 @@ int curs_get_last_result(cursorObject *self) { PGresult *pgres; - IFCLEARPGRES(self->pgres); Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(self->conn->lock)); /* read one result, there can be multiple if the client sent multiple @@ -76,7 +75,8 @@ curs_get_last_result(cursorObject *self) { if (PQisBusy(self->conn->pgconn) == 1) { /* there is another result waiting, need to tell the client to wait more */ - Dprintf("curs_get_last_result: gut result, but more are pending"); + Dprintf("curs_get_last_result: got result, but more are pending"); + IFCLEARPGRES(self->pgres); self->pgres = pgres; pthread_mutex_unlock(&(self->conn->lock)); Py_BLOCK_THREADS; @@ -90,8 +90,10 @@ curs_get_last_result(cursorObject *self) { self->conn->async_cursor = NULL; pthread_mutex_unlock(&(self->conn->lock)); Py_END_ALLOW_THREADS; - self->needsfetch = 1; - return 0; + /* fetch the tuples (if there are any) and build the result. We don't care + if pq_fetch return 0 or 1, but if there was an error, we want to signal + it to the caller. */ + return pq_fetch(self) == -1 ? -1 : 0; } /* curs_poll_send - handle cursor polling when flushing output */ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 77b2393f..b916477d 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -645,8 +645,7 @@ _psyco_curs_prefetch(cursorObject *self) { int i = 0; - if (self->pgres == NULL || self->needsfetch) { - self->needsfetch = 0; + if (self->pgres == NULL) { Dprintf("_psyco_curs_prefetch: trying to fetch data"); do { i = pq_fetch(self); @@ -1067,10 +1066,6 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs) our own calculations to scroll; else we just delegate the scrolling to the MOVE SQL statement */ if (self->name == NULL) { - /* the prefetch will be a noop for sync executions, because they - always set self->pgres and never touch self->needsfetch, but for - async queries we need to parse the result and set self->rowcount */ - if (_psyco_curs_prefetch(self) < 0) return NULL; if (strcmp(mode, "relative") == 0) { newpos = self->row + value; } else if (strcmp( mode, "absolute") == 0) { diff --git a/tests/test_async.py b/tests/test_async.py index 5333b0d1..bc23ab7b 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -338,6 +338,29 @@ class AsyncTests(unittest.TestCase): # fetching from the correct cursor works self.assertEquals(cur1.fetchone()[0], 1) + def test_error(self): + cur = self.conn.cursor() + cur.execute("insert into table1 values (%s)", (1, )) + self.wait(cur) + cur.execute("insert into table1 values (%s)", (1, )) + # this should fail + self.assertRaises(psycopg2.IntegrityError, self.wait, cur) + cur.execute("insert into table1 values (%s); " + "insert into table1 values (%s)", (2, 2)) + # this should fail as well + self.assertRaises(psycopg2.IntegrityError, self.wait, cur) + # but this should work + cur.execute("insert into table1 values (%s)", (2, )) + self.wait(cur) + # and the cursor should be usable afterwards + cur.execute("insert into table1 values (%s)", (3, )) + self.wait(cur) + cur.execute("select * from table1 order by id") + self.wait(cur) + self.assertEquals(cur.fetchall(), [(1, ), (2, ), (3, )]) + cur.execute("delete from table1") + self.wait(cur) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)