Drop ReplicationCursor.flush_feedback(), rectify pq_*_replication_*() interface.

This commit is contained in:
Oleksandr Shulgin 2015-10-23 18:30:18 +02:00
parent dd6bcbd04f
commit 8b79bf43ac
7 changed files with 49 additions and 109 deletions

View File

@ -492,22 +492,6 @@ The individual messages in the replication stream are represented by
This method can also be called with all default parameters' values to This method can also be called with all default parameters' values to
just send a keepalive message to the server. just send a keepalive message to the server.
If the feedback message could not be sent, updates the passed LSN
positions in the cursor for a later call to `flush_feedback()` and
returns `!False`, otherwise returns `!True`.
.. method:: flush_feedback(reply=False)
:param reply: request the server to send back a keepalive message immediately
This method tries to flush the latest replication feedback message
that `send_feedback()` was trying to send but couldn't.
If *reply* is `!True` sends a keepalive message in either case.
Returns `!True` if the feedback message was sent successfully,
`!False` otherwise.
Low-level replication cursor methods for :ref:`asynchronous connection Low-level replication cursor methods for :ref:`asynchronous connection
<async-support>` operation. <async-support>` operation.

View File

@ -31,8 +31,6 @@
/* type and constant definitions from internal postgres includes not available otherwise */ /* type and constant definitions from internal postgres includes not available otherwise */
typedef unsigned PG_INT64_TYPE XLogRecPtr; typedef unsigned PG_INT64_TYPE XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)
/* have to use lowercase %x, as PyString_FromFormat can't do %X */ /* have to use lowercase %x, as PyString_FromFormat can't do %X */
#define XLOGFMTSTR "%x/%x" #define XLOGFMTSTR "%x/%x"
#define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF)) #define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF))

View File

