diff --git a/doc/src/extras.rst b/doc/src/extras.rst index de94e6d0..82a2be18 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -144,32 +144,40 @@ Logging cursor Replication cursor ^^^^^^^^^^^^^^^^^^ +.. autoclass:: ReplicationConnectionBase + + +The following replication types are defined: + +.. data:: REPLICATION_LOGICAL +.. data:: REPLICATION_PHYSICAL + + .. autoclass:: LogicalReplicationConnection - This connection factory class can be used to open a special type of - connection that is used for logical replication. + This connection factory class can be used to open a special type of + connection that is used for logical replication. - Example:: + Example:: - from psycopg2.extras import LogicalReplicationConnection - log_conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection) - log_cur = log_conn.cursor() + from psycopg2.extras import LogicalReplicationConnection + log_conn = psycopg2.connect(dsn, connection_factory=LogicalReplicationConnection) + log_cur = log_conn.cursor() .. autoclass:: PhysicalReplicationConnection - This connection factory class can be used to open a special type of - connection that is used for physical replication. + This connection factory class can be used to open a special type of + connection that is used for physical replication. - Example:: + Example:: - from psycopg2.extras import PhysicalReplicationConnection - phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection) - phys_cur = phys_conn.cursor() + from psycopg2.extras import PhysicalReplicationConnection + phys_conn = psycopg2.connect(dsn, connection_factory=PhysicalReplicationConnection) + phys_cur = phys_conn.cursor() - - Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use - `ReplicationCursor` for actual communication on the connection. + Both `LogicalReplicationConnection` and `PhysicalReplicationConnection` use + `ReplicationCursor` for actual communication on the connection. .. seealso:: @@ -177,160 +185,237 @@ Replication cursor .. __: http://www.postgresql.org/docs/current/static/protocol-replication.html + +The individual messages in the replication stream are presented by +`ReplicationMessage` objects: + +.. autoclass:: ReplicationMessage + + .. attribute:: payload + + The actual data received from the server. An instance of either + ``str`` or ``unicode``, depending on the method that was used to + produce this message. + + .. attribute:: data_size + + The raw size of the message payload (before possible unicode + conversion). + + .. attribute:: data_start + + LSN position of the start of the message. + + .. attribute:: wal_end + + LSN position of the current end of WAL on the server. + + .. attribute:: send_time + + A `~datetime` object representing the server timestamp at the moment + when the message was sent. + + .. attribute:: cursor + + A reference to the corresponding `ReplicationCursor` object. + + .. autoclass:: ReplicationCursor .. method:: identify_system() - Execute ``IDENTIFY_SYSTEM`` command of the streaming replication - protocol and return the result as a dictionary. + Execute ``IDENTIFY_SYSTEM`` command of the streaming replication + protocol and return the result as a dictionary. - Example:: + Example:: - >>> cur.identify_system() - {'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'} + >>> cur.identify_system() + {'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'} .. method:: create_replication_slot(slot_name, output_plugin=None) - Create streaming replication slot. + Create streaming replication slot. - :param slot_name: name of the replication slot to be created - :param slot_type: type of replication: should be either - `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` - :param output_plugin: name of the logical decoding output plugin to be - used by the slot; required for logical - replication connections, disallowed for physical + :param slot_name: name of the replication slot to be created + :param slot_type: type of replication: should be either + `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` + :param output_plugin: name of the logical decoding output plugin to be + used by the slot; required for logical + replication connections, disallowed for physical - Example:: + Example:: - log_cur.create_replication_slot("logical1", "test_decoding") - phys_cur.create_replication_slot("physical1") + log_cur.create_replication_slot("logical1", "test_decoding") + phys_cur.create_replication_slot("physical1") - # either logical or physical replication connection - cur.create_replication_slot("slot1", slot_type=REPLICATION_LOGICAL) + # either logical or physical replication connection + cur.create_replication_slot("slot1", slot_type=REPLICATION_LOGICAL) - When creating a slot on a logical replication connection, a logical - replication slot is created by default. Logical replication requires - name of the logical decoding output plugin to be specified. + When creating a slot on a logical replication connection, a logical + replication slot is created by default. Logical replication requires + name of the logical decoding output plugin to be specified. - When creating a slot on a physical replication connection, a physical - replication slot is created by default. No output plugin parameter is - required or allowed when creating a physical replication slot. + When creating a slot on a physical replication connection, a physical + replication slot is created by default. No output plugin parameter is + required or allowed when creating a physical replication slot. - In either case, the type of slot being created can be specified - explicitly using *slot_type* parameter. + In either case, the type of slot being created can be specified + explicitly using *slot_type* parameter. - Replication slots are a feature of PostgreSQL server starting with - version 9.4. + Replication slots are a feature of PostgreSQL server starting with + version 9.4. .. method:: drop_replication_slot(slot_name) - Drop streaming replication slot. + Drop streaming replication slot. - :param slot_name: name of the replication slot to drop + :param slot_name: name of the replication slot to drop - Example:: + Example:: - # either logical or physical replication connection - cur.drop_replication_slot("slot1") + # either logical or physical replication connection + cur.drop_replication_slot("slot1") - This - - Replication slots are a feature of PostgreSQL server starting with - version 9.4. + Replication slots are a feature of PostgreSQL server starting with + version 9.4. - .. method:: start_replication(slot_name=None, writer=None, slot_type=None, start_lsn=0, timeline=0, keepalive_interval=10, options=None) + .. method:: start_replication(slot_name=None, slot_type=None, start_lsn=0, timeline=0, options=None) - Start replication on the connection. + Start replication on the connection. - :param slot_name: name of the replication slot to use; required for - logical replication, physical replication can work - with or without a slot - :param writer: a file-like object to write replication messages to - :param slot_type: type of replication: should be either - `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` - :param start_lsn: the optional LSN position to start replicating from, - can be an integer or a string of hexadecimal digits - in the form ``XXX/XXX`` - :param timeline: WAL history timeline to start streaming from (optional, - can only be used with physical replication) - :param keepalive_interval: interval (in seconds) to send keepalive - messages to the server - :param options: a dictionary of options to pass to logical replication - slot (not allowed with physical replication, set to - *None*) + :param slot_name: name of the replication slot to use; required for + logical replication, physical replication can work + with or without a slot + :param slot_type: type of replication: should be either + `REPLICATION_LOGICAL` or `REPLICATION_PHYSICAL` + :param start_lsn: the optional LSN position to start replicating from, + can be an integer or a string of hexadecimal digits + in the form ``XXX/XXX`` + :param timeline: WAL history timeline to start streaming from (optional, + can only be used with physical replication) + :param options: a dictionary of options to pass to logical replication + slot (not allowed with physical replication) - If not specified using *slot_type* parameter, the type of replication - to be started is defined by the type of replication connection. - Logical replication is only allowed on logical replication connection, - but physical replication can be used with both types of connection. + If a *slot_name* is specified, the slot must exist on the server and + its type must match the replication type used. - On the other hand, physical replication doesn't require a named - replication slot to be used, only logical one does. In any case, - logical replication and replication slots are a feature of PostgreSQL - server starting with version 9.4. Physical replication can be used - starting with 9.0. + If not specified using *slot_type* parameter, the type of replication + is defined by the type of replication connection. Logical replication + is only allowed on logical replication connection, but physical + replication can be used with both types of connection. - If a *slot_name* is specified, the slot must exist on the server and - its type must match the replication type used. + On the other hand, physical replication doesn't require a named + replication slot to be used, only logical one does. In any case, + logical replication and replication slots are a feature of PostgreSQL + server starting with version 9.4. Physical replication can be used + starting with 9.0. - When used on non-asynchronous connection this method enters an endless - loop, reading messages from the server and passing them to ``write()`` - method of the *writer* object. This is similar to operation of the - `~cursor.copy_to()` method. It also sends keepalive messages to the - server, in case there were no new data from it for the duration of - *keepalive_interval* seconds (this parameter's value must be equal to - at least than 1 second, but it can have a fractional part). + If *start_lsn* is specified, the requested stream will start from that + LSN. The default is `!None`, which passes the LSN ``0/0``, causing + replay to begin at the last point at which the server got replay + confirmation from the client for, or the oldest available point for a + new slot. - With asynchronous connection, this method returns immediately and the - calling code can start reading the replication messages in a loop. + The server might produce an error if a WAL file for the given LSN has + already been recycled, or it may silently start streaming from a later + position: the client can verify the actual position using information + provided the `ReplicationMessage` attributes. The exact server + behavior depends on the type of replication and use of slots. - A sketch implementation of the *writer* object for logical replication - might look similar to the following:: + A *timeline* parameter can only be specified with physical replication + and only starting with server version 9.3. - from io import TextIOBase + A dictionary of *options* may be passed to the logical decoding plugin + on a logical replication slot. The set of supported options depends + on the output plugin that was used to create the slot. Must be + `!None` for physical replication. - class LogicalStreamWriter(TextIOBase): + This function constructs a ``START_REPLICATION`` command and calls + `start_replication_expert()` internally. - def write(self, msg): - self.store_message_data(msg.payload) + After starting the replication, to actually consume the incoming + server messages, use `consume_replication_stream()` or implement a + loop around `read_replication_message()` in case of asynchronous + connection. - if self.should_report_to_the_server_now(msg): - msg.cursor.send_replication_feedback(flush_lsn=msg.wal_end) + .. method:: start_replication_expert(command) - First, like with the `~cursor.copy_to()` method, the code that calls - the provided ``write()`` method checks if the *writer* object is - inherited from `~io.TextIOBase`. If that is the case, the message - payload to be passed is converted to unicode using the connection's - `~connection.encoding` information. Otherwise, the message is passed - as is. + Start replication on the connection using provided ``START_REPLICATION`` + command. - The *msg* object being passed is an instance of `~ReplicationMessage` - class. + .. method:: consume_replication_stream(consumer, decode=False, keepalive_interval=10) - After storing certain amount of messages' data reliably, the client - should send a confirmation message to the server. This should be done - by calling `~send_replication_feedback()` method on the corresponding - replication cursor. A reference to the cursor is provided in the - `~ReplicationMessage` as an attribute. + :param consumer: an object providing ``consume()`` method + :param decode: a flag indicating that unicode conversion should be + performed on the messages received from the server + :param keepalive_interval: interval (in seconds) to send keepalive + messages to the server - .. warning:: + This method can only be used with synchronous connection. For + asynchronous connections see `read_replication_message()`. - Failure to properly notify the server by constantly consuming and - reporting success at appropriate times can eventually lead to "disk - full" condition on the server, because the server retains all the - WAL segments that might be needed to stream the changes via all of - the currently open replication slots. + Before calling this method to consume the stream, use + `start_replication()` first. - On the other hand, it is not recommended to send a confirmation - after every processed message, since that will put an unnecessary - load on network and the server. A possible strategy is to confirm - after every COMMIT message. + When called, this method enters an endless loop, reading messages from + the server and passing them to ``consume()`` method of the *consumer* + object. In order to make this method break out of the loop and + return, the ``consume()`` method can call `stop_replication()` on the + cursor or it can throw an exception. + + If *decode* is set to `!True`, the messages read from the server are + converted according to the connection `~connection.encoding`. This + parameter should not be set with physical replication. + + This method also sends keepalive messages to the server, in case there + were no new data from the server for the duration of + *keepalive_interval* (in seconds). The value of this parameter must + be equal to at least 1 second, but it can have a fractional part. + + The following example is a sketch implementation of *consumer* object + for logical replication:: + + class LogicalStreamConsumer(object): + + def consume(self, msg): + self.store_message_data(msg.payload) + + if self.should_report_to_the_server_now(msg): + msg.cursor.send_replication_feedback(flush_lsn=msg.data_start) + + consumer = LogicalStreamConsumer() + cur.consume_replication_stream(consumer, decode=True) + + The *msg* objects passed to the ``consume()`` method are instances of + `ReplicationMessage` class. + + After storing certain amount of messages' data reliably, the client + should send a confirmation message to the server. This should be done + by calling `send_replication_feedback()` method on the corresponding + replication cursor. A reference to the cursor is provided in the + `ReplicationMessage` as an attribute. + + .. warning:: + + When using replication with slots, failure to properly notify the + server by constantly consuming and reporting success at + appropriate times can eventually lead to "disk full" condition on + the server, because the server retains all the WAL segments that + might be needed to stream the changes via all of the currently + open replication slots. + + On the other hand, it is not recommended to send a confirmation + after every processed message, since that will put an unnecessary + load on network and the server. A possible strategy is to confirm + after every COMMIT message. .. method:: stop_replication() - In non-asynchronous connection, when called from the ``write()`` - method, tell the code in `~start_replication` to break out of the - endless loop and return. + This method can be called on synchronous connections from the + ``consume()`` method of a ``consumer`` object in order to break out of + the endless loop in `consume_replication_stream()`. If called on + asynchronous connection or outside of the consume loop, this method + raises an error. .. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) @@ -344,29 +429,37 @@ Replication cursor :param reply: request the server to send back a keepalive message immediately Use this method to report to the server that all messages up to a - certain LSN position have been stored and may be discarded. + certain LSN position have been stored on the client and may be + discarded on the server. This method can also be called with all default parameters' values to - send a keepalive message to the server. + just send a keepalive message to the server. - In case of asynchronous connection, if the feedback message cannot be - sent at the moment, remembers the passed LSN positions for a later - hopefully successful call or call to `~flush_replication_feedback()`. + If the feedback message could not be sent, updates the passed LSN + positions in the cursor for a later call to + `flush_replication_feedback()` and returns `!False`, otherwise returns + `!True`. .. method:: flush_replication_feedback(reply=False) :param reply: request the server to send back a keepalive message immediately This method tries to flush the latest replication feedback message - that `~send_replication_feedback()` was trying to send, if any. + that `send_replication_feedback()` was trying to send but couldn't. + + If *reply* is `!True` sends a keepalive message in either case. + + Returns `!True` if the feedback message was sent successfully, + `!False` otherwise. Low-level methods for asynchronous connection operation. - With the non-asynchronous connection, a single call to - `~start_replication()` handles all the complexity, but at times it might - be beneficial to use low-level interface for better control, in particular - to `~select.select()` on multiple sockets. The following methods are - provided for asynchronous operation: + With the synchronous connection, a call to `consume_replication_stream()` + handles all the complexity of handling the incoming messages and sending + keepalive replies, but at times it might be beneficial to use low-level + interface for better control, in particular to `~select.select()` on + multiple sockets. The following methods are provided for asynchronous + operation: .. method:: read_replication_message(decode=True) @@ -374,18 +467,18 @@ Replication cursor performed on the data received from the server This method should be used in a loop with asynchronous connections - after calling `~start_replication()` once. + after calling `start_replication()` once. It tries to read the next message from the server, without blocking - and returns an instance of `~ReplicationMessage` or *None*, in case + and returns an instance of `ReplicationMessage` or `!None`, in case there are no more data messages from the server at the moment. It is expected that the calling code will call this method repeatedly in order to consume all of the messages that might have been buffered, - until *None* is returned. After receiving a *None* value from this - method, one might use `~select.select()` or `~select.poll()` on the - corresponding connection to block the process until there is more data - from the server. + until `!None` is returned. After receiving a `!None` value from this + method, the caller should use `~select.select()` or `~select.poll()` + on the corresponding connection to block the process until there is + more data from the server. The server can send keepalive messages to the client periodically. Such messages are silently consumed by this method and are never @@ -409,45 +502,19 @@ Replication cursor keepalive_interval = 10.0 while True: - if (datetime.now() - cur.replication_io_timestamp).total_seconds() >= keepalive_interval: - cur.send_replication_feedback() + msg = cur.read_replication_message() + if msg: + consumer.consume(msg) + else: + timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds() + if timeout > 0: + sel = select.select([cur], [], [], timeout) + else: + sel = [] - while True: - msg = cur.read_replication_message() - if not msg: - break - writer.write(msg) + if not sel: + cur.send_replication_feedback() - timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds() - if timeout > 0: - select.select([cur], [], [], timeout) - -.. autoclass:: ReplicationMessage - - .. attribute:: payload - - The actual data received from the server. An instance of either - ``str`` or ``unicode``. - - .. attribute:: data_start - - LSN position of the start of the message. - - .. attribute:: wal_end - - LSN position of the end of the message. - - .. attribute:: send_time - - A `~datetime` object representing the server timestamp at the moment - when the message was sent. - - .. attribute:: cursor - - A reference to the corresponding `~ReplicationCursor` object. - -.. data:: REPLICATION_LOGICAL -.. data:: REPLICATION_PHYSICAL .. index:: pair: Cursor; Replication diff --git a/lib/extras.py b/lib/extras.py index 998c792f..c05536ad 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -544,9 +544,9 @@ class ReplicationCursor(_cursor): command = "DROP_REPLICATION_SLOT %s" % self.connection.quote_ident(slot_name) self.execute(command) - def start_replication(self, slot_name=None, writer=None, slot_type=None, start_lsn=0, - timeline=0, keepalive_interval=10, options=None): - """Start and consume replication stream.""" + def start_replication(self, slot_name=None, slot_type=None, start_lsn=0, + timeline=0, options=None): + """Start replication stream.""" command = "START_REPLICATION " @@ -594,8 +594,7 @@ class ReplicationCursor(_cursor): command += "%s %s" % (self.connection.quote_ident(k), _A(str(v))) command += ")" - return self.start_replication_expert(command, writer=writer, - keepalive_interval=keepalive_interval) + return self.start_replication_expert(command) def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False): return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested) diff --git a/psycopg/cursor.h b/psycopg/cursor.h index dd07243f..941e279e 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -73,7 +73,9 @@ struct cursorObject { #define DEFAULT_COPYSIZE 16384 #define DEFAULT_COPYBUFF 8192 - int repl_stop; /* if client requested to stop replication */ + /* replication cursor attrs */ + int repl_started:1; /* if replication is started */ + int repl_stop:1; /* if client requested to stop replication */ struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */ XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */ XLogRecPtr repl_flush_lsn; diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 9de5b085..d033a3df 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -36,6 +36,7 @@ #include "psycopg/microprotocols_proto.h" #include + #include /* python */ @@ -1588,13 +1589,11 @@ exit: static PyObject * psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject *kwargs) { - PyObject *writer = NULL, *res = NULL; + PyObject *res = NULL; char *command; - double keepalive_interval = 10; - static char *kwlist[] = {"command", "writer", "keepalive_interval", NULL}; + static char *kwlist[] = {"command", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|Od", kwlist, - &command, &writer, &keepalive_interval)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) { return NULL; } @@ -1602,21 +1601,15 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject EXC_IF_GREEN(start_replication_expert); EXC_IF_TPC_PREPARED(self->conn, start_replication_expert); - Dprintf("psyco_curs_start_replication_expert: command = %s", command); - - if (keepalive_interval < 1.0) { - psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1sec"); + if (self->repl_started) { + psyco_set_error(ProgrammingError, self, "replication already in progress"); return NULL; } - self->copysize = 0; - Py_XINCREF(writer); - self->copyfile = writer; + Dprintf("psyco_curs_start_replication_expert: command = %s", command); + self->copysize = 0; self->repl_stop = 0; - self->repl_keepalive_interval.tv_sec = (int)keepalive_interval; - self->repl_keepalive_interval.tv_usec = - (keepalive_interval - (int)keepalive_interval)*1.0e6; self->repl_write_lsn = InvalidXLogRecPtr; self->repl_flush_lsn = InvalidXLogRecPtr; @@ -1631,7 +1624,7 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject Py_INCREF(res); } - Py_CLEAR(self->copyfile); + self->repl_started = 1; return res; } @@ -1643,12 +1636,54 @@ static PyObject * psyco_curs_stop_replication(cursorObject *self) { EXC_IF_CURS_CLOSED(self); + EXC_IF_CURS_ASYNC(self, stop_replication); + + if (!self->repl_started || self->repl_stop) { + psyco_set_error(ProgrammingError, self, "replication is not in progress"); + return NULL; + } self->repl_stop = 1; Py_RETURN_NONE; } +#define psyco_curs_consume_replication_stream_doc \ +"consume_replication_stream(consumer, keepalive_interval=10) -- Consume replication stream." + +static PyObject * +psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObject *kwargs) +{ + PyObject *consumer = NULL, *res = NULL; + int decode = 0; + double keepalive_interval = 10; + static char *kwlist[] = {"consumer", "decode", "keepalive_interval", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist, + &consumer, &decode, &keepalive_interval)) { + return NULL; + } + + EXC_IF_CURS_CLOSED(self); + EXC_IF_CURS_ASYNC(self, consume_replication_stream); + EXC_IF_GREEN(consume_replication_stream); + EXC_IF_TPC_PREPARED(self->conn, consume_replication_stream); + + Dprintf("psyco_curs_consume_replication_stream"); + + if (keepalive_interval < 1.0) { + psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1 (sec)"); + return NULL; + } + + if (pq_copy_both(self, consumer, decode, keepalive_interval) >= 0) { + res = Py_None; + Py_INCREF(res); + } + + return res; +} + #define psyco_curs_read_replication_message_doc \ "read_replication_message(decode=True) -- Try reading a replication message from the server (non-blocking)." @@ -1673,7 +1708,7 @@ psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject static PyObject * curs_flush_replication_feedback(cursorObject *self, int reply) { - if (!self->repl_feedback_pending) + if (!(self->repl_feedback_pending || reply)) Py_RETURN_FALSE; if (pq_send_replication_feedback(self, reply)) { @@ -1939,6 +1974,8 @@ static struct PyMethodDef cursorObject_methods[] = { METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc}, {"stop_replication", (PyCFunction)psyco_curs_stop_replication, METH_NOARGS, psyco_curs_stop_replication_doc}, + {"consume_replication_stream", (PyCFunction)psyco_curs_consume_replication_stream, + METH_VARARGS|METH_KEYWORDS, psyco_curs_consume_replication_stream_doc}, {"read_replication_message", (PyCFunction)psyco_curs_read_replication_message, METH_VARARGS|METH_KEYWORDS, psyco_curs_read_replication_message_doc}, {"send_replication_feedback", (PyCFunction)psyco_curs_send_replication_feedback, diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index b524b14a..4f1427de 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1531,18 +1531,28 @@ exit: return ret; } -/* ignores keepalive messages */ +/* Tries to read the next message from the replication stream, without + blocking, in both sync and async connection modes. If no message + is ready in the CopyData buffer, tries to read from the server, + again without blocking. If that doesn't help, returns Py_None. + The caller is then supposed to block on the socket(s) and call this + function again. + + Any keepalive messages from the server are silently consumed and + are never returned to the caller. + */ PyObject * pq_read_replication_message(cursorObject *curs, int decode) { char *buffer = NULL; - int len, consumed = 0, hdr, reply; + int len, data_size, consumed, hdr, reply; XLogRecPtr data_start, wal_end; pg_int64 send_time; PyObject *str = NULL, *msg = NULL; Dprintf("pq_read_replication_message(decode=%d)", decode); + consumed = 0; retry: len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */); @@ -1570,10 +1580,12 @@ retry: } if (len == -2) { + /* serious error */ pq_raise(curs->conn, curs, NULL); goto exit; } if (len == -1) { + /* EOF */ curs->pgres = PQgetResult(curs->conn->pgconn); if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) { @@ -1595,13 +1607,14 @@ retry: Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len); if (buffer[0] == 'w') { - /* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */ + /* XLogData: msgtype(1), dataStart(8), walEnd(8), sendTime(8) */ hdr = 1 + 8 + 8 + 8; if (len < hdr + 1) { psyco_set_error(OperationalError, curs, "data message header too small"); goto exit; } + data_size = len - hdr; data_start = fe_recvint64(buffer + 1); wal_end = fe_recvint64(buffer + 1 + 8); send_time = fe_recvint64(buffer + 1 + 8 + 8); @@ -1609,12 +1622,13 @@ retry: Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR, XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end)); - Dprintf("pq_read_replication_message: >>%.*s<<", len - hdr, buffer + hdr); + Dprintf("pq_read_replication_message: >>%.*s<<", data_size, buffer + hdr); + /* XXX it would be wise to check if it's really a logical replication */ if (decode) { - str = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL); + str = PyUnicode_Decode(buffer + hdr, data_size, curs->conn->codec, NULL); } else { - str = Bytes_FromStringAndSize(buffer + hdr, len - hdr); + str = Bytes_FromStringAndSize(buffer + hdr, data_size); } if (!str) { goto exit; } @@ -1623,12 +1637,13 @@ retry: Py_DECREF(str); if (!msg) { goto exit; } + ((replicationMessageObject *)msg)->data_size = data_size; ((replicationMessageObject *)msg)->data_start = data_start; ((replicationMessageObject *)msg)->wal_end = wal_end; ((replicationMessageObject *)msg)->send_time = send_time; } else if (buffer[0] == 'k') { - /* msgtype(1), walEnd(8), sendTime(8), reply(1) */ + /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ hdr = 1 + 8 + 8; if (len < hdr + 1) { psyco_set_error(OperationalError, curs, "keepalive message header too small"); @@ -1641,6 +1656,7 @@ retry: if (curs->conn->async) { curs->repl_feedback_pending = 1; } else { + /* XXX not sure if this was a good idea after all */ pq_raise(curs->conn, curs, NULL); goto exit; } @@ -1699,38 +1715,36 @@ pq_send_replication_feedback(cursorObject* curs, int reply_requested) return 1; } -/* used for streaming replication only */ -static int -_pq_copy_both_v3(cursorObject *curs) +/* Calls pq_read_replication_message in an endless loop, until + stop_replication is called or a fatal error occurs. The messages + are passed to the consumer object. + + When no message is available, blocks on the connection socket, but + manages to send keepalive messages to the server as needed. +*/ +int +pq_copy_both(cursorObject *curs, PyObject *consumer, int decode, double keepalive_interval) { PyObject *msg, *tmp = NULL; - PyObject *write_func = NULL; - int is_text, fd, sel, ret = -1; + PyObject *consume_func = NULL; + int fd, sel, ret = -1; PGconn *pgconn; fd_set fds; - struct timeval curr_time, ping_time, time_diff; + struct timeval keep_intr, curr_time, ping_time, timeout; - if (!curs->copyfile) { - psyco_set_error(ProgrammingError, curs, - "can't execute START_REPLICATION directly: use the start_replication() method instead"); - goto exit; - } - - if (!(write_func = PyObject_GetAttrString(curs->copyfile, "write"))) { - Dprintf("_pq_copy_both_v3: can't get o.write"); - goto exit; - } - - /* if the file is text we must pass it unicode. */ - if (-1 == (is_text = psycopg_is_text_file(curs->copyfile))) { + if (!(consume_func = PyObject_GetAttrString(consumer, "consume"))) { + Dprintf("pq_copy_both: can't get o.consume"); goto exit; } CLEARPGRES(curs->pgres); pgconn = curs->conn->pgconn; + keep_intr.tv_sec = (int)keepalive_interval; + keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; + while (1) { - msg = pq_read_replication_message(curs, is_text); + msg = pq_read_replication_message(curs, decode); if (!msg) { goto exit; } @@ -1748,14 +1762,12 @@ _pq_copy_both_v3(cursorObject *curs) gettimeofday(&curr_time, NULL); - ping_time = curs->repl_last_io; - ping_time.tv_sec += curs->repl_keepalive_interval.tv_sec; - ping_time.tv_usec += curs->repl_keepalive_interval.tv_usec; + timeradd(&curs->repl_last_io, &keep_intr, &ping_time); + timersub(&ping_time, &curr_time, &timeout); - timersub(&ping_time, &curr_time, &time_diff); - if (time_diff.tv_sec > 0) { + if (timeout.tv_sec >= 0) { Py_BEGIN_ALLOW_THREADS; - sel = select(fd + 1, &fds, NULL, NULL, &time_diff); + sel = select(fd + 1, &fds, NULL, NULL, &timeout); Py_END_ALLOW_THREADS; } else { @@ -1782,17 +1794,17 @@ _pq_copy_both_v3(cursorObject *curs) continue; } else { - tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); + tmp = PyObject_CallFunctionObjArgs(consume_func, msg, NULL); Py_DECREF(msg); if (tmp == NULL) { - Dprintf("_pq_copy_both_v3: write_func returned NULL"); + Dprintf("pq_copy_both: consume_func returned NULL"); goto exit; } Py_DECREF(tmp); if (curs->repl_stop) { - Dprintf("_pq_copy_both_v3: repl_stop flag set by write_func"); + Dprintf("pq_copy_both: repl_stop flag set by consume_func"); break; } } @@ -1801,7 +1813,7 @@ _pq_copy_both_v3(cursorObject *curs) ret = 1; exit: - Py_XDECREF(write_func); + Py_XDECREF(consume_func); return ret; } @@ -1867,13 +1879,14 @@ pq_fetch(cursorObject *curs, int no_result) case PGRES_COPY_BOTH: Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); curs->rowcount = -1; - if (curs->conn->async) { + ex = 0; + /*if (curs->conn->async) { ex = 0; } else { ex = _pq_copy_both_v3(curs); - /* error caught by out glorious notice handler */ + if (PyErr_Occurred()) ex = -1; - } + }*/ CLEARPGRES(curs->pgres); break; diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 9a348bc2..a858a269 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -72,6 +72,8 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn, RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, char **error); +HIDDEN int pq_copy_both(cursorObject *curs, PyObject *consumer, + int decode, double keepalive_interval); HIDDEN PyObject *pq_read_replication_message(cursorObject *curs, int decode); HIDDEN int pq_send_replication_feedback(cursorObject *curs, int reply_requested); diff --git a/psycopg/replication_message.h b/psycopg/replication_message.h index a7567a1d..201b9fb4 100644 --- a/psycopg/replication_message.h +++ b/psycopg/replication_message.h @@ -42,6 +42,7 @@ struct replicationMessageObject { cursorObject *cursor; PyObject *payload; + int data_size; XLogRecPtr data_start; XLogRecPtr wal_end; pg_int64 send_time; diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c index edfe6c16..61833931 100644 --- a/psycopg/replication_message_type.c +++ b/psycopg/replication_message_type.c @@ -49,8 +49,9 @@ static PyObject * replmsg_repr(replicationMessageObject *self) { return PyString_FromFormat( - "", - self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end), self->send_time); + "", + self, self->data_size, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end), + self->send_time); } static int @@ -63,8 +64,10 @@ replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs) Py_XINCREF(self->cursor); Py_XINCREF(self->payload); + self->data_size = 0; self->data_start = 0; self->wal_end = 0; + self->send_time = 0; return 0; } @@ -125,6 +128,8 @@ static struct PyMemberDef replicationMessageObject_members[] = { "TODO"}, {"payload", T_OBJECT, OFFSETOF(payload), READONLY, "TODO"}, + {"data_size", T_INT, OFFSETOF(data_size), READONLY, + "TODO"}, {"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY, "TODO"}, {"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY,