Store WAL end pointer in the replication cursor

This commit is contained in:
Martins Grunskis 2018-10-29 09:50:31 +01:00 committed by grunskis-bonial
parent 1b07d2e34d
commit 0b54551c27
5 changed files with 29 additions and 2 deletions

View File

@ -481,6 +481,12 @@ The individual messages in the replication stream are represented by
communication with the server (a data or keepalive message in either
direction).
.. attribute:: wal_end
LSN position of the current end of WAL on the server at the
moment of last data or keepalive message received from the
server.
An actual example of asynchronous operation might look like this::
from select import select
@ -500,7 +506,7 @@ The individual messages in the replication stream are represented by
try:
sel = select([cur], [], [], max(0, timeout))
if not any(sel):
cur.send_feedback() # timed out, send keepalive message
cur.send_feedback(flush_lsn=cur.wal_end) # timed out, send keepalive message
except InterruptedError:
pass # recalculate timeout and continue

View File

@ -1721,6 +1721,10 @@ retry:
goto exit;
}
wal_end = fe_recvint64(buffer + 1);
Dprintf("pq_read_replication_message: wal_end="XLOGFMTSTR, XLOGFMTARGS(wal_end));
repl->wal_end = wal_end;
reply = buffer[hdr];
if (reply && pq_send_replication_feedback(repl, 0) < 0) {
goto exit;
@ -1735,6 +1739,7 @@ retry:
goto exit;
}
repl->wal_end = wal_end;
ret = 0;
exit:

View File

@ -44,6 +44,8 @@ typedef struct replicationCursorObject {
struct timeval last_io; /* timestamp of the last exchange with the server */
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */
XLogRecPtr write_lsn; /* LSNs for replication feedback messages */
XLogRecPtr flush_lsn;
XLogRecPtr apply_lsn;

View File

@ -231,6 +231,17 @@ psyco_repl_curs_get_io_timestamp(replicationCursorObject *self)
return res;
}
/* object member list */
#define OFFSETOF(x) offsetof(replicationCursorObject, x)
static struct PyMemberDef replicationCursorObject_members[] = {
{"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY,
"LSN position of the current end of WAL on the server."},
{NULL}
};
/* object method list */
static struct PyMethodDef replicationCursorObject_methods[] = {
@ -262,6 +273,8 @@ replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
self->consuming = 0;
self->decode = 0;
self->wal_end = 0;
self->write_lsn = 0;
self->flush_lsn = 0;
self->apply_lsn = 0;
@ -311,7 +324,7 @@ PyTypeObject replicationCursorType = {
0, /*tp_iter*/
0, /*tp_iternext*/
replicationCursorObject_methods, /*tp_methods*/
0, /*tp_members*/
replicationCursorObject_members, /*tp_members*/
replicationCursorObject_getsets, /*tp_getset*/
&cursorType, /*tp_base*/
0, /*tp_dict*/

View File

@ -239,6 +239,7 @@ class AsyncReplicationTest(ReplicationTestCase):
def consume(msg):
# just check the methods
"%s: %s" % (cur.io_timestamp, repr(msg))
"%s: %s" % (cur.wal_end, repr(msg))
self.msg_count += 1
if self.msg_count > 3: