diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index b21f3977..06730562 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1576,7 +1576,9 @@ retry: /* We can safely forward flush_lsn to the wal_end from the server keepalive message * if we know that the client already processed (confirmed) the last XLogData message */ - if (repl->flush_lsn >= repl->last_msg_data_start && wal_end > repl->flush_lsn) { + if (repl->explicitly_flushed_lsn >= repl->last_msg_data_start + && wal_end > repl->explicitly_flushed_lsn + && wal_end > repl->flush_lsn) { repl->flush_lsn = wal_end; } diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index 1b92b09d..ba066b52 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -52,6 +52,7 @@ typedef struct replicationCursorObject { XLogRecPtr last_msg_data_start; /* WAL pointer to the last non-keepalive message from the server */ struct timeval last_feedback; /* timestamp of the last feedback message to the server */ + XLogRecPtr explicitly_flushed_lsn; /* the flush LSN explicitly set by the send_feedback call */ } replicationCursorObject; diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index 5fdeaf03..c1dbd431 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -211,6 +211,9 @@ send_feedback(replicationCursorObject *self, if (write_lsn > self->write_lsn) self->write_lsn = write_lsn; + if (flush_lsn > self->explicitly_flushed_lsn) + self->explicitly_flushed_lsn = flush_lsn; + if (flush_lsn > self->flush_lsn) self->flush_lsn = flush_lsn;