Replace stop_replication with requirement for an exception.

This commit is contained in:
Oleksandr Shulgin 2015-10-19 15:42:42 +02:00
parent 0435320f34
commit 4ab7cf0157
6 changed files with 51 additions and 51 deletions

View File

@ -348,9 +348,11 @@ The individual messages in the replication stream are presented by
`start_replication()` first. `start_replication()` first.
When called, this method enters an endless loop, reading messages from When called, this method enters an endless loop, reading messages from
the server and passing them to ``consume()``. In order to make this the server and passing them to ``consume()``, then waiting for more
method break out of the loop and return, ``consume()`` can call messages from the server. In order to make this method break out of
`stop_replication()` on the cursor or it can throw an exception. the loop and return, ``consume()`` can throw a `StopReplication`
exception (any unhandled exception will make it break out of the loop
as well).
If *decode* is set to `!True`, the messages read from the server are If *decode* is set to `!True`, the messages read from the server are
converted according to the connection `~connection.encoding`. This converted according to the connection `~connection.encoding`. This
@ -398,13 +400,6 @@ The individual messages in the replication stream are presented by
load on network and the server. A possible strategy is to confirm load on network and the server. A possible strategy is to confirm
after every COMMIT message. after every COMMIT message.
.. method:: stop_replication()
This method can be called on synchronous connection from the
``consume()`` callable in order to break out of the endless loop in
`consume_replication_stream()`. If called on asynchronous connection
or when replication is not in progress, this method raises an error.
.. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) .. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
:param write_lsn: a LSN position up to which the client has written the data locally :param write_lsn: a LSN position up to which the client has written the data locally
@ -506,10 +501,11 @@ The individual messages in the replication stream are presented by
if not sel[0]: if not sel[0]:
cur.send_replication_feedback() cur.send_replication_feedback()
.. index:: .. index::
pair: Cursor; Replication pair: Cursor; Replication
.. autoclass:: StopReplication
.. index:: .. index::
single: Data types; Additional single: Data types; Additional

View File

@ -500,6 +500,18 @@ class PhysicalReplicationConnection(ReplicationConnectionBase):
super(PhysicalReplicationConnection, self).__init__(*args, **kwargs) super(PhysicalReplicationConnection, self).__init__(*args, **kwargs)
class StopReplication(Exception):
"""
Exception used to break out of the endless loop in
`~ReplicationCursor.consume_replication_stream()`.
Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
`~psycopg2.Error` as occurrence of this exception does not indicate an
error.
"""
pass
class ReplicationCursor(_cursor): class ReplicationCursor(_cursor):
"""A cursor used for communication on the replication protocol.""" """A cursor used for communication on the replication protocol."""

View File