@ -1542,8 +1542,8 @@ exit:
Any keepalive messages from the server are silently consumed and Any keepalive messages from the server are silently consumed and
are never returned to the caller. are never returned to the caller.
*/ */
PyObject * int
pq_read_replication_message(replicationCursorObject *repl) pq_read_replication_message(replicationCursorObject *repl, replicationMessageObject **msg)
{ {
cursorObject *curs = &repl->cur; cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn; connectionObject *conn = curs->conn;
@ -1553,18 +1553,21 @@ pq_read_replication_message(replicationCursorObject *repl)
XLogRecPtr data_start, wal_end; XLogRecPtr data_start, wal_end;
pg_int64 send_time; pg_int64 send_time;
PyObject *str = NULL, *result = NULL; PyObject *str = NULL, *result = NULL;
replicationMessageObject *msg = NULL; int ret = -1;
Dprintf("pq_read_replication_message"); Dprintf("pq_read_replication_message");
*msg = NULL;
consumed = 0; consumed = 0;
retry: retry:
len = PQgetCopyData(pgconn, &buffer, 1 /* async */); len = PQgetCopyData(pgconn, &buffer, 1 /* async */);
if (len == 0) { if (len == 0) {
/* If we've tried reading some data, but there was none, bail out. */ /* If we've tried reading some data, but there was none, bail out. */
if (consumed) { if (consumed) {
goto none; ret = 0;
goto exit;
} }
/* We should only try reading more data when there is nothing /* We should only try reading more data when there is nothing
available at the moment. Otherwise, with a really highly loaded available at the moment. Otherwise, with a really highly loaded
@ -1599,7 +1602,8 @@ retry:
} }
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
goto none; ret = 0;
goto exit;
} }
/* It also makes sense to set this flag here to make us return early in /* It also makes sense to set this flag here to make us return early in
@ -1641,11 +1645,11 @@ retry:
Py_DECREF(str); Py_DECREF(str);
if (!result) { goto exit; } if (!result) { goto exit; }
msg = (replicationMessageObject *)result; *msg = (replicationMessageObject *)result;
msg->data_size = data_size; (*msg)->data_size = data_size;
msg->data_start = data_start; (*msg)->data_start = data_start;
msg->wal_end = wal_end; (*msg)->wal_end = wal_end;
msg->send_time = send_time; (*msg)->send_time = send_time;
} }
else if (buffer[0] == 'k') { else if (buffer[0] == 'k') {
/* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
@ -1656,19 +1660,8 @@ retry:
} }
reply = buffer[hdr]; reply = buffer[hdr];
if (reply) { if (reply && pq_send_replication_feedback(repl, 0) < 0) {
if (!pq_send_replication_feedback(repl, 0)) { goto exit;
if (conn->async) {
repl->feedback_pending = 1;
} else {
/* XXX not sure if this was a good idea after all */
pq_raise(conn, curs, NULL);
goto exit;
}
}
else {
gettimeofday(&repl->last_io, NULL);
}
} }
PQfreemem(buffer); PQfreemem(buffer);
@ -1680,24 +1673,22 @@ retry:
goto exit; goto exit;
} }
ret = 0;
exit: exit:
if (buffer) { if (buffer) {
PQfreemem(buffer); PQfreemem(buffer);
} }
return result; return ret;
none:
result = Py_None;
Py_INCREF(result);
goto exit;
} }
int int
pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested) pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
{ {
cursorObject *curs = &repl->cur; cursorObject *curs = &repl->cur;
PGconn *pgconn = curs->conn->pgconn; connectionObject *conn = curs->conn;
PGconn *pgconn = conn->pgconn;
char replybuf[1 + 8 + 8 + 8 + 8 + 1]; char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0; int len = 0;
@ -1714,11 +1705,12 @@ pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
replybuf[len] = reply_requested ? 1 : 0; len += 1; replybuf[len] = reply_requested ? 1 : 0; len += 1;
if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) { if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) {
return 0; pq_raise(conn, curs, NULL);
return -1;
} }
gettimeofday(&repl->last_io, NULL); gettimeofday(&repl->last_io, NULL);
return 1; return 0;
} }
/* Calls pq_read_replication_message in an endless loop, until /* Calls pq_read_replication_message in an endless loop, until
@ -1734,7 +1726,8 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
cursorObject *curs = &repl->cur; cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn; connectionObject *conn = curs->conn;
PGconn *pgconn = conn->pgconn; PGconn *pgconn = conn->pgconn;
PyObject *msg, *tmp = NULL; replicationMessageObject *msg = NULL;
PyObject *tmp = NULL;
int fd, sel, ret = -1; int fd, sel, ret = -1;
fd_set fds; fd_set fds;
struct timeval keep_intr, curr_time, ping_time, timeout; struct timeval keep_intr, curr_time, ping_time, timeout;
@ -1750,13 +1743,10 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
while (1) { while (1) {
msg = pq_read_replication_message(repl); if (pq_read_replication_message(repl, &msg) < 0) {
if (!msg) {
goto exit; goto exit;
} }
else if (msg == Py_None) { else if (msg == NULL) {
Py_DECREF(msg);
fd = PQsocket(pgconn); fd = PQsocket(pgconn);
if (fd < 0) { if (fd < 0) {
pq_raise(conn, curs, NULL); pq_raise(conn, curs, NULL);
@ -1793,8 +1783,7 @@ pq_copy_both(replicationCursorObject *repl, PyObject *consume, double keepalive_
} }
if (sel == 0) { if (sel == 0) {
if (!pq_send_replication_feedback(repl, 0)) { if (pq_send_replication_feedback(repl, 0) < 0) {
pq_raise(conn, curs, NULL);
goto exit; goto exit;
} }
} }

View File

@ -27,8 +27,9 @@
#define PSYCOPG_PQPATH_H 1 #define PSYCOPG_PQPATH_H 1
#include "psycopg/cursor.h" #include "psycopg/cursor.h"
#include "psycopg/replication_cursor.h"
#include "psycopg/connection.h" #include "psycopg/connection.h"
#include "psycopg/replication_cursor.h"
#include "psycopg/replication_message.h"
/* macro to clean the pg result */ /* macro to clean the pg result */
#define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0) #define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0)
@ -76,7 +77,8 @@ RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
/* replication protocol support */ /* replication protocol support */
HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,
double keepalive_interval); double keepalive_interval);
HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl); HIDDEN int pq_read_replication_message(replicationCursorObject *repl,
replicationMessageObject **msg);
HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested); HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested);
#endif /* !defined(PSYCOPG_PQPATH_H) */ #endif /* !defined(PSYCOPG_PQPATH_H) */

View File

@ -45,10 +45,9 @@ typedef struct replicationCursorObject {
struct timeval last_io ; /* timestamp of the last exchange with the server */ struct timeval last_io ; /* timestamp of the last exchange with the server */
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */ struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr write_lsn; /* LSN stats for replication feedback messages */ XLogRecPtr write_lsn; /* LSNs for replication feedback messages */
XLogRecPtr flush_lsn; XLogRecPtr flush_lsn;
XLogRecPtr apply_lsn; XLogRecPtr apply_lsn;
int feedback_pending; /* flag set when we couldn't send the feedback to the server */
} replicationCursorObject; } replicationCursorObject;

