mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-22 17:06:33 +03:00
Merge branch 'fix-554'
This commit is contained in:
commit
d2e86db8fb
3
NEWS
3
NEWS
|
@ -23,6 +23,9 @@ What's new in psycopg 2.7.2
|
|||
- Maybe fixed random import error on Python 3.6 in multiprocess
|
||||
environment (:ticket:`#550`).
|
||||
- Fixed random `!SystemError` upon receiving abort signal (:ticket:`#551`).
|
||||
- Accept `~psycopg2.sql.Composable` objects in
|
||||
`~psycopg2.extras.ReplicationCursor.start_replication_expert()`
|
||||
(:ticket:`554`).
|
||||
- Parse intervals returned as microseconds from Redshift (:ticket:`#558`).
|
||||
- Added `~psycopg2.extras.Json` `!prepare()` method to consider connection
|
||||
params when adapting (:ticket:`#562`).
|
||||
|
|
|
@ -356,8 +356,13 @@ The individual messages in the replication stream are represented by
|
|||
.. method:: start_replication_expert(command, decode=False)
|
||||
|
||||
Start replication on the connection using provided
|
||||
|START_REPLICATION|_ command. See `start_replication()` for
|
||||
description of *decode* parameter.
|
||||
|START_REPLICATION|_ command.
|
||||
|
||||
:param command: The full replication command. It can be a string or a
|
||||
`~psycopg2.sql.Composable` instance for dynamic generation.
|
||||
:param decode: a flag indicating that unicode conversion should be
|
||||
performed on messages received from the server.
|
||||
|
||||
|
||||
.. method:: consume_stream(consume, keepalive_interval=10)
|
||||
|
||||
|
|
|
@ -95,6 +95,7 @@ BORROWED HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid);
|
|||
HIDDEN void curs_reset(cursorObject *self);
|
||||
HIDDEN int psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue);
|
||||
HIDDEN int psyco_curs_scrollable_set(cursorObject *self, PyObject *pyvalue);
|
||||
HIDDEN PyObject *psyco_curs_validate_sql_basic(cursorObject *self, PyObject *sql);
|
||||
|
||||
/* exception-raising macros */
|
||||
#define EXC_IF_CURS_CLOSED(self) \
|
||||
|
|
|
@ -80,3 +80,83 @@ curs_reset(cursorObject *self)
|
|||
Py_CLEAR(self->description);
|
||||
Py_CLEAR(self->casts);
|
||||
}
|
||||
|
||||
|
||||
/* Return 1 if `obj` is a `psycopg2.sql.Composable` instance, else 0
|
||||
* Set an exception and return -1 in case of error.
|
||||
*/
|
||||
RAISES_NEG static int
|
||||
_curs_is_composible(PyObject *obj)
|
||||
{
|
||||
int rv = -1;
|
||||
PyObject *m = NULL;
|
||||
PyObject *comp = NULL;
|
||||
|
||||
if (!(m = PyImport_ImportModule("psycopg2.sql"))) { goto exit; }
|
||||
if (!(comp = PyObject_GetAttrString(m, "Composable"))) { goto exit; }
|
||||
rv = PyObject_IsInstance(obj, comp);
|
||||
|
||||
exit:
|
||||
Py_XDECREF(comp);
|
||||
Py_XDECREF(m);
|
||||
return rv;
|
||||
|
||||
}
|
||||
|
||||
/* Performs very basic validation on an incoming SQL string.
|
||||
* Returns a new reference to a str instance on success; NULL on failure,
|
||||
* after having set an exception.
|
||||
*/
|
||||
PyObject *
|
||||
psyco_curs_validate_sql_basic(cursorObject *self, PyObject *sql)
|
||||
{
|
||||
PyObject *rv = NULL;
|
||||
PyObject *comp = NULL;
|
||||
int iscomp;
|
||||
|
||||
if (!sql || !PyObject_IsTrue(sql)) {
|
||||
psyco_set_error(ProgrammingError, self,
|
||||
"can't execute an empty query");
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (Bytes_Check(sql)) {
|
||||
/* Necessary for ref-count symmetry with the unicode case: */
|
||||
Py_INCREF(sql);
|
||||
rv = sql;
|
||||
}
|
||||
else if (PyUnicode_Check(sql)) {
|
||||
if (!(rv = conn_encode(self->conn, sql))) { goto exit; }
|
||||
}
|
||||
else if (0 != (iscomp = _curs_is_composible(sql))) {
|
||||
if (iscomp < 0) { goto exit; }
|
||||
if (!(comp = PyObject_CallMethod(sql, "as_string", "O", self->conn))) {
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (Bytes_Check(comp)) {
|
||||
rv = comp;
|
||||
comp = NULL;
|
||||
}
|
||||
else if (PyUnicode_Check(comp)) {
|
||||
if (!(rv = conn_encode(self->conn, comp))) { goto exit; }
|
||||
}
|
||||
else {
|
||||
PyErr_Format(PyExc_TypeError,
|
||||
"as_string() should return a string: got %s instead",
|
||||
Py_TYPE(comp)->tp_name);
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* the is not unicode or string, raise an error */
|
||||
PyErr_Format(PyExc_TypeError,
|
||||
"argument 1 must be a string or unicode object: got %s instead",
|
||||
Py_TYPE(sql)->tp_name);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
exit:
|
||||
Py_XDECREF(comp);
|
||||
return rv;
|
||||
}
|
||||
|
|
|
@ -267,85 +267,6 @@ _mogrify(PyObject *var, PyObject *fmt, cursorObject *curs, PyObject **new)
|
|||
return 0;
|
||||
}
|
||||
|
||||
/* Return 1 if `obj` is a `psycopg2.sql.Composable` instance, else 0
|
||||
* Set an exception and return -1 in case of error.
|
||||
*/
|
||||
RAISES_NEG static int
|
||||
_curs_is_composible(PyObject *obj)
|
||||
{
|
||||
int rv = -1;
|
||||
PyObject *m = NULL;
|
||||
PyObject *comp = NULL;
|
||||
|
||||
if (!(m = PyImport_ImportModule("psycopg2.sql"))) { goto exit; }
|
||||
if (!(comp = PyObject_GetAttrString(m, "Composable"))) { goto exit; }
|
||||
rv = PyObject_IsInstance(obj, comp);
|
||||
|
||||
exit:
|
||||
Py_XDECREF(comp);
|
||||
Py_XDECREF(m);
|
||||
return rv;
|
||||
|
||||
}
|
||||
|
||||
static PyObject *_psyco_curs_validate_sql_basic(
|
||||
cursorObject *self, PyObject *sql
|
||||
)
|
||||
{
|
||||
PyObject *rv = NULL;
|
||||
PyObject *comp = NULL;
|
||||
int iscomp;
|
||||
|
||||
/* Performs very basic validation on an incoming SQL string.
|
||||
Returns a new reference to a str instance on success; NULL on failure,
|
||||
after having set an exception. */
|
||||
|
||||
if (!sql || !PyObject_IsTrue(sql)) {
|
||||
psyco_set_error(ProgrammingError, self,
|
||||
"can't execute an empty query");
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (Bytes_Check(sql)) {
|
||||
/* Necessary for ref-count symmetry with the unicode case: */
|
||||
Py_INCREF(sql);
|
||||
rv = sql;
|
||||
}
|
||||
else if (PyUnicode_Check(sql)) {
|
||||
if (!(rv = conn_encode(self->conn, sql))) { goto exit; }
|
||||
}
|
||||
else if (0 != (iscomp = _curs_is_composible(sql))) {
|
||||
if (iscomp < 0) { goto exit; }
|
||||
if (!(comp = PyObject_CallMethod(sql, "as_string", "O", self->conn))) {
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (Bytes_Check(comp)) {
|
||||
rv = comp;
|
||||
comp = NULL;
|
||||
}
|
||||
else if (PyUnicode_Check(comp)) {
|
||||
if (!(rv = conn_encode(self->conn, comp))) { goto exit; }
|
||||
}
|
||||
else {
|
||||
PyErr_Format(PyExc_TypeError,
|
||||
"as_string() should return a string: got %s instead",
|
||||
Py_TYPE(comp)->tp_name);
|
||||
goto exit;
|
||||
}
|
||||
}
|
||||
else {
|
||||
/* the is not unicode or string, raise an error */
|
||||
PyErr_Format(PyExc_TypeError,
|
||||
"argument 1 must be a string or unicode object: got %s instead",
|
||||
Py_TYPE(sql)->tp_name);
|
||||
goto exit;
|
||||
}
|
||||
|
||||
exit:
|
||||
Py_XDECREF(comp);
|
||||
return rv;
|
||||
}
|
||||
|
||||
/* Merge together a query string and its arguments.
|
||||
*
|
||||
|
@ -425,7 +346,7 @@ _psyco_curs_execute(cursorObject *self,
|
|||
PyObject *fquery, *cvt = NULL;
|
||||
const char *scroll;
|
||||
|
||||
operation = _psyco_curs_validate_sql_basic(self, operation);
|
||||
operation = psyco_curs_validate_sql_basic(self, operation);
|
||||
|
||||
/* Any failure from here forward should 'goto fail' rather than 'return 0'
|
||||
directly. */
|
||||
|
@ -622,7 +543,7 @@ _psyco_curs_mogrify(cursorObject *self,
|
|||
{
|
||||
PyObject *fquery = NULL, *cvt = NULL;
|
||||
|
||||
operation = _psyco_curs_validate_sql_basic(self, operation);
|
||||
operation = psyco_curs_validate_sql_basic(self, operation);
|
||||
if (operation == NULL) { goto cleanup; }
|
||||
|
||||
Dprintf("psyco_curs_mogrify: starting mogrify");
|
||||
|
@ -1665,7 +1586,7 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
|
|||
EXC_IF_GREEN(copy_expert);
|
||||
EXC_IF_TPC_PREPARED(self->conn, copy_expert);
|
||||
|
||||
sql = _psyco_curs_validate_sql_basic(self, sql);
|
||||
sql = psyco_curs_validate_sql_basic(self, sql);
|
||||
|
||||
/* Any failure from here forward should 'goto exit' rather than
|
||||
'return NULL' directly. */
|
||||
|
|
|
@ -48,11 +48,11 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
|
|||
cursorObject *curs = &self->cur;
|
||||
connectionObject *conn = self->cur.conn;
|
||||
PyObject *res = NULL;
|
||||
char *command;
|
||||
PyObject *command = NULL;
|
||||
long int decode = 0;
|
||||
static char *kwlist[] = {"command", "decode", NULL};
|
||||
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s|l", kwlist, &command, &decode)) {
|
||||
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|l", kwlist, &command, &decode)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -60,9 +60,16 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
|
|||
EXC_IF_GREEN(start_replication_expert);
|
||||
EXC_IF_TPC_PREPARED(conn, start_replication_expert);
|
||||
|
||||
Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %ld", command, decode);
|
||||
if (!(command = psyco_curs_validate_sql_basic(
|
||||
(cursorObject *)self, command))) {
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (pq_execute(curs, command, conn->async, 1 /* no_result */, 1 /* no_begin */) >= 0) {
|
||||
Dprintf("psyco_repl_curs_start_replication_expert: '%s'; decode: %ld",
|
||||
Bytes_AS_STRING(command), decode);
|
||||
|
||||
if (pq_execute(curs, Bytes_AS_STRING(command), conn->async,
|
||||
1 /* no_result */, 1 /* no_begin */) >= 0) {
|
||||
res = Py_None;
|
||||
Py_INCREF(res);
|
||||
|
||||
|
@ -70,6 +77,8 @@ psyco_repl_curs_start_replication_expert(replicationCursorObject *self,
|
|||
gettimeofday(&self->last_io, NULL);
|
||||
}
|
||||
|
||||
exit:
|
||||
Py_XDECREF(command);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -137,6 +137,20 @@ class ReplicationTest(ReplicationTestCase):
|
|||
self.create_replication_slot(cur)
|
||||
cur.start_replication(self.slot)
|
||||
|
||||
@skip_before_postgres(9, 4) # slots require 9.4
|
||||
@skip_repl_if_green
|
||||
def test_start_replication_expert_sql(self):
|
||||
from psycopg2 import sql
|
||||
conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
|
||||
if conn is None:
|
||||
return
|
||||
cur = conn.cursor()
|
||||
|
||||
self.create_replication_slot(cur, output_plugin='test_decoding')
|
||||
cur.start_replication_expert(
|
||||
sql.SQL("START_REPLICATION SLOT {slot} LOGICAL 0/00000000").format(
|
||||
slot=sql.Identifier(self.slot)))
|
||||
|
||||
@skip_before_postgres(9, 4) # slots require 9.4
|
||||
@skip_repl_if_green
|
||||
def test_start_and_recover_from_error(self):
|
||||
|
|
Loading…
Reference in New Issue
Block a user