From 318706f28c07444c1a73a3022eab2018ec73817c Mon Sep 17 00:00:00 2001 From: Oleksandr Shulgin Date: Tue, 30 Jun 2015 16:17:31 +0200 Subject: [PATCH] Update docs for Replication protocol --- doc/src/extras.rst | 199 +++++++++++++++++++++++++++++++++++++-------- 1 file changed, 165 insertions(+), 34 deletions(-) diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 9bc302e2..7cca8400 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -165,8 +165,8 @@ Replication cursor .. method:: identify_system() - Get information about the cluster status in form of a dict with - ``systemid``, ``timeline``, ``xlogpos`` and ``dbname`` as keys. + This method executes ``IDENTIFY_SYSTEM`` command of the streaming + replication protocol and returns a result as a dictionary. Example:: @@ -197,65 +197,196 @@ Replication cursor cur.drop_replication_slot("testslot") - .. method:: start_replication(file, slot_type, slot_name=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None) + .. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None) Start and consume replication stream. 
- :param file: a file-like object to write replication stream messages to :param slot_type: type of replication: either `REPLICATION_PHYSICAL` or `REPLICATION_LOGICAL` :param slot_name: name of the replication slot to use (required for logical replication) + :param writer: a file-like object to write replication messages to :param start_lsn: the point in replication stream (WAL position) to start from, in the form ``XXX/XXX`` (forward-slash separated pair of hexadecimals) :param timeline: WAL history timeline to start streaming from (optional, can only be used with physical replication) :param keepalive_interval: interval (in seconds) to send keepalive - messages to the server, in case there was no - communication during that period of time + messages to the server :param options: an dictionary of options to pass to logical replication slot - The ``keepalive_interval`` must be greater than zero. + With non-asynchronous connection, this method enters an endless loop, + reading messages from the server and passing them to ``write()`` method + of the *writer* object. This is similar to operation of the + `~cursor.copy_to()` method. It also sends keepalive messages to the + server, in case there were no new data from it for the duration of + *keepalive_interval* seconds (this parameter must be greater than 1 + second, but it can have a fractional part). - This method never returns unless an error message is sent from the - server, or the server closes connection, or there is an exception in the - ``write()`` method of the ``file`` object. + With asynchronous connection, this method returns immediately and the + calling code can start reading the replication messages in a loop. - One can even use ``sys.stdout`` as the destination (this is only good for - testing purposes, however):: + A sketch implementation of the *writer* object might look similar to + the following:: - >>> cur.start_replication(sys.stdout, "testslot") - ... 
+ from io import TextIOBase - This method acts much like the `~cursor.copy_to()` with an important - distinction that ``write()`` method return value is dirving the - server-side replication cursor. In order to report to the server that - the all the messages up to the current one have been stored reliably, one - should return true value (i.e. something that satisfies ``if retval:`` - conidtion) from the ``write`` callback:: + class ReplicationStreamWriter(TextIOBase): - class ReplicationStreamWriter(object): def write(self, msg): - if store_message_reliably(msg): - return True + self.store_data_reliably(msg) - cur.start_replication(writer, "testslot") - ... + if self.should_report_to_the_server(msg): + msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end) + + def store_data_reliably(self, msg): + ... + + def should_report_to_the_server(self, msg): + ... + + First, like with the `~cursor.copy_to()` method, the code that is + calling the provided write method checks if the *writer* object is + inherited from `~io.TextIOBase`. If that is the case, the message + payload to be passed is converted to unicode using the connection's + encoding information. Otherwise, the message is passed as is. + + The *msg* object being passed is an instance of `~ReplicationMessage` + class. + + After storing the data passed in the message object, the writer object + should consider sending a confirmation message to the server. This is + done by calling `~send_replication_feedback()` method on the + corresponding replication cursor. A reference to the cursor producing + a given message is provided in the `~ReplicationMessage` as an + attribute. ..
note:: - One needs to be aware that failure to update the server-side cursor - on any one replication slot properly by constantly consuming and - reporting success to the server can eventually lead to "disk full" - condition on the server, because the server retains all the WAL - segments that might be needed to stream the changes via currently - open replication slots. + One needs to be aware that failure to properly notify the server on + any one replication slot by constantly consuming and reporting + success to the server at appropriate times can eventually lead to + "disk full" condition on the server, because the server retains all + the WAL segments that might be needed to stream the changes via + currently open replication slots. + + .. method:: stop_replication() + + With a non-asynchronous connection, when called from the ``write()`` + method, this method tells the code in `~start_replication()` to break + out of the endless loop and return. + + .. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) + + :param write_lsn: an LSN position up to which the client has written the data locally + :param flush_lsn: an LSN position up to which the client has stored the + data reliably (the server is allowed to discard any + and all data that predates this LSN) + :param apply_lsn: an LSN position up to which the warm standby server + has applied the changes (physical replication + master-slave protocol only) + :param reply: request the server to send back a keepalive message immediately + + Use this method to report to the server that all messages up to a + certain LSN position have been stored and may be discarded. + + This method can also be called with default parameters to send a + keepalive message to the server. + + If the message cannot be sent at the moment, the positions are + remembered for a later successful call or a call to + `~flush_replication_feedback()`. + + ..
method:: flush_replication_feedback(reply=False) + + :param reply: request the server to send back a keepalive message immediately + + This method tries to flush the latest replication feedback message + that `~send_replication_feedback()` was trying to send, if any. + + Low-level methods for asynchronous connection operation. + + While with the non-asynchronous connection, a single call to + `~start_replication()` handles all the complexity, at times it might be + beneficial to use low-level interface for better control, in particular to + `~select.select()` on multiple sockets. The following methods are + provided for asynchronous operation: + + .. method:: read_replication_message(decode=True) + + :param decode: a flag indicating that unicode conversion should be + performed on the data received from the server + + This method should be used in a loop with asynchronous connections + after calling `~start_replication()`. + + It tries to read the next message from the server, without blocking + and returns an instance of `~ReplicationMessage` or *None*, in case + there are no more data messages from the server at the moment. After + receiving a *None* value from this method, one should use a + `~select.select()` or `~select.poll()` on the corresponding connection + to block the process until there is more data from the server. + + The server can send keepalive messages to the client periodically. + Such messages are silently consumed by this method and are never + reported to the caller. + + .. method:: fileno() + + Calls the corresponding connection's `~connection.fileno()` method + and returns the result. + + This is a convenience method which allows replication cursor to be + used directly in `~select.select()` or `~select.poll()` calls. + + .. attribute:: replication_io_timestamp + + A `~datetime` object representing the timestamp at the moment of last + communication with the server (a data or keepalive message in either + direction). 
+ + An actual example of asynchronous operation might look like this:: + + keepalive_interval = 10.0 + while True: + if (datetime.now() - cur.replication_io_timestamp).total_seconds() >= keepalive_interval: + cur.send_replication_feedback() + + while True: + msg = cur.read_replication_message() + if not msg: + break + writer.write(msg) + + timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds() + if timeout > 0: + select.select([cur], [], [], timeout) + +.. autoclass:: ReplicationMessage + + .. attribute:: payload + + The actual data received from the server. An instance of either + ``str`` or ``unicode``. + + .. attribute:: data_start + + LSN position of the start of the message. + + .. attribute:: wal_end + + LSN position of the end of the message. + + .. attribute:: send_time + + A `~datetime` object representing the server timestamp at the moment + when the message was sent. + + .. attribute:: cursor + + A reference to the corresponding `~ReplicationCursor` object. - Drop any open replication slots that are no longer being used. The - list of open slots can be obtained by running a query like ``SELECT * - FROM pg_replication_slots``. .. data:: REPLICATION_PHYSICAL