View File

@ -130,28 +130,21 @@ static PyObject *
psyco_repl_curs_read_message(replicationCursorObject *self) psyco_repl_curs_read_message(replicationCursorObject *self)
{ {
cursorObject *curs = &self->cur; cursorObject *curs = &self->cur;
replicationMessageObject *msg = NULL;
EXC_IF_CURS_CLOSED(curs); EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(read_message); EXC_IF_GREEN(read_message);
EXC_IF_TPC_PREPARED(self->cur.conn, read_message); EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
EXC_IF_NOT_REPLICATING(self, read_message); EXC_IF_NOT_REPLICATING(self, read_message);
return pq_read_replication_message(self); if (pq_read_replication_message(self, &msg) < 0) {
} return NULL;
static PyObject *
repl_curs_flush_feedback(replicationCursorObject *self, int reply)
{
if (!(self->feedback_pending || reply))
Py_RETURN_TRUE;
if (pq_send_replication_feedback(self, reply)) {
self->feedback_pending = 0;
Py_RETURN_TRUE;
} else {
self->feedback_pending = 1;
Py_RETURN_FALSE;
} }
if (msg) {
return (PyObject *)msg;
}
Py_RETURN_NONE;
} }
#define psyco_repl_curs_send_feedback_doc \ #define psyco_repl_curs_send_feedback_doc \
@ -162,9 +155,7 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self,
PyObject *args, PyObject *kwargs) PyObject *args, PyObject *kwargs)
{ {
cursorObject *curs = &self->cur; cursorObject *curs = &self->cur;
XLogRecPtr write_lsn = InvalidXLogRecPtr, XLogRecPtr write_lsn = 0, flush_lsn = 0, apply_lsn = 0;
flush_lsn = InvalidXLogRecPtr,
apply_lsn = InvalidXLogRecPtr;
int reply = 0; int reply = 0;
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
@ -185,31 +176,11 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self,
if (apply_lsn > self->apply_lsn) if (apply_lsn > self->apply_lsn)
self->apply_lsn = apply_lsn; self->apply_lsn = apply_lsn;
self->feedback_pending = 1; if (pq_send_replication_feedback(self, reply) < 0) {
return repl_curs_flush_feedback(self, reply);
}
#define psyco_repl_curs_flush_feedback_doc \
"flush_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply."
static PyObject *
psyco_repl_curs_flush_feedback(replicationCursorObject *self,
PyObject *args, PyObject *kwargs)
{
cursorObject *curs = &self->cur;
int reply = 0;
static char *kwlist[] = {"reply", NULL};
EXC_IF_CURS_CLOSED(curs);
EXC_IF_NOT_REPLICATING(self, flush_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&reply)) {
return NULL; return NULL;
} }
return repl_curs_flush_feedback(self, reply); Py_RETURN_NONE;
} }
@ -260,8 +231,6 @@ static struct PyMethodDef replicationCursorObject_methods[] = {
METH_NOARGS, psyco_repl_curs_read_message_doc}, METH_NOARGS, psyco_repl_curs_read_message_doc},
{"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback, {"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc}, METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc},
{"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_flush_feedback_doc},
{NULL} {NULL}
}; };
@ -281,10 +250,9 @@ replicationCursor_setup(replicationCursorObject* self)
self->consuming = 0; self->consuming = 0;
self->decode = 0; self->decode = 0;
self->write_lsn = InvalidXLogRecPtr; self->write_lsn = 0;
self->flush_lsn = InvalidXLogRecPtr; self->flush_lsn = 0;
self->apply_lsn = InvalidXLogRecPtr; self->apply_lsn = 0;
self->feedback_pending = 0;
return 0; return 0;
} }

View File

@ -157,7 +157,7 @@ class AsyncReplicationTest(ReplicationTestCase):
self.msg_count += 1 self.msg_count += 1
if self.msg_count > 3: if self.msg_count > 3:
cur.flush_feedback(reply=True) cur.send_feedback(reply=True)
raise StopReplication() raise StopReplication()
cur.send_feedback(flush_lsn=msg.data_start) cur.send_feedback(flush_lsn=msg.data_start)