Cleanup start replication wrt. slot type a bit.

This commit is contained in:
Oleksandr Shulgin 2015-10-01 11:08:56 +02:00
parent f872a2aabb
commit 937a7a9024
3 changed files with 33 additions and 35 deletions

View File

@ -197,7 +197,7 @@ Replication cursor
cur.drop_replication_slot("testslot") cur.drop_replication_slot("testslot")
.. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None) .. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None)
Start a replication stream. On non-asynchronous connection, also Start a replication stream. On non-asynchronous connection, also
consume the stream messages. consume the stream messages.
@ -207,15 +207,16 @@ Replication cursor
:param slot_name: name of the replication slot to use (required for :param slot_name: name of the replication slot to use (required for
logical replication) logical replication)
:param writer: a file-like object to write replication messages to :param writer: a file-like object to write replication messages to
:param start_lsn: the LSN position to start from, in the form :param start_lsn: the optional LSN position to start replicating from,
``XXX/XXX`` (forward-slash separated pair of can be an integer or a string of hexadecimal digits
hexadecimals) in the form ``XXX/XXX``
:param timeline: WAL history timeline to start streaming from (optional, :param timeline: WAL history timeline to start streaming from (optional,
can only be used with physical replication) can only be used with physical replication)
:param keepalive_interval: interval (in seconds) to send keepalive :param keepalive_interval: interval (in seconds) to send keepalive
messages to the server messages to the server
:param options: a dictionary of options to pass to logical replication :param options: a dictionary of options to pass to logical replication
slot slot (not allowed with physical replication, use
*None*)
When used on non-asynchronous connection this method enters an endless When used on non-asynchronous connection this method enters an endless
loop, reading messages from the server and passing them to ``write()`` loop, reading messages from the server and passing them to ``write()``

View File

@ -480,8 +480,8 @@ class ReplicationConnection(_connection):
"""Streamging replication types.""" """Streamging replication types."""
REPLICATION_PHYSICAL = 0 REPLICATION_LOGICAL = "LOGICAL"
REPLICATION_LOGICAL = 1 REPLICATION_PHYSICAL = "PHYSICAL"
class ReplicationCursor(_cursor): class ReplicationCursor(_cursor):
"""A cursor used for replication commands.""" """A cursor used for replication commands."""
@ -504,18 +504,18 @@ class ReplicationCursor(_cursor):
if slot_type == REPLICATION_LOGICAL: if slot_type == REPLICATION_LOGICAL:
if output_plugin is None: if output_plugin is None:
raise psycopg2.ProgrammingError("output_plugin is required for logical replication slot") raise psycopg2.ProgrammingError("output plugin name is required for logical replication slot")
command += "LOGICAL %s" % self.quote_ident(output_plugin) command += "%s %s" % (slot_type, self.quote_ident(output_plugin))
elif slot_type == REPLICATION_PHYSICAL: elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None: if output_plugin is not None:
raise psycopg2.ProgrammingError("output_plugin is not applicable to physical replication") raise psycopg2.ProgrammingError("cannot specify output plugin name for physical replication slot")
command += "PHYSICAL" command += slot_type
else: else:
raise psycopg2.ProgrammingError("unrecognized replication slot type") raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type)
self.execute(command) self.execute(command)
@ -525,44 +525,45 @@ class ReplicationCursor(_cursor):
command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name) command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name)
self.execute(command) self.execute(command)
def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=None, def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=0,
timeline=0, keepalive_interval=10, options=None): timeline=0, keepalive_interval=10, options=None):
"""Start and consume replication stream.""" """Start and consume replication stream."""
command = "START_REPLICATION " command = "START_REPLICATION "
if slot_type == REPLICATION_LOGICAL and slot_name is None: if slot_type == REPLICATION_LOGICAL:
raise psycopg2.ProgrammingError("slot_name is required for logical replication slot") if slot_name:
command += "SLOT %s " % self.quote_ident(slot_name)
else:
raise psycopg2.ProgrammingError("slot name is required for logical replication")
command += "%s " % slot_type
elif slot_type == REPLICATION_PHYSICAL:
if slot_name: if slot_name:
command += "SLOT %s " % self.quote_ident(slot_name) command += "SLOT %s " % self.quote_ident(slot_name)
if slot_type == REPLICATION_LOGICAL: # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX
command += "LOGICAL "
elif slot_type == REPLICATION_PHYSICAL:
command += "PHYSICAL "
else: else:
raise psycopg2.ProgrammingError("unrecognized replication slot type") raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type)
if start_lsn is None: if type(start_lsn) is str:
start_lsn = '0/0'
# reparse lsn to catch possible garbage
lsn = start_lsn.split('/') lsn = start_lsn.split('/')
command += "%X/%X" % (int(lsn[0], 16), int(lsn[1], 16)) lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16))
else:
lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF)
command += lsn
if timeline != 0: if timeline != 0:
if slot_type == REPLICATION_LOGICAL: if slot_type == REPLICATION_LOGICAL:
raise psycopg2.ProgrammingError("cannot specify timeline for logical replication") raise psycopg2.ProgrammingError("cannot specify timeline for logical replication")
if timeline < 0:
raise psycopg2.ProgrammingError("timeline must be >= 0: %d" % timeline)
command += " TIMELINE %d" % timeline command += " TIMELINE %d" % timeline
if options: if options:
if slot_type == REPLICATION_PHYSICAL: if slot_type == REPLICATION_PHYSICAL:
raise psycopg2.ProgrammingError("cannot specify plugin options for physical replication") raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication")
command += " (" command += " ("
for k,v in options.iteritems(): for k,v in options.iteritems():

View File

@ -97,10 +97,6 @@ struct cursorObject {
}; };
/* streaming replication modes */
#define CURSOR_REPLICATION_PHYSICAL 0
#define CURSOR_REPLICATION_LOGICAL 1
/* C-callable functions in cursor_int.c and cursor_type.c */ /* C-callable functions in cursor_int.c and cursor_type.c */
BORROWED HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid); BORROWED HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid);