Merge branch 'feature/smart-feedback'

This commit is contained in:
Daniele Varrazzo 2019-05-12 23:55:11 +09:00
commit 668d507c34
12 changed files with 197 additions and 97 deletions

9
NEWS
View File

@ -1,6 +1,15 @@
Current release Current release
--------------- ---------------
What's new in psycopg 2.8.3
^^^^^^^^^^^^^^^^^^^^^^^^^^^
- Added *interval_status* parameter to
`~psycopg2.extras.ReplicationCursor.start_replication()` method and other
facilities to send automatic replication keepalives at periodic intervals
(:ticket:`#913`).
What's new in psycopg 2.8.2 What's new in psycopg 2.8.2
^^^^^^^^^^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^^^^^^^^^^

View File

@ -552,8 +552,7 @@ value greater than zero in ``postgresql.conf`` (these changes require a server
restart). Create a database ``psycopg2_test``. restart). Create a database ``psycopg2_test``.
Then run the following code to quickly try the replication support out. This Then run the following code to quickly try the replication support out. This
is not production code -- it has no error handling, it sends feedback too is not production code -- it's only intended as a simple demo of logical
often, etc. -- and it's only intended as a simple demo of logical
replication:: replication::
from __future__ import print_function from __future__ import print_function

View File

@ -270,7 +270,7 @@ The individual messages in the replication stream are represented by
Replication slots are a feature of PostgreSQL server starting with Replication slots are a feature of PostgreSQL server starting with
version 9.4. version 9.4.
.. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False) .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False, status_interval=10)
Start replication on the connection. Start replication on the connection.
@ -288,6 +288,7 @@ The individual messages in the replication stream are represented by
slot (not allowed with physical replication) slot (not allowed with physical replication)
:param decode: a flag indicating that unicode conversion should be :param decode: a flag indicating that unicode conversion should be
performed on messages received from the server performed on messages received from the server
:param status_interval: time between feedback packets sent to the server
If a *slot_name* is specified, the slot must exist on the server and If a *slot_name* is specified, the slot must exist on the server and
its type must match the replication type used. its type must match the replication type used.
@ -328,6 +329,14 @@ The individual messages in the replication stream are represented by
*This parameter should not be set with physical replication or with *This parameter should not be set with physical replication or with
logical replication plugins that produce binary output.* logical replication plugins that produce binary output.*
Replication stream should periodically send feedback to the database
to prevent disconnect via timeout. Feedback is automatically sent when
`read_message()` is called or during run of the `consume_stream()`.
To specify the feedback interval use *status_interval* parameter.
The value of this parameter must be set to at least 1 second, but
it can have a fractional part.
This function constructs a |START_REPLICATION|_ command and calls This function constructs a |START_REPLICATION|_ command and calls
`start_replication_expert()` internally. `start_replication_expert()` internally.
@ -336,10 +345,13 @@ The individual messages in the replication stream are represented by
`read_message()` in case of :ref:`asynchronous connection `read_message()` in case of :ref:`asynchronous connection
<async-support>`. <async-support>`.
.. versionchanged:: 2.8.3
added the *status_interval* parameter.
.. |START_REPLICATION| replace:: :sql:`START_REPLICATION` .. |START_REPLICATION| replace:: :sql:`START_REPLICATION`
.. _START_REPLICATION: https://www.postgresql.org/docs/current/static/protocol-replication.html .. _START_REPLICATION: https://www.postgresql.org/docs/current/static/protocol-replication.html
.. method:: start_replication_expert(command, decode=False) .. method:: start_replication_expert(command, decode=False, status_interval=10)
Start replication on the connection using provided Start replication on the connection using provided
|START_REPLICATION|_ command. |START_REPLICATION|_ command.
@ -348,9 +360,13 @@ The individual messages in the replication stream are represented by
`~psycopg2.sql.Composable` instance for dynamic generation. `~psycopg2.sql.Composable` instance for dynamic generation.
:param decode: a flag indicating that unicode conversion should be :param decode: a flag indicating that unicode conversion should be
performed on messages received from the server. performed on messages received from the server.
:param status_interval: time between feedback packets sent to the server
.. versionchanged:: 2.8.3
added the *status_interval* parameter.
.. method:: consume_stream(consume, keepalive_interval=10) .. method:: consume_stream(consume, keepalive_interval=None)
:param consume: a callable object with signature :samp:`consume({msg})` :param consume: a callable object with signature :samp:`consume({msg})`
:param keepalive_interval: interval (in seconds) to send keepalive :param keepalive_interval: interval (in seconds) to send keepalive
@ -373,13 +389,14 @@ The individual messages in the replication stream are represented by
`ReplicationMessage` class. See `read_message()` for details about `ReplicationMessage` class. See `read_message()` for details about
message decoding. message decoding.
This method also sends keepalive messages to the server in case there This method also sends feedback messages to the server every
were no new data from the server for the duration of
*keepalive_interval* (in seconds). The value of this parameter must *keepalive_interval* (in seconds). The value of this parameter must
be set to at least 1 second, but it can have a fractional part. be set to at least 1 second, but it can have a fractional part.
If the *keepalive_interval* is not specified, the value of
*status_interval* specified in the `start_replication()` or
`start_replication_expert()` will be used.
After processing certain amount of messages the client should send a The client must confirm every processed message by calling
confirmation message to the server. This should be done by calling
`send_feedback()` method on the corresponding replication cursor. A `send_feedback()` method on the corresponding replication cursor. A
reference to the cursor is provided in the `ReplicationMessage` as an reference to the cursor is provided in the `ReplicationMessage` as an
attribute. attribute.
@ -393,8 +410,6 @@ The individual messages in the replication stream are represented by
def __call__(self, msg): def __call__(self, msg):
self.process_message(msg.payload) self.process_message(msg.payload)
if self.should_send_feedback(msg):
msg.cursor.send_feedback(flush_lsn=msg.data_start) msg.cursor.send_feedback(flush_lsn=msg.data_start)
consumer = LogicalStreamConsumer() consumer = LogicalStreamConsumer()
@ -408,12 +423,10 @@ The individual messages in the replication stream are represented by
retains all the WAL segments that might be needed to stream the retains all the WAL segments that might be needed to stream the
changes via all of the currently open replication slots. changes via all of the currently open replication slots.
On the other hand, it is not recommended to send confirmation .. versionchanged:: 2.8.3
after *every* processed message, since that will put an changed the default value of the *keepalive_interval* parameter to `!None`.
unnecessary load on network and the server. A possible strategy
is to confirm after every COMMIT message.
.. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False)
:param write_lsn: a LSN position up to which the client has written the data locally :param write_lsn: a LSN position up to which the client has written the data locally
:param flush_lsn: a LSN position up to which the client has processed the :param flush_lsn: a LSN position up to which the client has processed the
@ -423,13 +436,19 @@ The individual messages in the replication stream are represented by
has applied the changes (physical replication has applied the changes (physical replication
master-slave protocol only) master-slave protocol only)
:param reply: request the server to send back a keepalive message immediately :param reply: request the server to send back a keepalive message immediately
:param force: force sending a feedback message regardless of status_interval timeout
Use this method to report to the server that all messages up to a Use this method to report to the server that all messages up to a
certain LSN position have been processed on the client and may be certain LSN position have been processed on the client and may be
discarded on the server. discarded on the server.
This method can also be called with all default parameters' values to If the *reply* or *force* parameters are not set, this method will
just send a keepalive message to the server. just update internal structures without sending the feedback message
to the server. The library sends feedback message automatically
when *status_interval* timeout is reached.
.. versionchanged:: 2.8.3
added the *force* parameter.
Low-level replication cursor methods for :ref:`asynchronous connection Low-level replication cursor methods for :ref:`asynchronous connection
<async-support>` operation. <async-support>` operation.
@ -463,9 +482,9 @@ The individual messages in the replication stream are represented by
corresponding connection to block the process until there is more data corresponding connection to block the process until there is more data
from the server. from the server.
The server can send keepalive messages to the client periodically. Last, but not least, this method sends feedback messages when
Such messages are silently consumed by this method and are never *status_interval* timeout is reached or when keepalive message with
reported to the caller. reply request arrived from the server.
.. method:: fileno() .. method:: fileno()
@ -481,6 +500,13 @@ The individual messages in the replication stream are represented by
communication with the server (a data or keepalive message in either communication with the server (a data or keepalive message in either
direction). direction).
.. attribute:: feedback_timestamp
A `~datetime` object representing the timestamp at the moment when
the last feedback message sent to the server.
.. versionadded:: 2.8.3
.. attribute:: wal_end .. attribute:: wal_end
LSN position of the current end of WAL on the server at the LSN position of the current end of WAL on the server at the
@ -496,33 +522,21 @@ The individual messages in the replication stream are represented by
def consume(msg): def consume(msg):
# ... # ...
msg.cursor.send_feedback(flush_lsn=msg.data_start)
keepalive_interval = 10.0 status_interval = 10.0
while True: while True:
msg = cur.read_message() msg = cur.read_message()
if msg: if msg:
consume(msg) consume(msg)
else: else:
now = datetime.now() now = datetime.now()
timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds() timeout = status_interval - (now - cur.feedback_timestamp).total_seconds()
try: try:
sel = select([cur], [], [], max(0, timeout)) sel = select([cur], [], [], max(0, timeout))
if not any(sel):
cur.send_feedback() # timed out, send keepalive message
except InterruptedError: except InterruptedError:
pass # recalculate timeout and continue pass # recalculate timeout and continue
.. warning::
The :samp:`consume({msg})` function will only be called when there are new
database writes on the server e.g. any DML or DDL statement. Depending on
your Postgres cluster configuration this might cause the server to run out
of disk space if the writes are too far apart. To prevent this from
happening you can use `~ReplicationCursor.wal_end` value to periodically
send feedback to the server to notify that your replication client has
received and processed all the messages.
.. index:: .. index::
pair: Cursor; Replication pair: Cursor; Replication