@ -75,7 +75,6 @@ struct cursorObject {
/* replication cursor attrs */ /* replication cursor attrs */
int repl_started:1; /* if replication is started */ int repl_started:1; /* if replication is started */
int repl_stop:1; /* if client requested to stop replication */
int repl_consuming:1; /* if running the consume loop */ int repl_consuming:1; /* if running the consume loop */
struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */ struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */ XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */

View File

@ -1605,7 +1605,6 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject
Dprintf("psyco_curs_start_replication_expert: %s", command); Dprintf("psyco_curs_start_replication_expert: %s", command);
self->copysize = 0; self->copysize = 0;
self->repl_stop = 0;
self->repl_consuming = 0; self->repl_consuming = 0;
self->repl_write_lsn = InvalidXLogRecPtr; self->repl_write_lsn = InvalidXLogRecPtr;
@ -1626,21 +1625,6 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject
return res; return res;
} }
#define psyco_curs_stop_replication_doc \
"stop_replication() -- Set flag to break out of the endless loop in consume_replication_stream()."
static PyObject *
psyco_curs_stop_replication(cursorObject *self)
{
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, stop_replication);
EXC_IF_NOT_REPLICATING(self, stop_replication);
self->repl_stop = 1;
Py_RETURN_NONE;
}
#define psyco_curs_consume_replication_stream_doc \ #define psyco_curs_consume_replication_stream_doc \
"consume_replication_stream(consumer, keepalive_interval=10) -- Consume replication stream." "consume_replication_stream(consumer, keepalive_interval=10) -- Consume replication stream."
@ -1684,7 +1668,6 @@ psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObje
} }
self->repl_consuming = 0; self->repl_consuming = 0;
self->repl_stop = 0; /* who knows, what if we will be called again? */
return res; return res;
} }
@ -1992,8 +1975,6 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc},
{"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert, {"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert,
METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc},
{"stop_replication", (PyCFunction)psyco_curs_stop_replication,
METH_NOARGS, psyco_curs_stop_replication_doc},
{"consume_replication_stream", (PyCFunction)psyco_curs_consume_replication_stream, {"consume_replication_stream", (PyCFunction)psyco_curs_consume_replication_stream,
METH_VARARGS|METH_KEYWORDS, psyco_curs_consume_replication_stream_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_consume_replication_stream_doc},
{"read_replication_message", (PyCFunction)psyco_curs_read_replication_message, {"read_replication_message", (PyCFunction)psyco_curs_read_replication_message,

View File

@ -1743,7 +1743,7 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
keep_intr.tv_sec = (int)keepalive_interval; keep_intr.tv_sec = (int)keepalive_interval;
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
while (!curs->repl_stop) { while (1) {
msg = pq_read_replication_message(curs, decode); msg = pq_read_replication_message(curs, decode);
if (!msg) { if (!msg) {
goto exit; goto exit;
@ -1803,11 +1803,6 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
goto exit; goto exit;
} }
Py_DECREF(tmp); Py_DECREF(tmp);
if (curs->repl_stop) {
Dprintf("pq_copy_both: repl_stop flag set by consume_func");
break;
}
} }
} }

View File

@ -25,6 +25,7 @@
import psycopg2 import psycopg2
import psycopg2.extensions import psycopg2.extensions
from psycopg2.extras import PhysicalReplicationConnection, LogicalReplicationConnection from psycopg2.extras import PhysicalReplicationConnection, LogicalReplicationConnection
from psycopg2.extras import StopReplication
from testutils import unittest from testutils import unittest
from testutils import skip_before_postgres from testutils import skip_before_postgres
@ -77,20 +78,6 @@ class ReplicationTest(ReplicationTestCase):
cur.execute("IDENTIFY_SYSTEM") cur.execute("IDENTIFY_SYSTEM")
cur.fetchall() cur.fetchall()
@skip_before_postgres(9, 0)
def test_stop_replication_raises(self):
conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
if conn is None: return
cur = conn.cursor()
self.assertRaises(psycopg2.ProgrammingError, cur.stop_replication)
cur.start_replication()
cur.stop_replication() # doesn't raise now
def consume(msg):
pass
cur.consume_replication_stream(consume) # should return at once
@skip_before_postgres(9, 4) # slots require 9.4 @skip_before_postgres(9, 4) # slots require 9.4
def test_create_replication_slot(self): def test_create_replication_slot(self):
conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
@ -115,6 +102,36 @@ class ReplicationTest(ReplicationTestCase):
self.create_replication_slot(cur, slot) self.create_replication_slot(cur, slot)
cur.start_replication(slot) cur.start_replication(slot)
@skip_before_postgres(9, 4) # slots require 9.4
def test_stop_replication(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
if conn is None: return
cur = conn.cursor()
slot = "test_slot1"
self.create_replication_slot(cur, slot, output_plugin='test_decoding')
self.make_replication_event()
cur.start_replication(slot)
def consume(msg):
raise StopReplication()
self.assertRaises(StopReplication, cur.consume_replication_stream, consume)
# generate an event for our replication stream
def make_replication_event(self):
conn = self.connect()
if conn is None: return
cur = conn.cursor()
try:
cur.execute("DROP TABLE dummy1")
except psycopg2.ProgrammingError:
conn.rollback()
cur.execute("CREATE TABLE dummy1()")
conn.commit()
class AsyncReplicationTest(ReplicationTestCase): class AsyncReplicationTest(ReplicationTestCase):
@skip_before_postgres(9, 4) @skip_before_postgres(9, 4)