Fix use of PQconsumeInput() in pq_read_replication_message()

The libpq's PQconsumeInput() returns 0 in case of an error only, but
we need to know if it was able to actually read something.  Work
around this by setting an internal flag before retry.
This commit is contained in:
Oleksandr Shulgin 2015-07-07 19:04:32 +02:00
parent eac16d048a
commit 26fe1f230f

View File

@ -1534,7 +1534,7 @@ PyObject *
pq_read_replication_message(cursorObject *curs, int decode) pq_read_replication_message(cursorObject *curs, int decode)
{ {
char *buffer = NULL; char *buffer = NULL;
int len, hdr, reply; int len, consumed = 0, hdr, reply;
XLogRecPtr data_start, wal_end; XLogRecPtr data_start, wal_end;
pg_int64 send_time; pg_int64 send_time;
PyObject *str = NULL, *msg = NULL; PyObject *str = NULL, *msg = NULL;
@ -1542,20 +1542,29 @@ pq_read_replication_message(cursorObject *curs, int decode)
Dprintf("pq_read_replication_message(decode=%d)", decode); Dprintf("pq_read_replication_message(decode=%d)", decode);
retry: retry:
Py_BEGIN_ALLOW_THREADS;
len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */); len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */);
Py_END_ALLOW_THREADS;
if (len == 0) { if (len == 0) {
/* We should only try reading more data into the internal buffer when /* If we've tried reading some data, but there was none, bail out. */
* there is nothing available at the moment. Otherwise, with a really if (consumed) {
* highly loaded server we might be reading a number of messages for goto none;
* every single one we process, thus overgrowing the internal buffer
* until the system runs out of memory. */
if (PQconsumeInput(curs->conn->pgconn)) {
goto retry;
} }
goto none; /* We should only try reading more data when there is nothing
available at the moment. Otherwise, with a really highly loaded
server we might be reading a number of messages for every single
one we process, thus overgrowing the internal buffer until the
client system runs out of memory. */
if (!PQconsumeInput(curs->conn->pgconn)) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
/* But PQconsumeInput() doesn't tell us if it has actually read
anything into the internal buffer and there is no (supported) way
to ask libpq about this directly. The way we check is setting the
flag and re-trying PQgetCopyData(): if that returns 0 again,
there's no more data available in the buffer, so we return None. */
consumed = 1;
goto retry;
} }
if (len == -2) { if (len == -2) {
@ -1574,6 +1583,11 @@ retry:
goto none; goto none;
} }
/* It also makes sense to set this flag here to make us return early in
case of retry due to keepalive message. Any pending data on the socket
will trigger read condition in select() in the calling code anyway. */
consumed = 1;
/* ok, we did really read something: update the io timestamp */ /* ok, we did really read something: update the io timestamp */
gettimeofday(&curs->repl_last_io, NULL); gettimeofday(&curs->repl_last_io, NULL);