Use python-defined make_dsn() for ReplicationConnection class

This commit is contained in:
Oleksandr Shulgin 2016-03-08 15:44:29 +01:00
parent cb7032554e
commit da6e061ee8
8 changed files with 73 additions and 277 deletions

View File

@ -56,7 +56,7 @@ from psycopg2._psycopg import Error, Warning, DataError, DatabaseError, Programm
from psycopg2._psycopg import IntegrityError, InterfaceError, InternalError from psycopg2._psycopg import IntegrityError, InterfaceError, InternalError
from psycopg2._psycopg import NotSupportedError, OperationalError from psycopg2._psycopg import NotSupportedError, OperationalError
from psycopg2._psycopg import _connect, parse_args, apilevel, threadsafety, paramstyle from psycopg2._psycopg import _connect, apilevel, threadsafety, paramstyle
from psycopg2._psycopg import __version__, __libpq_version__ from psycopg2._psycopg import __version__, __libpq_version__
from psycopg2 import tz from psycopg2 import tz

View File

@ -441,25 +441,14 @@ class MinTimeLoggingCursor(LoggingCursor):
return LoggingCursor.callproc(self, procname, vars) return LoggingCursor.callproc(self, procname, vars)
class ReplicationConnectionBase(_replicationConnection): class LogicalReplicationConnection(_replicationConnection):
"""
Base class for Logical and Physical replication connection
classes. Uses `ReplicationCursor` automatically.
"""
def __init__(self, *args, **kwargs):
super(ReplicationConnectionBase, self).__init__(*args, **kwargs)
self.cursor_factory = ReplicationCursor
class LogicalReplicationConnection(ReplicationConnectionBase):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
kwargs['replication_type'] = REPLICATION_LOGICAL kwargs['replication_type'] = REPLICATION_LOGICAL
super(LogicalReplicationConnection, self).__init__(*args, **kwargs) super(LogicalReplicationConnection, self).__init__(*args, **kwargs)
class PhysicalReplicationConnection(ReplicationConnectionBase): class PhysicalReplicationConnection(_replicationConnection):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
kwargs['replication_type'] = REPLICATION_PHYSICAL kwargs['replication_type'] = REPLICATION_PHYSICAL

View File

