Refer cursor from ReplicationMessage object. At the same time, for the sync use LSN instead of msg reference in cursor.

This commit is contained in:
Oleksandr Shulgin 2015-06-11 14:52:01 +02:00
parent 35a3262fe3
commit 9ed90b1216
5 changed files with 21 additions and 30 deletions

View File

@ -27,6 +27,7 @@
#define PSYCOPG_CURSOR_H 1 #define PSYCOPG_CURSOR_H 1
#include "psycopg/connection.h" #include "psycopg/connection.h"
#include "libpq_support.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -75,7 +76,7 @@ struct cursorObject {
int in_replication; /* we're in streaming replication loop */ int in_replication; /* we're in streaming replication loop */
int stop_replication; /* client requested to stop replication */ int stop_replication; /* client requested to stop replication */
int keepalive_interval; /* interval for keepalive messages in replication mode */ int keepalive_interval; /* interval for keepalive messages in replication mode */
replicationMessageObject *repl_sync_msg; /* set when the client asks us to sync the server */ XLogRecPtr repl_sync_lsn; /* set when the client asks us to sync the server */
PyObject *tuple_factory; /* factory for result tuples */ PyObject *tuple_factory; /* factory for result tuples */
PyObject *tzinfo_factory; /* factory for tzinfo objects */ PyObject *tzinfo_factory; /* factory for tzinfo objects */

View File

@ -1609,14 +1609,13 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args)
self->in_replication = 1; self->in_replication = 1;
self->keepalive_interval = keepalive_interval; self->keepalive_interval = keepalive_interval;
self->stop_replication = 0; self->stop_replication = 0;
self->repl_sync_msg = NULL; self->repl_sync_lsn = InvalidXLogRecPtr;
if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) { if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) {
res = Py_None; res = Py_None;
Py_INCREF(Py_None); Py_INCREF(Py_None);
} }
Py_CLEAR(self->repl_sync_msg);
Py_CLEAR(self->copyfile); Py_CLEAR(self->copyfile);
self->in_replication = 0; self->in_replication = 0;
@ -1647,24 +1646,12 @@ psyco_curs_stop_replication(cursorObject *self)
static PyObject * static PyObject *
psyco_curs_replication_sync_server(cursorObject *self, PyObject *args) psyco_curs_replication_sync_server(cursorObject *self, PyObject *args)
{ {
replicationMessageObject *msg;
EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_CLOSED(self);
if (!PyArg_ParseTuple(args, "O!", &replicationMessageType, &msg)) { if (!PyArg_ParseTuple(args, "K", &self->repl_sync_lsn)) {
return NULL; return NULL;
} }
if (!self->in_replication) {
PyErr_SetString(ProgrammingError,
"replication_sync_server() called when not in streaming replication loop");
} else {
Py_CLEAR(self->repl_sync_msg);
self->repl_sync_msg = msg;
Py_XINCREF(self->repl_sync_msg);
}
Py_RETURN_NONE; Py_RETURN_NONE;
} }
@ -1964,7 +1951,6 @@ cursor_clear(cursorObject *self)
Py_CLEAR(self->casts); Py_CLEAR(self->casts);
Py_CLEAR(self->caster); Py_CLEAR(self->caster);
Py_CLEAR(self->copyfile); Py_CLEAR(self->copyfile);
Py_CLEAR(self->repl_sync_msg);
Py_CLEAR(self->tuple_factory); Py_CLEAR(self->tuple_factory);
Py_CLEAR(self->tzinfo_factory); Py_CLEAR(self->tzinfo_factory);
Py_CLEAR(self->query); Py_CLEAR(self->query);
@ -2054,7 +2040,6 @@ cursor_traverse(cursorObject *self, visitproc visit, void *arg)
Py_VISIT(self->casts); Py_VISIT(self->casts);
Py_VISIT(self->caster); Py_VISIT(self->caster);
Py_VISIT(self->copyfile); Py_VISIT(self->copyfile);
Py_VISIT(self->repl_sync_msg);
Py_VISIT(self->tuple_factory); Py_VISIT(self->tuple_factory);
Py_VISIT(self->tzinfo_factory); Py_VISIT(self->tzinfo_factory);
Py_VISIT(self->query); Py_VISIT(self->query);

View File

