Properly subclass ReplicationCursor on C level.

This commit is contained in:
Oleksandr Shulgin 2015-10-19 20:00:39 +02:00
parent 7aea2cef6e
commit 0bb81fc848
14 changed files with 554 additions and 349 deletions

View File

@ -324,16 +324,15 @@ The individual messages in the replication stream are presented by
`start_replication_expert()` internally. `start_replication_expert()` internally.
After starting the replication, to actually consume the incoming After starting the replication, to actually consume the incoming
server messages, use `consume_replication_stream()` or implement a server messages, use `consume_stream()` or implement a loop around
loop around `read_replication_message()` in case of asynchronous `read_message()` in case of asynchronous connection.
connection.
.. method:: start_replication_expert(command) .. method:: start_replication_expert(command)
Start replication on the connection using provided ``START_REPLICATION`` Start replication on the connection using provided ``START_REPLICATION``
command. command.
.. method:: consume_replication_stream(consume, decode=False, keepalive_interval=10) .. method:: consume_stream(consume, decode=False, keepalive_interval=10)
:param consume: a callable object with signature ``consume(msg)`` :param consume: a callable object with signature ``consume(msg)``
:param decode: a flag indicating that unicode conversion should be :param decode: a flag indicating that unicode conversion should be
@ -342,7 +341,7 @@ The individual messages in the replication stream are presented by
messages to the server messages to the server
This method can only be used with synchronous connection. For This method can only be used with synchronous connection. For
asynchronous connections see `read_replication_message()`. asynchronous connections see `read_message()`.
Before calling this method to consume the stream, use Before calling this method to consume the stream, use
`start_replication()` first. `start_replication()` first.
@ -372,18 +371,18 @@ The individual messages in the replication stream are presented by
self.store_message_data(msg.payload) self.store_message_data(msg.payload)
if self.should_report_to_the_server_now(msg): if self.should_report_to_the_server_now(msg):
msg.cursor.send_replication_feedback(flush_lsn=msg.data_start) msg.cursor.send_feedback(flush_lsn=msg.data_start)
consumer = LogicalStreamConsumer() consumer = LogicalStreamConsumer()
cur.consume_replication_stream(consumer, decode=True) cur.consume_stream(consumer, decode=True)
The *msg* objects passed to ``consume()`` are instances of The *msg* objects passed to ``consume()`` are instances of
`ReplicationMessage` class. `ReplicationMessage` class.
After storing certain amount of messages' data reliably, the client After storing certain amount of messages' data reliably, the client
should send a confirmation message to the server. This should be done should send a confirmation message to the server. This should be done
by calling `send_replication_feedback()` method on the corresponding by calling `send_feedback()` method on the corresponding replication
replication cursor. A reference to the cursor is provided in the cursor. A reference to the cursor is provided in the
`ReplicationMessage` as an attribute. `ReplicationMessage` as an attribute.
.. warning:: .. warning::
@ -400,7 +399,7 @@ The individual messages in the replication stream are presented by
load on network and the server. A possible strategy is to confirm load on network and the server. A possible strategy is to confirm
after every COMMIT message. after every COMMIT message.
.. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
:param write_lsn: a LSN position up to which the client has written the data locally :param write_lsn: a LSN position up to which the client has written the data locally
:param flush_lsn: a LSN position up to which the client has stored the :param flush_lsn: a LSN position up to which the client has stored the
@ -419,16 +418,15 @@ The individual messages in the replication stream are presented by
just send a keepalive message to the server. just send a keepalive message to the server.
If the feedback message could not be sent, updates the passed LSN If the feedback message could not be sent, updates the passed LSN
positions in the cursor for a later call to positions in the cursor for a later call to `flush_feedback()` and
`flush_replication_feedback()` and returns `!False`, otherwise returns returns `!False`, otherwise returns `!True`.
`!True`.
.. method:: flush_replication_feedback(reply=False) .. method:: flush_feedback(reply=False)
:param reply: request the server to send back a keepalive message immediately :param reply: request the server to send back a keepalive message immediately
This method tries to flush the latest replication feedback message This method tries to flush the latest replication feedback message
that `send_replication_feedback()` was trying to send but couldn't. that `send_feedback()` was trying to send but couldn't.
If *reply* is `!True` sends a keepalive message in either case. If *reply* is `!True` sends a keepalive message in either case.
@ -437,14 +435,13 @@ The individual messages in the replication stream are presented by
Low-level methods for asynchronous connection operation. Low-level methods for asynchronous connection operation.
With the synchronous connection, a call to `consume_replication_stream()` With the synchronous connection, a call to `consume_stream()` handles all
handles all the complexity of handling the incoming messages and sending the complexity of handling the incoming messages and sending keepalive
keepalive replies, but at times it might be beneficial to use low-level replies, but at times it might be beneficial to use low-level interface
interface for better control, in particular to `~select.select()` on for better control, in particular to `~select.select()` on multiple
multiple sockets. The following methods are provided for asynchronous sockets. The following methods are provided for asynchronous operation:
operation:
.. method:: read_replication_message(decode=True) .. method:: read_message(decode=True)
:param decode: a flag indicating that unicode conversion should be :param decode: a flag indicating that unicode conversion should be
performed on the data received from the server performed on the data received from the server
@ -475,7 +472,7 @@ The individual messages in the replication stream are presented by
This is a convenience method which allows replication cursor to be This is a convenience method which allows replication cursor to be
used directly in `~select.select()` or `~select.poll()` calls. used directly in `~select.select()` or `~select.poll()` calls.
.. attribute:: replication_io_timestamp .. attribute:: io_timestamp
A `~datetime` object representing the timestamp at the moment of last A `~datetime` object representing the timestamp at the moment of last
communication with the server (a data or keepalive message in either communication with the server (a data or keepalive message in either
@ -488,18 +485,19 @@ The individual messages in the replication stream are presented by
keepalive_interval = 10.0 keepalive_interval = 10.0
while True: while True:
msg = cur.read_replication_message() msg = cur.read_message()
if msg: if msg:
consume(msg) consume(msg)
else: else:
timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds() now = datetime.now()
timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds()
if timeout > 0: if timeout > 0:
sel = select.select([cur], [], [], timeout) sel = select.select([cur], [], [], timeout)
else: else:
sel = ([], [], []) sel = ([], [], [])
if not sel[0]: if not sel[0]:
cur.send_replication_feedback() cur.send_feedback()
.. index:: .. index::
pair: Cursor; Replication pair: Cursor; Replication

View File

@ -61,7 +61,7 @@ from psycopg2._psycopg import string_types, binary_types, new_type, new_array_ty
from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column
from psycopg2._psycopg import QueryCanceledError, TransactionRollbackError from psycopg2._psycopg import QueryCanceledError, TransactionRollbackError
from psycopg2._psycopg import replicationMessage from psycopg2._psycopg import ReplicationCursor, ReplicationMessage
try: try:
from psycopg2._psycopg import set_wait_callback, get_wait_callback from psycopg2._psycopg import set_wait_callback, get_wait_callback

View File

@ -39,7 +39,8 @@ import psycopg2
from psycopg2 import extensions as _ext from psycopg2 import extensions as _ext
from psycopg2.extensions import cursor as _cursor from psycopg2.extensions import cursor as _cursor
from psycopg2.extensions import connection as _connection from psycopg2.extensions import connection as _connection
from psycopg2.extensions import replicationMessage as ReplicationMessage from psycopg2.extensions import ReplicationCursor as _replicationCursor
from psycopg2.extensions import ReplicationMessage
from psycopg2.extensions import adapt as _A, quote_ident from psycopg2.extensions import adapt as _A, quote_ident
from psycopg2.extensions import b from psycopg2.extensions import b
@ -503,7 +504,7 @@ class PhysicalReplicationConnection(ReplicationConnectionBase):
class StopReplication(Exception): class StopReplication(Exception):
""" """
Exception used to break out of the endless loop in Exception used to break out of the endless loop in
`~ReplicationCursor.consume_replication_stream()`. `~ReplicationCursor.consume_stream()`.
Subclass of `~exceptions.Exception`. Intentionally *not* inherited from Subclass of `~exceptions.Exception`. Intentionally *not* inherited from
`~psycopg2.Error` as occurrence of this exception does not indicate an `~psycopg2.Error` as occurrence of this exception does not indicate an
@ -512,7 +513,7 @@ class StopReplication(Exception):
pass pass
class ReplicationCursor(_cursor): class ReplicationCursor(_replicationCursor):
"""A cursor used for communication on the replication protocol.""" """A cursor used for communication on the replication protocol."""
def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None): def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
@ -598,9 +599,6 @@ class ReplicationCursor(_cursor):
self.start_replication_expert(command) self.start_replication_expert(command)
def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False):
return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested)
# allows replication cursors to be used in select.select() directly # allows replication cursors to be used in select.select() directly
def fileno(self): def fileno(self):
return self.connection.fileno() return self.connection.fileno()

