Added pq_get_result_async() replaced pg_get_last_result()

The new function keeps together PQconsumeInput() with PQisBusy(), in
order to handle the condition in which not all the results of a sequence
of statements arrive in the same roundtrip.

Added pointer to a PGresult to the connection to keep the state across
async communication: it can probably be used to simplify other code
paths where a result is brought forward manually.

Close #802
Close #855
Close #856
This commit is contained in:
Daniele Varrazzo 2019-03-08 03:24:19 +00:00
parent 761c3784c4
commit 734845b79a
9 changed files with 167 additions and 69 deletions

2
NEWS
View File

@ -33,6 +33,8 @@ New features:
- Added `~psycopg2.extensions.Diagnostics.severity_nonlocalized` attribute on - Added `~psycopg2.extensions.Diagnostics.severity_nonlocalized` attribute on
the `~psycopg2.extensions.Diagnostics` object (:ticket:`#783`). the `~psycopg2.extensions.Diagnostics` object (:ticket:`#783`).
- More efficient `~psycopg2.extras.NamedTupleCursor` (:ticket:`#838`). - More efficient `~psycopg2.extras.NamedTupleCursor` (:ticket:`#838`).
- Async communication improved to fix blocking on results returned at
different times (:ticket:`#856`).
Other changes: Other changes:

View File

@ -108,6 +108,7 @@ struct connectionObject {
* for a green connection. If NULL, the connection is idle. */ * for a green connection. If NULL, the connection is idle. */
PyObject *async_cursor; PyObject *async_cursor;
int async_status; /* asynchronous execution status */ int async_status; /* asynchronous execution status */
PGresult *pgres; /* temporary result across async calls */
/* notice processing */ /* notice processing */
PyObject *notice_list; PyObject *notice_list;

View File

@ -896,7 +896,9 @@ _conn_poll_advance_write(connectionObject *self)
return res; return res;
} }
/* Advance to the next state after a call to a pq_is_busy* function */
/* Advance to the next state after reading results */
static int static int
_conn_poll_advance_read(connectionObject *self) _conn_poll_advance_read(connectionObject *self)
{ {
@ -905,13 +907,13 @@ _conn_poll_advance_read(connectionObject *self)
Dprintf("conn_poll: poll reading"); Dprintf("conn_poll: poll reading");
busy = pq_is_busy(self); busy = pq_get_result_async(self);
switch (busy) { switch (busy) {
case 0: /* result is ready */ case 0: /* result is ready */
res = PSYCO_POLL_OK;
Dprintf("conn_poll: async_status -> ASYNC_DONE"); Dprintf("conn_poll: async_status -> ASYNC_DONE");
self->async_status = ASYNC_DONE; self->async_status = ASYNC_DONE;
res = PSYCO_POLL_OK;
break; break;
case 1: /* result not ready: fd would block */ case 1: /* result not ready: fd would block */
res = PSYCO_POLL_READ; res = PSYCO_POLL_READ;
@ -920,13 +922,15 @@ _conn_poll_advance_read(connectionObject *self)
res = PSYCO_POLL_ERROR; res = PSYCO_POLL_ERROR;
break; break;
default: default:
Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy); Dprintf("conn_poll: unexpected result from pq_get_result_async: %d",
busy);
res = PSYCO_POLL_ERROR; res = PSYCO_POLL_ERROR;
break; break;
} }
return res; return res;
} }
/* Poll the connection for the send query/retrieve result phase /* Poll the connection for the send query/retrieve result phase
Advance the async_status (usually going WRITE -> READ -> DONE) but don't Advance the async_status (usually going WRITE -> READ -> DONE) but don't
@ -974,7 +978,6 @@ static int
_conn_poll_setup_async(connectionObject *self) _conn_poll_setup_async(connectionObject *self)
{ {
int res = PSYCO_POLL_ERROR; int res = PSYCO_POLL_ERROR;
PGresult *pgres;
switch (self->status) { switch (self->status) {
case CONN_STATUS_CONNECTING: case CONN_STATUS_CONNECTING:
@ -1025,12 +1028,12 @@ _conn_poll_setup_async(connectionObject *self)
res = _conn_poll_query(self); res = _conn_poll_query(self);
if (res == PSYCO_POLL_OK) { if (res == PSYCO_POLL_OK) {
res = PSYCO_POLL_ERROR; res = PSYCO_POLL_ERROR;
pgres = pq_get_last_result(self); if (self->pgres == NULL
if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { || PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) {
PyErr_SetString(OperationalError, "can't set datestyle to ISO"); PyErr_SetString(OperationalError, "can't set datestyle to ISO");
break; break;
} }
CLEARPGRES(pgres); CLEARPGRES(self->pgres);
Dprintf("conn_poll: status -> CONN_STATUS_READY"); Dprintf("conn_poll: status -> CONN_STATUS_READY");
self->status = CONN_STATUS_READY; self->status = CONN_STATUS_READY;
@ -1119,8 +1122,9 @@ conn_poll(connectionObject *self)
break; break;
} }
CLEARPGRES(curs->pgres); PQclear(curs->pgres);
curs->pgres = pq_get_last_result(self); curs->pgres = self->pgres;
self->pgres = NULL;
/* fetch the tuples (if there are any) and build the result. We /* fetch the tuples (if there are any) and build the result. We
* don't care if pq_fetch return 0 or 1, but if there was an error, * don't care if pq_fetch return 0 or 1, but if there was an error,

View File

@ -52,8 +52,6 @@ psyco_curs_close(cursorObject *self, PyObject *dummy)
PyObject *rv = NULL; PyObject *rv = NULL;
char *lname = NULL; char *lname = NULL;
EXC_IF_ASYNC_IN_PROGRESS(self, close);
if (self->closed) { if (self->closed) {
rv = Py_None; rv = Py_None;
Py_INCREF(rv); Py_INCREF(rv);
@ -64,6 +62,8 @@ psyco_curs_close(cursorObject *self, PyObject *dummy)
char buffer[256]; char buffer[256];
PGTransactionStatusType status; PGTransactionStatusType status;
EXC_IF_ASYNC_IN_PROGRESS(self, close_named);
if (self->conn) { if (self->conn) {
status = PQtransactionStatus(self->conn->pgconn); status = PQtransactionStatus(self->conn->pgconn);
} }

View File

@ -177,10 +177,12 @@ psyco_exec_green(connectionObject *conn, const char *command)
goto end; goto end;
} }
/* Now we can read the data without fear of blocking. */ /* the result is now in the connection: take its ownership */
result = pq_get_last_result(conn); result = conn->pgres;
conn->pgres = NULL;
end: end:
CLEARPGRES(conn->pgres);
conn->async_status = ASYNC_DONE; conn->async_status = ASYNC_DONE;
Py_CLEAR(conn->async_cursor); Py_CLEAR(conn->async_cursor);
return result; return result;

