Add support for streaming replication protocol

Introduce ReplicationConnection and ReplicationCursor classes, that
incapsulate initiation of special type of PostgreSQL connection and
handling of special replication commands only available in this special
connection mode.

The handling of stream of replication data from the server is modelled
largely after the existing support for "COPY table TO file" command and
pg_recvlogical tool supplied with PostgreSQL (though, it can also be
used for physical replication.)
This commit is contained in:
Oleksandr Shulgin 2015-06-01 11:35:05 +02:00
parent d66165232e
commit e32e1b834e
5 changed files with 617 additions and 0 deletions

View File

@ -141,6 +141,128 @@ Logging cursor
.. autoclass:: MinTimeLoggingCursor .. autoclass:: MinTimeLoggingCursor
Replication cursor
^^^^^^^^^^^^^^^^^^
.. autoclass:: ReplicationConnection
This connection factory class can be used to open a special type of
connection that is used for streaming replication.
Example::
from psycopg2.extras import ReplicationConnection, REPLICATION_PHYSICAL, REPLICATION_LOGICAL
conn = psycopg2.connect(dsn, connection_factory=ReplicationConnection)
cur = conn.cursor()
.. seealso::
- PostgreSQL `Replication protocol`__
.. __: http://www.postgresql.org/docs/current/static/protocol-replication.html
.. autoclass:: ReplicationCursor
.. method:: identify_system()
Get information about the cluster status in form of a dict with
``systemid``, ``timeline``, ``xlogpos`` and ``dbname`` as keys.
Example::
>>> print cur.identify_system()
{'timeline': 1, 'systemid': '1234567890123456789', 'dbname': 'test', 'xlogpos': '0/1ABCDEF'}
.. method:: create_replication_slot(slot_type, slot_name, output_plugin=None)
Create streaming replication slot.
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to be created
:param output_plugin: name of the logical decoding output plugin to use
(logical replication only)
Example::
cur.create_replication_slot(REPLICATION_LOGICAL, "testslot", "test_decoding")
.. method:: drop_replication_slot(slot_name)
Drop streaming replication slot.
:param slot_name: name of the replication slot to drop
Example::
cur.drop_replication_slot("testslot")
.. method:: start_replication(file, slot_type, slot_name=None, start_lsn=None, timeline=0, keepalive_interval=10, options=None)
Start and consume replication stream.
:param file: a file-like object to write replication stream messages to
:param slot_type: type of replication: either `REPLICATION_PHYSICAL` or
`REPLICATION_LOGICAL`
:param slot_name: name of the replication slot to use (required for
logical replication)
:param start_lsn: the point in replication stream (WAL position) to start
from, in the form ``XXX/XXX`` (forward-slash separated
pair of hexadecimals)
:param timeline: WAL history timeline to start streaming from (optional,
can only be used with physical replication)
:param keepalive_interval: interval (in seconds) to send keepalive
messages to the server, in case there was no
communication during that period of time
:param options: an dictionary of options to pass to logical replication
slot
The ``keepalive_interval`` must be greater than zero.
This method never returns unless an error message is sent from the
server, or the server closes connection, or there is an exception in the
``write()`` method of the ``file`` object.
One can even use ``sys.stdout`` as the destination (this is only good for
testing purposes, however)::
>>> cur.start_replication(sys.stdout, "testslot")
...
This method acts much like the `~cursor.copy_to()` with an important
distinction that ``write()`` method return value is dirving the
server-side replication cursor. In order to report to the server that
the all the messages up to the current one have been stored reliably, one
should return true value (i.e. something that satisfies ``if retval:``
conidtion) from the ``write`` callback::
class ReplicationStreamWriter(object):
def write(self, msg):
if store_message_reliably(msg):
return True
cur.start_replication(writer, "testslot")
...
.. note::
One needs to be aware that failure to update the server-side cursor
on any one replication slot properly by constantly consuming and
reporting success to the server can eventually lead to "disk full"
condition on the server, because the server retains all the WAL
segments that might be needed to stream the changes via currently
open replication slots.
Drop any open replication slots that are no longer being used. The
list of open slots can be obtained by running a query like ``SELECT *
FROM pg_replication_slots``.
.. data:: REPLICATION_PHYSICAL
.. data:: REPLICATION_LOGICAL
.. index::
pair: Cursor; Replication
.. index:: .. index::

