Rework replication protocol

This change exposes lower-level functions for operating the
(logical) replication protocol, while keeping the high-level
start_replication function, which does all the work for you
when the connection is synchronous.

A number of other changes and fixes are also included in this commit.
This commit is contained in:
Oleksandr Shulgin 2015-06-10 09:06:08 +02:00
parent 9ed90b1216
commit 61e52ce879
6 changed files with 357 additions and 238 deletions

View File

@ -471,6 +471,7 @@ class ReplicationConnection(_connection):
super(ReplicationConnection, self).__init__(*args, **kwargs) super(ReplicationConnection, self).__init__(*args, **kwargs)
# prevent auto-issued BEGIN statements # prevent auto-issued BEGIN statements
if not self.async:
self.autocommit = True self.autocommit = True
def cursor(self, *args, **kwargs): def cursor(self, *args, **kwargs):
@ -503,18 +504,18 @@ class ReplicationCursor(_cursor):
if slot_type == REPLICATION_LOGICAL: if slot_type == REPLICATION_LOGICAL:
if output_plugin is None: if output_plugin is None:
raise RuntimeError("output_plugin is required for logical replication slot") raise psycopg2.ProgrammingError("output_plugin is required for logical replication slot")
command += "LOGICAL %s" % self.quote_ident(output_plugin) command += "LOGICAL %s" % self.quote_ident(output_plugin)
elif slot_type == REPLICATION_PHYSICAL: elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None: if output_plugin is not None:
raise RuntimeError("output_plugin is not applicable to physical replication") raise psycopg2.ProgrammingError("output_plugin is not applicable to physical replication")
command += "PHYSICAL" command += "PHYSICAL"
else: else:
raise RuntimeError("unrecognized replication slot type") raise psycopg2.ProgrammingError("unrecognized replication slot type")
self.execute(command) self.execute(command)
@ -524,17 +525,14 @@ class ReplicationCursor(_cursor):
command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name) command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name)
self.execute(command) self.execute(command)
def start_replication(self, o, slot_type, slot_name=None, start_lsn=None, def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=None,
timeline=0, keepalive_interval=10, options=None): timeline=0, keepalive_interval=10, options=None):
"""Start and consume replication stream.""" """Start and consume replication stream."""
if keepalive_interval <= 0:
raise RuntimeError("keepalive_interval must be > 0: %d" % keepalive_interval)
command = "START_REPLICATION " command = "START_REPLICATION "
if slot_type == REPLICATION_LOGICAL and slot_name is None: if slot_type == REPLICATION_LOGICAL and slot_name is None:
raise RuntimeError("slot_name is required for logical replication slot") raise psycopg2.ProgrammingError("slot_name is required for logical replication slot")
if slot_name: if slot_name:
command += "SLOT %s " % self.quote_ident(slot_name) command += "SLOT %s " % self.quote_ident(slot_name)
@ -544,7 +542,7 @@ class ReplicationCursor(_cursor):
elif slot_type == REPLICATION_PHYSICAL: elif slot_type == REPLICATION_PHYSICAL:
command += "PHYSICAL " command += "PHYSICAL "
else: else:
raise RuntimeError("unrecognized replication slot type") raise psycopg2.ProgrammingError("unrecognized replication slot type")
if start_lsn is None: if start_lsn is None:
start_lsn = '0/0' start_lsn = '0/0'
@ -555,16 +553,16 @@ class ReplicationCursor(_cursor):
if timeline != 0: if timeline != 0:
if slot_type == REPLICATION_LOGICAL: if slot_type == REPLICATION_LOGICAL:
raise RuntimeError("cannot specify timeline for logical replication") raise psycopg2.ProgrammingError("cannot specify timeline for logical replication")
if timeline < 0: if timeline < 0:
raise RuntimeError("timeline must be >= 0: %d" % timeline) raise psycopg2.ProgrammingError("timeline must be >= 0: %d" % timeline)
command += " TIMELINE %d" % timeline command += " TIMELINE %d" % timeline
if options: if options:
if slot_type == REPLICATION_PHYSICAL: if slot_type == REPLICATION_PHYSICAL:
raise RuntimeError("cannot specify plugin options for physical replication") raise psycopg2.ProgrammingError("cannot specify plugin options for physical replication")
command += " (" command += " ("
for k,v in options.iteritems(): for k,v in options.iteritems():
@ -573,11 +571,15 @@ class ReplicationCursor(_cursor):
command += "%s %s" % (self.quote_ident(k), _A(str(v))) command += "%s %s" % (self.quote_ident(k), _A(str(v)))
command += ")" command += ")"
return self.start_replication_expert(o, command, keepalive_interval) return self.start_replication_expert(command, writer=writer,
keepalive_interval=keepalive_interval)
# thin wrapper def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False):
def sync_server(self, msg): return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested)
return self.replication_sync_server(msg)
# allows replication cursors to be used in select.select() directly
def fileno(self):
return self.connection.fileno()
# a dbtype and adapter for Python UUID type # a dbtype and adapter for Python UUID type

