diff --git a/lib/extras.py b/lib/extras.py index 92dd8192..8118e134 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -574,6 +574,10 @@ class ReplicationCursor(_cursor): return self.start_replication_expert(o, command, keepalive_interval) + # thin wrapper + def sync_server(self, msg): + return self.replication_sync_server(msg) + # a dbtype and adapter for Python UUID type diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 93b697b2..78ee21c4 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -72,7 +72,10 @@ struct cursorObject { #define DEFAULT_COPYSIZE 16384 #define DEFAULT_COPYBUFF 8192 + int in_replication; /* we're in streaming replication loop */ + int stop_replication; /* client requested to stop replication */ int keepalive_interval; /* interval for keepalive messages in replication mode */ + replicationMessageObject *repl_sync_msg; /* set when the client asks us to sync the server */ PyObject *tuple_factory; /* factory for result tuples */ PyObject *tzinfo_factory; /* factory for tzinfo objects */ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 954e764d..1ea922bb 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -28,6 +28,7 @@ #include "psycopg/cursor.h" #include "psycopg/connection.h" +#include "psycopg/replication_message.h" #include "psycopg/green.h" #include "psycopg/pqpath.h" #include "psycopg/typecast.h" @@ -1605,17 +1606,68 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args) self->copysize = 0; Py_INCREF(file); self->copyfile = file; + self->in_replication = 1; self->keepalive_interval = keepalive_interval; + self->stop_replication = 0; + self->repl_sync_msg = NULL; if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) { res = Py_None; Py_INCREF(Py_None); } + + Py_CLEAR(self->repl_sync_msg); Py_CLEAR(self->copyfile); + self->in_replication = 0; return res; } +#define psyco_curs_stop_replication_doc \ +"start_replication() -- Set flag to break out of endless loop in start_replication()." + +static PyObject * +psyco_curs_stop_replication(cursorObject *self) +{ + EXC_IF_CURS_CLOSED(self); + + if (!self->in_replication) { + PyErr_SetString(ProgrammingError, + "stop_replication() called when not in streaming replication loop"); + } else { + self->stop_replication = 1; + } + + Py_RETURN_NONE; +} + +#define psyco_curs_replication_sync_server_doc \ +"replication_sync_server(msg) -- Set flag to sync the server up to this replication message." + +static PyObject * +psyco_curs_replication_sync_server(cursorObject *self, PyObject *args) +{ + replicationMessageObject *msg; + + EXC_IF_CURS_CLOSED(self); + + if (!PyArg_ParseTuple(args, "O!", &replicationMessageType, &msg)) { + return NULL; + } + + if (!self->in_replication) { + PyErr_SetString(ProgrammingError, + "replication_sync_server() called when not in streaming replication loop"); + } else { + Py_CLEAR(self->repl_sync_msg); + + self->repl_sync_msg = msg; + Py_XINCREF(self->repl_sync_msg); + } + + Py_RETURN_NONE; +} + /* extension: closed - return true if cursor is closed */ #define psyco_curs_closed_doc \ @@ -1792,6 +1844,10 @@ static struct PyMethodDef cursorObject_methods[] = { METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc}, {"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert, METH_VARARGS, psyco_curs_start_replication_expert_doc}, + {"stop_replication", (PyCFunction)psyco_curs_stop_replication, + METH_NOARGS, psyco_curs_stop_replication_doc}, + {"replication_sync_server", (PyCFunction)psyco_curs_replication_sync_server, + METH_VARARGS, psyco_curs_replication_sync_server_doc}, {NULL} }; @@ -1908,6 +1964,7 @@ cursor_clear(cursorObject *self) Py_CLEAR(self->casts); Py_CLEAR(self->caster); Py_CLEAR(self->copyfile); + Py_CLEAR(self->repl_sync_msg); Py_CLEAR(self->tuple_factory); Py_CLEAR(self->tzinfo_factory); Py_CLEAR(self->query); @@ -1997,6 +2054,7 @@ cursor_traverse(cursorObject *self, visitproc visit, void *arg) Py_VISIT(self->casts); Py_VISIT(self->caster); Py_VISIT(self->copyfile); + Py_VISIT(self->repl_sync_msg); Py_VISIT(self->tuple_factory); Py_VISIT(self->tzinfo_factory); Py_VISIT(self->query); diff --git a/psycopg/libpq_support.h b/psycopg/libpq_support.h index 007f5e18..e597d24c 100644 --- a/psycopg/libpq_support.h +++ b/psycopg/libpq_support.h @@ -33,6 +33,10 @@ typedef unsigned PG_INT64_TYPE XLogRecPtr; #define InvalidXLogRecPtr ((XLogRecPtr) 0) +/* have to use lowercase %x, as PyString_FromFormat can't do %X */ +#define XLOGFMTSTR "%x/%x" +#define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF)) + HIDDEN pg_int64 feGetCurrentTimestamp(void); HIDDEN void fe_sendint64(pg_int64 i, char *buf); HIDDEN pg_int64 fe_recvint64(char *buf); diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index e87befae..4ae62971 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -35,6 +35,7 @@ #include "psycopg/pqpath.h" #include "psycopg/connection.h" #include "psycopg/cursor.h" +#include "psycopg/replication_message.h" #include "psycopg/green.h" #include "psycopg/typecast.h" #include "psycopg/pgtypes.h" @@ -1528,9 +1529,8 @@ sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn, char replybuf[1 + 8 + 8 + 8 + 8 + 1]; int len = 0; - Dprintf("_pq_copy_both_v3: confirming write up to %X/%X, flush to %X/%X\n", - (uint32) (written_lsn >> 32), (uint32) written_lsn, - (uint32) (fsync_lsn >> 32), (uint32) fsync_lsn); + Dprintf("_pq_copy_both_v3: confirming write up to "XLOGFMTSTR", flush to "XLOGFMTSTR, + XLOGFMTARGS(written_lsn), XLOGFMTARGS(fsync_lsn)); replybuf[len] = 'r'; len += 1; @@ -1559,6 +1559,7 @@ _pq_copy_both_v3(cursorObject *curs) PyObject *tmp = NULL; PyObject *write_func = NULL; PyObject *obj = NULL; + replicationMessageObject *msg = NULL; int ret = -1; int is_text; @@ -1568,9 +1569,9 @@ _pq_copy_both_v3(cursorObject *curs) struct timeval last_comm, curr_time, ping_time, time_diff; int len, hdr, reply, sel; - XLogRecPtr written_lsn = InvalidXLogRecPtr; - XLogRecPtr fsync_lsn = InvalidXLogRecPtr; - XLogRecPtr wal_end = InvalidXLogRecPtr; + XLogRecPtr written_lsn = InvalidXLogRecPtr, + fsync_lsn = InvalidXLogRecPtr, + data_start, wal_end; if (!curs->copyfile) { PyErr_SetString(ProgrammingError, @@ -1666,7 +1667,12 @@ _pq_copy_both_v3(cursorObject *curs) goto exit; } - wal_end = fe_recvint64(buffer + 1 + 8); + data_start = fe_recvint64(buffer + 1); + wal_end = fe_recvint64(buffer + 1 + 8); + /*send_time = fe_recvint64(buffer + 1 + 8 + 8);*/ + + Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR, + XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end)); if (is_text) { obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL); @@ -1676,21 +1682,36 @@ _pq_copy_both_v3(cursorObject *curs) } if (!obj) { goto exit; } - tmp = PyObject_CallFunctionObjArgs(write_func, obj, NULL); + msg = (replicationMessageObject *) + PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, + obj, NULL); Py_DECREF(obj); + if (!msg) { goto exit; } + + msg->data_start = data_start; + msg->wal_end = wal_end; + + tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL); if (tmp == NULL) { Dprintf("_pq_copy_both_v3: write_func returned NULL"); goto exit; } + Py_DECREF(tmp); /* update the LSN position we've written up to */ if (written_lsn < wal_end) written_lsn = wal_end; - /* if write() returned true-ish, we confirm LSN with the server */ - if (PyObject_IsTrue(tmp)) { - fsync_lsn = written_lsn; + /* if requested by sync_server(msg), we confirm LSN with the server */ + if (curs->repl_sync_msg) { + Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR, + XLOGFMTARGS(curs->repl_sync_msg->wal_end)); + + if (fsync_lsn < curs->repl_sync_msg->wal_end) + fsync_lsn = curs->repl_sync_msg->wal_end; + + Py_CLEAR(curs->repl_sync_msg); if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { pq_raise(curs->conn, curs, NULL); @@ -1698,8 +1719,14 @@ _pq_copy_both_v3(cursorObject *curs) } gettimeofday(&last_comm, NULL); } - Py_DECREF(tmp); + if (curs->stop_replication) { + Dprintf("_pq_copy_both_v3: stop_replication flag set by write_func"); + break; + } + + Py_DECREF(msg); + msg = NULL; } else if (buffer[0] == 'k') { /* msgtype(1), walEnd(8), sendTime(8), reply(1) */ @@ -1751,6 +1778,7 @@ exit: PQfreemem(buffer); } + Py_XDECREF(msg); Py_XDECREF(write_func); return ret; } diff --git a/psycopg/psycopg.h b/psycopg/psycopg.h index eb406fd2..adda12d9 100644 --- a/psycopg/psycopg.h +++ b/psycopg/psycopg.h @@ -117,6 +117,7 @@ HIDDEN PyObject *psyco_GetDecimalType(void); /* forward declarations */ typedef struct cursorObject cursorObject; typedef struct connectionObject connectionObject; +typedef struct replicationMessageObject replicationMessageObject; /* some utility functions */ RAISES HIDDEN PyObject *psyco_set_error(PyObject *exc, cursorObject *curs, const char *msg); diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c index 61e2de57..67393c37 100644 --- a/psycopg/psycopgmodule.c +++ b/psycopg/psycopgmodule.c @@ -28,6 +28,7 @@ #include "psycopg/connection.h" #include "psycopg/cursor.h" +#include "psycopg/replication_message.h" #include "psycopg/green.h" #include "psycopg/lobject.h" #include "psycopg/notify.h" @@ -785,6 +786,9 @@ INIT_MODULE(_psycopg)(void) Py_TYPE(&cursorType) = &PyType_Type; if (PyType_Ready(&cursorType) == -1) goto exit; + Py_TYPE(&replicationMessageType) = &PyType_Type; + if (PyType_Ready(&replicationMessageType) == -1) goto exit; + Py_TYPE(&typecastType) = &PyType_Type; if (PyType_Ready(&typecastType) == -1) goto exit; diff --git a/psycopg/replication_message.h b/psycopg/replication_message.h new file mode 100644 index 00000000..bf2b5f16 --- /dev/null +++ b/psycopg/replication_message.h @@ -0,0 +1,52 @@ +/* replication_message.h - definition for the psycopg ReplicationMessage type + * + * Copyright (C) 2003-2015 Federico Di Gregorio + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#ifndef PSYCOPG_REPLICATION_MESSAGE_H +#define PSYCOPG_REPLICATION_MESSAGE_H 1 + +#include "libpq_support.h" + +#ifdef __cplusplus +extern "C" { +#endif + +extern HIDDEN PyTypeObject replicationMessageType; + +/* the typedef is forward-declared in psycopg.h */ +struct replicationMessageObject { + PyObject_HEAD + + PyObject *payload; + + XLogRecPtr data_start; + XLogRecPtr wal_end; + /* send_time */ +}; + +#ifdef __cplusplus +} +#endif + +#endif /* !defined(PSYCOPG_REPLICATION_MESSAGE_H) */ diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c new file mode 100644 index 00000000..6968955e --- /dev/null +++ b/psycopg/replication_message_type.c @@ -0,0 +1,127 @@ +/* replication_message_type.c - python interface to ReplcationMessage objects + * + * Copyright (C) 2003-2015 Federico Di Gregorio + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#define PSYCOPG_MODULE +#include "psycopg/psycopg.h" + +#include "psycopg/replication_message.h" + + +static PyObject * +replmsg_repr(replicationMessageObject *self) +{ + return PyString_FromFormat( + "", + self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end)); +} + +static int +replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs) +{ + replicationMessageObject *self = (replicationMessageObject*) obj; + + if (!PyArg_ParseTuple(args, "O", &self->payload)) + return -1; + Py_XINCREF(self->payload); + + self->data_start = 0; + self->wal_end = 0; + + return 0; +} + +static int +replmsg_clear(PyObject *self) +{ + Py_CLEAR(((replicationMessageObject*) self)->payload); + return 0; +} + +static void +replmsg_dealloc(PyObject* obj) +{ + replmsg_clear(obj); +} + + +#define OFFSETOF(x) offsetof(replicationMessageObject, x) + +/* object member list */ + +static struct PyMemberDef replicationMessageObject_members[] = { + {"payload", T_OBJECT, OFFSETOF(payload), READONLY, + "TODO"}, + {"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY, + "TODO"}, + {"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY, + "TODO"}, + {NULL} +}; + +/* object type */ + +#define replicationMessageType_doc \ +"A database replication message." + +PyTypeObject replicationMessageType = { + PyVarObject_HEAD_INIT(NULL, 0) + "psycopg2.extensions.ReplicationMessage", + sizeof(replicationMessageObject), 0, + replmsg_dealloc, /*tp_dealloc*/ + 0, /*tp_print*/ + 0, /*tp_getattr*/ + 0, /*tp_setattr*/ + 0, /*tp_compare*/ + (reprfunc)replmsg_repr, /*tp_repr*/ + 0, /*tp_as_number*/ + 0, /*tp_as_sequence*/ + 0, /*tp_as_mapping*/ + 0, /*tp_hash */ + 0, /*tp_call*/ + 0, /*tp_str*/ + 0, /*tp_getattro*/ + 0, /*tp_setattro*/ + 0, /*tp_as_buffer*/ + Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE, + /*tp_flags*/ + replicationMessageType_doc, /*tp_doc*/ + 0, /*tp_traverse*/ + replmsg_clear, /*tp_clear*/ + 0, /*tp_richcompare*/ + 0, /*tp_weaklistoffset*/ + 0, /*tp_iter*/ + 0, /*tp_iternext*/ + 0, /*tp_methods*/ + replicationMessageObject_members, /*tp_members*/ + 0, /*tp_getset*/ + 0, /*tp_base*/ + 0, /*tp_dict*/ + 0, /*tp_descr_get*/ + 0, /*tp_descr_set*/ + 0, /*tp_dictoffset*/ + replmsg_init, /*tp_init*/ + 0, /*tp_alloc*/ + PyType_GenericNew, /*tp_new*/ +}; diff --git a/setup.py b/setup.py index 1f87520e..7c1a479f 100644 --- a/setup.py +++ b/setup.py @@ -466,6 +466,7 @@ sources = [ 'connection_int.c', 'connection_type.c', 'cursor_int.c', 'cursor_type.c', + 'replication_message_type.c', 'diagnostics_type.c', 'error_type.c', 'lobject_int.c', 'lobject_type.c', 'notify_type.c', 'xid_type.c', @@ -481,6 +482,7 @@ depends = [ # headers 'config.h', 'pgtypes.h', 'psycopg.h', 'python.h', 'connection.h', 'cursor.h', 'diagnostics.h', 'error.h', 'green.h', 'lobject.h', + 'replication_message.h', 'notify.h', 'pqpath.h', 'xid.h', 'libpq_support.h', 'win32_support.h',