From 0bb81fc84811134bca70b59daa4661bd0697f2ff Mon Sep 17 00:00:00 2001
From: Oleksandr Shulgin <oleksandr.shulgin@zalando.de>
Date: Mon, 19 Oct 2015 20:00:39 +0200
Subject: [PATCH] Properly subclass ReplicationCursor on C level.

---
 doc/src/extras.rst                 |  50 ++--
 lib/extensions.py                  |   2 +-
 lib/extras.py                      |  10 +-
 psycopg/cursor.h                   |  27 +--
 psycopg/cursor_type.c              | 235 +------------------
 psycopg/pqpath.c                   |  97 ++++----
 psycopg/pqpath.h                   |   8 +-
 psycopg/psycopgmodule.c            |   9 +-
 psycopg/replication_cursor.h       |  77 ++++++
 psycopg/replication_cursor_type.c  | 360 +++++++++++++++++++++++++++++
 psycopg/replication_message_type.c |   2 +-
 psycopg2.cproj                     |   2 +
 setup.py                           |   4 +-
 tests/test_replication.py          |  20 +-
 14 files changed, 554 insertions(+), 349 deletions(-)
 create mode 100644 psycopg/replication_cursor.h
 create mode 100644 psycopg/replication_cursor_type.c

diff --git a/doc/src/extras.rst b/doc/src/extras.rst
index ddf989d7..9384a961 100644
--- a/doc/src/extras.rst
+++ b/doc/src/extras.rst
@@ -324,16 +324,15 @@ The individual messages in the replication stream are presented by
         `start_replication_expert()` internally.
 
         After starting the replication, to actually consume the incoming
-        server messages, use `consume_replication_stream()` or implement a
-        loop around `read_replication_message()` in case of asynchronous
-        connection.
+        server messages, use `consume_stream()` or implement a loop around
+        `read_message()` in case of asynchronous connection.
 
     .. method:: start_replication_expert(command)
 
         Start replication on the connection using provided ``START_REPLICATION``
         command.
 
-    .. method:: consume_replication_stream(consume, decode=False, keepalive_interval=10)
+    .. method:: consume_stream(consume, decode=False, keepalive_interval=10)
 
         :param consume: a callable object with signature ``consume(msg)``
         :param decode: a flag indicating that unicode conversion should be
@@ -342,7 +341,7 @@ The individual messages in the replication stream are presented by
                                    messages to the server
 
         This method can only be used with synchronous connection.  For
-        asynchronous connections see `read_replication_message()`.
+        asynchronous connections see `read_message()`.
 
         Before calling this method to consume the stream, use
         `start_replication()` first.
@@ -372,18 +371,18 @@ The individual messages in the replication stream are presented by
                     self.store_message_data(msg.payload)
 
                     if self.should_report_to_the_server_now(msg):
-                        msg.cursor.send_replication_feedback(flush_lsn=msg.data_start)
+                        msg.cursor.send_feedback(flush_lsn=msg.data_start)
 
             consumer = LogicalStreamConsumer()
-            cur.consume_replication_stream(consumer, decode=True)
+            cur.consume_stream(consumer, decode=True)
 
         The *msg* objects passed to ``consume()`` are instances of
         `ReplicationMessage` class.
 
         After storing certain amount of messages' data reliably, the client
         should send a confirmation message to the server.  This should be done
-        by calling `send_replication_feedback()` method on the corresponding
-        replication cursor.  A reference to the cursor is provided in the
+        by calling `send_feedback()` method on the corresponding replication
+        cursor.  A reference to the cursor is provided in the
         `ReplicationMessage` as an attribute.
 
         .. warning::
@@ -400,7 +399,7 @@ The individual messages in the replication stream are presented by
             load on network and the server.  A possible strategy is to confirm
             after every COMMIT message.
 
-    .. method:: send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
+    .. method:: send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False)
 
         :param write_lsn: a LSN position up to which the client has written the data locally
         :param flush_lsn: a LSN position up to which the client has stored the
@@ -419,16 +418,15 @@ The individual messages in the replication stream are presented by
         just send a keepalive message to the server.
 
         If the feedback message could not be sent, updates the passed LSN
-        positions in the cursor for a later call to
-        `flush_replication_feedback()` and returns `!False`, otherwise returns
-        `!True`.
+        positions in the cursor for a later call to `flush_feedback()` and
+        returns `!False`, otherwise returns `!True`.
 
-    .. method:: flush_replication_feedback(reply=False)
+    .. method:: flush_feedback(reply=False)
 
         :param reply: request the server to send back a keepalive message immediately
 
         This method tries to flush the latest replication feedback message
-        that `send_replication_feedback()` was trying to send but couldn't.
+        that `send_feedback()` was trying to send but couldn't.
 
         If *reply* is `!True` sends a keepalive message in either case.
 
@@ -437,14 +435,13 @@ The individual messages in the replication stream are presented by
 
     Low-level methods for asynchronous connection operation.
 
-    With the synchronous connection, a call to `consume_replication_stream()`
-    handles all the complexity of handling the incoming messages and sending
-    keepalive replies, but at times it might be beneficial to use low-level
-    interface for better control, in particular to `~select.select()` on
-    multiple sockets.  The following methods are provided for asynchronous
-    operation:
+    With the synchronous connection, a call to `consume_stream()` handles all
+    the complexity of handling the incoming messages and sending keepalive
+    replies, but at times it might be beneficial to use low-level interface
+    for better control, in particular to `~select.select()` on multiple
+    sockets.  The following methods are provided for asynchronous operation:
 
-    .. method:: read_replication_message(decode=True)
+    .. method:: read_message(decode=True)
 
         :param decode: a flag indicating that unicode conversion should be
                        performed on the data received from the server
@@ -475,7 +472,7 @@ The individual messages in the replication stream are presented by
         This is a convenience method which allows replication cursor to be
         used directly in `~select.select()` or `~select.poll()` calls.
 
