diff --git a/NEWS b/NEWS index 02cd4d1f..2bea4c59 100644 --- a/NEWS +++ b/NEWS @@ -6,6 +6,8 @@ What's new in psycopg 2.8.4 - Don't swallow keyboard interrupts on connect when a password is specified in the connection string (:ticket:`#898`). +- Don't advance replication cursor when the message wasn't confirmed + (:ticket:`#940`). - Fixed int overflow for large values in `~psycopg2.extensions.Column.table_oid` and `~psycopg2.extensions.Column.type_code` (:ticket:`961`). - Fixed building with Python 3.8 (:ticket:`854`). diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 414c2d75..28774cd8 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1576,7 +1576,9 @@ retry: /* We can safely forward flush_lsn to the wal_end from the server keepalive message * if we know that the client already processed (confirmed) the last XLogData message */ - if (repl->flush_lsn >= repl->last_msg_data_start && wal_end > repl->flush_lsn) { + if (repl->explicitly_flushed_lsn >= repl->last_msg_data_start + && wal_end > repl->explicitly_flushed_lsn + && wal_end > repl->flush_lsn) { repl->flush_lsn = wal_end; } diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index 1b92b09d..ba066b52 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -52,6 +52,7 @@ typedef struct replicationCursorObject { XLogRecPtr last_msg_data_start; /* WAL pointer to the last non-keepalive message from the server */ struct timeval last_feedback; /* timestamp of the last feedback message to the server */ + XLogRecPtr explicitly_flushed_lsn; /* the flush LSN explicitly set by the send_feedback call */ } replicationCursorObject; diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index 5fdeaf03..c1dbd431 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -211,6 +211,9 @@ send_feedback(replicationCursorObject *self, if (write_lsn > self->write_lsn) self->write_lsn = write_lsn; + if (flush_lsn > self->explicitly_flushed_lsn) + self->explicitly_flushed_lsn = flush_lsn; + if (flush_lsn > self->flush_lsn) self->flush_lsn = flush_lsn;