Change the default value of keepalive_interval parameter to None

The previous default value was 10 seconds, what might cause silent
overwrite of the *status_interval* specified in the `start_replication()`
This commit is contained in:
Alexander Kukushkin 2019-05-06 15:26:21 +02:00
parent 6cff5a3e08
commit f827e49f55
2 changed files with 29 additions and 10 deletions

View File

@ -360,7 +360,7 @@ The individual messages in the replication stream are represented by
:param status_interval: time between feedback packets sent to the server
.. method:: consume_stream(consume, keepalive_interval=10)
.. method:: consume_stream(consume, keepalive_interval=None)
:param consume: a callable object with signature :samp:`consume({msg})`
:param keepalive_interval: interval (in seconds) to send keepalive
@ -386,6 +386,9 @@ The individual messages in the replication stream are represented by
This method also sends feedback messages to the server every
*keepalive_interval* (in seconds). The value of this parameter must
be set to at least 1 second, but it can have a fractional part.
If the *keepalive_interval* is not specified, the value of
*status_interval* specified in the `start_replication()` or
`start_replication_expert()` will be used.
The client must confirm every processed message by calling
`send_feedback()` method on the corresponding replication cursor. A

View File

@ -96,19 +96,19 @@ exit:
}
#define consume_stream_doc \
"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream."
"consume_stream(consumer, keepalive_interval=None) -- Consume replication stream."
static PyObject *
consume_stream(replicationCursorObject *self,
PyObject *args, PyObject *kwargs)
{
cursorObject *curs = &self->cur;
PyObject *consume = NULL, *res = NULL;
double keepalive_interval = 10;
PyObject *consume = NULL, *interval = NULL, *res = NULL;
double keepalive_interval = 0;
static char *kwlist[] = {"consume", "keepalive_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kwlist,
&consume, &keepalive_interval)) {
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O", kwlist,
&consume, &interval)) {
return NULL;
}
@ -119,9 +119,23 @@ consume_stream(replicationCursorObject *self,
Dprintf("consume_stream");
if (keepalive_interval < 1.0) {
psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
return NULL;
if (interval && interval != Py_None) {
if (PyFloat_Check(interval)) {
keepalive_interval = PyFloat_AsDouble(interval);
} else if (PyLong_Check(interval)) {
keepalive_interval = PyLong_AsDouble(interval);
} else if (PyInt_Check(interval)) {
keepalive_interval = PyInt_AsLong(interval);
} else {
psyco_set_error(ProgrammingError, curs, "keepalive_interval must be int or float");
return NULL;
}
if (keepalive_interval < 1.0) {
psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
return NULL;
}
}
if (self->consuming) {
@ -138,7 +152,9 @@ consume_stream(replicationCursorObject *self,
CLEARPGRES(curs->pgres);
self->consuming = 1;
set_status_interval(self, keepalive_interval);
if (keepalive_interval >= 1) {
set_status_interval(self, keepalive_interval);
}
if (pq_copy_both(self, consume) >= 0) {
res = Py_None;