Rework replication connection/cursor classes

Oleksandr Shulgin 2015-10-01 19:28:00 +02:00
parent cac83da5db
commit 0233620c26
8 changed files with 368 additions and 242 deletions

@ -144,6 +144,15 @@ Logging cursor
Replication cursor Replication cursor
^^^^^^^^^^^^^^^^^^ ^^^^^^^^^^^^^^^^^^
.. autoclass:: ReplicationConnectionBase
The following replication types are defined:
.. data:: REPLICATION_LOGICAL
.. data:: REPLICATION_PHYSICAL
.. autoclass:: LogicalReplicationConnection .. autoclass:: LogicalReplicationConnection
This connection factory class can be used to open a special type of This connection factory class can be used to open a special type of
@ -167,7 +176,6 @@ Replication cursor
phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection) phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection)
phys_cur = phys_conn.cursor() phys_cur = phys_conn.cursor()
Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use
`ReplicationCursor` for actual communication on the connection. `ReplicationCursor` for actual communication on the connection.
@ -177,6 +185,41 @@ Replication cursor
.. __: http://www.postgresql.org/docs/current/static/protocol-replication.html .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html
The individual messages in the replication stream are represented by
`ReplicationMessage` objects:
.. autoclass:: ReplicationMessage
.. attribute:: payload
The actual data received from the server. An instance of either
``str`` or ``unicode``, depending on the method that was used to
produce this message.
.. attribute:: data_size
The raw size of the message payload (before possible unicode
conversion).
.. attribute:: data_start
LSN position of the start of the message.
.. attribute:: wal_end
LSN position of the current end of WAL on the server.
.. attribute:: send_time
A `~datetime` object representing the server timestamp at the moment
when the message was sent.
.. attribute:: cursor
A reference to the corresponding `ReplicationCursor` object.
.. autoclass:: ReplicationCursor .. autoclass:: ReplicationCursor
.. method:: identify_system() .. method:: identify_system()
@ -233,19 +276,16 @@ Replication cursor
# either logical or physical replication connection # either logical or physical replication connection
cur.drop_replication_slot("slot1") cur.drop_replication_slot("slot1")
This
Replication slots are a feature of PostgreSQL server starting with Replication slots are a feature of PostgreSQL server starting with
version 9.4. version 9.4.
.. method:: start_replication(slot_name=None, writer=None, slot_type=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None) .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None)
Start replication on the connection. Start replication on the connection.
:param slot_name: name of the replication slot to use; required for :param slot_name: name of the replication slot to use; required for
logical replication, physical replication can work logical replication, physical replication can work
with or without a slot with or without a slot
:param writer: a file-like object to write replication messages to
:param slot_type: type of replication: should be either :param slot_type: type of replication: should be either
`REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL`
:param start_lsn: the optional LSN position to start replicating from, :param start_lsn: the optional LSN position to start replicating from,
@ -253,16 +293,16 @@ Replication cursor
in the form ``XXX/XXX`` in the form ``XXX/XXX``
:param timeline: WAL history timeline to start streaming from (optional, :param timeline: WAL history timeline to start streaming from (optional,
can only be used with physical replication) can only be used with physical replication)
:param keepalive_interval: interval (in seconds) to send keepalive
messages to the server
:param options: a dictionary of options to pass to logical replication :param options: a dictionary of options to pass to logical replication
slot (not allowed with physical replication, set to slot (not allowed with physical replication)
*None*)
If a *slot_name* is specified, the slot must exist on the server and
its type must match the replication type used.
If not specified using *slot_type* parameter, the type of replication If not specified using *slot_type* parameter, the type of replication
to be started is defined by the type of replication connection. is defined by the type of replication connection. Logical replication
Logical replication is only allowed on logical replication connection, is only allowed on logical replication connection, but physical
but physical replication can be used with both types of connection. replication can be used with both types of connection.
On the other hand, physical replication doesn't require a named On the other hand, physical replication doesn't require a named
replication slot to be used, only logical one does. In any case, replication slot to be used, only logical one does. In any case,
@ -270,56 +310,99 @@ Replication cursor
server starting with version 9.4. Physical replication can be used server starting with version 9.4. Physical replication can be used
starting with 9.0. starting with 9.0.
If a *slot_name* is specified, the slot must exist on the server and If *start_lsn* is specified, the requested stream will start from that
its type must match the replication type used. LSN. The default is `!None`, which passes the LSN ``0/0``, causing
replay to begin at the last point for which the server got replay
confirmation from the client, or the oldest available point for a
new slot.
When used on non-asynchronous connection this method enters an endless The server might produce an error if a WAL file for the given LSN has
loop, reading messages from the server and passing them to ``write()`` already been recycled, or it may silently start streaming from a later
method of the *writer* object. This is similar to operation of the position: the client can verify the actual position using information
`~cursor.copy_to()` method. It also sends keepalive messages to the provided by the `ReplicationMessage` attributes. The exact server
server, in case there were no new data from it for the duration of behavior depends on the type of replication and use of slots.
*keepalive_interval* seconds (this parameter's value must be equal to
at least than 1 second, but it can have a fractional part).
With asynchronous connection, this method returns immediately and the A *timeline* parameter can only be specified with physical replication
calling code can start reading the replication messages in a loop. and only starting with server version 9.3.
A sketch implementation of the *writer* object for logical replication A dictionary of *options* may be passed to the logical decoding plugin
might look similar to the following:: on a logical replication slot. The set of supported options depends
on the output plugin that was used to create the slot. Must be
`!None` for physical replication.
from io import TextIOBase This function constructs a ``START_REPLICATION`` command and calls
`start_replication_expert()` internally.
class LogicalStreamWriter(TextIOBase): After starting the replication, to actually consume the incoming
server messages, use `consume_replication_stream()` or implement a
loop around `read_replication_message()` in case of asynchronous
connection.
def write(self, msg): .. method:: start_replication_expert(command)
Start replication on the connection using provided ``START_REPLICATION``
command.
.. method:: consume_replication_stream(consumer, decode=False, keepalive_interval=10)
:param consumer: an object providing ``consume()`` method
:param decode: a flag indicating that unicode conversion should be
performed on the messages received from the server
:param keepalive_interval: interval (in seconds) to send keepalive
messages to the server
This method can only be used with synchronous connection. For
asynchronous connections see `read_replication_message()`.
Before calling this method to consume the stream, use
`start_replication()` first.
When called, this method enters an endless loop, reading messages from
the server and passing them to ``consume()`` method of the *consumer*
object. In order to make this method break out of the loop and
return, the ``consume()`` method can call `stop_replication()` on the
cursor or it can throw an exception.
If *decode* is set to `!True`, the messages read from the server are
converted according to the connection `~connection.encoding`. This
parameter should not be set with physical replication.
This method also sends keepalive messages to the server, in case there
were no new data from the server for the duration of
*keepalive_interval* (in seconds). The value of this parameter must
be equal to at least 1 second, but it can have a fractional part.
The following example is a sketch implementation of *consumer* object
for logical replication::
class LogicalStreamConsumer(object):
def consume(self, msg):
self.store_message_data(msg.payload) self.store_message_data(msg.payload)
if self.should_report_to_the_server_now(msg): if self.should_report_to_the_server_now(msg):
msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end) msg.cursor.send_replication_feedback(flush_lsn=msg.data_start)
First, like with the `~cursor.copy_to()` method, the code that calls consumer = LogicalStreamConsumer()
the provided ``write()`` method checks if the *writer* object is cur.consume_replication_stream(consumer, decode=True)
inherited from `~io.TextIOBase`. If that is the case, the message
payload to be passed is converted to unicode using the connection's
`~connection.encoding` information. Otherwise, the message is passed
as is.
The *msg* object being passed is an instance of `~ReplicationMessage` The *msg* objects passed to the ``consume()`` method are instances of
class. `ReplicationMessage` class.
After storing a certain amount of messages' data reliably, the client After storing a certain amount of messages' data reliably, the client
should send a confirmation message to the server. This should be done should send a confirmation message to the server. This should be done
by calling `~send_replication_feedback()` method on the corresponding by calling `send_replication_feedback()` method on the corresponding
replication cursor. A reference to the cursor is provided in the replication cursor. A reference to the cursor is provided in the
`~ReplicationMessage` as an attribute. `ReplicationMessage` as an attribute.
.. warning:: .. warning::
Failure to properly notify the server by constantly consuming and When using replication with slots, failure to properly notify the
reporting success at appropriate times can eventually lead to "disk server by constantly consuming and reporting success at
full" condition on the server, because the server retains all the appropriate times can eventually lead to "disk full" condition on
WAL segments that might be needed to stream the changes via all of the server, because the server retains all the WAL segments that
the currently open replication slots. might be needed to stream the changes via all of the currently
open replication slots.
On the other hand, it is not recommended to send a confirmation On the other hand, it is not recommended to send a confirmation
after every processed message, since that will put an unnecessary after every processed message, since that will put an unnecessary
@ -328,9 +411,11 @@ Replication cursor
.. method:: stop_replication() .. method:: stop_replication()
In non-asynchronous connection, when called from the ``write()`` This method can be called on synchronous connections from the
method, tell the code in `~start_replication` to break out of the ``consume()`` method of a ``consumer`` object in order to break out of
endless loop and return. the endless loop in `consume_replication_stream()`. If called on
asynchronous connection or outside of the consume loop, this method
raises an error.
.. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) .. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
@ -344,29 +429,37 @@ Replication cursor
:param reply: request the server to send back a keepalive message immediately :param reply: request the server to send back a keepalive message immediately
Use this method to report to the server that all messages up to a Use this method to report to the server that all messages up to a
certain LSN position have been stored and may be discarded. certain LSN position have been stored on the client and may be
discarded on the server.
This method can also be called with all default parameters' values to This method can also be called with all default parameters' values to
send a keepalive message to the server. just send a keepalive message to the server.
In case of asynchronous connection, if the feedback message cannot be If the feedback message could not be sent, updates the passed LSN
sent at the moment, remembers the passed LSN positions for a later positions in the cursor for a later call to
hopefully successful call or call to `~flush_replication_feedback()`. `flush_replication_feedback()` and returns `!False`, otherwise returns
`!True`.
.. method:: flush_replication_feedback(reply=False) .. method:: flush_replication_feedback(reply=False)
:param reply: request the server to send back a keepalive message immediately :param reply: request the server to send back a keepalive message immediately
This method tries to flush the latest replication feedback message This method tries to flush the latest replication feedback message
that `~send_replication_feedback()` was trying to send, if any. that `send_replication_feedback()` was trying to send but couldn't.
If *reply* is `!True`, sends a keepalive message in either case.
Returns `!True` if the feedback message was sent successfully,
`!False` otherwise.
Low-level methods for asynchronous connection operation. Low-level methods for asynchronous connection operation.
With the non-asynchronous connection, a single call to With the synchronous connection, a call to `consume_replication_stream()`
`~start_replication()` handles all the complexity, but at times it might handles all the complexity of handling the incoming messages and sending
be beneficial to use low-level interface for better control, in particular keepalive replies, but at times it might be beneficial to use low-level
to `~select.select()` on multiple sockets. The following methods are interface for better control, in particular to `~select.select()` on
provided for asynchronous operation: multiple sockets. The following methods are provided for asynchronous
operation:
.. method:: read_replication_message(decode=True) .. method:: read_replication_message(decode=True)
@ -374,18 +467,18 @@ Replication cursor
performed on the data received from the server performed on the data received from the server
This method should be used in a loop with asynchronous connections This method should be used in a loop with asynchronous connections
after calling `~start_replication()` once. after calling `start_replication()` once.
It tries to read the next message from the server, without blocking It tries to read the next message from the server, without blocking
and returns an instance of `~ReplicationMessage` or *None*, in case and returns an instance of `ReplicationMessage` or `!None`, in case
there are no more data messages from the server at the moment. there are no more data messages from the server at the moment.
It is expected that the calling code will call this method repeatedly It is expected that the calling code will call this method repeatedly
in order to consume all of the messages that might have been buffered, in order to consume all of the messages that might have been buffered,
until *None* is returned. After receiving a *None* value from this until `!None` is returned. After receiving a `!None` value from this
method, one might use `~select.select()` or `~select.poll()` on the method, the caller should use `~select.select()` or `~select.poll()`
corresponding connection to block the process until there is more data on the corresponding connection to block the process until there is
from the server. more data from the server.
The server can send keepalive messages to the client periodically. The server can send keepalive messages to the client periodically.
Such messages are silently consumed by this method and are never Such messages are silently consumed by this method and are never
@ -408,46 +501,20 @@ Replication cursor
An actual example of asynchronous operation might look like this:: An actual example of asynchronous operation might look like this::
keepalive_interval = 10.0 keepalive_interval = 10.0
while True:
if (datetime.now() - cur.replication_io_timestamp).total_seconds() >= keepalive_interval:
cur.send_replication_feedback()
while True: while True:
msg = cur.read_replication_message() msg = cur.read_replication_message()
if not msg: if msg:
break consumer.consume(msg)
writer.write(msg) else:
timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds() timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds()
if timeout > 0: if timeout > 0:
select.select([cur], [], [], timeout) sel = select.select([cur], [], [], timeout)
else:
sel = []
.. autoclass:: ReplicationMessage if not sel:
cur.send_replication_feedback()
.. attribute:: payload
The actual data received from the server. An instance of either
``str`` or ``unicode``.
.. attribute:: data_start
LSN position of the start of the message.
.. attribute:: wal_end
LSN position of the end of the message.
.. attribute:: send_time
A `~datetime` object representing the server timestamp at the moment
when the message was sent.
.. attribute:: cursor
A reference to the corresponding `~ReplicationCursor` object.
.. data:: REPLICATION_LOGICAL
.. data:: REPLICATION_PHYSICAL
.. index:: .. index::
pair: Cursor; Replication pair: Cursor; Replication
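
The *consumer* sketch in the documentation above leaves ``store_message_data()`` and ``should_report_to_the_server_now()`` undefined. A minimal self-contained variant might look like the following. It assumes only the ``consume()``, ``send_replication_feedback()`` and ``stop_replication()`` names introduced by this commit; the class name ``BatchingConsumer`` and its ``feedback_every`` and ``stop_after`` parameters are hypothetical and not part of psycopg2.

```python
class BatchingConsumer(object):
    """Hypothetical consumer: keeps payloads in memory and reports
    progress to the server once per batch instead of once per message."""

    def __init__(self, feedback_every=100, stop_after=None):
        self.payloads = []
        self.feedback_every = feedback_every  # messages per feedback report
        self.stop_after = stop_after          # optional cap, handy for tests
        self._since_feedback = 0

    def consume(self, msg):
        # "Storing" here is only an in-memory append; real code would
        # persist the data reliably before confirming anything.
        self.payloads.append(msg.payload)
        self._since_feedback += 1

        if self._since_feedback >= self.feedback_every:
            # Same LSN choice as the documentation example in this commit.
            msg.cursor.send_replication_feedback(flush_lsn=msg.data_start)
            self._since_feedback = 0

        if self.stop_after is not None and len(self.payloads) >= self.stop_after:
            # Asks the consume loop to return after this message.
            msg.cursor.stop_replication()
```

With a synchronous replication cursor on which ``start_replication()`` has already been called, an instance would presumably be passed as ``cur.consume_replication_stream(BatchingConsumer(), decode=True)``. Batching the feedback follows the warning above about not confirming every single message.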

@ -544,9 +544,9 @@ class ReplicationCursor(_cursor):
command = "DROP_REPLICATION_SLOT %s" % self.connection.quote_ident(slot_name) command = "DROP_REPLICATION_SLOT %s" % self.connection.quote_ident(slot_name)
self.execute(command) self.execute(command)
def start_replication(self, slot_name=None, writer=None, slot_type=None, start_lsn=0, def start_replication(self, slot_name=None, slot_type=None, start_lsn=0,
timeline=0, keepalive_interval=10, options=None): timeline=0, options=None):
"""Start and consume replication stream.""" """Start replication stream."""
command = "START_REPLICATION " command = "START_REPLICATION "
@ -594,8 +594,7 @@ class ReplicationCursor(_cursor):
command += "%s %s" % (self.connection.quote_ident(k), _A(str(v))) command += "%s %s" % (self.connection.quote_ident(k), _A(str(v)))
command += ")" command += ")"
return self.start_replication_expert(command, writer=writer, return self.start_replication_expert(command)
keepalive_interval=keepalive_interval)
def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False): def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False):
return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested) return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested)
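
The documentation for ``start_replication()`` says *start_lsn* may be given in the textual ``XXX/XXX`` form. PostgreSQL prints an LSN as two hexadecimal numbers, the high and the low 32 bits of a 64-bit position, so converting between the two forms can be sketched as below. The helper names ``lsn_to_int`` and ``int_to_lsn`` are hypothetical and are not part of this commit.

```python
def lsn_to_int(text):
    """Convert an LSN in 'XXX/XXX' form to a 64-bit integer."""
    high, low = text.split("/")
    return (int(high, 16) << 32) | int(low, 16)


def int_to_lsn(value):
    """Convert a 64-bit integer LSN back to the 'XXX/XXX' form."""
    return "%X/%X" % (value >> 32, value & 0xFFFFFFFF)
```

Such helpers would mainly be useful for logging or comparing the integer ``data_start`` and ``wal_end`` values of a ``ReplicationMessage`` against positions reported by the server in text form.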

@ -73,7 +73,9 @@ struct cursorObject {
#define DEFAULT_COPYSIZE 16384 #define DEFAULT_COPYSIZE 16384
#define DEFAULT_COPYBUFF 8192 #define DEFAULT_COPYBUFF 8192
int repl_stop; /* if client requested to stop replication */ /* replication cursor attrs */
int repl_started:1; /* if replication is started */
int repl_stop:1; /* if client requested to stop replication */
struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */ struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */ XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */
XLogRecPtr repl_flush_lsn; XLogRecPtr repl_flush_lsn;

@ -36,6 +36,7 @@
#include "psycopg/microprotocols_proto.h" #include "psycopg/microprotocols_proto.h"
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
/* python */ /* python */
@ -1588,13 +1589,11 @@ exit:
static PyObject * static PyObject *
psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject *kwargs) psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
{ {
PyObject *writer = NULL, *res = NULL; PyObject *res = NULL;
char *command; char *command;
double keepalive_interval = 10; static char *kwlist[] = {"command", NULL};
static char *kwlist[] = {"command", "writer", "keepalive_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|Od", kwlist, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) {
&command, &writer, &keepalive_interval)) {
return NULL; return NULL;
} }
@ -1602,21 +1601,15 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject
EXC_IF_GREEN(start_replication_expert); EXC_IF_GREEN(start_replication_expert);
EXC_IF_TPC_PREPARED(self->conn, start_replication_expert); EXC_IF_TPC_PREPARED(self->conn, start_replication_expert);
Dprintf("psyco_curs_start_replication_expert: command = %s", command); if (self->repl_started) {
psyco_set_error(ProgrammingError, self, "replication already in progress");
if (keepalive_interval < 1.0) {
psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1sec");
return NULL; return NULL;
} }
self->copysize = 0; Dprintf("psyco_curs_start_replication_expert: command = %s", command);
Py_XINCREF(writer);
self->copyfile = writer;
self->copysize = 0;
self->repl_stop = 0; self->repl_stop = 0;
self->repl_keepalive_interval.tv_sec = (int)keepalive_interval;
self->repl_keepalive_interval.tv_usec =
(keepalive_interval - (int)keepalive_interval)*1.0e6;
self->repl_write_lsn = InvalidXLogRecPtr; self->repl_write_lsn = InvalidXLogRecPtr;
self->repl_flush_lsn = InvalidXLogRecPtr; self->repl_flush_lsn = InvalidXLogRecPtr;
@ -1631,7 +1624,7 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject
Py_INCREF(res); Py_INCREF(res);
} }
Py_CLEAR(self->copyfile); self->repl_started = 1;
return res; return res;
} }
@ -1643,12 +1636,54 @@ static PyObject *
psyco_curs_stop_replication(cursorObject *self) psyco_curs_stop_replication(cursorObject *self)
{ {
EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, stop_replication);
if (!self->repl_started || self->repl_stop) {
psyco_set_error(ProgrammingError, self, "replication is not in progress");
return NULL;
}
self->repl_stop = 1; self->repl_stop = 1;
Py_RETURN_NONE; Py_RETURN_NONE;
} }
#define psyco_curs_consume_replication_stream_doc \
"consume_replication_stream(consumer, decode=False, keepalive_interval=10) -- Consume replication stream."
static PyObject *
psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObject *kwargs)
{
PyObject *consumer = NULL, *res = NULL;
int decode = 0;
double keepalive_interval = 10;
static char *kwlist[] = {"consumer", "decode", "keepalive_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
&consumer, &decode, &keepalive_interval)) {
return NULL;
}
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, consume_replication_stream);
EXC_IF_GREEN(consume_replication_stream);
EXC_IF_TPC_PREPARED(self->conn, consume_replication_stream);
Dprintf("psyco_curs_consume_replication_stream");
if (keepalive_interval < 1.0) {
psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1 (sec)");
return NULL;
}
if (pq_copy_both(self, consumer, decode, keepalive_interval) >= 0) {
res = Py_None;
Py_INCREF(res);
}
return res;
}
#define psyco_curs_read_replication_message_doc \ #define psyco_curs_read_replication_message_doc \
"read_replication_message(decode=True) -- Try reading a replication message from the server (non-blocking)." "read_replication_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
@ -1673,7 +1708,7 @@ psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject
static PyObject * static PyObject *
curs_flush_replication_feedback(cursorObject *self, int reply) curs_flush_replication_feedback(cursorObject *self, int reply)
{ {
if (!self->repl_feedback_pending) if (!(self->repl_feedback_pending || reply))
Py_RETURN_FALSE; Py_RETURN_FALSE;
if (pq_send_replication_feedback(self, reply)) { if (pq_send_replication_feedback(self, reply)) {
@ -1939,6 +1974,8 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc},
{"stop_replication", (PyCFunction)psyco_curs_stop_replication, {"stop_replication", (PyCFunction)psyco_curs_stop_replication,
METH_NOARGS, psyco_curs_stop_replication_doc}, METH_NOARGS, psyco_curs_stop_replication_doc},
{"consume_replication_stream", (PyCFunction)psyco_curs_consume_replication_stream,
METH_VARARGS|METH_KEYWORDS, psyco_curs_consume_replication_stream_doc},
{"read_replication_message", (PyCFunction)psyco_curs_read_replication_message, {"read_replication_message", (PyCFunction)psyco_curs_read_replication_message,
METH_VARARGS|METH_KEYWORDS, psyco_curs_read_replication_message_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_read_replication_message_doc},
{"send_replication_feedback", (PyCFunction)psyco_curs_send_replication_feedback, {"send_replication_feedback", (PyCFunction)psyco_curs_send_replication_feedback,
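
The ``repl_started`` and ``repl_stop`` checks added in this file amount to a small state machine. The Python model below is only a reading aid, not code from the commit: the class name ``ReplicationState`` is hypothetical, and ``RuntimeError`` stands in for the ``ProgrammingError`` that the C code sets through ``psyco_set_error``.

```python
class ReplicationState(object):
    """Hypothetical model of the repl_started / repl_stop flags."""

    def __init__(self):
        self.started = False         # mirrors repl_started
        self.stop_requested = False  # mirrors repl_stop

    def start(self):
        # start_replication_expert() refuses to start twice on one cursor.
        if self.started:
            raise RuntimeError("replication already in progress")
        self.stop_requested = False
        self.started = True

    def stop(self):
        # stop_replication() is rejected before start and after an
        # earlier stop request.
        if not self.started or self.stop_requested:
            raise RuntimeError("replication is not in progress")
        self.stop_requested = True
```

The model leaves out the ``EXC_IF_CURS_ASYNC`` guard, which additionally rejects ``stop_replication()`` on asynchronous connections.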

@ -1531,18 +1531,28 @@ exit:
return ret; return ret;
} }
/* ignores keepalive messages */ /* Tries to read the next message from the replication stream, without
blocking, in both sync and async connection modes. If no message
is ready in the CopyData buffer, tries to read from the server,
again without blocking. If that doesn't help, returns Py_None.
The caller is then supposed to block on the socket(s) and call this
function again.
Any keepalive messages from the server are silently consumed and
are never returned to the caller.
*/
PyObject * PyObject *
pq_read_replication_message(cursorObject *curs, int decode) pq_read_replication_message(cursorObject *curs, int decode)
{ {
char *buffer = NULL; char *buffer = NULL;
int len, consumed = 0, hdr, reply; int len, data_size, consumed, hdr, reply;
XLogRecPtr data_start, wal_end; XLogRecPtr data_start, wal_end;
pg_int64 send_time; pg_int64 send_time;
PyObject *str = NULL, *msg = NULL; PyObject *str = NULL, *msg = NULL;
Dprintf("pq_read_replication_message(decode=%d)", decode); Dprintf("pq_read_replication_message(decode=%d)", decode);
consumed = 0;
retry: retry:
len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */); len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */);
@ -1570,10 +1580,12 @@ retry:
} }
if (len == -2) { if (len == -2) {
/* serious error */
pq_raise(curs->conn, curs, NULL); pq_raise(curs->conn, curs, NULL);
goto exit; goto exit;
} }
if (len == -1) { if (len == -1) {
/* EOF */
curs->pgres = PQgetResult(curs->conn->pgconn); curs->pgres = PQgetResult(curs->conn->pgconn);
if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) { if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) {
@ -1595,13 +1607,14 @@ retry:
Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len); Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len);
if (buffer[0] == 'w') { if (buffer[0] == 'w') {
/* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */ /* XLogData: msgtype(1), dataStart(8), walEnd(8), sendTime(8) */
hdr = 1 + 8 + 8 + 8; hdr = 1 + 8 + 8 + 8;
if (len < hdr + 1) { if (len < hdr + 1) {
psyco_set_error(OperationalError, curs, "data message header too small"); psyco_set_error(OperationalError, curs, "data message header too small");
goto exit; goto exit;
} }
data_size = len - hdr;
data_start = fe_recvint64(buffer + 1); data_start = fe_recvint64(buffer + 1);
wal_end = fe_recvint64(buffer + 1 + 8); wal_end = fe_recvint64(buffer + 1 + 8);
send_time = fe_recvint64(buffer + 1 + 8 + 8); send_time = fe_recvint64(buffer + 1 + 8 + 8);
@ -1609,12 +1622,13 @@ retry:
Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR, Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR,
XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end)); XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end));
Dprintf("pq_read_replication_message: >>%.*s<<", len - hdr, buffer + hdr); Dprintf("pq_read_replication_message: >>%.*s<<", data_size, buffer + hdr);
/* XXX it would be wise to check if it's really a logical replication */
if (decode) { if (decode) {
str = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL); str = PyUnicode_Decode(buffer + hdr, data_size, curs->conn->codec, NULL);
} else { } else {
str = Bytes_FromStringAndSize(buffer + hdr, len - hdr); str = Bytes_FromStringAndSize(buffer + hdr, data_size);
} }
if (!str) { goto exit; } if (!str) { goto exit; }
@ -1623,12 +1637,13 @@ retry:
Py_DECREF(str); Py_DECREF(str);
if (!msg) { goto exit; } if (!msg) { goto exit; }
((replicationMessageObject *)msg)->data_size = data_size;
((replicationMessageObject *)msg)->data_start = data_start; ((replicationMessageObject *)msg)->data_start = data_start;
((replicationMessageObject *)msg)->wal_end = wal_end; ((replicationMessageObject *)msg)->wal_end = wal_end;
((replicationMessageObject *)msg)->send_time = send_time; ((replicationMessageObject *)msg)->send_time = send_time;
} }
else if (buffer[0] == 'k') { else if (buffer[0] == 'k') {
/* msgtype(1), walEnd(8), sendTime(8), reply(1) */ /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
hdr = 1 + 8 + 8; hdr = 1 + 8 + 8;
if (len < hdr + 1) { if (len < hdr + 1) {
psyco_set_error(OperationalError, curs, "keepalive message header too small"); psyco_set_error(OperationalError, curs, "keepalive message header too small");
@ -1641,6 +1656,7 @@ retry:
if (curs->conn->async) { if (curs->conn->async) {
curs->repl_feedback_pending = 1; curs->repl_feedback_pending = 1;
} else { } else {
/* XXX not sure if this was a good idea after all */
pq_raise(curs->conn, curs, NULL); pq_raise(curs->conn, curs, NULL);
goto exit; goto exit;
} }
@ -1699,38 +1715,36 @@ pq_send_replication_feedback(cursorObject* curs, int reply_requested)
return 1; return 1;
} }
/* used for streaming replication only */ /* Calls pq_read_replication_message in an endless loop, until
static int stop_replication is called or a fatal error occurs. The messages
_pq_copy_both_v3(cursorObject *curs) are passed to the consumer object.
When no message is available, blocks on the connection socket, but
manages to send keepalive messages to the server as needed.
*/
int
pq_copy_both(cursorObject *curs, PyObject *consumer, int decode, double keepalive_interval)
{ {
PyObject *msg, *tmp = NULL; PyObject *msg, *tmp = NULL;
PyObject *write_func = NULL; PyObject *consume_func = NULL;
int is_text, fd, sel, ret = -1; int fd, sel, ret = -1;
PGconn *pgconn; PGconn *pgconn;
fd_set fds; fd_set fds;
struct timeval curr_time, ping_time, time_diff; struct timeval keep_intr, curr_time, ping_time, timeout;
if (!curs->copyfile) { if (!(consume_func = PyObject_GetAttrString(consumer, "consume"))) {
psyco_set_error(ProgrammingError, curs, Dprintf("pq_copy_both: can't get o.consume");
"can't execute START_REPLICATION directly: use the start_replication() method instead");
goto exit;
}
if (!(write_func = PyObject_GetAttrString(curs->copyfile, "write"))) {
Dprintf("_pq_copy_both_v3: can't get o.write");
goto exit;
}
/* if the file is text we must pass it unicode. */
if (-1 == (is_text = psycopg_is_text_file(curs->copyfile))) {
goto exit; goto exit;
} }
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
pgconn = curs->conn->pgconn; pgconn = curs->conn->pgconn;
keep_intr.tv_sec = (int)keepalive_interval;
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
while (1) { while (1) {
msg = pq_read_replication_message(curs, is_text); msg = pq_read_replication_message(curs, decode);
if (!msg) { if (!msg) {
goto exit; goto exit;
} }
@ -1748,14 +1762,12 @@ _pq_copy_both_v3(cursorObject *curs)
gettimeofday(&curr_time, NULL); gettimeofday(&curr_time, NULL);
ping_time = curs->repl_last_io; timeradd(&curs->repl_last_io, &keep_intr, &ping_time);
ping_time.tv_sec += curs->repl_keepalive_interval.tv_sec; timersub(&ping_time, &curr_time, &timeout);
ping_time.tv_usec += curs->repl_keepalive_interval.tv_usec;
timersub(&ping_time, &curr_time, &time_diff); if (timeout.tv_sec >= 0) {
if (time_diff.tv_sec > 0) {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
sel = select(fd + 1, &fds, NULL, NULL, &time_diff); sel = select(fd + 1, &fds, NULL, NULL, &timeout);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
} }
else { else {
@ -1782,17 +1794,17 @@ _pq_copy_both_v3(cursorObject *curs)
continue; continue;
} }
else { else {
tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); tmp = PyObject_CallFunctionObjArgs(consume_func, msg, NULL);
Py_DECREF(msg); Py_DECREF(msg);
if (tmp == NULL) { if (tmp == NULL) {
Dprintf("_pq_copy_both_v3: write_func returned NULL"); Dprintf("pq_copy_both: consume_func returned NULL");
goto exit; goto exit;
} }
Py_DECREF(tmp); Py_DECREF(tmp);
if (curs->repl_stop) { if (curs->repl_stop) {
Dprintf("_pq_copy_both_v3: repl_stop flag set by write_func"); Dprintf("pq_copy_both: repl_stop flag set by consume_func");
break; break;
} }
} }
@ -1801,7 +1813,7 @@ _pq_copy_both_v3(cursorObject *curs)
ret = 1; ret = 1;
exit: exit:
Py_XDECREF(write_func); Py_XDECREF(consume_func);
return ret; return ret;
} }
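The new `pq_copy_both` derives its `select()` timeout from the time of the last I/O plus the keepalive interval, and only blocks while the resulting `tv_sec` is non-negative. The sketch below reproduces that arithmetic in isolation. `interval_to_timeval` and `keepalive_timeout` are hypothetical names. Plain arithmetic stands in for `timeradd`/`timersub`, on the assumption that those macros may not be available under every set of feature-test macros.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/time.h>

/* Hypothetical helper: split a fractional number of seconds into a
   struct timeval, the way keep_intr is filled in the diff. */
static struct timeval
interval_to_timeval(double seconds)
{
    struct timeval tv;

    tv.tv_sec = (long)seconds;
    tv.tv_usec = (long)((seconds - (double)tv.tv_sec) * 1.0e6);
    return tv;
}

/* Compute how long select() may block before the next keepalive is due.
   The deadline is last_io + interval and the timeout is deadline - now.
   Returns 1 and fills *timeout when time remains, 0 when the deadline has
   already passed (timeout->tv_sec is negative in that case). */
static int
keepalive_timeout(struct timeval last_io, double interval,
                  struct timeval now, struct timeval *timeout)
{
    struct timeval step = interval_to_timeval(interval);
    struct timeval deadline;

    assert(timeout != NULL);

    /* deadline = last_io + step, carrying microseconds into seconds */
    deadline.tv_sec = last_io.tv_sec + step.tv_sec;
    deadline.tv_usec = last_io.tv_usec + step.tv_usec;
    if (deadline.tv_usec >= 1000000) {
        deadline.tv_sec++;
        deadline.tv_usec -= 1000000;
    }

    /* timeout = deadline - now, borrowing from seconds when needed */
    timeout->tv_sec = deadline.tv_sec - now.tv_sec;
    timeout->tv_usec = deadline.tv_usec - now.tv_usec;
    if (timeout->tv_usec < 0) {
        timeout->tv_sec--;
        timeout->tv_usec += 1000000;
    }

    return timeout->tv_sec >= 0;
}
```

With a last I/O at 100.9 s, an interval of 10.5 s and a current time of 105.0 s, the remaining timeout is 6.4 s. Once the current time passes 111.4 s the function reports that the deadline has elapsed, which corresponds to the `else` branch in the loop.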
@ -1867,13 +1879,14 @@ pq_fetch(cursorObject *curs, int no_result)
case PGRES_COPY_BOTH: case PGRES_COPY_BOTH:
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
curs->rowcount = -1; curs->rowcount = -1;
if (curs->conn->async) { ex = 0;
/*if (curs->conn->async) {
ex = 0; ex = 0;
} else { } else {
ex = _pq_copy_both_v3(curs); ex = _pq_copy_both_v3(curs);
/* error caught by our glorious notice handler */
if (PyErr_Occurred()) ex = -1; if (PyErr_Occurred()) ex = -1;
} }*/
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
break; break;


@ -72,6 +72,8 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn,
RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
char **error); char **error);
HIDDEN int pq_copy_both(cursorObject *curs, PyObject *consumer,
int decode, double keepalive_interval);
HIDDEN PyObject *pq_read_replication_message(cursorObject *curs, int decode); HIDDEN PyObject *pq_read_replication_message(cursorObject *curs, int decode);
HIDDEN int pq_send_replication_feedback(cursorObject *curs, int reply_requested); HIDDEN int pq_send_replication_feedback(cursorObject *curs, int reply_requested);


@ -42,6 +42,7 @@ struct replicationMessageObject {
cursorObject *cursor; cursorObject *cursor;
PyObject *payload; PyObject *payload;
int data_size;
XLogRecPtr data_start; XLogRecPtr data_start;
XLogRecPtr wal_end; XLogRecPtr wal_end;
pg_int64 send_time; pg_int64 send_time;


@ -49,8 +49,9 @@ static PyObject *
replmsg_repr(replicationMessageObject *self) replmsg_repr(replicationMessageObject *self)
{ {
return PyString_FromFormat( return PyString_FromFormat(
"<replicationMessage object at %p; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>", "<replicationMessage object at %p; data_size: %d; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>",
self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end), self->send_time); self, self->data_size, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end),
self->send_time);
} }
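`replmsg_repr` prints `data_start` and `wal_end` through the `XLOGFMTSTR`/`XLOGFMTARGS` macros, whose definitions are not shown in this diff. As a rough sketch of what such formatting usually involves, the code below splits a 64-bit LSN into two 32-bit halves and prints them as hexadecimal separated by a slash, which is PostgreSQL's conventional textual form. `format_lsn` and `parse_lsn` are hypothetical names, and the uppercase digits are an assumption rather than a statement about the macros.

```c
#include <assert.h>
#include <inttypes.h>
#include <stddef.h>
#include <stdint.h>
#include <stdio.h>

/* Hypothetical helper: write an LSN as "HI/LO" in hexadecimal.
   Returns the number of characters snprintf would have written. */
static int
format_lsn(uint64_t lsn, char *buf, size_t size)
{
    uint32_t hi = (uint32_t)(lsn >> 32);
    uint32_t lo = (uint32_t)(lsn & 0xFFFFFFFFu);

    assert(buf != NULL);
    return snprintf(buf, size, "%" PRIX32 "/%" PRIX32, hi, lo);
}

/* Hypothetical helper: parse the "HI/LO" form back into a 64-bit LSN.
   Returns 0 on success, -1 if the text does not match. */
static int
parse_lsn(const char *text, uint64_t *lsn)
{
    uint32_t hi, lo;

    assert(text != NULL && lsn != NULL);
    if (sscanf(text, "%" SCNx32 "/%" SCNx32, &hi, &lo) != 2) {
        return -1;
    }
    *lsn = ((uint64_t)hi << 32) | lo;
    return 0;
}
```

Round-tripping a value through both helpers is a quick way to check that the high and low halves are not swapped.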
static int static int
@ -63,8 +64,10 @@ replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs)
Py_XINCREF(self->cursor); Py_XINCREF(self->cursor);
Py_XINCREF(self->payload); Py_XINCREF(self->payload);
self->data_size = 0;
self->data_start = 0; self->data_start = 0;
self->wal_end = 0; self->wal_end = 0;
self->send_time = 0;
return 0; return 0;
} }
@ -125,6 +128,8 @@ static struct PyMemberDef replicationMessageObject_members[] = {
"TODO"}, "TODO"},
{"payload", T_OBJECT, OFFSETOF(payload), READONLY, {"payload", T_OBJECT, OFFSETOF(payload), READONLY,
"TODO"}, "TODO"},
{"data_size", T_INT, OFFSETOF(data_size), READONLY,
"TODO"},
{"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY, {"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY,
"TODO"}, "TODO"},
{"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY, {"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY,