diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 78ee21c4..1a630553 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -27,6 +27,7 @@ #define PSYCOPG_CURSOR_H 1 #include "psycopg/connection.h" +#include "libpq_support.h" #ifdef __cplusplus extern "C" { @@ -75,7 +76,7 @@ struct cursorObject { int in_replication; /* we're in streaming replication loop */ int stop_replication; /* client requested to stop replication */ int keepalive_interval; /* interval for keepalive messages in replication mode */ - replicationMessageObject *repl_sync_msg; /* set when the client asks us to sync the server */ + XLogRecPtr repl_sync_lsn; /* set when the client asks us to sync the server */ PyObject *tuple_factory; /* factory for result tuples */ PyObject *tzinfo_factory; /* factory for tzinfo objects */ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 1ea922bb..19f82c60 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -1609,14 +1609,13 @@ psyco_curs_start_replication_expert(cursorObject *self, PyObject *args) self->in_replication = 1; self->keepalive_interval = keepalive_interval; self->stop_replication = 0; - self->repl_sync_msg = NULL; + self->repl_sync_lsn = InvalidXLogRecPtr; if (pq_execute(self, command, 0, 1 /* no_result */, 1 /* no_begin */) >= 0) { res = Py_None; Py_INCREF(Py_None); } - Py_CLEAR(self->repl_sync_msg); Py_CLEAR(self->copyfile); self->in_replication = 0; @@ -1647,24 +1646,12 @@ psyco_curs_stop_replication(cursorObject *self) static PyObject * psyco_curs_replication_sync_server(cursorObject *self, PyObject *args) { - replicationMessageObject *msg; - EXC_IF_CURS_CLOSED(self); - if (!PyArg_ParseTuple(args, "O!", &replicationMessageType, &msg)) { + if (!PyArg_ParseTuple(args, "K", &self->repl_sync_lsn)) { return NULL; } - if (!self->in_replication) { - PyErr_SetString(ProgrammingError, - "replication_sync_server() called when not in streaming replication loop"); - } else { - Py_CLEAR(self->repl_sync_msg); - - self->repl_sync_msg = msg; - Py_XINCREF(self->repl_sync_msg); - } - Py_RETURN_NONE; } @@ -1964,7 +1951,6 @@ cursor_clear(cursorObject *self) Py_CLEAR(self->casts); Py_CLEAR(self->caster); Py_CLEAR(self->copyfile); - Py_CLEAR(self->repl_sync_msg); Py_CLEAR(self->tuple_factory); Py_CLEAR(self->tzinfo_factory); Py_CLEAR(self->query); @@ -2054,7 +2040,6 @@ cursor_traverse(cursorObject *self, visitproc visit, void *arg) Py_VISIT(self->casts); Py_VISIT(self->caster); Py_VISIT(self->copyfile); - Py_VISIT(self->repl_sync_msg); Py_VISIT(self->tuple_factory); Py_VISIT(self->tzinfo_factory); Py_VISIT(self->query); diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 7a3ec19e..7ce06a86 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1685,7 +1685,7 @@ _pq_copy_both_v3(cursorObject *curs) msg = (replicationMessageObject *) PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, - obj, NULL); + curs, obj, NULL); Py_DECREF(obj); if (!msg) { goto exit; } @@ -1706,14 +1706,14 @@ _pq_copy_both_v3(cursorObject *curs) written_lsn = wal_end; /* if requested by sync_server(msg), we confirm LSN with the server */ - if (curs->repl_sync_msg) { + if (curs->repl_sync_lsn != InvalidXLogRecPtr) { Dprintf("_pq_copy_both_v3: server sync requested at "XLOGFMTSTR, - XLOGFMTARGS(curs->repl_sync_msg->wal_end)); + XLOGFMTARGS(curs->repl_sync_lsn)); - if (fsync_lsn < curs->repl_sync_msg->wal_end) - fsync_lsn = curs->repl_sync_msg->wal_end; + if (fsync_lsn < curs->repl_sync_lsn) + fsync_lsn = curs->repl_sync_lsn; - Py_CLEAR(curs->repl_sync_msg); + curs->repl_sync_lsn = InvalidXLogRecPtr; if (!sendFeedback(conn, written_lsn, fsync_lsn, 0)) { pq_raise(curs->conn, curs, NULL); diff --git a/psycopg/replication_message.h b/psycopg/replication_message.h index b03d1c4f..a7567a1d 100644 --- a/psycopg/replication_message.h +++ b/psycopg/replication_message.h @@ -26,6 +26,7 @@ #ifndef PSYCOPG_REPLICATION_MESSAGE_H #define PSYCOPG_REPLICATION_MESSAGE_H 1 +#include "cursor.h" #include "libpq_support.h" #ifdef __cplusplus @@ -38,6 +39,7 @@ extern HIDDEN PyTypeObject replicationMessageType; struct replicationMessageObject { PyObject_HEAD + cursorObject *cursor; PyObject *payload; XLogRecPtr data_start; diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c index 5d15ca61..27a9c916 100644 --- a/psycopg/replication_message_type.c +++ b/psycopg/replication_message_type.c @@ -27,7 +27,6 @@ #include "psycopg/psycopg.h" #include "psycopg/replication_message.h" -#include "psycopg/libpq_support.h" #include "datetime.h" @@ -59,8 +58,9 @@ replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs) { replicationMessageObject *self = (replicationMessageObject*) obj; - if (!PyArg_ParseTuple(args, "O", &self->payload)) + if (!PyArg_ParseTuple(args, "O!O", &cursorType, &self->cursor, &self->payload)) return -1; + Py_XINCREF(self->cursor); Py_XINCREF(self->payload); self->data_start = 0; @@ -70,16 +70,17 @@ replmsg_init(PyObject *obj, PyObject *args, PyObject *kwargs) } static int -replmsg_clear(PyObject *self) +replmsg_clear(replicationMessageObject *self) { - Py_CLEAR(((replicationMessageObject*) self)->payload); + Py_CLEAR(self->cursor); + Py_CLEAR(self->payload); return 0; } static void replmsg_dealloc(PyObject* obj) { - replmsg_clear(obj); + replmsg_clear((replicationMessageObject*) obj); } #define psyco_replmsg_send_time_doc \ @@ -108,6 +109,8 @@ psyco_replmsg_get_send_time(replicationMessageObject *self) /* object member list */ static struct PyMemberDef replicationMessageObject_members[] = { + {"cursor", T_OBJECT, OFFSETOF(cursor), READONLY, + "TODO"}, {"payload", T_OBJECT, OFFSETOF(payload), READONLY, "TODO"}, {"data_start", T_ULONGLONG, OFFSETOF(data_start), READONLY, @@ -151,7 +154,7 @@ PyTypeObject replicationMessageType = { /*tp_flags*/ replicationMessageType_doc, /*tp_doc*/ 0, /*tp_traverse*/ - replmsg_clear, /*tp_clear*/ + (inquiry)replmsg_clear, /*tp_clear*/ 0, /*tp_richcompare*/ 0, /*tp_weaklistoffset*/ 0, /*tp_iter*/