diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 941e279e..432425f5 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -76,6 +76,7 @@ struct cursorObject { /* replication cursor attrs */ int repl_started:1; /* if replication is started */ int repl_stop:1; /* if client requested to stop replication */ + int repl_consuming:1; /* if running the consume loop */ struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */ XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */ XLogRecPtr repl_flush_lsn; @@ -147,6 +148,22 @@ do \ return NULL; } \ while (0) +#define EXC_IF_REPLICATING(self, cmd) \ +do \ + if ((self)->repl_started) { \ + PyErr_SetString(ProgrammingError, \ + #cmd " cannot be used when replication is already in progress"); \ + return NULL; } \ +while (0) + +#define EXC_IF_NOT_REPLICATING(self, cmd) \ +do \ + if (!(self)->repl_started) { \ + PyErr_SetString(ProgrammingError, \ + #cmd " cannot be used when replication is not in progress"); \ + return NULL; } \ +while (0) + #ifdef __cplusplus } #endif diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index a4581495..c7e6c26a 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -1600,16 +1600,13 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject EXC_IF_CURS_CLOSED(self); EXC_IF_GREEN(start_replication_expert); EXC_IF_TPC_PREPARED(self->conn, start_replication_expert); + EXC_IF_REPLICATING(self, start_replication_expert); - if (self->repl_started) { - psyco_set_error(ProgrammingError, self, "replication already in progress"); - return NULL; - } - - Dprintf("psyco_curs_start_replication_expert: command = %s", command); + Dprintf("psyco_curs_start_replication_expert: %s", command); self->copysize = 0; self->repl_stop = 0; + self->repl_consuming = 0; self->repl_write_lsn = InvalidXLogRecPtr; self->repl_flush_lsn = InvalidXLogRecPtr; @@ -1637,11 +1634,7 @@ psyco_curs_stop_replication(cursorObject *self) { EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_ASYNC(self, stop_replication); - - if (!self->repl_started || self->repl_stop) { - psyco_set_error(ProgrammingError, self, "replication is not in progress"); - return NULL; - } + EXC_IF_NOT_REPLICATING(self, stop_replication); self->repl_stop = 1; @@ -1668,6 +1661,13 @@ psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObje EXC_IF_CURS_ASYNC(self, consume_replication_stream); EXC_IF_GREEN(consume_replication_stream); EXC_IF_TPC_PREPARED(self->conn, consume_replication_stream); + EXC_IF_NOT_REPLICATING(self, consume_replication_stream); + + if (self->repl_consuming) { + PyErr_SetString(ProgrammingError, + "consume_replication_stream cannot be used when already in the consume loop"); + return NULL; + } Dprintf("psyco_curs_consume_replication_stream"); @@ -1676,11 +1676,15 @@ psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObje return NULL; } + self->repl_consuming = 1; + if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) { res = Py_None; Py_INCREF(res); } + self->repl_consuming = 0; + return res; } @@ -1696,6 +1700,7 @@ psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject EXC_IF_CURS_CLOSED(self); EXC_IF_GREEN(read_replication_message); EXC_IF_TPC_PREPARED(self->conn, read_replication_message); + EXC_IF_NOT_REPLICATING(self, read_replication_message); if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, &decode)) { @@ -1733,6 +1738,7 @@ psyco_curs_send_replication_feedback(cursorObject *self, PyObject *args, PyObjec static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; EXC_IF_CURS_CLOSED(self); + EXC_IF_NOT_REPLICATING(self, send_replication_feedback); if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist, &write_lsn, &flush_lsn, &apply_lsn, &reply)) { @@ -1763,6 +1769,7 @@ psyco_curs_flush_replication_feedback(cursorObject *self, PyObject *args, PyObje static char *kwlist[] = {"reply", NULL}; EXC_IF_CURS_CLOSED(self); + EXC_IF_NOT_REPLICATING(self, flush_replication_feedback); if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, &reply)) {