Move the decode parameter to start_replication().

It makes more sense this way, because otherwise it must be passed to every call
of `read_message()`.
This commit is contained in:
Oleksandr Shulgin 2015-10-23 11:31:55 +02:00
parent 76c7f4a0b5
commit e69dafbecc
6 changed files with 71 additions and 70 deletions

View File

@ -185,10 +185,10 @@ replication::
connection_factory=psycopg2.extras.LogicalReplicationConnection) connection_factory=psycopg2.extras.LogicalReplicationConnection)
cur = conn.cursor() cur = conn.cursor()
try: try:
cur.start_replication(slot_name='pytest') cur.start_replication(slot_name='pytest', decode=True) # test_decoding produces textual output
except psycopg2.ProgrammingError: except psycopg2.ProgrammingError:
cur.create_replication_slot('pytest', output_plugin='test_decoding') cur.create_replication_slot('pytest', output_plugin='test_decoding')
cur.start_replication(slot_name='pytest') cur.start_replication(slot_name='pytest', decode=True)
class DemoConsumer(object): class DemoConsumer(object):
def __call__(self, msg): def __call__(self, msg):
@ -260,9 +260,12 @@ The individual messages in the replication stream are represented by
.. attribute:: payload .. attribute:: payload
The actual data received from the server. An instance of either The actual data received from the server.
``str`` or ``unicode``, depending on the method that was used to
produce this message. An instance of either `bytes()` or `unicode()`, depending on the value
of `decode` option passed to `ReplicationCursor.start_replication()`
on the connection. See `ReplicationCursor.read_message()` for
details.
.. attribute:: data_size .. attribute:: data_size
@ -336,7 +339,7 @@ The individual messages in the replication stream are represented by
Replication slots are a feature of PostgreSQL server starting with Replication slots are a feature of PostgreSQL server starting with
version 9.4. version 9.4.
.. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None) .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None, decode=False)
Start replication on the connection. Start replication on the connection.
@ -352,6 +355,8 @@ The individual messages in the replication stream are represented by
can only be used with physical replication) can only be used with physical replication)
:param options: a dictionary of options to pass to logical replication :param options: a dictionary of options to pass to logical replication
slot (not allowed with physical replication) slot (not allowed with physical replication)
:param decode: a flag indicating that unicode conversion should be
performed on messages received from the server
If a *slot_name* is specified, the slot must exist on the server and If a *slot_name* is specified, the slot must exist on the server and
its type must match the replication type used. its type must match the replication type used.
@ -387,6 +392,11 @@ The individual messages in the replication stream are represented by
on the output plugin that was used to create the slot. Must be on the output plugin that was used to create the slot. Must be
`!None` for physical replication. `!None` for physical replication.
If *decode* is set to `!True` the messages received from the server
would be converted according to the connection `~connection.encoding`.
*This parameter should not be set with physical replication or with
logical replication plugins that produce binary output.*
This function constructs a ``START_REPLICATION`` command and calls This function constructs a ``START_REPLICATION`` command and calls
`start_replication_expert()` internally. `start_replication_expert()` internally.
@ -395,43 +405,40 @@ The individual messages in the replication stream are represented by
`read_message()` in case of :ref:`asynchronous connection `read_message()` in case of :ref:`asynchronous connection
<async-support>`. <async-support>`.
.. method:: start_replication_expert(command) .. method:: start_replication_expert(command, decode=False)
Start replication on the connection using provided ``START_REPLICATION`` Start replication on the connection using provided
command. ``START_REPLICATION`` command. See `start_replication()` for
description of *decode* parameter.
.. method:: consume_stream(consume, decode=False, keepalive_interval=10) .. method:: consume_stream(consume, keepalive_interval=10)
:param consume: a callable object with signature ``consume(msg)`` :param consume: a callable object with signature ``consume(msg)``
:param decode: a flag indicating that unicode conversion should be
performed on the messages received from the server
:param keepalive_interval: interval (in seconds) to send keepalive :param keepalive_interval: interval (in seconds) to send keepalive
messages to the server messages to the server
This method can only be used with synchronous connection. For This method can only be used with synchronous connection. For
asynchronous connections see `read_message()`. asynchronous connections see `read_message()`.
Before calling this method to consume the stream use Before using this method to consume the stream call
`start_replication()` first. `start_replication()` first.
This method enters an endless loop reading messages from the server This method enters an endless loop reading messages from the server
and passing them to ``consume()``, then waiting for more messages from and passing them to ``consume()`` one at a time, then waiting for more
the server. In order to make this method break out of the loop and messages from the server. In order to make this method break out of
return, ``consume()`` can throw a `StopReplication` exception. Any the loop and return, ``consume()`` can throw a `StopReplication`
unhandled exception will make it break out of the loop as well. exception. Any unhandled exception will make it break out of the loop
as well.
If *decode* is set to `!True` the messages read from the server are The *msg* object passed to ``consume()`` is an instance of
converted according to the connection `~connection.encoding`. This `ReplicationMessage` class. See `read_message()` for details about
parameter should not be set with physical replication. message decoding.
This method also sends keepalive messages to the server in case there This method also sends keepalive messages to the server in case there
were no new data from the server for the duration of were no new data from the server for the duration of
*keepalive_interval* (in seconds). The value of this parameter must *keepalive_interval* (in seconds). The value of this parameter must
be set to at least 1 second, but it can have a fractional part. be set to at least 1 second, but it can have a fractional part.
The *msg* objects passed to ``consume()`` are instances of
`ReplicationMessage` class.
After processing certain amount of messages the client should send a After processing certain amount of messages the client should send a
confirmation message to the server. This should be done by calling confirmation message to the server. This should be done by calling
`send_feedback()` method on the corresponding replication cursor. A `send_feedback()` method on the corresponding replication cursor. A
@ -452,7 +459,7 @@ The individual messages in the replication stream are represented by
msg.cursor.send_feedback(flush_lsn=msg.data_start) msg.cursor.send_feedback(flush_lsn=msg.data_start)
consumer = LogicalStreamConsumer() consumer = LogicalStreamConsumer()
cur.consume_stream(consumer, decode=True) cur.consume_stream(consumer)
.. warning:: .. warning::
@ -510,17 +517,21 @@ The individual messages in the replication stream are represented by
for better control, in particular to `~select` on multiple sockets. The for better control, in particular to `~select` on multiple sockets. The
following methods are provided for asynchronous operation: following methods are provided for asynchronous operation:
.. method:: read_message(decode=True) .. method:: read_message()
:param decode: a flag indicating that unicode conversion should be Try to read the next message from the server without blocking and
performed on the data received from the server return an instance of `ReplicationMessage` or `!None`, in case there
are no more data messages from the server at the moment.
This method should be used in a loop with asynchronous connections This method should be used in a loop with asynchronous connections
after calling `start_replication()` once. (after calling `start_replication()` once). For synchronous
connections see `consume_stream()`.
It tries to read the next message from the server without blocking and The returned message's `ReplicationMessage.payload` is an instance of
returns an instance of `ReplicationMessage` or `!None`, in case there `unicode()` decoded according to connection `connection.encoding`
are no more data messages from the server at the moment. *iff* `decode` was set to `!True` in the initial call to
`start_replication()` on this connection, otherwise it is an instance
of `bytes()` with no decoding.
It is expected that the calling code will call this method repeatedly It is expected that the calling code will call this method repeatedly
in order to consume all of the messages that might have been buffered in order to consume all of the messages that might have been buffered