View File

@ -73,10 +73,13 @@ struct cursorObject {
#define DEFAULT_COPYSIZE 16384 #define DEFAULT_COPYSIZE 16384
#define DEFAULT_COPYBUFF 8192 #define DEFAULT_COPYBUFF 8192
int in_replication; /* we're in streaming replication loop */ int repl_stop; /* if client requested to stop replication */
int stop_replication; /* client requested to stop replication */ struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */
int keepalive_interval; /* interval for keepalive messages in replication mode */ XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */
XLogRecPtr repl_sync_lsn; /* set when the client asks us to sync the server */ XLogRecPtr repl_flush_lsn;
XLogRecPtr repl_apply_lsn;
int repl_feedback_pending; /* flag set when we couldn't send the feedback to the server */
struct timeval repl_last_io; /* timestamp of the last exchange with the server */
PyObject *tuple_factory; /* factory for result tuples */ PyObject *tuple_factory; /* factory for result tuples */
PyObject *tzinfo_factory; /* factory for tzinfo objects */ PyObject *tzinfo_factory; /* factory for tzinfo objects */

View File

@ -36,9 +36,11 @@
#include "psycopg/microprotocols_proto.h" #include "psycopg/microprotocols_proto.h"
#include <string.h> #include <string.h>
#include <stdlib.h> #include <stdlib.h>
/* python */
#include "datetime.h"
/** DBAPI methods **/ /** DBAPI methods **/
@ -1581,78 +1583,182 @@ exit:
} }
#define psyco_curs_start_replication_expert_doc \ #define psyco_curs_start_replication_expert_doc \
"start_replication_expert(file, command, keepalive_interval) -- Start and consume replication stream with direct command." "start_replication_expert(command, writer=None, keepalive_interval=10) -- Start and consume replication stream with direct command."
static PyObject * static PyObject *
psyco_curs_start_replication_expert(cursorObject *self, PyObject *args) psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
{ {
PyObject *file, *res = NULL; PyObject *writer = NULL, *res = NULL;
char *command; char *command;
int keepalive_interval; double keepalive_interval = 10;
static char *kwlist[] = {"command", "writer", "keepalive_interval", NULL};
if (!PyArg_ParseTuple(args, "O&si", if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|Od", kwlist,
_psyco_curs_has_write_check, &file, &command, &writer, &keepalive_interval)) {
&command, &keepalive_interval)) {
return NULL; return NULL;
} }
EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, start_replication_expert);
EXC_IF_GREEN(start_replication_expert); EXC_IF_GREEN(start_replication_expert);
EXC_IF_TPC_PREPARED(self->conn, start_replication_expert); EXC_IF_TPC_PREPARED(self->conn, start_replication_expert);
Dprintf("psyco_curs_start_replication_expert: command = %s", command); Dprintf("psyco_curs_start_replication_expert: command = %s", command);
self->copysize = 0; if (keepalive_interval < 1.0) {
Py_INCREF(file); psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1sec");
self->copyfile = file; return NULL;
self->in_replication = 1; }
self->keepalive_interval = keepalive_interval;
self->stop_replication = 0;
self->repl_sync_lsn = InvalidXLogRecPtr;
if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) { self->copysize = 0;
Py_XINCREF(writer);
self->copyfile = writer;
self->repl_stop = 0;
self->repl_keepalive_interval.tv_sec = (int)keepalive_interval;
self->repl_keepalive_interval.tv_usec =
(keepalive_interval - (int)keepalive_interval)*1.0e6;
self->repl_write_lsn = InvalidXLogRecPtr;
self->repl_flush_lsn = InvalidXLogRecPtr;
self->repl_apply_lsn = InvalidXLogRecPtr;
self->repl_feedback_pending = 0;
gettimeofday(&self->repl_last_io, NULL);
if (pq_execute(self, command, self->conn->async,
1 /* no_result */, 1 /* no_begin */) >= 0) {
res = Py_None; res = Py_None;
Py_INCREF(Py_None); Py_INCREF(res);
} }
Py_CLEAR(self->copyfile); Py_CLEAR(self->copyfile);
self->in_replication = 0;
return res; return res;
} }
#define psyco_curs_stop_replication_doc \ #define psyco_curs_stop_replication_doc \
"start_replication() -- Set flag to break out of endless loop in start_replication()." "stop_replication() -- Set flag to break out of endless loop in start_replication() on sync connection."
static PyObject * static PyObject *
psyco_curs_stop_replication(cursorObject *self) psyco_curs_stop_replication(cursorObject *self)
{ {
EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_CLOSED(self);
if (!self->in_replication) { self->repl_stop = 1;
PyErr_SetString(ProgrammingError,
"stop_replication() called when not in streaming replication loop");
} else {
self->stop_replication = 1;
}
Py_RETURN_NONE; Py_RETURN_NONE;
} }
#define psyco_curs_replication_sync_server_doc \ #define psyco_curs_read_replication_message_doc \
"replication_sync_server(msg) -- Set flag to sync the server up to this replication message." "read_replication_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
static PyObject * static PyObject *
psyco_curs_replication_sync_server(cursorObject *self, PyObject *args) psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject *kwargs)
{ {
EXC_IF_CURS_CLOSED(self); int decode = 1;
static char *kwlist[] = {"decode", NULL};
if (!PyArg_ParseTuple(args, "K", &self->repl_sync_lsn)) { EXC_IF_CURS_CLOSED(self);
EXC_IF_GREEN(read_replication_message);
EXC_IF_TPC_PREPARED(self->conn, read_replication_message);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&decode)) {
return NULL; return NULL;
} }
Py_RETURN_NONE; return pq_read_replication_message(self, decode);
}
static PyObject *
curs_flush_replication_feedback(cursorObject *self, int reply)
{
if (!self->repl_feedback_pending)
Py_RETURN_FALSE;
if (pq_send_replication_feedback(self, reply)) {
self->repl_feedback_pending = 0;
Py_RETURN_TRUE;
} else {
self->repl_feedback_pending = 1;
Py_RETURN_FALSE;
}
}
#define psyco_curs_send_replication_feedback_doc \
"send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply."
static PyObject *
psyco_curs_send_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs)
{
XLogRecPtr write_lsn = InvalidXLogRecPtr,
flush_lsn = InvalidXLogRecPtr,
apply_lsn = InvalidXLogRecPtr;
int reply = 0;
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
EXC_IF_CURS_CLOSED(self);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
&write_lsn, &flush_lsn, &apply_lsn, &reply)) {
return NULL;
}
if (write_lsn > self->repl_write_lsn)
self->repl_write_lsn = write_lsn;
if (flush_lsn > self->repl_flush_lsn)
self->repl_flush_lsn = flush_lsn;
if (apply_lsn > self->repl_apply_lsn)
self->repl_apply_lsn = apply_lsn;
self->repl_feedback_pending = 1;
return curs_flush_replication_feedback(self, reply);
}
#define psyco_curs_flush_replication_feedback_doc \
"flush_replication_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply."
static PyObject *
psyco_curs_flush_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs)
{
int reply = 0;
static char *kwlist[] = {"reply", NULL};
EXC_IF_CURS_CLOSED(self);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&reply)) {
return NULL;
}
return curs_flush_replication_feedback(self, reply);
}
#define psyco_curs_replication_io_timestamp_doc \
"replication_io_timestamp -- the timestamp of latest IO with the server"
static PyObject *
psyco_curs_get_replication_io_timestamp(cursorObject *self)
{
PyObject *tval, *res = NULL;
double seconds;
EXC_IF_CURS_CLOSED(self);
// TODO: move to a one-call init function
PyDateTime_IMPORT;
seconds = self->repl_last_io.tv_sec + self->repl_last_io.tv_usec / 1.0e6;
tval = Py_BuildValue("(d)", seconds);
if (tval) {
res = PyDateTime_FromTimestamp(tval);
Py_DECREF(tval);
}
return res;
} }
/* extension: closed - return true if cursor is closed */ /* extension: closed - return true if cursor is closed */
@ -1830,11 +1936,15 @@ static struct PyMethodDef cursorObject_methods[] = {
{"copy_expert", (PyCFunction)psyco_curs_copy_expert, {"copy_expert", (PyCFunction)psyco_curs_copy_expert,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc},
{"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert, {"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert,
METH_VARARGS, psyco_curs_start_replication_expert_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc},
{"stop_replication", (PyCFunction)psyco_curs_stop_replication, {"stop_replication", (PyCFunction)psyco_curs_stop_replication,
METH_NOARGS, psyco_curs_stop_replication_doc}, METH_NOARGS, psyco_curs_stop_replication_doc},
{"replication_sync_server", (PyCFunction)psyco_curs_replication_sync_server, {"read_replication_message", (PyCFunction)psyco_curs_read_replication_message,
METH_VARARGS, psyco_curs_replication_sync_server_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_read_replication_message_doc},
{"send_replication_feedback", (PyCFunction)psyco_curs_send_replication_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_curs_send_replication_feedback_doc},
{"flush_replication_feedback", (PyCFunction)psyco_curs_flush_replication_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_curs_flush_replication_feedback_doc},
{NULL} {NULL}
}; };
@ -1885,6 +1995,9 @@ static struct PyGetSetDef cursorObject_getsets[] = {
(getter)psyco_curs_scrollable_get, (getter)psyco_curs_scrollable_get,
(setter)psyco_curs_scrollable_set, (setter)psyco_curs_scrollable_set,
psyco_curs_scrollable_doc, NULL }, psyco_curs_scrollable_doc, NULL },
{ "replication_io_timestamp",
(getter)psyco_curs_get_replication_io_timestamp, NULL,
psyco_curs_replication_io_timestamp_doc, NULL },
{NULL} {NULL}
}; };

