diff --git a/doc/src/extras.rst b/doc/src/extras.rst index e2036600..8c0de15d 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -506,10 +506,23 @@ The individual messages in the replication stream are represented by try: sel = select([cur], [], [], max(0, timeout)) if not any(sel): - cur.send_feedback(flush_lsn=cur.wal_end) # timed out, send keepalive message + cur.send_feedback() # timed out, send keepalive message except InterruptedError: pass # recalculate timeout and continue + .. warning:: + + The ``consume({msg})`` function will only be called when + there are new database writes on the server e.g. any DML or + DDL statement. Depending on your Postgres cluster + configuration this might cause the server to run out of disk + space if the writes are far apart. To prevent this from + happening you can use `~ReplicationCursor.wal_end` value to + periodically send feedback to the server to notify that your + replication client has received and processed all the + messages. + + .. index:: pair: Cursor; Replication diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 558d93c7..26667eb3 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1550,6 +1550,8 @@ retry: (*msg)->data_start = data_start; (*msg)->wal_end = wal_end; (*msg)->send_time = send_time; + + repl->wal_end = wal_end; } else if (buffer[0] == 'k') { /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ @@ -1577,7 +1579,6 @@ retry: goto exit; } - repl->wal_end = wal_end; ret = 0; exit: diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index e896426e..db549c86 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -44,11 +44,11 @@ typedef struct replicationCursorObject { struct timeval last_io; /* timestamp of the last exchange with the server */ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ - XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */ - XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ XLogRecPtr flush_lsn; XLogRecPtr apply_lsn; + + XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */ } replicationCursorObject;