From 61e52ce8793472ff1348ab93ccdeb682a1e7b3df Mon Sep 17 00:00:00 2001 From: Oleksandr Shulgin Date: Wed, 10 Jun 2015 09:06:08 +0200 Subject: [PATCH] Rework replication protocol This change exposes lower level functions for operating the (logical) replication protocol, while keeping the high-level start_replication function that does all the job for you in case of a synchronous connection. A number of other changes and fixes are put into this commit. --- lib/extras.py | 36 +++-- psycopg/cursor.h | 11 +- psycopg/cursor_type.c | 183 +++++++++++++++++---- psycopg/pqpath.c | 360 +++++++++++++++++++++--------------------- psycopg/pqpath.h | 3 + psycopg2.cproj | 2 + 6 files changed, 357 insertions(+), 238 deletions(-) diff --git a/lib/extras.py b/lib/extras.py index 2f32bf12..85debc68 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -471,7 +471,8 @@ class ReplicationConnection(_connection): super(ReplicationConnection, self).__init__(*args, **kwargs) # prevent auto-issued BEGIN statements - self.autocommit = True + if not self.async: + self.autocommit = True def cursor(self, *args, **kwargs): kwargs.setdefault('cursor_factory', ReplicationCursor) @@ -503,18 +504,18 @@ class ReplicationCursor(_cursor): if slot_type == REPLICATION_LOGICAL: if output_plugin is None: - raise RuntimeError("output_plugin is required for logical replication slot") + raise psycopg2.ProgrammingError("output_plugin is required for logical replication slot") command += "LOGICAL %s" % self.quote_ident(output_plugin) elif slot_type == REPLICATION_PHYSICAL: if output_plugin is not None: - raise RuntimeError("output_plugin is not applicable to physical replication") + raise psycopg2.ProgrammingError("output_plugin is not applicable to physical replication") command += "PHYSICAL" else: - raise RuntimeError("unrecognized replication slot type") + raise psycopg2.ProgrammingError("unrecognized replication slot type") self.execute(command) @@ -524,17 +525,14 @@ class ReplicationCursor(_cursor): command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name) self.execute(command) - def start_replication(self, o, slot_type, slot_name=None, start_lsn=None, + def start_replication(self, slot_type, slot_name=None, writer=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None): """Start and consume replication stream.""" - if keepalive_interval <= 0: - raise RuntimeError("keepalive_interval must be > 0: %d" % keepalive_interval) - command = "START_REPLICATION " if slot_type == REPLICATION_LOGICAL and slot_name is None: - raise RuntimeError("slot_name is required for logical replication slot") + raise psycopg2.ProgrammingError("slot_name is required for logical replication slot") if slot_name: command += "SLOT %s " % self.quote_ident(slot_name) @@ -544,7 +542,7 @@ class ReplicationCursor(_cursor): elif slot_type == REPLICATION_PHYSICAL: command += "PHYSICAL " else: - raise RuntimeError("unrecognized replication slot type") + raise psycopg2.ProgrammingError("unrecognized replication slot type") if start_lsn is None: start_lsn = '0/0' @@ -555,16 +553,16 @@ class ReplicationCursor(_cursor): if timeline != 0: if slot_type == REPLICATION_LOGICAL: - raise RuntimeError("cannot specify timeline for logical replication") + raise psycopg2.ProgrammingError("cannot specify timeline for logical replication") if timeline < 0: - raise RuntimeError("timeline must be >= 0: %d" % timeline) + raise psycopg2.ProgrammingError("timeline must be >= 0: %d" % timeline) command += " TIMELINE %d" % timeline if options: if slot_type == REPLICATION_PHYSICAL: - raise RuntimeError("cannot specify plugin options for physical replication") + raise psycopg2.ProgrammingError("cannot specify plugin options for physical replication") command += " (" for k,v in options.iteritems(): @@ -573,11 +571,15 @@ class ReplicationCursor(_cursor): command += "%s %s" % (self.quote_ident(k), _A(str(v))) command += ")" - return self.start_replication_expert(o, command, keepalive_interval) + return self.start_replication_expert(command, writer=writer, + keepalive_interval=keepalive_interval) - # thin wrapper - def sync_server(self, msg): - return self.replication_sync_server(msg) + def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False): + return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested) + + # allows replication cursors to be used in select.select() directly + def fileno(self): + return self.connection.fileno() # a dbtype and adapter for Python UUID type diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 1a630553..380abbf4 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -73,10 +73,13 @@ struct cursorObject { #define DEFAULT_COPYSIZE 16384 #define DEFAULT_COPYBUFF 8192 - int in_replication; /* we're in streaming replication loop */ - int stop_replication; /* client requested to stop replication */ - int keepalive_interval; /* interval for keepalive messages in replication mode */ - XLogRecPtr repl_sync_lsn; /* set when the client asks us to sync the server */ + int repl_stop; /* if client requested to stop replication */ + struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */ + XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */ + XLogRecPtr repl_flush_lsn; + XLogRecPtr repl_apply_lsn; + int repl_feedback_pending; /* flag set when we couldn't send the feedback to the server */ + struct timeval repl_last_io; /* timestamp of the last exchange with the server */ PyObject *tuple_factory; /* factory for result tuples */ PyObject *tzinfo_factory; /* factory for tzinfo objects */ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 19f82c60..9de5b085 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -36,9 +36,11 @@ #include "psycopg/microprotocols_proto.h" #include - #include +/* python */ +#include "datetime.h" + /** DBAPI methods **/ @@ -1581,78 +1583,182 @@ exit: } #define psyco_curs_start_replication_expert_doc \ -"start_replication_expert(file, command, keepalive_interval) -- Start and consume replication stream with direct command." +"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start and consume replication stream with direct command." static PyObject * -psyco_curs_start_replication_expert(cursorObject *self, PyObject *args) +psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject *kwargs) { - PyObject *file, *res = NULL; + PyObject *writer = NULL, *res = NULL; char *command; - int keepalive_interval; + double keepalive_interval = 10; + static char *kwlist[] = {"command", "writer", "keepalive_interval", NULL}; - if (!PyArg_ParseTuple(args, "O&si", - _psyco_curs_has_write_check, &file, - &command, &keepalive_interval)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|Od", kwlist, + &command, &writer, &keepalive_interval)) { return NULL; } EXC_IF_CURS_CLOSED(self); - EXC_IF_CURS_ASYNC(self, start_replication_expert); EXC_IF_GREEN(start_replication_expert); EXC_IF_TPC_PREPARED(self->conn, start_replication_expert); Dprintf("psyco_curs_start_replication_expert: command = %s", command); - self->copysize = 0; - Py_INCREF(file); - self->copyfile = file; - self->in_replication = 1; - self->keepalive_interval = keepalive_interval; - self->stop_replication = 0; - self->repl_sync_lsn = InvalidXLogRecPtr; + if (keepalive_interval < 1.0) { + psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1sec"); + return NULL; + } - if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) { + self->copysize = 0; + Py_XINCREF(writer); + self->copyfile = writer; + + self->repl_stop = 0; + self->repl_keepalive_interval.tv_sec = (int)keepalive_interval; + self->repl_keepalive_interval.tv_usec = + (keepalive_interval - (int)keepalive_interval)*1.0e6; + + self->repl_write_lsn = InvalidXLogRecPtr; + self->repl_flush_lsn = InvalidXLogRecPtr; + self->repl_apply_lsn = InvalidXLogRecPtr; + self->repl_feedback_pending = 0; + + gettimeofday(&self->repl_last_io, NULL); + + if (pq_execute(self, command, self->conn->async, + 1 /* no_result */, 1 /* no_begin */) >= 0) { res = Py_None; - Py_INCREF(Py_None); + Py_INCREF(res); } Py_CLEAR(self->copyfile); - self->in_replication = 0; return res; } #define psyco_curs_stop_replication_doc \ -"start_replication() -- Set flag to break out of endless loop in start_replication()." +"stop_replication() -- Set flag to break out of endless loop in start_replication() on sync connection." static PyObject * psyco_curs_stop_replication(cursorObject *self) { EXC_IF_CURS_CLOSED(self); - if (!self->in_replication) { - PyErr_SetString(ProgrammingError, - "stop_replication() called when not in streaming replication loop"); - } else { - self->stop_replication = 1; - } + self->repl_stop = 1; Py_RETURN_NONE; } -#define psyco_curs_replication_sync_server_doc \ -"replication_sync_server(msg) -- Set flag to sync the server up to this replication message." +#define psyco_curs_read_replication_message_doc \ +"read_replication_message(decode=True) -- Try reading a replication message from the server (non-blocking)." static PyObject * -psyco_curs_replication_sync_server(cursorObject *self, PyObject *args) +psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject *kwargs) { - EXC_IF_CURS_CLOSED(self); + int decode = 1; + static char *kwlist[] = {"decode", NULL}; - if (!PyArg_ParseTuple(args, "K", &self->repl_sync_lsn)) { + EXC_IF_CURS_CLOSED(self); + EXC_IF_GREEN(read_replication_message); + EXC_IF_TPC_PREPARED(self->conn, read_replication_message); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, + &decode)) { return NULL; } - Py_RETURN_NONE; + return pq_read_replication_message(self, decode); +} + +static PyObject * +curs_flush_replication_feedback(cursorObject *self, int reply) +{ + if (!self->repl_feedback_pending) + Py_RETURN_FALSE; + + if (pq_send_replication_feedback(self, reply)) { + self->repl_feedback_pending = 0; + Py_RETURN_TRUE; + } else { + self->repl_feedback_pending = 1; + Py_RETURN_FALSE; + } +} + +#define psyco_curs_send_replication_feedback_doc \ +"send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply." + +static PyObject * +psyco_curs_send_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs) +{ + XLogRecPtr write_lsn = InvalidXLogRecPtr, + flush_lsn = InvalidXLogRecPtr, + apply_lsn = InvalidXLogRecPtr; + int reply = 0; + static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL}; + + EXC_IF_CURS_CLOSED(self); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist, + &write_lsn, &flush_lsn, &apply_lsn, &reply)) { + return NULL; + } + + if (write_lsn > self->repl_write_lsn) + self->repl_write_lsn = write_lsn; + + if (flush_lsn > self->repl_flush_lsn) + self->repl_flush_lsn = flush_lsn; + + if (apply_lsn > self->repl_apply_lsn) + self->repl_apply_lsn = apply_lsn; + + self->repl_feedback_pending = 1; + + return curs_flush_replication_feedback(self, reply); +} + +#define psyco_curs_flush_replication_feedback_doc \ +"flush_replication_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply." + +static PyObject * +psyco_curs_flush_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs) +{ + int reply = 0; + static char *kwlist[] = {"reply", NULL}; + + EXC_IF_CURS_CLOSED(self); + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist, + &reply)) { + return NULL; + } + + return curs_flush_replication_feedback(self, reply); +} + +#define psyco_curs_replication_io_timestamp_doc \ +"replication_io_timestamp -- the timestamp of latest IO with the server" + +static PyObject * +psyco_curs_get_replication_io_timestamp(cursorObject *self) +{ + PyObject *tval, *res = NULL; + double seconds; + + EXC_IF_CURS_CLOSED(self); + + // TODO: move to a one-call init function + PyDateTime_IMPORT; + + seconds = self->repl_last_io.tv_sec + self->repl_last_io.tv_usec / 1.0e6; + + tval = Py_BuildValue("(d)", seconds); + if (tval) { + res = PyDateTime_FromTimestamp(tval); + Py_DECREF(tval); + } + return res; } /* extension: closed - return true if cursor is closed */ @@ -1830,11 +1936,15 @@ static struct PyMethodDef cursorObject_methods[] = { {"copy_expert", (PyCFunction)psyco_curs_copy_expert, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc}, {"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert, - METH_VARARGS, psyco_curs_start_replication_expert_doc}, + METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc}, {"stop_replication", (PyCFunction)psyco_curs_stop_replication, METH_NOARGS, psyco_curs_stop_replication_doc}, - {"replication_sync_server", (PyCFunction)psyco_curs_replication_sync_server, - METH_VARARGS, psyco_curs_replication_sync_server_doc}, + {"read_replication_message", (PyCFunction)psyco_curs_read_replication_message, + METH_VARARGS|METH_KEYWORDS, psyco_curs_read_replication_message_doc}, + {"send_replication_feedback", (PyCFunction)psyco_curs_send_replication_feedback, + METH_VARARGS|METH_KEYWORDS, psyco_curs_send_replication_feedback_doc}, + {"flush_replication_feedback", (PyCFunction)psyco_curs_flush_replication_feedback, + METH_VARARGS|METH_KEYWORDS, psyco_curs_flush_replication_feedback_doc}, {NULL} }; @@ -1885,6 +1995,9 @@ static struct PyGetSetDef cursorObject_getsets[] = { (getter)psyco_curs_scrollable_get, (setter)psyco_curs_scrollable_set, psyco_curs_scrollable_doc, NULL }, + { "replication_io_timestamp", + (getter)psyco_curs_get_replication_io_timestamp, NULL, + psyco_curs_replication_io_timestamp_doc, NULL }, {NULL} }; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 7ce06a86..03d928cf 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1062,6 +1062,9 @@ pq_get_last_result(connectionObject *conn) PQclear(result); } result = res; + if (PQresultStatus(result) == PGRES_COPY_BOTH) { + break; + } } return result; @@ -1522,32 +1525,151 @@ exit: return ret; } -static int -sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn, - int replyRequested) +/* ignores keepalive messages */ +PyObject * +pq_read_replication_message(cursorObject *curs, int decode) +{ + char *buffer = NULL; + int len, hdr, reply; + XLogRecPtr data_start, wal_end; + pg_int64 send_time; + PyObject *str = NULL, *msg = NULL; + + Dprintf("pq_read_replication_message(decode=%d)", decode); + +retry: + if (!PQconsumeInput(curs->conn->pgconn)) { + goto none; + } + + Py_BEGIN_ALLOW_THREADS; + len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */); + Py_END_ALLOW_THREADS; + if (len == 0) { + goto none; + } + + if (len == -2) { + pq_raise(curs->conn, curs, NULL); + goto exit; + } + if (len == -1) { + curs->pgres = PQgetResult(curs->conn->pgconn); + + if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) { + pq_raise(curs->conn, curs, NULL); + goto exit; + } + + CLEARPGRES(curs->pgres); + goto none; + } + + /* ok, we did really read something: update the io timestamp */ + gettimeofday(&curs->repl_last_io, NULL); + + Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len); + if (buffer[0] == 'w') { + /* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */ + hdr = 1 + 8 + 8 + 8; + if (len < hdr + 1) { + psyco_set_error(OperationalError, curs, "data message header too small"); + goto exit; + } + + data_start = fe_recvint64(buffer + 1); + wal_end = fe_recvint64(buffer + 1 + 8); + send_time = fe_recvint64(buffer + 1 + 8 + 8); + + Dprintf("pq_read_replication_message: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR, + XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end)); + + Dprintf("pq_read_replication_message: >>%.*s<<", len - hdr, buffer + hdr); + + if (decode) { + str = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL); + } else { + str = Bytes_FromStringAndSize(buffer + hdr, len - hdr); + } + if (!str) { goto exit; } + + msg = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, + curs, str, NULL); + Py_DECREF(str); + if (!msg) { goto exit; } + + ((replicationMessageObject *)msg)->data_start = data_start; + ((replicationMessageObject *)msg)->wal_end = wal_end; + ((replicationMessageObject *)msg)->send_time = send_time; + } + else if (buffer[0] == 'k') { + /* msgtype(1), walEnd(8), sendTime(8), reply(1) */ + hdr = 1 + 8 + 8; + if (len < hdr + 1) { + psyco_set_error(OperationalError, curs, "keepalive message header too small"); + goto exit; + } + + reply = buffer[hdr]; + if (reply) { + if (!pq_send_replication_feedback(curs, 0)) { + if (curs->conn->async) { + curs->repl_feedback_pending = 1; + } else { + pq_raise(curs->conn, curs, NULL); + goto exit; + } + } + else { + gettimeofday(&curs->repl_last_io, NULL); + } + } + + PQfreemem(buffer); + buffer = NULL; + goto retry; + } + else { + psyco_set_error(OperationalError, curs, "unrecognized replication message type"); + goto exit; + } + +exit: + if (buffer) { + PQfreemem(buffer); + } + + return msg; + +none: + msg = Py_None; + Py_INCREF(msg); + goto exit; +} + +int +pq_send_replication_feedback(cursorObject* curs, int reply_requested) { char replybuf[1 + 8 + 8 + 8 + 8 + 1]; int len = 0; - Dprintf("_pq_copy_both_v3: confirming write up to "XLOGFMTSTR", flush to "XLOGFMTSTR, - XLOGFMTARGS(written_lsn), XLOGFMTARGS(fsync_lsn)); + Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR, + XLOGFMTARGS(curs->repl_write_lsn), + XLOGFMTARGS(curs->repl_flush_lsn), + XLOGFMTARGS(curs->repl_apply_lsn)); - replybuf[len] = 'r'; - len += 1; - fe_sendint64(written_lsn, &replybuf[len]); /* write */ - len += 8; - fe_sendint64(fsync_lsn, &replybuf[len]); /* flush */ - len += 8; - fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */ - len += 8; - fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); /* sendTime */ - len += 8; - replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */ - len += 1; + replybuf[len] = 'r'; len += 1; + fe_sendint64(curs->repl_write_lsn, &replybuf[len]); len += 8; + fe_sendint64(curs->repl_flush_lsn, &replybuf[len]); len += 8; + fe_sendint64(curs->repl_apply_lsn, &replybuf[len]); len += 8; + fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8; + replybuf[len] = reply_requested ? 1 : 0; len += 1; - if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) { + if (PQputCopyData(curs->conn->pgconn, replybuf, len) <= 0 || + PQflush(curs->conn->pgconn) != 0) { return 0; } + gettimeofday(&curs->repl_last_io, NULL); return 1; } @@ -1556,33 +1678,19 @@ sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn, static int _pq_copy_both_v3(cursorObject *curs) { - PyObject *tmp = NULL; + PyObject *msg, *tmp = NULL; PyObject *write_func = NULL; - PyObject *obj = NULL; - replicationMessageObject *msg = NULL; int ret = -1; int is_text; - PGconn *conn; - char *buffer = NULL; + PGconn *pgconn; fd_set fds; - struct timeval last_comm, curr_time, ping_time, time_diff; - int len, hdr, reply, sel; - - XLogRecPtr written_lsn = InvalidXLogRecPtr, - fsync_lsn = InvalidXLogRecPtr, - data_start, wal_end; - pg_int64 send_time; + struct timeval curr_time, ping_time, time_diff; + int sel; if (!curs->copyfile) { - PyErr_SetString(ProgrammingError, - "can't execute START_REPLICATION: use the start_replication() method instead"); - goto exit; - } - - if (curs->keepalive_interval <= 0) { - PyErr_Format(PyExc_RuntimeError, "keepalive_interval must be > 0: %d", - curs->keepalive_interval); + psyco_set_error(ProgrammingError, curs, + "can't execute START_REPLICATION directly: use the start_replication() method instead"); goto exit; } @@ -1597,31 +1705,29 @@ _pq_copy_both_v3(cursorObject *curs) } CLEARPGRES(curs->pgres); - - /* timestamp of last communication with the server */ - gettimeofday(&last_comm, NULL); - - conn = curs->conn->pgconn; + pgconn = curs->conn->pgconn; while (1) { - len = PQgetCopyData(conn, &buffer, 1 /* async! */); - if (len < 0) { - break; + msg = pq_read_replication_message(curs, is_text); + if (!msg) { + goto exit; } - if (len == 0) { - FD_ZERO(&fds); - FD_SET(PQsocket(conn), &fds); + else if (msg == Py_None) { + Py_DECREF(msg); + + FD_ZERO(&fds); + FD_SET(PQsocket(pgconn), &fds); - /* set up timeout according to keepalive_interval, but no less than 1 second */ gettimeofday(&curr_time, NULL); - ping_time = last_comm; - ping_time.tv_sec += curs->keepalive_interval; + ping_time = curs->repl_last_io; + ping_time.tv_sec += curs->repl_keepalive_interval.tv_sec; + ping_time.tv_usec += curs->repl_keepalive_interval.tv_usec; timersub(&ping_time, &curr_time, &time_diff); if (time_diff.tv_sec > 0) { Py_BEGIN_ALLOW_THREADS; - sel = select(PQsocket(conn) + 1, &fds, NULL, NULL, &time_diff); + sel = select(PQsocket(pgconn) + 1, &fds, NULL, NULL, &time_diff); Py_END_ALLOW_THREADS; } else { @@ -1639,148 +1745,34 @@ _pq_copy_both_v3(cursorObject *curs) continue; } - if (sel > 0) { - if (!PQconsumeInput(conn)) { - Dprintf("_pq_copy_both_v3: PQconsumeInput failed"); + if (sel == 0) { + if (!pq_send_replication_feedback(curs, 0)) { pq_raise(curs->conn, curs, NULL); goto exit; } } - else { /* timeout */ - if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { - pq_raise(curs->conn, curs, NULL); - goto exit; - } - } - gettimeofday(&last_comm, NULL); continue; } - if (len > 0 && buffer) { - gettimeofday(&last_comm, NULL); + else { + tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); + Py_DECREF(msg); - Dprintf("_pq_copy_both_v3: msg=%c, len=%d", buffer[0], len); - if (buffer[0] == 'w') { - /* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */ - hdr = 1 + 8 + 8 + 8; - if (len < hdr + 1) { - PyErr_Format(PyExc_RuntimeError, - "streaming header too small in data message: %d", len); - goto exit; - } - - data_start = fe_recvint64(buffer + 1); - wal_end = fe_recvint64(buffer + 1 + 8); - send_time = fe_recvint64(buffer + 1 + 8 + 8); - - Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR", send_time=%lld", - XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end), send_time); - - if (is_text) { - obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL); - } - else { - obj = Bytes_FromStringAndSize(buffer + hdr, len - hdr); - } - if (!obj) { goto exit; } - - msg = (replicationMessageObject *) - PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, - curs, obj, NULL); - Py_DECREF(obj); - if (!msg) { goto exit; } - - msg->data_start = data_start; - msg->wal_end = wal_end; - msg->send_time = send_time; - - tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); - - if (tmp == NULL) { - Dprintf("_pq_copy_both_v3: write_func returned NULL"); - goto exit; - } - Py_DECREF(tmp); - - /* update the LSN position we've written up to */ - if (written_lsn < wal_end) - written_lsn = wal_end; - - /* if requested by sync_server(msg), we confirm LSN with the server */ - if (curs->repl_sync_lsn != InvalidXLogRecPtr) { - Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR, - XLOGFMTARGS(curs->repl_sync_lsn)); - - if (fsync_lsn < curs->repl_sync_lsn) - fsync_lsn = curs->repl_sync_lsn; - - curs->repl_sync_lsn = InvalidXLogRecPtr; - - if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { - pq_raise(curs->conn, curs, NULL); - goto exit; - } - gettimeofday(&last_comm, NULL); - } - - if (curs->stop_replication) { - Dprintf("_pq_copy_both_v3: stop_replication flag set by write_func"); - break; - } - - Py_DECREF(msg); - msg = NULL; - } - else if (buffer[0] == 'k') { - /* msgtype(1), walEnd(8), sendTime(8), reply(1) */ - hdr = 1 + 8 + 8; - if (len < hdr + 1) { - PyErr_Format(PyExc_RuntimeError, - "streaming header too small in keepalive message: %d", len); - goto exit; - } - - reply = buffer[hdr]; - if (reply) { - if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { - pq_raise(curs->conn, curs, NULL); - goto exit; - } - gettimeofday(&last_comm, NULL); - } - } - else { - PyErr_Format(PyExc_RuntimeError, - "unrecognized streaming message type: \"%c\"", buffer[0]); + if (tmp == NULL) { + Dprintf("_pq_copy_both_v3: write_func returned NULL"); goto exit; } + Py_DECREF(tmp); - /* buffer is allocated on every PQgetCopyData() call */ - PQfreemem(buffer); - buffer = NULL; + if (curs->repl_stop) { + Dprintf("_pq_copy_both_v3: repl_stop flag set by write_func"); + break; + } } } - if (len == -2) { - pq_raise(curs->conn, curs, NULL); - goto exit; - } - if (len == -1) { - curs->pgres = PQgetResult(curs->conn->pgconn); - - if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) - pq_raise(curs->conn, curs, NULL); - - CLEARPGRES(curs->pgres); - } - ret = 1; exit: - if (buffer) { - PQfreemem(buffer); - } - - Py_XDECREF(msg); Py_XDECREF(write_func); return ret; } @@ -1847,9 +1839,13 @@ pq_fetch(cursorObject *curs, int no_result) case PGRES_COPY_BOTH: Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); curs->rowcount = -1; - ex = _pq_copy_both_v3(curs); - /* error caught by out glorious notice handler */ - if (PyErr_Occurred()) ex = -1; + if (curs->conn->async) { + ex = 0; + } else { + ex = _pq_copy_both_v3(curs); + /* error caught by out glorious notice handler */ + if (PyErr_Occurred()) ex = -1; + } CLEARPGRES(curs->pgres); break; diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index bd3293f8..9a348bc2 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -72,4 +72,7 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn, RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, char **error); +HIDDEN PyObject *pq_read_replication_message(cursorObject *curs, int decode); +HIDDEN int pq_send_replication_feedback(cursorObject *curs, int reply_requested); + #endif /* !defined(PSYCOPG_PQPATH_H) */ diff --git a/psycopg2.cproj b/psycopg2.cproj index 18b9727f..386287c1 100644 --- a/psycopg2.cproj +++ b/psycopg2.cproj @@ -92,6 +92,7 @@ + @@ -224,6 +225,7 @@ +