diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index e550d796..edfdcd3a 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1534,7 +1534,7 @@ PyObject * pq_read_replication_message(cursorObject *curs, int decode) { char *buffer = NULL; - int len, hdr, reply; + int len, consumed = 0, hdr, reply; XLogRecPtr data_start, wal_end; pg_int64 send_time; PyObject *str = NULL, *msg = NULL; @@ -1542,20 +1542,29 @@ pq_read_replication_message(cursorObject *curs, int decode) Dprintf("pq_read_replication_message(decode=%d)", decode); retry: - Py_BEGIN_ALLOW_THREADS; len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */); - Py_END_ALLOW_THREADS; if (len == 0) { - /* We should only try reading more data into the internal buffer when - * there is nothing available at the moment. Otherwise, with a really - * highly loaded server we might be reading a number of messages for - * every single one we process, thus overgrowing the internal buffer - * until the system runs out of memory. */ - if (PQconsumeInput(curs->conn->pgconn)) { - goto retry; + /* If we've tried reading some data, but there was none, bail out. */ + if (consumed) { + goto none; } - goto none; + /* We should only try reading more data when there is nothing + available at the moment. Otherwise, with a really highly loaded + server we might be reading a number of messages for every single + one we process, thus overgrowing the internal buffer until the + client system runs out of memory. */ + if (!PQconsumeInput(curs->conn->pgconn)) { + pq_raise(curs->conn, curs, NULL); + goto exit; + } + /* But PQconsumeInput() doesn't tell us if it has actually read + anything into the internal buffer and there is no (supported) way + to ask libpq about this directly. The way we check is setting the + flag and re-trying PQgetCopyData(): if that returns 0 again, + there's no more data available in the buffer, so we return None. */ + consumed = 1; + goto retry; } if (len == -2) { @@ -1574,6 +1583,11 @@ retry: goto none; } + /* It also makes sense to set this flag here to make us return early in + case of retry due to keepalive message. Any pending data on the socket + will trigger read condition in select() in the calling code anyway. */ + consumed = 1; + /* ok, we did really read something: update the io timestamp */ gettimeofday(&curs->repl_last_io, NULL);