Merge branch 'keepalive-save-wal-end'

Close #800
This commit is contained in:
Daniele Varrazzo 2019-03-30 21:29:39 +00:00
commit b76ff2fc33
6 changed files with 46 additions and 1 deletions

2
NEWS
View File

@ -22,6 +22,8 @@ New features:
C structures to Python and interact with libpq via ctypes (:ticket:`#782`). C structures to Python and interact with libpq via ctypes (:ticket:`#782`).
- `~psycopg2.sql.Identifier` can represent qualified names in SQL composition - `~psycopg2.sql.Identifier` can represent qualified names in SQL composition
(:ticket:`#732`). (:ticket:`#732`).
- Added `!ReplicationMessage`.\ `~psycopg2.extras.ReplicationMessage.wal_end`
attribute (:ticket:`#800`).
- Added *fetch* parameter to `~psycopg2.extras.execute_values()` function - Added *fetch* parameter to `~psycopg2.extras.execute_values()` function
(:ticket:`#813`). (:ticket:`#813`).
- `!str()` on `~psycopg2.extras.Range` produces a human-readable representation - `!str()` on `~psycopg2.extras.Range` produces a human-readable representation

View File

@ -208,6 +208,8 @@ The individual messages in the replication stream are represented by
LSN position of the current end of WAL on the server. LSN position of the current end of WAL on the server.
.. versionadded:: 2.8
.. attribute:: send_time .. attribute:: send_time
A `~datetime` object representing the server timestamp at the moment A `~datetime` object representing the server timestamp at the moment
@ -481,6 +483,12 @@ The individual messages in the replication stream are represented by
communication with the server (a data or keepalive message in either communication with the server (a data or keepalive message in either
direction). direction).
.. attribute:: wal_end
LSN position of the current end of WAL on the server at the
moment of last data or keepalive message received from the
server.
An actual example of asynchronous operation might look like this:: An actual example of asynchronous operation might look like this::
from select import select from select import select
@ -504,6 +512,19 @@ The individual messages in the replication stream are represented by
except InterruptedError: except InterruptedError:
pass # recalculate timeout and continue pass # recalculate timeout and continue
.. warning::
The ``consume({msg})`` function will only be called when
there are new database writes on the server e.g. any DML or
DDL statement. Depending on your Postgres cluster
configuration this might cause the server to run out of disk
space if the writes are far apart. To prevent this from
happening you can use `~ReplicationCursor.wal_end` value to
periodically send feedback to the server to notify that your
replication client has received and processed all the
messages.
.. index:: .. index::
pair: Cursor; Replication pair: Cursor; Replication

View File

@ -1550,6 +1550,8 @@ retry:
(*msg)->data_start = data_start; (*msg)->data_start = data_start;
(*msg)->wal_end = wal_end; (*msg)->wal_end = wal_end;
(*msg)->send_time = send_time; (*msg)->send_time = send_time;
repl->wal_end = wal_end;
} }
else if (buffer[0] == 'k') { else if (buffer[0] == 'k') {
/* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
@ -1559,6 +1561,10 @@ retry:
goto exit; goto exit;
} }
wal_end = fe_recvint64(buffer + 1);
Dprintf("pq_read_replication_message: wal_end="XLOGFMTSTR, XLOGFMTARGS(wal_end));
repl->wal_end = wal_end;
reply = buffer[hdr]; reply = buffer[hdr];
if (reply && pq_send_replication_feedback(repl, 0) < 0) { if (reply && pq_send_replication_feedback(repl, 0) < 0) {
goto exit; goto exit;

View File

@ -47,6 +47,8 @@ typedef struct replicationCursorObject {
XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ XLogRecPtr write_lsn; /* LSNs for replication feedback messages */
XLogRecPtr flush_lsn; XLogRecPtr flush_lsn;
XLogRecPtr apply_lsn; XLogRecPtr apply_lsn;
XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */
} replicationCursorObject; } replicationCursorObject;

View File

@ -228,6 +228,17 @@ repl_curs_get_io_timestamp(replicationCursorObject *self)
return res; return res;
} }
/* object member list */
#define OFFSETOF(x) offsetof(replicationCursorObject, x)
static struct PyMemberDef replicationCursorObject_members[] = {
{"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY,
"LSN position of the current end of WAL on the server."},
{NULL}
};
/* object method list */ /* object method list */
static struct PyMethodDef replicationCursorObject_methods[] = { static struct PyMethodDef replicationCursorObject_methods[] = {
@ -259,6 +270,8 @@ replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
self->consuming = 0; self->consuming = 0;
self->decode = 0; self->decode = 0;
self->wal_end = 0;
self->write_lsn = 0; self->write_lsn = 0;
self->flush_lsn = 0; self->flush_lsn = 0;
self->apply_lsn = 0; self->apply_lsn = 0;
@ -308,7 +321,7 @@ PyTypeObject replicationCursorType = {
0, /*tp_iter*/ 0, /*tp_iter*/
0, /*tp_iternext*/ 0, /*tp_iternext*/
replicationCursorObject_methods, /*tp_methods*/ replicationCursorObject_methods, /*tp_methods*/
0, /*tp_members*/ replicationCursorObject_members, /*tp_members*/
replicationCursorObject_getsets, /*tp_getset*/ replicationCursorObject_getsets, /*tp_getset*/
&cursorType, /*tp_base*/ &cursorType, /*tp_base*/
0, /*tp_dict*/ 0, /*tp_dict*/

View File

@ -242,6 +242,7 @@ class AsyncReplicationTest(ReplicationTestCase):
def consume(msg): def consume(msg):
# just check the methods # just check the methods
"%s: %s" % (cur.io_timestamp, repr(msg)) "%s: %s" % (cur.io_timestamp, repr(msg))
"%s: %s" % (cur.wal_end, repr(msg))
self.msg_count += 1 self.msg_count += 1
if self.msg_count > 3: if self.msg_count > 3: