From 734845b79adc733bd9b7414602111824c55fdb2c Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Fri, 8 Mar 2019 03:24:19 +0000 Subject: [PATCH] Added pq_get_result_async() replaced pg_get_last_result() The new function keeps together PQconsumeInput() with PQisBusy(), in order to handle the condition in which not all the results of a sequence of statements arrive in the same roundtrip. Added pointer to a PGresult to the connection to keep the state across async communication: it can probably be used to simplify other code paths where a result is brought forward manually. Close #802 Close #855 Close #856 --- NEWS | 2 + psycopg/connection.h | 1 + psycopg/connection_int.c | 24 ++++---- psycopg/cursor_type.c | 4 +- psycopg/green.c | 6 +- psycopg/pqpath.c | 120 ++++++++++++++++++++++----------------- psycopg/pqpath.h | 3 +- tests/test_async.py | 41 +++++++++++++ tests/test_green.py | 35 +++++++++++- 9 files changed, 167 insertions(+), 69 deletions(-) diff --git a/NEWS b/NEWS index 2a6df275..7dc3e1b8 100644 --- a/NEWS +++ b/NEWS @@ -33,6 +33,8 @@ New features: - Added `~psycopg2.extensions.Diagnostics.severity_nonlocalized` attribute on the `~psycopg2.extensions.Diagnostics` object (:ticket:`#783`). - More efficient `~psycopg2.extras.NamedTupleCursor` (:ticket:`#838`). +- Async communication improved to fix blocking on results returned at + different times (:ticket:`#856`). Other changes: diff --git a/psycopg/connection.h b/psycopg/connection.h index 60ec6205..b7344d7e 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -108,6 +108,7 @@ struct connectionObject { * for a green connection. If NULL, the connection is idle. */ PyObject *async_cursor; int async_status; /* asynchronous execution status */ + PGresult *pgres; /* temporary result across async calls */ /* notice processing */ PyObject *notice_list; diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 5312b0f9..c05f48ee 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -896,7 +896,9 @@ _conn_poll_advance_write(connectionObject *self) return res; } -/* Advance to the next state after a call to a pq_is_busy* function */ + +/* Advance to the next state after reading results */ + static int _conn_poll_advance_read(connectionObject *self) { @@ -905,13 +907,13 @@ _conn_poll_advance_read(connectionObject *self) Dprintf("conn_poll: poll reading"); - busy = pq_is_busy(self); + busy = pq_get_result_async(self); switch (busy) { case 0: /* result is ready */ - res = PSYCO_POLL_OK; Dprintf("conn_poll: async_status -> ASYNC_DONE"); self->async_status = ASYNC_DONE; + res = PSYCO_POLL_OK; break; case 1: /* result not ready: fd would block */ res = PSYCO_POLL_READ; @@ -920,13 +922,15 @@ _conn_poll_advance_read(connectionObject *self) res = PSYCO_POLL_ERROR; break; default: - Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy); + Dprintf("conn_poll: unexpected result from pq_get_result_async: %d", + busy); res = PSYCO_POLL_ERROR; break; } return res; } + /* Poll the connection for the send query/retrieve result phase Advance the async_status (usually going WRITE -> READ -> DONE) but don't @@ -974,7 +978,6 @@ static int _conn_poll_setup_async(connectionObject *self) { int res = PSYCO_POLL_ERROR; - PGresult *pgres; switch (self->status) { case CONN_STATUS_CONNECTING: @@ -1025,12 +1028,12 @@ _conn_poll_setup_async(connectionObject *self) res = _conn_poll_query(self); if (res == PSYCO_POLL_OK) { res = PSYCO_POLL_ERROR; - pgres = pq_get_last_result(self); - if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { + if (self->pgres == NULL + || PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) { PyErr_SetString(OperationalError, "can't set datestyle to ISO"); break; } - CLEARPGRES(pgres); + CLEARPGRES(self->pgres); Dprintf("conn_poll: status -> CONN_STATUS_READY"); self->status = CONN_STATUS_READY; @@ -1119,8 +1122,9 @@ conn_poll(connectionObject *self) break; } - CLEARPGRES(curs->pgres); - curs->pgres = pq_get_last_result(self); + PQclear(curs->pgres); + curs->pgres = self->pgres; + self->pgres = NULL; /* fetch the tuples (if there are any) and build the result. We * don't care if pq_fetch return 0 or 1, but if there was an error, diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 875be663..0d10e61f 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -52,8 +52,6 @@ psyco_curs_close(cursorObject *self, PyObject *dummy) PyObject *rv = NULL; char *lname = NULL; - EXC_IF_ASYNC_IN_PROGRESS(self, close); - if (self->closed) { rv = Py_None; Py_INCREF(rv); @@ -64,6 +62,8 @@ psyco_curs_close(cursorObject *self, PyObject *dummy) char buffer[256]; PGTransactionStatusType status; + EXC_IF_ASYNC_IN_PROGRESS(self, close_named); + if (self->conn) { status = PQtransactionStatus(self->conn->pgconn); } diff --git a/psycopg/green.c b/psycopg/green.c index fec59d95..d2d73838 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -177,10 +177,12 @@ psyco_exec_green(connectionObject *conn, const char *command) goto end; } - /* Now we can read the data without fear of blocking. */ - result = pq_get_last_result(conn); + /* the result is now in the connection: take its ownership */ + result = conn->pgres; + conn->pgres = NULL; end: + CLEARPGRES(conn->pgres); conn->async_status = ASYNC_DONE; Py_CLEAR(conn->async_cursor); return result; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index e95893e0..0789ab83 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -764,22 +764,22 @@ exit: } -/* pq_is_busy - consume input and return connection status - - a status of 1 means that a call to pq_fetch will block, while a status of 0 - means that there is data available to be collected. -1 means an error, the - exception will be set accordingly. - +/* pq_get_result_async - read an available result without blocking. + * + * Return 0 if the result is ready, 1 if it will block, -1 on error. + * The last result will be returned in pgres. + * * The function should be called with the lock and holding the GIL. */ -int -pq_is_busy(connectionObject *conn) +RAISES_NEG int +pq_get_result_async(connectionObject *conn) { - Dprintf("pq_is_busy: calling PQconsumeInput()"); + int rv = -1; + Dprintf("pq_get_result_async: calling PQconsumeInput()"); if (PQconsumeInput(conn->pgconn) == 0) { - Dprintf("pq_is_busy: PQconsumeInput() failed"); + Dprintf("pq_get_result_async: PQconsumeInput() failed"); /* if the libpq says pgconn is lost, close the py conn */ if (CONNECTION_BAD == PQstatus(conn->pgconn)) { @@ -787,13 +787,69 @@ pq_is_busy(connectionObject *conn) } PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); - return -1; + goto exit; } conn_notifies_process(conn); conn_notice_process(conn); - return PQisBusy(conn->pgconn); + for (;;) { + int busy; + PGresult *res; + ExecStatusType status; + + Dprintf("pq_get_result_async: calling PQisBusy()"); + busy = PQisBusy(conn->pgconn); + + if (busy) { + /* try later */ + Dprintf("pq_get_result_async: PQisBusy() = 1"); + rv = 1; + goto exit; + } + + if (!(res = PQgetResult(conn->pgconn))) { + Dprintf("pq_get_result_async: got no result"); + /* the result is ready: it was the previously read one */ + rv = 0; + goto exit; + } + + status = PQresultStatus(res); + Dprintf("pq_get_result_async: got result %s", PQresStatus(status)); + + /* Store the result outside because we want to return the last non-null + * one and we may have to do it across poll calls. However if there is + * an error in the stream of results we want to handle the *first* + * error. So don't clobber it with the following ones. */ + if (conn->pgres && PQresultStatus(conn->pgres) == PGRES_FATAL_ERROR) { + Dprintf("previous pgres is error: discarding"); + PQclear(res); + } + else { + PQclear(conn->pgres); + conn->pgres = res; + } + + switch (status) { + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + case PGRES_COPY_BOTH: + /* After entering copy mode, libpq will make a phony + * PGresult for us every time we query for it, so we need to + * break out of this endless loop. */ + rv = 0; + goto exit; + + default: + /* keep on reading to check if there are other results or + * we have finished. */ + continue; + } + } + +exit: + return rv; } /* pq_flush - flush output and return connection status @@ -995,6 +1051,7 @@ pq_send_query(connectionObject *conn, const char *query) Dprintf("pq_send_query: sending ASYNC query:"); Dprintf(" %-.200s", query); + CLEARPGRES(conn->pgres); if (0 == (rv = PQsendQuery(conn->pgconn, query))) { Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn)); } @@ -1002,45 +1059,6 @@ pq_send_query(connectionObject *conn, const char *query) return rv; } -/* Return the last result available on the connection. - * - * The function will block only if a command is active and the - * necessary response data has not yet been read by PQconsumeInput. - * - * The result should be disposed of using PQclear() - */ -PGresult * -pq_get_last_result(connectionObject *conn) -{ - PGresult *result = NULL, *res; - ExecStatusType status; - - /* Read until PQgetResult gives a NULL */ - while (NULL != (res = PQgetResult(conn->pgconn))) { - if (result) { - /* TODO too bad: we are discarding results from all the queries - * except the last. We could have populated `nextset()` with it - * but it would be an incompatible change (apps currently issue - * groups of queries expecting to receive the last result: they - * would start receiving the first instead). */ - PQclear(result); - } - result = res; - status = PQresultStatus(result); - Dprintf("pq_get_last_result: got result %s", PQresStatus(status)); - - /* After entering copy mode, libpq will make a phony - * PGresult for us every time we query for it, so we need to - * break out of this endless loop. */ - if (status == PGRES_COPY_BOTH - || status == PGRES_COPY_OUT - || status == PGRES_COPY_IN) { - break; - } - } - - return result; -} /* pq_fetch - fetch data after a query diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 58a1c59b..0ad74d97 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -35,7 +35,6 @@ #define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0) /* exported functions */ -HIDDEN PGresult *pq_get_last_result(connectionObject *conn); RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs, int no_result); RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin); @@ -59,7 +58,7 @@ HIDDEN int pq_tpc_command_locked(connectionObject *conn, const char *cmd, const char *tid, PGresult **pgres, char **error, PyThreadState **tstate); -HIDDEN int pq_is_busy(connectionObject *conn); +RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn); HIDDEN void pq_clear_async(connectionObject *conn); RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg); diff --git a/tests/test_async.py b/tests/test_async.py index 057deac1..21ef7fe5 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -29,6 +29,7 @@ import unittest import warnings import psycopg2 +import psycopg2.errors from psycopg2 import extensions as ext from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow @@ -405,6 +406,15 @@ class AsyncTests(ConnectingTestCase): cur.execute("delete from table1") self.wait(cur) + def test_stop_on_first_error(self): + cur = self.conn.cursor() + cur.execute("select 1; select x; select 1/0; select 2") + self.assertRaises(psycopg2.errors.UndefinedColumn, self.wait, cur) + + cur.execute("select 1") + self.wait(cur) + self.assertEqual(cur.fetchone(), (1,)) + def test_error_two_cursors(self): cur = self.conn.cursor() cur2 = self.conn.cursor() @@ -454,6 +464,37 @@ class AsyncTests(ConnectingTestCase): cur.execute("copy (select 1) to stdout") self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn) + @slow + @skip_before_postgres(9, 0) + def test_non_block_after_notification(self): + from select import select + + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = 0 + while True: + state = self.conn.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_READ: + select([self.conn], [], [], 0.1) + elif state == psycopg2.extensions.POLL_WRITE: + select([], [self.conn], [], 0.1) + else: + raise Exception("Unexpected result from poll: %r", state) + polls += 1 + + self.assert_(polls >= 8, polls) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_green.py b/tests/test_green.py index 76ee1c83..46f18dcb 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -25,6 +25,7 @@ import select import unittest import warnings + import psycopg2 import psycopg2.extensions import psycopg2.extras @@ -58,10 +59,10 @@ class GreenTestCase(ConnectingTestCase): ConnectingTestCase.tearDown(self) psycopg2.extensions.set_wait_callback(self._cb) - def set_stub_wait_callback(self, conn): + def set_stub_wait_callback(self, conn, cb=None): stub = ConnectionStub(conn) psycopg2.extensions.set_wait_callback( - lambda conn: psycopg2.extras.wait_select(stub)) + lambda conn: (cb or psycopg2.extras.wait_select)(stub)) return stub @slow @@ -119,6 +120,36 @@ class GreenTestCase(ConnectingTestCase): self.assertRaises(psycopg2.ProgrammingError, cur.execute, "copy (select 1) to stdout") + @slow + @skip_before_postgres(9, 0) + def test_non_block_after_notification(self): + def wait(conn): + while 1: + state = conn.poll() + if state == POLL_OK: + break + elif state == POLL_READ: + select.select([conn.fileno()], [], [], 0.1) + elif state == POLL_WRITE: + select.select([], [conn.fileno()], [], 0.1) + else: + raise conn.OperationalError("bad state from poll: %s" % state) + + stub = self.set_stub_wait_callback(self.conn, wait) + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = stub.polls.count(POLL_READ) + self.assert_(polls > 8, polls) + class CallbackErrorTestCase(ConnectingTestCase): def setUp(self):