diff --git a/doc/src/extras.rst b/doc/src/extras.rst index 4755cc72..ddf989d7 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -348,9 +348,11 @@ The individual messages in the replication stream are presented by `start_replication()` first. When called, this method enters an endless loop, reading messages from - the server and passing them to ``consume()``. In order to make this - method break out of the loop and return, ``consume()`` can call - `stop_replication()` on the cursor or it can throw an exception. + the server and passing them to ``consume()``, then waiting for more + messages from the server. In order to make this method break out of + the loop and return, ``consume()`` can throw a `StopReplication` + exception (any unhandled exception will make it break out of the loop + as well). If *decode* is set to `!True`, the messages read from the server are converted according to the connection `~connection.encoding`. This @@ -398,13 +400,6 @@ The individual messages in the replication stream are presented by load on network and the server. A possible strategy is to confirm after every COMMIT message. - .. method:: stop_replication() - - This method can be called on synchronous connection from the - ``consume()`` callable in order to break out of the endless loop in - `consume_replication_stream()`. If called on asynchronous connection - or when replication is not in progress, this method raises an error. - .. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) :param write_lsn: a LSN position up to which the client has written the data locally @@ -506,10 +501,11 @@ The individual messages in the replication stream are presented by if not sel[0]: cur.send_replication_feedback() - .. index:: pair: Cursor; Replication +.. autoclass:: StopReplication + .. index:: single: Data types; Additional diff --git a/lib/extras.py b/lib/extras.py index dc2d5e65..8854ec2b 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -500,6 +500,18 @@ class PhysicalReplicationConnection(ReplicationConnectionBase): super(PhysicalReplicationConnection, self).__init__(*args, **kwargs) +class StopReplication(Exception): + """ + Exception used to break out of the endless loop in + `~ReplicationCursor.consume_replication_stream()`. + + Subclass of `~exceptions.Exception`. Intentionally *not* inherited from + `~psycopg2.Error` as occurrence of this exception does not indicate an + error. + """ + pass + + class ReplicationCursor(_cursor): """A cursor used for communication on the replication protocol.""" diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 3f125998..669e176d 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -75,7 +75,6 @@ struct cursorObject { /* replication cursor attrs */ int repl_started:1; /* if replication is started */ - int repl_stop:1; /* if client requested to stop replication */ int repl_consuming:1; /* if running the consume loop */ struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */ XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index f4598873..d51f7a55 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -1605,7 +1605,6 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject Dprintf("psyco_curs_start_replication_expert: %s", command); self->copysize = 0; - self->repl_stop = 0; self->repl_consuming = 0; self->repl_write_lsn = InvalidXLogRecPtr; @@ -1626,21 +1625,6 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject return res; } -#define psyco_curs_stop_replication_doc \ -"stop_replication() -- Set flag to break out of the endless loop in consume_replication_stream()." - -static PyObject * -psyco_curs_stop_replication(cursorObject *self) -{ - EXC_IF_CURS_CLOSED(self); - EXC_IF_CURS_ASYNC(self, stop_replication); - EXC_IF_NOT_REPLICATING(self, stop_replication); - - self->repl_stop = 1; - - Py_RETURN_NONE; -} - #define psyco_curs_consume_replication_stream_doc \ "consume_replication_stream(consumer, keepalive_interval=10) -- Consume replication stream." @@ -1684,7 +1668,6 @@ psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObje } self->repl_consuming = 0; - self->repl_stop = 0; /* who knows, what if we will be called again? */ return res; } @@ -1992,8 +1975,6 @@ static struct PyMethodDef cursorObject_methods[] = { METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc}, {"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert, METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc}, - {"stop_replication", (PyCFunction)psyco_curs_stop_replication, - METH_NOARGS, psyco_curs_stop_replication_doc}, {"consume_replication_stream", (PyCFunction)psyco_curs_consume_replication_stream, METH_VARARGS|METH_KEYWORDS, psyco_curs_consume_replication_stream_doc}, {"read_replication_message", (PyCFunction)psyco_curs_read_replication_message, diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 111eb875..f38fbd39 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1743,7 +1743,7 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive keep_intr.tv_sec = (int)keepalive_interval; keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; - while (!curs->repl_stop) { + while (1) { msg = pq_read_replication_message(curs, decode); if (!msg) { goto exit; @@ -1803,11 +1803,6 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive goto exit; } Py_DECREF(tmp); - - if (curs->repl_stop) { - Dprintf("pq_copy_both: repl_stop flag set by consume_func"); - break; - } } } diff --git a/tests/test_replication.py b/tests/test_replication.py index dfe11af0..cd1321ae 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -25,6 +25,7 @@ import psycopg2 import psycopg2.extensions from psycopg2.extras import PhysicalReplicationConnection, LogicalReplicationConnection +from psycopg2.extras import StopReplication from testutils import unittest from testutils import skip_before_postgres @@ -77,20 +78,6 @@ class ReplicationTest(ReplicationTestCase): cur.execute("IDENTIFY_SYSTEM") cur.fetchall() - @skip_before_postgres(9, 0) - def test_stop_replication_raises(self): - conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) - if conn is None: return - cur = conn.cursor() - self.assertRaises(psycopg2.ProgrammingError, cur.stop_replication) - - cur.start_replication() - cur.stop_replication() # doesn't raise now - - def consume(msg): - pass - cur.consume_replication_stream(consume) # should return at once - @skip_before_postgres(9, 4) # slots require 9.4 def test_create_replication_slot(self): conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) @@ -115,6 +102,36 @@ class ReplicationTest(ReplicationTestCase): self.create_replication_slot(cur, slot) cur.start_replication(slot) + @skip_before_postgres(9, 4) # slots require 9.4 + def test_stop_replication(self): + conn = self.repl_connect(connection_factory=LogicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + + slot = "test_slot1" + + self.create_replication_slot(cur, slot, output_plugin='test_decoding') + + self.make_replication_event() + + cur.start_replication(slot) + def consume(msg): + raise StopReplication() + self.assertRaises(StopReplication, cur.consume_replication_stream, consume) + + # generate an event for our replication stream + def make_replication_event(self): + conn = self.connect() + if conn is None: return + cur = conn.cursor() + + try: + cur.execute("DROP TABLE dummy1") + except psycopg2.ProgrammingError: + conn.rollback() + cur.execute("CREATE TABLE dummy1()") + conn.commit() + class AsyncReplicationTest(ReplicationTestCase): @skip_before_postgres(9, 4)