View File

@ -437,6 +437,144 @@ class MinTimeLoggingCursor(LoggingCursor):
return LoggingCursor.callproc(self, procname, vars) return LoggingCursor.callproc(self, procname, vars)
class ReplicationConnection(_connection):
"""A connection that uses `ReplicationCursor` automatically."""
def __init__(self, *args, **kwargs):
"""Initializes a replication connection, by adding appropriate replication parameter to the provided dsn arguments."""
if len(args):
dsn = args[0]
# FIXME: could really use parse_dsn here
if dsn.startswith('postgres://') or dsn.startswith('postgresql://'):
# poor man's url parsing
if dsn.rfind('?') > 0:
if not dsn.endswith('?'):
dsn += '&'
else:
dsn += '?'
else:
dsn += ' '
dsn += 'replication=database'
args = [dsn] + list(args[1:])
else:
dbname = kwargs.get('dbname', None)
if dbname is None:
kwargs['dbname'] = 'replication'
if kwargs.get('replication', None) is None:
kwargs['replication'] = 'database' if dbname else 'true'
super(ReplicationConnection, self).__init__(*args, **kwargs)
# prevent auto-issued BEGIN statements
self.autocommit = True
def cursor(self, *args, **kwargs):
kwargs.setdefault('cursor_factory', ReplicationCursor)
return super(ReplicationConnection, self).cursor(*args, **kwargs)
"""Streamging replication types."""
REPLICATION_PHYSICAL = 0
REPLICATION_LOGICAL = 1
class ReplicationCursor(_cursor):
"""A cursor used for replication commands."""
def identify_system(self):
"""Get information about the cluster status."""
self.execute("IDENTIFY_SYSTEM")
return dict(zip(['systemid', 'timeline', 'xlogpos', 'dbname'],
self.fetchall()[0]))
def quote_ident(self, ident):
# FIXME: use PQescapeIdentifier or psycopg_escape_identifier_easy, somehow
return '"%s"' % ident.replace('"', '""')
def create_replication_slot(self, slot_type, slot_name, output_plugin=None):
"""Create streaming replication slot."""
command = "CREATE_REPLICATION_SLOT %s " % self.quote_ident(slot_name)
if slot_type == REPLICATION_LOGICAL:
if output_plugin is None:
raise RuntimeError("output_plugin is required for logical replication slot")
command += "LOGICAL %s" % self.quote_ident(output_plugin)
elif slot_type == REPLICATION_PHYSICAL:
if output_plugin is not None:
raise RuntimeError("output_plugin is not applicable to physical replication")
command += "PHYSICAL"
else:
raise RuntimeError("unrecognized replication slot type")
return self.execute(command)
def drop_replication_slot(self, slot_name):
"""Drop streaming replication slot."""
command = "DROP_REPLICATION_SLOT %s" % self.quote_ident(slot_name)
return self.execute(command)
def start_replication(self, o, slot_type, slot_name=None, start_lsn=None,
timeline=0, keepalive_interval=10, options=None):
"""Start and consume replication stream."""
if keepalive_interval <= 0:
raise RuntimeError("keepalive_interval must be > 0: %d" % keepalive_interval)
command = "START_REPLICATION "
if slot_type == REPLICATION_LOGICAL and slot_name is None:
raise RuntimeError("slot_name is required for logical replication slot")
if slot_name:
command += "SLOT %s " % self.quote_ident(slot_name)
if slot_type == REPLICATION_LOGICAL:
command += "LOGICAL "
elif slot_type == REPLICATION_PHYSICAL:
command += "PHYSICAL "
else:
raise RuntimeError("unrecognized replication slot type")
if start_lsn is None:
start_lsn = '0/0'
# reparse lsn to catch possible garbage
lsn = start_lsn.split('/')
command += "%X/%X" % (int(lsn[0], 16), int(lsn[1], 16))
if timeline != 0:
if slot_type == REPLICATION_LOGICAL:
raise RuntimeError("cannot specify timeline for logical replication")
if timeline < 0:
raise RuntimeError("timeline must be >= 0: %d" % timeline)
command += " TIMELINE %d" % timeline
if options:
if slot_type == REPLICATION_PHYSICAL:
raise RuntimeError("cannot specify plugin options for physical replication")
command += " ("
for k,v in options.iteritems():
if not command.endswith('('):
command += ", "
command += "%s %s" % (self.quote_ident(k), _A(str(v)).getquoted())
command += ")"
return self.start_replication_expert(o, command, keepalive_interval)
# a dbtype and adapter for Python UUID type # a dbtype and adapter for Python UUID type
class UUID_adapter(object): class UUID_adapter(object):

