Update docs for Replication protocol

This commit is contained in:
Oleksandr Shulgin 2015-06-30 16:17:31 +02:00
parent 058db56430
commit 318706f28c

View File

@ -165,8 +165,8 @@ Replication cursor
.. method:: identify_system()
Get information about the cluster status in form of a dict with
``systemid``, ``timeline``, ``xlogpos`` and ``dbname`` as keys.
This method executes ``IDENTIFY_SYSTEM`` command of the streaming
replication protocol and returns a result as a dictionary.
Example::
@ -197,65 +197,196 @@ Replication cursor
cur.drop_replication_slot("testslot")
.. method:: start_replication(file, slot_type, slot_name=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)
.. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)
Start and consume replication stream.
:param file: a file-like object to write replication stream messages to
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to use (required for
logical replication)
:param writer: a file-like object to write replication messages to
:param start_lsn: the point in replication stream (WAL position) to start
from, in the form ``XXX/XXX`` (forward-slash separated
pair of hexadecimals)
:param timeline: WAL history timeline to start streaming from (optional,
can only be used with physical replication)
:param keepalive_interval: interval (in seconds) to send keepalive
messages to the server, in case there was no
communication during that period of time
messages to the server
:param options: an dictionary of options to pass to logical replication
slot
The ``keepalive_interval`` must be greater than zero.
With non-asynchronous connection, this method enters an endless loop,
reading messages from the server and passing them to ``write()`` method
of the *writer* object. This is similar to operation of the
`~cursor.copy_to()` method. It also sends keepalive messages to the
server, in case there were no new data from it for the duration of
*keepalive_interval* seconds (this parameter must be greater than 1
second, but it can have a fractional part).
This method never returns unless an error message is sent from the
server, or the server closes connection, or there is an exception in the
``write()`` method of the ``file`` object.
With asynchronous connection, this method returns immediately and the
calling code can start reading the replication messages in a loop.
One can even use ``sys.stdout`` as the destination (this is only good for
testing purposes, however)::
A sketch implementation of the *writer* object might look similar to
the following::
>>> cur.start_replication(sys.stdout, "testslot")
...
from io import TextIOBase
This method acts much like the `~cursor.copy_to()` with an important
distinction that ``write()`` method return value is dirving the
server-side replication cursor. In order to report to the server that
the all the messages up to the current one have been stored reliably, one
should return true value (i.e. something that satisfies ``if retval:``
conidtion) from the ``write`` callback::
class ReplicationStreamWriter(TextIOBase):
class ReplicationStreamWriter(object):
def write(self, msg):
if store_message_reliably(msg):
return True
self.store_data_reliably(msg)
cur.start_replication(writer, "testslot")
if self.should_report_to_the_server(msg):
msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end)
def store_data_reliably(self, msg):
...
def shoud_report_to_the_server(self, msg):
...
First, like with the `~cursor.copy_to()` method, the code that is
calling the provided write method checks if the *writer* object is
inherited from `~io.TextIOBase`. If that is the case, the message
payload to be passed is converted to unicode using the connection's
encoding information. Otherwise, the message is passed as is.
The *msg* object being passed is an instance of `~ReplicationMessage`
class.
After storing the data passed in the message object, the writer object
should consider sending a confirmation message to the server. This is
done by calling `~send_replication_feedback()` method on the
corresponding replication cursor. A reference to the cursor producing
a given message is provided in the `~ReplicationMessage` as an
attribute.
.. note::
One needs to be aware that failure to update the server-side cursor
on any one replication slot properly by constantly consuming and
reporting success to the server can eventually lead to "disk full"
condition on the server, because the server retains all the WAL
segments that might be needed to stream the changes via currently
open replication slots.
One needs to be aware that failure to properly notify the server on
any one replication slot by constantly consuming and reporting
success to the server at appropriate times can eventually lead to
"disk full" condition on the server, because the server retains all
the WAL segments that might be needed to stream the changes via
currently open replication slots.
.. method:: stop_replication()
In non-asynchronous connection, when called from the ``write()``
method tells the code in `~start_replication` to break out of the
endless loop and return.
.. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
:param write_lsn: a LSN position up to which the client has written the data locally
:param flush_lsn: a LSN position up to which the client has stored the
data reliably (the server is allowed to discard all
and every data that predates this LSN)
:param apply_lsn: a LSN position up to which the warm standby server
has applied the changes (physical replication
master-slave protocol only)
:param reply: request the server to send back a keepalive message immediately
Use this method to report to the server that all messages up to a
certain LSN position have been stored and may be discarded.
This method can also be called with default parameters to send a
keepalive message to the server.
In case the message cannot be sent at the moment, remembers the
positions for a later successful call or call to
`~flush_replication_feedback()`.
.. method:: flush_replication_feedback(reply=False)
:param reply: request the server to send back a keepalive message immediately
This method tries to flush the latest replication feedback message
that `~send_replication_feedback()` was trying to send, if any.
Low-level methods for asynchronous connection operation.
While with the non-asynchronous connection, a single call to
`~start_replication()` handles all the complexity, at times it might be
beneficial to use low-level interface for better control, in particular to
`~select.select()` on multiple sockets. The following methods are
provided for asynchronous operation:
.. method:: read_replication_message(decode=True)
:param decode: a flag indicating that unicode conversion should be
performed on the data received from the server
This method should be used in a loop with asynchronous connections
after calling `~start_replication()`.
It tries to read the next message from the server, without blocking
and returns an instance of `~ReplicationMessage` or *None*, in case
there are no more data messages from the server at the moment. After
receiving a *None* value from this method, one should use a
`~select.select()` or `~select.poll()` on the corresponding connection
to block the process until there is more data from the server.
The server can send keepalive messages to the client periodically.
Such messages are silently consumed by this method and are never
reported to the caller.
.. method:: fileno()
Calls the corresponding connection's `~connection.fileno()` method
and returns the result.
This is a convenience method which allows replication cursor to be
used directly in `~select.select()` or `~select.poll()` calls.
.. attribute:: replication_io_timestamp
A `~datetime` object representing the timestamp at the moment of last
communication with the server (a data or keepalive message in either
direction).
An actual example of asynchronous operation might look like this::
keepalive_interval = 10.0
while True:
if (datetime.now() - cur.replication_io_timestamp).total_seconds() >= keepalive_interval:
cur.send_replication_feedback()
while True:
msg = cur.read_replication_message()
if not msg:
break
writer.write(msg)
timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds()
if timeout > 0:
select.select([cur], [], [], timeout)
.. autoclass:: ReplicationMessage
.. attribute:: payload
The actual data received from the server. An instance of either
``str`` or ``unicode``.
.. attribute:: data_start
LSN position of the start of the message.
.. attribute:: wal_end
LSN position of the end of the message.
.. attribute:: send_time
A `~datetime` object representing the server timestamp at the moment
when the message was sent.
.. attribute:: cursor
A reference to the corresponding `~ReplicationCursor` object.
Drop any open replication slots that are no longer being used. The
list of open slots can be obtained by running a query like ``SELECT *
FROM pg_replication_slots``.
.. data:: REPLICATION_PHYSICAL