View File

@ -27,7 +27,6 @@
#define PSYCOPG_CURSOR_H 1 #define PSYCOPG_CURSOR_H 1
#include "psycopg/connection.h" #include "psycopg/connection.h"
#include "libpq_support.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -74,14 +73,6 @@ struct cursorObject {
#define DEFAULT_COPYBUFF 8192 #define DEFAULT_COPYBUFF 8192
/* replication cursor attrs */ /* replication cursor attrs */
int repl_started:1; /* if replication is started */
int repl_consuming:1; /* if running the consume loop */
struct timeval repl_keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr repl_write_lsn; /* LSN stats for replication feedback messages */
XLogRecPtr repl_flush_lsn;
XLogRecPtr repl_apply_lsn;
int repl_feedback_pending; /* flag set when we couldn't send the feedback to the server */
struct timeval repl_last_io; /* timestamp of the last exchange with the server */
PyObject *tuple_factory; /* factory for result tuples */ PyObject *tuple_factory; /* factory for result tuples */
PyObject *tzinfo_factory; /* factory for tzinfo objects */ PyObject *tzinfo_factory; /* factory for tzinfo objects */
@ -106,7 +97,7 @@ HIDDEN void curs_reset(cursorObject *self);
HIDDEN int psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue); HIDDEN int psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue);
HIDDEN int psyco_curs_scrollable_set(cursorObject *self, PyObject *pyvalue); HIDDEN int psyco_curs_scrollable_set(cursorObject *self, PyObject *pyvalue);
RAISES_NEG int psyco_curs_datetime_init(void); HIDDEN int psyco_curs_init(PyObject *obj, PyObject *args, PyObject *kwargs);
/* exception-raising macros */ /* exception-raising macros */
#define EXC_IF_CURS_CLOSED(self) \ #define EXC_IF_CURS_CLOSED(self) \
@ -149,22 +140,6 @@ do \
return NULL; } \ return NULL; } \
while (0) while (0)
#define EXC_IF_REPLICATING(self, cmd) \
do \
if ((self)->repl_started) { \
PyErr_SetString(ProgrammingError, \
#cmd " cannot be used when replication is already in progress"); \
return NULL; } \
while (0)
#define EXC_IF_NOT_REPLICATING(self, cmd) \
do \
if (!(self)->repl_started) { \
PyErr_SetString(ProgrammingError, \
#cmd " cannot be used when replication is not in progress"); \
return NULL; } \
while (0)
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -28,7 +28,6 @@
#include "psycopg/cursor.h" #include "psycopg/cursor.h"
#include "psycopg/connection.h" #include "psycopg/connection.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h" #include "psycopg/green.h"
#include "psycopg/pqpath.h" #include "psycopg/pqpath.h"
#include "psycopg/typecast.h" #include "psycopg/typecast.h"
@ -39,9 +38,6 @@
#include <stdlib.h> #include <stdlib.h>
/* python */
#include "datetime.h"
/** DBAPI methods **/ /** DBAPI methods **/
@ -1583,222 +1579,6 @@ exit:
return res; return res;
} }
#define psyco_curs_start_replication_expert_doc \
"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start and consume replication stream with direct command."
static PyObject *
psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
{
PyObject *res = NULL;
char *command;
static char *kwlist[] = {"command", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) {
return NULL;
}
EXC_IF_CURS_CLOSED(self);
EXC_IF_GREEN(start_replication_expert);
EXC_IF_TPC_PREPARED(self->conn, start_replication_expert);
EXC_IF_REPLICATING(self, start_replication_expert);
Dprintf("psyco_curs_start_replication_expert: %s", command);
self->copysize = 0;
self->repl_consuming = 0;
self->repl_write_lsn = InvalidXLogRecPtr;
self->repl_flush_lsn = InvalidXLogRecPtr;
self->repl_apply_lsn = InvalidXLogRecPtr;
self->repl_feedback_pending = 0;
gettimeofday(&self->repl_last_io, NULL);
if (pq_execute(self, command, self->conn->async,
1 /* no_result */, 1 /* no_begin */) >= 0) {
res = Py_None;
Py_INCREF(res);
self->repl_started = 1;
}
return res;
}
#define psyco_curs_consume_replication_stream_doc \
"consume_replication_stream(consumer, keepalive_interval=10) -- Consume replication stream."
static PyObject *
psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObject *kwargs)
{
PyObject *consume = NULL, *res = NULL;
int decode = 0;
double keepalive_interval = 10;
static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
&consume, &decode, &keepalive_interval)) {
return NULL;
}
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, consume_replication_stream);
EXC_IF_GREEN(consume_replication_stream);
EXC_IF_TPC_PREPARED(self->conn, consume_replication_stream);
EXC_IF_NOT_REPLICATING(self, consume_replication_stream);
if (self->repl_consuming) {
PyErr_SetString(ProgrammingError,
"consume_replication_stream cannot be used when already in the consume loop");
return NULL;
}
Dprintf("psyco_curs_consume_replication_stream");
if (keepalive_interval < 1.0) {
psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1 (sec)");
return NULL;
}
self->repl_consuming = 1;
if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) {
res = Py_None;
Py_INCREF(res);
}
self->repl_consuming = 0;
return res;
}
#define psyco_curs_read_replication_message_doc \
"read_replication_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
static PyObject *
psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject *kwargs)
{
int decode = 1;
static char *kwlist[] = {"decode", NULL};
EXC_IF_CURS_CLOSED(self);
EXC_IF_GREEN(read_replication_message);
EXC_IF_TPC_PREPARED(self->conn, read_replication_message);
EXC_IF_NOT_REPLICATING(self, read_replication_message);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&decode)) {
return NULL;
}
return pq_read_replication_message(self, decode);
}
static PyObject *
curs_flush_replication_feedback(cursorObject *self, int reply)
{
if (!(self->repl_feedback_pending || reply))
Py_RETURN_TRUE;
if (pq_send_replication_feedback(self, reply)) {
self->repl_feedback_pending = 0;
Py_RETURN_TRUE;
} else {
self->repl_feedback_pending = 1;
Py_RETURN_FALSE;
}
}
#define psyco_curs_send_replication_feedback_doc \
"send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply."
static PyObject *
psyco_curs_send_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs)
{
XLogRecPtr write_lsn = InvalidXLogRecPtr,
flush_lsn = InvalidXLogRecPtr,
apply_lsn = InvalidXLogRecPtr;
int reply = 0;
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
EXC_IF_CURS_CLOSED(self);
EXC_IF_NOT_REPLICATING(self, send_replication_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
&write_lsn, &flush_lsn, &apply_lsn, &reply)) {
return NULL;
}
if (write_lsn > self->repl_write_lsn)
self->repl_write_lsn = write_lsn;
if (flush_lsn > self->repl_flush_lsn)
self->repl_flush_lsn = flush_lsn;
if (apply_lsn > self->repl_apply_lsn)
self->repl_apply_lsn = apply_lsn;
self->repl_feedback_pending = 1;
return curs_flush_replication_feedback(self, reply);
}
#define psyco_curs_flush_replication_feedback_doc \
"flush_replication_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply."
static PyObject *
psyco_curs_flush_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs)
{
int reply = 0;
static char *kwlist[] = {"reply", NULL};
EXC_IF_CURS_CLOSED(self);
EXC_IF_NOT_REPLICATING(self, flush_replication_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&reply)) {
return NULL;
}
return curs_flush_replication_feedback(self, reply);
}
RAISES_NEG int
psyco_curs_datetime_init(void)
{
Dprintf("psyco_curs_datetime_init: datetime init");
PyDateTime_IMPORT;
if (!PyDateTimeAPI) {
PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
return -1;
}
return 0;
}
#define psyco_curs_replication_io_timestamp_doc \
"replication_io_timestamp -- the timestamp of latest IO with the server"
static PyObject *
psyco_curs_get_replication_io_timestamp(cursorObject *self)
{
PyObject *tval, *res = NULL;
double seconds;
EXC_IF_CURS_CLOSED(self);
seconds = self->repl_last_io.tv_sec + self->repl_last_io.tv_usec / 1.0e6;
tval = Py_BuildValue("(d)", seconds);
if (tval) {
res = PyDateTime_FromTimestamp(tval);
Py_DECREF(tval);
}
return res;
}
/* extension: closed - return true if cursor is closed */ /* extension: closed - return true if cursor is closed */
#define psyco_curs_closed_doc \ #define psyco_curs_closed_doc \
@ -1973,16 +1753,6 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc},
{"copy_expert", (PyCFunction)psyco_curs_copy_expert, {"copy_expert", (PyCFunction)psyco_curs_copy_expert,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc}, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc},
{"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert,
METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc},
{"consume_replication_stream", (PyCFunction)psyco_curs_consume_replication_stream,
METH_VARARGS|METH_KEYWORDS, psyco_curs_consume_replication_stream_doc},
{"read_replication_message", (PyCFunction)psyco_curs_read_replication_message,
METH_VARARGS|METH_KEYWORDS, psyco_curs_read_replication_message_doc},
{"send_replication_feedback", (PyCFunction)psyco_curs_send_replication_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_curs_send_replication_feedback_doc},
{"flush_replication_feedback", (PyCFunction)psyco_curs_flush_replication_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_curs_flush_replication_feedback_doc},
{NULL} {NULL}
}; };
@ -2033,9 +1803,6 @@ static struct PyGetSetDef cursorObject_getsets[] = {
(getter)psyco_curs_scrollable_get, (getter)psyco_curs_scrollable_get,
(setter)psyco_curs_scrollable_set, (setter)psyco_curs_scrollable_set,
psyco_curs_scrollable_doc, NULL }, psyco_curs_scrollable_doc, NULL },
{ "replication_io_timestamp",
(getter)psyco_curs_get_replication_io_timestamp, NULL,
psyco_curs_replication_io_timestamp_doc, NULL },
{NULL} {NULL}
}; };
@ -2134,7 +1901,7 @@ cursor_dealloc(PyObject* obj)
Py_TYPE(obj)->tp_free(obj); Py_TYPE(obj)->tp_free(obj);
} }
static int int
cursor_init(PyObject *obj, PyObject *args, PyObject *kwargs) cursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
{ {
PyObject *conn; PyObject *conn;

View File

@ -35,6 +35,7 @@
#include "psycopg/pqpath.h" #include "psycopg/pqpath.h"
#include "psycopg/connection.h" #include "psycopg/connection.h"
#include "psycopg/cursor.h" #include "psycopg/cursor.h"
#include "psycopg/replication_cursor.h"
#include "psycopg/replication_message.h" #include "psycopg/replication_message.h"
#include "psycopg/green.h" #include "psycopg/green.h"
#include "psycopg/typecast.h" #include "psycopg/typecast.h"
@ -1542,19 +1543,23 @@ exit:
are never returned to the caller. are never returned to the caller.
*/ */
PyObject * PyObject *
pq_read_replication_message(cursorObject *curs, int decode) pq_read_replication_message(replicationCursorObject *repl, int decode)
{ {
cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn;
PGconn *pgconn = conn->pgconn;
char *buffer = NULL; char *buffer = NULL;
int len, data_size, consumed, hdr, reply; int len, data_size, consumed, hdr, reply;
XLogRecPtr data_start, wal_end; XLogRecPtr data_start, wal_end;
pg_int64 send_time; pg_int64 send_time;
PyObject *str = NULL, *msg = NULL; PyObject *str = NULL, *result = NULL;
replicationMessageObject *msg = NULL;
Dprintf("pq_read_replication_message(decode=%d)", decode); Dprintf("pq_read_replication_message(decode=%d)", decode);
consumed = 0; consumed = 0;
retry: retry:
len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */); len = PQgetCopyData(pgconn, &buffer, 1 /* async */);
if (len == 0) { if (len == 0) {
/* If we've tried reading some data, but there was none, bail out. */ /* If we've tried reading some data, but there was none, bail out. */
@ -1566,8 +1571,8 @@ retry:
server we might be reading a number of messages for every single server we might be reading a number of messages for every single
one we process, thus overgrowing the internal buffer until the one we process, thus overgrowing the internal buffer until the
client system runs out of memory. */ client system runs out of memory. */
if (!PQconsumeInput(curs->conn->pgconn)) { if (!PQconsumeInput(pgconn)) {
pq_raise(curs->conn, curs, NULL); pq_raise(conn, curs, NULL);
goto exit; goto exit;
} }
/* But PQconsumeInput() doesn't tell us if it has actually read /* But PQconsumeInput() doesn't tell us if it has actually read
@ -1581,15 +1586,15 @@ retry:
if (len == -2) { if (len == -2) {
/* serious error */ /* serious error */
pq_raise(curs->conn, curs, NULL); pq_raise(conn, curs, NULL);
goto exit; goto exit;
} }
if (len == -1) { if (len == -1) {
/* EOF */ /* EOF */
curs->pgres = PQgetResult(curs->conn->pgconn); curs->pgres = PQgetResult(pgconn);
if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) { if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) {
pq_raise(curs->conn, curs, NULL); pq_raise(conn, curs, NULL);
goto exit; goto exit;
} }
@ -1603,7 +1608,7 @@ retry:
consumed = 1; consumed = 1;
/* ok, we did really read something: update the io timestamp */ /* ok, we did really read something: update the io timestamp */
gettimeofday(&curs->repl_last_io, NULL); gettimeofday(&repl->last_io, NULL);
Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len); Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len);
if (buffer[0] == 'w') { if (buffer[0] == 'w') {
@ -1626,21 +1631,22 @@ retry:
/* XXX it would be wise to check if it's really a logical replication */ /* XXX it would be wise to check if it's really a logical replication */
if (decode) { if (decode) {
str = PyUnicode_Decode(buffer + hdr, data_size, curs->conn->codec, NULL); str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL);
} else { } else {
str = Bytes_FromStringAndSize(buffer + hdr, data_size); str = Bytes_FromStringAndSize(buffer + hdr, data_size);
} }
if (!str) { goto exit; } if (!str) { goto exit; }
msg = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType, result = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
curs, str, NULL); curs, str, NULL);
Py_DECREF(str); Py_DECREF(str);
if (!msg) { goto exit; } if (!result) { goto exit; }
((replicationMessageObject *)msg)->data_size = data_size; msg = (replicationMessageObject *)result;
((replicationMessageObject *)msg)->data_start = data_start; msg->data_size = data_size;
((replicationMessageObject *)msg)->wal_end = wal_end; msg->data_start = data_start;
((replicationMessageObject *)msg)->send_time = send_time; msg->wal_end = wal_end;
msg->send_time = send_time;
} }
else if (buffer[0] == 'k') { else if (buffer[0] == 'k') {
/* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */ /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
@ -1652,17 +1658,17 @@ retry:
reply = buffer[hdr]; reply = buffer[hdr];
if (reply) { if (reply) {
if (!pq_send_replication_feedback(curs, 0)) { if (!pq_send_replication_feedback(repl, 0)) {
if (curs->conn->async) { if (conn->async) {
curs->repl_feedback_pending = 1; repl->feedback_pending = 1;
} else { } else {
/* XXX not sure if this was a good idea after all */ /* XXX not sure if this was a good idea after all */
pq_raise(curs->conn, curs, NULL); pq_raise(conn, curs, NULL);
goto exit; goto exit;
} }
} }
else { else {
gettimeofday(&curs->repl_last_io, NULL); gettimeofday(&repl->last_io, NULL);
} }
} }
@ -1680,37 +1686,38 @@ exit:
PQfreemem(buffer); PQfreemem(buffer);
} }
return msg; return result;
none: none:
msg = Py_None; result = Py_None;
Py_INCREF(msg); Py_INCREF(result);
goto exit; goto exit;
} }
int int
pq_send_replication_feedback(cursorObject* curs, int reply_requested) pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
{ {
cursorObject *curs = &repl->cur;
PGconn *pgconn = curs->conn->pgconn;
char replybuf[1 + 8 + 8 + 8 + 8 + 1]; char replybuf[1 + 8 + 8 + 8 + 8 + 1];
int len = 0; int len = 0;
Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR, Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR,
XLOGFMTARGS(curs->repl_write_lsn), XLOGFMTARGS(repl->write_lsn),
XLOGFMTARGS(curs->repl_flush_lsn), XLOGFMTARGS(repl->flush_lsn),
XLOGFMTARGS(curs->repl_apply_lsn)); XLOGFMTARGS(repl->apply_lsn));
replybuf[len] = 'r'; len += 1; replybuf[len] = 'r'; len += 1;
fe_sendint64(curs->repl_write_lsn, &replybuf[len]); len += 8; fe_sendint64(repl->write_lsn, &replybuf[len]); len += 8;
fe_sendint64(curs->repl_flush_lsn, &replybuf[len]); len += 8; fe_sendint64(repl->flush_lsn, &replybuf[len]); len += 8;
fe_sendint64(curs->repl_apply_lsn, &replybuf[len]); len += 8; fe_sendint64(repl->apply_lsn, &replybuf[len]); len += 8;
fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8; fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8;
replybuf[len] = reply_requested ? 1 : 0; len += 1; replybuf[len] = reply_requested ? 1 : 0; len += 1;
if (PQputCopyData(curs->conn->pgconn, replybuf, len) <= 0 || if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) {
PQflush(curs->conn->pgconn) != 0) {
return 0; return 0;
} }
gettimeofday(&curs->repl_last_io, NULL); gettimeofday(&repl->last_io, NULL);
return 1; return 1;
} }
@ -1723,12 +1730,15 @@ pq_send_replication_feedback(cursorObject* curs, int reply_requested)
manages to send keepalive messages to the server as needed. manages to send keepalive messages to the server as needed.
*/ */
int int
pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive_interval) pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode,
double keepalive_interval)
{ {
cursorObject *curs = &repl->cur;
connectionObject *conn = curs->conn;
PGconn *pgconn = conn->pgconn;
PyObject *msg, *tmp = NULL; PyObject *msg, *tmp = NULL;
PyObject *consume_func = NULL; PyObject *consume_func = NULL;
int fd, sel, ret = -1; int fd, sel, ret = -1;
PGconn *pgconn;
fd_set fds; fd_set fds;
struct timeval keep_intr, curr_time, ping_time, timeout; struct timeval keep_intr, curr_time, ping_time, timeout;
@ -1738,13 +1748,12 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
} }
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
pgconn = curs->conn->pgconn;
keep_intr.tv_sec = (int)keepalive_interval; keep_intr.tv_sec = (int)keepalive_interval;
keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6; keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
while (1) { while (1) {
msg = pq_read_replication_message(curs, decode); msg = pq_read_replication_message(repl, decode);
if (!msg) { if (!msg) {
goto exit; goto exit;
} }
@ -1753,7 +1762,7 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
fd = PQsocket(pgconn); fd = PQsocket(pgconn);
if (fd < 0) { if (fd < 0) {
pq_raise(curs->conn, curs, NULL); pq_raise(conn, curs, NULL);
goto exit; goto exit;
} }
@ -1763,7 +1772,7 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
/* how long can we wait before we need to send a keepalive? */ /* how long can we wait before we need to send a keepalive? */
gettimeofday(&curr_time, NULL); gettimeofday(&curr_time, NULL);
timeradd(&curs->repl_last_io, &keep_intr, &ping_time); timeradd(&repl->last_io, &keep_intr, &ping_time);
timersub(&ping_time, &curr_time, &timeout); timersub(&ping_time, &curr_time, &timeout);
if (timeout.tv_sec >= 0) { if (timeout.tv_sec >= 0) {
@ -1787,8 +1796,8 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
} }
if (sel == 0) { if (sel == 0) {
if (!pq_send_replication_feedback(curs, 0)) { if (!pq_send_replication_feedback(repl, 0)) {
pq_raise(curs->conn, curs, NULL); pq_raise(conn, curs, NULL);
goto exit; goto exit;
} }
} }
@ -1876,7 +1885,7 @@ pq_fetch(cursorObject *curs, int no_result)
Dprintf("pq_fetch: data from a streaming replication slot (no tuples)"); Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
curs->rowcount = -1; curs->rowcount = -1;
ex = 0; ex = 0;
/* nothing to do here: _pq_copy_both_v3 will be called separately */ /* nothing to do here: pq_copy_both will be called separately */
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
break; break;

View File

@ -27,6 +27,7 @@
#define PSYCOPG_PQPATH_H 1 #define PSYCOPG_PQPATH_H 1
#include "psycopg/cursor.h" #include "psycopg/cursor.h"
#include "psycopg/replication_cursor.h"
#include "psycopg/connection.h" #include "psycopg/connection.h"
/* macro to clean the pg result */ /* macro to clean the pg result */
@ -72,9 +73,10 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn,
RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
char **error); char **error);
HIDDEN int pq_copy_both(cursorObject *curs, PyObject *consumer, /* replication protocol support */
HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,
int decode, double keepalive_interval); int decode, double keepalive_interval);
HIDDEN PyObject *pq_read_replication_message(cursorObject *curs, int decode); HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl, int decode);
HIDDEN int pq_send_replication_feedback(cursorObject *curs, int reply_requested); HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested);
#endif /* !defined(PSYCOPG_PQPATH_H) */ #endif /* !defined(PSYCOPG_PQPATH_H) */