View File

@ -72,6 +72,8 @@ struct cursorObject {
#define DEFAULT_COPYSIZE 16384 #define DEFAULT_COPYSIZE 16384
#define DEFAULT_COPYBUFF 8192 #define DEFAULT_COPYBUFF 8192
int keepalive_interval; /* interval for keepalive messages in replication mode */
PyObject *tuple_factory; /* factory for result tuples */ PyObject *tuple_factory; /* factory for result tuples */
PyObject *tzinfo_factory; /* factory for tzinfo objects */ PyObject *tzinfo_factory; /* factory for tzinfo objects */
@ -88,6 +90,10 @@ struct cursorObject {
}; };
/* streaming replication modes */
#define CURSOR_REPLICATION_PHYSICAL 0
#define CURSOR_REPLICATION_LOGICAL 1
/* C-callable functions in cursor_int.c and cursor_type.c */ /* C-callable functions in cursor_int.c and cursor_type.c */
BORROWED HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid); BORROWED HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid);

View File

@ -1579,6 +1579,43 @@ exit:
return res; return res;
} }
#define psyco_curs_start_replication_expert_doc \
"start_replication_expert(file, command, keepalive_interval) -- Start and consume replication stream with direct command."
static PyObject *
psyco_curs_start_replication_expert(cursorObject *self, PyObject *args)
{
PyObject *file, *res = NULL;
char *command;
int keepalive_interval;
if (!PyArg_ParseTuple(args, "O&si",
_psyco_curs_has_write_check, &file,
&command, &keepalive_interval)) {
return NULL;
}
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, start_replication_expert);
EXC_IF_GREEN(start_replication_expert);
EXC_IF_TPC_PREPARED(self->conn, start_replication_expert);
Dprintf("psyco_curs_start_replication_expert: command = %s", command);
self->copysize = 0;
Py_INCREF(file);
self->copyfile = file;
self->keepalive_interval = keepalive_interval;
if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) {
res = Py_None;
Py_INCREF(Py_None);
}
Py_CLEAR(self->copyfile);
return res;
}
/* extension: closed - return true if cursor is closed */ /* extension: closed - return true if cursor is closed */
#define psyco_curs_closed_doc \ #define psyco_curs_closed_doc \
@ -1753,6 +1790,8 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc},
{"copy_expert", (PyCFunction)psyco_curs_copy_expert, {"copy_expert", (PyCFunction)psyco_curs_copy_expert,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc},
{"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert,
METH_VARARGS, psyco_curs_start_replication_expert_doc},
{NULL} {NULL}
}; };

View File

