Better testing of encryption function with libpq < 10

This commit is contained in:
Daniele Varrazzo 2018-08-17 00:55:40 +01:00
parent 153b0983c1
commit 344ce15261

View File

@ -35,11 +35,9 @@ import psycopg2.errorcodes
from psycopg2 import extensions as ext from psycopg2 import extensions as ext
from .testutils import ( from .testutils import (
unittest, decorate_all_tests, skip_if_no_superuser, unittest, decorate_all_tests, skip_if_no_superuser, skip_before_postgres,
skip_before_postgres, skip_after_postgres, skip_before_libpq, skip_after_postgres, skip_before_libpq, skip_after_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow, ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
libpq_version
)
from .testconfig import dsn, dbname from .testconfig import dsn, dbname
@ -1411,55 +1409,37 @@ class TransactionControlTests(ConnectingTestCase):
class TestEncryptPassword(ConnectingTestCase): class TestEncryptPassword(ConnectingTestCase):
@skip_before_postgres(10) @skip_before_postgres(10)
def test_encrypt_password_post_9_6(self): def test_encrypt_password_post_9_6(self):
cur = self.conn.cursor()
cur.execute("SHOW password_encryption;")
server_encryption_algorithm = cur.fetchone()[0]
# MD5 algorithm # MD5 algorithm
self.assertEqual( self.assertEqual(
ext.encrypt_password('psycopg2', 'ashesh', self.conn, 'md5'), ext.encrypt_password('psycopg2', 'ashesh', self.conn, 'md5'),
'md594839d658c28a357126f105b9cb14cfc' 'md594839d658c28a357126f105b9cb14cfc')
)
# keywords # keywords
self.assertEqual( self.assertEqual(
ext.encrypt_password( ext.encrypt_password(
password='psycopg2', user='ashesh', password='psycopg2', user='ashesh',
scope=self.conn, algorithm='md5'), scope=self.conn, algorithm='md5'),
'md594839d658c28a357126f105b9cb14cfc' 'md594839d658c28a357126f105b9cb14cfc')
)
if libpq_version() < 100000: @skip_before_postgres(10)
self.assertRaises( def test_encrypt_server(self):
psycopg2.NotSupportedError, cur = self.conn.cursor()
ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, cur.execute("SHOW password_encryption;")
'scram-sha-256' server_encryption_algorithm = cur.fetchone()[0]
)
else:
enc_password = ext.encrypt_password( enc_password = ext.encrypt_password(
'psycopg2', 'ashesh', self.conn 'psycopg2', 'ashesh', self.conn)
)
if server_encryption_algorithm == 'md5': if server_encryption_algorithm == 'md5':
self.assertEqual( self.assertEqual(
enc_password, 'md594839d658c28a357126f105b9cb14cfc' enc_password, 'md594839d658c28a357126f105b9cb14cfc')
)
elif server_encryption_algorithm == 'scram-sha-256': elif server_encryption_algorithm == 'scram-sha-256':
self.assertEqual(enc_password[:14], 'SCRAM-SHA-256$') self.assertEqual(enc_password[:14], 'SCRAM-SHA-256$')
self.assertEqual( self.assertEqual(
ext.encrypt_password( ext.encrypt_password(
'psycopg2', 'ashesh', self.conn, 'scram-sha-256' 'psycopg2', 'ashesh', self.conn, 'scram-sha-256'
)[:14], 'SCRAM-SHA-256$' )[:14], 'SCRAM-SHA-256$')
)
self.assertRaises(psycopg2.ProgrammingError,
ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc')
@skip_after_postgres(10)
def test_encrypt_password_pre_10(self):
self.assertEqual(
ext.encrypt_password('psycopg2', 'ashesh', self.conn),
'md594839d658c28a357126f105b9cb14cfc'
)
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,
ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc') ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc')
@ -1467,16 +1447,27 @@ class TestEncryptPassword(ConnectingTestCase):
def test_encrypt_md5(self): def test_encrypt_md5(self):
self.assertEqual( self.assertEqual(
ext.encrypt_password('psycopg2', 'ashesh', algorithm='md5'), ext.encrypt_password('psycopg2', 'ashesh', algorithm='md5'),
'md594839d658c28a357126f105b9cb14cfc' 'md594839d658c28a357126f105b9cb14cfc')
)
@skip_before_libpq(10)
def test_encrypt_bad_libpq_10(self):
self.assertRaises(psycopg2.ProgrammingError,
ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc')
@skip_after_libpq(10)
def test_encrypt_bad_before_libpq_10(self):
self.assertRaises(psycopg2.NotSupportedError,
ext.encrypt_password, 'psycopg2', 'ashesh', self.conn, 'abc')
@skip_before_libpq(10)
def test_encrypt_scram(self): def test_encrypt_scram(self):
if libpq_version() >= 100000:
self.assert_( self.assert_(
ext.encrypt_password( ext.encrypt_password(
'psycopg2', 'ashesh', self.conn, 'scram-sha-256') 'psycopg2', 'ashesh', self.conn, 'scram-sha-256')
.startswith('SCRAM-SHA-256$')) .startswith('SCRAM-SHA-256$'))
else:
@skip_after_libpq(10)
def test_encrypt_scram_pre_10(self):
self.assertRaises(psycopg2.NotSupportedError, self.assertRaises(psycopg2.NotSupportedError,
ext.encrypt_password, ext.encrypt_password,
password='psycopg2', user='ashesh', password='psycopg2', user='ashesh',