diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 9384a961..2a7bed26 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -141,8 +141,81 @@ Logging cursor .. autoclass:: MinTimeLoggingCursor -Replication cursor -^^^^^^^^^^^^^^^^^^ +Replication protocol support +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Modern PostgreSQL servers (version 9.0 and above) support replication. The +replication protocol is built on top of the client-server protocol and can be +operated using ``libpq``; as such, it can also be operated by ``psycopg2``. +The replication protocol can be operated on both synchronous and +:ref:`asynchronous <async-support>` connections. + +Server version 9.4 adds a new feature called *Logical Replication*. + +.. seealso:: + + - PostgreSQL `Streaming Replication Protocol`__ + + .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html + + +Logical replication Quick-Start +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +You must be using PostgreSQL server version 9.4 or above to run this quick +start. + +Make sure that replication connections are permitted for user ``postgres`` in +``pg_hba.conf`` and reload the server configuration. You also need to set +``wal_level=logical`` and ``max_wal_senders``, ``max_replication_slots`` to +a value greater than zero in ``postgresql.conf`` (these changes require a server +restart). Create a database ``psycopg2test``. + +Then run the following code to quickly try the replication support out. This +is not production code -- it has no error handling, it sends feedback too +often, etc. 
-- and it's only intended as a simple demo of logical +replication:: + + from __future__ import print_function + import sys + import psycopg2 + import psycopg2.extras + + conn = psycopg2.connect('dbname=psycopg2test user=postgres', + connection_factory=psycopg2.extras.LogicalReplicationConnection) + cur = conn.cursor() + try: + cur.start_replication(slot_name='pytest') + except psycopg2.ProgrammingError: + cur.create_replication_slot('pytest', output_plugin='test_decoding') + cur.start_replication(slot_name='pytest') + + class DemoConsumer(object): + def __call__(self, msg): + print(msg.payload) + msg.cursor.send_feedback(flush_lsn=msg.data_start) + + democonsumer = DemoConsumer() + + print("Starting streaming, press Control-C to end...", file=sys.stderr) + try: + cur.consume_stream(democonsumer) + except KeyboardInterrupt: + cur.close() + conn.close() + print("The slot 'pytest' still exists. Drop it with SELECT pg_drop_replication_slot('pytest'); if no longer needed.", file=sys.stderr) + print("WARNING: Transaction logs will accumulate in pg_xlog until the slot is dropped.", file=sys.stderr) + + +You can now make changes to the ``psycopg2test`` database using a normal +psycopg2 session, ``psql``, etc. and see the logical decoding stream printed +by this demo client. + +This will continue running until terminated with ``Control-C``. + + +Replication connection and cursor classes +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. autoclass:: ReplicationConnectionBase @@ -177,17 +250,11 @@ The following replication types are defined: phys_cur = phys_conn.cursor() Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use - `ReplicationCursor` for actual communication on the connection. - -.. seealso:: - - - PostgreSQL `Streaming Replication Protocol`__ - - .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html + `ReplicationCursor` for actual communication with the server. 
-The individual messages in the replication stream are presented by -`ReplicationMessage` objects: +The individual messages in the replication stream are represented by +`ReplicationMessage` objects (both logical and physical type): .. autoclass:: ReplicationMessage @@ -249,7 +316,7 @@ The individual messages in the replication stream are presented by replication slot is created by default. No output plugin parameter is required or allowed when creating a physical replication slot. - In either case, the type of slot being created can be specified + In either case the type of slot being created can be specified explicitly using *slot_type* parameter. Replication slots are a feature of PostgreSQL server starting with @@ -295,25 +362,25 @@ The individual messages in the replication stream are presented by replication can be used with both types of connection. On the other hand, physical replication doesn't require a named - replication slot to be used, only logical one does. In any case, - logical replication and replication slots are a feature of PostgreSQL - server starting with version 9.4. Physical replication can be used - starting with 9.0. + replication slot to be used, only logical replication does. In any + case logical replication and replication slots are a feature of + PostgreSQL server starting with version 9.4. Physical replication can + be used starting with 9.0. If *start_lsn* is specified, the requested stream will start from that - LSN. The default is `!None`, which passes the LSN ``0/0``, causing - replay to begin at the last point at which the server got replay - confirmation from the client for, or the oldest available point for a - new slot. + LSN. The default is `!None` which passes the LSN ``0/0`` causing + replay to begin at the last point for which the server got flush + confirmation from the client, or the oldest available point for a new + slot. 
The server might produce an error if a WAL file for the given LSN has - already been recycled, or it may silently start streaming from a later + already been recycled or it may silently start streaming from a later position: the client can verify the actual position using information - provided the `ReplicationMessage` attributes. The exact server + provided by the `ReplicationMessage` attributes. The exact server behavior depends on the type of replication and use of slots. - A *timeline* parameter can only be specified with physical replication - and only starting with server version 9.3. + The *timeline* parameter can only be specified with physical + replication and only starting with server version 9.3. A dictionary of *options* may be passed to the logical decoding plugin on a logical replication slot. The set of supported options depends @@ -324,8 +391,9 @@ The individual messages in the replication stream are presented by `start_replication_expert()` internally. After starting the replication, to actually consume the incoming - server messages, use `consume_stream()` or implement a loop around - `read_message()` in case of asynchronous connection. + server messages use `consume_stream()` or implement a loop around + `read_message()` in case of :ref:`asynchronous connection + <async-support>`. .. method:: start_replication_expert(command) @@ -343,66 +411,66 @@ The individual messages in the replication stream are presented by This method can only be used with synchronous connection. For asynchronous connections see `read_message()`. - Before calling this method to consume the stream, use + Before calling this method to consume the stream use `start_replication()` first. - When called, this method enters an endless loop, reading messages from - the server and passing them to ``consume()``, then waiting for more - messages from the server. 
In order to make this method break out of - the loop and return, ``consume()`` can throw a `StopReplication` - exception (any unhandled exception will make it break out of the loop - as well). + This method enters an endless loop reading messages from the server + and passing them to ``consume()``, then waiting for more messages from + the server. In order to make this method break out of the loop and + return, ``consume()`` can throw a `StopReplication` exception. Any + unhandled exception will make it break out of the loop as well. - If *decode* is set to `!True`, the messages read from the server are + If *decode* is set to `!True` the messages read from the server are converted according to the connection `~connection.encoding`. This parameter should not be set with physical replication. - This method also sends keepalive messages to the server, in case there + This method also sends keepalive messages to the server in case there were no new data from the server for the duration of *keepalive_interval* (in seconds). The value of this parameter must - be equal to at least 1 second, but it can have a fractional part. + be set to at least 1 second, but it can have a fractional part. + + The *msg* objects passed to ``consume()`` are instances of + `ReplicationMessage` class. + + After processing a certain amount of messages the client should send a + confirmation message to the server. This should be done by calling + `send_feedback()` method on the corresponding replication cursor. A + reference to the cursor is provided in the `ReplicationMessage` as an + attribute. The following example is a sketch implementation of ``consume()`` callable for logical replication:: class LogicalStreamConsumer(object): - def __call__(self, msg): - self.store_message_data(msg.payload) + ... 
- if self.should_report_to_the_server_now(msg): + def __call__(self, msg): + self.process_message(msg.payload) + + if self.should_send_feedback(msg): msg.cursor.send_feedback(flush_lsn=msg.data_start) consumer = LogicalStreamConsumer() cur.consume_stream(consumer, decode=True) - The *msg* objects passed to ``consume()`` are instances of - `ReplicationMessage` class. - - After storing certain amount of messages' data reliably, the client - should send a confirmation message to the server. This should be done - by calling `send_feedback()` method on the corresponding replication - cursor. A reference to the cursor is provided in the - `ReplicationMessage` as an attribute. - .. warning:: - When using replication with slots, failure to properly notify the - server by constantly consuming and reporting success at - appropriate times can eventually lead to "disk full" condition on - the server, because the server retains all the WAL segments that - might be needed to stream the changes via all of the currently - open replication slots. + When using replication with slots, failure to constantly consume + *and* report success to the server appropriately can eventually + lead to "disk full" condition on the server, because the server + retains all the WAL segments that might be needed to stream the + changes via all of the currently open replication slots. - On the other hand, it is not recommended to send a confirmation - after every processed message, since that will put an unnecessary - load on network and the server. A possible strategy is to confirm - after every COMMIT message. + On the other hand, it is not recommended to send confirmation + after *every* processed message, since that will put an + unnecessary load on network and the server. A possible strategy + is to confirm after every COMMIT message. .. 
method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) :param write_lsn: a LSN position up to which the client has written the data locally - :param flush_lsn: a LSN position up to which the client has stored the + :param flush_lsn: a LSN position up to which the client has processed the data reliably (the server is allowed to discard all and every data that predates this LSN) :param apply_lsn: a LSN position up to which the warm standby server @@ -411,7 +479,7 @@ The individual messages in the replication stream are presented by :param reply: request the server to send back a keepalive message immediately Use this method to report to the server that all messages up to a - certain LSN position have been stored on the client and may be + certain LSN position have been processed on the client and may be discarded on the server. This method can also be called with all default parameters' values to @@ -433,13 +501,14 @@ The individual messages in the replication stream are presented by Returns `!True` if the feedback message was sent successfully, `!False` otherwise. - Low-level methods for asynchronous connection operation. + Low-level replication cursor methods for :ref:`asynchronous connection + <async-support>` operation. - With the synchronous connection, a call to `consume_stream()` handles all + With the synchronous connection a call to `consume_stream()` handles all the complexity of handling the incoming messages and sending keepalive replies, but at times it might be beneficial to use low-level interface - for better control, in particular to `~select.select()` on multiple - sockets. The following methods are provided for asynchronous operation: + for better control, in particular to `~select` on multiple sockets. The + following methods are provided for asynchronous operation: .. 
method:: read_message(decode=True) @@ -449,16 +518,16 @@ The individual messages in the replication stream are presented by This method should be used in a loop with asynchronous connections after calling `start_replication()` once. - It tries to read the next message from the server, without blocking - and returns an instance of `ReplicationMessage` or `!None`, in case - there are no more data messages from the server at the moment. + It tries to read the next message from the server without blocking and + returns an instance of `ReplicationMessage` or `!None`, in case there + are no more data messages from the server at the moment. It is expected that the calling code will call this method repeatedly - in order to consume all of the messages that might have been buffered, - until `!None` is returned. After receiving a `!None` value from this - method, the caller should use `~select.select()` or `~select.poll()` - on the corresponding connection to block the process until there is - more data from the server. + in order to consume all of the messages that might have been buffered + until `!None` is returned. After receiving `!None` from this method + the caller should use `~select.select()` or `~select.poll()` on the + corresponding connection to block the process until there is more data + from the server. The server can send keepalive messages to the client periodically. Such messages are silently consumed by this method and are never @@ -480,24 +549,25 @@ The individual messages in the replication stream are presented by An actual example of asynchronous operation might look like this:: - def consume(msg): - ... + def consume(msg): + ... 
- keepalive_interval = 10.0 - while True: - msg = cur.read_message() - if msg: - consume(msg) - else: - now = datetime.now() - timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds() - if timeout > 0: - sel = select.select([cur], [], [], timeout) - else: - sel = ([], [], []) + keepalive_interval = 10.0 + while True: + msg = cur.read_message() + if msg: + consume(msg) + else: + now = datetime.now() + timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds() + if timeout > 0: + sel = select.select([cur], [], [], timeout) + else: + sel = ([], [], []) - if not sel[0]: - cur.send_feedback() + if not sel[0]: + # timed out, send keepalive message + cur.send_feedback() .. index:: pair: Cursor; Replication diff --git a/lib/extras.py b/lib/extras.py index 7c713573..8e1373c1 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -514,7 +514,7 @@ class StopReplication(Exception): class ReplicationCursor(_replicationCursor): - """A cursor used for communication on the replication protocol.""" + """A cursor used for communication on replication connections.""" def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None): """Create streaming replication slot.""" diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c index d4b0457b..893ce7ad 100644 --- a/psycopg/replication_message_type.c +++ b/psycopg/replication_message_type.c @@ -146,7 +146,7 @@ static struct PyGetSetDef replicationMessageObject_getsets[] = { /* object type */ #define replicationMessageType_doc \ -"A database replication message." +"A replication protocol message." PyTypeObject replicationMessageType = { PyVarObject_HEAD_INIT(NULL, 0)