diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 1da983a4..de94e6d0 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -144,20 +144,36 @@ Logging cursor Replication cursor ^^^^^^^^^^^^^^^^^^ -.. autoclass:: ReplicationConnection +.. autoclass:: LogicalReplicationConnection This connection factory class can be used to open a special type of - connection that is used for streaming replication. + connection that is used for logical replication. Example:: - from psycopg2.extras import ReplicationConnection, REPLICATION_PHYSICAL, REPLICATION_LOGICAL - conn = psycopg2.connect(dsn, connection_factory=ReplicationConnection) - cur = conn.cursor() + from psycopg2.extras import LogicalReplicationConnection + log_conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection) + log_cur = log_conn.cursor() + + +.. autoclass:: PhysicalReplicationConnection + + This connection factory class can be used to open a special type of + connection that is used for physical replication. + + Example:: + + from psycopg2.extras import PhysicalReplicationConnection + phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection) + phys_cur = phys_conn.cursor() + + + Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use + `ReplicationCursor` for actual communication on the connection. .. seealso:: - - PostgreSQL `Replication protocol`__ + - PostgreSQL `Streaming Replication Protocol`__ .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html @@ -173,19 +189,38 @@ Replication cursor >>> cur.identify_system() {'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'} - .. method:: create_replication_slot(slot_type, slot_name, output_plugin=None) + .. method:: create_replication_slot(slot_name, output_plugin=None) Create streaming replication slot. - :param slot_type: type of replication: either `REPLICATION_PHYSICAL` or - `REPLICATION_LOGICAL` :param slot_name: name of the replication slot to be created - :param output_plugin: name of the logical decoding output plugin to use - (logical replication only) + :param slot_type: type of replication: should be either + `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` + :param output_plugin: name of the logical decoding output plugin to be + used by the slot; required for logical + replication connections, disallowed for physical Example:: - cur.create_replication_slot(REPLICATION_LOGICAL, "testslot", "test_decoding") + log_cur.create_replication_slot("logical1", "test_decoding") + phys_cur.create_replication_slot("physical1") + + # either logical or physical replication connection + cur.create_replication_slot("slot1", slot_type=REPLICATION_LOGICAL) + + When creating a slot on a logical replication connection, a logical + replication slot is created by default. Logical replication requires + name of the logical decoding output plugin to be specified. + + When creating a slot on a physical replication connection, a physical + replication slot is created by default. No output plugin parameter is + required or allowed when creating a physical replication slot. + + In either case, the type of slot being created can be specified + explicitly using *slot_type* parameter. + + Replication slots are a feature of PostgreSQL server starting with + version 9.4. .. method:: drop_replication_slot(slot_name) @@ -195,18 +230,24 @@ Replication cursor Example:: - cur.drop_replication_slot("testslot") + # either logical or physical replication connection + cur.drop_replication_slot("slot1") - .. method:: start_replication(slot_type, slot_name=None, writer=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None) + This + + Replication slots are a feature of PostgreSQL server starting with + version 9.4. - Start a replication stream. On non-asynchronous connection, also - consume the stream messages. + .. method:: start_replication(slot_name=None, writer=None, slot_type=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None) - :param slot_type: type of replication: either `REPLICATION_PHYSICAL` or - `REPLICATION_LOGICAL` - :param slot_name: name of the replication slot to use (required for - logical replication) + Start replication on the connection. + + :param slot_name: name of the replication slot to use; required for + logical replication, physical replication can work + with or without a slot :param writer: a file-like object to write replication messages to + :param slot_type: type of replication: should be either + `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` :param start_lsn: the optional LSN position to start replicating from, can be an integer or a string of hexadecimal digits in the form ``XXX/XXX`` @@ -215,9 +256,23 @@ Replication cursor :param keepalive_interval: interval (in seconds) to send keepalive messages to the server :param options: a dictionary of options to pass to logical replication - slot (not allowed with physical replication, use + slot (not allowed with physical replication, set to *None*) + If not specified using *slot_type* parameter, the type of replication + to be started is defined by the type of replication connection. + Logical replication is only allowed on logical replication connection, + but physical replication can be used with both types of connection. + + On the other hand, physical replication doesn't require a named + replication slot to be used, only logical one does. In any case, + logical replication and replication slots are a feature of PostgreSQL + server starting with version 9.4. Physical replication can be used + starting with 9.0. + + If a *slot_name* is specified, the slot must exist on the server and + its type must match the replication type used. + When used on non-asynchronous connection this method enters an endless loop, reading messages from the server and passing them to ``write()`` method of the *writer* object. This is similar to operation of the @@ -391,10 +446,8 @@ Replication cursor A reference to the corresponding `~ReplicationCursor` object. - -.. data:: REPLICATION_PHYSICAL - .. data:: REPLICATION_LOGICAL +.. data:: REPLICATION_PHYSICAL .. index:: pair: Cursor; Replication diff --git a/lib/extras.py b/lib/extras.py index 36138c63..4587afea 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -438,53 +438,78 @@ class MinTimeLoggingCursor(LoggingCursor): return LoggingCursor.callproc(self, procname, vars) -class ReplicationConnection(_connection): - """A connection that uses `ReplicationCursor` automatically.""" +"""Replication connection types.""" +REPLICATION_LOGICAL = "LOGICAL" +REPLICATION_PHYSICAL = "PHYSICAL" + + +class ReplicationConnectionBase(_connection): + """ + Base class for Logical and Physical replication connection + classes. Uses `ReplicationCursor` automatically. + """ def __init__(self, *args, **kwargs): - """Initializes a replication connection, by adding appropriate replication parameter to the provided dsn arguments.""" + """ + Initializes a replication connection by adding appropriate + parameters to the provided DSN and tweaking the connection + attributes. + """ - if len(args): - dsn = args[0] + # replication_type is set in subclasses + if self.replication_type == REPLICATION_LOGICAL: + replication = 'database' - # FIXME: could really use parse_dsn here + elif self.replication_type == REPLICATION_PHYSICAL: + replication = 'true' - if dsn.startswith('postgres://') or dsn.startswith('postgresql://'): - # poor man's url parsing - if dsn.rfind('?') > 0: - if not dsn.endswith('?'): - dsn += '&' - else: - dsn += '?' - else: - dsn += ' ' - dsn += 'replication=database' - args = [dsn] + list(args[1:]) else: - dbname = kwargs.get('dbname', None) - if dbname is None: - kwargs['dbname'] = 'replication' + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % self.replication_type) - if kwargs.get('replication', None) is None: - kwargs['replication'] = 'database' if dbname else 'true' + # FIXME: could really use parse_dsn here + dsn = args[0] + if dsn.startswith('postgres://') or dsn.startswith('postgresql://'): + # poor man's url parsing + if dsn.rfind('?') > 0: + if not dsn.endswith('?'): + dsn += '&' + else: + dsn += '?' + else: + dsn += ' ' + dsn += 'replication=%s' % replication + args = [dsn] + list(args[1:]) - super(ReplicationConnection, self).__init__(*args, **kwargs) + super(ReplicationConnectionBase, self).__init__(*args, **kwargs) # prevent auto-issued BEGIN statements if not self.async: self.autocommit = True - def cursor(self, *args, **kwargs): - kwargs.setdefault('cursor_factory', ReplicationCursor) - return super(ReplicationConnection, self).cursor(*args, **kwargs) + if self.cursor_factory is None: + self.cursor_factory = ReplicationCursor + + def quote_ident(self, ident): + # FIXME: use PQescapeIdentifier or psycopg_escape_identifier_easy, somehow + return '"%s"' % ident.replace('"', '""') -"""Streamging replication types.""" -REPLICATION_LOGICAL = "LOGICAL" -REPLICATION_PHYSICAL = "PHYSICAL" +class LogicalReplicationConnection(ReplicationConnectionBase): + + def __init__(self, *args, **kwargs): + self.replication_type = REPLICATION_LOGICAL + super(LogicalReplicationConnection, self).__init__(*args, **kwargs) + + +class PhysicalReplicationConnection(ReplicationConnectionBase): + + def __init__(self, *args, **kwargs): + self.replication_type = REPLICATION_PHYSICAL + super(PhysicalReplicationConnection, self).__init__(*args, **kwargs) + class ReplicationCursor(_cursor): - """A cursor used for replication commands.""" + """A cursor used for communication on the replication protocol.""" def identify_system(self): """Get information about the cluster status.""" @@ -493,47 +518,49 @@ class ReplicationCursor(_cursor): return dict(zip([_.name for _ in self.description], self.fetchall()[0])) - def quote_ident(self, ident): - # FIXME: use PQescapeIdentifier or psycopg_escape_identifier_easy, somehow - return '"%s"' % ident.replace('"', '""') - - def create_replication_slot(self, slot_type, slot_name, output_plugin=None): + def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None): """Create streaming replication slot.""" - command = "CREATE_REPLICATION_SLOT %s " % self.quote_ident(slot_name) + command = "CREATE_REPLICATION_SLOT %s " % self.connection.quote_ident(slot_name) + + if slot_type is None: + slot_type = self.connection.replication_type if slot_type == REPLICATION_LOGICAL: if output_plugin is None: - raise psycopg2.ProgrammingError("output plugin name is required for logical replication slot") + raise psycopg2.ProgrammingError("output plugin name is required to create logical replication slot") - command += "%s %s" % (slot_type, self.quote_ident(output_plugin)) + command += "%s %s" % (slot_type, self.connection.quote_ident(output_plugin)) elif slot_type == REPLICATION_PHYSICAL: if output_plugin is not None: - raise psycopg2.ProgrammingError("cannot specify output plugin name for physical replication slot") + raise psycopg2.ProgrammingError("cannot specify output plugin name when creating physical replication slot") command += slot_type else: - raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type) + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type) self.execute(command) def drop_replication_slot(self, slot_name): """Drop streaming replication slot.""" - command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name) + command = "DROP_REPLICATION_SLOT %s" % self.connection.quote_ident(slot_name) self.execute(command) - def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=0, + def start_replication(self, slot_name=None, writer=None, slot_type=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None): """Start and consume replication stream.""" command = "START_REPLICATION " + if slot_type is None: + slot_type = self.connection.replication_type + if slot_type == REPLICATION_LOGICAL: if slot_name: - command += "SLOT %s " % self.quote_ident(slot_name) + command += "SLOT %s " % self.connection.quote_ident(slot_name) else: raise psycopg2.ProgrammingError("slot name is required for logical replication") @@ -541,11 +568,11 @@ class ReplicationCursor(_cursor): elif slot_type == REPLICATION_PHYSICAL: if slot_name: - command += "SLOT %s " % self.quote_ident(slot_name) - + command += "SLOT %s " % self.connection.quote_ident(slot_name) # don't add "PHYSICAL", before 9.4 it was just START_REPLICATION XXX/XXX + else: - raise psycopg2.ProgrammingError("unrecognized replication slot type: %s" % slot_type) + raise psycopg2.ProgrammingError("unrecognized replication type: %s" % slot_type) if type(start_lsn) is str: lsn = start_lsn.split('/') @@ -569,7 +596,7 @@ class ReplicationCursor(_cursor): for k,v in options.iteritems(): if not command.endswith('('): command += ", " - command += "%s %s" % (self.quote_ident(k), _A(str(v))) + command += "%s %s" % (self.connection.quote_ident(k), _A(str(v))) command += ")" return self.start_replication_expert(command, writer=writer,