View File

@ -548,7 +548,7 @@ class ReplicationCursor(_replicationCursor):
self.execute(command) self.execute(command)
def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
timeline=0, options=None): timeline=0, options=None, decode=False):
"""Start replication stream.""" """Start replication stream."""
command = "START_REPLICATION " command = "START_REPLICATION "
@ -597,7 +597,7 @@ class ReplicationCursor(_replicationCursor):
command += "%s %s" % (quote_ident(k, self), _A(str(v))) command += "%s %s" % (quote_ident(k, self), _A(str(v)))
command += ")" command += ")"
self.start_replication_expert(command) self.start_replication_expert(command, decode=decode)
# allows replication cursors to be used in select.select() directly # allows replication cursors to be used in select.select() directly
def fileno(self): def fileno(self):

View File

@ -1543,7 +1543,7 @@ exit:
are never returned to the caller. are never returned to the caller.
*/ */
PyObject * PyObject *
pq_read_replication_message(replicationCursorObject *repl, int decode) pq_read_replication_message(replicationCursorObject *repl)
{ {
cursorObject *curs = &repl->cur; cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn; connectionObject *conn = curs->conn;
@ -1555,7 +1555,7 @@ pq_read_replication_message(replicationCursorObject *repl, int decode)
PyObject *str = NULL, *result = NULL; PyObject *str = NULL, *result = NULL;
replicationMessageObject *msg = NULL; replicationMessageObject *msg = NULL;
Dprintf("pq_read_replication_message(decode=%d)", decode); Dprintf("pq_read_replication_message");
consumed = 0; consumed = 0;
retry: retry:
@ -1629,8 +1629,7 @@ retry:
Dprintf("pq_read_replication_message: >>%.*s<<", data_size, buffer + hdr); Dprintf("pq_read_replication_message: >>%.*s<<", data_size, buffer + hdr);
/* XXX it would be wise to check if it's really a logical replication */ if (repl->decode) {
if (decode) {
str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL); str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL);
} else { } else {
str = Bytes_FromStringAndSize(buffer + hdr, data_size); str = Bytes_FromStringAndSize(buffer + hdr, data_size);
@ -1730,8 +1729,7 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
manages to send keepalive messages to the server as needed. manages to send keepalive messages to the server as needed.
*/ */
int int
pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode, pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_interval)
double keepalive_interval)
{ {
cursorObject *curs = &repl->cur; cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn; connectionObject *conn = curs->conn;
@ -1752,7 +1750,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode,
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
while (1) { while (1) {
msg = pq_read_replication_message(repl, decode); msg = pq_read_replication_message(repl);
if (!msg) { if (!msg) {
goto exit; goto exit;
} }

View File

@ -75,8 +75,8 @@ RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
/* replication protocol support */ /* replication protocol support */
HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,
int decode, double keepalive_interval); double keepalive_interval);
HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl, int decode); HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl);
HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested); HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested);
#endif /* !defined(PSYCOPG_PQPATH_H) */ #endif /* !defined(PSYCOPG_PQPATH_H) */

