Added support for preparing the encrypted password of a PostgreSQL

password using the libpq functions - 'PQencryptPasswordConn', and
'PQencryptPassword'.
This commit is contained in:
Ashesh Vashi 2017-07-17 10:32:56 +05:30
parent d2e86db8fb
commit cfb0937605
3 changed files with 151 additions and 1 deletions

View File

@ -681,6 +681,33 @@ The ``connection`` class
.. __: http://www.postgresql.org/docs/current/static/libpq-status.html#LIBPQ-PQTRANSACTIONSTATUS .. __: http://www.postgresql.org/docs/current/static/libpq-status.html#LIBPQ-PQTRANSACTIONSTATUS
.. method:: encrypt_password(password, user, [algorithm])
Returns the encrypted form of a PostgreSQL password based on the
current password encryption algorithm.
Raises `~psycopg2.NotSupportedError` if the ``psycopg2`` module was
compiled with a ``libpq`` version lesser than 10 (which can be detected
by the `~psycopg2.__libpq_version__` constant), when encryption
algorithm other than 'md5' is specified for the server version greater
than, or equal to 10.
Ignores the encrytion algorithm for servers version less than 10, and
always uses 'md5' as encryption algorithm.
.. seealso:: libpq docs for `PQencryptPasswordConn()`__ for details.
.. __: https://www.postgresql.org/docs/devel/static/libpq-misc.html#libpq-pqencryptpasswordconn
.. seealso:: libpq docs for `PQencryptPassword()`__ for details.
.. __: https://www.postgresql.org/docs/devel/static/libpq-misc.html#libpq-pqencryptpassword
.. seealso:: libpq docs for `password_encryption`__ for details.
.. __: https://www.postgresql.org/docs/devel/static/runtime-config-connection.html#guc-password-encryption
.. index:: .. index::
pair: Protocol; Version pair: Protocol; Version

View File

@ -547,6 +547,73 @@ do { \
EXC_IF_TPC_PREPARED(self, what); \ EXC_IF_TPC_PREPARED(self, what); \
} while(0) } while(0)
/* encrypt_password - Prepare the encrypted password form */
#define psyco_encrypt_password_doc \
"encrypt_password('password', 'user', ...) -- Prepares the encrypted form of a PostgreSQL password.\n\n" \
"Accepted arguments are 'algorithm'."
static PyObject *
psyco_encrypt_password(connectionObject *self, PyObject *args, PyObject *kwargs)
{
const char *password = NULL,
*user = NULL,
*algorithm = NULL;
char *encrypted = NULL;
PyObject *res = Py_None;
static char *kwlist[] = {"password", "user", "algorithm", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "ss|s", kwlist, &password, &user, &algorithm)) {
return NULL;
}
if (self->server_version < 100000 ||
(algorithm && strcmp(algorithm, "md5") == 0)
) {
encrypted = PQencryptPassword(password, user);
if (encrypted != NULL)
{
res = PyString_FromString(encrypted);
free(encrypted);
}
return res;
}
else
{
#if PG_VERSION_NUM >= 100000
encrypted = PQencryptPasswordConn(self->pgconn, password, user, algorithm);
if (!encrypted)
{
const char *msg;
msg = PQerrorMessage(self->pgconn);
if (msg && *msg) {
PyErr_Format(
ProgrammingError,
"Error encrypting the password!\n%s",
msg
);
return NULL;
}
}
else
{
res = PyString_FromString(encrypted);
}
return res;
#else
PyErr_SetString(
NotSupportedError,
"Password encryption (other than 'md5' algorithm) is not supported for the server version >= 10 in libpq < 10"
);
#endif
}
return NULL;
}
/* set_session - set default transaction characteristics */ /* set_session - set default transaction characteristics */
#define psyco_conn_set_session_doc \ #define psyco_conn_set_session_doc \
@ -1176,6 +1243,8 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_NOARGS, psyco_conn_isexecuting_doc}, METH_NOARGS, psyco_conn_isexecuting_doc},
{"cancel", (PyCFunction)psyco_conn_cancel, {"cancel", (PyCFunction)psyco_conn_cancel,
METH_NOARGS, psyco_conn_cancel_doc}, METH_NOARGS, psyco_conn_cancel_doc},
{"encrypt_password", (PyCFunction)psyco_encrypt_password,
METH_VARARGS|METH_KEYWORDS, psyco_encrypt_password_doc},
{NULL} {NULL}
}; };

View File

@ -37,7 +37,9 @@ from psycopg2 import extensions as ext
from testutils import ( from testutils import (
script_to_py3, unittest, decorate_all_tests, skip_if_no_superuser, script_to_py3, unittest, decorate_all_tests, skip_if_no_superuser,
skip_before_postgres, skip_after_postgres, skip_before_libpq, skip_before_postgres, skip_after_postgres, skip_before_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow) ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow,
libpq_version
)
from testconfig import dsn, dbname from testconfig import dsn, dbname
@ -1382,6 +1384,58 @@ class TransactionControlTests(ConnectingTestCase):
cur.execute("SHOW default_transaction_read_only;") cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off') self.assertEqual(cur.fetchone()[0], 'off')
@skip_before_postgres(10)
def test_encrypt_password_post_9_6(self):
cur = self.conn.cursor()
cur.execute("SHOW password_encryption;")
server_encryption_algorithm = cur.fetchone()[0]
# MD5 algorithm
self.assertEqual(
self.conn.encrypt_password('psycopg2', 'ashesh', 'md5'),
'md594839d658c28a357126f105b9cb14cfc'
)
if libpq_version() < 100000:
if server_encryption_algorithm == 'md5':
self.assertEqual(
self.conn.encrypt_password('psycopg2', 'ashesh'),
'md594839d658c28a357126f105b9cb14cfc'
)
else:
self.assertRaises(
psycopg2.ProgrammingError,
self.conn.encrypt_password, 'psycopg2', 'ashesh'
)
else:
enc_password = self.conn.encrypt_password('psycopg2', 'ashesh')
if server_encryption_algorithm == 'md5':
self.assertEqual(
enc_password, 'md594839d658c28a357126f105b9cb14cfc'
)
elif server_encryption_algorithm == 'scram-sha-256':
self.assertEqual(enc_password[:14], 'SCRAM-SHA-256$')
self.assertEqual(
self.conn.encrypt_password(
'psycopg2', 'ashesh', 'scram-sha-256'
)[:14], 'SCRAM-SHA-256$'
)
@skip_after_postgres(10)
def test_encrypt_password_pre_10(self):
self.assertEqual(
self.conn.encrypt_password('psycopg2', 'ashesh'),
'md594839d658c28a357126f105b9cb14cfc'
)
# Encryption algorithm will be ignored for postgres version < 10, it
# will always use MD5.
self.assertEqual(
self.conn.encrypt_password('psycopg2', 'ashesh', 'abc'),
'md594839d658c28a357126f105b9cb14cfc'
)
class AutocommitTests(ConnectingTestCase): class AutocommitTests(ConnectingTestCase):
def test_closed(self): def test_closed(self):