diff --git a/doc/src/extras.rst b/doc/src/extras.rst index bd13a782..58b0dc07 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -492,22 +492,6 @@ The individual messages in the replication stream are represented by This method can also be called with all default parameters' values to just send a keepalive message to the server. - If the feedback message could not be sent, updates the passed LSN - positions in the cursor for a later call to `flush_feedback()` and - returns `!False`, otherwise returns `!True`. - - .. method:: flush_feedback(reply=False) - - :param reply: request the server to send back a keepalive message immediately - - This method tries to flush the latest replication feedback message - that `send_feedback()` was trying to send but couldn't. - - If *reply* is `!True` sends a keepalive message in either case. - - Returns `!True` if the feedback message was sent successfully, - `!False` otherwise. - Low-level replication cursor methods for :ref:`asynchronous connection ` operation. diff --git a/psycopg/libpq_support.h b/psycopg/libpq_support.h index c7139463..77d7ab12 100644 --- a/psycopg/libpq_support.h +++ b/psycopg/libpq_support.h @@ -31,8 +31,6 @@ /* type and constant definitions from internal postgres includes not available otherwise */ typedef unsigned PG_INT64_TYPE XLogRecPtr; -#define InvalidXLogRecPtr ((XLogRecPtr) 0) - /* have to use lowercase %x, as PyString_FromFormat can't do %X */ #define XLOGFMTSTR "%x/%x" #define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF)) diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 424ed901..63154172 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1542,8 +1542,8 @@ exit: Any keepalive messages from the server are silently consumed and are never returned to the caller. */ -PyObject * -pq_read_replication_message(replicationCursorObject *repl) +int +pq_read_replication_message(replicationCursorObject *repl, replicationMessageObject **msg) { cursorObject *curs = &repl->cur; connectionObject *conn = curs->conn; @@ -1553,18 +1553,21 @@ pq_read_replication_message(replicationCursorObject *repl) XLogRecPtr data_start, wal_end; pg_int64 send_time; PyObject *str = NULL, *result = NULL; - replicationMessageObject *msg = NULL; + int ret = -1; Dprintf("pq_read_replication_message"); + *msg = NULL; consumed = 0; + retry: len = PQgetCopyData(pgconn, &buffer, 1 /* async */); if (len == 0) { /* If we've tried reading some data, but there was none, bail out. */ if (consumed) { - goto none; + ret = 0; + goto exit; } /* We should only try reading more data when there is nothing available at the moment. Otherwise, with a really highly loaded @@ -1599,7 +1602,8 @@ retry: } CLEARPGRES(curs->pgres); - goto none; + ret = 0; + goto exit; } /* It also makes sense to set this flag here to make us return early in @@ -1641,11 +1645,11 @@ retry: Py_DECREF(str); if (!result) { goto exit; } - msg = (replicationMessageObject *)result; - msg->data_size = data_size; - msg->data_start = data_start; - msg->wal_end = wal_end; - msg->send_time = send_time; + *msg = (replicationMessageObject *)result; + (*msg)->data_size = data_size; + (*msg)->data_start = data_start; + (*msg)->wal_end = wal_end; + (*msg)->send_time = send_time; } else if (buffer[0] == 'k') { /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ @@ -1656,19 +1660,8 @@ retry: } reply = buffer[hdr]; - if (reply) { - if (!pq_send_replication_feedback(repl, 0)) { - if (conn->async) { - repl->feedback_pending = 1; - } else { - /* XXX not sure if this was a good idea after all */ - pq_raise(conn, curs, NULL); - goto exit; - } - } - else { - gettimeofday(&repl->last_io, NULL); - } + if (reply && pq_send_replication_feedback(repl, 0) < 0) { + goto exit; } PQfreemem(buffer); @@ -1680,24 +1673,22 @@ retry: goto exit; } + ret = 0; + exit: if (buffer) { PQfreemem(buffer); } - return result; - -none: - result = Py_None; - Py_INCREF(result); - goto exit; + return ret; } int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) { cursorObject *curs = &repl->cur; - PGconn *pgconn = curs->conn->pgconn; + connectionObject *conn = curs->conn; + PGconn *pgconn = conn->pgconn; char replybuf[1 + 8 + 8 + 8 + 8 + 1]; int len = 0; @@ -1714,11 +1705,12 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) replybuf[len] = reply_requested ? 1 : 0; len += 1; if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) { - return 0; + pq_raise(conn, curs, NULL); + return -1; } gettimeofday(&repl->last_io, NULL); - return 1; + return 0; } /* Calls pq_read_replication_message in an endless loop, until @@ -1734,7 +1726,8 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ cursorObject *curs = &repl->cur; connectionObject *conn = curs->conn; PGconn *pgconn = conn->pgconn; - PyObject *msg, *tmp = NULL; + replicationMessageObject *msg = NULL; + PyObject *tmp = NULL; int fd, sel, ret = -1; fd_set fds; struct timeval keep_intr, curr_time, ping_time, timeout; @@ -1750,13 +1743,10 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; while (1) { - msg = pq_read_replication_message(repl); - if (!msg) { + if (pq_read_replication_message(repl, &msg) < 0) { goto exit; } - else if (msg == Py_None) { - Py_DECREF(msg); - + else if (msg == NULL) { fd = PQsocket(pgconn); if (fd < 0) { pq_raise(conn, curs, NULL); @@ -1793,8 +1783,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_ } if (sel == 0) { - if (!pq_send_replication_feedback(repl, 0)) { - pq_raise(conn, curs, NULL); + if (pq_send_replication_feedback(repl, 0) < 0) { goto exit; } } diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 1348d9c4..5cf22309 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -27,8 +27,9 @@ #define PSYCOPG_PQPATH_H 1 #include "psycopg/cursor.h" -#include "psycopg/replication_cursor.h" #include "psycopg/connection.h" +#include "psycopg/replication_cursor.h" +#include "psycopg/replication_message.h" /* macro to clean the pg result */ #define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0) @@ -76,7 +77,8 @@ RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, /* replication protocol support */ HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, double keepalive_interval); -HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl); +HIDDEN int pq_read_replication_message(replicationCursorObject *repl, + replicationMessageObject **msg); HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested); #endif /* !defined(PSYCOPG_PQPATH_H) */ diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h index 07bf7b54..36ced138 100644 --- a/psycopg/replication_cursor.h +++ b/psycopg/replication_cursor.h @@ -45,10 +45,9 @@ typedef struct replicationCursorObject { struct timeval last_io ; /* timestamp of the last exchange with the server */ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ - XLogRecPtr write_lsn; /* LSN stats for replication feedback messages */ + XLogRecPtr write_lsn; /* LSNs for replication feedback messages */ XLogRecPtr flush_lsn; XLogRecPtr apply_lsn; - int feedback_pending; /* flag set when we couldn't send the feedback to the server */ } replicationCursorObject; diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index 1fd5ea39..f652984e 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -130,28 +130,21 @@ static PyObject * psyco_repl_curs_read_message(replicationCursorObject *self) { cursorObject *curs = &self->cur; + replicationMessageObject *msg = NULL; EXC_IF_CURS_CLOSED(curs); EXC_IF_GREEN(read_message); EXC_IF_TPC_PREPARED(self->cur.conn, read_message); EXC_IF_NOT_REPLICATING(self, read_message); - return pq_read_replication_message(self); -} - -static PyObject * -repl_curs_flush_feedback(replicationCursorObject *self, int reply) -{ - if (!(self->feedback_pending || reply)) - Py_RETURN_TRUE; - - if (pq_send_replication_feedback(self, reply)) { - self->feedback_pending = 0; - Py_RETURN_TRUE; - } else { - self->feedback_pending = 1; - Py_RETURN_FALSE; + if (pq_read_replication_message(self, &msg) < 0) { + return NULL; } + if (msg) { + return (PyObject *)msg; + } + + Py_RETURN_NONE; } #define psyco_repl_curs_send_feedback_doc \ @@ -162,9 +155,7 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self, PyObject *args, PyObject *kwargs) { cursorObject *curs = &self->cur; - XLogRecPtr write_lsn = InvalidXLogRecPtr, - flush_lsn = InvalidXLogRecPtr, - apply_lsn = InvalidXLogRecPtr; + XLogRecPtr write_lsn = 0, flush_lsn = 0, apply_lsn = 0; int reply = 0; static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; @@ -185,31 +176,11 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self, if (apply_lsn > self->apply_lsn) self->apply_lsn = apply_lsn; - self->feedback_pending = 1; - - return repl_curs_flush_feedback(self, reply); -} - -#define psyco_repl_curs_flush_feedback_doc \ -"flush_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply." - -static PyObject * -psyco_repl_curs_flush_feedback(replicationCursorObject *self, - PyObject *args, PyObject *kwargs) -{ - cursorObject *curs = &self->cur; - int reply = 0; - static char *kwlist[] = {"reply", NULL}; - - EXC_IF_CURS_CLOSED(curs); - EXC_IF_NOT_REPLICATING(self, flush_feedback); - - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, - &reply)) { + if (pq_send_replication_feedback(self, reply) < 0) { return NULL; } - return repl_curs_flush_feedback(self, reply); + Py_RETURN_NONE; } @@ -260,8 +231,6 @@ static struct PyMethodDef replicationCursorObject_methods[] = { METH_NOARGS, psyco_repl_curs_read_message_doc}, {"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback, METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc}, - {"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback, - METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_flush_feedback_doc}, {NULL} }; @@ -281,10 +250,9 @@ replicationCursor_setup(replicationCursorObject* self) self->consuming = 0; self->decode = 0; - self->write_lsn = InvalidXLogRecPtr; - self->flush_lsn = InvalidXLogRecPtr; - self->apply_lsn = InvalidXLogRecPtr; - self->feedback_pending = 0; + self->write_lsn = 0; + self->flush_lsn = 0; + self->apply_lsn = 0; return 0; } diff --git a/tests/test_replication.py b/tests/test_replication.py index 2dbb0086..4441a266 100644 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -157,7 +157,7 @@ class AsyncReplicationTest(ReplicationTestCase): self.msg_count += 1 if self.msg_count > 3: - cur.flush_feedback(reply=True) + cur.send_feedback(reply=True) raise StopReplication() cur.send_feedback(flush_lsn=msg.data_start)