View File

@ -40,8 +40,9 @@ typedef struct replicationCursorObject {
int started:1; /* if replication is started */ int started:1; /* if replication is started */
int consuming:1; /* if running the consume loop */ int consuming:1; /* if running the consume loop */
int decode:1; /* if we should use character decoding on the messages */
struct timeval last_io; /* timestamp of the last exchange with the server */ struct timeval last_io ; /* timestamp of the last exchange with the server */
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr write_lsn; /* LSN stats for replication feedback messages */ XLogRecPtr write_lsn; /* LSN stats for replication feedback messages */

View File

@ -39,7 +39,7 @@
#define psyco_repl_curs_start_replication_expert_doc \ #define psyco_repl_curs_start_replication_expert_doc \
"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start replication stream with a directly given command." "start_replication_expert(command, decode=False) -- Start replication with a given command."
static PyObject * static PyObject *
psyco_repl_curs_start_replication_expert(replicationCursorObject *self, psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
@ -49,9 +49,10 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
connectionObject *conn = self->cur.conn; connectionObject *conn = self->cur.conn;
PyObject *res = NULL; PyObject *res = NULL;
char *command; char *command;
static char *kwlist[] = {"command", NULL}; long int decode = 0;
static char *kwlist[] = {"command", "decode", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) { if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|l", kwlist, &command, &decode)) {
return NULL; return NULL;
} }
@ -60,17 +61,15 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
EXC_IF_TPC_PREPARED(conn, start_replication_expert); EXC_IF_TPC_PREPARED(conn, start_replication_expert);
EXC_IF_REPLICATING(self, start_replication_expert); EXC_IF_REPLICATING(self, start_replication_expert);
Dprintf("psyco_repl_curs_start_replication_expert: %s", command); Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode);
/* self->copysize = 0;*/
gettimeofday(&self->last_io, NULL);
if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) { if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) {
res = Py_None; res = Py_None;
Py_INCREF(res); Py_INCREF(res);
self->started = 1; self->started = 1;
self->decode = decode;
gettimeofday(&self->last_io, NULL);
} }
return res; return res;
@ -85,12 +84,11 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
{ {
cursorObject *curs = &self->cur; cursorObject *curs = &self->cur;
PyObject *consume = NULL, *res = NULL; PyObject *consume = NULL, *res = NULL;
int decode = 0;
double keepalive_interval = 10; double keepalive_interval = 10;
static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL}; static char *kwlist[] = {"consume", "keepalive_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
&consume, &decode, &keepalive_interval)) { &consume, &keepalive_interval)) {
return NULL; return NULL;
} }
@ -115,7 +113,7 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
self->consuming = 1; self->consuming = 1;
if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) { if (pq_copy_both(self, consume, keepalive_interval) >= 0) {
res = Py_None; res = Py_None;
Py_INCREF(res); Py_INCREF(res);
} }
@ -126,27 +124,19 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
} }
#define psyco_repl_curs_read_message_doc \ #define psyco_repl_curs_read_message_doc \
"read_message(decode=True) -- Try reading a replication message from the server (non-blocking)." "read_message() -- Try reading a replication message from the server (non-blocking)."
static PyObject * static PyObject *
psyco_repl_curs_read_message(replicationCursorObject *self, psyco_repl_curs_read_message(replicationCursorObject *self)
PyObject *args, PyObject *kwargs)
{ {
cursorObject *curs = &self->cur; cursorObject *curs = &self->cur;
int decode = 1;
static char *kwlist[] = {"decode", NULL};
EXC_IF_CURS_CLOSED(curs); EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(read_message); EXC_IF_GREEN(read_message);
EXC_IF_TPC_PREPARED(self->cur.conn, read_message); EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
EXC_IF_NOT_REPLICATING(self, read_message); EXC_IF_NOT_REPLICATING(self, read_message);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, return pq_read_replication_message(self);
&decode)) {
return NULL;
}
return pq_read_replication_message(self, decode);
} }
static PyObject * static PyObject *
@ -267,7 +257,7 @@ static struct PyMethodDef replicationCursorObject_methods[] = {
{"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream, {"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc}, METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc},
{"read_message", (PyCFunction)psyco_repl_curs_read_message, {"read_message", (PyCFunction)psyco_repl_curs_read_message,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_read_message_doc}, METH_NOARGS, psyco_repl_curs_read_message_doc},
{"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback, {"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc}, METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc},
{"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback, {"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback,
@ -289,6 +279,7 @@ replicationCursor_setup(replicationCursorObject* self)
{ {
self->started = 0; self->started = 0;
self->consuming = 0; self->consuming = 0;
self->decode = 0;
self->write_lsn = InvalidXLogRecPtr; self->write_lsn = InvalidXLogRecPtr;
self->flush_lsn = InvalidXLogRecPtr; self->flush_lsn = InvalidXLogRecPtr;