Add ReplicationMessage object

This commit is contained in:
Oleksandr Shulgin 2015-06-05 17:44:09 +02:00
parent f7b84ce843
commit 453830f80c
10 changed files with 295 additions and 12 deletions

View File

@ -574,6 +574,10 @@ class ReplicationCursor(_cursor):
return self.start_replication_expert(o, command, keepalive_interval)
# thin wrapper
def sync_server(self, msg):
return self.replication_sync_server(msg)
# a dbtype and adapter for Python UUID type

View File

@ -72,7 +72,10 @@ struct cursorObject {
#define DEFAULT_COPYSIZE 16384
#define DEFAULT_COPYBUFF 8192
int in_replication; /* we're in streaming replication loop */
int stop_replication; /* client requested to stop replication */
int keepalive_interval; /* interval for keepalive messages in replication mode */
replicationMessageObject *repl_sync_msg; /* set when the client asks us to sync the server */
PyObject *tuple_factory; /* factory for result tuples */
PyObject *tzinfo_factory; /* factory for tzinfo objects */

View File

@ -28,6 +28,7 @@
#include "psycopg/cursor.h"
#include "psycopg/connection.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
#include "psycopg/pqpath.h"
#include "psycopg/typecast.h"
@ -1605,17 +1606,68 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args)
self->copysize = 0;
Py_INCREF(file);
self->copyfile = file;
self->in_replication = 1;
self->keepalive_interval = keepalive_interval;
self->stop_replication = 0;
self->repl_sync_msg = NULL;
if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) {
res = Py_None;
Py_INCREF(Py_None);
}
Py_CLEAR(self->repl_sync_msg);
Py_CLEAR(self->copyfile);
self->in_replication = 0;
return res;
}
#define psyco_curs_stop_replication_doc \
"start_replication() -- Set flag to break out of endless loop in start_replication()."
static PyObject *
psyco_curs_stop_replication(cursorObject *self)
{
EXC_IF_CURS_CLOSED(self);
if (!self->in_replication) {
PyErr_SetString(ProgrammingError,
"stop_replication() called when not in streaming replication loop");
} else {
self->stop_replication = 1;
}
Py_RETURN_NONE;
}
#define psyco_curs_replication_sync_server_doc \
"replication_sync_server(msg) -- Set flag to sync the server up to this replication message."
static PyObject *
psyco_curs_replication_sync_server(cursorObject *self, PyObject *args)
{
replicationMessageObject *msg;
EXC_IF_CURS_CLOSED(self);
if (!PyArg_ParseTuple(args, "O!", &replicationMessageType, &msg)) {
return NULL;
}
if (!self->in_replication) {
PyErr_SetString(ProgrammingError,
"replication_sync_server() called when not in streaming replication loop");
} else {
Py_CLEAR(self->repl_sync_msg);
self->repl_sync_msg = msg;
Py_XINCREF(self->repl_sync_msg);
}
Py_RETURN_NONE;
}
/* extension: closed - return true if cursor is closed */
#define psyco_curs_closed_doc \
@ -1792,6 +1844,10 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc},
{"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert,
METH_VARARGS, psyco_curs_start_replication_expert_doc},
{"stop_replication", (PyCFunction)psyco_curs_stop_replication,
METH_NOARGS, psyco_curs_stop_replication_doc},
{"replication_sync_server", (PyCFunction)psyco_curs_replication_sync_server,
METH_VARARGS, psyco_curs_replication_sync_server_doc},
{NULL}
};
@ -1908,6 +1964,7 @@ cursor_clear(cursorObject *self)
Py_CLEAR(self->casts);
Py_CLEAR(self->caster);
Py_CLEAR(self->copyfile);
Py_CLEAR(self->repl_sync_msg);
Py_CLEAR(self->tuple_factory);
Py_CLEAR(self->tzinfo_factory);
Py_CLEAR(self->query);
@ -1997,6 +2054,7 @@ cursor_traverse(cursorObject *self, visitproc visit, void *arg)
Py_VISIT(self->casts);
Py_VISIT(self->caster);
Py_VISIT(self->copyfile);
Py_VISIT(self->repl_sync_msg);
Py_VISIT(self->tuple_factory);
Py_VISIT(self->tzinfo_factory);
Py_VISIT(self->query);

View File

@ -33,6 +33,10 @@ typedef unsigned PG_INT64_TYPE XLogRecPtr;
#define InvalidXLogRecPtr ((XLogRecPtr) 0)
/* have to use lowercase %x, as PyString_FromFormat can't do %X */
#define XLOGFMTSTR "%x/%x"
#define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF))
HIDDEN pg_int64 feGetCurrentTimestamp(void);
HIDDEN void fe_sendint64(pg_int64 i, char *buf);
HIDDEN pg_int64 fe_recvint64(char *buf);

