diff --git a/NEWS b/NEWS index a914a7ed..81d5fe25 100644 --- a/NEWS +++ b/NEWS @@ -23,6 +23,9 @@ What's new in psycopg 2.7.2 - Maybe fixed random import error on Python 3.6 in multiprocess environment (:ticket:`#550`). - Fixed random `!SystemError` upon receiving abort signal (:ticket:`#551`). +- Accept `~psycopg2.sql.Composable` objects in + `~psycopg2.extras.ReplicationCursor.start_replication_expert()` + (:ticket:`554`). - Parse intervals returned as microseconds from Redshift (:ticket:`#558`). - Added `~psycopg2.extras.Json` `!prepare()` method to consider connection params when adapting (:ticket:`#562`). diff --git a/doc/src/extras.rst b/doc/src/extras.rst index beb05110..36118e7e 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -356,8 +356,13 @@ The individual messages in the replication stream are represented by .. method:: start_replication_expert(command, decode=False) Start replication on the connection using provided - |START_REPLICATION|_ command. See `start_replication()` for - description of *decode* parameter. + |START_REPLICATION|_ command. + + :param command: The full replication command. It can be a string or a + `~psycopg2.sql.Composable` instance for dynamic generation. + :param decode: a flag indicating that unicode conversion should be + performed on messages received from the server. + .. method:: consume_stream(consume, keepalive_interval=10) diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 3c94fe3d..d89bf219 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -95,6 +95,7 @@ BORROWED HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid); HIDDEN void curs_reset(cursorObject *self); HIDDEN int psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue); HIDDEN int psyco_curs_scrollable_set(cursorObject *self, PyObject *pyvalue); +HIDDEN PyObject *psyco_curs_validate_sql_basic(cursorObject *self, PyObject *sql); /* exception-raising macros */ #define EXC_IF_CURS_CLOSED(self) \ diff --git a/psycopg/cursor_int.c b/psycopg/cursor_int.c index dd4c0d7d..bacef776 100644 --- a/psycopg/cursor_int.c +++ b/psycopg/cursor_int.c @@ -80,3 +80,83 @@ curs_reset(cursorObject *self) Py_CLEAR(self->description); Py_CLEAR(self->casts); } + + +/* Return 1 if `obj` is a `psycopg2.sql.Composable` instance, else 0 + * Set an exception and return -1 in case of error. + */ +RAISES_NEG static int +_curs_is_composible(PyObject *obj) +{ + int rv = -1; + PyObject *m = NULL; + PyObject *comp = NULL; + + if (!(m = PyImport_ImportModule("psycopg2.sql"))) { goto exit; } + if (!(comp = PyObject_GetAttrString(m, "Composable"))) { goto exit; } + rv = PyObject_IsInstance(obj, comp); + +exit: + Py_XDECREF(comp); + Py_XDECREF(m); + return rv; + +} + +/* Performs very basic validation on an incoming SQL string. + * Returns a new reference to a str instance on success; NULL on failure, + * after having set an exception. + */ +PyObject * +psyco_curs_validate_sql_basic(cursorObject *self, PyObject *sql) +{ + PyObject *rv = NULL; + PyObject *comp = NULL; + int iscomp; + + if (!sql || !PyObject_IsTrue(sql)) { + psyco_set_error(ProgrammingError, self, + "can't execute an empty query"); + goto exit; + } + + if (Bytes_Check(sql)) { + /* Necessary for ref-count symmetry with the unicode case: */ + Py_INCREF(sql); + rv = sql; + } + else if (PyUnicode_Check(sql)) { + if (!(rv = conn_encode(self->conn, sql))) { goto exit; } + } + else if (0 != (iscomp = _curs_is_composible(sql))) { + if (iscomp < 0) { goto exit; } + if (!(comp = PyObject_CallMethod(sql, "as_string", "O", self->conn))) { + goto exit; + } + + if (Bytes_Check(comp)) { + rv = comp; + comp = NULL; + } + else if (PyUnicode_Check(comp)) { + if (!(rv = conn_encode(self->conn, comp))) { goto exit; } + } + else { + PyErr_Format(PyExc_TypeError, + "as_string() should return a string: got %s instead", + Py_TYPE(comp)->tp_name); + goto exit; + } + } + else { + /* the is not unicode or string, raise an error */ + PyErr_Format(PyExc_TypeError, + "argument 1 must be a string or unicode object: got %s instead", + Py_TYPE(sql)->tp_name); + goto exit; + } + +exit: + Py_XDECREF(comp); + return rv; +} diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 5031033c..a70e9d34 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -267,85 +267,6 @@ _mogrify(PyObject *var, PyObject *fmt, cursorObject *curs, PyObject **new) return 0; } -/* Return 1 if `obj` is a `psycopg2.sql.Composable` instance, else 0 - * Set an exception and return -1 in case of error. - */ -RAISES_NEG static int -_curs_is_composible(PyObject *obj) -{ - int rv = -1; - PyObject *m = NULL; - PyObject *comp = NULL; - - if (!(m = PyImport_ImportModule("psycopg2.sql"))) { goto exit; } - if (!(comp = PyObject_GetAttrString(m, "Composable"))) { goto exit; } - rv = PyObject_IsInstance(obj, comp); - -exit: - Py_XDECREF(comp); - Py_XDECREF(m); - return rv; - -} - -static PyObject *_psyco_curs_validate_sql_basic( - cursorObject *self, PyObject *sql - ) -{ - PyObject *rv = NULL; - PyObject *comp = NULL; - int iscomp; - - /* Performs very basic validation on an incoming SQL string. - Returns a new reference to a str instance on success; NULL on failure, - after having set an exception. */ - - if (!sql || !PyObject_IsTrue(sql)) { - psyco_set_error(ProgrammingError, self, - "can't execute an empty query"); - goto exit; - } - - if (Bytes_Check(sql)) { - /* Necessary for ref-count symmetry with the unicode case: */ - Py_INCREF(sql); - rv = sql; - } - else if (PyUnicode_Check(sql)) { - if (!(rv = conn_encode(self->conn, sql))) { goto exit; } - } - else if (0 != (iscomp = _curs_is_composible(sql))) { - if (iscomp < 0) { goto exit; } - if (!(comp = PyObject_CallMethod(sql, "as_string", "O", self->conn))) { - goto exit; - } - - if (Bytes_Check(comp)) { - rv = comp; - comp = NULL; - } - else if (PyUnicode_Check(comp)) { - if (!(rv = conn_encode(self->conn, comp))) { goto exit; } - } - else { - PyErr_Format(PyExc_TypeError, - "as_string() should return a string: got %s instead", - Py_TYPE(comp)->tp_name); - goto exit; - } - } - else { - /* the is not unicode or string, raise an error */ - PyErr_Format(PyExc_TypeError, - "argument 1 must be a string or unicode object: got %s instead", - Py_TYPE(sql)->tp_name); - goto exit; - } - -exit: - Py_XDECREF(comp); - return rv; -} /* Merge together a query string and its arguments. * @@ -425,7 +346,7 @@ _psyco_curs_execute(cursorObject *self, PyObject *fquery, *cvt = NULL; const char *scroll; - operation = _psyco_curs_validate_sql_basic(self, operation); + operation = psyco_curs_validate_sql_basic(self, operation); /* Any failure from here forward should 'goto fail' rather than 'return 0' directly. */ @@ -622,7 +543,7 @@ _psyco_curs_mogrify(cursorObject *self, { PyObject *fquery = NULL, *cvt = NULL; - operation = _psyco_curs_validate_sql_basic(self, operation); + operation = psyco_curs_validate_sql_basic(self, operation); if (operation == NULL) { goto cleanup; } Dprintf("psyco_curs_mogrify: starting mogrify"); @@ -1665,7 +1586,7 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_GREEN(copy_expert); EXC_IF_TPC_PREPARED(self->conn, copy_expert); - sql = _psyco_curs_validate_sql_basic(self, sql); + sql = psyco_curs_validate_sql_basic(self, sql); /* Any failure from here forward should 'goto exit' rather than 'return NULL' directly. */ diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index d00b31ce..0c0578bf 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -48,11 +48,11 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self, cursorObject *curs = &self->cur; connectionObject *conn = self->cur.conn; PyObject *res = NULL; - char *command; + PyObject *command = NULL; long int decode = 0; static char *kwlist[] = {"command", "decode", NULL}; - if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|l", kwlist, &command, &decode)) { + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|l", kwlist, &command, &decode)) { return NULL; } @@ -60,9 +60,16 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self, EXC_IF_GREEN(start_replication_expert); EXC_IF_TPC_PREPARED(conn, start_replication_expert); - Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %ld", command, decode); + if (!(command = psyco_curs_validate_sql_basic( + (cursorObject *)self, command))) { + goto exit; + } - if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) { + Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %ld", + Bytes_AS_STRING(command), decode); + + if (pq_execute(curs, Bytes_AS_STRING(command), conn->async, + 1 /* no_result */, 1 /* no_begin */) >= 0) { res = Py_None; Py_INCREF(res); @@ -70,6 +77,8 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self, gettimeofday(&self->last_io, NULL); } +exit: + Py_XDECREF(command); return res; } diff --git a/tests/test_replication.py b/tests/test_replication.py index 182bff21..d03c6566 100755 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -137,6 +137,20 @@ class ReplicationTest(ReplicationTestCase): self.create_replication_slot(cur) cur.start_replication(self.slot) + @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green + def test_start_replication_expert_sql(self): + from psycopg2 import sql + conn = self.repl_connect(connection_factory=LogicalReplicationConnection) + if conn is None: + return + cur = conn.cursor() + + self.create_replication_slot(cur, output_plugin='test_decoding') + cur.start_replication_expert( + sql.SQL("START_REPLICATION SLOT {slot} LOGICAL 0/00000000").format( + slot=sql.Identifier(self.slot))) + @skip_before_postgres(9, 4) # slots require 9.4 @skip_repl_if_green def test_start_and_recover_from_error(self):