Add checks on replication state, have to have a separate check for consume loop.

This commit is contained in:
Oleksandr Shulgin 2015-10-14 17:36:50 +02:00
parent a0b42a12ff
commit e05b4fd267
2 changed files with 35 additions and 11 deletions

View File

@ -76,6 +76,7 @@ struct cursorObject {
/* replication cursor attrs */
int repl_started:1; /* if replication is started */
int repl_stop:1; /* if client requested to stop replication */
int repl_consuming:1; /* if running the consume loop */
struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */
XLogRecPtr repl_flush_lsn;
@ -147,6 +148,22 @@ do \
return NULL; } \
while (0)
#define EXC_IF_REPLICATING(self, cmd) \
do \
if ((self)->repl_started) { \
PyErr_SetString(ProgrammingError, \
#cmd " cannot be used when replication is already in progress"); \
return NULL; } \
while (0)
#define EXC_IF_NOT_REPLICATING(self, cmd) \
do \
if (!(self)->repl_started) { \
PyErr_SetString(ProgrammingError, \
#cmd " cannot be used when replication is not in progress"); \
return NULL; } \
while (0)
#ifdef __cplusplus
}
#endif

View File

@ -1600,16 +1600,13 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject
EXC_IF_CURS_CLOSED(self);
EXC_IF_GREEN(start_replication_expert);
EXC_IF_TPC_PREPARED(self->conn, start_replication_expert);
EXC_IF_REPLICATING(self, start_replication_expert);
if (self->repl_started) {
psyco_set_error(ProgrammingError, self, "replication already in progress");
return NULL;
}
Dprintf("psyco_curs_start_replication_expert: command = %s", command);
Dprintf("psyco_curs_start_replication_expert: %s", command);
self->copysize = 0;
self->repl_stop = 0;
self->repl_consuming = 0;
self->repl_write_lsn = InvalidXLogRecPtr;
self->repl_flush_lsn = InvalidXLogRecPtr;
@ -1637,11 +1634,7 @@ psyco_curs_stop_replication(cursorObject *self)
{
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, stop_replication);
if (!self->repl_started || self->repl_stop) {
psyco_set_error(ProgrammingError, self, "replication is not in progress");
return NULL;
}
EXC_IF_NOT_REPLICATING(self, stop_replication);
self->repl_stop = 1;
@ -1668,6 +1661,13 @@ psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObje
EXC_IF_CURS_ASYNC(self, consume_replication_stream);
EXC_IF_GREEN(consume_replication_stream);
EXC_IF_TPC_PREPARED(self->conn, consume_replication_stream);
EXC_IF_NOT_REPLICATING(self, consume_replication_stream);
if (self->repl_consuming) {
PyErr_SetString(ProgrammingError,
"consume_replication_stream cannot be used when already in the consume loop");
return NULL;
}
Dprintf("psyco_curs_consume_replication_stream");
@ -1676,11 +1676,15 @@ psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObje
return NULL;
}
self->repl_consuming = 1;
if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) {
res = Py_None;
Py_INCREF(res);
}
self->repl_consuming = 0;
return res;
}
@ -1696,6 +1700,7 @@ psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject
EXC_IF_CURS_CLOSED(self);
EXC_IF_GREEN(read_replication_message);
EXC_IF_TPC_PREPARED(self->conn, read_replication_message);
EXC_IF_NOT_REPLICATING(self, read_replication_message);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&decode)) {
@ -1733,6 +1738,7 @@ psyco_curs_send_replication_feedback(cursorObject *self, PyObject *args, PyObjec
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
EXC_IF_CURS_CLOSED(self);
EXC_IF_NOT_REPLICATING(self, send_replication_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
&write_lsn, &flush_lsn, &apply_lsn, &reply)) {
@ -1763,6 +1769,7 @@ psyco_curs_flush_replication_feedback(cursorObject *self, PyObject *args, PyObje
static char *kwlist[] = {"reply", NULL};
EXC_IF_CURS_CLOSED(self);
EXC_IF_NOT_REPLICATING(self, flush_replication_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&reply)) {