View File

@ -35,6 +35,7 @@
#include "psycopg/pqpath.h"
#include "psycopg/connection.h"
#include "psycopg/cursor.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
#include "psycopg/typecast.h"
#include "psycopg/pgtypes.h"
@ -1528,9 +1529,8 @@ sendFeedback(PGconn *conn, XLogRecPtr written_lsn, XLogRecPtr fsync_lsn,
char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0;
Dprintf("_pq_copy_both_v3: confirming write up to %X/%X, flush to %X/%X\n",
(uint32) (written_lsn >> 32), (uint32) written_lsn,
(uint32) (fsync_lsn >> 32), (uint32) fsync_lsn);
Dprintf("_pq_copy_both_v3: confirming write up to "XLOGFMTSTR", flush to "XLOGFMTSTR,
XLOGFMTARGS(written_lsn), XLOGFMTARGS(fsync_lsn));
replybuf[len] = 'r';
len += 1;
@ -1559,6 +1559,7 @@ _pq_copy_both_v3(cursorObject *curs)
PyObject *tmp = NULL;
PyObject *write_func = NULL;
PyObject *obj = NULL;
replicationMessageObject *msg = NULL;
int ret = -1;
int is_text;
@ -1568,9 +1569,9 @@ _pq_copy_both_v3(cursorObject *curs)
struct timeval last_comm, curr_time, ping_time, time_diff;
int len, hdr, reply, sel;
XLogRecPtr written_lsn = InvalidXLogRecPtr;
XLogRecPtr fsync_lsn = InvalidXLogRecPtr;
XLogRecPtr wal_end = InvalidXLogRecPtr;
XLogRecPtr written_lsn = InvalidXLogRecPtr,
fsync_lsn = InvalidXLogRecPtr,
data_start, wal_end;
if (!curs->copyfile) {
PyErr_SetString(ProgrammingError,
@ -1666,7 +1667,12 @@ _pq_copy_both_v3(cursorObject *curs)
goto exit;
}
wal_end = fe_recvint64(buffer + 1 + 8);
data_start = fe_recvint64(buffer + 1);
wal_end = fe_recvint64(buffer + 1 + 8);
/*send_time = fe_recvint64(buffer + 1 + 8 + 8);*/
Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR,
XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end));
if (is_text) {
obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
@ -1676,21 +1682,36 @@ _pq_copy_both_v3(cursorObject *curs)
}
if (!obj) { goto exit; }
tmp = PyObject_CallFunctionObjArgs(write_func, obj, NULL);
msg = (replicationMessageObject *)
PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
obj, NULL);
Py_DECREF(obj);
if (!msg) { goto exit; }
msg->data_start = data_start;
msg->wal_end = wal_end;
tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL);
if (tmp == NULL) {
Dprintf("_pq_copy_both_v3: write_func returned NULL");
goto exit;
}
Py_DECREF(tmp);
/* update the LSN position we've written up to */
if (written_lsn < wal_end)
written_lsn = wal_end;
/* if write() returned true-ish, we confirm LSN with the server */
if (PyObject_IsTrue(tmp)) {
fsync_lsn = written_lsn;
/* if requested by sync_server(msg), we confirm LSN with the server */
if (curs->repl_sync_msg) {
Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR,
XLOGFMTARGS(curs->repl_sync_msg->wal_end));
if (fsync_lsn < curs->repl_sync_msg->wal_end)
fsync_lsn = curs->repl_sync_msg->wal_end;
Py_CLEAR(curs->repl_sync_msg);
if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) {
pq_raise(curs->conn, curs, NULL);
@ -1698,8 +1719,14 @@ _pq_copy_both_v3(cursorObject *curs)
}
gettimeofday(&last_comm, NULL);
}
Py_DECREF(tmp);
if (curs->stop_replication) {
Dprintf("_pq_copy_both_v3: stop_replication flag set by write_func");
break;
}
Py_DECREF(msg);
msg = NULL;
}
else if (buffer[0] == 'k') {
/* msgtype(1), walEnd(8), sendTime(8), reply(1) */
@ -1751,6 +1778,7 @@ exit:
PQfreemem(buffer);
}
Py_XDECREF(msg);
Py_XDECREF(write_func);
return ret;
}

View File

@ -117,6 +117,7 @@ HIDDEN PyObject *psyco_GetDecimalType(void);
/* forward declarations */
typedef struct cursorObject cursorObject;
typedef struct connectionObject connectionObject;
typedef struct replicationMessageObject replicationMessageObject;
/* some utility functions */
RAISES HIDDEN PyObject *psyco_set_error(PyObject *exc, cursorObject *curs, const char *msg);

