Allow retrying start_replication after syntax or data error.

This commit is contained in:
Oleksandr Shulgin 2016-01-05 12:31:57 +01:00
parent 4b9a6f48f3
commit 09a4bb70a1
4 changed files with 32 additions and 33 deletions

View File

@ -1870,8 +1870,11 @@ pq_fetch(cursorObject *curs, int no_result)
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
curs->rowcount = -1; curs->rowcount = -1;
ex = 0; ex = 0;
/* nothing to do here: pq_copy_both will be called separately */ /* Nothing to do here: pq_copy_both will be called separately.
CLEARPGRES(curs->pgres);
Also don't clear the result status: it's checked in
consume_stream. */
/*CLEARPGRES(curs->pgres);*/
break; break;
case PGRES_TUPLES_OK: case PGRES_TUPLES_OK:

View File

@ -38,7 +38,6 @@ extern HIDDEN PyTypeObject replicationCursorType;
typedef struct replicationCursorObject { typedef struct replicationCursorObject {
cursorObject cur; cursorObject cur;
int started:1; /* if replication is started */
int consuming:1; /* if running the consume loop */ int consuming:1; /* if running the consume loop */
int decode:1; /* if we should use character decoding on the messages */ int decode:1; /* if we should use character decoding on the messages */
@ -53,23 +52,6 @@ typedef struct replicationCursorObject {
RAISES_NEG int psyco_repl_curs_datetime_init(void); RAISES_NEG int psyco_repl_curs_datetime_init(void);
/* exception-raising macros */
#define EXC_IF_REPLICATING(self, cmd) \
do \
if ((self)->started) { \
PyErr_SetString(ProgrammingError, \
#cmd " cannot be used when replication is already in progress"); \
return NULL; } \
while (0)
#define EXC_IF_NOT_REPLICATING(self, cmd) \
do \
if (!(self)->started) { \
PyErr_SetString(ProgrammingError, \
#cmd " cannot be used when replication is not in progress"); \
return NULL; } \
while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -59,7 +59,6 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
EXC_IF_CURS_CLOSED(curs); EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(start_replication_expert); EXC_IF_GREEN(start_replication_expert);
EXC_IF_TPC_PREPARED(conn, start_replication_expert); EXC_IF_TPC_PREPARED(conn, start_replication_expert);
EXC_IF_REPLICATING(self, start_replication_expert);
Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode); Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %d", command, decode);
@ -67,7 +66,6 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
res = Py_None; res = Py_None;
Py_INCREF(res); Py_INCREF(res);
self->started = 1;
self->decode = decode; self->decode = decode;
gettimeofday(&self->last_io, NULL); gettimeofday(&self->last_io, NULL);
} }
@ -96,13 +94,6 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
EXC_IF_CURS_ASYNC(curs, consume_stream); EXC_IF_CURS_ASYNC(curs, consume_stream);
EXC_IF_GREEN(consume_stream); EXC_IF_GREEN(consume_stream);
EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream); EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream);
EXC_IF_NOT_REPLICATING(self, consume_stream);
if (self->consuming) {
PyErr_SetString(ProgrammingError,
"consume_stream cannot be used when already in the consume loop");
return NULL;
}
Dprintf("psyco_repl_curs_consume_stream"); Dprintf("psyco_repl_curs_consume_stream");
@ -111,6 +102,19 @@ psyco_repl_curs_consume_stream(replicationCursorObject *self,
return NULL; return NULL;
} }
if (self->consuming) {
PyErr_SetString(ProgrammingError,
"consume_stream cannot be used when already in the consume loop");
return NULL;
}
if (curs->pgres == NULL || PQresultStatus(curs->pgres) != PGRES_COPY_BOTH) {
PyErr_SetString(ProgrammingError,
"consume_stream: not replicating, call start_replication first");
return NULL;
}
CLEARPGRES(curs->pgres);
self->consuming = 1; self->consuming = 1;
if (pq_copy_both(self, consume, keepalive_interval) >= 0) { if (pq_copy_both(self, consume, keepalive_interval) >= 0) {
@ -135,7 +139,6 @@ psyco_repl_curs_read_message(replicationCursorObject *self)
EXC_IF_CURS_CLOSED(curs); EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(read_message); EXC_IF_GREEN(read_message);
EXC_IF_TPC_PREPARED(self->cur.conn, read_message); EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
EXC_IF_NOT_REPLICATING(self, read_message);
if (pq_read_replication_message(self, &msg) < 0) { if (pq_read_replication_message(self, &msg) < 0) {
return NULL; return NULL;
@ -160,7 +163,6 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self,
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
EXC_IF_CURS_CLOSED(curs); EXC_IF_CURS_CLOSED(curs);
EXC_IF_NOT_REPLICATING(self, send_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist, if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
&write_lsn, &flush_lsn, &apply_lsn, &reply)) { &write_lsn, &flush_lsn, &apply_lsn, &reply)) {
@ -246,7 +248,6 @@ static struct PyGetSetDef replicationCursorObject_getsets[] = {
static int static int
replicationCursor_setup(replicationCursorObject* self) replicationCursor_setup(replicationCursorObject* self)
{ {
self->started = 0;
self->consuming = 0; self->consuming = 0;
self->decode = 0; self->decode = 0;

View File

@ -118,6 +118,18 @@ class ReplicationTest(ReplicationTestCase):
self.create_replication_slot(cur) self.create_replication_slot(cur)
cur.start_replication(self.slot) cur.start_replication(self.slot)
@skip_before_postgres(9, 4) # slots require 9.4
def test_start_and_recover_from_error(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
if conn is None: return
cur = conn.cursor()
self.create_replication_slot(cur, output_plugin='test_decoding')
self.assertRaises(psycopg2.DataError, cur.start_replication,
slot_name=self.slot, options=dict(invalid_param='value'))
cur.start_replication(slot_name=self.slot)
@skip_before_postgres(9, 4) # slots require 9.4 @skip_before_postgres(9, 4) # slots require 9.4
def test_stop_replication(self): def test_stop_replication(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection) conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
@ -162,6 +174,7 @@ class AsyncReplicationTest(ReplicationTestCase):
cur.send_feedback(flush_lsn=msg.data_start) cur.send_feedback(flush_lsn=msg.data_start)
# cannot be used in asynchronous mode
self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume) self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
def process_stream(): def process_stream():