mirror of
https://github.com/psycopg/psycopg2.git
synced 2025-07-10 16:22:33 +03:00
Add handling of send_time field in replmsg
This commit is contained in:
parent
1ac385d1fb
commit
9fc5bf4436
|
@ -41,13 +41,6 @@
|
||||||
|
|
||||||
/* support routines taken from pg_basebackup/streamutil.c */
|
/* support routines taken from pg_basebackup/streamutil.c */
|
||||||
|
|
||||||
/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */
|
|
||||||
#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */
|
|
||||||
#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */
|
|
||||||
|
|
||||||
#define SECS_PER_DAY 86400
|
|
||||||
#define USECS_PER_SEC 1000000LL
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Frontend version of GetCurrentTimestamp(), since we are not linked with
|
* Frontend version of GetCurrentTimestamp(), since we are not linked with
|
||||||
* backend code. The protocol always uses integer timestamps, regardless of
|
* backend code. The protocol always uses integer timestamps, regardless of
|
||||||
|
|
|
@ -37,6 +37,13 @@ typedef unsigned PG_INT64_TYPE XLogRecPtr;
|
||||||
#define XLOGFMTSTR "%x/%x"
|
#define XLOGFMTSTR "%x/%x"
|
||||||
#define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF))
|
#define XLOGFMTARGS(x) ((uint32)((x) >> 32)), ((uint32)((x) & 0xFFFFFFFF))
|
||||||
|
|
||||||
|
/* Julian-date equivalents of Day 0 in Unix and Postgres reckoning */
|
||||||
|
#define UNIX_EPOCH_JDATE 2440588 /* == date2j(1970, 1, 1) */
|
||||||
|
#define POSTGRES_EPOCH_JDATE 2451545 /* == date2j(2000, 1, 1) */
|
||||||
|
|
||||||
|
#define SECS_PER_DAY 86400
|
||||||
|
#define USECS_PER_SEC 1000000LL
|
||||||
|
|
||||||
HIDDEN pg_int64 feGetCurrentTimestamp(void);
|
HIDDEN pg_int64 feGetCurrentTimestamp(void);
|
||||||
HIDDEN void fe_sendint64(pg_int64 i, char *buf);
|
HIDDEN void fe_sendint64(pg_int64 i, char *buf);
|
||||||
HIDDEN pg_int64 fe_recvint64(char *buf);
|
HIDDEN pg_int64 fe_recvint64(char *buf);
|
||||||
|
|
|
@ -1572,6 +1572,7 @@ _pq_copy_both_v3(cursorObject *curs)
|
||||||
XLogRecPtr written_lsn = InvalidXLogRecPtr,
|
XLogRecPtr written_lsn = InvalidXLogRecPtr,
|
||||||
fsync_lsn = InvalidXLogRecPtr,
|
fsync_lsn = InvalidXLogRecPtr,
|
||||||
data_start, wal_end;
|
data_start, wal_end;
|
||||||
|
pg_int64 send_time;
|
||||||
|
|
||||||
if (!curs->copyfile) {
|
if (!curs->copyfile) {
|
||||||
PyErr_SetString(ProgrammingError,
|
PyErr_SetString(ProgrammingError,
|
||||||
|
@ -1669,10 +1670,10 @@ _pq_copy_both_v3(cursorObject *curs)
|
||||||
|
|
||||||
data_start = fe_recvint64(buffer + 1);
|
data_start = fe_recvint64(buffer + 1);
|
||||||
wal_end = fe_recvint64(buffer + 1 + 8);
|
wal_end = fe_recvint64(buffer + 1 + 8);
|
||||||
/*send_time = fe_recvint64(buffer + 1 + 8 + 8);*/
|
send_time = fe_recvint64(buffer + 1 + 8 + 8);
|
||||||
|
|
||||||
Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR,
|
Dprintf("_pq_copy_both_v3: data_start="XLOGFMTSTR", wal_end="XLOGFMTSTR", send_time=%lld",
|
||||||
XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end));
|
XLOGFMTARGS(data_start), XLOGFMTARGS(wal_end), send_time);
|
||||||
|
|
||||||
if (is_text) {
|
if (is_text) {
|
||||||
obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
|
obj = PyUnicode_Decode(buffer + hdr, len - hdr, curs->conn->codec, NULL);
|
||||||
|
@ -1690,6 +1691,7 @@ _pq_copy_both_v3(cursorObject *curs)
|
||||||
|
|
||||||
msg->data_start = data_start;
|
msg->data_start = data_start;
|
||||||
msg->wal_end = wal_end;
|
msg->wal_end = wal_end;
|
||||||
|
msg->send_time = send_time;
|
||||||
|
|
||||||
tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL);
|
tmp = PyObject_CallFunctionObjArgs(write_func, msg, NULL);
|
||||||
|
|
||||||
|
|
|
@ -869,6 +869,7 @@ INIT_MODULE(_psycopg)(void)
|
||||||
/* Initialize the PyDateTimeAPI everywhere is used */
|
/* Initialize the PyDateTimeAPI everywhere is used */
|
||||||
PyDateTime_IMPORT;
|
PyDateTime_IMPORT;
|
||||||
if (psyco_adapter_datetime_init()) { goto exit; }
|
if (psyco_adapter_datetime_init()) { goto exit; }
|
||||||
|
if (psyco_replmsg_datetime_init()) { goto exit; }
|
||||||
|
|
||||||
Py_TYPE(&pydatetimeType) = &PyType_Type;
|
Py_TYPE(&pydatetimeType) = &PyType_Type;
|
||||||
if (PyType_Ready(&pydatetimeType) == -1) goto exit;
|
if (PyType_Ready(&pydatetimeType) == -1) goto exit;
|
||||||
|
|
|
@ -42,9 +42,11 @@ struct replicationMessageObject {
|
||||||
|
|
||||||
XLogRecPtr data_start;
|
XLogRecPtr data_start;
|
||||||
XLogRecPtr wal_end;
|
XLogRecPtr wal_end;
|
||||||
/* send_time */
|
pg_int64 send_time;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
RAISES_NEG int psyco_replmsg_datetime_init(void);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -27,14 +27,31 @@
|
||||||
#include "psycopg/psycopg.h"
|
#include "psycopg/psycopg.h"
|
||||||
|
|
||||||
#include "psycopg/replication_message.h"
|
#include "psycopg/replication_message.h"
|
||||||
|
#include "psycopg/libpq_support.h"
|
||||||
|
|
||||||
|
#include "datetime.h"
|
||||||
|
|
||||||
|
RAISES_NEG int
|
||||||
|
psyco_replmsg_datetime_init(void)
|
||||||
|
{
|
||||||
|
Dprintf("psyco_replmsg_datetime_init: datetime init");
|
||||||
|
|
||||||
|
PyDateTime_IMPORT;
|
||||||
|
|
||||||
|
if (!PyDateTimeAPI) {
|
||||||
|
PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static PyObject *
|
static PyObject *
|
||||||
replmsg_repr(replicationMessageObject *self)
|
replmsg_repr(replicationMessageObject *self)
|
||||||
{
|
{
|
||||||
return PyString_FromFormat(
|
return PyString_FromFormat(
|
||||||
"<replicationMessage object at %p; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR">",
|
"<replicationMessage object at %p; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>",
|
||||||
self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end));
|
self, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end), self->send_time);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int
|
static int
|
||||||
|
@ -65,6 +82,26 @@ replmsg_dealloc(PyObject* obj)
|
||||||
replmsg_clear(obj);
|
replmsg_clear(obj);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#define psyco_replmsg_send_time_doc \
|
||||||
|
"send_time - Timestamp of the replication message departure from the server."
|
||||||
|
|
||||||
|
static PyObject *
|
||||||
|
psyco_replmsg_get_send_time(replicationMessageObject *self)
|
||||||
|
{
|
||||||
|
PyObject *tval, *res = NULL;
|
||||||
|
double t;
|
||||||
|
|
||||||
|
t = (double)self->send_time / USECS_PER_SEC +
|
||||||
|
((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
|
||||||
|
|
||||||
|
tval = Py_BuildValue("(d)", t);
|
||||||
|
if (tval) {
|
||||||
|
res = PyDateTime_FromTimestamp(tval);
|
||||||
|
Py_DECREF(tval);
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
#define OFFSETOF(x) offsetof(replicationMessageObject, x)
|
#define OFFSETOF(x) offsetof(replicationMessageObject, x)
|
||||||
|
|
||||||
|
@ -80,6 +117,12 @@ static struct PyMemberDef replicationMessageObject_members[] = {
|
||||||
{NULL}
|
{NULL}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
static struct PyGetSetDef replicationMessageObject_getsets[] = {
|
||||||
|
{ "send_time", (getter)psyco_replmsg_get_send_time, NULL,
|
||||||
|
psyco_replmsg_send_time_doc, NULL },
|
||||||
|
{NULL}
|
||||||
|
};
|
||||||
|
|
||||||
/* object type */
|
/* object type */
|
||||||
|
|
||||||
#define replicationMessageType_doc \
|
#define replicationMessageType_doc \
|
||||||
|
@ -115,7 +158,7 @@ PyTypeObject replicationMessageType = {
|
||||||
0, /*tp_iternext*/
|
0, /*tp_iternext*/
|
||||||
0, /*tp_methods*/
|
0, /*tp_methods*/
|
||||||
replicationMessageObject_members, /*tp_members*/
|
replicationMessageObject_members, /*tp_members*/
|
||||||
0, /*tp_getset*/
|
replicationMessageObject_getsets, /*tp_getset*/
|
||||||
0, /*tp_base*/
|
0, /*tp_base*/
|
||||||
0, /*tp_dict*/
|
0, /*tp_dict*/
|
||||||
0, /*tp_descr_get*/
|
0, /*tp_descr_get*/
|
||||||
|
|
Loading…
Reference in New Issue
Block a user