diff --git a/psycopg/cursor.h b/psycopg/cursor.h index a0a3a822..68a087ac 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -46,7 +46,6 @@ typedef struct { int closed:1; /* 1 if the cursor is closed */ int notuples:1; /* 1 if the command was not a SELECT query */ - int needsfetch:1; /* 1 if a call to pq_fetch is pending */ long int rowcount; /* number of rows affected by last execute */ long int columns; /* number of columns fetched from the db */ diff --git a/psycopg/cursor_int.c b/psycopg/cursor_int.c index 32f3b72d..1069aaee 100644 --- a/psycopg/cursor_int.c +++ b/psycopg/cursor_int.c @@ -67,7 +67,6 @@ int curs_get_last_result(cursorObject *self) { PGresult *pgres; - IFCLEARPGRES(self->pgres); Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(self->conn->lock)); /* read one result, there can be multiple if the client sent multiple @@ -76,7 +75,8 @@ curs_get_last_result(cursorObject *self) { if (PQisBusy(self->conn->pgconn) == 1) { /* there is another result waiting, need to tell the client to wait more */ - Dprintf("curs_get_last_result: gut result, but more are pending"); + Dprintf("curs_get_last_result: got result, but more are pending"); + IFCLEARPGRES(self->pgres); self->pgres = pgres; pthread_mutex_unlock(&(self->conn->lock)); Py_BLOCK_THREADS; @@ -90,8 +90,10 @@ curs_get_last_result(cursorObject *self) { self->conn->async_cursor = NULL; pthread_mutex_unlock(&(self->conn->lock)); Py_END_ALLOW_THREADS; - self->needsfetch = 1; - return 0; + /* fetch the tuples (if there are any) and build the result. We don't care + if pq_fetch return 0 or 1, but if there was an error, we want to signal + it to the caller. */ + return pq_fetch(self) == -1 ? -1 : 0; } /* curs_poll_send - handle cursor polling when flushing output */ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 77b2393f..b916477d 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -645,8 +645,7 @@ _psyco_curs_prefetch(cursorObject *self) { int i = 0; - if (self->pgres == NULL || self->needsfetch) { - self->needsfetch = 0; + if (self->pgres == NULL) { Dprintf("_psyco_curs_prefetch: trying to fetch data"); do { i = pq_fetch(self); @@ -1067,10 +1066,6 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs) our own calculations to scroll; else we just delegate the scrolling to the MOVE SQL statement */ if (self->name == NULL) { - /* the prefetch will be a noop for sync executions, because they - always set self->pgres and never touch self->needsfetch, but for - async queries we need to parse the result and set self->rowcount */ - if (_psyco_curs_prefetch(self) < 0) return NULL; if (strcmp(mode, "relative") == 0) { newpos = self->row + value; } else if (strcmp( mode, "absolute") == 0) { diff --git a/tests/test_async.py b/tests/test_async.py index 5333b0d1..bc23ab7b 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -338,6 +338,29 @@ class AsyncTests(unittest.TestCase): # fetching from the correct cursor works self.assertEquals(cur1.fetchone()[0], 1) + def test_error(self): + cur = self.conn.cursor() + cur.execute("insert into table1 values (%s)", (1, )) + self.wait(cur) + cur.execute("insert into table1 values (%s)", (1, )) + # this should fail + self.assertRaises(psycopg2.IntegrityError, self.wait, cur) + cur.execute("insert into table1 values (%s); " + "insert into table1 values (%s)", (2, 2)) + # this should fail as well + self.assertRaises(psycopg2.IntegrityError, self.wait, cur) + # but this should work + cur.execute("insert into table1 values (%s)", (2, )) + self.wait(cur) + # and the cursor should be usable afterwards + cur.execute("insert into table1 values (%s)", (3, )) + self.wait(cur) + cur.execute("select * from table1 order by id") + self.wait(cur) + self.assertEquals(cur.fetchall(), [(1, ), (2, ), (3, )]) + cur.execute("delete from table1") + self.wait(cur) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)