View File

@ -28,6 +28,7 @@
#include "psycopg/connection.h" #include "psycopg/connection.h"
#include "psycopg/cursor.h" #include "psycopg/cursor.h"
#include "psycopg/replication_cursor.h"
#include "psycopg/replication_message.h" #include "psycopg/replication_message.h"
#include "psycopg/green.h" #include "psycopg/green.h"
#include "psycopg/lobject.h" #include "psycopg/lobject.h"
@ -917,6 +918,9 @@ INIT_MODULE(_psycopg)(void)
Py_TYPE(&cursorType) = &PyType_Type; Py_TYPE(&cursorType) = &PyType_Type;
if (PyType_Ready(&cursorType) == -1) goto exit; if (PyType_Ready(&cursorType) == -1) goto exit;
Py_TYPE(&replicationCursorType) = &PyType_Type;
if (PyType_Ready(&replicationCursorType) == -1) goto exit;
Py_TYPE(&replicationMessageType) = &PyType_Type; Py_TYPE(&replicationMessageType) = &PyType_Type;
if (PyType_Ready(&replicationMessageType) == -1) goto exit; if (PyType_Ready(&replicationMessageType) == -1) goto exit;
@ -1000,7 +1004,7 @@ INIT_MODULE(_psycopg)(void)
/* Initialize the PyDateTimeAPI everywhere is used */ /* Initialize the PyDateTimeAPI everywhere is used */
PyDateTime_IMPORT; PyDateTime_IMPORT;
if (psyco_adapter_datetime_init()) { goto exit; } if (psyco_adapter_datetime_init()) { goto exit; }
if (psyco_curs_datetime_init()) { goto exit; } if (psyco_repl_curs_datetime_init()) { goto exit; }
if (psyco_replmsg_datetime_init()) { goto exit; } if (psyco_replmsg_datetime_init()) { goto exit; }
Py_TYPE(&pydatetimeType) = &PyType_Type; Py_TYPE(&pydatetimeType) = &PyType_Type;
@ -1044,7 +1048,8 @@ INIT_MODULE(_psycopg)(void)
/* put new types in module dictionary */ /* put new types in module dictionary */
PyModule_AddObject(module, "connection", (PyObject*)&connectionType); PyModule_AddObject(module, "connection", (PyObject*)&connectionType);
PyModule_AddObject(module, "cursor", (PyObject*)&cursorType); PyModule_AddObject(module, "cursor", (PyObject*)&cursorType);
PyModule_AddObject(module, "replicationMessage", (PyObject*)&replicationMessageType); PyModule_AddObject(module, "ReplicationCursor", (PyObject*)&replicationCursorType);
PyModule_AddObject(module, "ReplicationMessage", (PyObject*)&replicationMessageType);
PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType); PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType);
PyModule_AddObject(module, "Notify", (PyObject*)&notifyType); PyModule_AddObject(module, "Notify", (PyObject*)&notifyType);
PyModule_AddObject(module, "Xid", (PyObject*)&xidType); PyModule_AddObject(module, "Xid", (PyObject*)&xidType);