View File

@ -1062,6 +1062,9 @@ pq_get_last_result(connectionObject *conn)
PQclear(result); PQclear(result);
} }
result = res; result = res;
if (PQresultStatus(result) == PGRES_COPY_BOTH) {
break;
}
} }
return result; return result;
@ -1522,32 +1525,151 @@ exit:
return ret; return ret;
} }
static int /* ignores keepalive messages */
sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn, PyObject *
int replyRequested) pq_read_replication_message(cursorObject *curs, int decode)
{
char *buffer = NULL;
int len, hdr, reply;
XLogRecPtr data_start, wal_end;
pg_int64 send_time;
PyObject *str = NULL, *msg = NULL;
Dprintf("pq_read_replication_message(decode=%d)", decode);
retry:
if (!PQconsumeInput(curs->conn->pgconn)) {
goto none;
}
Py_BEGIN_ALLOW_THREADS;
len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */);
Py_END_ALLOW_THREADS;
if (len == 0) {
goto none;
}
if (len == -2) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
if (len == -1) {
curs->pgres = PQgetResult(curs->conn->pgconn);
if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
CLEARPGRES(curs->pgres);
goto none;
}
/* ok, we did really read something: update the io timestamp */
gettimeofday(&curs->repl_last_io, NULL);
Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len);
if (buffer[0] == 'w') {
/* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */
hdr = 1 + 8 + 8 + 8;
if (len < hdr + 1) {
psyco_set_error(OperationalError, curs, "data message header too small");
goto exit;
}
data_start = fe_recvint64(buffer + 1);
wal_end = fe_recvint64(buffer + 1 + 8);
send_time = fe_recvint64(buffer + 1 + 8 + 8);
Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR,
XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end));
Dprintf("pq_read_replication_message: >>%.*s<<", len - hdr, buffer + hdr);
if (decode) {
str = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
} else {
str = Bytes_FromStringAndSize(buffer + hdr, len - hdr);
}
if (!str) { goto exit; }
msg = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
curs, str, NULL);
Py_DECREF(str);
if (!msg) { goto exit; }
((replicationMessageObject *)msg)->data_start = data_start;
((replicationMessageObject *)msg)->wal_end = wal_end;
((replicationMessageObject *)msg)->send_time = send_time;
}
else if (buffer[0] == 'k') {
/* msgtype(1), walEnd(8), sendTime(8), reply(1) */
hdr = 1 + 8 + 8;
if (len < hdr + 1) {
psyco_set_error(OperationalError, curs, "keepalive message header too small");
goto exit;
}
reply = buffer[hdr];
if (reply) {
if (!pq_send_replication_feedback(curs, 0)) {
if (curs->conn->async) {
curs->repl_feedback_pending = 1;
} else {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
}
else {
gettimeofday(&curs->repl_last_io, NULL);
}
}
PQfreemem(buffer);
buffer = NULL;
goto retry;
}
else {
psyco_set_error(OperationalError, curs, "unrecognized replication message type");
goto exit;
}
exit:
if (buffer) {
PQfreemem(buffer);
}
return msg;
none:
msg = Py_None;
Py_INCREF(msg);
goto exit;
}
int
pq_send_replication_feedback(cursorObject* curs, int reply_requested)
{ {
char replybuf[1 + 8 + 8 + 8 + 8 + 1]; char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0; int len = 0;
Dprintf("_pq_copy_both_v3: confirming write up to "XLOGFMTSTR", flush to "XLOGFMTSTR, Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR,
XLOGFMTARGS(written_lsn), XLOGFMTARGS(fsync_lsn)); XLOGFMTARGS(curs->repl_write_lsn),
XLOGFMTARGS(curs->repl_flush_lsn),
XLOGFMTARGS(curs->repl_apply_lsn));
replybuf[len] = 'r'; replybuf[len] = 'r'; len += 1;
len += 1; fe_sendint64(curs->repl_write_lsn, &replybuf[len]); len += 8;
fe_sendint64(written_lsn, &replybuf[len]); /* write */ fe_sendint64(curs->repl_flush_lsn, &replybuf[len]); len += 8;
len += 8; fe_sendint64(curs->repl_apply_lsn, &replybuf[len]); len += 8;
fe_sendint64(fsync_lsn, &replybuf[len]); /* flush */ fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8;
len += 8; replybuf[len] = reply_requested ? 1 : 0; len += 1;
fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); /* sendTime */
len += 8;
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) { if (PQputCopyData(curs->conn->pgconn, replybuf, len) <= 0 ||
PQflush(curs->conn->pgconn) != 0) {
return 0; return 0;
} }
gettimeofday(&curs->repl_last_io, NULL);
return 1; return 1;
} }
@ -1556,33 +1678,19 @@ sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn,
static int static int
_pq_copy_both_v3(cursorObject *curs) _pq_copy_both_v3(cursorObject *curs)
{ {
PyObject *tmp = NULL; PyObject *msg, *tmp = NULL;
PyObject *write_func = NULL; PyObject *write_func = NULL;
PyObject *obj = NULL;
replicationMessageObject *msg = NULL;
int ret = -1; int ret = -1;
int is_text; int is_text;
PGconn *conn; PGconn *pgconn;
char *buffer = NULL;
fd_set fds; fd_set fds;
struct timeval last_comm, curr_time, ping_time, time_diff; struct timeval curr_time, ping_time, time_diff;
int len, hdr, reply, sel; int sel;
XLogRecPtr written_lsn = InvalidXLogRecPtr,
fsync_lsn = InvalidXLogRecPtr,
data_start, wal_end;
pg_int64 send_time;
if (!curs->copyfile) { if (!curs->copyfile) {
PyErr_SetString(ProgrammingError, psyco_set_error(ProgrammingError, curs,
"can't execute START_REPLICATION: use the start_replication() method instead"); "can't execute START_REPLICATION directly: use the start_replication() method instead");
goto exit;
}
if (curs->keepalive_interval <= 0) {
PyErr_Format(PyExc_RuntimeError, "keepalive_interval must be > 0: %d",
curs->keepalive_interval);
goto exit; goto exit;
} }
@ -1597,31 +1705,29 @@ _pq_copy_both_v3(cursorObject *curs)
} }
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
pgconn = curs->conn->pgconn;
/* timestamp of last communication with the server */
gettimeofday(&last_comm, NULL);
conn = curs->conn->pgconn;
while (1) { while (1) {
len = PQgetCopyData(conn, &buffer, 1 /* async! */); msg = pq_read_replication_message(curs, is_text);
if (len < 0) { if (!msg) {
break; goto exit;
} }
if (len == 0) { else if (msg == Py_None) {
FD_ZERO(&fds); Py_DECREF(msg);
FD_SET(PQsocket(conn), &fds);
FD_ZERO(&fds);
FD_SET(PQsocket(pgconn), &fds);
/* set up timeout according to keepalive_interval, but no less than 1 second */
gettimeofday(&curr_time, NULL); gettimeofday(&curr_time, NULL);
ping_time = last_comm; ping_time = curs->repl_last_io;
ping_time.tv_sec += curs->keepalive_interval; ping_time.tv_sec += curs->repl_keepalive_interval.tv_sec;
ping_time.tv_usec += curs->repl_keepalive_interval.tv_usec;
timersub(&ping_time, &curr_time, &time_diff); timersub(&ping_time, &curr_time, &time_diff);
if (time_diff.tv_sec > 0) { if (time_diff.tv_sec > 0) {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
sel = select(PQsocket(conn) + 1, &fds, NULL, NULL, &time_diff); sel = select(PQsocket(pgconn) + 1, &fds, NULL, NULL, &time_diff);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
} }
else { else {
@ -1639,61 +1745,17 @@ _pq_copy_both_v3(cursorObject *curs)
continue; continue;
} }
if (sel > 0) { if (sel == 0) {
if (!PQconsumeInput(conn)) { if (!pq_send_replication_feedback(curs, 0)) {
Dprintf("_pq_copy_both_v3: PQconsumeInput failed");
pq_raise(curs->conn, curs, NULL); pq_raise(curs->conn, curs, NULL);
goto exit; goto exit;
} }
} }
else { /* timeout */
if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
}
gettimeofday(&last_comm, NULL);
continue; continue;
} }
if (len > 0 && buffer) {
gettimeofday(&last_comm, NULL);
Dprintf("_pq_copy_both_v3: msg=%c, len=%d", buffer[0], len);
if (buffer[0] == 'w') {
/* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */
hdr = 1 + 8 + 8 + 8;
if (len < hdr + 1) {
PyErr_Format(PyExc_RuntimeError,
"streaming header too small in data message: %d", len);
goto exit;
}
data_start = fe_recvint64(buffer + 1);
wal_end = fe_recvint64(buffer + 1 + 8);
send_time = fe_recvint64(buffer + 1 + 8 + 8);
Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR", send_time=%lld",
XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end), send_time);
if (is_text) {
obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
}
else { else {
obj = Bytes_FromStringAndSize(buffer + hdr, len - hdr);
}
if (!obj) { goto exit; }
msg = (replicationMessageObject *)
PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
curs, obj, NULL);
Py_DECREF(obj);
if (!msg) { goto exit; }
msg->data_start = data_start;
msg->wal_end = wal_end;
msg->send_time = send_time;
tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL);
Py_DECREF(msg);
if (tmp == NULL) { if (tmp == NULL) {
Dprintf("_pq_copy_both_v3: write_func returned NULL"); Dprintf("_pq_copy_both_v3: write_func returned NULL");
@ -1701,86 +1763,16 @@ _pq_copy_both_v3(cursorObject *curs)
} }
Py_DECREF(tmp); Py_DECREF(tmp);
/* update the LSN position we've written up to */ if (curs->repl_stop) {
if (written_lsn < wal_end) Dprintf("_pq_copy_both_v3: repl_stop flag set by write_func");
written_lsn = wal_end;
/* if requested by sync_server(msg), we confirm LSN with the server */
if (curs->repl_sync_lsn != InvalidXLogRecPtr) {
Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR,
XLOGFMTARGS(curs->repl_sync_lsn));
if (fsync_lsn < curs->repl_sync_lsn)
fsync_lsn = curs->repl_sync_lsn;
curs->repl_sync_lsn = InvalidXLogRecPtr;
if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
gettimeofday(&last_comm, NULL);
}
if (curs->stop_replication) {
Dprintf("_pq_copy_both_v3: stop_replication flag set by write_func");
break; break;
} }
Py_DECREF(msg);
msg = NULL;
} }
else if (buffer[0] == 'k') {
/* msgtype(1), walEnd(8), sendTime(8), reply(1) */
hdr = 1 + 8 + 8;
if (len < hdr + 1) {
PyErr_Format(PyExc_RuntimeError,
"streaming header too small in keepalive message: %d", len);
goto exit;
}
reply = buffer[hdr];
if (reply) {
if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
gettimeofday(&last_comm, NULL);
}
}
else {
PyErr_Format(PyExc_RuntimeError,
"unrecognized streaming message type: \"%c\"", buffer[0]);
goto exit;
}
/* buffer is allocated on every PQgetCopyData() call */
PQfreemem(buffer);
buffer = NULL;
}
}
if (len == -2) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
if (len == -1) {
curs->pgres = PQgetResult(curs->conn->pgconn);
if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL);
CLEARPGRES(curs->pgres);
} }
ret = 1; ret = 1;
exit: exit:
if (buffer) {
PQfreemem(buffer);
}
Py_XDECREF(msg);
Py_XDECREF(write_func); Py_XDECREF(write_func);
return ret; return ret;
} }
@ -1847,9 +1839,13 @@ pq_fetch(cursorObject *curs, int no_result)
case PGRES_COPY_BOTH: case PGRES_COPY_BOTH:
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
curs->rowcount = -1; curs->rowcount = -1;
if (curs->conn->async) {
ex = 0;
} else {
ex = _pq_copy_both_v3(curs); ex = _pq_copy_both_v3(curs);
/* error caught by out glorious notice handler */ /* error caught by out glorious notice handler */
if (PyErr_Occurred()) ex = -1; if (PyErr_Occurred()) ex = -1;
}
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
break; break;

