Update replication connection/cursor interface and docs.

This commit is contained in:
Oleksandr Shulgin 2015-10-01 15:34:51 +02:00
parent 937a7a9024
commit 95ee218c6d
2 changed files with 151 additions and 71 deletions

View File

@ -144,20 +144,36 @@ Logging cursor
Replication cursor
^^^^^^^^^^^^^^^^^^
.. autoclass:: ReplicationConnection
.. autoclass:: LogicalReplicationConnection
This connection factory class can be used to open a special type of
connection that is used for streaming replication.
connection that is used for logical replication.
Example::
from psycopg2.extras import ReplicationConnection, REPLICATION_PHYSICAL, REPLICATION_LOGICAL
conn = psycopg2.connect(dsn, connection_factory=ReplicationConnection)
cur = conn.cursor()
from psycopg2.extras import LogicalReplicationConnection
log_conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection)
log_cur = log_conn.cursor()
.. autoclass:: PhysicalReplicationConnection
This connection factory class can be used to open a special type of
connection that is used for physical replication.
Example::
from psycopg2.extras import PhysicalReplicationConnection
phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection)
phys_cur = phys_conn.cursor()
Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use
`ReplicationCursor` for actual communication on the connection.
.. seealso::
- PostgreSQL `Replication protocol`__
- PostgreSQL `Streaming Replication Protocol`__
.. __: http://www.postgresql.org/docs/current/static/protocol-replication.html
@ -173,19 +189,38 @@ Replication cursor
>>> cur.identify_system()
{'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'}
.. method:: create_replication_slot(slot_type, slot_name, output_plugin=None)
.. method:: create_replication_slot(slot_name, output_plugin=None)
Create streaming replication slot.
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to be created
:param output_plugin: name of the logical decoding output plugin to use
(logical replication only)
:param slot_type: type of replication: should be either
`REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL`
:param output_plugin: name of the logical decoding output plugin to be
used by the slot; required for logical
replication connections, disallowed for physical
Example::
cur.create_replication_slot(REPLICATION_LOGICAL, "testslot", "test_decoding")
log_cur.create_replication_slot("logical1", "test_decoding")
phys_cur.create_replication_slot("physical1")
# either logical or physical replication connection
cur.create_replication_slot("slot1", slot_type=REPLICATION_LOGICAL)
When creating a slot on a logical replication connection, a logical
replication slot is created by default. Logical replication requires
name of the logical decoding output plugin to be specified.
When creating a slot on a physical replication connection, a physical
replication slot is created by default. No output plugin parameter is
required or allowed when creating a physical replication slot.
In either case, the type of slot being created can be specified
explicitly using *slot_type* parameter.
Replication slots are a feature of PostgreSQL server starting with
version 9.4.
.. method:: drop_replication_slot(slot_name)
@ -195,18 +230,24 @@ Replication cursor
Example::
cur.drop_replication_slot("testslot")
# either logical or physical replication connection
cur.drop_replication_slot("slot1")
.. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None)
This
Replication slots are a feature of PostgreSQL server starting with
version 9.4.
Start a replication stream. On non-asynchronous connection, also
consume the stream messages.
.. method:: start_replication(slot_name=None, writer=None, slot_type=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None)
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to use (required for
logical replication)
Start replication on the connection.
:param slot_name: name of the replication slot to use; required for
logical replication, physical replication can work
with or without a slot
:param writer: a file-like object to write replication messages to
:param slot_type: type of replication: should be either
`REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL`
:param start_lsn: the optional LSN position to start replicating from,
can be an integer or a string of hexadecimal digits
in the form ``XXX/XXX``
@ -215,9 +256,23 @@ Replication cursor
:param keepalive_interval: interval (in seconds) to send keepalive
messages to the server
:param options: a dictionary of options to pass to logical replication
slot (not allowed with physical replication, use
slot (not allowed with physical replication, set to
*None*)
If not specified using *slot_type* parameter, the type of replication
to be started is defined by the type of replication connection.
Logical replication is only allowed on logical replication connection,
but physical replication can be used with both types of connection.
On the other hand, physical replication doesn't require a named
replication slot to be used, only logical one does. In any case,
logical replication and replication slots are a feature of PostgreSQL
server starting with version 9.4. Physical replication can be used
starting with 9.0.
If a *slot_name* is specified, the slot must exist on the server and
its type must match the replication type used.
When used on non-asynchronous connection this method enters an endless
loop, reading messages from the server and passing them to ``write()``
method of the *writer* object. This is similar to operation of the
@ -391,10 +446,8 @@ Replication cursor
A reference to the corresponding `~ReplicationCursor` object.
.. data:: REPLICATION_PHYSICAL
.. data:: REPLICATION_LOGICAL
.. data:: REPLICATION_PHYSICAL
.. index::
pair: Cursor; Replication

View File

@ -438,53 +438,78 @@ class MinTimeLoggingCursor(LoggingCursor):
return LoggingCursor.callproc(self, procname, vars)
class ReplicationConnection(_connection):
"""A connection that uses `ReplicationCursor` automatically."""
"""Replication connection types."""
REPLICATION_LOGICAL = "LOGICAL"
REPLICATION_PHYSICAL = "PHYSICAL"
class ReplicationConnectionBase(_connection):
"""
Base class for Logical and Physical replication connection
classes. Uses `ReplicationCursor` automatically.
"""
def __init__(self, *args, **kwargs):
"""Initializes a replication connection, by adding appropriate replication parameter to the provided dsn arguments."""
"""
Initializes a replication connection by adding appropriate
parameters to the provided DSN and tweaking the connection
attributes.
"""
if len(args):
dsn = args[0]
# replication_type is set in subclasses
if self.replication_type == REPLICATION_LOGICAL:
replication = 'database'
# FIXME: could really use parse_dsn here
elif self.replication_type == REPLICATION_PHYSICAL:
replication = 'true'
if dsn.startswith('postgres://') or dsn.startswith('postgresql://'):
# poor man's url parsing
if dsn.rfind('?') > 0:
if not dsn.endswith('?'):
dsn += '&'
else:
dsn += '?'
else:
dsn += ' '
dsn += 'replication=database'
args = [dsn] + list(args[1:])
else:
dbname = kwargs.get('dbname', None)
if dbname is None:
kwargs['dbname'] = 'replication'
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type)
if kwargs.get('replication', None) is None:
kwargs['replication'] = 'database' if dbname else 'true'
# FIXME: could really use parse_dsn here
dsn = args[0]
if dsn.startswith('postgres://') or dsn.startswith('postgresql://'):
# poor man's url parsing
if dsn.rfind('?') > 0:
if not dsn.endswith('?'):
dsn += '&'
else:
dsn += '?'
else:
dsn += ' '
dsn += 'replication=%s' % replication
args = [dsn] + list(args[1:])
super(ReplicationConnection, self).__init__(*args, **kwargs)
super(ReplicationConnectionBase, self).__init__(*args, **kwargs)
# prevent auto-issued BEGIN statements
if not self.async:
self.autocommit = True
def cursor(self, *args, **kwargs):
kwargs.setdefault('cursor_factory', ReplicationCursor)
return super(ReplicationConnection, self).cursor(*args, **kwargs)
if self.cursor_factory is None:
self.cursor_factory = ReplicationCursor
def quote_ident(self, ident):
# FIXME: use PQescapeIdentifier or psycopg_escape_identifier_easy, somehow
return '"%s"' % ident.replace('"', '""')
"""Streamging replication types."""
REPLICATION_LOGICAL = "LOGICAL"
REPLICATION_PHYSICAL = "PHYSICAL"
class LogicalReplicationConnection(ReplicationConnectionBase):
def __init__(self, *args, **kwargs):
self.replication_type = REPLICATION_LOGICAL
super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
class PhysicalReplicationConnection(ReplicationConnectionBase):
def __init__(self, *args, **kwargs):
self.replication_type = REPLICATION_PHYSICAL
super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
class ReplicationCursor(_cursor):
"""A cursor used for replication commands."""
"""A cursor used for communication on the replication protocol."""
def identify_system(self):
"""Get information about the cluster status."""
@ -493,47 +518,49 @@ class ReplicationCursor(_cursor):
return dict(zip([_.name for _ in self.description],
self.fetchall()[0]))
def quote_ident(self, ident):
# FIXME: use PQescapeIdentifier or psycopg_escape_identifier_easy, somehow
return '"%s"' % ident.replace('"', '""')
def create_replication_slot(self, slot_type, slot_name, output_plugin=None):
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
"""Create streaming replication slot."""
command = "CREATE_REPLICATION_SLOT %s " % self.quote_ident(slot_name)
command = "CREATE_REPLICATION_SLOT %s " % self.connection.quote_ident(slot_name)
if slot_type is None:
slot_type = self.connection.replication_type
if slot_type == REPLICATION_LOGICAL:
if output_plugin is None:
raise psycopg2.ProgrammingError("output plugin name is required for logical replication slot")
raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot")
command += "%s %s" % (slot_type, self.quote_ident(output_plugin))
command += "%s %s" % (slot_type, self.connection.quote_ident(output_plugin))
elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None:
raise psycopg2.ProgrammingError("cannot specify output plugin name for physical replication slot")
raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot")
command += slot_type
else:
raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type)
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
self.execute(command)
def drop_replication_slot(self, slot_name):
"""Drop streaming replication slot."""
command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name)
command = "DROP_REPLICATION_SLOT %s" % self.connection.quote_ident(slot_name)
self.execute(command)
def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=0,
def start_replication(self, slot_name=None, writer=None, slot_type=None, start_lsn=0,
timeline=0, keepalive_interval=10, options=None):
"""Start and consume replication stream."""
command = "START_REPLICATION "
if slot_type is None:
slot_type = self.connection.replication_type
if slot_type == REPLICATION_LOGICAL:
if slot_name:
command += "SLOT %s " % self.quote_ident(slot_name)
command += "SLOT %s " % self.connection.quote_ident(slot_name)
else:
raise psycopg2.ProgrammingError("slot name is required for logical replication")
@ -541,11 +568,11 @@ class ReplicationCursor(_cursor):
elif slot_type == REPLICATION_PHYSICAL:
if slot_name:
command += "SLOT %s " % self.quote_ident(slot_name)
command += "SLOT %s " % self.connection.quote_ident(slot_name)
# don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
else:
raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type)
raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type)
if type(start_lsn) is str:
lsn = start_lsn.split('/')
@ -569,7 +596,7 @@ class ReplicationCursor(_cursor):
for k,v in options.iteritems():
if not command.endswith('('):
command += ", "
command += "%s %s" % (self.quote_ident(k), _A(str(v)))
command += "%s %s" % (self.connection.quote_ident(k), _A(str(v)))
command += ")"
return self.start_replication_expert(command, writer=writer,