@ -40,7 +40,14 @@
#include "psycopg/pgtypes.h" #include "psycopg/pgtypes.h"
#include "psycopg/error.h" #include "psycopg/error.h"
#include "postgres_fe.h"
#include "access/xlog_internal.h"
#include "common/fe_memutils.h"
#include "libpq-fe.h"
#include <string.h> #include <string.h>
#include <netinet/in.h>
#include <arpa/inet.h>
extern HIDDEN PyObject *psyco_DescriptionType; extern HIDDEN PyObject *psyco_DescriptionType;
@ -1514,6 +1521,302 @@ exit:
return ret; return ret;
} }
/* support routines taken from pg_basebackup/streamutil.c */
/*
* Frontend version of GetCurrentTimestamp(), since we are not linked with
* backend code. The protocol always uses integer timestamps, regardless of
* server setting.
*/
static int64
feGetCurrentTimestamp(void)
{
int64 result;
struct timeval tp;
gettimeofday(&tp, NULL);
result = (int64) tp.tv_sec -
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
result = (result * USECS_PER_SEC) + tp.tv_usec;
return result;
}
/*
* Converts an int64 to network byte order.
*/
static void
fe_sendint64(int64 i, char *buf)
{
uint32 n32;
/* High order half first, since we're doing MSB-first */
n32 = (uint32) (i >> 32);
n32 = htonl(n32);
memcpy(&buf[0], &n32, 4);
/* Now the low order half */
n32 = (uint32) i;
n32 = htonl(n32);
memcpy(&buf[4], &n32, 4);
}
/*
* Converts an int64 from network byte order to native format.
*/
static int64
fe_recvint64(char *buf)
{
int64 result;
uint32 h32;
uint32 l32;
memcpy(&h32, buf, 4);
memcpy(&l32, buf + 4, 4);
h32 = ntohl(h32);
l32 = ntohl(l32);
result = h32;
result <<= 32;
result |= l32;
return result;
}
static int
sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn,
int replyRequested)
{
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
Dprintf("_pq_copy_both_v3: confirming write up to %X/%X, flush to %X/%X\n",
(uint32) (written_lsn >> 32), (uint32) written_lsn,
(uint32) (fsync_lsn >> 32), (uint32) fsync_lsn);
replybuf[len] = 'r';
len += 1;
fe_sendint64(written_lsn, &replybuf[len]); /* write */
len += 8;
fe_sendint64(fsync_lsn, &replybuf[len]); /* flush */
len += 8;
fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
len += 8;
fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); /* sendTime */
len += 8;
replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
len += 1;
if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn)) {
return 0;
}
return 1;
}
/* used for streaming replication only */
static int
_pq_copy_both_v3(cursorObject *curs)
{
PyObject *tmp = NULL;
PyObject *write_func = NULL;
PyObject *obj = NULL;
int ret = -1;
int is_text;
PGconn *conn;
char *buffer = NULL;
fd_set fds;
struct timeval last_comm, curr_time, ping_time, time_diff;
int len, hdr, reply, sel;
XLogRecPtr written_lsn = InvalidXLogRecPtr;
XLogRecPtr fsync_lsn = InvalidXLogRecPtr;
XLogRecPtr wal_end = InvalidXLogRecPtr;
if (!curs->copyfile) {
PyErr_SetString(ProgrammingError,
"can't execute START_REPLICATION: use the start_replication() method instead");
goto exit;
}
if (curs->keepalive_interval <= 0) {
PyErr_Format(PyExc_RuntimeError, "keepalive_interval must be > 0: %d",
curs->keepalive_interval);
goto exit;
}
if (!(write_func = PyObject_GetAttrString(curs->copyfile, "write"))) {
Dprintf("_pq_copy_both_v3: can't get o.write");
goto exit;
}
/* if the file is text we must pass it unicode. */
if (-1 == (is_text = psycopg_is_text_file(curs->copyfile))) {
goto exit;
}
CLEARPGRES(curs->pgres);
/* timestamp of last communication with the server */
gettimeofday(&last_comm, NULL);
conn = curs->conn->pgconn;
while (1) {
len = PQgetCopyData(conn, &buffer, 1 /* async! */);
if (len < 0) {
break;
}
if (len == 0) {
FD_ZERO(&fds);
FD_SET(PQsocket(conn), &fds);
/* set up timeout according to keepalive_interval, but no less than 1 second */
gettimeofday(&curr_time, NULL);
ping_time = last_comm;
ping_time.tv_sec += curs->keepalive_interval;
if (timercmp(&ping_time, &curr_time, >)) {
timersub(&ping_time, &curr_time, &time_diff);
Py_BEGIN_ALLOW_THREADS;
sel = select(PQsocket(conn) + 1, &fds, NULL, NULL, &time_diff);
Py_END_ALLOW_THREADS;
}
else {
sel = 0; /* pretend select() timed out */
}
if (sel < 0) {
if (errno != EINTR) {
PyErr_SetFromErrno(PyExc_OSError);
goto exit;
}
if (PyErr_CheckSignals()) {
goto exit;
}
continue;
}
if (sel > 0) {
if (!PQconsumeInput(conn)) {
Dprintf("_pq_copy_both_v3: PQconsumeInput failed");
pq_raise(curs->conn, curs, NULL);
goto exit;
}
}
else { /* timeout */
if (!sendFeedback(conn, written_lsn, fsync_lsn, false)) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
}
gettimeofday(&last_comm, NULL);
continue;
}
if (len > 0 && buffer) {
gettimeofday(&last_comm, NULL);
Dprintf("_pq_copy_both_v3: msg=%c, len=%d", buffer[0], len);
if (buffer[0] == 'w') {
/* msgtype(1), dataStart(8), walEnd(8), sendTime(8) */
hdr = 1 + 8 + 8 + 8;
if (len < hdr + 1) {
PyErr_Format(PyExc_RuntimeError,
"streaming header too small in data message: %d", len);
goto exit;
}
wal_end = fe_recvint64(buffer + 1 + 8);
if (is_text) {
obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
}
else {
obj = Bytes_FromStringAndSize(buffer + hdr, len - hdr);
}
if (!obj) { goto exit; }
tmp = PyObject_CallFunctionObjArgs(write_func, obj, NULL);
Py_DECREF(obj);
if (tmp == NULL) {
Dprintf("_pq_copy_both_v3: write_func returned NULL");
goto exit;
}
written_lsn = Max(wal_end, written_lsn);
/* if write() returned true-ish, we confirm LSN with the server */
if (PyObject_IsTrue(tmp)) {
fsync_lsn = written_lsn;
if (!sendFeedback(conn, written_lsn, fsync_lsn, false)) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
gettimeofday(&last_comm, NULL);
}
Py_DECREF(tmp);
}
else if (buffer[0] == 'k') {
/* msgtype(1), walEnd(8), sendTime(8), reply(1) */
hdr = 1 + 8 + 8;
if (len < hdr + 1) {
PyErr_Format(PyExc_RuntimeError,
"streaming header too small in keepalive message: %d", len);
goto exit;
}
reply = buffer[hdr];
if (reply) {
if (!sendFeedback(conn, written_lsn, fsync_lsn, false)) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
gettimeofday(&last_comm, NULL);
}
}
else {
PyErr_Format(PyExc_RuntimeError,
"unrecognized streaming message type: \"%c\"", buffer[0]);
goto exit;
}
/* buffer is allocated on every PQgetCopyData() call */
PQfreemem(buffer);
buffer = NULL;
}
}
if (len == -2) {
pq_raise(curs->conn, curs, NULL);
goto exit;
}
if (len == -1) {
curs->pgres = PQgetResult(curs->conn->pgconn);
if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL);
CLEARPGRES(curs->pgres);
}
ret = 1;
exit:
if (buffer) {
PQfreemem(buffer);
}
Py_XDECREF(write_func);
return ret;
}
int int
pq_fetch(cursorObject *curs, int no_result) pq_fetch(cursorObject *curs, int no_result)
{ {
@ -1573,6 +1876,15 @@ pq_fetch(cursorObject *curs, int no_result)
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
break; break;
case PGRES_COPY_BOTH:
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
curs->rowcount = -1;
ex = _pq_copy_both_v3(curs);
/* error caught by out glorious notice handler */
if (PyErr_Occurred()) ex = -1;
CLEARPGRES(curs->pgres);
break;
case PGRES_TUPLES_OK: case PGRES_TUPLES_OK:
if (!no_result) { if (!no_result) {
Dprintf("pq_fetch: got tuples"); Dprintf("pq_fetch: got tuples");