From 344ce152612949755d3a54c0bcb5e347e7c2472c Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Fri, 17 Aug 2018 00:55:40 +0100 Subject: [PATCH] Better testing of encryption function with libpq < 10 --- tests/test_connection.py | 97 ++++++++++++++++++---------------------- 1 file changed, 44 insertions(+), 53 deletions(-) diff --git a/tests/test_connection.py b/tests/test_connection.py index 13635f1f..1342f9f8 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -35,11 +35,9 @@ import psycopg2.errorcodes from psycopg2 import extensions as ext from .testutils import ( - unittest, decorate_all_tests, skip_if_no_superuser, - skip_before_postgres, skip_after_postgres, skip_before_libpq, - ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow, - libpq_version -) + unittest, decorate_all_tests, skip_if_no_superuser, skip_before_postgres, + skip_after_postgres, skip_before_libpq, skip_after_libpq, + ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow) from .testconfig import dsn, dbname @@ -1411,55 +1409,37 @@ class TransactionControlTests(ConnectingTestCase): class TestEncryptPassword(ConnectingTestCase): @skip_before_postgres(10) def test_encrypt_password_post_9_6(self): - cur = self.conn.cursor() - cur.execute("SHOW password_encryption;") - server_encryption_algorithm = cur.fetchone()[0] - # MD5 algorithm self.assertEqual( ext.encrypt_password('psycopg2', 'ashesh', self.conn, 'md5'), - 'md594839d658c28a357126f105b9cb14cfc' - ) + 'md594839d658c28a357126f105b9cb14cfc') # keywords self.assertEqual( ext.encrypt_password( password='psycopg2', user='ashesh', scope=self.conn, algorithm='md5'), - 'md594839d658c28a357126f105b9cb14cfc' - ) - if libpq_version() < 100000: - self.assertRaises( - psycopg2.NotSupportedError, - ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, - 'scram-sha-256' - ) - else: - enc_password = ext.encrypt_password( - 'psycopg2', 'ashesh', self.conn - ) - if server_encryption_algorithm == 'md5': - self.assertEqual( - enc_password, 'md594839d658c28a357126f105b9cb14cfc' - ) - elif server_encryption_algorithm == 'scram-sha-256': - self.assertEqual(enc_password[:14], 'SCRAM-SHA-256$') + 'md594839d658c28a357126f105b9cb14cfc') + @skip_before_postgres(10) + def test_encrypt_server(self): + cur = self.conn.cursor() + cur.execute("SHOW password_encryption;") + server_encryption_algorithm = cur.fetchone()[0] + + enc_password = ext.encrypt_password( + 'psycopg2', 'ashesh', self.conn) + + if server_encryption_algorithm == 'md5': self.assertEqual( - ext.encrypt_password( - 'psycopg2', 'ashesh', self.conn, 'scram-sha-256' - )[:14], 'SCRAM-SHA-256$' - ) + enc_password, 'md594839d658c28a357126f105b9cb14cfc') + elif server_encryption_algorithm == 'scram-sha-256': + self.assertEqual(enc_password[:14], 'SCRAM-SHA-256$') - self.assertRaises(psycopg2.ProgrammingError, - ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') - - @skip_after_postgres(10) - def test_encrypt_password_pre_10(self): self.assertEqual( - ext.encrypt_password('psycopg2', 'ashesh', self.conn), - 'md594839d658c28a357126f105b9cb14cfc' - ) + ext.encrypt_password( + 'psycopg2', 'ashesh', self.conn, 'scram-sha-256' + )[:14], 'SCRAM-SHA-256$') self.assertRaises(psycopg2.ProgrammingError, ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') @@ -1467,20 +1447,31 @@ class TestEncryptPassword(ConnectingTestCase): def test_encrypt_md5(self): self.assertEqual( ext.encrypt_password('psycopg2', 'ashesh', algorithm='md5'), - 'md594839d658c28a357126f105b9cb14cfc' - ) + 'md594839d658c28a357126f105b9cb14cfc') + @skip_before_libpq(10) + def test_encrypt_bad_libpq_10(self): + self.assertRaises(psycopg2.ProgrammingError, + ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') + + @skip_after_libpq(10) + def test_encrypt_bad_before_libpq_10(self): + self.assertRaises(psycopg2.NotSupportedError, + ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') + + @skip_before_libpq(10) def test_encrypt_scram(self): - if libpq_version() >= 100000: - self.assert_( - ext.encrypt_password( - 'psycopg2', 'ashesh', self.conn, 'scram-sha-256') - .startswith('SCRAM-SHA-256$')) - else: - self.assertRaises(psycopg2.NotSupportedError, - ext.encrypt_password, - password='psycopg2', user='ashesh', - scope=self.conn, algorithm='scram-sha-256') + self.assert_( + ext.encrypt_password( + 'psycopg2', 'ashesh', self.conn, 'scram-sha-256') + .startswith('SCRAM-SHA-256$')) + + @skip_after_libpq(10) + def test_encrypt_scram_pre_10(self): + self.assertRaises(psycopg2.NotSupportedError, + ext.encrypt_password, + password='psycopg2', user='ashesh', + scope=self.conn, algorithm='scram-sha-256') def test_bad_types(self): self.assertRaises(TypeError, ext.encrypt_password)