View File

@ -0,0 +1,77 @@
/* replication_cursor.h - definition for the psycopg replication cursor type
*
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#ifndef PSYCOPG_REPLICATION_CURSOR_H
#define PSYCOPG_REPLICATION_CURSOR_H 1
#include "psycopg/cursor.h"
#include "libpq_support.h"
#ifdef __cplusplus
extern "C" {
#endif
extern HIDDEN PyTypeObject replicationCursorType;
typedef struct replicationCursorObject {
cursorObject cur;
int started:1; /* if replication is started */
int consuming:1; /* if running the consume loop */
struct timeval last_io; /* timestamp of the last exchange with the server */
struct timeval keepalive_interval; /* interval for keepalive messages in replication mode */
XLogRecPtr write_lsn; /* LSN stats for replication feedback messages */
XLogRecPtr flush_lsn;
XLogRecPtr apply_lsn;
int feedback_pending; /* flag set when we couldn't send the feedback to the server */
} replicationCursorObject;
RAISES_NEG int psyco_repl_curs_datetime_init(void);
/* exception-raising macros */
#define EXC_IF_REPLICATING(self, cmd) \
do \
if ((self)->started) { \
PyErr_SetString(ProgrammingError, \
#cmd " cannot be used when replication is already in progress"); \
return NULL; } \
while (0)
#define EXC_IF_NOT_REPLICATING(self, cmd) \
do \
if (!(self)->started) { \
PyErr_SetString(ProgrammingError, \
#cmd " cannot be used when replication is not in progress"); \
return NULL; } \
while (0)
#ifdef __cplusplus
}
#endif
#endif /* !defined(PSYCOPG_REPLICATION_CURSOR_H) */

View File

@ -0,0 +1,360 @@
/* replication_cursor_type.c - python interface to replication cursor objects
*
* Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#define PSYCOPG_MODULE
#include "psycopg/psycopg.h"
#include "psycopg/replication_cursor.h"
#include "psycopg/replication_message.h"
#include "psycopg/green.h"
#include "psycopg/pqpath.h"
#include <string.h>
#include <stdlib.h>
/* python */
#include "datetime.h"
#define psyco_repl_curs_start_replication_expert_doc \
"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start replication stream with a directly given command."
static PyObject *
psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
PyObject *args, PyObject *kwargs)
{
cursorObject *curs = &self->cur;
connectionObject *conn = self->cur.conn;
PyObject *res = NULL;
char *command;
static char *kwlist[] = {"command", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) {
return NULL;
}
EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(start_replication_expert);
EXC_IF_TPC_PREPARED(conn, start_replication_expert);
EXC_IF_REPLICATING(self, start_replication_expert);
Dprintf("psyco_repl_curs_start_replication_expert: %s", command);
/* self->copysize = 0;*/
gettimeofday(&self->last_io, NULL);
if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) {
res = Py_None;
Py_INCREF(res);
self->started = 1;
}
return res;
}
#define psyco_repl_curs_consume_stream_doc \
"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream."
static PyObject *
psyco_repl_curs_consume_stream(replicationCursorObject *self,
PyObject *args, PyObject *kwargs)
{
cursorObject *curs = &self->cur;
PyObject *consume = NULL, *res = NULL;
int decode = 0;
double keepalive_interval = 10;
static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
&consume, &decode, &keepalive_interval)) {
return NULL;
}
EXC_IF_CURS_CLOSED(curs);
EXC_IF_CURS_ASYNC(curs, consume_stream);
EXC_IF_GREEN(consume_stream);
EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream);
EXC_IF_NOT_REPLICATING(self, consume_stream);
if (self->consuming) {
PyErr_SetString(ProgrammingError,
"consume_stream cannot be used when already in the consume loop");
return NULL;
}
Dprintf("psyco_repl_curs_consume_stream");
if (keepalive_interval < 1.0) {
psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
return NULL;
}
self->consuming = 1;
if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) {
res = Py_None;
Py_INCREF(res);
}
self->consuming = 0;
return res;
}
#define psyco_repl_curs_read_message_doc \
"read_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
static PyObject *
psyco_repl_curs_read_message(replicationCursorObject *self,
PyObject *args, PyObject *kwargs)
{
cursorObject *curs = &self->cur;
int decode = 1;
static char *kwlist[] = {"decode", NULL};
EXC_IF_CURS_CLOSED(curs);
EXC_IF_GREEN(read_message);
EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
EXC_IF_NOT_REPLICATING(self, read_message);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&decode)) {
return NULL;
}
return pq_read_replication_message(self, decode);
}
static PyObject *
repl_curs_flush_feedback(replicationCursorObject *self, int reply)
{
if (!(self->feedback_pending || reply))
Py_RETURN_TRUE;
if (pq_send_replication_feedback(self, reply)) {
self->feedback_pending = 0;
Py_RETURN_TRUE;
} else {
self->feedback_pending = 1;
Py_RETURN_FALSE;
}
}
#define psyco_repl_curs_send_feedback_doc \
"send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply."
static PyObject *
psyco_repl_curs_send_feedback(replicationCursorObject *self,
PyObject *args, PyObject *kwargs)
{
cursorObject *curs = &self->cur;
XLogRecPtr write_lsn = InvalidXLogRecPtr,
flush_lsn = InvalidXLogRecPtr,
apply_lsn = InvalidXLogRecPtr;
int reply = 0;
static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
EXC_IF_CURS_CLOSED(curs);
EXC_IF_NOT_REPLICATING(self, send_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
&write_lsn, &flush_lsn, &apply_lsn, &reply)) {
return NULL;
}
if (write_lsn > self->write_lsn)
self->write_lsn = write_lsn;
if (flush_lsn > self->flush_lsn)
self->flush_lsn = flush_lsn;
if (apply_lsn > self->apply_lsn)
self->apply_lsn = apply_lsn;
self->feedback_pending = 1;
return repl_curs_flush_feedback(self, reply);
}
#define psyco_repl_curs_flush_feedback_doc \
"flush_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply."
static PyObject *
psyco_repl_curs_flush_feedback(replicationCursorObject *self,
PyObject *args, PyObject *kwargs)
{
cursorObject *curs = &self->cur;
int reply = 0;
static char *kwlist[] = {"reply", NULL};
EXC_IF_CURS_CLOSED(curs);
EXC_IF_NOT_REPLICATING(self, flush_feedback);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
&reply)) {
return NULL;
}
return repl_curs_flush_feedback(self, reply);
}
RAISES_NEG int
psyco_repl_curs_datetime_init(void)
{
Dprintf("psyco_repl_curs_datetime_init: datetime init");
PyDateTime_IMPORT;
if (!PyDateTimeAPI) {
PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
return -1;
}
return 0;
}
#define psyco_repl_curs_io_timestamp_doc \
"io_timestamp -- the timestamp of latest IO with the server"
static PyObject *
psyco_repl_curs_get_io_timestamp(replicationCursorObject *self)
{
cursorObject *curs = &self->cur;
PyObject *tval, *res = NULL;
double seconds;
EXC_IF_CURS_CLOSED(curs);
seconds = self->last_io.tv_sec + self->last_io.tv_usec / 1.0e6;
tval = Py_BuildValue("(d)", seconds);
if (tval) {
res = PyDateTime_FromTimestamp(tval);
Py_DECREF(tval);
}
return res;
}
/* object method list */
static struct PyMethodDef replicationCursorObject_methods[] = {
{"start_replication_expert", (PyCFunction)psyco_repl_curs_start_replication_expert,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_start_replication_expert_doc},
{"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc},
{"read_message", (PyCFunction)psyco_repl_curs_read_message,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_read_message_doc},
{"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc},
{"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback,
METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_flush_feedback_doc},
{NULL}
};
/* object calculated member list */
static struct PyGetSetDef replicationCursorObject_getsets[] = {
{ "io_timestamp",
(getter)psyco_repl_curs_get_io_timestamp, NULL,
psyco_repl_curs_io_timestamp_doc, NULL },
{NULL}
};
static int
replicationCursor_setup(replicationCursorObject* self)
{
self->started = 0;
self->consuming = 0;
self->write_lsn = InvalidXLogRecPtr;
self->flush_lsn = InvalidXLogRecPtr;
self->apply_lsn = InvalidXLogRecPtr;
self->feedback_pending = 0;
return 0;
}
static int
replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
{
replicationCursor_setup((replicationCursorObject *)obj);
return cursor_init(obj, args, kwargs);
}
static PyObject *
replicationCursor_repr(replicationCursorObject *self)
{
return PyString_FromFormat(
"<ReplicationCursor object at %p; closed: %d>", self, self->cur.closed);
}
/* object type */
#define replicationCursorType_doc \
"A database replication cursor."
PyTypeObject replicationCursorType = {
PyVarObject_HEAD_INIT(NULL, 0)
"psycopg2.extensions.ReplicationCursor",
sizeof(replicationCursorObject), 0,
0, /*tp_dealloc*/
0, /*tp_print*/
0, /*tp_getattr*/
0, /*tp_setattr*/
0, /*tp_compare*/
(reprfunc)replicationCursor_repr, /*tp_repr*/
0, /*tp_as_number*/
0, /*tp_as_sequence*/
0, /*tp_as_mapping*/
0, /*tp_hash*/
0, /*tp_call*/
(reprfunc)replicationCursor_repr, /*tp_str*/
0, /*tp_getattro*/
0, /*tp_setattro*/
0, /*tp_as_buffer*/
Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER |
Py_TPFLAGS_HAVE_GC, /*tp_flags*/
replicationCursorType_doc, /*tp_doc*/
0, /*tp_traverse*/
0, /*tp_clear*/
0, /*tp_richcompare*/
0, /*tp_weaklistoffset*/
0, /*tp_iter*/
0, /*tp_iternext*/
replicationCursorObject_methods, /*tp_methods*/
0, /*tp_members*/
replicationCursorObject_getsets, /*tp_getset*/
&cursorType, /*tp_base*/
0, /*tp_dict*/
0, /*tp_descr_get*/
0, /*tp_descr_set*/
0, /*tp_dictoffset*/
replicationCursor_init, /*tp_init*/
0, /*tp_alloc*/
0, /*tp_new*/
};

View File

@ -49,7 +49,7 @@ static PyObject *
replmsg_repr(replicationMessageObject *self) replmsg_repr(replicationMessageObject *self)
{ {
return PyString_FromFormat( return PyString_FromFormat(
"<replicationMessage object at %p; data_size: %d; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>", "<ReplicationMessage object at %p; data_size: %d; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>",
self, self->data_size, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end), self, self->data_size, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end),
self->send_time); self->send_time);
} }

