Handle errors in asynchronous queries.

Do it by keeping the reference to the last PGresult in the cursor and
calling pq_fetch() before ending the asynchronous execution. This
takes care of handling the possible error state of the PGresult and
also allows the removal of the needsfetch flag, since now after
execution ends the results are already fetched and parsed.
This commit is contained in:
Jan Urbański 2010-04-11 20:25:17 +02:00 committed by Daniele Varrazzo
parent 249b3ef88f
commit c4ebc0f702
4 changed files with 30 additions and 11 deletions

View File

@ -46,7 +46,6 @@ typedef struct {
int closed:1; /* 1 if the cursor is closed */ int closed:1; /* 1 if the cursor is closed */
int notuples:1; /* 1 if the command was not a SELECT query */ int notuples:1; /* 1 if the command was not a SELECT query */
int needsfetch:1; /* 1 if a call to pq_fetch is pending */
long int rowcount; /* number of rows affected by last execute */ long int rowcount; /* number of rows affected by last execute */
long int columns; /* number of columns fetched from the db */ long int columns; /* number of columns fetched from the db */

View File

@ -67,7 +67,6 @@ int
curs_get_last_result(cursorObject *self) { curs_get_last_result(cursorObject *self) {
PGresult *pgres; PGresult *pgres;
IFCLEARPGRES(self->pgres);
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
/* read one result, there can be multiple if the client sent multiple /* read one result, there can be multiple if the client sent multiple
@ -76,7 +75,8 @@ curs_get_last_result(cursorObject *self) {
if (PQisBusy(self->conn->pgconn) == 1) { if (PQisBusy(self->conn->pgconn) == 1) {
/* there is another result waiting, need to tell the client to /* there is another result waiting, need to tell the client to
wait more */ wait more */
Dprintf("curs_get_last_result: gut result, but more are pending"); Dprintf("curs_get_last_result: got result, but more are pending");
IFCLEARPGRES(self->pgres);
self->pgres = pgres; self->pgres = pgres;
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
@ -90,8 +90,10 @@ curs_get_last_result(cursorObject *self) {
self->conn->async_cursor = NULL; self->conn->async_cursor = NULL;
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
self->needsfetch = 1; /* fetch the tuples (if there are any) and build the result. We don't care
return 0; if pq_fetch return 0 or 1, but if there was an error, we want to signal
it to the caller. */
return pq_fetch(self) == -1 ? -1 : 0;
} }
/* curs_poll_send - handle cursor polling when flushing output */ /* curs_poll_send - handle cursor polling when flushing output */

View File

@ -645,8 +645,7 @@ _psyco_curs_prefetch(cursorObject *self)
{ {
int i = 0; int i = 0;
if (self->pgres == NULL || self->needsfetch) { if (self->pgres == NULL) {
self->needsfetch = 0;
Dprintf("_psyco_curs_prefetch: trying to fetch data"); Dprintf("_psyco_curs_prefetch: trying to fetch data");
do { do {
i = pq_fetch(self); i = pq_fetch(self);
@ -1067,10 +1066,6 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
our own calculations to scroll; else we just delegate the scrolling our own calculations to scroll; else we just delegate the scrolling
to the MOVE SQL statement */ to the MOVE SQL statement */
if (self->name == NULL) { if (self->name == NULL) {
/* the prefetch will be a noop for sync executions, because they
always set self->pgres and never touch self->needsfetch, but for
async queries we need to parse the result and set self->rowcount */
if (_psyco_curs_prefetch(self) < 0) return NULL;
if (strcmp(mode, "relative") == 0) { if (strcmp(mode, "relative") == 0) {
newpos = self->row + value; newpos = self->row + value;
} else if (strcmp( mode, "absolute") == 0) { } else if (strcmp( mode, "absolute") == 0) {

View File

@ -338,6 +338,29 @@ class AsyncTests(unittest.TestCase):
# fetching from the correct cursor works # fetching from the correct cursor works
self.assertEquals(cur1.fetchone()[0], 1) self.assertEquals(cur1.fetchone()[0], 1)
def test_error(self):
cur = self.conn.cursor()
cur.execute("insert into table1 values (%s)", (1, ))
self.wait(cur)
cur.execute("insert into table1 values (%s)", (1, ))
# this should fail
self.assertRaises(psycopg2.IntegrityError, self.wait, cur)
cur.execute("insert into table1 values (%s); "
"insert into table1 values (%s)", (2, 2))
# this should fail as well
self.assertRaises(psycopg2.IntegrityError, self.wait, cur)
# but this should work
cur.execute("insert into table1 values (%s)", (2, ))
self.wait(cur)
# and the cursor should be usable afterwards
cur.execute("insert into table1 values (%s)", (3, ))
self.wait(cur)
cur.execute("select * from table1 order by id")
self.wait(cur)
self.assertEquals(cur.fetchall(), [(1, ), (2, ), (3, )])
cur.execute("delete from table1")
self.wait(cur)
def test_suite(): def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)