View File

@ -28,6 +28,7 @@
#include "psycopg/connection.h"
#include "psycopg/cursor.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
#include "psycopg/lobject.h"
#include "psycopg/notify.h"
@ -785,6 +786,9 @@ INIT_MODULE(_psycopg)(void)
Py_TYPE(&cursorType) = &PyType_Type;
if (PyType_Ready(&cursorType) == -1) goto exit;
Py_TYPE(&replicationMessageType) = &PyType_Type;
if (PyType_Ready(&replicationMessageType) == -1) goto exit;
Py_TYPE(&typecastType) = &PyType_Type;
if (PyType_Ready(&typecastType) == -1) goto exit;

View File

@ -0,0 +1,52 @@
/* replication_message.h - definition for the psycopg ReplicationMessage type
*
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#ifndef PSYCOPG_REPLICATION_MESSAGE_H
#define PSYCOPG_REPLICATION_MESSAGE_H 1
#include "libpq_support.h"
#ifdef __cplusplus
extern "C" {
#endif
extern HIDDEN PyTypeObject replicationMessageType;
/* the typedef is forward-declared in psycopg.h */
struct replicationMessageObject {
PyObject_HEAD
PyObject *payload;
XLogRecPtr data_start;
XLogRecPtr wal_end;
/* send_time */
};
#ifdef __cplusplus
}
#endif
#endif /* !defined(PSYCOPG_REPLICATION_MESSAGE_H) */

View File

@ -0,0 +1,127 @@
/* replication_message_type.c - python interface to ReplcationMessage objects
*
* Copyright (C) 2003-2015 Federico Di Gregorio <fog@debian.org>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#define PSYCOPG_MODULE
#include "psycopg/psycopg.h"
#include "psycopg/replication_message.h"
static PyObject *
replmsg_repr(replicationMessageObject *self)
{
return PyString_FromFormat(
"<replicationMessage object at %p; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR">",
self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end));
}
static int
replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs)
{
replicationMessageObject *self = (replicationMessageObject*) obj;
if (!PyArg_ParseTuple(args, "O", &self->payload))
return -1;
Py_XINCREF(self->payload);
self->data_start = 0;
self->wal_end = 0;
return 0;
}
static int
replmsg_clear(PyObject *self)
{
Py_CLEAR(((replicationMessageObject*) self)->payload);
return 0;
}
static void
replmsg_dealloc(PyObject* obj)
{
replmsg_clear(obj);
}
#define OFFSETOF(x) offsetof(replicationMessageObject, x)
/* object member list */
static struct PyMemberDef replicationMessageObject_members[] = {
{"payload", T_OBJECT, OFFSETOF(payload), READONLY,
"TODO"},
{"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY,
"TODO"},
{"wal_end", T_ULONGLONG, OFFSETOF(wal_end), READONLY,
"TODO"},
{NULL}
};
/* object type */
#define replicationMessageType_doc \
"A database replication message."
PyTypeObject replicationMessageType = {
PyVarObject_HEAD_INIT(NULL, 0)
"psycopg2.extensions.ReplicationMessage",
sizeof(replicationMessageObject), 0,
replmsg_dealloc, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
(reprfunc)replmsg_repr, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash */
0, /*tp_call*/
0, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE,
/*tp_flags*/
replicationMessageType_doc, /*tp_doc*/
0, /*tp_traverse*/
replmsg_clear, /*tp_clear*/
0, /*tp_richcompare*/
0, /*tp_weaklistoffset*/
0, /*tp_iter*/
0, /*tp_iternext*/
0, /*tp_methods*/
replicationMessageObject_members, /*tp_members*/
0, /*tp_getset*/
0, /*tp_base*/
0, /*tp_dict*/
0, /*tp_descr_get*/
0, /*tp_descr_set*/
0, /*tp_dictoffset*/
replmsg_init, /*tp_init*/
0, /*tp_alloc*/
PyType_GenericNew, /*tp_new*/
};

View File

@ -466,6 +466,7 @@ sources = [
'connection_int.c', 'connection_type.c',
'cursor_int.c', 'cursor_type.c',
'replication_message_type.c',
'diagnostics_type.c', 'error_type.c',
'lobject_int.c', 'lobject_type.c',
'notify_type.c', 'xid_type.c',
@ -481,6 +482,7 @@ depends = [
# headers
'config.h', 'pgtypes.h', 'psycopg.h', 'python.h', 'connection.h',
'cursor.h', 'diagnostics.h', 'error.h', 'green.h', 'lobject.h',
'replication_message.h',
'notify.h', 'pqpath.h', 'xid.h',
'libpq_support.h', 'win32_support.h',