View File

@ -92,6 +92,7 @@
<None Include="psycopg\pqpath.h" /> <None Include="psycopg\pqpath.h" />
<None Include="psycopg\psycopg.h" /> <None Include="psycopg\psycopg.h" />
<None Include="psycopg\python.h" /> <None Include="psycopg\python.h" />
<None Include="psycopg\replication_cursor.h" />
<None Include="psycopg\replication_message.h" /> <None Include="psycopg\replication_message.h" />
<None Include="psycopg\typecast.h" /> <None Include="psycopg\typecast.h" />
<None Include="psycopg\typecast_binary.h" /> <None Include="psycopg\typecast_binary.h" />
@ -225,6 +226,7 @@
<Compile Include="psycopg\microprotocols_proto.c" /> <Compile Include="psycopg\microprotocols_proto.c" />
<Compile Include="psycopg\pqpath.c" /> <Compile Include="psycopg\pqpath.c" />
<Compile Include="psycopg\psycopgmodule.c" /> <Compile Include="psycopg\psycopgmodule.c" />
<Compile Include="psycopg\replication_cursor_type.c" />
<Compile Include="psycopg\replication_message_type.c" /> <Compile Include="psycopg\replication_message_type.c" />
<Compile Include="psycopg\typecast.c" /> <Compile Include="psycopg\typecast.c" />
<Compile Include="psycopg\typecast_array.c" /> <Compile Include="psycopg\typecast_array.c" />

