From 6cff5a3e089f155519765f9cdd49ef78b23ab740 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Mon, 6 May 2019 10:27:44 +0200 Subject: [PATCH 1/5] Smart replication feedback This commit makes psycopg2 responsible for sending the status update (feedback) messages to the server regardless of whether a synchronous or asynchronous connection is used. Feedback is sent every *status_update* (default value is 10) seconds, which could be configured by passing a corresponding parameter to the `start_replication()` or `start_replication_expert()` methods. The actual feedback message is sent by the `pq_read_replication_message()` when the *status_update* timeout is reached. The default behavior of the `send_feedback()` method is changed. It doesn't send a feedback message on every call anymore but just updates internal structures. There is still a way to *force* sending a message if *force* or *reply* parameters are set. The new approach has certain advantages: 1. The client can simply call the `send_feedback()` for every processed message and the library will take care of not overwhelming the server. Actually, in the synchronous mode it is even mandatory to confirm every processed message. 2. The library tracks internally the pointer of the last received message which is not keepalive. If the client confirmed the last message and after that server sends only keepalives with increasing *wal_end*, the library can safely move forward *flush* position to the *wal_end* and later automatically report it to the server. Reporting of the *wal_end* received from keepalive messages is very important. Not doing so casing: 1. Excessive disk usage, because the replication slot prevents from WAL being cleaned up. 2. The smart and fast shutdown of the server could last indefinitely because walsender waits until the client report *flush* position equal to the *wal_end*. This implementation is only extending the existing API and therefore should not break any of the existing code. --- doc/src/advanced.rst | 3 +- doc/src/extras.rst | 71 +++++++++++++++---------------- lib/extras.py | 4 +- psycopg/pqpath.c | 58 +++++++++++++------------ psycopg/pqpath.h | 3 +- psycopg/replication_cursor.h | 5 ++- psycopg/replication_cursor_type.c | 60 +++++++++++++++++++++----- psycopg/solaris_support.h | 7 +++ psycopg/win32_support.h | 7 +++ tests/test_async_keyword.py | 1 + tests/test_replication.py | 17 +++++--- 11 files changed, 147 insertions(+), 89 deletions(-) diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index ff0137d0..72e353c2 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -552,8 +552,7 @@ value greater than zero in ``postgresql.conf`` (these changes require a server restart). Create a database ``psycopg2_test``. Then run the following code to quickly try the replication support out. This -is not production code -- it has no error handling, it sends feedback too -often, etc. -- and it's only intended as a simple demo of logical +is not production code -- it's only intended as a simple demo of logical replication:: from __future__ import print_function diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 5d09039e..b7136fec 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -270,7 +270,7 @@ The individual messages in the replication stream are represented by Replication slots are a feature of PostgreSQL server starting with version 9.4. - .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False) + .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False, status_interval=10) Start replication on the connection. @@ -288,6 +288,7 @@ The individual messages in the replication stream are represented by slot (not allowed with physical replication) :param decode: a flag indicating that unicode conversion should be performed on messages received from the server + :param status_interval: time between feedback packets sent to the server If a *slot_name* is specified, the slot must exist on the server and its type must match the replication type used. @@ -328,6 +329,14 @@ The individual messages in the replication stream are represented by *This parameter should not be set with physical replication or with logical replication plugins that produce binary output.* + Replication stream should periodically send feedback to the database + to prevent disconnect via timeout. Feedback is automatically sent when + `read_message()` is called or during run of the `consume_stream()`. + To specify the feedback interval use *status_interval* parameter. + The value of this parameter must be set to at least 1 second, but + it can have a fractional part. + + This function constructs a |START_REPLICATION|_ command and calls `start_replication_expert()` internally. @@ -339,7 +348,7 @@ The individual messages in the replication stream are represented by .. |START_REPLICATION| replace:: :sql:`START_REPLICATION` .. _START_REPLICATION: https://www.postgresql.org/docs/current/static/protocol-replication.html - .. method:: start_replication_expert(command, decode=False) + .. method:: start_replication_expert(command, decode=False, status_interval=10) Start replication on the connection using provided |START_REPLICATION|_ command. @@ -348,6 +357,7 @@ The individual messages in the replication stream are represented by `~psycopg2.sql.Composable` instance for dynamic generation. :param decode: a flag indicating that unicode conversion should be performed on messages received from the server. + :param status_interval: time between feedback packets sent to the server .. method:: consume_stream(consume, keepalive_interval=10) @@ -373,14 +383,12 @@ The individual messages in the replication stream are represented by `ReplicationMessage` class. See `read_message()` for details about message decoding. - This method also sends keepalive messages to the server in case there - were no new data from the server for the duration of - *keepalive_interval* (in seconds). The value of this parameter must + This method also sends feedback messages to the server every + *keepalive_interval* (in seconds). The value of this parameter must be set to at least 1 second, but it can have a fractional part. - After processing certain amount of messages the client should send a - confirmation message to the server. This should be done by calling - `send_feedback()` method on the corresponding replication cursor. A + The client must confirm every processed message by calling + `send_feedback()` method on the corresponding replication cursor. A reference to the cursor is provided in the `ReplicationMessage` as an attribute. @@ -393,9 +401,7 @@ The individual messages in the replication stream are represented by def __call__(self, msg): self.process_message(msg.payload) - - if self.should_send_feedback(msg): - msg.cursor.send_feedback(flush_lsn=msg.data_start) + msg.cursor.send_feedback(flush_lsn=msg.data_start) consumer = LogicalStreamConsumer() cur.consume_stream(consumer) @@ -408,12 +414,7 @@ The individual messages in the replication stream are represented by retains all the WAL segments that might be needed to stream the changes via all of the currently open replication slots. - On the other hand, it is not recommended to send confirmation - after *every* processed message, since that will put an - unnecessary load on network and the server. A possible strategy - is to confirm after every COMMIT message. - - .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) + .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False) :param write_lsn: a LSN position up to which the client has written the data locally :param flush_lsn: a LSN position up to which the client has processed the @@ -423,13 +424,16 @@ The individual messages in the replication stream are represented by has applied the changes (physical replication master-slave protocol only) :param reply: request the server to send back a keepalive message immediately + :param force: force sending a feedback message regardless of status_interval timeout Use this method to report to the server that all messages up to a certain LSN position have been processed on the client and may be discarded on the server. - This method can also be called with all default parameters' values to - just send a keepalive message to the server. + If the *reply* or *force* parameters are not set, this method will + just update internal structures without sending the feedback message + to the server. The library sends feedback message automatically + when *status_interval* timeout is reached. Low-level replication cursor methods for :ref:`asynchronous connection ` operation. @@ -463,9 +467,9 @@ The individual messages in the replication stream are represented by corresponding connection to block the process until there is more data from the server. - The server can send keepalive messages to the client periodically. - Such messages are silently consumed by this method and are never - reported to the caller. + Last, but not least, this method sends feedback messages when + *status_interval* timeout is reached or when keepalive message with + reply request arrived from the server. .. method:: fileno() @@ -481,6 +485,11 @@ The individual messages in the replication stream are represented by communication with the server (a data or keepalive message in either direction). + .. attribute:: feedback_timestamp + + A `~datetime` object representing the timestamp at the moment when + the last feedback message sent to the server. + .. attribute:: wal_end LSN position of the current end of WAL on the server at the @@ -496,33 +505,21 @@ The individual messages in the replication stream are represented by def consume(msg): # ... + msg.cursor.send_feedback(flush_lsn=msg.data_start) - keepalive_interval = 10.0 + status_interval = 10.0 while True: msg = cur.read_message() if msg: consume(msg) else: now = datetime.now() - timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds() + timeout = status_interval - (now - cur.feedback_timestamp).total_seconds() try: sel = select([cur], [], [], max(0, timeout)) - if not any(sel): - cur.send_feedback() # timed out, send keepalive message except InterruptedError: pass # recalculate timeout and continue -.. warning:: - - The :samp:`consume({msg})` function will only be called when there are new - database writes on the server e.g. any DML or DDL statement. Depending on - your Postgres cluster configuration this might cause the server to run out - of disk space if the writes are too far apart. To prevent this from - happening you can use `~ReplicationCursor.wal_end` value to periodically - send feedback to the server to notify that your replication client has - received and processed all the messages. - - .. index:: pair: Cursor; Replication diff --git a/lib/extras.py b/lib/extras.py index f9d3cf6a..3fdaaf0a 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -561,7 +561,7 @@ class ReplicationCursor(_replicationCursor): self.execute(command) def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, - timeline=0, options=None, decode=False): + timeline=0, options=None, decode=False, status_interval=10): """Start replication stream.""" command = "START_REPLICATION " @@ -615,7 +615,7 @@ class ReplicationCursor(_replicationCursor): command += "%s %s" % (quote_ident(k, self), _A(str(v))) command += ")" - self.start_replication_expert(command, decode=decode) + self.start_replication_expert(command, decode=decode, status_interval=status_interval) # allows replication cursors to be used in select.select() directly def fileno(self): diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 26667eb3..b21f3977 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1454,12 +1454,20 @@ pq_read_replication_message(replicationCursorObject *repl, replicationMessageObj int64_t send_time; PyObject *str = NULL, *result = NULL; int ret = -1; + struct timeval curr_time, feedback_time; Dprintf("pq_read_replication_message"); *msg = NULL; consumed = 0; + /* Is it a time to send the next feedback message? */ + gettimeofday(&curr_time, NULL); + timeradd(&repl->last_feedback, &repl->status_interval, &feedback_time); + if (timercmp(&curr_time, &feedback_time, >=) && pq_send_replication_feedback(repl, 0) < 0) { + goto exit; + } + retry: len = PQgetCopyData(pgconn, &buffer, 1 /* async */); @@ -1552,6 +1560,7 @@ retry: (*msg)->send_time = send_time; repl->wal_end = wal_end; + repl->last_msg_data_start = data_start; } else if (buffer[0] == 'k') { /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ @@ -1565,6 +1574,12 @@ retry: Dprintf("pq_read_replication_message: wal_end="XLOGFMTSTR, XLOGFMTARGS(wal_end)); repl->wal_end = wal_end; + /* We can safely forward flush_lsn to the wal_end from the server keepalive message + * if we know that the client already processed (confirmed) the last XLogData message */ + if (repl->flush_lsn >= repl->last_msg_data_start && wal_end > repl->flush_lsn) { + repl->flush_lsn = wal_end; + } + reply = buffer[hdr]; if (reply && pq_send_replication_feedback(repl, 0) < 0) { goto exit; @@ -1614,7 +1629,8 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) pq_raise(conn, curs, NULL); return -1; } - gettimeofday(&repl->last_io, NULL); + gettimeofday(&repl->last_feedback, NULL); + repl->last_io = repl->last_feedback; return 0; } @@ -1627,7 +1643,7 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) manages to send keepalive messages to the server as needed. */ int -pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_interval) +pq_copy_both(replicationCursorObject *repl, PyObject *consume) { cursorObject *curs = &repl->cur; connectionObject *conn = curs->conn; @@ -1636,7 +1652,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ PyObject *tmp = NULL; int fd, sel, ret = -1; fd_set fds; - struct timeval keep_intr, curr_time, ping_time, timeout; + struct timeval curr_time, feedback_time, timeout; if (!PyCallable_Check(consume)) { Dprintf("pq_copy_both: expected callable consume object"); @@ -1645,9 +1661,6 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ CLEARPGRES(curs->pgres); - keep_intr.tv_sec = (int)keepalive_interval; - keep_intr.tv_usec = (long)((keepalive_interval - keep_intr.tv_sec)*1.0e6); - while (1) { if (pq_read_replication_message(repl, &msg) < 0) { goto exit; @@ -1662,38 +1675,27 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ FD_ZERO(&fds); FD_SET(fd, &fds); - /* how long can we wait before we need to send a keepalive? */ + /* how long can we wait before we need to send a feedback? */ gettimeofday(&curr_time, NULL); - timeradd(&repl->last_io, &keep_intr, &ping_time); - timersub(&ping_time, &curr_time, &timeout); + timeradd(&repl->last_feedback, &repl->status_interval, &feedback_time); + timersub(&feedback_time, &curr_time, &timeout); if (timeout.tv_sec >= 0) { Py_BEGIN_ALLOW_THREADS; sel = select(fd + 1, &fds, NULL, NULL, &timeout); Py_END_ALLOW_THREADS; - } - else { - sel = 0; /* we're past target time, pretend select() timed out */ - } - if (sel < 0) { - if (errno != EINTR) { - PyErr_SetFromErrno(PyExc_OSError); - goto exit; - } - if (PyErr_CheckSignals()) { - goto exit; - } - continue; - } - - if (sel == 0) { - if (pq_send_replication_feedback(repl, 0) < 0) { - goto exit; + if (sel < 0) { + if (errno != EINTR) { + PyErr_SetFromErrno(PyExc_OSError); + goto exit; + } + if (PyErr_CheckSignals()) { + goto exit; + } } } - continue; } else { tmp = PyObject_CallFunctionObjArgs(consume, msg, NULL); diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 9d732246..eb0578a5 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -65,8 +65,7 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn, const char *query, RAISES HIDDEN void pq_complete_error(connectionObject *conn); /* replication protocol support */ -HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, - double keepalive_interval); +HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer); HIDDEN int pq_read_replication_message(replicationCursorObject *repl, replicationMessageObject **msg); HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested); diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index db549c86..1b92b09d 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -42,13 +42,16 @@ typedef struct replicationCursorObject { int decode:1; /* if we should use character decoding on the messages */ struct timeval last_io; /* timestamp of the last exchange with the server */ - struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ + struct timeval status_interval; /* time between status packets sent to the server */ XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ XLogRecPtr flush_lsn; XLogRecPtr apply_lsn; XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */ + + XLogRecPtr last_msg_data_start; /* WAL pointer to the last non-keepalive message from the server */ + struct timeval last_feedback; /* timestamp of the last feedback message to the server */ } replicationCursorObject; diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index 45220768..a31f6b83 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -38,8 +38,14 @@ #include "datetime.h" +static void set_status_interval(replicationCursorObject *self, double status_interval) +{ + self->status_interval.tv_sec = (int)status_interval; + self->status_interval.tv_usec = (long)((status_interval - self->status_interval.tv_sec)*1.0e6); +} + #define start_replication_expert_doc \ -"start_replication_expert(command, decode=False) -- Start replication with a given command." +"start_replication_expert(command, decode=False, status_interval=10) -- Start replication with a given command." static PyObject * start_replication_expert(replicationCursorObject *self, @@ -49,10 +55,12 @@ start_replication_expert(replicationCursorObject *self, connectionObject *conn = self->cur.conn; PyObject *res = NULL; PyObject *command = NULL; + double status_interval = 10; long int decode = 0; - static char *kwlist[] = {"command", "decode", NULL}; + static char *kwlist[] = {"command", "decode", "status_interval", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|l", kwlist, &command, &decode)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|ld", kwlist, + &command, &decode, &status_interval)) { return NULL; } @@ -64,6 +72,11 @@ start_replication_expert(replicationCursorObject *self, goto exit; } + if (status_interval < 1.0) { + psyco_set_error(ProgrammingError, curs, "status_interval must be >= 1 (sec)"); + return NULL; + } + Dprintf("start_replication_expert: '%s'; decode: %ld", Bytes_AS_STRING(command), decode); @@ -72,6 +85,7 @@ start_replication_expert(replicationCursorObject *self, res = Py_None; Py_INCREF(res); + set_status_interval(self, status_interval); self->decode = decode; gettimeofday(&self->last_io, NULL); } @@ -124,8 +138,9 @@ consume_stream(replicationCursorObject *self, CLEARPGRES(curs->pgres); self->consuming = 1; + set_status_interval(self, keepalive_interval); - if (pq_copy_both(self, consume, keepalive_interval) >= 0) { + if (pq_copy_both(self, consume) >= 0) { res = Py_None; Py_INCREF(res); } @@ -159,7 +174,7 @@ read_message(replicationCursorObject *self, PyObject *dummy) } #define send_feedback_doc \ -"send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply." +"send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False) -- Update a replication feedback, optionally request a reply or force sending a feedback message regardless of the timeout." static PyObject * send_feedback(replicationCursorObject *self, @@ -167,13 +182,13 @@ send_feedback(replicationCursorObject *self, { cursorObject *curs = &self->cur; XLogRecPtr write_lsn = 0, flush_lsn = 0, apply_lsn = 0; - int reply = 0; - static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; + int reply = 0, force = 0; + static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", "force", NULL}; EXC_IF_CURS_CLOSED(curs); - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist, - &write_lsn, &flush_lsn, &apply_lsn, &reply)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKii", kwlist, + &write_lsn, &flush_lsn, &apply_lsn, &reply, &force)) { return NULL; } @@ -186,7 +201,7 @@ send_feedback(replicationCursorObject *self, if (apply_lsn > self->apply_lsn) self->apply_lsn = apply_lsn; - if (pq_send_replication_feedback(self, reply) < 0) { + if ((force || reply) && pq_send_replication_feedback(self, reply) < 0) { return NULL; } @@ -228,6 +243,28 @@ repl_curs_get_io_timestamp(replicationCursorObject *self) return res; } +#define repl_curs_feedback_timestamp_doc \ +"feedback_timestamp -- the timestamp of the latest feedback message sent to the server" + +static PyObject * +repl_curs_get_feedback_timestamp(replicationCursorObject *self) +{ + cursorObject *curs = &self->cur; + PyObject *tval, *res = NULL; + double seconds; + + EXC_IF_CURS_CLOSED(curs); + + seconds = self->last_feedback.tv_sec + self->last_feedback.tv_usec / 1.0e6; + + tval = Py_BuildValue("(d)", seconds); + if (tval) { + res = PyDateTime_FromTimestamp(tval); + Py_DECREF(tval); + } + return res; +} + /* object member list */ #define OFFSETOF(x) offsetof(replicationCursorObject, x) @@ -259,6 +296,9 @@ static struct PyGetSetDef replicationCursorObject_getsets[] = { { "io_timestamp", (getter)repl_curs_get_io_timestamp, NULL, repl_curs_io_timestamp_doc, NULL }, + { "feedback_timestamp", + (getter)repl_curs_get_feedback_timestamp, NULL, + repl_curs_feedback_timestamp_doc, NULL }, {NULL} }; diff --git a/psycopg/solaris_support.h b/psycopg/solaris_support.h index 7b99a662..8cfb409c 100644 --- a/psycopg/solaris_support.h +++ b/psycopg/solaris_support.h @@ -35,6 +35,13 @@ extern HIDDEN void timeradd(struct timeval *a, struct timeval *b, struct timeval *c); extern HIDDEN void timersub(struct timeval *a, struct timeval *b, struct timeval *c); #endif + +#ifndef timercmp +#define timercmp(a, b, cmp) \ + (((a)->tv_sec == (b)->tv_sec) ? \ + ((a)->tv_usec cmp (b)->tv_usec) : \ + ((a)->tv_sec cmp (b)->tv_sec)) +#endif #endif #endif /* !defined(PSYCOPG_SOLARIS_SUPPORT_H) */ diff --git a/psycopg/win32_support.h b/psycopg/win32_support.h index 57b308b1..a49d5dec 100644 --- a/psycopg/win32_support.h +++ b/psycopg/win32_support.h @@ -43,6 +43,13 @@ extern HIDDEN void timeradd(struct timeval *a, struct timeval *b, struct timeval #endif extern HIDDEN void timersub(struct timeval *a, struct timeval *b, struct timeval *c); + +#ifndef timercmp +#define timercmp(a, b, cmp) \ + (((a)->tv_sec == (b)->tv_sec) ? \ + ((a)->tv_usec cmp (b)->tv_usec) : \ + ((a)->tv_sec cmp (b)->tv_sec)) +#endif #endif #endif /* !defined(PSYCOPG_WIN32_SUPPORT_H) */ diff --git a/tests/test_async_keyword.py b/tests/test_async_keyword.py index d86fa5ee..882011cc 100755 --- a/tests/test_async_keyword.py +++ b/tests/test_async_keyword.py @@ -194,6 +194,7 @@ class AsyncReplicationTest(ReplicationTestCase): def consume(msg): # just check the methods "%s: %s" % (cur.io_timestamp, repr(msg)) + "%s: %s" % (cur.feedback_timestamp, repr(msg)) self.msg_count += 1 if self.msg_count > 3: diff --git a/tests/test_replication.py b/tests/test_replication.py index 15d47f62..e4de20ba 100755 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -163,18 +163,20 @@ class ReplicationTest(ReplicationTestCase): cur = conn.cursor() self.create_replication_slot(cur, output_plugin='test_decoding') - - # try with invalid options - cur.start_replication( - slot_name=self.slot, options={'invalid_param': 'value'}) + self.make_replication_events() def consume(msg): - pass - # we don't see the error from the server before we try to read the data - self.assertRaises(psycopg2.DataError, cur.consume_stream, consume) + raise StopReplication() + + with self.assertRaises(psycopg2.DataError): + # try with invalid options + cur.start_replication( + slot_name=self.slot, options={'invalid_param': 'value'}) + cur.consume_stream(consume) # try with correct command cur.start_replication(slot_name=self.slot) + self.assertRaises(StopReplication, cur.consume_stream, consume) @skip_before_postgres(9, 4) # slots require 9.4 @skip_repl_if_green @@ -242,6 +244,7 @@ class AsyncReplicationTest(ReplicationTestCase): def consume(msg): # just check the methods "%s: %s" % (cur.io_timestamp, repr(msg)) + "%s: %s" % (cur.feedback_timestamp, repr(msg)) "%s: %s" % (cur.wal_end, repr(msg)) self.msg_count += 1 From f827e49f558878a8e842550e19cbc44b2de6ef13 Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Mon, 6 May 2019 15:26:21 +0200 Subject: [PATCH 2/5] Change the default value of keepalive_interval parameter to None The previous default value was 10 seconds, what might cause silent overwrite of the *status_interval* specified in the `start_replication()` --- doc/src/extras.rst | 5 ++++- psycopg/replication_cursor_type.c | 34 +++++++++++++++++++++++-------- 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/doc/src/extras.rst b/doc/src/extras.rst index b7136fec..68c9e3ac 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -360,7 +360,7 @@ The individual messages in the replication stream are represented by :param status_interval: time between feedback packets sent to the server - .. method:: consume_stream(consume, keepalive_interval=10) + .. method:: consume_stream(consume, keepalive_interval=None) :param consume: a callable object with signature :samp:`consume({msg})` :param keepalive_interval: interval (in seconds) to send keepalive @@ -386,6 +386,9 @@ The individual messages in the replication stream are represented by This method also sends feedback messages to the server every *keepalive_interval* (in seconds). The value of this parameter must be set to at least 1 second, but it can have a fractional part. + If the *keepalive_interval* is not specified, the value of + *status_interval* specified in the `start_replication()` or + `start_replication_expert()` will be used. The client must confirm every processed message by calling `send_feedback()` method on the corresponding replication cursor. A diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index a31f6b83..a590386a 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -96,19 +96,19 @@ exit: } #define consume_stream_doc \ -"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream." +"consume_stream(consumer, keepalive_interval=None) -- Consume replication stream." static PyObject * consume_stream(replicationCursorObject *self, PyObject *args, PyObject *kwargs) { cursorObject *curs = &self->cur; - PyObject *consume = NULL, *res = NULL; - double keepalive_interval = 10; + PyObject *consume = NULL, *interval = NULL, *res = NULL; + double keepalive_interval = 0; static char *kwlist[] = {"consume", "keepalive_interval", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kwlist, - &consume, &keepalive_interval)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O", kwlist, + &consume, &interval)) { return NULL; } @@ -119,9 +119,23 @@ consume_stream(replicationCursorObject *self, Dprintf("consume_stream"); - if (keepalive_interval < 1.0) { - psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)"); - return NULL; + if (interval && interval != Py_None) { + + if (PyFloat_Check(interval)) { + keepalive_interval = PyFloat_AsDouble(interval); + } else if (PyLong_Check(interval)) { + keepalive_interval = PyLong_AsDouble(interval); + } else if (PyInt_Check(interval)) { + keepalive_interval = PyInt_AsLong(interval); + } else { + psyco_set_error(ProgrammingError, curs, "keepalive_interval must be int or float"); + return NULL; + } + + if (keepalive_interval < 1.0) { + psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)"); + return NULL; + } } if (self->consuming) { @@ -138,7 +152,9 @@ consume_stream(replicationCursorObject *self, CLEARPGRES(curs->pgres); self->consuming = 1; - set_status_interval(self, keepalive_interval); + if (keepalive_interval >= 1) { + set_status_interval(self, keepalive_interval); + } if (pq_copy_both(self, consume) >= 0) { res = Py_None; From 5eec11f2322a2c0cc772d8edee71f02d67040f0a Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Mon, 6 May 2019 15:42:37 +0200 Subject: [PATCH 3/5] Improve docs --- doc/src/extras.rst | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 68c9e3ac..aa316d2e 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -345,6 +345,9 @@ The individual messages in the replication stream are represented by `read_message()` in case of :ref:`asynchronous connection `. + .. versionchanged:: 2.8.3 + added the *status_interval* parameter. + .. |START_REPLICATION| replace:: :sql:`START_REPLICATION` .. _START_REPLICATION: https://www.postgresql.org/docs/current/static/protocol-replication.html @@ -359,6 +362,9 @@ The individual messages in the replication stream are represented by performed on messages received from the server. :param status_interval: time between feedback packets sent to the server + .. versionchanged:: 2.8.3 + added the *status_interval* parameter. + .. method:: consume_stream(consume, keepalive_interval=None) @@ -417,6 +423,9 @@ The individual messages in the replication stream are represented by retains all the WAL segments that might be needed to stream the changes via all of the currently open replication slots. + .. versionchanged:: 2.8.3 + changed the default value of the *keepalive_interval* parameter to `!None`. + .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False) :param write_lsn: a LSN position up to which the client has written the data locally @@ -438,6 +447,9 @@ The individual messages in the replication stream are represented by to the server. The library sends feedback message automatically when *status_interval* timeout is reached. + .. versionchanged:: 2.8.3 + added the *force* parameter. + Low-level replication cursor methods for :ref:`asynchronous connection ` operation. @@ -493,6 +505,8 @@ The individual messages in the replication stream are represented by A `~datetime` object representing the timestamp at the moment when the last feedback message sent to the server. + .. versionadded:: 2.8.3 + .. attribute:: wal_end LSN position of the current end of WAL on the server at the From 90755e6f13a608e8b93238e676086d355aa111db Mon Sep 17 00:00:00 2001 From: Alexander Kukushkin Date: Tue, 7 May 2019 14:18:09 +0200 Subject: [PATCH 4/5] Address code-review --- psycopg/replication_cursor_type.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index a590386a..5fdeaf03 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -152,7 +152,7 @@ consume_stream(replicationCursorObject *self, CLEARPGRES(curs->pgres); self->consuming = 1; - if (keepalive_interval >= 1) { + if (keepalive_interval > 0) { set_status_interval(self, keepalive_interval); } From b79895186cff87c5f0da29dc5c22475411809644 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 12 May 2019 23:29:22 +0900 Subject: [PATCH 5/5] Added news entry about smart replication feedback --- NEWS | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/NEWS b/NEWS index c2f88d1d..4ff4e602 100644 --- a/NEWS +++ b/NEWS @@ -1,6 +1,15 @@ Current release --------------- +What's new in psycopg 2.8.3 +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- Added *interval_status* parameter to + `~psycopg2.extras.ReplicationCursor.start_replication()` method and other + facilities to send automatic replication keepalives at periodic intervals + (:ticket:`#913`). + + What's new in psycopg 2.8.2 ^^^^^^^^^^^^^^^^^^^^^^^^^^^