mirror of
https://github.com/psycopg/psycopg2.git
synced 2025-03-05 16:35:47 +03:00
Update docs on ReplicationCursor
This commit is contained in:
parent
0d731aa12e
commit
9386653d72
|
@ -165,12 +165,12 @@ Replication cursor
|
||||||
|
|
||||||
.. method:: identify_system()
|
.. method:: identify_system()
|
||||||
|
|
||||||
This method executes ``IDENTIFY_SYSTEM`` command of the streaming
|
Execute ``IDENTIFY_SYSTEM`` command of the streaming replication
|
||||||
replication protocol and returns a result as a dictionary.
|
protocol and return the result as a dictionary.
|
||||||
|
|
||||||
Example::
|
Example::
|
||||||
|
|
||||||
>>> print cur.identify_system()
|
>>> cur.identify_system()
|
||||||
{'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'}
|
{'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'}
|
||||||
|
|
||||||
.. method:: create_replication_slot(slot_type, slot_name, output_plugin=None)
|
.. method:: create_replication_slot(slot_type, slot_name, output_plugin=None)
|
||||||
|
@ -199,82 +199,81 @@ Replication cursor
|
||||||
|
|
||||||
.. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)
|
.. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)
|
||||||
|
|
||||||
Start and consume replication stream.
|
Start a replication stream. On non-asynchronous connection, also
|
||||||
|
consume the stream messages.
|
||||||
|
|
||||||
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
|
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
|
||||||
`REPLICATION_LOGICAL`
|
`REPLICATION_LOGICAL`
|
||||||
:param slot_name: name of the replication slot to use (required for
|
:param slot_name: name of the replication slot to use (required for
|
||||||
logical replication)
|
logical replication)
|
||||||
:param writer: a file-like object to write replication messages to
|
:param writer: a file-like object to write replication messages to
|
||||||
:param start_lsn: the point in replication stream (WAL position) to start
|
:param start_lsn: the LSN position to start from, in the form
|
||||||
from, in the form ``XXX/XXX`` (forward-slash separated
|
``XXX/XXX`` (forward-slash separated pair of
|
||||||
pair of hexadecimals)
|
hexadecimals)
|
||||||
:param timeline: WAL history timeline to start streaming from (optional,
|
:param timeline: WAL history timeline to start streaming from (optional,
|
||||||
can only be used with physical replication)
|
can only be used with physical replication)
|
||||||
:param keepalive_interval: interval (in seconds) to send keepalive
|
:param keepalive_interval: interval (in seconds) to send keepalive
|
||||||
messages to the server
|
messages to the server
|
||||||
:param options: an dictionary of options to pass to logical replication
|
:param options: a dictionary of options to pass to logical replication
|
||||||
slot
|
slot
|
||||||
|
|
||||||
With non-asynchronous connection, this method enters an endless loop,
|
When used on non-asynchronous connection this method enters an endless
|
||||||
reading messages from the server and passing them to ``write()`` method
|
loop, reading messages from the server and passing them to ``write()``
|
||||||
of the *writer* object. This is similar to operation of the
|
method of the *writer* object. This is similar to operation of the
|
||||||
`~cursor.copy_to()` method. It also sends keepalive messages to the
|
`~cursor.copy_to()` method. It also sends keepalive messages to the
|
||||||
server, in case there were no new data from it for the duration of
|
server, in case there were no new data from it for the duration of
|
||||||
*keepalive_interval* seconds (this parameter must be greater than 1
|
*keepalive_interval* seconds (this parameter's value must be equal to
|
||||||
second, but it can have a fractional part).
|
at least than 1 second, but it can have a fractional part).
|
||||||
|
|
||||||
With asynchronous connection, this method returns immediately and the
|
With asynchronous connection, this method returns immediately and the
|
||||||
calling code can start reading the replication messages in a loop.
|
calling code can start reading the replication messages in a loop.
|
||||||
|
|
||||||
A sketch implementation of the *writer* object might look similar to
|
A sketch implementation of the *writer* object for logical replication
|
||||||
the following::
|
might look similar to the following::
|
||||||
|
|
||||||
from io import TextIOBase
|
from io import TextIOBase
|
||||||
|
|
||||||
class ReplicationStreamWriter(TextIOBase):
|
class LogicalStreamWriter(TextIOBase):
|
||||||
|
|
||||||
def write(self, msg):
|
def write(self, msg):
|
||||||
self.store_data_reliably(msg)
|
self.store_message_data(msg.payload)
|
||||||
|
|
||||||
if self.should_report_to_the_server(msg):
|
if self.should_report_to_the_server_now(msg):
|
||||||
msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end)
|
msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end)
|
||||||
|
|
||||||
def store_data_reliably(self, msg):
|
First, like with the `~cursor.copy_to()` method, the code that calls
|
||||||
...
|
the provided ``write()`` method checks if the *writer* object is
|
||||||
|
|
||||||
def shoud_report_to_the_server(self, msg):
|
|
||||||
...
|
|
||||||
|
|
||||||
First, like with the `~cursor.copy_to()` method, the code that is
|
|
||||||
calling the provided write method checks if the *writer* object is
|
|
||||||
inherited from `~io.TextIOBase`. If that is the case, the message
|
inherited from `~io.TextIOBase`. If that is the case, the message
|
||||||
payload to be passed is converted to unicode using the connection's
|
payload to be passed is converted to unicode using the connection's
|
||||||
encoding information. Otherwise, the message is passed as is.
|
`~connection.encoding` information. Otherwise, the message is passed
|
||||||
|
as is.
|
||||||
|
|
||||||
The *msg* object being passed is an instance of `~ReplicationMessage`
|
The *msg* object being passed is an instance of `~ReplicationMessage`
|
||||||
class.
|
class.
|
||||||
|
|
||||||
After storing the data passed in the message object, the writer object
|
After storing certain amount of messages' data reliably, the client
|
||||||
should consider sending a confirmation message to the server. This is
|
should send a confirmation message to the server. This should be done
|
||||||
done by calling `~send_replication_feedback()` method on the
|
by calling `~send_replication_feedback()` method on the corresponding
|
||||||
corresponding replication cursor. A reference to the cursor producing
|
replication cursor. A reference to the cursor is provided in the
|
||||||
a given message is provided in the `~ReplicationMessage` as an
|
`~ReplicationMessage` as an attribute.
|
||||||
attribute.
|
|
||||||
|
|
||||||
.. note::
|
.. warning::
|
||||||
|
|
||||||
One needs to be aware that failure to properly notify the server on
|
Failure to properly notify the server by constantly consuming and
|
||||||
any one replication slot by constantly consuming and reporting
|
reporting success at appropriate times can eventually lead to "disk
|
||||||
success to the server at appropriate times can eventually lead to
|
full" condition on the server, because the server retains all the
|
||||||
"disk full" condition on the server, because the server retains all
|
WAL segments that might be needed to stream the changes via all of
|
||||||
the WAL segments that might be needed to stream the changes via
|
the currently open replication slots.
|
||||||
currently open replication slots.
|
|
||||||
|
On the other hand, it is not recommended to send a confirmation
|
||||||
|
after every processed message, since that will put an unnecessary
|
||||||
|
load on network and the server. A possible strategy is to confirm
|
||||||
|
after every COMMIT message.
|
||||||
|
|
||||||
.. method:: stop_replication()
|
.. method:: stop_replication()
|
||||||
|
|
||||||
In non-asynchronous connection, when called from the ``write()``
|
In non-asynchronous connection, when called from the ``write()``
|
||||||
method tells the code in `~start_replication` to break out of the
|
method, tell the code in `~start_replication` to break out of the
|
||||||
endless loop and return.
|
endless loop and return.
|
||||||
|
|
||||||
.. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
|
.. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
|
||||||
|
@ -291,12 +290,12 @@ Replication cursor
|
||||||
Use this method to report to the server that all messages up to a
|
Use this method to report to the server that all messages up to a
|
||||||
certain LSN position have been stored and may be discarded.
|
certain LSN position have been stored and may be discarded.
|
||||||
|
|
||||||
This method can also be called with default parameters to send a
|
This method can also be called with all default parameters' values to
|
||||||
keepalive message to the server.
|
send a keepalive message to the server.
|
||||||
|
|
||||||
In case the message cannot be sent at the moment, remembers the
|
In case of asynchronous connection, if the feedback message cannot be
|
||||||
positions for a later successful call or call to
|
sent at the moment, remembers the passed LSN positions for a later
|
||||||
`~flush_replication_feedback()`.
|
hopefully successful call or call to `~flush_replication_feedback()`.
|
||||||
|
|
||||||
.. method:: flush_replication_feedback(reply=False)
|
.. method:: flush_replication_feedback(reply=False)
|
||||||
|
|
||||||
|
@ -307,10 +306,10 @@ Replication cursor
|
||||||
|
|
||||||
Low-level methods for asynchronous connection operation.
|
Low-level methods for asynchronous connection operation.
|
||||||
|
|
||||||
While with the non-asynchronous connection, a single call to
|
With the non-asynchronous connection, a single call to
|
||||||
`~start_replication()` handles all the complexity, at times it might be
|
`~start_replication()` handles all the complexity, but at times it might
|
||||||
beneficial to use low-level interface for better control, in particular to
|
be beneficial to use low-level interface for better control, in particular
|
||||||
`~select.select()` on multiple sockets. The following methods are
|
to `~select.select()` on multiple sockets. The following methods are
|
||||||
provided for asynchronous operation:
|
provided for asynchronous operation:
|
||||||
|
|
||||||
.. method:: read_replication_message(decode=True)
|
.. method:: read_replication_message(decode=True)
|
||||||
|
@ -319,14 +318,18 @@ Replication cursor
|
||||||
performed on the data received from the server
|
performed on the data received from the server
|
||||||
|
|
||||||
This method should be used in a loop with asynchronous connections
|
This method should be used in a loop with asynchronous connections
|
||||||
after calling `~start_replication()`.
|
after calling `~start_replication()` once.
|
||||||
|
|
||||||
It tries to read the next message from the server, without blocking
|
It tries to read the next message from the server, without blocking
|
||||||
and returns an instance of `~ReplicationMessage` or *None*, in case
|
and returns an instance of `~ReplicationMessage` or *None*, in case
|
||||||
there are no more data messages from the server at the moment. After
|
there are no more data messages from the server at the moment.
|
||||||
receiving a *None* value from this method, one should use a
|
|
||||||
`~select.select()` or `~select.poll()` on the corresponding connection
|
It is expected that the calling code will call this method repeatedly
|
||||||
to block the process until there is more data from the server.
|
in order to consume all of the messages that might have been buffered,
|
||||||
|
until *None* is returned. After receiving a *None* value from this
|
||||||
|
method, one might use `~select.select()` or `~select.poll()` on the
|
||||||
|
corresponding connection to block the process until there is more data
|
||||||
|
from the server.
|
||||||
|
|
||||||
The server can send keepalive messages to the client periodically.
|
The server can send keepalive messages to the client periodically.
|
||||||
Such messages are silently consumed by this method and are never
|
Such messages are silently consumed by this method and are never
|
||||||
|
@ -334,8 +337,8 @@ Replication cursor
|
||||||
|
|
||||||
.. method:: fileno()
|
.. method:: fileno()
|
||||||
|
|
||||||
Calls the corresponding connection's `~connection.fileno()` method
|
Call the corresponding connection's `~connection.fileno()` method and
|
||||||
and returns the result.
|
return the result.
|
||||||
|
|
||||||
This is a convenience method which allows replication cursor to be
|
This is a convenience method which allows replication cursor to be
|
||||||
used directly in `~select.select()` or `~select.poll()` calls.
|
used directly in `~select.select()` or `~select.poll()` calls.
|
||||||
|
|
Loading…
Reference in New Issue
Block a user