@ -72,8 +72,6 @@ struct cursorObject {
#define DEFAULT_COPYSIZE 16384 #define DEFAULT_COPYSIZE 16384
#define DEFAULT_COPYBUFF 8192 #define DEFAULT_COPYBUFF 8192
/* replication cursor attrs */
PyObject *tuple_factory; /* factory for result tuples */ PyObject *tuple_factory; /* factory for result tuples */
PyObject *tzinfo_factory; /* factory for tzinfo objects */ PyObject *tzinfo_factory; /* factory for tzinfo objects */

View File

@ -120,11 +120,6 @@ typedef struct connectionObject connectionObject;
typedef struct replicationMessageObject replicationMessageObject; typedef struct replicationMessageObject replicationMessageObject;
/* some utility functions */ /* some utility functions */
HIDDEN PyObject *parse_arg(int pos, char *name, PyObject *defval, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_parse_args(PyObject *self, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_parse_dsn(PyObject *self, PyObject *args, PyObject *kwargs);
HIDDEN PyObject *psyco_make_dsn(PyObject *self, PyObject *args, PyObject *kwargs);
RAISES HIDDEN PyObject *psyco_set_error(PyObject *exc, cursorObject *curs, const char *msg); RAISES HIDDEN PyObject *psyco_set_error(PyObject *exc, cursorObject *curs, const char *msg);
HIDDEN char *psycopg_escape_string(connectionObject *conn, HIDDEN char *psycopg_escape_string(connectionObject *conn,

View File

@ -74,103 +74,23 @@ HIDDEN PyObject *psyco_null = NULL;
HIDDEN PyObject *psyco_DescriptionType = NULL; HIDDEN PyObject *psyco_DescriptionType = NULL;
/* finds a keyword or positional arg (pops it from kwargs if found there) */
PyObject *
parse_arg(int pos, char *name, PyObject *defval, PyObject *args, PyObject *kwargs)
{
Py_ssize_t nargs = PyTuple_GET_SIZE(args);
PyObject *val = NULL;
if (kwargs && PyMapping_HasKeyString(kwargs, name)) {
val = PyMapping_GetItemString(kwargs, name);
Py_XINCREF(val);
PyMapping_DelItemString(kwargs, name); /* pop from the kwargs dict! */
}
if (nargs > pos) {
if (!val) {
val = PyTuple_GET_ITEM(args, pos);
Py_XINCREF(val);
} else {
PyErr_Format(PyExc_TypeError,
"parse_args() got multiple values for keyword argument '%s'", name);
return NULL;
}
}
if (!val) {
val = defval;
Py_XINCREF(val);
}
return val;
}
#define psyco_parse_args_doc \
"parse_args(...) -- parse connection parameters.\n\n" \
"Return a tuple of (dsn, connection_factory, async)"
PyObject *
psyco_parse_args(PyObject *self, PyObject *args, PyObject *kwargs)
{
Py_ssize_t nargs = PyTuple_GET_SIZE(args);
PyObject *dsn = NULL;
PyObject *factory = NULL;
PyObject *async = NULL;
PyObject *res = NULL;
if (nargs > 3) {
PyErr_Format(PyExc_TypeError,
"parse_args() takes at most 3 arguments (%d given)", (int)nargs);
goto exit;
}
/* parse and remove all keywords we know, so they are not interpreted as part of DSN */
if (!(dsn = parse_arg(0, "dsn", Py_None, args, kwargs))) { goto exit; }
if (!(factory = parse_arg(1, "connection_factory", Py_None,
args, kwargs))) { goto exit; }
if (!(async = parse_arg(2, "async", Py_False, args, kwargs))) { goto exit; }
if (kwargs && PyMapping_Size(kwargs) > 0) {
if (dsn == Py_None) {
Py_DECREF(dsn);
if (!(dsn = psyco_make_dsn(NULL, NULL, kwargs))) { goto exit; }
} else {
PyErr_SetString(PyExc_TypeError, "both dsn and parameters given");
goto exit;
}
} else {
if (dsn == Py_None) {
PyErr_SetString(PyExc_TypeError, "missing dsn and no parameters");
goto exit;
}
}
res = PyTuple_Pack(3, dsn, factory, async);
exit:
Py_XDECREF(dsn);
Py_XDECREF(factory);
Py_XDECREF(async);
return res;
}
/** connect module-level function **/ /** connect module-level function **/
#define psyco_connect_doc \ #define psyco_connect_doc \
"_connect(dsn, [connection_factory], [async], **kwargs) -- New database connection.\n\n" "_connect(dsn, [connection_factory], [async]) -- New database connection.\n\n"
static PyObject * static PyObject *
psyco_connect(PyObject *self, PyObject *args, PyObject *keywds) psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
{ {
PyObject *conn = NULL; PyObject *conn = NULL;
PyObject *tuple = NULL;
PyObject *factory = NULL; PyObject *factory = NULL;
const char *dsn = NULL; const char *dsn = NULL;
int async = 0; int async = 0;
if (!(tuple = psyco_parse_args(self, args, keywds))) { goto exit; } static char *kwlist[] = {"dsn", "connection_factory", "async", NULL};
if (!PyArg_ParseTupleAndKeywords(args, keywds, "s|Oi", kwlist,
if (!PyArg_ParseTuple(tuple, "s|Oi", &dsn, &factory, &async)) { goto exit; } &dsn, &factory, &async)) {
return NULL;
}
Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async); Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async);
@ -192,9 +112,6 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
conn = PyObject_CallFunction(factory, "si", dsn, async); conn = PyObject_CallFunction(factory, "si", dsn, async);
} }
exit:
Py_XDECREF(tuple);
return conn; return conn;
} }
@ -202,7 +119,7 @@ exit:
#define psyco_parse_dsn_doc \ #define psyco_parse_dsn_doc \
"parse_dsn(dsn) -> dict -- parse a connection string into parameters" "parse_dsn(dsn) -> dict -- parse a connection string into parameters"
PyObject * static PyObject *
psyco_parse_dsn(PyObject *self, PyObject *args, PyObject *kwargs) psyco_parse_dsn(PyObject *self, PyObject *args, PyObject *kwargs)
{ {
char *err = NULL; char *err = NULL;
@ -254,114 +171,6 @@ exit:
} }
#define psyco_make_dsn_doc "make_dsn(**kwargs) -> str"
PyObject *
psyco_make_dsn(PyObject *self, PyObject *args, PyObject *kwargs)
{
Py_ssize_t len, pos;
PyObject *res = NULL;
PyObject *key = NULL, *value = NULL;
PyObject *newkey, *newval;
PyObject *dict = NULL;
char *str = NULL, *p, *q;
if (args && (len = PyTuple_Size(args)) > 0) {
PyErr_Format(PyExc_TypeError, "make_dsn() takes no arguments (%d given)", (int)len);
goto exit;
}
if (kwargs == NULL) {
return Text_FromUTF8("");
}
/* iterate through kwargs, calculating the total resulting string
length and saving prepared key/values to a temp. dict */
if (!(dict = PyDict_New())) { goto exit; }
len = 0;
pos = 0;
while (PyDict_Next(kwargs, &pos, &key, &value)) {
if (value == NULL || value == Py_None) { continue; }
Py_INCREF(key); /* for ensure_bytes */
if (!(newkey = psycopg_ensure_bytes(key))) { goto exit; }
/* special handling of 'database' keyword */
if (strcmp(Bytes_AsString(newkey), "database") == 0) {
key = Bytes_FromString("dbname");
Py_DECREF(newkey);
} else {
key = newkey;
}
/* now transform the value */
if (Bytes_CheckExact(value)) {
Py_INCREF(value);
} else if (PyUnicode_CheckExact(value)) {
if (!(value = PyUnicode_AsUTF8String(value))) { goto exit; }
} else {
/* this could be port=5432, so we need to get the text representation */
if (!(value = PyObject_Str(value))) { goto exit; }
/* and still ensure it's bytes() (but no need to incref here) */
if (!(value = psycopg_ensure_bytes(value))) { goto exit; }
}
/* passing NULL for plen checks for NIL bytes in content and errors out */
if (Bytes_AsStringAndSize(value, &str, NULL) < 0) { goto exit; }
/* escape any special chars */
if (!(str = psycopg_escape_conninfo(str, 0))) { goto exit; }
if (!(newval = Bytes_FromString(str))) {
goto exit;
}
PyMem_Free(str);
str = NULL;
Py_DECREF(value);
value = newval;
/* finally put into the temp. dict */
if (PyDict_SetItem(dict, key, value) < 0) { goto exit; }
len += Bytes_GET_SIZE(key) + Bytes_GET_SIZE(value) + 2; /* =, space or NIL */
Py_DECREF(key);
Py_DECREF(value);
}
key = NULL;
value = NULL;
if (!(str = PyMem_Malloc(len))) {
PyErr_NoMemory();
goto exit;
}
p = str;
pos = 0;
while (PyDict_Next(dict, &pos, &newkey, &newval)) {
if (p != str) {
*(p++) = ' ';
}
if (Bytes_AsStringAndSize(newkey, &q, &len) < 0) { goto exit; }
strncpy(p, q, len);
p += len;
*(p++) = '=';
if (Bytes_AsStringAndSize(newval, &q, &len) < 0) { goto exit; }
strncpy(p, q, len);
p += len;
}
*p = '\0';
res = Text_FromUTF8AndSize(str, p - str);
exit:
PyMem_Free(str);
Py_XDECREF(key);
Py_XDECREF(value);
Py_XDECREF(dict);
return res;
}
#define psyco_quote_ident_doc \ #define psyco_quote_ident_doc \
"quote_ident(str, conn_or_curs) -> str -- wrapper around PQescapeIdentifier\n\n" \ "quote_ident(str, conn_or_curs) -> str -- wrapper around PQescapeIdentifier\n\n" \
":Parameters:\n" \ ":Parameters:\n" \
@ -1016,12 +825,8 @@ error:
static PyMethodDef psycopgMethods[] = { static PyMethodDef psycopgMethods[] = {
{"_connect", (PyCFunction)psyco_connect, {"_connect", (PyCFunction)psyco_connect,
METH_VARARGS|METH_KEYWORDS, psyco_connect_doc}, METH_VARARGS|METH_KEYWORDS, psyco_connect_doc},
{"parse_args", (PyCFunction)psyco_parse_args,
METH_VARARGS|METH_KEYWORDS, psyco_parse_args_doc},
{"parse_dsn", (PyCFunction)psyco_parse_dsn, {"parse_dsn", (PyCFunction)psyco_parse_dsn,
METH_VARARGS|METH_KEYWORDS, psyco_parse_dsn_doc}, METH_VARARGS|METH_KEYWORDS, psyco_parse_dsn_doc},
{"make_dsn", (PyCFunction)psyco_make_dsn,
METH_VARARGS|METH_KEYWORDS, psyco_make_dsn_doc},
{"quote_ident", (PyCFunction)psyco_quote_ident, {"quote_ident", (PyCFunction)psyco_quote_ident,
METH_VARARGS|METH_KEYWORDS, psyco_quote_ident_doc}, METH_VARARGS|METH_KEYWORDS, psyco_quote_ident_doc},
{"adapt", (PyCFunction)psyco_microprotocols_adapt, {"adapt", (PyCFunction)psyco_microprotocols_adapt,

View File

@ -40,8 +40,10 @@ typedef struct replicationConnectionObject {
long int type; long int type;
} replicationConnectionObject; } replicationConnectionObject;
#define REPLICATION_PHYSICAL 1 /* The funny constant values should help to avoid mixups with some
#define REPLICATION_LOGICAL 2 commonly used numbers like 1 and 2. */
#define REPLICATION_PHYSICAL 12345678
#define REPLICATION_LOGICAL 87654321
extern HIDDEN PyObject *replicationPhysicalConst; extern HIDDEN PyObject *replicationPhysicalConst;
extern HIDDEN PyObject *replicationLogicalConst; extern HIDDEN PyObject *replicationLogicalConst;

View File

@ -58,81 +58,88 @@ psyco_repl_conn_get_type(replicationConnectionObject *self)
return res; return res;
} }
static int static int
replicationConnection_init(PyObject *obj, PyObject *args, PyObject *kwargs) replicationConnection_init(PyObject *obj, PyObject *args, PyObject *kwargs)
{ {
replicationConnectionObject *self = (replicationConnectionObject *)obj; replicationConnectionObject *self = (replicationConnectionObject *)obj;
PyObject *dsn = NULL; PyObject *dsn = NULL, *replication_type = NULL,
PyObject *async = NULL; *item = NULL, *ext = NULL, *make_dsn = NULL,
PyObject *tmp = NULL; *extras = NULL, *cursor = NULL;
const char *repl = NULL; int async = 0;
int ret = -1; int ret = -1;
Py_XINCREF(args); /* 'replication_type' is not actually optional, but there's no
Py_XINCREF(kwargs); good way to put it before 'async' in the list */
static char *kwlist[] = {"dsn", "async", "replication_type", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O|iO", kwlist,
&dsn, &async, &replication_type)) { return ret; }
/* dsn, async, replication_type */ /*
if (!(dsn = parse_arg(0, "dsn", Py_None, args, kwargs))) { goto exit; } We have to call make_dsn() to add replication-specific
if (!(async = parse_arg(1, "async", Py_False, args, kwargs))) { goto exit; } connection parameters, because the DSN might be an URI (if there
if (!(tmp = parse_arg(2, "replication_type", Py_None, args, kwargs))) { goto exit; } were no keyword arguments to connect() it is passed unchanged).
*/
/* we reuse args and kwargs to call make_dsn() and parent type's tp_init() */
if (!(kwargs = PyDict_New())) { return ret; }
Py_INCREF(args);
if (tmp == replicationPhysicalConst) { /* we also reuse the dsn to hold the result of the make_dsn() call */
Py_INCREF(dsn);
if (!(ext = PyImport_ImportModule("psycopg2.extensions"))) { goto exit; }
if (!(make_dsn = PyObject_GetAttrString(ext, "make_dsn"))) { goto exit; }
/* all the nice stuff is located in python-level ReplicationCursor class */
if (!(extras = PyImport_ImportModule("psycopg2.extras"))) { goto exit; }
if (!(cursor = PyObject_GetAttrString(extras, "ReplicationCursor"))) { goto exit; }
/* checking the object reference helps to avoid recognizing
unrelated integer constants as valid input values */
if (replication_type == replicationPhysicalConst) {
self->type = REPLICATION_PHYSICAL; self->type = REPLICATION_PHYSICAL;
repl = "true";
} else if (tmp == replicationLogicalConst) { #define SET_ITEM(k, v) \
if (!(item = Text_FromUTF8(#v))) { goto exit; } \
if (PyDict_SetItemString(kwargs, #k, item) != 0) { goto exit; } \
Py_DECREF(item); \
item = NULL;
SET_ITEM(replication, true);
SET_ITEM(dbname, replication); /* required for .pgpass lookup */
} else if (replication_type == replicationLogicalConst) {
self->type = REPLICATION_LOGICAL; self->type = REPLICATION_LOGICAL;
repl = "database";
SET_ITEM(replication, database);
#undef SET_ITEM
} else { } else {
PyErr_SetString(PyExc_TypeError, PyErr_SetString(PyExc_TypeError,
"replication_type must be either REPLICATION_PHYSICAL or REPLICATION_LOGICAL"); "replication_type must be either REPLICATION_PHYSICAL or REPLICATION_LOGICAL");
goto exit; goto exit;
} }
Py_DECREF(tmp);
tmp = NULL;
if (dsn != Py_None) {
if (kwargs && PyMapping_Size(kwargs) > 0) {
PyErr_SetString(PyExc_TypeError, "both dsn and parameters given");
goto exit;
} else {
if (!(tmp = PyTuple_Pack(1, dsn))) { goto exit; }
Py_XDECREF(kwargs);
if (!(kwargs = psyco_parse_dsn(NULL, tmp, NULL))) { goto exit; }
}
} else {
if (!(kwargs && PyMapping_Size(kwargs) > 0)) {
PyErr_SetString(PyExc_TypeError, "missing dsn and no parameters");
goto exit;
}
}
if (!PyMapping_HasKeyString(kwargs, "replication")) {
PyMapping_SetItemString(kwargs, "replication", Text_FromUTF8(repl));
}
/* with physical specify dbname=replication for .pgpass lookup */
if (self->type == REPLICATION_PHYSICAL) {
PyMapping_SetItemString(kwargs, "dbname", Text_FromUTF8("replication"));
}
Py_DECREF(dsn);
if (!(dsn = psyco_make_dsn(NULL, NULL, kwargs))) { goto exit; }
Py_DECREF(args); Py_DECREF(args);
Py_DECREF(kwargs); if (!(args = PyTuple_Pack(1, dsn))) { goto exit; }
kwargs = NULL;
if (!(args = PyTuple_Pack(2, dsn, async))) { goto exit; }
Py_DECREF(dsn);
if (!(dsn = PyObject_Call(make_dsn, args, kwargs))) { goto exit; }
Py_DECREF(args);
if (!(args = Py_BuildValue("(Oi)", dsn, async))) { goto exit; }
/* only attempt the connection once we've handled all possible errors */
if ((ret = connectionType.tp_init(obj, args, NULL)) < 0) { goto exit; } if ((ret = connectionType.tp_init(obj, args, NULL)) < 0) { goto exit; }
self->conn.autocommit = 1; self->conn.autocommit = 1;
self->conn.cursor_factory = (PyObject *)&replicationCursorType; Py_INCREF(self->conn.cursor_factory = cursor);
Py_INCREF(self->conn.cursor_factory);
exit: exit:
Py_XDECREF(tmp); Py_XDECREF(item);
Py_XDECREF(ext);
Py_XDECREF(make_dsn);
Py_XDECREF(extras);
Py_XDECREF(cursor);
Py_XDECREF(dsn); Py_XDECREF(dsn);
Py_XDECREF(async);
Py_XDECREF(args); Py_XDECREF(args);
Py_XDECREF(kwargs); Py_XDECREF(kwargs);

View File

@ -40,7 +40,7 @@ class ConnectTestCase(unittest.TestCase):
self.args = (dsn, connection_factory, async) self.args = (dsn, connection_factory, async)
self._connect_orig = psycopg2._connect self._connect_orig = psycopg2._connect
psycopg2._connect = connect_stub psycopg2._connect = conect_stub
def tearDown(self): def tearDown(self):
psycopg2._connect = self._connect_orig psycopg2._connect = self._connect_orig