View File

@ -764,22 +764,22 @@ exit:
} }
/* pq_is_busy - consume input and return connection status /* pq_get_result_async - read an available result without blocking.
*
a status of 1 means that a call to pq_fetch will block, while a status of 0 * Return 0 if the result is ready, 1 if it will block, -1 on error.
means that there is data available to be collected. -1 means an error, the * The last result will be returned in pgres.
exception will be set accordingly. *
* The function should be called with the lock and holding the GIL. * The function should be called with the lock and holding the GIL.
*/ */
int RAISES_NEG int
pq_is_busy(connectionObject *conn) pq_get_result_async(connectionObject *conn)
{ {
Dprintf("pq_is_busy: calling PQconsumeInput()"); int rv = -1;
Dprintf("pq_get_result_async: calling PQconsumeInput()");
if (PQconsumeInput(conn->pgconn) == 0) { if (PQconsumeInput(conn->pgconn) == 0) {
Dprintf("pq_is_busy: PQconsumeInput() failed"); Dprintf("pq_get_result_async: PQconsumeInput() failed");
/* if the libpq says pgconn is lost, close the py conn */ /* if the libpq says pgconn is lost, close the py conn */
if (CONNECTION_BAD == PQstatus(conn->pgconn)) { if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
@ -787,13 +787,69 @@ pq_is_busy(connectionObject *conn)
} }
PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
return -1; goto exit;
} }
conn_notifies_process(conn); conn_notifies_process(conn);
conn_notice_process(conn); conn_notice_process(conn);
return PQisBusy(conn->pgconn); for (;;) {
int busy;
PGresult *res;
ExecStatusType status;
Dprintf("pq_get_result_async: calling PQisBusy()");
busy = PQisBusy(conn->pgconn);
if (busy) {
/* try later */
Dprintf("pq_get_result_async: PQisBusy() = 1");
rv = 1;
goto exit;
}
if (!(res = PQgetResult(conn->pgconn))) {
Dprintf("pq_get_result_async: got no result");
/* the result is ready: it was the previously read one */
rv = 0;
goto exit;
}
status = PQresultStatus(res);
Dprintf("pq_get_result_async: got result %s", PQresStatus(status));
/* Store the result outside because we want to return the last non-null
* one and we may have to do it across poll calls. However if there is
* an error in the stream of results we want to handle the *first*
* error. So don't clobber it with the following ones. */
if (conn->pgres && PQresultStatus(conn->pgres) == PGRES_FATAL_ERROR) {
Dprintf("previous pgres is error: discarding");
PQclear(res);
}
else {
PQclear(conn->pgres);
conn->pgres = res;
}
switch (status) {
case PGRES_COPY_OUT:
case PGRES_COPY_IN:
case PGRES_COPY_BOTH:
/* After entering copy mode, libpq will make a phony
* PGresult for us every time we query for it, so we need to
* break out of this endless loop. */
rv = 0;
goto exit;
default:
/* keep on reading to check if there are other results or
* we have finished. */
continue;
}
}
exit:
return rv;
} }
/* pq_flush - flush output and return connection status /* pq_flush - flush output and return connection status
@ -995,6 +1051,7 @@ pq_send_query(connectionObject *conn, const char *query)
Dprintf("pq_send_query: sending ASYNC query:"); Dprintf("pq_send_query: sending ASYNC query:");
Dprintf(" %-.200s", query); Dprintf(" %-.200s", query);
CLEARPGRES(conn->pgres);
if (0 == (rv = PQsendQuery(conn->pgconn, query))) { if (0 == (rv = PQsendQuery(conn->pgconn, query))) {
Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn)); Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn));
} }
@ -1002,45 +1059,6 @@ pq_send_query(connectionObject *conn, const char *query)
return rv; return rv;
} }
/* Return the last result available on the connection.
*
* The function will block only if a command is active and the
* necessary response data has not yet been read by PQconsumeInput.
*
* The result should be disposed of using PQclear()
*/
PGresult *
pq_get_last_result(connectionObject *conn)
{
PGresult *result = NULL, *res;
ExecStatusType status;
/* Read until PQgetResult gives a NULL */
while (NULL != (res = PQgetResult(conn->pgconn))) {
if (result) {
/* TODO too bad: we are discarding results from all the queries
* except the last. We could have populated `nextset()` with it
* but it would be an incompatible change (apps currently issue
* groups of queries expecting to receive the last result: they
* would start receiving the first instead). */
PQclear(result);
}
result = res;
status = PQresultStatus(result);
Dprintf("pq_get_last_result: got result %s", PQresStatus(status));
/* After entering copy mode, libpq will make a phony
* PGresult for us every time we query for it, so we need to
* break out of this endless loop. */
if (status == PGRES_COPY_BOTH
|| status == PGRES_COPY_OUT
|| status == PGRES_COPY_IN) {
break;
}
}
return result;
}
/* pq_fetch - fetch data after a query /* pq_fetch - fetch data after a query

View File

@ -35,7 +35,6 @@
#define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0) #define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0)
/* exported functions */ /* exported functions */
HIDDEN PGresult *pq_get_last_result(connectionObject *conn);
RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs, int no_result); RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs, int no_result);
RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query, RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query,
int async, int no_result, int no_begin); int async, int no_result, int no_begin);
@ -59,7 +58,7 @@ HIDDEN int pq_tpc_command_locked(connectionObject *conn,
const char *cmd, const char *tid, const char *cmd, const char *tid,
PGresult **pgres, char **error, PGresult **pgres, char **error,
PyThreadState **tstate); PyThreadState **tstate);
HIDDEN int pq_is_busy(connectionObject *conn); RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn);
HIDDEN int pq_flush(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn);
HIDDEN void pq_clear_async(connectionObject *conn); HIDDEN void pq_clear_async(connectionObject *conn);
RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg); RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg);

