Add quick start to the replication doc, minor doc fixes.

Oleksandr Shulgin 2015-10-20 12:36:13 +02:00
parent 0bb81fc848
commit 23abe4f501
3 changed files with 160 additions and 90 deletions


@ -141,8 +141,81 @@ Logging cursor
.. autoclass:: MinTimeLoggingCursor
Replication protocol support
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Modern PostgreSQL servers (version 9.0 and above) support replication. The
replication protocol is built on top of the client-server protocol and can be
operated using ``libpq``; as such, it can also be operated by ``psycopg2``.
The replication protocol can be operated on both synchronous and
:ref:`asynchronous <async-support>` connections.
Server version 9.4 adds a new feature called *Logical Replication*.
.. seealso::
- PostgreSQL `Streaming Replication Protocol`__
.. __: http://www.postgresql.org/docs/current/static/protocol-replication.html
Logical replication Quick-Start
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
You must be using PostgreSQL server version 9.4 or above to run this quick
start.
Make sure that replication connections are permitted for the user ``postgres``
in ``pg_hba.conf`` and reload the server configuration. You also need to set
``wal_level=logical`` and to set ``max_wal_senders`` and
``max_replication_slots`` to a value greater than zero in ``postgresql.conf``
(these changes require a server restart). Finally, create a database called
``psycopg2test``.
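For example, here is a minimal sketch for double-checking those settings from
Python (it assumes the database and user above already exist; the values shown
are only what this quick start needs, not general recommendations)::

    import psycopg2

    conn = psycopg2.connect('dbname=psycopg2test user=postgres')
    cur = conn.cursor()
    for setting in ('wal_level', 'max_wal_senders', 'max_replication_slots'):
        cur.execute("SHOW " + setting)  # SHOW cannot take bound parameters
        print("%s = %s" % (setting, cur.fetchone()[0]))
    conn.close()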
Then run the following code to quickly try the replication support out. This
is not production code -- it has no error handling, it sends feedback too
often, etc. -- and it's only intended as a simple demo of logical
replication::
    from __future__ import print_function
    import sys
    import psycopg2
    import psycopg2.extras

    conn = psycopg2.connect('dbname=psycopg2test user=postgres',
                            connection_factory=psycopg2.extras.LogicalReplicationConnection)
    cur = conn.cursor()

    try:
        cur.start_replication(slot_name='pytest')
    except psycopg2.ProgrammingError:
        cur.create_replication_slot('pytest', output_plugin='test_decoding')
        cur.start_replication(slot_name='pytest')

    class DemoConsumer(object):
        def __call__(self, msg):
            print(msg.payload)
            msg.cursor.send_feedback(flush_lsn=msg.data_start)

    democonsumer = DemoConsumer()

    print("Starting streaming, press Control-C to end...", file=sys.stderr)
    try:
        cur.consume_stream(democonsumer)
    except KeyboardInterrupt:
        cur.close()
        conn.close()
        print("The slot 'pytest' still exists. Drop it with "
              "SELECT pg_drop_replication_slot('pytest'); if no longer needed.",
              file=sys.stderr)
        print("WARNING: Transaction logs will accumulate in pg_xlog "
              "until the slot is dropped.", file=sys.stderr)
You can now make changes to the ``psycopg2test`` database using a normal
psycopg2 session, ``psql``, etc. and see the logical decoding stream printed
by this demo client.
This will continue running until terminated with ``Control-C``.
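For instance, here is a sketch of such a second, regular session generating
something to decode (the table name ``test_table`` is only an example)::

    import psycopg2

    conn = psycopg2.connect('dbname=psycopg2test user=postgres')
    cur = conn.cursor()
    cur.execute("CREATE TABLE IF NOT EXISTS test_table "
                "(id serial PRIMARY KEY, data text)")
    cur.execute("INSERT INTO test_table (data) VALUES (%s)", ('hello world',))
    conn.commit()
    conn.close()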
Replication connection and cursor classes
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. autoclass:: ReplicationConnectionBase
@ -177,17 +250,11 @@ The following replication types are defined:
phys_cur = phys_conn.cursor()
Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use
`ReplicationCursor` for actual communication with the server.
The individual messages in the replication stream are represented by
`ReplicationMessage` objects (for both logical and physical replication):
.. autoclass:: ReplicationMessage
@ -249,7 +316,7 @@ The individual messages in the replication stream are presented by
replication slot is created by default. No output plugin parameter is
required or allowed when creating a physical replication slot.
In either case the type of slot being created can be specified
explicitly using the *slot_type* parameter.
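For example, here is a sketch of creating one slot of each kind with an
explicit type (``cur`` is a replication cursor as in the quick start; the
slot names here are arbitrary)::

    from psycopg2.extras import REPLICATION_LOGICAL, REPLICATION_PHYSICAL

    cur.create_replication_slot('a_logical_slot', slot_type=REPLICATION_LOGICAL,
                                output_plugin='test_decoding')
    cur.create_replication_slot('a_physical_slot', slot_type=REPLICATION_PHYSICAL)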
Replication slots are a feature of PostgreSQL server starting with
@ -295,25 +362,25 @@ The individual messages in the replication stream are presented by
replication can be used with both types of connection.
On the other hand, physical replication doesn't require a named
replication slot to be used, only logical replication does. In any
case logical replication and replication slots are a feature of
PostgreSQL server starting with version 9.4. Physical replication can
be used starting with 9.0.
If *start_lsn* is specified, the requested stream will start from that
LSN. The default is `!None` which passes the LSN ``0/0`` causing
replay to begin at the last point for which the server got flush
confirmation from the client, or the oldest available point for a new
slot.
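For example, here is a sketch of resuming a stream from a remembered position
(this assumes the ``pytest`` slot from the quick start; the LSN value is only
a placeholder)::

    # The LSN may be passed as an integer or, as assumed here, as a string in
    # the server's 'XXX/XXX' hexadecimal format.
    cur.start_replication(slot_name='pytest', start_lsn='0/2A123B8')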
The server might produce an error if a WAL file for the given LSN has
already been recycled or it may silently start streaming from a later
position: the client can verify the actual position using information
provided by the `ReplicationMessage` attributes. The exact server
behavior depends on the type of replication and use of slots.
The *timeline* parameter can only be specified with physical
replication and only starting with server version 9.3.
A dictionary of *options* may be passed to the logical decoding plugin
on a logical replication slot. The set of supported options depends
@ -324,8 +391,9 @@ The individual messages in the replication stream are presented by
`start_replication_expert()` internally.
After starting the replication, to actually consume the incoming
server messages, use `consume_stream()` or implement a loop around
`read_message()` in case of :ref:`asynchronous connection
<async-support>`.
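For instance, on a synchronous connection this boils down to the following
sketch, where ``consume`` stands for any suitable callable such as the
consumer shown further below::

    cur.start_replication(slot_name='pytest')
    cur.consume_stream(consume)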
.. method:: start_replication_expert(command)
@ -343,66 +411,66 @@ The individual messages in the replication stream are presented by
This method can only be used with synchronous connection. For
asynchronous connections see `read_message()`.
Before calling this method to consume the stream use
`start_replication()` first.
This method enters an endless loop reading messages from the server
and passing them to ``consume()``, then waiting for more messages from
the server. In order to make this method break out of the loop and
return, ``consume()`` can throw a `StopReplication` exception. Any
unhandled exception will make it break out of the loop as well.
If *decode* is set to `!True` the messages read from the server are
converted according to the connection `~connection.encoding`. This
parameter should not be set with physical replication.
This method also sends keepalive messages to the server in case there
were no new data from the server for the duration of
*keepalive_interval* (in seconds). The value of this parameter must
be set to at least 1 second, but it can have a fractional part.
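For example (a sketch reusing the consumer object from the quick start)::

    cur.consume_stream(democonsumer, decode=True, keepalive_interval=5.0)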
The following example is a sketch implementation of ``consume()``
callable for logical replication::
    class LogicalStreamConsumer(object):

        def __call__(self, msg):
            self.process_message(msg.payload)

            if self.should_send_feedback(msg):
                msg.cursor.send_feedback(flush_lsn=msg.data_start)

    consumer = LogicalStreamConsumer()
    cur.consume_stream(consumer, decode=True)
The *msg* objects passed to ``consume()`` are instances of
`ReplicationMessage` class.
After reliably storing a certain amount of message data, the client
should send a confirmation message to the server. This should be done
by calling the `send_feedback()` method on the corresponding
replication cursor. A reference to the cursor is provided in the
`ReplicationMessage` as an attribute.
.. warning::
When using replication with slots, failure to constantly consume
*and* report success to the server appropriately can eventually
lead to a "disk full" condition on the server, because the server
retains all the WAL segments that might be needed to stream the
changes via all of the currently open replication slots.
On the other hand, it is not recommended to send confirmation
after *every* processed message, since that would put an
unnecessary load on the network and the server. A possible
strategy is to confirm after every COMMIT message.
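For instance, with the ``test_decoding`` plugin, whose textual payloads look
like ``BEGIN 1234``, ``table public.x: INSERT: ...`` and ``COMMIT 1234``, the
check used by the consumer above could be sketched as follows (this is
plugin-specific, not a general rule)::

    def should_send_feedback(self, msg):
        # Assumes decode=True, so msg.payload is text; confirm once per
        # transaction, when its COMMIT record is seen.
        return msg.payload.startswith('COMMIT')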
.. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
:param write_lsn: a LSN position up to which the client has written the data locally
:param flush_lsn: a LSN position up to which the client has processed the
data reliably (the server is allowed to discard all the
data that predates this LSN)
:param apply_lsn: a LSN position up to which the warm standby server
@ -411,7 +479,7 @@ The individual messages in the replication stream are presented by
:param reply: request the server to send back a keepalive message immediately
Use this method to report to the server that all messages up to a
certain LSN position have been processed on the client and may be
discarded on the server.
This method can also be called with all default parameters' values to
@ -433,13 +501,14 @@ The individual messages in the replication stream are presented by
Returns `!True` if the feedback message was sent successfully,
`!False` otherwise.
Low-level replication cursor methods for :ref:`asynchronous connection
<async-support>` operation.
With the synchronous connection a call to `consume_stream()` handles all
the complexity of handling the incoming messages and sending keepalive
replies, but at times it might be beneficial to use low-level interface
for better control, in particular to `~select` on multiple sockets. The
following methods are provided for asynchronous operation:
.. method:: read_message(decode=True)
@ -449,16 +518,16 @@ The individual messages in the replication stream are presented by
This method should be used in a loop with asynchronous connections
after calling `start_replication()` once.
It tries to read the next message from the server without blocking and
returns an instance of `ReplicationMessage` or `!None`, in case there
are no more data messages from the server at the moment.
It is expected that the calling code will call this method repeatedly
in order to consume all of the messages that might have been buffered
until `!None` is returned. After receiving `!None` from this method
the caller should use `~select.select()` or `~select.poll()` on the
corresponding connection to block the process until there is more data
from the server.
The server can send keepalive messages to the client periodically.
Such messages are silently consumed by this method and are never
@ -480,24 +549,25 @@ The individual messages in the replication stream are presented by
An actual example of asynchronous operation might look like this::
    from datetime import datetime
    import select

    def consume(msg):
        ...

    keepalive_interval = 10.0
    while True:
        msg = cur.read_message()
        if msg:
            consume(msg)
        else:
            now = datetime.now()
            timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds()
            if timeout > 0:
                sel = select.select([cur], [], [], timeout)
            else:
                sel = ([], [], [])

            if not sel[0]:
                # timed out, send keepalive message
                cur.send_feedback()
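If several replication connections need to be serviced from the same loop,
the same pattern extends naturally. For example, a sketch multiplexing two
cursors ``cur1`` and ``cur2`` (both on asynchronous connections with
replication already started, and ``consume`` a suitable callable)::

    import select

    while True:
        ready, _, _ = select.select([cur1, cur2], [], [], 10.0)
        for cur in ready:
            msg = cur.read_message()
            while msg is not None:
                consume(msg)
                msg = cur.read_message()
        if not ready:
            # nothing arrived within the timeout: send keepalives
            cur1.send_feedback()
            cur2.send_feedback()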
.. index::
pair: Cursor; Replication


@ -514,7 +514,7 @@ class StopReplication(Exception):
class ReplicationCursor(_replicationCursor):
"""A cursor used for communication on the replication protocol."""
"""A cursor used for communication on replication connections."""
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
"""Create streaming replication slot."""


@ -146,7 +146,7 @@ static struct PyGetSetDef replicationMessageObject_getsets[] = {
/* object type */
#define replicationMessageType_doc \
"A database replication message."
"A replication protocol message."
PyTypeObject replicationMessageType = {
PyVarObject_HEAD_INIT(NULL, 0)