diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 19c81523..1da983a4 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -197,7 +197,7 @@ Replication cursor cur.drop_replication_slot("testslot") - .. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None) + .. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None) Start a replication stream. On non-asynchronous connection, also consume the stream messages. @@ -207,15 +207,16 @@ Replication cursor :param slot_name: name of the replication slot to use (required for logical replication) :param writer: a file-like object to write replication messages to - :param start_lsn: the LSN position to start from, in the form - ``XXX/XXX`` (forward-slash separated pair of - hexadecimals) + :param start_lsn: the optional LSN position to start replicating from, + can be an integer or a string of hexadecimal digits + in the form ``XXX/XXX`` :param timeline: WAL history timeline to start streaming from (optional, can only be used with physical replication) :param keepalive_interval: interval (in seconds) to send keepalive messages to the server :param options: a dictionary of options to pass to logical replication - slot + slot (not allowed with physical replication, use + *None*) When used on non-asynchronous connection this method enters an endless loop, reading messages from the server and passing them to ``write()`` diff --git a/lib/extras.py b/lib/extras.py index 85debc68..36138c63 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -480,8 +480,8 @@ class ReplicationConnection(_connection): """Streamging replication types.""" -REPLICATION_PHYSICAL = 0 -REPLICATION_LOGICAL = 1 +REPLICATION_LOGICAL = "LOGICAL" +REPLICATION_PHYSICAL = "PHYSICAL" class ReplicationCursor(_cursor): """A cursor used for replication commands.""" @@ -504,18 +504,18 @@ class ReplicationCursor(_cursor): if slot_type == REPLICATION_LOGICAL: if output_plugin is None: - raise psycopg2.ProgrammingError("output_plugin is required for logical replication slot") + raise psycopg2.ProgrammingError("output plugin name is required for logical replication slot") - command += "LOGICAL %s" % self.quote_ident(output_plugin) + command += "%s %s" % (slot_type, self.quote_ident(output_plugin)) elif slot_type == REPLICATION_PHYSICAL: if output_plugin is not None: - raise psycopg2.ProgrammingError("output_plugin is not applicable to physical replication") + raise psycopg2.ProgrammingError("cannot specify output plugin name for physical replication slot") - command += "PHYSICAL" + command += slot_type else: - raise psycopg2.ProgrammingError("unrecognized replication slot type") + raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type) self.execute(command) @@ -525,44 +525,45 @@ class ReplicationCursor(_cursor): command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name) self.execute(command) - def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=None, + def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None): """Start and consume replication stream.""" command = "START_REPLICATION " - if slot_type == REPLICATION_LOGICAL and slot_name is None: - raise psycopg2.ProgrammingError("slot_name is required for logical replication slot") - - if slot_name: - command += "SLOT %s " % self.quote_ident(slot_name) - if slot_type == REPLICATION_LOGICAL: - command += "LOGICAL " + if slot_name: + command += "SLOT %s " % self.quote_ident(slot_name) + else: + raise psycopg2.ProgrammingError("slot name is required for logical replication") + + command += "%s " % slot_type + elif slot_type == REPLICATION_PHYSICAL: - command += "PHYSICAL " + if slot_name: + command += "SLOT %s " % self.quote_ident(slot_name) + + # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX else: - raise psycopg2.ProgrammingError("unrecognized replication slot type") + raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type) - if start_lsn is None: - start_lsn = '0/0' + if type(start_lsn) is str: + lsn = start_lsn.split('/') + lsn = "%X/%08X" % (int(lsn[0], 16), int(lsn[1], 16)) + else: + lsn = "%X/%08X" % ((start_lsn >> 32) & 0xFFFFFFFF, start_lsn & 0xFFFFFFFF) - # reparse lsn to catch possible garbage - lsn = start_lsn.split('/') - command += "%X/%X" % (int(lsn[0], 16), int(lsn[1], 16)) + command += lsn if timeline != 0: if slot_type == REPLICATION_LOGICAL: raise psycopg2.ProgrammingError("cannot specify timeline for logical replication") - if timeline < 0: - raise psycopg2.ProgrammingError("timeline must be >= 0: %d" % timeline) - command += " TIMELINE %d" % timeline if options: if slot_type == REPLICATION_PHYSICAL: - raise psycopg2.ProgrammingError("cannot specify plugin options for physical replication") + raise psycopg2.ProgrammingError("cannot specify output plugin options for physical replication") command += " (" for k,v in options.iteritems(): diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 380abbf4..dd07243f 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -97,10 +97,6 @@ struct cursorObject { }; -/* streaming replication modes */ -#define CURSOR_REPLICATION_PHYSICAL 0 -#define CURSOR_REPLICATION_LOGICAL 1 - /* C-callable functions in cursor_int.c and cursor_type.c */ BORROWED HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid);