Added ConnectionInfo.ssl_attribute()

This commit is contained in:
Daniele Varrazzo 2018-10-13 02:21:38 +01:00
parent cb3d5f9d92
commit 9f6a3a5e96
3 changed files with 96 additions and 1 deletions

View File

@ -188,6 +188,8 @@ introspection etc.
.. autoattribute:: needs_password .. autoattribute:: needs_password
.. autoattribute:: used_password .. autoattribute:: used_password
.. autoattribute:: ssl_in_use .. autoattribute:: ssl_in_use
.. automethod:: ssl_attribute(name)
.. autoattribute:: ssl_attribute_names
.. class:: Column(\*args, \*\*kwargs) .. class:: Column(\*args, \*\*kwargs)

View File

@ -366,6 +366,76 @@ ssl_in_use_get(connInfoObject *self)
} }
static const char ssl_attribute_doc[] =
"Returns SSL-related information about the connection.\n"
"\n"
":param name: The name of the attribute to return.\n"
":type name: `!str`\n"
":return: The attribute value, `!None` if unknown.\n"
":rtype: `!str`\n"
"\n"
"Valid names are available in `ssl_attribute_names`.\n"
"\n"
".. seealso:: libpq docs for `PQsslAttribute()`__ for details.\n"
".. __: https://www.postgresql.org/docs/current/static/libpq-status.html"
"#LIBPQ-PQSSLATTRIBUTE";
static PyObject *
ssl_attribute(connInfoObject *self, PyObject *args, PyObject *kwargs)
{
static char *kwlist[] = {"name", NULL};
const char *name;
const char *val;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "s", kwlist, &name)) {
return NULL;
}
val = PQsslAttribute(self->conn->pgconn, name);
if (!val) {
Py_RETURN_NONE;
}
else {
return conn_text_from_chars(self->conn, val);
}
}
static const char ssl_attribute_names_doc[] =
"The list of the SSL attribute names available.\n"
"\n"
":type: `!list` of `!str`\n"
"\n"
".. seealso:: libpq docs for `PQsslAttributeNames()`__ for details.\n"
".. __: https://www.postgresql.org/docs/current/static/libpq-status.html"
"#LIBPQ-PQSSLATTRIBUTENAMES";
static PyObject *
ssl_attribute_names_get(connInfoObject *self)
{
const char* const* names;
int i;
PyObject *l = NULL, *s = NULL, *rv = NULL;
names = PQsslAttributeNames(self->conn->pgconn);
if (!(l = PyList_New(0))) { goto exit; }
for (i = 0; names[i]; i++) {
if (!(s = conn_text_from_chars(self->conn, names[i]))) { goto exit; }
if (0 != PyList_Append(l, s)) { goto exit; }
Py_CLEAR(s);
}
rv = l;
l = NULL;
exit:
Py_XDECREF(l);
Py_XDECREF(s);
return rv;
}
static struct PyGetSetDef connInfoObject_getsets[] = { static struct PyGetSetDef connInfoObject_getsets[] = {
{ "dbname", (getter)dbname_get, NULL, (char *)dbname_doc }, { "dbname", (getter)dbname_get, NULL, (char *)dbname_doc },
{ "user", (getter)user_get, NULL, (char *)user_doc }, { "user", (getter)user_get, NULL, (char *)user_doc },
@ -390,6 +460,14 @@ static struct PyGetSetDef connInfoObject_getsets[] = {
(char *)needs_password_doc }, (char *)needs_password_doc },
{ "ssl_in_use", (getter)ssl_in_use_get, NULL, { "ssl_in_use", (getter)ssl_in_use_get, NULL,
(char *)ssl_in_use_doc }, (char *)ssl_in_use_doc },
{ "ssl_attribute_names", (getter)ssl_attribute_names_get, NULL,
(char *)ssl_attribute_names_doc },
{NULL}
};
static struct PyMethodDef connInfoObject_methods[] = {
{"ssl_attribute", (PyCFunction)ssl_attribute,
METH_VARARGS|METH_KEYWORDS, ssl_attribute_doc},
{NULL} {NULL}
}; };
@ -457,7 +535,7 @@ PyTypeObject connInfoType = {
0, /*tp_weaklistoffset*/ 0, /*tp_weaklistoffset*/
0, /*tp_iter*/ 0, /*tp_iter*/
0, /*tp_iternext*/ 0, /*tp_iternext*/
0, /*tp_methods*/ connInfoObject_methods, /*tp_methods*/
0, /*tp_members*/ 0, /*tp_members*/
connInfoObject_getsets, /*tp_getset*/ connInfoObject_getsets, /*tp_getset*/
0, /*tp_base*/ 0, /*tp_base*/

View File

@ -1792,6 +1792,21 @@ class TestConnectionInfo(ConnectingTestCase):
self.assertIsInstance(self.conn.info.ssl_in_use, bool) self.assertIsInstance(self.conn.info.ssl_in_use, bool)
self.assertIs(self.bconn.info.ssl_in_use, False) self.assertIs(self.bconn.info.ssl_in_use, False)
def test_ssl_attribute(self):
attribs = self.conn.info.ssl_attribute_names
self.assert_(attribs)
if self.conn.info.ssl_in_use:
for attrib in attribs:
self.assertIsInstance(self.conn.info.ssl_attribute(attrib), str)
else:
for attrib in attribs:
self.assertIsNone(self.conn.info.ssl_attribute(attrib))
self.assertIsNone(self.conn.info.ssl_attribute('wat'))
for attrib in attribs:
self.assertIsNone(self.bconn.info.ssl_attribute(attrib))
def test_suite(): def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)