View File

@ -29,6 +29,7 @@ import unittest
import warnings import warnings
import psycopg2 import psycopg2
import psycopg2.errors
from psycopg2 import extensions as ext from psycopg2 import extensions as ext
from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow
@ -405,6 +406,15 @@ class AsyncTests(ConnectingTestCase):
cur.execute("delete from table1") cur.execute("delete from table1")
self.wait(cur) self.wait(cur)
def test_stop_on_first_error(self):
cur = self.conn.cursor()
cur.execute("select 1; select x; select 1/0; select 2")
self.assertRaises(psycopg2.errors.UndefinedColumn, self.wait, cur)
cur.execute("select 1")
self.wait(cur)
self.assertEqual(cur.fetchone(), (1,))
def test_error_two_cursors(self): def test_error_two_cursors(self):
cur = self.conn.cursor() cur = self.conn.cursor()
cur2 = self.conn.cursor() cur2 = self.conn.cursor()
@ -454,6 +464,37 @@ class AsyncTests(ConnectingTestCase):
cur.execute("copy (select 1) to stdout") cur.execute("copy (select 1) to stdout")
self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn) self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn)
@slow
@skip_before_postgres(9, 0)
def test_non_block_after_notification(self):
from select import select
cur = self.conn.cursor()
cur.execute("""
select 1;
do $$
begin
raise notice 'hello';
end
$$ language plpgsql;
select pg_sleep(1);
""")
polls = 0
while True:
state = self.conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_READ:
select([self.conn], [], [], 0.1)
elif state == psycopg2.extensions.POLL_WRITE:
select([], [self.conn], [], 0.1)
else:
raise Exception("Unexpected result from poll: %r", state)
polls += 1
self.assert_(polls >= 8, polls)
def test_suite(): def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -25,6 +25,7 @@
import select import select
import unittest import unittest
import warnings import warnings
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
import psycopg2.extras import psycopg2.extras
@ -58,10 +59,10 @@ class GreenTestCase(ConnectingTestCase):
ConnectingTestCase.tearDown(self) ConnectingTestCase.tearDown(self)
psycopg2.extensions.set_wait_callback(self._cb) psycopg2.extensions.set_wait_callback(self._cb)
def set_stub_wait_callback(self, conn): def set_stub_wait_callback(self, conn, cb=None):
stub = ConnectionStub(conn) stub = ConnectionStub(conn)
psycopg2.extensions.set_wait_callback( psycopg2.extensions.set_wait_callback(
lambda conn: psycopg2.extras.wait_select(stub)) lambda conn: (cb or psycopg2.extras.wait_select)(stub))
return stub return stub
@slow @slow
@ -119,6 +120,36 @@ class GreenTestCase(ConnectingTestCase):
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,
cur.execute, "copy (select 1) to stdout") cur.execute, "copy (select 1) to stdout")
@slow
@skip_before_postgres(9, 0)
def test_non_block_after_notification(self):
def wait(conn):
while 1:
state = conn.poll()
if state == POLL_OK:
break
elif state == POLL_READ:
select.select([conn.fileno()], [], [], 0.1)
elif state == POLL_WRITE:
select.select([], [conn.fileno()], [], 0.1)
else:
raise conn.OperationalError("bad state from poll: %s" % state)
stub = self.set_stub_wait_callback(self.conn, wait)
cur = self.conn.cursor()
cur.execute("""
select 1;
do $$
begin
raise notice 'hello';
end
$$ language plpgsql;
select pg_sleep(1);
""")
polls = stub.polls.count(POLL_READ)
self.assert_(polls > 8, polls)
class CallbackErrorTestCase(ConnectingTestCase): class CallbackErrorTestCase(ConnectingTestCase):
def setUp(self): def setUp(self):