View File

@ -72,4 +72,7 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn,
RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
char **error); char **error);
HIDDEN PyObject *pq_read_replication_message(cursorObject *curs, int decode);
HIDDEN int pq_send_replication_feedback(cursorObject *curs, int reply_requested);
#endif /* !defined(PSYCOPG_PQPATH_H) */ #endif /* !defined(PSYCOPG_PQPATH_H) */

View File

@ -92,6 +92,7 @@
<None Include="psycopg\pqpath.h" /> <None Include="psycopg\pqpath.h" />
<None Include="psycopg\psycopg.h" /> <None Include="psycopg\psycopg.h" />
<None Include="psycopg\python.h" /> <None Include="psycopg\python.h" />
<None Include="psycopg\replication_message.h" />
<None Include="psycopg\typecast.h" /> <None Include="psycopg\typecast.h" />
<None Include="psycopg\typecast_binary.h" /> <None Include="psycopg\typecast_binary.h" />
<None Include="psycopg\win32_support.h" /> <None Include="psycopg\win32_support.h" />
@ -224,6 +225,7 @@
<Compile Include="psycopg\microprotocols_proto.c" /> <Compile Include="psycopg\microprotocols_proto.c" />
<Compile Include="psycopg\pqpath.c" /> <Compile Include="psycopg\pqpath.c" />
<Compile Include="psycopg\psycopgmodule.c" /> <Compile Include="psycopg\psycopgmodule.c" />
<Compile Include="psycopg\replication_message_type.c" />
<Compile Include="psycopg\typecast.c" /> <Compile Include="psycopg\typecast.c" />
<Compile Include="psycopg\typecast_array.c" /> <Compile Include="psycopg\typecast_array.c" />
<Compile Include="psycopg\typecast_basic.c" /> <Compile Include="psycopg\typecast_basic.c" />