@ -1685,7 +1685,7 @@ _pq_copy_both_v3(cursorObject *curs)
msg = (replicationMessageObject *) msg = (replicationMessageObject *)
PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
obj, NULL); curs, obj, NULL);
Py_DECREF(obj); Py_DECREF(obj);
if (!msg) { goto exit; } if (!msg) { goto exit; }
@ -1706,14 +1706,14 @@ _pq_copy_both_v3(cursorObject *curs)
written_lsn = wal_end; written_lsn = wal_end;
/* if requested by sync_server(msg), we confirm LSN with the server */ /* if requested by sync_server(msg), we confirm LSN with the server */
if (curs->repl_sync_msg) { if (curs->repl_sync_lsn != InvalidXLogRecPtr) {
Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR, Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR,
XLOGFMTARGS(curs->repl_sync_msg->wal_end)); XLOGFMTARGS(curs->repl_sync_lsn));
if (fsync_lsn < curs->repl_sync_msg->wal_end) if (fsync_lsn < curs->repl_sync_lsn)
fsync_lsn = curs->repl_sync_msg->wal_end; fsync_lsn = curs->repl_sync_lsn;
Py_CLEAR(curs->repl_sync_msg); curs->repl_sync_lsn = InvalidXLogRecPtr;
if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) {
pq_raise(curs->conn, curs, NULL); pq_raise(curs->conn, curs, NULL);

View File

@ -26,6 +26,7 @@
#ifndef PSYCOPG_REPLICATION_MESSAGE_H #ifndef PSYCOPG_REPLICATION_MESSAGE_H
#define PSYCOPG_REPLICATION_MESSAGE_H 1 #define PSYCOPG_REPLICATION_MESSAGE_H 1
#include "cursor.h"
#include "libpq_support.h" #include "libpq_support.h"
#ifdef __cplusplus #ifdef __cplusplus
@ -38,6 +39,7 @@ extern HIDDEN PyTypeObject replicationMessageType;
struct replicationMessageObject { struct replicationMessageObject {
PyObject_HEAD PyObject_HEAD
cursorObject *cursor;
PyObject *payload; PyObject *payload;
XLogRecPtr data_start; XLogRecPtr data_start;

View File

@ -27,7 +27,6 @@
#include "psycopg/psycopg.h" #include "psycopg/psycopg.h"
#include "psycopg/replication_message.h" #include "psycopg/replication_message.h"
#include "psycopg/libpq_support.h"
#include "datetime.h" #include "datetime.h"
@ -59,8 +58,9 @@ replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs)
{ {
replicationMessageObject *self = (replicationMessageObject*) obj; replicationMessageObject *self = (replicationMessageObject*) obj;
if (!PyArg_ParseTuple(args, "O", &self->payload)) if (!PyArg_ParseTuple(args, "O!O", &cursorType, &self->cursor, &self->payload))
return -1; return -1;
Py_XINCREF(self->cursor);
Py_XINCREF(self->payload); Py_XINCREF(self->payload);
self->data_start = 0; self->data_start = 0;
@ -70,16 +70,17 @@ replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs)
} }
static int static int
replmsg_clear(PyObject *self) replmsg_clear(replicationMessageObject *self)
{ {
Py_CLEAR(((replicationMessageObject*) self)->payload); Py_CLEAR(self->cursor);
Py_CLEAR(self->payload);
return 0; return 0;
} }
static void static void
replmsg_dealloc(PyObject* obj) replmsg_dealloc(PyObject* obj)
{ {
replmsg_clear(obj); replmsg_clear((replicationMessageObject*) obj);
} }
#define psyco_replmsg_send_time_doc \ #define psyco_replmsg_send_time_doc \
@ -108,6 +109,8 @@ psyco_replmsg_get_send_time(replicationMessageObject *self)
/* object member list */ /* object member list */
static struct PyMemberDef replicationMessageObject_members[] = { static struct PyMemberDef replicationMessageObject_members[] = {
{"cursor", T_OBJECT, OFFSETOF(cursor), READONLY,
"TODO"},
{"payload", T_OBJECT, OFFSETOF(payload), READONLY, {"payload", T_OBJECT, OFFSETOF(payload), READONLY,
"TODO"}, "TODO"},
{"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY, {"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY,
@ -151,7 +154,7 @@ PyTypeObject replicationMessageType = {
/*tp_flags*/ /*tp_flags*/
replicationMessageType_doc, /*tp_doc*/ replicationMessageType_doc, /*tp_doc*/
0, /*tp_traverse*/ 0, /*tp_traverse*/
replmsg_clear, /*tp_clear*/ (inquiry)replmsg_clear, /*tp_clear*/
0, /*tp_richcompare*/ 0, /*tp_richcompare*/
0, /*tp_weaklistoffset*/ 0, /*tp_weaklistoffset*/
0, /*tp_iter*/ 0, /*tp_iter*/