-    .. attribute:: replication_io_timestamp
+    .. attribute:: io_timestamp
 
         A `~datetime` object representing the timestamp at the moment of last
         communication with the server (a data or keepalive message in either
@@ -488,18 +485,19 @@ The individual messages in the replication stream are presented by
 
         keepalive_interval = 10.0
         while True:
-            msg = cur.read_replication_message()
+            msg = cur.read_message()
             if msg:
                 consume(msg)
             else:
-                timeout = keepalive_interval - (datetime.now() - cur.replication_io_timestamp).total_seconds()
+                now = datetime.now()
+                timeout = keepalive_interval - (now - cur.io_timestamp).total_seconds()
                 if timeout > 0:
                     sel = select.select([cur], [], [], timeout)
                 else:
                     sel = ([], [], [])
 
                 if not sel[0]:
-                    cur.send_replication_feedback()
+                    cur.send_feedback()
 
 .. index::
     pair: Cursor; Replication
diff --git a/lib/extensions.py b/lib/extensions.py
index 513b7fc7..af27bca6 100644
--- a/lib/extensions.py
+++ b/lib/extensions.py
@@ -61,7 +61,7 @@ from psycopg2._psycopg import string_types, binary_types, new_type, new_array_ty
 from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column
 
 from psycopg2._psycopg import QueryCanceledError, TransactionRollbackError
-from psycopg2._psycopg import replicationMessage
+from psycopg2._psycopg import ReplicationCursor, ReplicationMessage
 
 try:
     from psycopg2._psycopg import set_wait_callback, get_wait_callback
diff --git a/lib/extras.py b/lib/extras.py
index 8854ec2b..7c713573 100644
--- a/lib/extras.py
+++ b/lib/extras.py
@@ -39,7 +39,8 @@ import psycopg2
 from psycopg2 import extensions as _ext
 from psycopg2.extensions import cursor as _cursor
 from psycopg2.extensions import connection as _connection
-from psycopg2.extensions import replicationMessage as ReplicationMessage
+from psycopg2.extensions import ReplicationCursor as _replicationCursor
+from psycopg2.extensions import ReplicationMessage
 from psycopg2.extensions import adapt as _A, quote_ident
 from psycopg2.extensions import b
 
@@ -503,7 +504,7 @@ class PhysicalReplicationConnection(ReplicationConnectionBase):
 class StopReplication(Exception):
     """
     Exception used to break out of the endless loop in
-    `~ReplicationCursor.consume_replication_stream()`.
+    `~ReplicationCursor.consume_stream()`.
 
     Subclass of `~exceptions.Exception`.  Intentionally *not* inherited from
     `~psycopg2.Error` as occurrence of this exception does not indicate an
@@ -512,7 +513,7 @@ class StopReplication(Exception):
     pass
 
 
-class ReplicationCursor(_cursor):
+class ReplicationCursor(_replicationCursor):
     """A cursor used for communication on the replication protocol."""
 
     def create_replication_slot(self, slot_name, slot_type=None, output_plugin=None):
@@ -598,9 +599,6 @@ class ReplicationCursor(_cursor):
 
         self.start_replication_expert(command)
 
-    def send_feedback_message(self, written_lsn=0, sync_lsn=0, apply_lsn=0, reply_requested=False):
-        return self.send_replication_feedback(written_lsn, sync_lsn, apply_lsn, reply_requested)
-
     # allows replication cursors to be used in select.select() directly
     def fileno(self):
         return self.connection.fileno()
diff --git a/psycopg/cursor.h b/psycopg/cursor.h
index 669e176d..18e31e5f 100644
--- a/psycopg/cursor.h
+++ b/psycopg/cursor.h
@@ -27,7 +27,6 @@
 #define PSYCOPG_CURSOR_H 1
 
 #include "psycopg/connection.h"
-#include "libpq_support.h"
 
 #ifdef __cplusplus
 extern "C" {
@@ -74,14 +73,6 @@ struct cursorObject {
 #define DEFAULT_COPYBUFF  8192
 
     /* replication cursor attrs */
-    int         repl_started:1;          /* if replication is started */
-    int         repl_consuming:1;        /* if running the consume loop */
-    struct timeval repl_keepalive_interval;   /* interval for keepalive messages in replication mode */
-    XLogRecPtr  repl_write_lsn;        /* LSN stats for replication feedback messages */
-    XLogRecPtr  repl_flush_lsn;
-    XLogRecPtr  repl_apply_lsn;
-    int         repl_feedback_pending; /* flag set when we couldn't send the feedback to the server */
-    struct timeval repl_last_io;       /* timestamp of the last exchange with the server */
 
     PyObject *tuple_factory;    /* factory for result tuples */
     PyObject *tzinfo_factory;   /* factory for tzinfo objects */
@@ -106,7 +97,7 @@ HIDDEN void curs_reset(cursorObject *self);
 HIDDEN int psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue);
 HIDDEN int psyco_curs_scrollable_set(cursorObject *self, PyObject *pyvalue);
 
-RAISES_NEG int psyco_curs_datetime_init(void);
+HIDDEN int psyco_curs_init(PyObject *obj, PyObject *args, PyObject *kwargs);
 
 /* exception-raising macros */
 #define EXC_IF_CURS_CLOSED(self) \
@@ -149,22 +140,6 @@ do \
     return NULL; } \
 while (0)
 
-#define EXC_IF_REPLICATING(self, cmd) \
-do \
-    if ((self)->repl_started) { \
-        PyErr_SetString(ProgrammingError, \
-            #cmd " cannot be used when replication is already in progress"); \
-    return NULL; } \
-while (0)
-
-#define EXC_IF_NOT_REPLICATING(self, cmd) \
-do \
-    if (!(self)->repl_started) { \
-        PyErr_SetString(ProgrammingError, \
-            #cmd " cannot be used when replication is not in progress"); \
-    return NULL; } \
-while (0)
-
 #ifdef __cplusplus
 }
 #endif
diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c
index d51f7a55..63bd5a10 100644
--- a/psycopg/cursor_type.c
+++ b/psycopg/cursor_type.c
@@ -28,7 +28,6 @@
 
 #include "psycopg/cursor.h"
 #include "psycopg/connection.h"
-#include "psycopg/replication_message.h"
 #include "psycopg/green.h"
 #include "psycopg/pqpath.h"
 #include "psycopg/typecast.h"
@@ -39,9 +38,6 @@
 
 #include <stdlib.h>
 
-/* python */
-#include "datetime.h"
-
 
 /** DBAPI methods **/
 
@@ -1583,222 +1579,6 @@ exit:
     return res;
 }
 
-#define psyco_curs_start_replication_expert_doc \
-"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start and consume replication stream with direct command."
-
-static PyObject *
-psyco_curs_start_replication_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
-    PyObject *res = NULL;
-    char *command;
-    static char *kwlist[] = {"command", NULL};
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) {
-        return NULL;
-    }
-
-    EXC_IF_CURS_CLOSED(self);
-    EXC_IF_GREEN(start_replication_expert);
-    EXC_IF_TPC_PREPARED(self->conn, start_replication_expert);
-    EXC_IF_REPLICATING(self, start_replication_expert);
-
-    Dprintf("psyco_curs_start_replication_expert: %s", command);
-
-    self->copysize = 0;
-    self->repl_consuming = 0;
-
-    self->repl_write_lsn = InvalidXLogRecPtr;
-    self->repl_flush_lsn = InvalidXLogRecPtr;
-    self->repl_apply_lsn = InvalidXLogRecPtr;
-    self->repl_feedback_pending = 0;
-
-    gettimeofday(&self->repl_last_io, NULL);
-
-    if (pq_execute(self, command, self->conn->async,
-                   1 /* no_result */, 1 /* no_begin */) >= 0) {
-        res = Py_None;
-        Py_INCREF(res);
-
-        self->repl_started = 1;
-    }
-
-    return res;
-}
-
-#define psyco_curs_consume_replication_stream_doc \
-"consume_replication_stream(consumer, keepalive_interval=10) -- Consume replication stream."
-
-static PyObject *
-psyco_curs_consume_replication_stream(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
-    PyObject *consume = NULL, *res = NULL;
-    int decode = 0;
-    double keepalive_interval = 10;
-    static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL};
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
-                                     &consume, &decode, &keepalive_interval)) {
-        return NULL;
-    }
-
-    EXC_IF_CURS_CLOSED(self);
-    EXC_IF_CURS_ASYNC(self, consume_replication_stream);
-    EXC_IF_GREEN(consume_replication_stream);
-    EXC_IF_TPC_PREPARED(self->conn, consume_replication_stream);
-    EXC_IF_NOT_REPLICATING(self, consume_replication_stream);
-
-    if (self->repl_consuming) {
-        PyErr_SetString(ProgrammingError,
-                        "consume_replication_stream cannot be used when already in the consume loop");
-        return NULL;
-    }
-
-    Dprintf("psyco_curs_consume_replication_stream");
-
-    if (keepalive_interval < 1.0) {
-        psyco_set_error(ProgrammingError, self, "keepalive_interval must be >= 1 (sec)");
-        return NULL;
-    }
-
-    self->repl_consuming = 1;
-
-    if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) {
-        res = Py_None;
-        Py_INCREF(res);
-    }
-
-    self->repl_consuming = 0;
-
-    return res;
-}
-
-#define psyco_curs_read_replication_message_doc \
-"read_replication_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
-
-static PyObject *
-psyco_curs_read_replication_message(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
-    int decode = 1;
-    static char *kwlist[] = {"decode", NULL};
-
-    EXC_IF_CURS_CLOSED(self);
-    EXC_IF_GREEN(read_replication_message);
-    EXC_IF_TPC_PREPARED(self->conn, read_replication_message);
-    EXC_IF_NOT_REPLICATING(self, read_replication_message);
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
-                                     &decode)) {
-        return NULL;
-    }
-
-    return pq_read_replication_message(self, decode);
-}
-
-static PyObject *
-curs_flush_replication_feedback(cursorObject *self, int reply)
-{
-    if (!(self->repl_feedback_pending || reply))
-        Py_RETURN_TRUE;
-
-    if (pq_send_replication_feedback(self, reply)) {
-        self->repl_feedback_pending = 0;
-        Py_RETURN_TRUE;
-    } else {
-        self->repl_feedback_pending = 1;
-        Py_RETURN_FALSE;
-    }
-}
-
-#define psyco_curs_send_replication_feedback_doc \
-"send_replication_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply."
-
-static PyObject *
-psyco_curs_send_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
-    XLogRecPtr write_lsn = InvalidXLogRecPtr,
-               flush_lsn = InvalidXLogRecPtr,
-               apply_lsn = InvalidXLogRecPtr;
-    int reply = 0;
-    static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
-
-    EXC_IF_CURS_CLOSED(self);
-    EXC_IF_NOT_REPLICATING(self, send_replication_feedback);
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
-                                     &write_lsn, &flush_lsn, &apply_lsn, &reply)) {
-        return NULL;
-    }
-
-    if (write_lsn > self->repl_write_lsn)
-        self->repl_write_lsn = write_lsn;
-
-    if (flush_lsn > self->repl_flush_lsn)
-        self->repl_flush_lsn = flush_lsn;
-
-    if (apply_lsn > self->repl_apply_lsn)
-        self->repl_apply_lsn = apply_lsn;
-
-    self->repl_feedback_pending = 1;
-
-    return curs_flush_replication_feedback(self, reply);
-}
-
-#define psyco_curs_flush_replication_feedback_doc \
-"flush_replication_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply."
-
-static PyObject *
-psyco_curs_flush_replication_feedback(cursorObject *self, PyObject *args, PyObject *kwargs)
-{
-    int reply = 0;
-    static char *kwlist[] = {"reply", NULL};
-
-    EXC_IF_CURS_CLOSED(self);
-    EXC_IF_NOT_REPLICATING(self, flush_replication_feedback);
-
-    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
-                                     &reply)) {
-        return NULL;
-    }
-
-    return curs_flush_replication_feedback(self, reply);
-}
-
-
-RAISES_NEG int
-psyco_curs_datetime_init(void)
-{
-    Dprintf("psyco_curs_datetime_init: datetime init");
-
-    PyDateTime_IMPORT;
-
-    if (!PyDateTimeAPI) {
-        PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
-        return -1;
-    }
-    return 0;
-}
-
-#define psyco_curs_replication_io_timestamp_doc \
-"replication_io_timestamp -- the timestamp of latest IO with the server"
-
-static PyObject *
-psyco_curs_get_replication_io_timestamp(cursorObject *self)
-{
-    PyObject *tval, *res = NULL;
-    double seconds;
-
-    EXC_IF_CURS_CLOSED(self);
-
-    seconds = self->repl_last_io.tv_sec + self->repl_last_io.tv_usec / 1.0e6;
-
-    tval = Py_BuildValue("(d)", seconds);
-    if (tval) {
-        res = PyDateTime_FromTimestamp(tval);
-        Py_DECREF(tval);
-    }
-    return res;
-}
-
 /* extension: closed - return true if cursor is closed */
 
 #define psyco_curs_closed_doc \
@@ -1973,16 +1753,6 @@ static struct PyMethodDef cursorObject_methods[] = {
      METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc},
     {"copy_expert", (PyCFunction)psyco_curs_copy_expert,
      METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_expert_doc},
-    {"start_replication_expert", (PyCFunction)psyco_curs_start_replication_expert,
-     METH_VARARGS|METH_KEYWORDS, psyco_curs_start_replication_expert_doc},
-    {"consume_replication_stream", (PyCFunction)psyco_curs_consume_replication_stream,
-     METH_VARARGS|METH_KEYWORDS, psyco_curs_consume_replication_stream_doc},
-    {"read_replication_message", (PyCFunction)psyco_curs_read_replication_message,
-     METH_VARARGS|METH_KEYWORDS, psyco_curs_read_replication_message_doc},
-    {"send_replication_feedback", (PyCFunction)psyco_curs_send_replication_feedback,
-     METH_VARARGS|METH_KEYWORDS, psyco_curs_send_replication_feedback_doc},
-    {"flush_replication_feedback", (PyCFunction)psyco_curs_flush_replication_feedback,
-     METH_VARARGS|METH_KEYWORDS, psyco_curs_flush_replication_feedback_doc},
     {NULL}
 };
 
@@ -2033,9 +1803,6 @@ static struct PyGetSetDef cursorObject_getsets[] = {
       (getter)psyco_curs_scrollable_get,
       (setter)psyco_curs_scrollable_set,
       psyco_curs_scrollable_doc, NULL },
-    { "replication_io_timestamp",
-      (getter)psyco_curs_get_replication_io_timestamp, NULL,
-      psyco_curs_replication_io_timestamp_doc, NULL },
     {NULL}
 };
 
@@ -2134,7 +1901,7 @@ cursor_dealloc(PyObject* obj)
     Py_TYPE(obj)->tp_free(obj);
 }
 
-static int
+int
 cursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
 {
     PyObject *conn;
diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c
index f38fbd39..d6886981 100644
--- a/psycopg/pqpath.c
+++ b/psycopg/pqpath.c
@@ -35,6 +35,7 @@
 #include "psycopg/pqpath.h"
 #include "psycopg/connection.h"
 #include "psycopg/cursor.h"
+#include "psycopg/replication_cursor.h"
 #include "psycopg/replication_message.h"
 #include "psycopg/green.h"
 #include "psycopg/typecast.h"
@@ -1542,19 +1543,23 @@ exit:
    are never returned to the caller.
  */
 PyObject *
-pq_read_replication_message(cursorObject *curs, int decode)
+pq_read_replication_message(replicationCursorObject *repl, int decode)
 {
+    cursorObject *curs = &repl->cur;
+    connectionObject *conn = curs->conn;
+    PGconn *pgconn = conn->pgconn;
     char *buffer = NULL;
     int len, data_size, consumed, hdr, reply;
     XLogRecPtr data_start, wal_end;
     pg_int64 send_time;
-    PyObject *str = NULL, *msg = NULL;
+    PyObject *str = NULL, *result = NULL;
+    replicationMessageObject *msg = NULL;
 
     Dprintf("pq_read_replication_message(decode=%d)", decode);
 
     consumed = 0;
 retry:
-    len = PQgetCopyData(curs->conn->pgconn, &buffer, 1 /* async */);
+    len = PQgetCopyData(pgconn, &buffer, 1 /* async */);
 
     if (len == 0) {
         /* If we've tried reading some data, but there was none, bail out. */
@@ -1566,8 +1571,8 @@ retry:
            server we might be reading a number of messages for every single
            one we process, thus overgrowing the internal buffer until the
            client system runs out of memory. */
-        if (!PQconsumeInput(curs->conn->pgconn)) {
-            pq_raise(curs->conn, curs, NULL);
+        if (!PQconsumeInput(pgconn)) {
+            pq_raise(conn, curs, NULL);
             goto exit;
         }
         /* But PQconsumeInput() doesn't tell us if it has actually read
@@ -1581,15 +1586,15 @@ retry:
 
     if (len == -2) {
         /* serious error */
-        pq_raise(curs->conn, curs, NULL);
+        pq_raise(conn, curs, NULL);
         goto exit;
     }
     if (len == -1) {
         /* EOF */
-        curs->pgres = PQgetResult(curs->conn->pgconn);
+        curs->pgres = PQgetResult(pgconn);
 
         if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) {
-            pq_raise(curs->conn, curs, NULL);
+            pq_raise(conn, curs, NULL);
             goto exit;
         }
 
@@ -1603,7 +1608,7 @@ retry:
     consumed = 1;
 
     /* ok, we did really read something: update the io timestamp */
-    gettimeofday(&curs->repl_last_io, NULL);
+    gettimeofday(&repl->last_io, NULL);
 
     Dprintf("pq_read_replication_message: msg=%c, len=%d", buffer[0], len);
     if (buffer[0] == 'w') {
@@ -1626,21 +1631,22 @@ retry:
 
         /* XXX it would be wise to check if it's really a logical replication */
         if (decode) {
-            str = PyUnicode_Decode(buffer + hdr, data_size, curs->conn->codec, NULL);
+            str = PyUnicode_Decode(buffer + hdr, data_size, conn->codec, NULL);
         } else {
             str = Bytes_FromStringAndSize(buffer + hdr, data_size);
         }
         if (!str) { goto exit; }
 
-        msg = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
-                                           curs, str, NULL);
+        result = PyObject_CallFunctionObjArgs((PyObject *)&replicationMessageType,
+                                              curs, str, NULL);
         Py_DECREF(str);
-        if (!msg) { goto exit; }
+        if (!result) { goto exit; }
 
-        ((replicationMessageObject *)msg)->data_size = data_size;
-        ((replicationMessageObject *)msg)->data_start = data_start;
-        ((replicationMessageObject *)msg)->wal_end = wal_end;
-        ((replicationMessageObject *)msg)->send_time = send_time;
+        msg = (replicationMessageObject *)result;
+        msg->data_size  = data_size;
+        msg->data_start = data_start;
+        msg->wal_end    = wal_end;
+        msg->send_time  = send_time;
     }
     else if (buffer[0] == 'k') {
         /* Primary keepalive message: msgtype(1), walEnd(8), sendTime(8), reply(1) */
@@ -1652,17 +1658,17 @@ retry:
 
         reply = buffer[hdr];
         if (reply) {
-            if (!pq_send_replication_feedback(curs, 0)) {
-                if (curs->conn->async) {
-                    curs->repl_feedback_pending = 1;
+            if (!pq_send_replication_feedback(repl, 0)) {
+                if (conn->async) {
+                    repl->feedback_pending = 1;
                 } else {
                     /* XXX not sure if this was a good idea after all */
-                    pq_raise(curs->conn, curs, NULL);
+                    pq_raise(conn, curs, NULL);
                     goto exit;
                 }
             }
             else {
-                gettimeofday(&curs->repl_last_io, NULL);
+                gettimeofday(&repl->last_io, NULL);
             }
         }
 
@@ -1680,37 +1686,38 @@ exit:
         PQfreemem(buffer);
     }
 
-    return msg;
+    return result;
 
 none:
-    msg = Py_None;
-    Py_INCREF(msg);
+    result = Py_None;
+    Py_INCREF(result);
     goto exit;
 }
 
 int
-pq_send_replication_feedback(cursorObject* curs, int reply_requested)
+pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested)
 {
+    cursorObject *curs = &repl->cur;
+    PGconn *pgconn = curs->conn->pgconn;
     char replybuf[1 + 8 + 8 + 8 + 8 + 1];
     int len = 0;
 
     Dprintf("pq_send_replication_feedback: write="XLOGFMTSTR", flush="XLOGFMTSTR", apply="XLOGFMTSTR,
-            XLOGFMTARGS(curs->repl_write_lsn),
-            XLOGFMTARGS(curs->repl_flush_lsn),
-            XLOGFMTARGS(curs->repl_apply_lsn));
+            XLOGFMTARGS(repl->write_lsn),
+            XLOGFMTARGS(repl->flush_lsn),
+            XLOGFMTARGS(repl->apply_lsn));
 
     replybuf[len] = 'r'; len += 1;
-    fe_sendint64(curs->repl_write_lsn, &replybuf[len]); len += 8;
-    fe_sendint64(curs->repl_flush_lsn, &replybuf[len]); len += 8;
-    fe_sendint64(curs->repl_apply_lsn, &replybuf[len]); len += 8;
+    fe_sendint64(repl->write_lsn, &replybuf[len]); len += 8;
+    fe_sendint64(repl->flush_lsn, &replybuf[len]); len += 8;
+    fe_sendint64(repl->apply_lsn, &replybuf[len]); len += 8;
     fe_sendint64(feGetCurrentTimestamp(), &replybuf[len]); len += 8;
     replybuf[len] = reply_requested ? 1 : 0; len += 1;
 
-    if (PQputCopyData(curs->conn->pgconn, replybuf, len) <= 0 ||
-        PQflush(curs->conn->pgconn) != 0) {
+    if (PQputCopyData(pgconn, replybuf, len) <= 0 || PQflush(pgconn) != 0) {
         return 0;
     }
-    gettimeofday(&curs->repl_last_io, NULL);
+    gettimeofday(&repl->last_io, NULL);
 
     return 1;
 }
@@ -1723,12 +1730,15 @@ pq_send_replication_feedback(cursorObject* curs, int reply_requested)
    manages to send keepalive messages to the server as needed.
 */
 int
-pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive_interval)
+pq_copy_both(replicationCursorObject *repl, PyObject *consume, int decode,
+             double keepalive_interval)
 {
+    cursorObject *curs = &repl->cur;
+    connectionObject *conn = curs->conn;
+    PGconn *pgconn = conn->pgconn;
     PyObject *msg, *tmp = NULL;
     PyObject *consume_func = NULL;
     int fd, sel, ret = -1;
-    PGconn *pgconn;
     fd_set fds;
     struct timeval keep_intr, curr_time, ping_time, timeout;
 
@@ -1738,13 +1748,12 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
     }
 
     CLEARPGRES(curs->pgres);
-    pgconn = curs->conn->pgconn;
 
     keep_intr.tv_sec  = (int)keepalive_interval;
     keep_intr.tv_usec = (keepalive_interval - keep_intr.tv_sec)*1.0e6;
 
     while (1) {
-        msg = pq_read_replication_message(curs, decode);
+        msg = pq_read_replication_message(repl, decode);
         if (!msg) {
             goto exit;
         }
@@ -1753,7 +1762,7 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
 
             fd = PQsocket(pgconn);
             if (fd < 0) {
-                pq_raise(curs->conn, curs, NULL);
+                pq_raise(conn, curs, NULL);
                 goto exit;
             }
 
@@ -1763,7 +1772,7 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
             /* how long can we wait before we need to send a keepalive? */
             gettimeofday(&curr_time, NULL);
 
-            timeradd(&curs->repl_last_io, &keep_intr, &ping_time);
+            timeradd(&repl->last_io, &keep_intr, &ping_time);
             timersub(&ping_time, &curr_time, &timeout);
 
             if (timeout.tv_sec >= 0) {
@@ -1787,8 +1796,8 @@ pq_copy_both(cursorObject *curs, PyObject *consume, int decode, double keepalive
             }
 
             if (sel == 0) {
-                if (!pq_send_replication_feedback(curs, 0)) {
-                    pq_raise(curs->conn, curs, NULL);
+                if (!pq_send_replication_feedback(repl, 0)) {
+                    pq_raise(conn, curs, NULL);
                     goto exit;
                 }
             }
@@ -1876,7 +1885,7 @@ pq_fetch(cursorObject *curs, int no_result)
         Dprintf("pq_fetch: data from a streaming replication slot (no tuples)");
         curs->rowcount = -1;
         ex = 0;
-        /* nothing to do here: _pq_copy_both_v3 will be called separately */
+        /* nothing to do here: pq_copy_both will be called separately */
         CLEARPGRES(curs->pgres);
         break;
 
diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h
index a858a269..568f0768 100644
--- a/psycopg/pqpath.h
+++ b/psycopg/pqpath.h
@@ -27,6 +27,7 @@
 #define PSYCOPG_PQPATH_H 1
 
 #include "psycopg/cursor.h"
+#include "psycopg/replication_cursor.h"
 #include "psycopg/connection.h"
 
 /* macro to clean the pg result */
@@ -72,9 +73,10 @@ HIDDEN int pq_execute_command_locked(connectionObject *conn,
 RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
                               char **error);
 
-HIDDEN int pq_copy_both(cursorObject *curs, PyObject *consumer,
+/* replication protocol support */
+HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,
                         int decode, double keepalive_interval);
-HIDDEN PyObject *pq_read_replication_message(cursorObject *curs, int decode);
-HIDDEN int pq_send_replication_feedback(cursorObject *curs, int reply_requested);
+HIDDEN PyObject *pq_read_replication_message(replicationCursorObject *repl, int decode);
+HIDDEN int pq_send_replication_feedback(replicationCursorObject *repl, int reply_requested);
 
 #endif /* !defined(PSYCOPG_PQPATH_H) */
diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c
index f9f29a2e..25e32598 100644
--- a/psycopg/psycopgmodule.c
+++ b/psycopg/psycopgmodule.c
@@ -28,6 +28,7 @@
 
 #include "psycopg/connection.h"
 #include "psycopg/cursor.h"
+#include "psycopg/replication_cursor.h"
 #include "psycopg/replication_message.h"
 #include "psycopg/green.h"
 #include "psycopg/lobject.h"
@@ -917,6 +918,9 @@ INIT_MODULE(_psycopg)(void)
     Py_TYPE(&cursorType) = &PyType_Type;
     if (PyType_Ready(&cursorType) == -1) goto exit;
 
+    Py_TYPE(&replicationCursorType) = &PyType_Type;
+    if (PyType_Ready(&replicationCursorType) == -1) goto exit;
+
     Py_TYPE(&replicationMessageType) = &PyType_Type;
     if (PyType_Ready(&replicationMessageType) == -1) goto exit;
 
@@ -1000,7 +1004,7 @@ INIT_MODULE(_psycopg)(void)
     /* Initialize the PyDateTimeAPI everywhere is used */
     PyDateTime_IMPORT;
     if (psyco_adapter_datetime_init()) { goto exit; }
-    if (psyco_curs_datetime_init()) { goto exit; }
+    if (psyco_repl_curs_datetime_init()) { goto exit; }
     if (psyco_replmsg_datetime_init()) { goto exit; }
 
     Py_TYPE(&pydatetimeType) = &PyType_Type;
@@ -1044,7 +1048,8 @@ INIT_MODULE(_psycopg)(void)
     /* put new types in module dictionary */
     PyModule_AddObject(module, "connection", (PyObject*)&connectionType);
     PyModule_AddObject(module, "cursor", (PyObject*)&cursorType);
-    PyModule_AddObject(module, "replicationMessage", (PyObject*)&replicationMessageType);
+    PyModule_AddObject(module, "ReplicationCursor", (PyObject*)&replicationCursorType);
+    PyModule_AddObject(module, "ReplicationMessage", (PyObject*)&replicationMessageType);
     PyModule_AddObject(module, "ISQLQuote", (PyObject*)&isqlquoteType);
     PyModule_AddObject(module, "Notify", (PyObject*)&notifyType);
     PyModule_AddObject(module, "Xid", (PyObject*)&xidType);
diff --git a/psycopg/replication_cursor.h b/psycopg/replication_cursor.h
new file mode 100644
index 00000000..1b6dbfab
--- /dev/null
+++ b/psycopg/replication_cursor.h
@@ -0,0 +1,77 @@
+/* replication_cursor.h - definition for the psycopg replication cursor type
+ *
+ * Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+ *
+ * This file is part of psycopg.
+ *
+ * psycopg2 is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders give
+ * permission to link this program with the OpenSSL library (or with
+ * modified versions of OpenSSL that use the same license as OpenSSL),
+ * and distribute linked combinations including the two.
+ *
+ * You must obey the GNU Lesser General Public License in all respects for
+ * all of the code used other than OpenSSL.
+ *
+ * psycopg2 is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
+ * License for more details.
+ */
+
+#ifndef PSYCOPG_REPLICATION_CURSOR_H
+#define PSYCOPG_REPLICATION_CURSOR_H 1
+
+#include "psycopg/cursor.h"
+#include "libpq_support.h"
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+extern HIDDEN PyTypeObject replicationCursorType;
+
+typedef struct replicationCursorObject {
+    cursorObject cur;
+
+    int         started:1;          /* if replication is started */
+    int         consuming:1;        /* if running the consume loop */
+
+    struct timeval last_io;       /* timestamp of the last exchange with the server */
+    struct timeval keepalive_interval;   /* interval for keepalive messages in replication mode */
+
+    XLogRecPtr  write_lsn;        /* LSN stats for replication feedback messages */
+    XLogRecPtr  flush_lsn;
+    XLogRecPtr  apply_lsn;
+    int         feedback_pending; /* flag set when we couldn't send the feedback to the server */
+} replicationCursorObject;
+
+
+RAISES_NEG int psyco_repl_curs_datetime_init(void);
+
+/* exception-raising macros */
+#define EXC_IF_REPLICATING(self, cmd) \
+do \
+    if ((self)->started) { \
+        PyErr_SetString(ProgrammingError, \
+            #cmd " cannot be used when replication is already in progress"); \
+    return NULL; } \
+while (0)
+
+#define EXC_IF_NOT_REPLICATING(self, cmd) \
+do \
+    if (!(self)->started) { \
+        PyErr_SetString(ProgrammingError, \
+            #cmd " cannot be used when replication is not in progress"); \
+    return NULL; } \
+while (0)
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* !defined(PSYCOPG_REPLICATION_CURSOR_H) */
diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c
new file mode 100644
index 00000000..d1f7939a
--- /dev/null
+++ b/psycopg/replication_cursor_type.c
@@ -0,0 +1,360 @@
+/* replication_cursor_type.c - python interface to replication cursor objects
+ *
+ * Copyright (C) 2015 Daniele Varrazzo <daniele.varrazzo@gmail.com>
+ *
+ * This file is part of psycopg.
+ *
+ * psycopg2 is free software: you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as published
+ * by the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * In addition, as a special exception, the copyright holders give
+ * permission to link this program with the OpenSSL library (or with
+ * modified versions of OpenSSL that use the same license as OpenSSL),
+ * and distribute linked combinations including the two.
+ *
+ * You must obey the GNU Lesser General Public License in all respects for
+ * all of the code used other than OpenSSL.
+ *
+ * psycopg2 is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.  See the GNU Lesser General Public
+ * License for more details.
+ */
+
+#define PSYCOPG_MODULE
+#include "psycopg/psycopg.h"
+
+#include "psycopg/replication_cursor.h"
+#include "psycopg/replication_message.h"
+#include "psycopg/green.h"
+#include "psycopg/pqpath.h"
+
+#include <string.h>
+#include <stdlib.h>
+
+/* python */
+#include "datetime.h"
+
+
+#define psyco_repl_curs_start_replication_expert_doc                                \
+"start_replication_expert(command, writer=None, keepalive_interval=10) -- Start replication stream with a directly given command."
+
+static PyObject *
+psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
+                                         PyObject *args, PyObject *kwargs)
+{
+    cursorObject *curs = &self->cur;
+    connectionObject *conn = self->cur.conn;
+    PyObject *res = NULL;
+    char *command;
+    static char *kwlist[] = {"command", NULL};
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &command)) {
+        return NULL;
+    }
+
+    EXC_IF_CURS_CLOSED(curs);
+    EXC_IF_GREEN(start_replication_expert);
+    EXC_IF_TPC_PREPARED(conn, start_replication_expert);
+    EXC_IF_REPLICATING(self, start_replication_expert);
+
+    Dprintf("psyco_repl_curs_start_replication_expert: %s", command);
+
+    /*    self->copysize = 0;*/
+
+    gettimeofday(&self->last_io, NULL);
+
+    if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) {
+        res = Py_None;
+        Py_INCREF(res);
+
+        self->started = 1;
+    }
+
+    return res;
+}
+
+#define psyco_repl_curs_consume_stream_doc \
+"consume_stream(consumer, keepalive_interval=10) -- Consume replication stream."
+
+static PyObject *
+psyco_repl_curs_consume_stream(replicationCursorObject *self,
+                               PyObject *args, PyObject *kwargs)
+{
+    cursorObject *curs = &self->cur;
+    PyObject *consume = NULL, *res = NULL;
+    int decode = 0;
+    double keepalive_interval = 10;
+    static char *kwlist[] = {"consume", "decode", "keepalive_interval", NULL};
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|id", kwlist,
+                                     &consume, &decode, &keepalive_interval)) {
+        return NULL;
+    }
+
+    EXC_IF_CURS_CLOSED(curs);
+    EXC_IF_CURS_ASYNC(curs, consume_stream);
+    EXC_IF_GREEN(consume_stream);
+    EXC_IF_TPC_PREPARED(self->cur.conn, consume_stream);
+    EXC_IF_NOT_REPLICATING(self, consume_stream);
+
+    if (self->consuming) {
+        PyErr_SetString(ProgrammingError,
+                        "consume_stream cannot be used when already in the consume loop");
+        return NULL;
+    }
+
+    Dprintf("psyco_repl_curs_consume_stream");
+
+    if (keepalive_interval < 1.0) {
+        psyco_set_error(ProgrammingError, curs, "keepalive_interval must be >= 1 (sec)");
+        return NULL;
+    }
+
+    self->consuming = 1;
+
+    if (pq_copy_both(self, consume, decode, keepalive_interval) >= 0) {
+        res = Py_None;
+        Py_INCREF(res);
+    }
+
+    self->consuming = 0;
+
+    return res;
+}
+
+#define psyco_repl_curs_read_message_doc \
+"read_message(decode=True) -- Try reading a replication message from the server (non-blocking)."
+
+static PyObject *
+psyco_repl_curs_read_message(replicationCursorObject *self,
+                             PyObject *args, PyObject *kwargs)
+{
+    cursorObject *curs = &self->cur;
+    int decode = 1;
+    static char *kwlist[] = {"decode", NULL};
+
+    EXC_IF_CURS_CLOSED(curs);
+    EXC_IF_GREEN(read_message);
+    EXC_IF_TPC_PREPARED(self->cur.conn, read_message);
+    EXC_IF_NOT_REPLICATING(self, read_message);
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
+                                     &decode)) {
+        return NULL;
+    }
+
+    return pq_read_replication_message(self, decode);
+}
+
+static PyObject *
+repl_curs_flush_feedback(replicationCursorObject *self, int reply)
+{
+    if (!(self->feedback_pending || reply))
+        Py_RETURN_TRUE;
+
+    if (pq_send_replication_feedback(self, reply)) {
+        self->feedback_pending = 0;
+        Py_RETURN_TRUE;
+    } else {
+        self->feedback_pending = 1;
+        Py_RETURN_FALSE;
+    }
+}
+
+#define psyco_repl_curs_send_feedback_doc \
+"send_feedback(write_lsn=0, flush_lsn=0, apply_lsn=0, reply=False) -- Try sending a replication feedback message to the server and optionally request a reply."
+
+static PyObject *
+psyco_repl_curs_send_feedback(replicationCursorObject *self,
+                              PyObject *args, PyObject *kwargs)
+{
+    cursorObject *curs = &self->cur;
+    XLogRecPtr write_lsn = InvalidXLogRecPtr,
+               flush_lsn = InvalidXLogRecPtr,
+               apply_lsn = InvalidXLogRecPtr;
+    int reply = 0;
+    static char* kwlist[] = {"write_lsn", "flush_lsn", "apply_lsn", "reply", NULL};
+
+    EXC_IF_CURS_CLOSED(curs);
+    EXC_IF_NOT_REPLICATING(self, send_feedback);
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|KKKi", kwlist,
+                                     &write_lsn, &flush_lsn, &apply_lsn, &reply)) {
+        return NULL;
+    }
+
+    if (write_lsn > self->write_lsn)
+        self->write_lsn = write_lsn;
+
+    if (flush_lsn > self->flush_lsn)
+        self->flush_lsn = flush_lsn;
+
+    if (apply_lsn > self->apply_lsn)
+        self->apply_lsn = apply_lsn;
+
+    self->feedback_pending = 1;
+
+    return repl_curs_flush_feedback(self, reply);
+}
+
+#define psyco_repl_curs_flush_feedback_doc \
+"flush_feedback(reply=False) -- Try flushing the latest pending replication feedback message to the server and optionally request a reply."
+
+static PyObject *
+psyco_repl_curs_flush_feedback(replicationCursorObject *self,
+                               PyObject *args, PyObject *kwargs)
+{
+    cursorObject *curs = &self->cur;
+    int reply = 0;
+    static char *kwlist[] = {"reply", NULL};
+
+    EXC_IF_CURS_CLOSED(curs);
+    EXC_IF_NOT_REPLICATING(self, flush_feedback);
+
+    if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|i", kwlist,
+                                     &reply)) {
+        return NULL;
+    }
+
+    return repl_curs_flush_feedback(self, reply);
+}
+
+
+RAISES_NEG int
+psyco_repl_curs_datetime_init(void)
+{
+    Dprintf("psyco_repl_curs_datetime_init: datetime init");
+
+    PyDateTime_IMPORT;
+
+    if (!PyDateTimeAPI) {
+        PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
+        return -1;
+    }
+    return 0;
+}
+
+#define psyco_repl_curs_io_timestamp_doc \
+"io_timestamp -- the timestamp of latest IO with the server"
+
+static PyObject *
+psyco_repl_curs_get_io_timestamp(replicationCursorObject *self)
+{
+    cursorObject *curs = &self->cur;
+    PyObject *tval, *res = NULL;
+    double seconds;
+
+    EXC_IF_CURS_CLOSED(curs);
+
+    seconds = self->last_io.tv_sec + self->last_io.tv_usec / 1.0e6;
+
+    tval = Py_BuildValue("(d)", seconds);
+    if (tval) {
+        res = PyDateTime_FromTimestamp(tval);
+        Py_DECREF(tval);
+    }
+    return res;
+}
+
+/* object method list */
+
+static struct PyMethodDef replicationCursorObject_methods[] = {
+    {"start_replication_expert", (PyCFunction)psyco_repl_curs_start_replication_expert,
+     METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_start_replication_expert_doc},
+    {"consume_stream", (PyCFunction)psyco_repl_curs_consume_stream,
+     METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_consume_stream_doc},
+    {"read_message", (PyCFunction)psyco_repl_curs_read_message,
+     METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_read_message_doc},
+    {"send_feedback", (PyCFunction)psyco_repl_curs_send_feedback,
+     METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_send_feedback_doc},
+    {"flush_feedback", (PyCFunction)psyco_repl_curs_flush_feedback,
+     METH_VARARGS|METH_KEYWORDS, psyco_repl_curs_flush_feedback_doc},
+    {NULL}
+};
+
+/* object calculated member list */
+
+static struct PyGetSetDef replicationCursorObject_getsets[] = {
+    { "io_timestamp",
+      (getter)psyco_repl_curs_get_io_timestamp, NULL,
+      psyco_repl_curs_io_timestamp_doc, NULL },
+    {NULL}
+};
+
+static int
+replicationCursor_setup(replicationCursorObject* self)
+{
+    self->started = 0;
+    self->consuming = 0;
+
+    self->write_lsn = InvalidXLogRecPtr;
+    self->flush_lsn = InvalidXLogRecPtr;
+    self->apply_lsn = InvalidXLogRecPtr;
+    self->feedback_pending = 0;
+
+    return 0;
+}
+
+static int
+replicationCursor_init(PyObject *obj, PyObject *args, PyObject *kwargs)
+{
+    replicationCursor_setup((replicationCursorObject *)obj);
+    return cursor_init(obj, args, kwargs);
+}
+
+static PyObject *
+replicationCursor_repr(replicationCursorObject *self)
+{
+    return PyString_FromFormat(
+        "<ReplicationCursor object at %p; closed: %d>", self, self->cur.closed);
+}
+
+
+/* object type */
+
+#define replicationCursorType_doc \
+"A database replication cursor."
+
+PyTypeObject replicationCursorType = {
+    PyVarObject_HEAD_INIT(NULL, 0)
+    "psycopg2.extensions.ReplicationCursor",
+    sizeof(replicationCursorObject), 0,
+    0,          /*tp_dealloc*/
+    0,          /*tp_print*/
+    0,          /*tp_getattr*/
+    0,          /*tp_setattr*/
+    0,          /*tp_compare*/
+    (reprfunc)replicationCursor_repr, /*tp_repr*/
+    0,          /*tp_as_number*/
+    0,          /*tp_as_sequence*/
+    0,          /*tp_as_mapping*/
+    0,          /*tp_hash*/
+    0,          /*tp_call*/
+    (reprfunc)replicationCursor_repr, /*tp_str*/
+    0,          /*tp_getattro*/
+    0,          /*tp_setattro*/
+    0,          /*tp_as_buffer*/
+    Py_TPFLAGS_DEFAULT | Py_TPFLAGS_BASETYPE | Py_TPFLAGS_HAVE_ITER |
+      Py_TPFLAGS_HAVE_GC, /*tp_flags*/
+    replicationCursorType_doc, /*tp_doc*/
+    0,          /*tp_traverse*/
+    0,          /*tp_clear*/
+    0,          /*tp_richcompare*/
+    0,          /*tp_weaklistoffset*/
+    0,          /*tp_iter*/
+    0,          /*tp_iternext*/
+    replicationCursorObject_methods, /*tp_methods*/
+    0,          /*tp_members*/
+    replicationCursorObject_getsets, /*tp_getset*/
+    &cursorType, /*tp_base*/
+    0,          /*tp_dict*/
+    0,          /*tp_descr_get*/
+    0,          /*tp_descr_set*/
+    0,          /*tp_dictoffset*/
+    replicationCursor_init, /*tp_init*/
+    0,          /*tp_alloc*/
+    0,          /*tp_new*/
+};
diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c
index 61833931..d4b0457b 100644
--- a/psycopg/replication_message_type.c
+++ b/psycopg/replication_message_type.c
@@ -49,7 +49,7 @@ static PyObject *
 replmsg_repr(replicationMessageObject *self)
 {
     return PyString_FromFormat(
-        "<replicationMessage object at %p; data_size: %d; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>",
+        "<ReplicationMessage object at %p; data_size: %d; data_start: "XLOGFMTSTR"; wal_end: "XLOGFMTSTR"; send_time: %lld>",
         self, self->data_size, XLOGFMTARGS(self->data_start), XLOGFMTARGS(self->wal_end),
         self->send_time);
 }
diff --git a/psycopg2.cproj b/psycopg2.cproj
index 386287c1..682b69d0 100644
--- a/psycopg2.cproj
+++ b/psycopg2.cproj
@@ -92,6 +92,7 @@
     <None Include="psycopg\pqpath.h" />
     <None Include="psycopg\psycopg.h" />
     <None Include="psycopg\python.h" />
+    <None Include="psycopg\replication_cursor.h" />
     <None Include="psycopg\replication_message.h" />
     <None Include="psycopg\typecast.h" />
     <None Include="psycopg\typecast_binary.h" />
@@ -225,6 +226,7 @@
     <Compile Include="psycopg\microprotocols_proto.c" />
     <Compile Include="psycopg\pqpath.c" />
     <Compile Include="psycopg\psycopgmodule.c" />
+    <Compile Include="psycopg\replication_cursor_type.c" />
     <Compile Include="psycopg\replication_message_type.c" />
     <Compile Include="psycopg\typecast.c" />
     <Compile Include="psycopg\typecast_array.c" />
diff --git a/setup.py b/setup.py
index 339d7f2a..18c47b7c 100644
--- a/setup.py
+++ b/setup.py
@@ -466,7 +466,7 @@ sources = [
 
     'connection_int.c', 'connection_type.c',
     'cursor_int.c', 'cursor_type.c',
-    'replication_message_type.c',
+    'replication_cursor_type.c', 'replication_message_type.c',
     'diagnostics_type.c', 'error_type.c',
     'lobject_int.c', 'lobject_type.c',
     'notify_type.c', 'xid_type.c',
@@ -482,7 +482,7 @@ depends = [
     # headers
     'config.h', 'pgtypes.h', 'psycopg.h', 'python.h', 'connection.h',
     'cursor.h', 'diagnostics.h', 'error.h', 'green.h', 'lobject.h',
-    'replication_message.h',
+    'replication_cursor.h', 'replication_message.h',
     'notify.h', 'pqpath.h', 'xid.h',
     'libpq_support.h', 'win32_support.h',
 
diff --git a/tests/test_replication.py b/tests/test_replication.py
index 5c029c88..2dbb0086 100644
--- a/tests/test_replication.py
+++ b/tests/test_replication.py
@@ -47,12 +47,16 @@ class ReplicationTestCase(ConnectingTestCase):
         # first close all connections, as they might keep the slot(s) active
         super(ReplicationTestCase, self).tearDown()
 
+        import time
+        time.sleep(0.025)  # sometimes the slot is still active, wait a little
+
         if self._slots:
-            kill_conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
+            kill_conn = self.connect()
             if kill_conn:
                 kill_cur = kill_conn.cursor()
                 for slot in self._slots:
-                    kill_cur.drop_replication_slot(slot)
+                    kill_cur.execute("SELECT pg_drop_replication_slot(%s)", (slot,))
+                kill_conn.commit()
                 kill_conn.close()
 
     def create_replication_slot(self, cur, slot_name=testconfig.repl_slot, **kwargs):
@@ -127,7 +131,7 @@ class ReplicationTest(ReplicationTestCase):
         cur.start_replication(self.slot)
         def consume(msg):
             raise StopReplication()
-        self.assertRaises(StopReplication, cur.consume_replication_stream, consume)
+        self.assertRaises(StopReplication, cur.consume_stream, consume)
 
 
 class AsyncReplicationTest(ReplicationTestCase):
@@ -148,14 +152,22 @@ class AsyncReplicationTest(ReplicationTestCase):
 
         self.msg_count = 0
         def consume(msg):
+            # just check the methods
+            log = "%s: %s" % (cur.io_timestamp, repr(msg))
+
             self.msg_count += 1
             if self.msg_count > 3:
+                cur.flush_feedback(reply=True)
                 raise StopReplication()
 
+            cur.send_feedback(flush_lsn=msg.data_start)
+
+        self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
+
         def process_stream():
             from select import select
             while True:
-                msg = cur.read_replication_message()
+                msg = cur.read_message()
                 if msg:
                     consume(msg)
                 else: