From 09a4bb70a168799a91f63f1c2039f456c485960f Mon Sep 17 00:00:00 2001 From: Oleksandr Shulgin Date: Tue, 5 Jan 2016 12:31:57 +0100 Subject: [PATCH] Allow retrying start_replication after syntax or data error. --- psycopg/pqpath.c | 7 +++++-- psycopg/replication_cursor.h | 20 +------------------- psycopg/replication_cursor_type.c | 25 +++++++++++++------------ tests/test_replication.py | 13 +++++++++++++ 4 files changed, 32 insertions(+), 33 deletions(-) diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 760fc977..6d6728ca 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1870,8 +1870,11 @@ pq_fetch(cursorObject *curs, int no_result) Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); curs->rowcount = -1; ex = 0; - /* nothing to do here: pq_copy_both will be called separately */ - CLEARPGRES(curs->pgres); + /* Nothing to do here: pq_copy_both will be called separately. + + Also don't clear the result status: it's checked in + consume_stream. */ + /*CLEARPGRES(curs->pgres);*/ break; case PGRES_TUPLES_OK: diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index 36ced138..71c6e190 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -38,11 +38,10 @@ extern HIDDEN PyTypeObject replicationCursorType; typedef struct replicationCursorObject { cursorObject cur; - int started:1; /* if replication is started */ int consuming:1; /* if running the consume loop */ int decode:1; /* if we should use character decoding on the messages */ - struct timeval last_io ; /* timestamp of the last exchange with the server */ + struct timeval last_io; /* timestamp of the last exchange with the server */ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ @@ -53,23 +52,6 @@ typedef struct replicationCursorObject { RAISES_NEG int psyco_repl_curs_datetime_init(void); -/* exception-raising macros */ -#define EXC_IF_REPLICATING(self, cmd) \ -do \ - if ((self)->started) { \ - PyErr_SetString(ProgrammingError, \ - #cmd " cannot be used when replication is already in progress"); \ - return NULL; } \ -while (0) - -#define EXC_IF_NOT_REPLICATING(self, cmd) \ -do \ - if (!(self)->started) { \ - PyErr_SetString(ProgrammingError, \ - #cmd " cannot be used when replication is not in progress"); \ - return NULL; } \ -while (0) - #ifdef __cplusplus } #endif diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index f652984e..204ff20a 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -59,7 +59,6 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self, EXC_IF_CURS_CLOSED(curs); EXC_IF_GREEN(start_replication_expert); EXC_IF_TPC_PREPARED(conn, start_replication_expert); - EXC_IF_REPLICATING(self, start_replication_expert); Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode); @@ -67,7 +66,6 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self, res = Py_None; Py_INCREF(res); - self->started = 1; self->decode = decode; gettimeofday(&self->last_io, NULL); } @@ -96,13 +94,6 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self, EXC_IF_CURS_ASYNC(curs, consume_stream); EXC_IF_GREEN(consume_stream); EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream); - EXC_IF_NOT_REPLICATING(self, consume_stream); - - if (self->consuming) { - PyErr_SetString(ProgrammingError, - "consume_stream cannot be used when already in the consume loop"); - return NULL; - } Dprintf("psyco_repl_curs_consume_stream"); @@ -111,6 +102,19 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self, return NULL; } + if (self->consuming) { + PyErr_SetString(ProgrammingError, + "consume_stream cannot be used when already in the consume loop"); + return NULL; + } + + if (curs->pgres == NULL || PQresultStatus(curs->pgres) != PGRES_COPY_BOTH) { + PyErr_SetString(ProgrammingError, + "consume_stream: not replicating, call start_replication first"); + return NULL; + } + CLEARPGRES(curs->pgres); + self->consuming = 1; if (pq_copy_both(self, consume, keepalive_interval) >= 0) { @@ -135,7 +139,6 @@ psyco_repl_curs_read_message(replicationCursorObject *self) EXC_IF_CURS_CLOSED(curs); EXC_IF_GREEN(read_message); EXC_IF_TPC_PREPARED(self->cur.conn, read_message); - EXC_IF_NOT_REPLICATING(self, read_message); if (pq_read_replication_message(self, &msg) < 0) { return NULL; @@ -160,7 +163,6 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self, static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; EXC_IF_CURS_CLOSED(curs); - EXC_IF_NOT_REPLICATING(self, send_feedback); if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist, &write_lsn, &flush_lsn, &apply_lsn, &reply)) { @@ -246,7 +248,6 @@ static struct PyGetSetDef replicationCursorObject_getsets[] = { static int replicationCursor_setup(replicationCursorObject* self) { - self->started = 0; self->consuming = 0; self->decode = 0; diff --git a/tests/test_replication.py b/tests/test_replication.py index 4441a266..a316135f 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -118,6 +118,18 @@ class ReplicationTest(ReplicationTestCase): self.create_replication_slot(cur) cur.start_replication(self.slot) + @skip_before_postgres(9, 4) # slots require 9.4 + def test_start_and_recover_from_error(self): + conn = self.repl_connect(connection_factory=LogicalReplicationConnection) + if conn is None: return + cur = conn.cursor() + + self.create_replication_slot(cur, output_plugin='test_decoding') + + self.assertRaises(psycopg2.DataError, cur.start_replication, + slot_name=self.slot, options=dict(invalid_param='value')) + cur.start_replication(slot_name=self.slot) + @skip_before_postgres(9, 4) # slots require 9.4 def test_stop_replication(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) @@ -162,6 +174,7 @@ class AsyncReplicationTest(ReplicationTestCase): cur.send_feedback(flush_lsn=msg.data_start) + # cannot be used in asynchronous mode self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume) def process_stream():