View File

@ -561,7 +561,7 @@ class ReplicationCursor(_replicationCursor):
self.execute(command) self.execute(command)
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
timeline=0, options=None, decode=False): timeline=0, options=None, decode=False, status_interval=10):
"""Start replication stream.""" """Start replication stream."""
command = "START_REPLICATION " command = "START_REPLICATION "
@ -615,7 +615,7 @@ class ReplicationCursor(_replicationCursor):
command += "%s %s" % (quote_ident(k, self), _A(str(v))) command += "%s %s" % (quote_ident(k, self), _A(str(v)))
command += ")" command += ")"
self.start_replication_expert(command, decode=decode) self.start_replication_expert(command, decode=decode, status_interval=status_interval)
# allows replication cursors to be used in select.select() directly # allows replication cursors to be used in select.select() directly
def fileno(self): def fileno(self):

View File

@ -1454,12 +1454,20 @@ pq_read_replication_message(replicationCursorObject *repl, replicationMessageObj
int64_t send_time; int64_t send_time;
PyObject *str = NULL, *result = NULL; PyObject *str = NULL, *result = NULL;
int ret = -1; int ret = -1;
struct timeval curr_time, feedback_time;
Dprintf("pq_read_replication_message"); Dprintf("pq_read_replication_message");
*msg = NULL; *msg = NULL;
consumed = 0; consumed = 0;
/* Is it a time to send the next feedback message? */
gettimeofday(&curr_time, NULL);
timeradd(&repl->last_feedback, &repl->status_interval, &feedback_time);
if (timercmp(&curr_time, &feedback_time, >=) && pq_send_replication_feedback(repl, 0) < 0) {
goto exit;
}
retry: retry:
len = PQgetCopyData(pgconn, &buffer, 1 /* async */); len = PQgetCopyData(pgconn, &buffer, 1 /* async */);
@ -1552,6 +1560,7 @@ retry:
(*msg)->send_time = send_time; (*msg)->send_time = send_time;
repl->wal_end = wal_end; repl->wal_end = wal_end;
repl->last_msg_data_start = data_start;
} }
else if (buffer[0] == 'k') { else if (buffer[0] == 'k') {
/* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
@ -1565,6 +1574,12 @@ retry:
Dprintf("pq_read_replication_message: wal_end="XLOGFMTSTR, XLOGFMTARGS(wal_end)); Dprintf("pq_read_replication_message: wal_end="XLOGFMTSTR, XLOGFMTARGS(wal_end));
repl->wal_end = wal_end; repl->wal_end = wal_end;
/* We can safely forward flush_lsn to the wal_end from the server keepalive message
* if we know that the client already processed (confirmed) the last XLogData message */
if (repl->flush_lsn >= repl->last_msg_data_start && wal_end > repl->flush_lsn) {
repl->flush_lsn = wal_end;
}
reply = buffer[hdr]; reply = buffer[hdr];
if (reply && pq_send_replication_feedback(repl, 0) < 0) { if (reply && pq_send_replication_feedback(repl, 0) < 0) {
goto exit; goto exit;
@ -1614,7 +1629,8 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
pq_raise(conn, curs, NULL); pq_raise(conn, curs, NULL);
return -1; return -1;
} }
gettimeofday(&repl->last_io, NULL); gettimeofday(&repl->last_feedback, NULL);
repl->last_io = repl->last_feedback;
return 0; return 0;
} }
@ -1627,7 +1643,7 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
manages to send keepalive messages to the server as needed. manages to send keepalive messages to the server as needed.
*/ */
int int
pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_interval) pq_copy_both(replicationCursorObject *repl, PyObject *consume)
{ {
cursorObject *curs = &repl->cur; cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn; connectionObject *conn = curs->conn;
@ -1636,7 +1652,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
PyObject *tmp = NULL; PyObject *tmp = NULL;
int fd, sel, ret = -1; int fd, sel, ret = -1;
fd_set fds; fd_set fds;
struct timeval keep_intr, curr_time, ping_time, timeout; struct timeval curr_time, feedback_time, timeout;
if (!PyCallable_Check(consume)) { if (!PyCallable_Check(consume)) {
Dprintf("pq_copy_both: expected callable consume object"); Dprintf("pq_copy_both: expected callable consume object");
@ -1645,9 +1661,6 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
keep_intr.tv_sec = (int)keepalive_interval;
keep_intr.tv_usec = (long)((keepalive_interval - keep_intr.tv_sec)*1.0e6);
while (1) { while (1) {
if (pq_read_replication_message(repl, &msg) < 0) { if (pq_read_replication_message(repl, &msg) < 0) {
goto exit; goto exit;
@ -1662,20 +1675,16 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
FD_ZERO(&fds); FD_ZERO(&fds);
FD_SET(fd, &fds); FD_SET(fd, &fds);
/* how long can we wait before we need to send a keepalive? */ /* how long can we wait before we need to send a feedback? */
gettimeofday(&curr_time, NULL); gettimeofday(&curr_time, NULL);
timeradd(&repl->last_io, &keep_intr, &ping_time); timeradd(&repl->last_feedback, &repl->status_interval, &feedback_time);
timersub(&ping_time, &curr_time, &timeout); timersub(&feedback_time, &curr_time, &timeout);
if (timeout.tv_sec >= 0) { if (timeout.tv_sec >= 0) {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
sel = select(fd + 1, &fds, NULL, NULL, &timeout); sel = select(fd + 1, &fds, NULL, NULL, &timeout);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
}
else {
sel = 0; /* we're past target time, pretend select() timed out */
}
if (sel < 0) { if (sel < 0) {
if (errno != EINTR) { if (errno != EINTR) {
@ -1685,15 +1694,8 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
if (PyErr_CheckSignals()) { if (PyErr_CheckSignals()) {
goto exit; goto exit;
} }
continue;
}
if (sel == 0) {
if (pq_send_replication_feedback(repl, 0) < 0) {
goto exit;
} }
} }
continue;
} }
else { else {
tmp = PyObject_CallFunctionObjArgs(consume, msg, NULL); tmp = PyObject_CallFunctionObjArgs(consume, msg, NULL);

View File

@ -65,8 +65,7 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn, const char *query,
RAISES HIDDEN void pq_complete_error(connectionObject *conn); RAISES HIDDEN void pq_complete_error(connectionObject *conn);
/* replication protocol support */ /* replication protocol support */
HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer);
double keepalive_interval);
HIDDEN int pq_read_replication_message(replicationCursorObject *repl, HIDDEN int pq_read_replication_message(replicationCursorObject *repl,
replicationMessageObject **msg); replicationMessageObject **msg);
HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested); HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested);

View File

@ -42,13 +42,16 @@ typedef struct replicationCursorObject {
int decode:1; /* if we should use character decoding on the messages */ int decode:1; /* if we should use character decoding on the messages */
struct timeval last_io; /* timestamp of the last exchange with the server */ struct timeval last_io; /* timestamp of the last exchange with the server */
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ struct timeval status_interval; /* time between status packets sent to the server */
XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ XLogRecPtr write_lsn; /* LSNs for replication feedback messages */
XLogRecPtr flush_lsn; XLogRecPtr flush_lsn;
XLogRecPtr apply_lsn; XLogRecPtr apply_lsn;
XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */ XLogRecPtr wal_end; /* WAL end pointer from the last exchange with the server */
XLogRecPtr last_msg_data_start; /* WAL pointer to the last non-keepalive message from the server */
struct timeval last_feedback; /* timestamp of the last feedback message to the server */
} replicationCursorObject; } replicationCursorObject;

View File

@ -38,8 +38,14 @@
#include "datetime.h" #include "datetime.h"
static void set_status_interval(replicationCursorObject *self, double status_interval)
{
self->status_interval.tv_sec = (int)status_interval;
self->status_interval.tv_usec = (long)((status_interval - self->status_interval.tv_sec)*1.0e6);
}
#define start_replication_expert_doc \ #define start_replication_expert_doc \
"start_replication_expert(command, decode=False) -- Start replication with a given command." "start_replication_expert(command, decode=False, status_interval=10) -- Start replication with a given command."
static PyObject * static PyObject *
start_replication_expert(replicationCursorObject *self, start_replication_expert(replicationCursorObject *self,
@ -49,10 +55,12 @@ start_replication_expert(replicationCursorObject *self,
connectionObject *conn = self->cur.conn; connectionObject *conn = self->cur.conn;
PyObject *res = NULL; PyObject *res = NULL;
PyObject *command = NULL; PyObject *command = NULL;
double status_interval = 10;
long int decode = 0; long int decode = 0;
static char *kwlist[] = {"command", "decode", NULL}; static char *kwlist[] = {"command", "decode", "status_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|l", kwlist, &command, &decode)) { if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|ld", kwlist,
&command, &decode, &status_interval)) {
return NULL; return NULL;
} }
@ -64,6 +72,11 @@ start_replication_expert(replicationCursorObject *self,
goto exit; goto exit;
} }
if (status_interval < 1.0) {
psyco_set_error(ProgrammingError, curs, "status_interval must be >= 1 (sec)");
return NULL;
}
Dprintf("start_replication_expert: '%s'; decode: %ld", Dprintf("start_replication_expert: '%s'; decode: %ld",
Bytes_AS_STRING(command), decode); Bytes_AS_STRING(command), decode);
@ -72,6 +85,7 @@ start_replication_expert(replicationCursorObject *self,
res = Py_None; res = Py_None;
Py_INCREF(res); Py_INCREF(res);
set_status_interval(self, status_interval);
self->decode = decode; self->decode = decode;
gettimeofday(&self->last_io, NULL); gettimeofday(&self->last_io, NULL);
} }
@ -82,19 +96,19 @@ exit:
} }
#define consume_stream_doc \ #define consume_stream_doc \
"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream." "consume_stream(consumer, keepalive_interval=None) -- Consume replication stream."
static PyObject * static PyObject *
consume_stream(replicationCursorObject *self, consume_stream(replicationCursorObject *self,
PyObject *args, PyObject *kwargs) PyObject *args, PyObject *kwargs)
{ {
cursorObject *curs = &self->cur; cursorObject *curs = &self->cur;
PyObject *consume = NULL, *res = NULL; PyObject *consume = NULL, *interval = NULL, *res = NULL;
double keepalive_interval = 10; double keepalive_interval = 0;
static char *kwlist[] = {"consume", "keepalive_interval", NULL}; static char *kwlist[] = {"consume", "keepalive_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kwlist, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O", kwlist,
&consume, &keepalive_interval)) { &consume, &interval)) {
return NULL; return NULL;
} }
@ -105,10 +119,24 @@ consume_stream(replicationCursorObject *self,
Dprintf("consume_stream"); Dprintf("consume_stream");
if (interval && interval != Py_None) {
if (PyFloat_Check(interval)) {
keepalive_interval = PyFloat_AsDouble(interval);
} else if (PyLong_Check(interval)) {
keepalive_interval = PyLong_AsDouble(interval);
} else if (PyInt_Check(interval)) {
keepalive_interval = PyInt_AsLong(interval);
} else {
psyco_set_error(ProgrammingError, curs, "keepalive_interval must be int or float");
return NULL;
}
if (keepalive_interval < 1.0) { if (keepalive_interval < 1.0) {
psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)"); psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
return NULL; return NULL;
} }
}
if (self->consuming) { if (self->consuming) {
PyErr_SetString(ProgrammingError, PyErr_SetString(ProgrammingError,
@ -124,8 +152,11 @@ consume_stream(replicationCursorObject *self,
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
self->consuming = 1; self->consuming = 1;
if (keepalive_interval > 0) {
set_status_interval(self, keepalive_interval);
}
if (pq_copy_both(self, consume, keepalive_interval) >= 0) { if (pq_copy_both(self, consume) >= 0) {
res = Py_None; res = Py_None;
Py_INCREF(res); Py_INCREF(res);
} }
@ -159,7 +190,7 @@ read_message(replicationCursorObject *self, PyObject *dummy)
} }
#define send_feedback_doc \ #define send_feedback_doc \
"send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply." "send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False, force=False) -- Update a replication feedback, optionally request a reply or force sending a feedback message regardless of the timeout."
static PyObject * static PyObject *
send_feedback(replicationCursorObject *self, send_feedback(replicationCursorObject *self,
@ -167,13 +198,13 @@ send_feedback(replicationCursorObject *self,
{ {
cursorObject *curs = &self->cur; cursorObject *curs = &self->cur;
XLogRecPtr write_lsn = 0, flush_lsn = 0, apply_lsn = 0; XLogRecPtr write_lsn = 0, flush_lsn = 0, apply_lsn = 0;
int reply = 0; int reply = 0, force = 0;
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", "force", NULL};
EXC_IF_CURS_CLOSED(curs); EXC_IF_CURS_CLOSED(curs);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKii", kwlist,
&write_lsn, &flush_lsn, &apply_lsn, &reply)) { &write_lsn, &flush_lsn, &apply_lsn, &reply, &force)) {
return NULL; return NULL;
} }
@ -186,7 +217,7 @@ send_feedback(replicationCursorObject *self,
if (apply_lsn > self->apply_lsn) if (apply_lsn > self->apply_lsn)
self->apply_lsn = apply_lsn; self->apply_lsn = apply_lsn;
if (pq_send_replication_feedback(self, reply) < 0) { if ((force || reply) && pq_send_replication_feedback(self, reply) < 0) {
return NULL; return NULL;
} }
@ -228,6 +259,28 @@ repl_curs_get_io_timestamp(replicationCursorObject *self)
return res; return res;
} }
#define repl_curs_feedback_timestamp_doc \
"feedback_timestamp -- the timestamp of the latest feedback message sent to the server"
static PyObject *
repl_curs_get_feedback_timestamp(replicationCursorObject *self)
{
cursorObject *curs = &self->cur;
PyObject *tval, *res = NULL;
double seconds;
EXC_IF_CURS_CLOSED(curs);
seconds = self->last_feedback.tv_sec + self->last_feedback.tv_usec / 1.0e6;
tval = Py_BuildValue("(d)", seconds);
if (tval) {
res = PyDateTime_FromTimestamp(tval);
Py_DECREF(tval);
}
return res;
}
/* object member list */ /* object member list */
#define OFFSETOF(x) offsetof(replicationCursorObject, x) #define OFFSETOF(x) offsetof(replicationCursorObject, x)
@ -259,6 +312,9 @@ static struct PyGetSetDef replicationCursorObject_getsets[] = {
{ "io_timestamp", { "io_timestamp",
(getter)repl_curs_get_io_timestamp, NULL, (getter)repl_curs_get_io_timestamp, NULL,
repl_curs_io_timestamp_doc, NULL }, repl_curs_io_timestamp_doc, NULL },
{ "feedback_timestamp",
(getter)repl_curs_get_feedback_timestamp, NULL,
repl_curs_feedback_timestamp_doc, NULL },
{NULL} {NULL}
}; };

View File

@ -35,6 +35,13 @@
extern HIDDEN void timeradd(struct timeval *a, struct timeval *b, struct timeval *c); extern HIDDEN void timeradd(struct timeval *a, struct timeval *b, struct timeval *c);
extern HIDDEN void timersub(struct timeval *a, struct timeval *b, struct timeval *c); extern HIDDEN void timersub(struct timeval *a, struct timeval *b, struct timeval *c);
#endif #endif
#ifndef timercmp
#define timercmp(a, b, cmp) \
(((a)->tv_sec == (b)->tv_sec) ? \
((a)->tv_usec cmp (b)->tv_usec) : \
((a)->tv_sec cmp (b)->tv_sec))
#endif
#endif #endif
#endif /* !defined(PSYCOPG_SOLARIS_SUPPORT_H) */ #endif /* !defined(PSYCOPG_SOLARIS_SUPPORT_H) */

View File

@ -43,6 +43,13 @@ extern HIDDEN void timeradd(struct timeval *a, struct timeval *b, struct timeval
#endif #endif
extern HIDDEN void timersub(struct timeval *a, struct timeval *b, struct timeval *c); extern HIDDEN void timersub(struct timeval *a, struct timeval *b, struct timeval *c);
#ifndef timercmp
#define timercmp(a, b, cmp) \
(((a)->tv_sec == (b)->tv_sec) ? \
((a)->tv_usec cmp (b)->tv_usec) : \
((a)->tv_sec cmp (b)->tv_sec))
#endif
#endif #endif
#endif /* !defined(PSYCOPG_WIN32_SUPPORT_H) */ #endif /* !defined(PSYCOPG_WIN32_SUPPORT_H) */

View File

@ -194,6 +194,7 @@ class AsyncReplicationTest(ReplicationTestCase):
def consume(msg): def consume(msg):
# just check the methods # just check the methods
"%s: %s" % (cur.io_timestamp, repr(msg)) "%s: %s" % (cur.io_timestamp, repr(msg))
"%s: %s" % (cur.feedback_timestamp, repr(msg))
self.msg_count += 1 self.msg_count += 1
if self.msg_count > 3: if self.msg_count > 3:

View File

@ -163,18 +163,20 @@ class ReplicationTest(ReplicationTestCase):
cur = conn.cursor() cur = conn.cursor()
self.create_replication_slot(cur, output_plugin='test_decoding') self.create_replication_slot(cur, output_plugin='test_decoding')
self.make_replication_events()
def consume(msg):
raise StopReplication()
with self.assertRaises(psycopg2.DataError):
# try with invalid options # try with invalid options
cur.start_replication( cur.start_replication(
slot_name=self.slot, options={'invalid_param': 'value'}) slot_name=self.slot, options={'invalid_param': 'value'})
cur.consume_stream(consume)
def consume(msg):
pass
# we don't see the error from the server before we try to read the data
self.assertRaises(psycopg2.DataError, cur.consume_stream, consume)
# try with correct command # try with correct command
cur.start_replication(slot_name=self.slot) cur.start_replication(slot_name=self.slot)
self.assertRaises(StopReplication, cur.consume_stream, consume)
@skip_before_postgres(9, 4) # slots require 9.4 @skip_before_postgres(9, 4) # slots require 9.4
@skip_repl_if_green @skip_repl_if_green
@ -242,6 +244,7 @@ class AsyncReplicationTest(ReplicationTestCase):
def consume(msg): def consume(msg):
# just check the methods # just check the methods
"%s: %s" % (cur.io_timestamp, repr(msg)) "%s: %s" % (cur.io_timestamp, repr(msg))
"%s: %s" % (cur.feedback_timestamp, repr(msg))
"%s: %s" % (cur.wal_end, repr(msg)) "%s: %s" % (cur.wal_end, repr(msg))
self.msg_count += 1 self.msg_count += 1