View File

@ -466,7 +466,7 @@ sources = [
'connection_int.c', 'connection_type.c', 'connection_int.c', 'connection_type.c',
'cursor_int.c', 'cursor_type.c', 'cursor_int.c', 'cursor_type.c',
'replication_message_type.c', 'replication_cursor_type.c', 'replication_message_type.c',
'diagnostics_type.c', 'error_type.c', 'diagnostics_type.c', 'error_type.c',
'lobject_int.c', 'lobject_type.c', 'lobject_int.c', 'lobject_type.c',
'notify_type.c', 'xid_type.c', 'notify_type.c', 'xid_type.c',
@ -482,7 +482,7 @@ depends = [
# headers # headers
'config.h', 'pgtypes.h', 'psycopg.h', 'python.h', 'connection.h', 'config.h', 'pgtypes.h', 'psycopg.h', 'python.h', 'connection.h',
'cursor.h', 'diagnostics.h', 'error.h', 'green.h', 'lobject.h', 'cursor.h', 'diagnostics.h', 'error.h', 'green.h', 'lobject.h',
'replication_message.h', 'replication_cursor.h', 'replication_message.h',
'notify.h', 'pqpath.h', 'xid.h', 'notify.h', 'pqpath.h', 'xid.h',
'libpq_support.h', 'win32_support.h', 'libpq_support.h', 'win32_support.h',

View File

@ -47,12 +47,16 @@ class ReplicationTestCase(ConnectingTestCase):
# first close all connections, as they might keep the slot(s) active # first close all connections, as they might keep the slot(s) active
super(ReplicationTestCase, self).tearDown() super(ReplicationTestCase, self).tearDown()
import time
time.sleep(0.025) # sometimes the slot is still active, wait a little
if self._slots: if self._slots:
kill_conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) kill_conn = self.connect()
if kill_conn: if kill_conn:
kill_cur = kill_conn.cursor() kill_cur = kill_conn.cursor()
for slot in self._slots: for slot in self._slots:
kill_cur.drop_replication_slot(slot) kill_cur.execute("SELECT pg_drop_replication_slot(%s)", (slot,))
kill_conn.commit()
kill_conn.close() kill_conn.close()
def create_replication_slot(self, cur, slot_name=testconfig.repl_slot, **kwargs): def create_replication_slot(self, cur, slot_name=testconfig.repl_slot, **kwargs):
@ -127,7 +131,7 @@ class ReplicationTest(ReplicationTestCase):
cur.start_replication(self.slot) cur.start_replication(self.slot)
def consume(msg): def consume(msg):
raise StopReplication() raise StopReplication()
self.assertRaises(StopReplication, cur.consume_replication_stream, consume) self.assertRaises(StopReplication, cur.consume_stream, consume)
class AsyncReplicationTest(ReplicationTestCase): class AsyncReplicationTest(ReplicationTestCase):
@ -148,14 +152,22 @@ class AsyncReplicationTest(ReplicationTestCase):
self.msg_count = 0 self.msg_count = 0
def consume(msg): def consume(msg):
# just check the methods
log = "%s: %s" % (cur.io_timestamp, repr(msg))
self.msg_count += 1 self.msg_count += 1
if self.msg_count > 3: if self.msg_count > 3:
cur.flush_feedback(reply=True)
raise StopReplication() raise StopReplication()
cur.send_feedback(flush_lsn=msg.data_start)
self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
def process_stream(): def process_stream():
from select import select from select import select
while True: while True:
msg = cur.read_replication_message() msg = cur.read_message()
if msg: if msg:
consume(msg) consume(msg)
else: else: