diff --git a/doc/src/extras.rst b/doc/src/extras.rst index b7136fec..68c9e3ac 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -360,7 +360,7 @@ The individual messages in the replication stream are represented by :param status_interval: time between feedback packets sent to the server - .. method:: consume_stream(consume, keepalive_interval=10) + .. method:: consume_stream(consume, keepalive_interval=None) :param consume: a callable object with signature :samp:`consume({msg})` :param keepalive_interval: interval (in seconds) to send keepalive @@ -386,6 +386,9 @@ The individual messages in the replication stream are represented by This method also sends feedback messages to the server every *keepalive_interval* (in seconds). The value of this parameter must be set to at least 1 second, but it can have a fractional part. + If the *keepalive_interval* is not specified, the value of + *status_interval* specified in the `start_replication()` or + `start_replication_expert()` will be used. The client must confirm every processed message by calling `send_feedback()` method on the corresponding replication cursor. A diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index a31f6b83..a590386a 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -96,19 +96,19 @@ exit: } #define consume_stream_doc \ -"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream." +"consume_stream(consumer, keepalive_interval=None) -- Consume replication stream." static PyObject * consume_stream(replicationCursorObject *self, PyObject *args, PyObject *kwargs) { cursorObject *curs = &self->cur; - PyObject *consume = NULL, *res = NULL; - double keepalive_interval = 10; + PyObject *consume = NULL, *interval = NULL, *res = NULL; + double keepalive_interval = 0; static char *kwlist[] = {"consume", "keepalive_interval", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|d", kwlist, - &consume, &keepalive_interval)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|O", kwlist, + &consume, &interval)) { return NULL; } @@ -119,9 +119,23 @@ consume_stream(replicationCursorObject *self, Dprintf("consume_stream"); - if (keepalive_interval < 1.0) { - psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)"); - return NULL; + if (interval && interval != Py_None) { + + if (PyFloat_Check(interval)) { + keepalive_interval = PyFloat_AsDouble(interval); + } else if (PyLong_Check(interval)) { + keepalive_interval = PyLong_AsDouble(interval); + } else if (PyInt_Check(interval)) { + keepalive_interval = PyInt_AsLong(interval); + } else { + psyco_set_error(ProgrammingError, curs, "keepalive_interval must be int or float"); + return NULL; + } + + if (keepalive_interval < 1.0) { + psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)"); + return NULL; + } } if (self->consuming) { @@ -138,7 +152,9 @@ consume_stream(replicationCursorObject *self, CLEARPGRES(curs->pgres); self->consuming = 1; - set_status_interval(self, keepalive_interval); + if (keepalive_interval >= 1) { + set_status_interval(self, keepalive_interval); + } if (pq_copy_both(self, consume) >= 0) { res = Py_None;