diff --git a/tests/dbapi20.py b/tests/dbapi20.py index b3c64051..7f2d0284 100644 --- a/tests/dbapi20.py +++ b/tests/dbapi20.py @@ -167,7 +167,7 @@ class DatabaseAPI20Test(unittest.TestCase): # Must exist threadsafety = self.driver.threadsafety # Must be a valid value - self.failUnless(threadsafety in (0,1,2,3)) + self.assertTrue(threadsafety in (0,1,2,3)) except AttributeError: self.fail("Driver doesn't define threadsafety") @@ -176,7 +176,7 @@ class DatabaseAPI20Test(unittest.TestCase): # Must exist paramstyle = self.driver.paramstyle # Must be a valid value - self.failUnless(paramstyle in ( + self.assertTrue(paramstyle in ( 'qmark','numeric','named','format','pyformat' )) except AttributeError: @@ -185,27 +185,27 @@ class DatabaseAPI20Test(unittest.TestCase): def test_Exceptions(self): # Make sure required exceptions exist, and are in the # defined hierarchy. - self.failUnless(issubclass(self.driver.Warning,Exception)) - self.failUnless(issubclass(self.driver.Error,Exception)) - self.failUnless( + self.assertTrue(issubclass(self.driver.Warning,Exception)) + self.assertTrue(issubclass(self.driver.Error,Exception)) + self.assertTrue( issubclass(self.driver.InterfaceError,self.driver.Error) ) - self.failUnless( + self.assertTrue( issubclass(self.driver.DatabaseError,self.driver.Error) ) - self.failUnless( + self.assertTrue( issubclass(self.driver.OperationalError,self.driver.Error) ) - self.failUnless( + self.assertTrue( issubclass(self.driver.IntegrityError,self.driver.Error) ) - self.failUnless( + self.assertTrue( issubclass(self.driver.InternalError,self.driver.Error) ) - self.failUnless( + self.assertTrue( issubclass(self.driver.ProgrammingError,self.driver.Error) ) - self.failUnless( + self.assertTrue( issubclass(self.driver.NotSupportedError,self.driver.Error) ) @@ -218,15 +218,15 @@ class DatabaseAPI20Test(unittest.TestCase): # by default. con = self._connect() drv = self.driver - self.failUnless(con.Warning is drv.Warning) - self.failUnless(con.Error is drv.Error) - self.failUnless(con.InterfaceError is drv.InterfaceError) - self.failUnless(con.DatabaseError is drv.DatabaseError) - self.failUnless(con.OperationalError is drv.OperationalError) - self.failUnless(con.IntegrityError is drv.IntegrityError) - self.failUnless(con.InternalError is drv.InternalError) - self.failUnless(con.ProgrammingError is drv.ProgrammingError) - self.failUnless(con.NotSupportedError is drv.NotSupportedError) + self.assertTrue(con.Warning is drv.Warning) + self.assertTrue(con.Error is drv.Error) + self.assertTrue(con.InterfaceError is drv.InterfaceError) + self.assertTrue(con.DatabaseError is drv.DatabaseError) + self.assertTrue(con.OperationalError is drv.OperationalError) + self.assertTrue(con.IntegrityError is drv.IntegrityError) + self.assertTrue(con.InternalError is drv.InternalError) + self.assertTrue(con.ProgrammingError is drv.ProgrammingError) + self.assertTrue(con.NotSupportedError is drv.NotSupportedError) def test_commit(self): @@ -318,12 +318,12 @@ class DatabaseAPI20Test(unittest.TestCase): cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( self.table_prefix )) - self.failUnless(cur.rowcount in (-1,1), + self.assertTrue(cur.rowcount in (-1,1), 'cursor.rowcount should == number or rows inserted, or ' 'set to -1 after executing an insert statement' ) cur.execute("select name from %sbooze" % self.table_prefix) - self.failUnless(cur.rowcount in (-1,1), + self.assertTrue(cur.rowcount in (-1,1), 'cursor.rowcount should == number of rows returned, or ' 'set to -1 after executing a select statement' ) @@ -388,7 +388,7 @@ class DatabaseAPI20Test(unittest.TestCase): cur.execute("insert into %sbooze values ('Victoria Bitter')" % ( self.table_prefix )) - self.failUnless(cur.rowcount in (-1,1)) + self.assertTrue(cur.rowcount in (-1,1)) if self.driver.paramstyle == 'qmark': cur.execute( @@ -417,7 +417,7 @@ class DatabaseAPI20Test(unittest.TestCase): ) else: self.fail('Invalid paramstyle') - self.failUnless(cur.rowcount in (-1,1)) + self.assertTrue(cur.rowcount in (-1,1)) cur.execute('select name from %sbooze' % self.table_prefix) res = cur.fetchall() @@ -469,7 +469,7 @@ class DatabaseAPI20Test(unittest.TestCase): ) else: self.fail('Unknown paramstyle') - self.failUnless(cur.rowcount in (-1,2), + self.assertTrue(cur.rowcount in (-1,2), 'insert using cursor.executemany set cursor.rowcount to ' 'incorrect value %r' % cur.rowcount ) @@ -504,7 +504,7 @@ class DatabaseAPI20Test(unittest.TestCase): 'cursor.fetchone should return None if a query retrieves ' 'no rows' ) - self.failUnless(cur.rowcount in (-1,0)) + self.assertTrue(cur.rowcount in (-1,0)) # cursor.fetchone should raise an Error if called after # executing a query that cannot return rows @@ -524,7 +524,7 @@ class DatabaseAPI20Test(unittest.TestCase): self.assertEqual(cur.fetchone(),None, 'cursor.fetchone should return None if no more rows available' ) - self.failUnless(cur.rowcount in (-1,1)) + self.assertTrue(cur.rowcount in (-1,1)) finally: con.close() @@ -580,7 +580,7 @@ class DatabaseAPI20Test(unittest.TestCase): 'cursor.fetchmany should return an empty sequence after ' 'results are exhausted' ) - self.failUnless(cur.rowcount in (-1,6)) + self.assertTrue(cur.rowcount in (-1,6)) # Same as above, using cursor.arraysize cur.arraysize=4 @@ -593,12 +593,12 @@ class DatabaseAPI20Test(unittest.TestCase): self.assertEqual(len(r),2) r = cur.fetchmany() # Should be an empty sequence self.assertEqual(len(r),0) - self.failUnless(cur.rowcount in (-1,6)) + self.assertTrue(cur.rowcount in (-1,6)) cur.arraysize=6 cur.execute('select name from %sbooze' % self.table_prefix) rows = cur.fetchmany() # Should get all rows - self.failUnless(cur.rowcount in (-1,6)) + self.assertTrue(cur.rowcount in (-1,6)) self.assertEqual(len(rows),6) self.assertEqual(len(rows),6) rows = [r[0] for r in rows] @@ -615,7 +615,7 @@ class DatabaseAPI20Test(unittest.TestCase): 'cursor.fetchmany should return an empty sequence if ' 'called after the whole result set has been fetched' ) - self.failUnless(cur.rowcount in (-1,6)) + self.assertTrue(cur.rowcount in (-1,6)) self.executeDDL2(cur) cur.execute('select name from %sbarflys' % self.table_prefix) @@ -624,7 +624,7 @@ class DatabaseAPI20Test(unittest.TestCase): 'cursor.fetchmany should return an empty sequence if ' 'query retrieved no rows' ) - self.failUnless(cur.rowcount in (-1,0)) + self.assertTrue(cur.rowcount in (-1,0)) finally: con.close() @@ -648,7 +648,7 @@ class DatabaseAPI20Test(unittest.TestCase): cur.execute('select name from %sbooze' % self.table_prefix) rows = cur.fetchall() - self.failUnless(cur.rowcount in (-1,len(self.samples))) + self.assertTrue(cur.rowcount in (-1,len(self.samples))) self.assertEqual(len(rows),len(self.samples), 'cursor.fetchall did not retrieve all rows' ) @@ -664,12 +664,12 @@ class DatabaseAPI20Test(unittest.TestCase): 'cursor.fetchall should return an empty list if called ' 'after the whole result set has been fetched' ) - self.failUnless(cur.rowcount in (-1,len(self.samples))) + self.assertTrue(cur.rowcount in (-1,len(self.samples))) self.executeDDL2(cur) cur.execute('select name from %sbarflys' % self.table_prefix) rows = cur.fetchall() - self.failUnless(cur.rowcount in (-1,0)) + self.assertTrue(cur.rowcount in (-1,0)) self.assertEqual(len(rows),0, 'cursor.fetchall should return an empty list if ' 'a select query returns no rows' @@ -691,7 +691,7 @@ class DatabaseAPI20Test(unittest.TestCase): rows23 = cur.fetchmany(2) rows4 = cur.fetchone() rows56 = cur.fetchall() - self.failUnless(cur.rowcount in (-1,6)) + self.assertTrue(cur.rowcount in (-1,6)) self.assertEqual(len(rows23),2, 'fetchmany returned incorrect number of rows' ) @@ -768,7 +768,7 @@ class DatabaseAPI20Test(unittest.TestCase): con = self._connect() try: cur = con.cursor() - self.failUnless(hasattr(cur,'arraysize'), + self.assertTrue(hasattr(cur,'arraysize'), 'cursor.arraysize must be defined' ) finally: @@ -837,26 +837,26 @@ class DatabaseAPI20Test(unittest.TestCase): b = self.driver.Binary(b'') def test_STRING(self): - self.failUnless(hasattr(self.driver,'STRING'), + self.assertTrue(hasattr(self.driver,'STRING'), 'module.STRING must be defined' ) def test_BINARY(self): - self.failUnless(hasattr(self.driver,'BINARY'), + self.assertTrue(hasattr(self.driver,'BINARY'), 'module.BINARY must be defined.' ) def test_NUMBER(self): - self.failUnless(hasattr(self.driver,'NUMBER'), + self.assertTrue(hasattr(self.driver,'NUMBER'), 'module.NUMBER must be defined.' ) def test_DATETIME(self): - self.failUnless(hasattr(self.driver,'DATETIME'), + self.assertTrue(hasattr(self.driver,'DATETIME'), 'module.DATETIME must be defined.' ) def test_ROWID(self): - self.failUnless(hasattr(self.driver,'ROWID'), + self.assertTrue(hasattr(self.driver,'ROWID'), 'module.ROWID must be defined.' ) diff --git a/tests/dbapi20_tpc.py b/tests/dbapi20_tpc.py index fccc7756..f2fa1501 100644 --- a/tests/dbapi20_tpc.py +++ b/tests/dbapi20_tpc.py @@ -28,15 +28,15 @@ class TwoPhaseCommitTests(unittest.TestCase): except self.driver.NotSupportedError: self.fail("Driver does not support transaction IDs.") - self.assertEquals(xid[0], 42) - self.assertEquals(xid[1], "global") - self.assertEquals(xid[2], "bqual") + self.assertEqual(xid[0], 42) + self.assertEqual(xid[1], "global") + self.assertEqual(xid[2], "bqual") # Try some extremes for the transaction ID: xid = con.xid(0, "", "") - self.assertEquals(tuple(xid), (0, "", "")) + self.assertEqual(tuple(xid), (0, "", "")) xid = con.xid(0x7fffffff, "a" * 64, "b" * 64) - self.assertEquals(tuple(xid), (0x7fffffff, "a" * 64, "b" * 64)) + self.assertEqual(tuple(xid), (0x7fffffff, "a" * 64, "b" * 64)) def test_tpc_begin(self): con = self.connect() diff --git a/tests/test_async.py b/tests/test_async.py index ee6651d7..6118a47b 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -77,17 +77,17 @@ class AsyncTests(ConnectingTestCase): sync_cur = self.sync_conn.cursor() del cur, sync_cur - self.assert_(self.conn.async_) - self.assert_(not self.sync_conn.async_) + self.assertTrue(self.conn.async_) + self.assertTrue(not self.sync_conn.async_) # the async connection should be autocommit - self.assert_(self.conn.autocommit) - self.assertEquals(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) + self.assertTrue(self.conn.autocommit) + self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) # check other properties to be found on the connection - self.assert_(self.conn.server_version) - self.assert_(self.conn.protocol_version in (2, 3)) - self.assert_(self.conn.encoding in ext.encodings) + self.assertTrue(self.conn.server_version) + self.assertTrue(self.conn.protocol_version in (2, 3)) + self.assertTrue(self.conn.encoding in ext.encodings) def test_async_named_cursor(self): self.assertRaises(psycopg2.ProgrammingError, @@ -102,7 +102,7 @@ class AsyncTests(ConnectingTestCase): self.wait(cur) self.assertFalse(self.conn.isexecuting()) - self.assertEquals(cur.fetchone()[0], "a") + self.assertEqual(cur.fetchone()[0], "a") @slow @skip_before_postgres(8, 2) @@ -133,7 +133,7 @@ class AsyncTests(ConnectingTestCase): cur.execute("select * from table1") self.wait(cur) - self.assertEquals(cur.fetchall()[0][0], 1) + self.assertEqual(cur.fetchall()[0][0], 1) cur.execute("delete from table1") self.wait(cur) @@ -141,7 +141,7 @@ class AsyncTests(ConnectingTestCase): cur.execute("select * from table1") self.wait(cur) - self.assertEquals(cur.fetchone(), None) + self.assertEqual(cur.fetchone(), None) def test_fetch_after_async(self): cur = self.conn.cursor() @@ -152,7 +152,7 @@ class AsyncTests(ConnectingTestCase): cur.fetchall) # but after waiting it should work self.wait(cur) - self.assertEquals(cur.fetchall()[0][0], "a") + self.assertEqual(cur.fetchall()[0][0], "a") def test_rollback_while_async(self): cur = self.conn.cursor() @@ -181,14 +181,14 @@ class AsyncTests(ConnectingTestCase): cur.execute("select * from table1") self.wait(cur) - self.assertEquals(cur.fetchall()[0][0], 1) + self.assertEqual(cur.fetchall()[0][0], 1) cur.execute("delete from table1") self.wait(cur) cur.execute("select * from table1") self.wait(cur) - self.assertEquals(cur.fetchone(), None) + self.assertEqual(cur.fetchone(), None) def test_set_parameters_while_async(self): cur = self.conn.cursor() @@ -197,7 +197,7 @@ class AsyncTests(ConnectingTestCase): self.assertTrue(self.conn.isexecuting()) # getting transaction status works - self.assertEquals(self.conn.info.transaction_status, + self.assertEqual(self.conn.info.transaction_status, ext.TRANSACTION_STATUS_ACTIVE) self.assertTrue(self.conn.isexecuting()) @@ -235,7 +235,7 @@ class AsyncTests(ConnectingTestCase): # but after it's done it should work self.wait(cur) - self.assertEquals(list(cur), [(1, ), (2, ), (3, )]) + self.assertEqual(list(cur), [(1, ), (2, ), (3, )]) self.assertFalse(self.conn.isexecuting()) def test_copy_while_async(self): @@ -275,7 +275,7 @@ class AsyncTests(ConnectingTestCase): # but after it's done it should work self.wait(cur) cur.scroll(1) - self.assertEquals(cur.fetchall(), [(2, ), (3, )]) + self.assertEqual(cur.fetchall(), [(2, ), (3, )]) cur = self.conn.cursor() cur.execute("select id from table1 order by id") @@ -291,7 +291,7 @@ class AsyncTests(ConnectingTestCase): self.wait(cur) cur.scroll(2) cur.scroll(-1) - self.assertEquals(cur.fetchall(), [(2, ), (3, )]) + self.assertEqual(cur.fetchall(), [(2, ), (3, )]) def test_scroll(self): cur = self.sync_conn.cursor() @@ -304,7 +304,7 @@ class AsyncTests(ConnectingTestCase): cur.execute("select id from table1 order by id") cur.scroll(2) cur.scroll(-1) - self.assertEquals(cur.fetchall(), [(2, ), (3, )]) + self.assertEqual(cur.fetchall(), [(2, ), (3, )]) def test_async_dont_read_all(self): cur = self.conn.cursor() @@ -314,7 +314,7 @@ class AsyncTests(ConnectingTestCase): self.wait(cur) # it should be the result of the second query - self.assertEquals(cur.fetchone()[0], "b" * 10000) + self.assertEqual(cur.fetchone()[0], "b" * 10000) def test_async_subclass(self): class MyConn(ext.connection): @@ -322,8 +322,8 @@ class AsyncTests(ConnectingTestCase): ext.connection.__init__(self, dsn, async_=async_) conn = self.connect(connection_factory=MyConn, async_=True) - self.assert_(isinstance(conn, MyConn)) - self.assert_(conn.async_) + self.assertTrue(isinstance(conn, MyConn)) + self.assertTrue(conn.async_) conn.close() @slow @@ -351,7 +351,7 @@ class AsyncTests(ConnectingTestCase): cur.execute("select 1") # polling with a sync query works cur.connection.poll() - self.assertEquals(cur.fetchone()[0], 1) + self.assertEqual(cur.fetchone()[0], 1) @slow @skip_if_crdb("notify") @@ -364,7 +364,7 @@ class AsyncTests(ConnectingTestCase): cur.execute("notify test_notify") self.wait(cur) - self.assertEquals(self.sync_conn.notifies, []) + self.assertEqual(self.sync_conn.notifies, []) pid = self.conn.info.backend_pid for _ in range(5): @@ -372,8 +372,8 @@ class AsyncTests(ConnectingTestCase): if not self.sync_conn.notifies: time.sleep(0.5) continue - self.assertEquals(len(self.sync_conn.notifies), 1) - self.assertEquals(self.sync_conn.notifies.pop(), + self.assertEqual(len(self.sync_conn.notifies), 1) + self.assertEqual(self.sync_conn.notifies.pop(), (pid, "test_notify")) return self.fail("No NOTIFY in 2.5 seconds") @@ -388,7 +388,7 @@ class AsyncTests(ConnectingTestCase): # fetching from a cursor with no results is an error self.assertRaises(psycopg2.ProgrammingError, cur2.fetchone) # fetching from the correct cursor works - self.assertEquals(cur1.fetchone()[0], 1) + self.assertEqual(cur1.fetchone()[0], 1) def test_error(self): cur = self.conn.cursor() @@ -410,7 +410,7 @@ class AsyncTests(ConnectingTestCase): self.wait(cur) cur.execute("select * from table1 order by id") self.wait(cur) - self.assertEquals(cur.fetchall(), [(1, ), (2, ), (3, )]) + self.assertEqual(cur.fetchall(), [(1, ), (2, ), (3, )]) cur.execute("delete from table1") self.wait(cur) @@ -430,7 +430,7 @@ class AsyncTests(ConnectingTestCase): self.assertRaises(psycopg2.ProgrammingError, self.wait, cur) cur2.execute("select 1") self.wait(cur2) - self.assertEquals(cur2.fetchone()[0], 1) + self.assertEqual(cur2.fetchone()[0], 1) @skip_if_crdb("notice") def test_notices(self): @@ -442,7 +442,7 @@ class AsyncTests(ConnectingTestCase): cur.execute("create temp table chatty (id serial primary key);") self.wait(cur) self.assertEqual("CREATE TABLE", cur.statusmessage) - self.assert_(self.conn.notices) + self.assertTrue(self.conn.notices) def test_async_cursor_gone(self): cur = self.conn.cursor() @@ -504,7 +504,7 @@ class AsyncTests(ConnectingTestCase): raise Exception("Unexpected result from poll: %r", state) polls += 1 - self.assert_(polls >= 8, polls) + self.assertTrue(polls >= 8, polls) def test_poll_noop(self): self.conn.poll() diff --git a/tests/test_connection.py b/tests/test_connection.py index c8009cd0..85bf8966 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -61,7 +61,7 @@ class ConnectionTests(ConnectingTestCase): conn = self.conn conn.close() conn.close() - self.assert_(conn.closed) + self.assertTrue(conn.closed) def test_cursor_closed_attribute(self): conn = self.conn @@ -100,19 +100,19 @@ class ConnectionTests(ConnectingTestCase): if self.conn.info.server_version >= 90100: conn.deferrable = False - self.assert_(conn.autocommit) + self.assertTrue(conn.autocommit) self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) - self.assert_(conn.readonly is True) + self.assertTrue(conn.readonly is True) if self.conn.info.server_version >= 90100: - self.assert_(conn.deferrable is False) + self.assertTrue(conn.deferrable is False) conn.reset() # now the session characteristics should be reverted - self.assert_(not conn.autocommit) + self.assertTrue(not conn.autocommit) self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) - self.assert_(conn.readonly is None) + self.assertTrue(conn.readonly is None) if self.conn.info.server_version >= 90100: - self.assert_(conn.deferrable is None) + self.assertTrue(conn.deferrable is None) @skip_if_crdb("notice") def test_notices(self): @@ -122,7 +122,7 @@ class ConnectionTests(ConnectingTestCase): cur.execute("set client_min_messages=debug1") cur.execute("create temp table chatty (id serial primary key);") self.assertEqual("CREATE TABLE", cur.statusmessage) - self.assert_(conn.notices) + self.assertTrue(conn.notices) @skip_if_crdb("notice") def test_notices_consistent_order(self): @@ -139,10 +139,10 @@ class ConnectionTests(ConnectingTestCase): create temp table table4 (id serial); """) self.assertEqual(4, len(conn.notices)) - self.assert_('table1' in conn.notices[0]) - self.assert_('table2' in conn.notices[1]) - self.assert_('table3' in conn.notices[2]) - self.assert_('table4' in conn.notices[3]) + self.assertTrue('table1' in conn.notices[0]) + self.assertTrue('table2' in conn.notices[1]) + self.assertTrue('table3' in conn.notices[2]) + self.assertTrue('table4' in conn.notices[3]) @slow @skip_if_crdb("notice") @@ -157,7 +157,7 @@ class ConnectionTests(ConnectingTestCase): cur.execute(sql) self.assertEqual(50, len(conn.notices)) - self.assert_('table99' in conn.notices[-1], conn.notices[-1]) + self.assertTrue('table99' in conn.notices[-1], conn.notices[-1]) @slow @skip_if_crdb("notice") @@ -176,10 +176,10 @@ class ConnectionTests(ConnectingTestCase): create temp table table3 (id serial); create temp table table4 (id serial);""") self.assertEqual(len(conn.notices), 4) - self.assert_('table1' in conn.notices.popleft()) - self.assert_('table2' in conn.notices.popleft()) - self.assert_('table3' in conn.notices.popleft()) - self.assert_('table4' in conn.notices.popleft()) + self.assertTrue('table1' in conn.notices.popleft()) + self.assertTrue('table2' in conn.notices.popleft()) + self.assertTrue('table3' in conn.notices.popleft()) + self.assertTrue('table4' in conn.notices.popleft()) self.assertEqual(len(conn.notices), 0) # not limited, but no error @@ -204,10 +204,10 @@ class ConnectionTests(ConnectingTestCase): self.assertEqual(self.conn.notices, None) def test_server_version(self): - self.assert_(self.conn.server_version) + self.assertTrue(self.conn.server_version) def test_protocol_version(self): - self.assert_(self.conn.protocol_version in (2, 3), + self.assertTrue(self.conn.protocol_version in (2, 3), self.conn.protocol_version) def test_tpc_unsupported(self): @@ -235,7 +235,7 @@ class ConnectionTests(ConnectingTestCase): t2.start() t1.join() t2.join() - self.assert_(time.time() - t0 < 7, + self.assertTrue(time.time() - t0 < 7, "something broken in concurrency") @skip_if_crdb("encoding") @@ -273,7 +273,7 @@ class ConnectionTests(ConnectingTestCase): conn.close() del conn gc.collect() - self.assert_(w() is None) + self.assertTrue(w() is None) @slow def test_commit_concurrency(self): @@ -302,7 +302,7 @@ class ConnectionTests(ConnectingTestCase): # Stop the committer thread stop.append(True) - self.assert_(not notices, f"{len(notices)} notices raised") + self.assertTrue(not notices, f"{len(notices)} notices raised") def test_connect_cursor_factory(self): conn = self.connect(cursor_factory=psycopg2.extras.DictCursor) @@ -348,8 +348,8 @@ class ConnectionTests(ConnectingTestCase): pass c = SubConnection("dbname=thereisnosuchdatabasemate password=foobar") - self.assert_(c.closed, "connection failed so it must be closed") - self.assert_('foobar' not in c.dsn, "password was not obscured") + self.assertTrue(c.closed, "connection failed so it must be closed") + self.assertTrue('foobar' not in c.dsn, "password was not obscured") def test_get_native_connection(self): conn = self.connect() @@ -359,7 +359,7 @@ class ConnectionTests(ConnectingTestCase): def test_pgconn_ptr(self): conn = self.connect() - self.assert_(conn.pgconn_ptr is not None) + self.assertTrue(conn.pgconn_ptr is not None) try: f = self.libpq.PQserverVersion @@ -376,7 +376,7 @@ class ConnectionTests(ConnectingTestCase): self.assertEqual(ver, conn.server_version) conn.close() - self.assert_(conn.pgconn_ptr is None) + self.assertTrue(conn.pgconn_ptr is None) @slow def test_multiprocess_close(self): @@ -434,7 +434,7 @@ class ParseDsnTestCase(ConnectingTestCase): dict(user='tester', password='secret', dbname='test 2'), "DSN with quoting parsed") - # Can't really use assertRaisesRegexp() here since we need to + # Can't really use assertRaisesRegex() here since we need to # make sure that secret is *not* exposed in the error message. raised = False try: @@ -559,7 +559,7 @@ class MakeDsnTestCase(ConnectingTestCase): conn = self.connect() d = conn.get_dsn_parameters() self.assertEqual(d['dbname'], dbname) # the only param we can check reliably - self.assert_('password' not in d, d) + self.assertTrue('password' not in d, d) class IsolationLevelsTestCase(ConnectingTestCase): @@ -593,7 +593,7 @@ class IsolationLevelsTestCase(ConnectingTestCase): def test_encoding(self): conn = self.connect() - self.assert_(conn.encoding in ext.encodings) + self.assertTrue(conn.encoding in ext.encodings) @skip_if_crdb("isolation level") def test_set_isolation_level(self): @@ -634,11 +634,11 @@ class IsolationLevelsTestCase(ConnectingTestCase): conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT) self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT) - self.assert_(conn.autocommit) + self.assertTrue(conn.autocommit) conn.isolation_level = 'serializable' self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE) - self.assert_(conn.autocommit) + self.assertTrue(conn.autocommit) curs.execute('show transaction_isolation;') self.assertEqual(curs.fetchone()[0], 'serializable') @@ -1350,11 +1350,11 @@ class TransactionControlTests(ConnectingTestCase): self.assertRaises(ValueError, self.conn.set_session, 'whatever') def test_set_read_only(self): - self.assert_(self.conn.readonly is None) + self.assertTrue(self.conn.readonly is None) cur = self.conn.cursor() self.conn.set_session(readonly=True) - self.assert_(self.conn.readonly is True) + self.assertTrue(self.conn.readonly is True) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') self.conn.rollback() @@ -1363,7 +1363,7 @@ class TransactionControlTests(ConnectingTestCase): self.conn.rollback() self.conn.set_session(readonly=False) - self.assert_(self.conn.readonly is False) + self.assertTrue(self.conn.readonly is False) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'off') self.conn.rollback() @@ -1371,12 +1371,12 @@ class TransactionControlTests(ConnectingTestCase): def test_setattr_read_only(self): cur = self.conn.cursor() self.conn.readonly = True - self.assert_(self.conn.readonly is True) + self.assertTrue(self.conn.readonly is True) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') self.assertRaises(self.conn.ProgrammingError, setattr, self.conn, 'readonly', False) - self.assert_(self.conn.readonly is True) + self.assertTrue(self.conn.readonly is True) self.conn.rollback() cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') @@ -1384,13 +1384,13 @@ class TransactionControlTests(ConnectingTestCase): cur = self.conn.cursor() self.conn.readonly = None - self.assert_(self.conn.readonly is None) + self.assertTrue(self.conn.readonly is None) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server self.conn.rollback() self.conn.readonly = False - self.assert_(self.conn.readonly is False) + self.assertTrue(self.conn.readonly is False) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'off') self.conn.rollback() @@ -1413,10 +1413,10 @@ class TransactionControlTests(ConnectingTestCase): @skip_before_postgres(9, 1) def test_set_deferrable(self): - self.assert_(self.conn.deferrable is None) + self.assertTrue(self.conn.deferrable is None) cur = self.conn.cursor() self.conn.set_session(readonly=True, deferrable=True) - self.assert_(self.conn.deferrable is True) + self.assertTrue(self.conn.deferrable is True) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') cur.execute("SHOW transaction_deferrable;") @@ -1427,7 +1427,7 @@ class TransactionControlTests(ConnectingTestCase): self.conn.rollback() self.conn.set_session(deferrable=False) - self.assert_(self.conn.deferrable is False) + self.assertTrue(self.conn.deferrable is False) cur.execute("SHOW transaction_read_only;") self.assertEqual(cur.fetchone()[0], 'on') cur.execute("SHOW transaction_deferrable;") @@ -1445,12 +1445,12 @@ class TransactionControlTests(ConnectingTestCase): def test_setattr_deferrable(self): cur = self.conn.cursor() self.conn.deferrable = True - self.assert_(self.conn.deferrable is True) + self.assertTrue(self.conn.deferrable is True) cur.execute("SHOW transaction_deferrable;") self.assertEqual(cur.fetchone()[0], 'on') self.assertRaises(self.conn.ProgrammingError, setattr, self.conn, 'deferrable', False) - self.assert_(self.conn.deferrable is True) + self.assertTrue(self.conn.deferrable is True) self.conn.rollback() cur.execute("SHOW transaction_deferrable;") self.assertEqual(cur.fetchone()[0], 'on') @@ -1458,13 +1458,13 @@ class TransactionControlTests(ConnectingTestCase): cur = self.conn.cursor() self.conn.deferrable = None - self.assert_(self.conn.deferrable is None) + self.assertTrue(self.conn.deferrable is None) cur.execute("SHOW transaction_deferrable;") self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server self.conn.rollback() self.conn.deferrable = False - self.assert_(self.conn.deferrable is False) + self.assertTrue(self.conn.deferrable is False) cur.execute("SHOW transaction_deferrable;") self.assertEqual(cur.fetchone()[0], 'off') self.conn.rollback() @@ -1555,7 +1555,7 @@ class TestEncryptPassword(ConnectingTestCase): @skip_before_libpq(10) def test_encrypt_scram(self): - self.assert_( + self.assertTrue( ext.encrypt_password( 'psycopg2', 'ashesh', self.conn, 'scram-sha-256') .startswith('SCRAM-SHA-256$')) @@ -1589,12 +1589,12 @@ class AutocommitTests(ConnectingTestCase): # to make it consistent with other methods; meanwhile let's just check # it doesn't explode. try: - self.assert_(self.conn.autocommit in (True, False)) + self.assertTrue(self.conn.autocommit in (True, False)) except psycopg2.InterfaceError: pass def test_default_no_autocommit(self): - self.assert_(not self.conn.autocommit) + self.assertTrue(not self.conn.autocommit) self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.info.transaction_status, ext.TRANSACTION_STATUS_IDLE) @@ -1612,7 +1612,7 @@ class AutocommitTests(ConnectingTestCase): def test_set_autocommit(self): self.conn.autocommit = True - self.assert_(self.conn.autocommit) + self.assertTrue(self.conn.autocommit) self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.info.transaction_status, ext.TRANSACTION_STATUS_IDLE) @@ -1624,7 +1624,7 @@ class AutocommitTests(ConnectingTestCase): ext.TRANSACTION_STATUS_IDLE) self.conn.autocommit = False - self.assert_(not self.conn.autocommit) + self.assertTrue(not self.conn.autocommit) self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.info.transaction_status, ext.TRANSACTION_STATUS_IDLE) @@ -1642,7 +1642,7 @@ class AutocommitTests(ConnectingTestCase): def test_set_session_autocommit(self): self.conn.set_session(autocommit=True) - self.assert_(self.conn.autocommit) + self.assertTrue(self.conn.autocommit) self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.info.transaction_status, ext.TRANSACTION_STATUS_IDLE) @@ -1654,7 +1654,7 @@ class AutocommitTests(ConnectingTestCase): ext.TRANSACTION_STATUS_IDLE) self.conn.set_session(autocommit=False) - self.assert_(not self.conn.autocommit) + self.assertTrue(not self.conn.autocommit) self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.info.transaction_status, ext.TRANSACTION_STATUS_IDLE) @@ -1666,7 +1666,7 @@ class AutocommitTests(ConnectingTestCase): self.conn.rollback() self.conn.set_session('serializable', readonly=True, autocommit=True) - self.assert_(self.conn.autocommit) + self.assertTrue(self.conn.autocommit) cur.execute('select 1;') self.assertEqual(self.conn.status, ext.STATUS_READY) self.assertEqual(self.conn.info.transaction_status, @@ -1773,7 +1773,7 @@ while True: self.assertNotEqual(proc.returncode, 0) # Strip [NNN refs] from output err = re.sub(br'\[[^\]]+\]', b'', err).strip() - self.assert_(not err, err) + self.assertTrue(not err, err) class TestConnectionInfo(ConnectingTestCase): @@ -1789,42 +1789,42 @@ class TestConnectionInfo(ConnectingTestCase): self.bconn = self.connect(connection_factory=BrokenConn) def test_dbname(self): - self.assert_(isinstance(self.conn.info.dbname, str)) - self.assert_(self.bconn.info.dbname is None) + self.assertTrue(isinstance(self.conn.info.dbname, str)) + self.assertTrue(self.bconn.info.dbname is None) def test_user(self): cur = self.conn.cursor() cur.execute("select user") self.assertEqual(self.conn.info.user, cur.fetchone()[0]) - self.assert_(self.bconn.info.user is None) + self.assertTrue(self.bconn.info.user is None) def test_password(self): - self.assert_(isinstance(self.conn.info.password, str)) - self.assert_(self.bconn.info.password is None) + self.assertTrue(isinstance(self.conn.info.password, str)) + self.assertTrue(self.bconn.info.password is None) def test_host(self): expected = dbhost if dbhost else "/" self.assertIn(expected, self.conn.info.host) - self.assert_(self.bconn.info.host is None) + self.assertTrue(self.bconn.info.host is None) def test_host_readonly(self): with self.assertRaises(AttributeError): self.conn.info.host = 'override' def test_port(self): - self.assert_(isinstance(self.conn.info.port, int)) - self.assert_(self.bconn.info.port is None) + self.assertTrue(isinstance(self.conn.info.port, int)) + self.assertTrue(self.bconn.info.port is None) def test_options(self): - self.assert_(isinstance(self.conn.info.options, str)) - self.assert_(self.bconn.info.options is None) + self.assertTrue(isinstance(self.conn.info.options, str)) + self.assertTrue(self.bconn.info.options is None) @skip_before_libpq(9, 3) def test_dsn_parameters(self): d = self.conn.info.dsn_parameters - self.assert_(isinstance(d, dict)) + self.assertTrue(isinstance(d, dict)) self.assertEqual(d['dbname'], dbname) # the only param we can check reliably - self.assert_('password' not in d, d) + self.assertTrue('password' not in d, d) def test_status(self): self.assertEqual(self.conn.info.status, 0) @@ -1861,7 +1861,7 @@ class TestConnectionInfo(ConnectingTestCase): try: cur.execute("show server_version_num") except psycopg2.DatabaseError: - self.assert_(isinstance(self.conn.info.server_version, int)) + self.assertTrue(isinstance(self.conn.info.server_version, int)) else: self.assertEqual( self.conn.info.server_version, int(cur.fetchone()[0])) @@ -1878,11 +1878,11 @@ class TestConnectionInfo(ConnectingTestCase): except psycopg2.DatabaseError: pass - self.assert_('nosuchtable' in self.conn.info.error_message) + self.assertTrue('nosuchtable' in self.conn.info.error_message) def test_socket(self): - self.assert_(self.conn.info.socket >= 0) - self.assert_(self.bconn.info.socket < 0) + self.assertTrue(self.conn.info.socket >= 0) + self.assertTrue(self.bconn.info.socket < 0) @skip_if_crdb("backend pid") def test_backend_pid(self): @@ -1890,12 +1890,12 @@ class TestConnectionInfo(ConnectingTestCase): try: cur.execute("select pg_backend_pid()") except psycopg2.DatabaseError: - self.assert_(self.conn.info.backend_pid > 0) + self.assertTrue(self.conn.info.backend_pid > 0) else: self.assertEqual( self.conn.info.backend_pid, int(cur.fetchone()[0])) - self.assert_(self.bconn.info.backend_pid == 0) + self.assertTrue(self.bconn.info.backend_pid == 0) def test_needs_password(self): self.assertIs(self.conn.info.needs_password, False) @@ -1922,7 +1922,7 @@ class TestConnectionInfo(ConnectingTestCase): @skip_before_libpq(9, 5) def test_ssl_attribute(self): attribs = self.conn.info.ssl_attribute_names - self.assert_(attribs) + self.assertTrue(attribs) if self.conn.info.ssl_in_use: for attrib in attribs: self.assertIsInstance(self.conn.info.ssl_attribute(attrib), str) diff --git a/tests/test_copy.py b/tests/test_copy.py index a36cf918..ef7e5589 100755 --- a/tests/test_copy.py +++ b/tests/test_copy.py @@ -314,23 +314,23 @@ class CopyTests(ConnectingTestCase): curs = self.conn.cursor() curs.copy_from(StringIO('aaa\nbbb\nccc\n'), 'tcopy', columns=['data']) - self.assert_(b"copy " in curs.query.lower()) - self.assert_(b" from stdin" in curs.query.lower()) + self.assertTrue(b"copy " in curs.query.lower()) + self.assertTrue(b" from stdin" in curs.query.lower()) curs.copy_expert( "copy tcopy (data) from stdin", StringIO('ddd\neee\n')) - self.assert_(b"copy " in curs.query.lower()) - self.assert_(b" from stdin" in curs.query.lower()) + self.assertTrue(b"copy " in curs.query.lower()) + self.assertTrue(b" from stdin" in curs.query.lower()) curs.copy_to(StringIO(), "tcopy") - self.assert_(b"copy " in curs.query.lower()) - self.assert_(b" to stdout" in curs.query.lower()) + self.assertTrue(b"copy " in curs.query.lower()) + self.assertTrue(b" to stdout" in curs.query.lower()) curs.execute("insert into tcopy (data) values ('fff')") curs.copy_expert("copy tcopy to stdout", StringIO()) - self.assert_(b"copy " in curs.query.lower()) - self.assert_(b" to stdout" in curs.query.lower()) + self.assertTrue(b"copy " in curs.query.lower()) + self.assertTrue(b" to stdout" in curs.query.lower()) @slow def test_copy_from_segfault(self): @@ -383,7 +383,7 @@ conn.close() try: curs.copy_from(BrokenRead(), "tcopy") except Exception as e: - self.assert_('ZeroDivisionError' in str(e)) + self.assertTrue('ZeroDivisionError' in str(e)) def test_copy_to_propagate_error(self): class BrokenWrite(TextIOBase): diff --git a/tests/test_cursor.py b/tests/test_cursor.py index 04f535a5..2836be90 100755 --- a/tests/test_cursor.py +++ b/tests/test_cursor.py @@ -47,7 +47,7 @@ class CursorTests(ConnectingTestCase): cur = self.conn.cursor() cur.close() cur.close() - self.assert_(cur.closed) + self.assertTrue(cur.closed) def test_empty_query(self): cur = self.conn.cursor() @@ -170,7 +170,7 @@ class CursorTests(ConnectingTestCase): w = ref(curs) del curs gc.collect() - self.assert_(w() is None) + self.assertTrue(w() is None) def test_null_name(self): curs = self.conn.cursor(None) @@ -188,27 +188,27 @@ class CursorTests(ConnectingTestCase): self.assertEqual(len(c), 7) # DBAPI happy for a in ('name', 'type_code', 'display_size', 'internal_size', 'precision', 'scale', 'null_ok'): - self.assert_(hasattr(c, a), a) + self.assertTrue(hasattr(c, a), a) c = curs.description[0] self.assertEqual(c.name, 'pi') - self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values) + self.assertTrue(c.type_code in psycopg2.extensions.DECIMAL.values) if crdb_version(self.conn) is None: - self.assert_(c.internal_size > 0) + self.assertTrue(c.internal_size > 0) self.assertEqual(c.precision, 10) self.assertEqual(c.scale, 2) c = curs.description[1] self.assertEqual(c.name, 'hi') - self.assert_(c.type_code in psycopg2.STRING.values) - self.assert_(c.internal_size < 0) + self.assertTrue(c.type_code in psycopg2.STRING.values) + self.assertTrue(c.internal_size < 0) self.assertEqual(c.precision, None) self.assertEqual(c.scale, None) c = curs.description[2] self.assertEqual(c.name, 'now') - self.assert_(c.type_code in psycopg2.extensions.DATE.values) - self.assert_(c.internal_size > 0) + self.assertTrue(c.type_code in psycopg2.extensions.DATE.values) + self.assertTrue(c.internal_size > 0) self.assertEqual(c.precision, None) self.assertEqual(c.scale, None) @@ -301,7 +301,7 @@ class CursorTests(ConnectingTestCase): # Make sure callproc works right cur.callproc(procname, {paramname: 2}) - self.assertEquals(cur.fetchone()[0], 4) + self.assertEqual(cur.fetchone()[0], 4) # Make sure callproc fails right failing_cases = [ @@ -393,10 +393,10 @@ class CursorTests(ConnectingTestCase): @skip_before_postgres(9) def test_pgresult_ptr(self): curs = self.conn.cursor() - self.assert_(curs.pgresult_ptr is None) + self.assertTrue(curs.pgresult_ptr is None) curs.execute("select 'x'") - self.assert_(curs.pgresult_ptr is not None) + self.assertTrue(curs.pgresult_ptr is not None) try: f = self.libpq.PQcmdStatus @@ -409,7 +409,7 @@ class CursorTests(ConnectingTestCase): self.assertEqual(status, b'SELECT 1') curs.close() - self.assert_(curs.pgresult_ptr is None) + self.assertTrue(curs.pgresult_ptr is None) @skip_if_crdb("named cursor") @@ -582,7 +582,7 @@ class NamedCursorTests(ConnectingTestCase): t1 = next(i)[0] time.sleep(0.2) t2 = next(i)[0] - self.assert_((t2 - t1).microseconds * 1e-6 < 0.1, + self.assertTrue((t2 - t1).microseconds * 1e-6 < 0.1, f"named cursor records fetched in 2 roundtrips (delta: {t2 - t1})") @skip_before_postgres(8, 0) diff --git a/tests/test_dates.py b/tests/test_dates.py index 7f52a916..fc9c596e 100755 --- a/tests/test_dates.py +++ b/tests/test_dates.py @@ -47,7 +47,7 @@ class CommonDatetimeTestsMixin: def test_parse_date(self): value = self.DATE('2007-01-01', self.curs) - self.assert_(value is not None) + self.assertTrue(value is not None) self.assertEqual(value.year, 2007) self.assertEqual(value.month, 1) self.assertEqual(value.day, 1) @@ -62,7 +62,7 @@ class CommonDatetimeTestsMixin: def test_parse_time(self): value = self.TIME('13:30:29', self.curs) - self.assert_(value is not None) + self.assertTrue(value is not None) self.assertEqual(value.hour, 13) self.assertEqual(value.minute, 30) self.assertEqual(value.second, 29) @@ -77,7 +77,7 @@ class CommonDatetimeTestsMixin: def test_parse_datetime(self): value = self.DATETIME('2007-01-01 13:30:29', self.curs) - self.assert_(value is not None) + self.assertTrue(value is not None) self.assertEqual(value.year, 2007) self.assertEqual(value.month, 1) self.assertEqual(value.day, 1) @@ -195,7 +195,7 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): def test_default_tzinfo(self): self.curs.execute("select '2000-01-01 00:00+02:00'::timestamptz") dt = self.curs.fetchone()[0] - self.assert_(isinstance(dt.tzinfo, timezone)) + self.assertTrue(isinstance(dt.tzinfo, timezone)) self.assertEqual(dt, datetime(2000, 1, 1, tzinfo=timezone(timedelta(minutes=120)))) @@ -203,8 +203,8 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): self.curs.tzinfo_factory = FixedOffsetTimezone self.curs.execute("select '2000-01-01 00:00+02:00'::timestamptz") dt = self.curs.fetchone()[0] - self.assert_(not isinstance(dt.tzinfo, timezone)) - self.assert_(isinstance(dt.tzinfo, FixedOffsetTimezone)) + self.assertTrue(not isinstance(dt.tzinfo, timezone)) + self.assertTrue(isinstance(dt.tzinfo, FixedOffsetTimezone)) self.assertEqual(dt, datetime(2000, 1, 1, tzinfo=timezone(timedelta(minutes=120)))) @@ -421,20 +421,20 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin): @skip_if_crdb("infinity date") def test_adapt_infinity_tz(self): t = self.execute("select 'infinity'::timestamp") - self.assert_(t.tzinfo is None) - self.assert_(t > datetime(4000, 1, 1)) + self.assertTrue(t.tzinfo is None) + self.assertTrue(t > datetime(4000, 1, 1)) t = self.execute("select '-infinity'::timestamp") - self.assert_(t.tzinfo is None) - self.assert_(t < datetime(1000, 1, 1)) + self.assertTrue(t.tzinfo is None) + self.assertTrue(t < datetime(1000, 1, 1)) t = self.execute("select 'infinity'::timestamptz") - self.assert_(t.tzinfo is not None) - self.assert_(t > datetime(4000, 1, 1, tzinfo=timezone(timedelta(0)))) + self.assertTrue(t.tzinfo is not None) + self.assertTrue(t > datetime(4000, 1, 1, tzinfo=timezone(timedelta(0)))) t = self.execute("select '-infinity'::timestamptz") - self.assert_(t.tzinfo is not None) - self.assert_(t < datetime(1000, 1, 1, tzinfo=timezone(timedelta(0)))) + self.assertTrue(t.tzinfo is not None) + self.assertTrue(t < datetime(1000, 1, 1, tzinfo=timezone(timedelta(0)))) def test_redshift_day(self): # Redshift is reported returning 1 day interval as microsec (bug #558) @@ -478,7 +478,7 @@ class FromTicksTestCase(unittest.TestCase): def test_date_value_error_sec_59_99(self): s = psycopg2.DateFromTicks(1273173119.99992) # The returned date is local - self.assert_(s.adapted in [date(2010, 5, 6), date(2010, 5, 7)]) + self.assertTrue(s.adapted in [date(2010, 5, 6), date(2010, 5, 7)]) def test_time_value_error_sec_59_99(self): s = psycopg2.TimeFromTicks(1273173119.99992) @@ -490,8 +490,8 @@ class FixedOffsetTimezoneTests(unittest.TestCase): def test_init_with_no_args(self): tzinfo = FixedOffsetTimezone() - self.assert_(tzinfo._offset is ZERO) - self.assert_(tzinfo._name is None) + self.assertTrue(tzinfo._offset is ZERO) + self.assertTrue(tzinfo._name is None) def test_repr_with_positive_offset(self): tzinfo = FixedOffsetTimezone(5 * 60) @@ -519,15 +519,15 @@ class FixedOffsetTimezoneTests(unittest.TestCase): % timedelta(0)) def test_instance_caching(self): - self.assert_(FixedOffsetTimezone(name="FOO") + self.assertTrue(FixedOffsetTimezone(name="FOO") is FixedOffsetTimezone(name="FOO")) - self.assert_(FixedOffsetTimezone(7 * 60) + self.assertTrue(FixedOffsetTimezone(7 * 60) is FixedOffsetTimezone(7 * 60)) - self.assert_(FixedOffsetTimezone(-9 * 60, 'FOO') + self.assertTrue(FixedOffsetTimezone(-9 * 60, 'FOO') is FixedOffsetTimezone(-9 * 60, 'FOO')) - self.assert_(FixedOffsetTimezone(9 * 60) + self.assertTrue(FixedOffsetTimezone(9 * 60) is not FixedOffsetTimezone(9 * 60, 'FOO')) - self.assert_(FixedOffsetTimezone(name='FOO') + self.assertTrue(FixedOffsetTimezone(name='FOO') is not FixedOffsetTimezone(9 * 60, 'FOO')) def test_pickle(self): diff --git a/tests/test_errors.py b/tests/test_errors.py index e3b5b237..604f979c 100755 --- a/tests/test_errors.py +++ b/tests/test_errors.py @@ -40,8 +40,8 @@ class ErrorsTests(ConnectingTestCase): except psycopg2.Error as exc: e = exc - self.assert_(isinstance(e, UndefinedTable), type(e)) - self.assert_(isinstance(e, self.conn.ProgrammingError)) + self.assertTrue(isinstance(e, UndefinedTable), type(e)) + self.assertTrue(isinstance(e, self.conn.ProgrammingError)) def test_exception_class_fallback(self): cur = self.conn.cursor() @@ -65,9 +65,9 @@ class ErrorsTests(ConnectingTestCase): def test_connection_exceptions_backwards_compatibility(self): err = errors.lookup('08000') # connection exceptions are classified as operational errors - self.assert_(issubclass(err, errors.OperationalError)) + self.assertTrue(issubclass(err, errors.OperationalError)) # previously these errors were classified only as DatabaseError - self.assert_(issubclass(err, errors.DatabaseError)) + self.assertTrue(issubclass(err, errors.DatabaseError)) def test_has_base_exceptions(self): excs = [] @@ -76,14 +76,14 @@ class ErrorsTests(ConnectingTestCase): if isinstance(obj, type) and issubclass(obj, Exception): excs.append(obj) - self.assert_(len(excs) > 8, str(excs)) + self.assertTrue(len(excs) > 8, str(excs)) excs.append(psycopg2.extensions.QueryCanceledError) excs.append(psycopg2.extensions.TransactionRollbackError) for exc in excs: - self.assert_(hasattr(errors, exc.__name__)) - self.assert_(getattr(errors, exc.__name__) is exc) + self.assertTrue(hasattr(errors, exc.__name__)) + self.assertTrue(getattr(errors, exc.__name__) is exc) def test_suite(): diff --git a/tests/test_extras_dictcursor.py b/tests/test_extras_dictcursor.py index 186daf30..0e564670 100755 --- a/tests/test_extras_dictcursor.py +++ b/tests/test_extras_dictcursor.py @@ -57,8 +57,8 @@ class _DictCursorBase(ConnectingTestCase): recs.append(t) # check that the dataset was not fetched in a single gulp - self.assert_(recs[1]['ts'] - recs[0]['ts'] < timedelta(seconds=0.005)) - self.assert_(recs[2]['ts'] - recs[1]['ts'] > timedelta(seconds=0.0099)) + self.assertTrue(recs[1]['ts'] - recs[0]['ts'] < timedelta(seconds=0.005)) + self.assertTrue(recs[2]['ts'] - recs[1]['ts'] > timedelta(seconds=0.0099)) class ExtrasDictCursorTests(_DictCursorBase): @@ -69,13 +69,13 @@ class ExtrasDictCursorTests(_DictCursorBase): self.conn.close() self.conn = self.connect(connection_factory=psycopg2.extras.DictConnection) cur = self.conn.cursor() - self.assert_(isinstance(cur, psycopg2.extras.DictCursor)) + self.assertTrue(isinstance(cur, psycopg2.extras.DictCursor)) self.assertEqual(cur.name, None) # overridable cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.NamedTupleCursor) self.assertEqual(cur.name, 'foo') - self.assert_(isinstance(cur, psycopg2.extras.NamedTupleCursor)) + self.assertTrue(isinstance(cur, psycopg2.extras.NamedTupleCursor)) def testDictCursorWithPlainCursorFetchOne(self): self._testWithPlainCursor(lambda curs: curs.fetchone()) @@ -98,8 +98,8 @@ class ExtrasDictCursorTests(_DictCursorBase): def testUpdateRow(self): row = self._testWithPlainCursor(lambda curs: curs.fetchone()) row['foo'] = 'qux' - self.failUnless(row['foo'] == 'qux') - self.failUnless(row[0] == 'qux') + self.assertTrue(row['foo'] == 'qux') + self.assertTrue(row[0] == 'qux') @skip_before_postgres(8, 0) def testDictCursorWithPlainCursorIterRowNumber(self): @@ -110,8 +110,8 @@ class ExtrasDictCursorTests(_DictCursorBase): curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) curs.execute("SELECT * FROM ExtrasDictCursorTests") row = getter(curs) - self.failUnless(row['foo'] == 'bar') - self.failUnless(row[0] == 'bar') + self.assertTrue(row['foo'] == 'bar') + self.assertTrue(row[0] == 'bar') return row def testDictCursorWithNamedCursorFetchOne(self): @@ -149,8 +149,8 @@ class ExtrasDictCursorTests(_DictCursorBase): curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.DictCursor) curs.execute("SELECT * FROM ExtrasDictCursorTests") row = getter(curs) - self.failUnless(row['foo'] == 'bar') - self.failUnless(row[0] == 'bar') + self.assertTrue(row['foo'] == 'bar') + self.assertTrue(row[0] == 'bar') def testPickleDictRow(self): curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) @@ -183,11 +183,11 @@ class ExtrasDictCursorTests(_DictCursorBase): curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor) curs.execute("select 10 as a, 20 as b") r = curs.fetchone() - self.assert_(not isinstance(r.keys(), list)) + self.assertTrue(not isinstance(r.keys(), list)) self.assertEqual(len(list(r.keys())), 2) - self.assert_(not isinstance(r.values(), list)) + self.assertTrue(not isinstance(r.values(), list)) self.assertEqual(len(list(r.values())), 2) - self.assert_(not isinstance(r.items(), list)) + self.assertTrue(not isinstance(r.items(), list)) self.assertEqual(len(list(r.items())), 2) def test_order(self): @@ -212,7 +212,7 @@ class ExtrasDictCursorRealTests(_DictCursorBase): curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) curs.execute("SELECT * FROM ExtrasDictCursorTests") row = curs.fetchone() - self.assert_(isinstance(row, dict)) + self.assertTrue(isinstance(row, dict)) def testDictCursorWithPlainCursorRealFetchOne(self): self._testWithPlainCursorReal(lambda curs: curs.fetchone()) @@ -241,7 +241,7 @@ class ExtrasDictCursorRealTests(_DictCursorBase): curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) curs.execute("SELECT * FROM ExtrasDictCursorTests") row = getter(curs) - self.failUnless(row['foo'] == 'bar') + self.assertTrue(row['foo'] == 'bar') def testPickleRealDictRow(self): curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) @@ -303,17 +303,17 @@ class ExtrasDictCursorRealTests(_DictCursorBase): cursor_factory=psycopg2.extras.RealDictCursor) curs.execute("SELECT * FROM ExtrasDictCursorTests") row = getter(curs) - self.failUnless(row['foo'] == 'bar') + self.assertTrue(row['foo'] == 'bar') def test_iter_methods(self): curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor) curs.execute("select 10 as a, 20 as b") r = curs.fetchone() - self.assert_(not isinstance(r.keys(), list)) + self.assertTrue(not isinstance(r.keys(), list)) self.assertEqual(len(list(r.keys())), 2) - self.assert_(not isinstance(r.values(), list)) + self.assertTrue(not isinstance(r.values(), list)) self.assertEqual(len(list(r.values())), 2) - self.assert_(not isinstance(r.items(), list)) + self.assertTrue(not isinstance(r.items(), list)) self.assertEqual(len(list(r.items())), 2) def test_order(self): @@ -380,7 +380,7 @@ class NamedTupleCursorTest(ConnectingTestCase): def test_cursor_args(self): cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.DictCursor) self.assertEqual(cur.name, 'foo') - self.assert_(isinstance(cur, psycopg2.extras.DictCursor)) + self.assertTrue(isinstance(cur, psycopg2.extras.DictCursor)) def test_fetchone(self): curs = self.conn.cursor() @@ -577,8 +577,8 @@ class NamedTupleCursorTest(ConnectingTestCase): recs.append(t) # check that the dataset was not fetched in a single gulp - self.assert_(recs[1].ts - recs[0].ts < timedelta(seconds=0.005)) - self.assert_(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099)) + self.assertTrue(recs[1].ts - recs[0].ts < timedelta(seconds=0.005)) + self.assertTrue(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099)) @skip_if_crdb("named cursor") @skip_before_postgres(8, 0) @@ -605,8 +605,8 @@ class NamedTupleCursorTest(ConnectingTestCase): curs.execute("select 10 as a, 30 as b") r3 = curs.fetchone() - self.assert_(type(r1) is type(r3)) - self.assert_(type(r1) is not type(r2)) + self.assertTrue(type(r1) is type(r3)) + self.assertTrue(type(r1) is not type(r2)) cache_info = NamedTupleCursor._cached_make_nt.cache_info() self.assertEqual(cache_info.hits, 1) @@ -627,12 +627,12 @@ class NamedTupleCursorTest(ConnectingTestCase): # Still in cache curs.execute("select 1 as f9") rec = curs.fetchone() - self.assert_(any(type(r) is type(rec) for r in recs)) + self.assertTrue(any(type(r) is type(rec) for r in recs)) # Gone from cache curs.execute("select 1 as f0") rec = curs.fetchone() - self.assert_(all(type(r) is not type(rec) for r in recs)) + self.assertTrue(all(type(r) is not type(rec) for r in recs)) finally: NamedTupleCursor._cached_make_nt = old_func diff --git a/tests/test_fast_executemany.py b/tests/test_fast_executemany.py index 289af5ee..820d5a92 100755 --- a/tests/test_fast_executemany.py +++ b/tests/test_fast_executemany.py @@ -254,8 +254,8 @@ class TestExecuteValues(FastExecuteTestMixin, testutils.ConnectingTestCase): psycopg2.extras.execute_values(cur, "insert into testfast (id, data) values %s -- a%%b", [(1, 'hi')]) - self.assert_(b'a%%b' not in cur.query) - self.assert_(b'a%b' in cur.query) + self.assertTrue(b'a%%b' not in cur.query) + self.assertTrue(b'a%b' in cur.query) cur.execute("select id, data from testfast") self.assertEqual(cur.fetchall(), [(1, 'hi')]) diff --git a/tests/test_green.py b/tests/test_green.py index e3a49467..a7454248 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -101,7 +101,7 @@ class GreenTestCase(ConnectingTestCase): psycopg2.extensions.set_wait_callback(lambda conn: 1 // 0) self.assertRaises(ZeroDivisionError, curs.execute, "select 2") - self.assert_(conn.closed) + self.assertTrue(conn.closed) def test_dont_freak_out(self): # if there is an error in a green query, don't freak out and close @@ -112,7 +112,7 @@ class GreenTestCase(ConnectingTestCase): curs.execute, "select the unselectable") # check that the connection is left in an usable state - self.assert_(not conn.closed) + self.assertTrue(not conn.closed) conn.rollback() curs.execute("select 1") self.assertEqual(curs.fetchone()[0], 1) @@ -152,7 +152,7 @@ class GreenTestCase(ConnectingTestCase): """) polls = stub.polls.count(POLL_READ) - self.assert_(polls > 8, polls) + self.assertTrue(polls > 8, polls) class CallbackErrorTestCase(ConnectingTestCase): diff --git a/tests/test_ipaddress.py b/tests/test_ipaddress.py index 4a2339ef..f548ad8d 100755 --- a/tests/test_ipaddress.py +++ b/tests/test_ipaddress.py @@ -36,17 +36,17 @@ class NetworkingTestCase(testutils.ConnectingTestCase): psycopg2.extras.register_ipaddress(cur) cur.execute("select null::inet") - self.assert_(cur.fetchone()[0] is None) + self.assertTrue(cur.fetchone()[0] is None) cur.execute("select '127.0.0.1/24'::inet") obj = cur.fetchone()[0] - self.assert_(isinstance(obj, ip.IPv4Interface), repr(obj)) - self.assertEquals(obj, ip.ip_interface('127.0.0.1/24')) + self.assertTrue(isinstance(obj, ip.IPv4Interface), repr(obj)) + self.assertEqual(obj, ip.ip_interface('127.0.0.1/24')) cur.execute("select '::ffff:102:300/128'::inet") obj = cur.fetchone()[0] - self.assert_(isinstance(obj, ip.IPv6Interface), repr(obj)) - self.assertEquals(obj, ip.ip_interface('::ffff:102:300/128')) + self.assertTrue(isinstance(obj, ip.IPv6Interface), repr(obj)) + self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128')) @testutils.skip_before_postgres(8, 2) def test_inet_array_cast(self): @@ -54,21 +54,21 @@ class NetworkingTestCase(testutils.ConnectingTestCase): psycopg2.extras.register_ipaddress(cur) cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]") l = cur.fetchone()[0] - self.assert_(l[0] is None) - self.assertEquals(l[1], ip.ip_interface('127.0.0.1')) - self.assertEquals(l[2], ip.ip_interface('::ffff:102:300/128')) - self.assert_(isinstance(l[1], ip.IPv4Interface), l) - self.assert_(isinstance(l[2], ip.IPv6Interface), l) + self.assertTrue(l[0] is None) + self.assertEqual(l[1], ip.ip_interface('127.0.0.1')) + self.assertEqual(l[2], ip.ip_interface('::ffff:102:300/128')) + self.assertTrue(isinstance(l[1], ip.IPv4Interface), l) + self.assertTrue(isinstance(l[2], ip.IPv6Interface), l) def test_inet_adapt(self): cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')]) - self.assertEquals(cur.fetchone()[0], '127.0.0.1/24') + self.assertEqual(cur.fetchone()[0], '127.0.0.1/24') cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')]) - self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128') + self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128') @testutils.skip_if_crdb("cidr") def test_cidr_cast(self): @@ -76,17 +76,17 @@ class NetworkingTestCase(testutils.ConnectingTestCase): psycopg2.extras.register_ipaddress(cur) cur.execute("select null::cidr") - self.assert_(cur.fetchone()[0] is None) + self.assertTrue(cur.fetchone()[0] is None) cur.execute("select '127.0.0.0/24'::cidr") obj = cur.fetchone()[0] - self.assert_(isinstance(obj, ip.IPv4Network), repr(obj)) - self.assertEquals(obj, ip.ip_network('127.0.0.0/24')) + self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj)) + self.assertEqual(obj, ip.ip_network('127.0.0.0/24')) cur.execute("select '::ffff:102:300/128'::cidr") obj = cur.fetchone()[0] - self.assert_(isinstance(obj, ip.IPv6Network), repr(obj)) - self.assertEquals(obj, ip.ip_network('::ffff:102:300/128')) + self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj)) + self.assertEqual(obj, ip.ip_network('::ffff:102:300/128')) @testutils.skip_if_crdb("cidr") @testutils.skip_before_postgres(8, 2) @@ -95,21 +95,21 @@ class NetworkingTestCase(testutils.ConnectingTestCase): psycopg2.extras.register_ipaddress(cur) cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]") l = cur.fetchone()[0] - self.assert_(l[0] is None) - self.assertEquals(l[1], ip.ip_network('127.0.0.1')) - self.assertEquals(l[2], ip.ip_network('::ffff:102:300/128')) - self.assert_(isinstance(l[1], ip.IPv4Network), l) - self.assert_(isinstance(l[2], ip.IPv6Network), l) + self.assertTrue(l[0] is None) + self.assertEqual(l[1], ip.ip_network('127.0.0.1')) + self.assertEqual(l[2], ip.ip_network('::ffff:102:300/128')) + self.assertTrue(isinstance(l[1], ip.IPv4Network), l) + self.assertTrue(isinstance(l[2], ip.IPv6Network), l) def test_cidr_adapt(self): cur = self.conn.cursor() psycopg2.extras.register_ipaddress(cur) cur.execute("select %s", [ip.ip_network('127.0.0.0/24')]) - self.assertEquals(cur.fetchone()[0], '127.0.0.0/24') + self.assertEqual(cur.fetchone()[0], '127.0.0.0/24') cur.execute("select %s", [ip.ip_network('::ffff:102:300/128')]) - self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128') + self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128') def test_suite(): diff --git a/tests/test_lobject.py b/tests/test_lobject.py index 2ea5c912..5df8465f 100755 --- a/tests/test_lobject.py +++ b/tests/test_lobject.py @@ -134,7 +134,7 @@ class LargeObjectTests(LargeObjectTestCase): self.assertRaises(psycopg2.OperationalError, self.conn.lobject, 0, "w", lo.oid) - self.assert_(not self.conn.closed) + self.assertTrue(not self.conn.closed) def test_import(self): self.tmpdir = tempfile.mkdtemp() @@ -206,7 +206,7 @@ class LargeObjectTests(LargeObjectTestCase): self.assertEqual(lo.read(4), "some") data1 = lo.read() # avoid dumping megacraps in the console in case of error - self.assert_(data == data1, + self.assertTrue(data == data1, f"{data[:100]!r}... != {data1[:100]!r}...") def test_seek_tell(self): @@ -237,7 +237,7 @@ class LargeObjectTests(LargeObjectTestCase): # the object doesn't exist now, so we can't reopen it. self.assertRaises(psycopg2.OperationalError, self.conn.lobject, lo.oid) # And the object has been closed. - self.assertEquals(lo.closed, True) + self.assertEqual(lo.closed, True) def test_export(self): lo = self.conn.lobject() @@ -391,7 +391,7 @@ class LargeObjectTests(LargeObjectTestCase): pass lo = self.conn.lobject(lobject_factory=lobject_subclass) - self.assert_(isinstance(lo, lobject_subclass)) + self.assertTrue(isinstance(lo, lobject_subclass)) @decorate_all_tests diff --git a/tests/test_module.py b/tests/test_module.py index 7c032597..6779acaf 100755 --- a/tests/test_module.py +++ b/tests/test_module.py @@ -93,10 +93,10 @@ class ConnectTestCase(unittest.TestCase): psycopg2.connect(database='foo', user='postgres', password='secret', port=5432) - self.assert_('dbname=foo' in self.args[0]) - self.assert_('user=postgres' in self.args[0]) - self.assert_('password=secret' in self.args[0]) - self.assert_('port=5432' in self.args[0]) + self.assertTrue('dbname=foo' in self.args[0]) + self.assertTrue('user=postgres' in self.args[0]) + self.assertTrue('password=secret' in self.args[0]) + self.assertTrue('port=5432' in self.args[0]) self.assertEqual(len(self.args[0].split()), 4) def test_generic_keywords(self): @@ -121,18 +121,18 @@ class ConnectTestCase(unittest.TestCase): psycopg2.connect(database='foo', host='baz', async_=1) self.assertDsnEqual(self.args[0], 'dbname=foo host=baz') self.assertEqual(self.args[1], None) - self.assert_(self.args[2]) + self.assertTrue(self.args[2]) psycopg2.connect("dbname=foo host=baz", async_=True) self.assertDsnEqual(self.args[0], 'dbname=foo host=baz') self.assertEqual(self.args[1], None) - self.assert_(self.args[2]) + self.assertTrue(self.args[2]) def test_int_port_param(self): psycopg2.connect(database='sony', port=6543) dsn = f" {self.args[0]} " - self.assert_(" dbname=sony " in dsn, dsn) - self.assert_(" port=6543 " in dsn, dsn) + self.assertTrue(" dbname=sony " in dsn, dsn) + self.assertTrue(" port=6543 " in dsn, dsn) def test_empty_param(self): psycopg2.connect(database='sony', password='') @@ -171,8 +171,8 @@ class ExceptionsTestCase(ConnectingTestCase): e = exc self.assertEqual(e.pgcode, '42P01') - self.assert_(e.pgerror) - self.assert_(e.cursor is cur) + self.assertTrue(e.pgerror) + self.assertTrue(e.cursor is cur) def test_diagnostics_attributes(self): cur = self.conn.cursor() @@ -182,7 +182,7 @@ class ExceptionsTestCase(ConnectingTestCase): e = exc diag = e.diag - self.assert_(isinstance(diag, psycopg2.extensions.Diagnostics)) + self.assertTrue(isinstance(diag, psycopg2.extensions.Diagnostics)) for attr in [ 'column_name', 'constraint_name', 'context', 'datatype_name', 'internal_position', 'internal_query', 'message_detail', @@ -191,7 +191,7 @@ class ExceptionsTestCase(ConnectingTestCase): 'source_line', 'sqlstate', 'statement_position', 'table_name', ]: v = getattr(diag, attr) if v is not None: - self.assert_(isinstance(v, str)) + self.assertTrue(isinstance(v, str)) def test_diagnostics_values(self): cur = self.conn.cursor() @@ -311,7 +311,7 @@ class ExceptionsTestCase(ConnectingTestCase): self.assertEqual(e.pgerror, e1.pgerror) self.assertEqual(e.pgcode, e1.pgcode) - self.assert_(e1.cursor is None) + self.assertTrue(e1.cursor is None) @skip_if_crdb("connect any db") def test_pickle_connection_error(self): @@ -325,7 +325,7 @@ class ExceptionsTestCase(ConnectingTestCase): self.assertEqual(e.pgerror, e1.pgerror) self.assertEqual(e.pgcode, e1.pgcode) - self.assert_(e1.cursor is None) + self.assertTrue(e1.cursor is None) class TestExtensionModule(unittest.TestCase): @@ -337,7 +337,7 @@ class TestExtensionModule(unittest.TestCase): # required in ticket #201. pkgdir = os.path.dirname(psycopg2.__file__) pardir = os.path.dirname(pkgdir) - self.assert_(pardir in sys.path) + self.assertTrue(pardir in sys.path) script = f""" import sys sys.path.remove({pardir!r}) diff --git a/tests/test_notify.py b/tests/test_notify.py index 03ab4cde..6f67a390 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -86,7 +86,7 @@ conn.close() t0 = time.time() select.select([self.conn], [], [], 5) t1 = time.time() - self.assert_(0.99 < t1 - t0 < 4, t1 - t0) + self.assertTrue(0.99 < t1 - t0 < 4, t1 - t0) pid = int(proc.communicate()[0]) self.assertEqual(0, len(self.conn.notifies)) @@ -134,7 +134,7 @@ conn.close() time.sleep(0.5) self.conn.poll() notify = self.conn.notifies[0] - self.assert_(isinstance(notify, psycopg2.extensions.Notify)) + self.assertTrue(isinstance(notify, psycopg2.extensions.Notify)) @slow def test_notify_attributes(self): @@ -174,7 +174,7 @@ conn.close() time.sleep(0.5) self.conn.poll() notify = self.conn.notifies.popleft() - self.assert_(isinstance(notify, psycopg2.extensions.Notify)) + self.assertTrue(isinstance(notify, psycopg2.extensions.Notify)) self.assertEqual(len(self.conn.notifies), 0) @slow diff --git a/tests/test_quote.py b/tests/test_quote.py index 69d26003..7282e6b0 100755 --- a/tests/test_quote.py +++ b/tests/test_quote.py @@ -61,7 +61,7 @@ class QuotingTestCase(ConnectingTestCase): res = curs.fetchone()[0] self.assertEqual(res, data) - self.assert_(not self.conn.notices) + self.assertTrue(not self.conn.notices) def test_string_null_terminator(self): curs = self.conn.cursor() @@ -70,7 +70,7 @@ class QuotingTestCase(ConnectingTestCase): try: curs.execute("SELECT %s", (data,)) except ValueError as e: - self.assertEquals(str(e), + self.assertEqual(str(e), 'A string literal cannot contain NUL (0x00) characters.') else: self.fail("ValueError not raised") @@ -90,7 +90,7 @@ class QuotingTestCase(ConnectingTestCase): "bytea broken with server >= 9.0, libpq < 9") self.assertEqual(res, data) - self.assert_(not self.conn.notices) + self.assertTrue(not self.conn.notices) def test_unicode(self): curs = self.conn.cursor() @@ -112,7 +112,7 @@ class QuotingTestCase(ConnectingTestCase): res = curs.fetchone()[0] self.assertEqual(res, data) - self.assert_(not self.conn.notices) + self.assertTrue(not self.conn.notices) @skip_if_crdb("encoding") def test_latin1(self): @@ -125,7 +125,7 @@ class QuotingTestCase(ConnectingTestCase): curs.execute("SELECT %s::text;", (data,)) res = curs.fetchone()[0] self.assertEqual(res, data) - self.assert_(not self.conn.notices) + self.assertTrue(not self.conn.notices) @skip_if_crdb("encoding") @@ -139,7 +139,7 @@ class QuotingTestCase(ConnectingTestCase): curs.execute("SELECT %s::text;", (data,)) res = curs.fetchone()[0] self.assertEqual(res, data) - self.assert_(not self.conn.notices) + self.assertTrue(not self.conn.notices) def test_bytes(self): snowman = "\u2603" @@ -149,7 +149,7 @@ class QuotingTestCase(ConnectingTestCase): curs = conn.cursor() curs.execute("select %s::text", (snowman,)) x = curs.fetchone()[0] - self.assert_(isinstance(x, bytes)) + self.assertTrue(isinstance(x, bytes)) self.assertEqual(x, snowman.encode('utf8')) diff --git a/tests/test_sql.py b/tests/test_sql.py index 4519b4a6..2027935e 100755 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -38,27 +38,27 @@ class SqlFormatTests(ConnectingTestCase): s = sql.SQL("select {} from {}").format( sql.Identifier('field'), sql.Identifier('table')) s1 = s.as_string(self.conn) - self.assert_(isinstance(s1, str)) + self.assertTrue(isinstance(s1, str)) self.assertEqual(s1, 'select "field" from "table"') def test_pos_spec(self): s = sql.SQL("select {0} from {1}").format( sql.Identifier('field'), sql.Identifier('table')) s1 = s.as_string(self.conn) - self.assert_(isinstance(s1, str)) + self.assertTrue(isinstance(s1, str)) self.assertEqual(s1, 'select "field" from "table"') s = sql.SQL("select {1} from {0}").format( sql.Identifier('table'), sql.Identifier('field')) s1 = s.as_string(self.conn) - self.assert_(isinstance(s1, str)) + self.assertTrue(isinstance(s1, str)) self.assertEqual(s1, 'select "field" from "table"') def test_dict(self): s = sql.SQL("select {f} from {t}").format( f=sql.Identifier('field'), t=sql.Identifier('table')) s1 = s.as_string(self.conn) - self.assert_(isinstance(s1, str)) + self.assertTrue(isinstance(s1, str)) self.assertEqual(s1, 'select "field" from "table"') def test_compose_literal(self): @@ -170,12 +170,12 @@ class SqlFormatTests(ConnectingTestCase): class IdentifierTests(ConnectingTestCase): def test_class(self): - self.assert_(issubclass(sql.Identifier, sql.Composable)) + self.assertTrue(issubclass(sql.Identifier, sql.Composable)) def test_init(self): - self.assert_(isinstance(sql.Identifier('foo'), sql.Identifier)) - self.assert_(isinstance(sql.Identifier('foo'), sql.Identifier)) - self.assert_(isinstance(sql.Identifier('foo', 'bar', 'baz'), sql.Identifier)) + self.assertTrue(isinstance(sql.Identifier('foo'), sql.Identifier)) + self.assertTrue(isinstance(sql.Identifier('foo'), sql.Identifier)) + self.assertTrue(isinstance(sql.Identifier('foo', 'bar', 'baz'), sql.Identifier)) self.assertRaises(TypeError, sql.Identifier) self.assertRaises(TypeError, sql.Identifier, 10) self.assertRaises(TypeError, sql.Identifier, dt.date(2016, 12, 31)) @@ -199,11 +199,11 @@ class IdentifierTests(ConnectingTestCase): self.assertEqual(repr(obj), str(obj)) def test_eq(self): - self.assert_(sql.Identifier('foo') == sql.Identifier('foo')) - self.assert_(sql.Identifier('foo', 'bar') == sql.Identifier('foo', 'bar')) - self.assert_(sql.Identifier('foo') != sql.Identifier('bar')) - self.assert_(sql.Identifier('foo') != 'foo') - self.assert_(sql.Identifier('foo') != sql.SQL('foo')) + self.assertTrue(sql.Identifier('foo') == sql.Identifier('foo')) + self.assertTrue(sql.Identifier('foo', 'bar') == sql.Identifier('foo', 'bar')) + self.assertTrue(sql.Identifier('foo') != sql.Identifier('bar')) + self.assertTrue(sql.Identifier('foo') != 'foo') + self.assertTrue(sql.Identifier('foo') != sql.SQL('foo')) def test_as_str(self): self.assertEqual( @@ -214,19 +214,19 @@ class IdentifierTests(ConnectingTestCase): sql.Identifier("fo'o", 'ba"r').as_string(self.conn), '"fo\'o"."ba""r"') def test_join(self): - self.assert_(not hasattr(sql.Identifier('foo'), 'join')) + self.assertTrue(not hasattr(sql.Identifier('foo'), 'join')) class LiteralTests(ConnectingTestCase): def test_class(self): - self.assert_(issubclass(sql.Literal, sql.Composable)) + self.assertTrue(issubclass(sql.Literal, sql.Composable)) def test_init(self): - self.assert_(isinstance(sql.Literal('foo'), sql.Literal)) - self.assert_(isinstance(sql.Literal('foo'), sql.Literal)) - self.assert_(isinstance(sql.Literal(b'foo'), sql.Literal)) - self.assert_(isinstance(sql.Literal(42), sql.Literal)) - self.assert_(isinstance( + self.assertTrue(isinstance(sql.Literal('foo'), sql.Literal)) + self.assertTrue(isinstance(sql.Literal('foo'), sql.Literal)) + self.assertTrue(isinstance(sql.Literal(b'foo'), sql.Literal)) + self.assertTrue(isinstance(sql.Literal(42), sql.Literal)) + self.assertTrue(isinstance( sql.Literal(dt.date(2016, 12, 31)), sql.Literal)) def test_wrapped(self): @@ -242,10 +242,10 @@ class LiteralTests(ConnectingTestCase): "'2017-01-01'::date") def test_eq(self): - self.assert_(sql.Literal('foo') == sql.Literal('foo')) - self.assert_(sql.Literal('foo') != sql.Literal('bar')) - self.assert_(sql.Literal('foo') != 'foo') - self.assert_(sql.Literal('foo') != sql.SQL('foo')) + self.assertTrue(sql.Literal('foo') == sql.Literal('foo')) + self.assertTrue(sql.Literal('foo') != sql.Literal('bar')) + self.assertTrue(sql.Literal('foo') != 'foo') + self.assertTrue(sql.Literal('foo') != sql.SQL('foo')) def test_must_be_adaptable(self): class Foo: @@ -257,11 +257,11 @@ class LiteralTests(ConnectingTestCase): class SQLTests(ConnectingTestCase): def test_class(self): - self.assert_(issubclass(sql.SQL, sql.Composable)) + self.assertTrue(issubclass(sql.SQL, sql.Composable)) def test_init(self): - self.assert_(isinstance(sql.SQL('foo'), sql.SQL)) - self.assert_(isinstance(sql.SQL('foo'), sql.SQL)) + self.assertTrue(isinstance(sql.SQL('foo'), sql.SQL)) + self.assertTrue(isinstance(sql.SQL('foo'), sql.SQL)) self.assertRaises(TypeError, sql.SQL, 10) self.assertRaises(TypeError, sql.SQL, dt.date(2016, 12, 31)) @@ -274,36 +274,36 @@ class SQLTests(ConnectingTestCase): self.assertEqual(sql.SQL("foo").as_string(self.conn), "foo") def test_eq(self): - self.assert_(sql.SQL('foo') == sql.SQL('foo')) - self.assert_(sql.SQL('foo') != sql.SQL('bar')) - self.assert_(sql.SQL('foo') != 'foo') - self.assert_(sql.SQL('foo') != sql.Literal('foo')) + self.assertTrue(sql.SQL('foo') == sql.SQL('foo')) + self.assertTrue(sql.SQL('foo') != sql.SQL('bar')) + self.assertTrue(sql.SQL('foo') != 'foo') + self.assertTrue(sql.SQL('foo') != sql.Literal('foo')) def test_sum(self): obj = sql.SQL("foo") + sql.SQL("bar") - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertEqual(obj.as_string(self.conn), "foobar") def test_sum_inplace(self): obj = sql.SQL("foo") obj += sql.SQL("bar") - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertEqual(obj.as_string(self.conn), "foobar") def test_multiply(self): obj = sql.SQL("foo") * 3 - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertEqual(obj.as_string(self.conn), "foofoofoo") def test_join(self): obj = sql.SQL(", ").join( [sql.Identifier('foo'), sql.SQL('bar'), sql.Literal(42)]) - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertEqual(obj.as_string(self.conn), '"foo", bar, 42') obj = sql.SQL(", ").join( sql.Composed([sql.Identifier('foo'), sql.SQL('bar'), sql.Literal(42)])) - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertEqual(obj.as_string(self.conn), '"foo", bar, 42') obj = sql.SQL(", ").join([]) @@ -312,7 +312,7 @@ class SQLTests(ConnectingTestCase): class ComposedTest(ConnectingTestCase): def test_class(self): - self.assert_(issubclass(sql.Composed, sql.Composable)) + self.assertTrue(issubclass(sql.Composed, sql.Composable)) def test_repr(self): obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")]) @@ -327,31 +327,31 @@ class ComposedTest(ConnectingTestCase): def test_eq(self): l = [sql.Literal("foo"), sql.Identifier("b'ar")] l2 = [sql.Literal("foo"), sql.Literal("b'ar")] - self.assert_(sql.Composed(l) == sql.Composed(list(l))) - self.assert_(sql.Composed(l) != l) - self.assert_(sql.Composed(l) != sql.Composed(l2)) + self.assertTrue(sql.Composed(l) == sql.Composed(list(l))) + self.assertTrue(sql.Composed(l) != l) + self.assertTrue(sql.Composed(l) != sql.Composed(l2)) def test_join(self): obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")]) obj = obj.join(", ") - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertQuotedEqual(obj.as_string(self.conn), "'foo', \"b'ar\"") def test_sum(self): obj = sql.Composed([sql.SQL("foo ")]) obj = obj + sql.Literal("bar") - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'") def test_sum_inplace(self): obj = sql.Composed([sql.SQL("foo ")]) obj += sql.Literal("bar") - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'") obj = sql.Composed([sql.SQL("foo ")]) obj += sql.Composed([sql.Literal("bar")]) - self.assert_(isinstance(obj, sql.Composed)) + self.assertTrue(isinstance(obj, sql.Composed)) self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'") def test_iter(self): @@ -366,7 +366,7 @@ class ComposedTest(ConnectingTestCase): class PlaceholderTest(ConnectingTestCase): def test_class(self): - self.assert_(issubclass(sql.Placeholder, sql.Composable)) + self.assertTrue(issubclass(sql.Placeholder, sql.Composable)) def test_name(self): self.assertEqual(sql.Placeholder().name, None) @@ -396,11 +396,11 @@ class PlaceholderTest(ConnectingTestCase): class ValuesTest(ConnectingTestCase): def test_null(self): - self.assert_(isinstance(sql.NULL, sql.SQL)) + self.assertTrue(isinstance(sql.NULL, sql.SQL)) self.assertEqual(sql.NULL.as_string(self.conn), "NULL") def test_default(self): - self.assert_(isinstance(sql.DEFAULT, sql.SQL)) + self.assertTrue(isinstance(sql.DEFAULT, sql.SQL)) self.assertEqual(sql.DEFAULT.as_string(self.conn), "DEFAULT") diff --git a/tests/test_types_basic.py b/tests/test_types_basic.py index d21309ef..ad72539d 100755 --- a/tests/test_types_basic.py +++ b/tests/test_types_basic.py @@ -48,39 +48,39 @@ class TypesBasicTests(ConnectingTestCase): def testQuoting(self): s = "Quote'this\\! ''ok?''" - self.failUnless(self.execute("SELECT %s AS foo", (s,)) == s, + self.assertTrue(self.execute("SELECT %s AS foo", (s,)) == s, "wrong quoting: " + s) def testUnicode(self): s = "Quote'this\\! ''ok?''" - self.failUnless(self.execute("SELECT %s AS foo", (s,)) == s, + self.assertTrue(self.execute("SELECT %s AS foo", (s,)) == s, "wrong unicode quoting: " + s) def testNumber(self): s = self.execute("SELECT %s AS foo", (1971,)) - self.failUnless(s == 1971, "wrong integer quoting: " + str(s)) + self.assertTrue(s == 1971, "wrong integer quoting: " + str(s)) def testBoolean(self): x = self.execute("SELECT %s as foo", (False,)) - self.assert_(x is False) + self.assertTrue(x is False) x = self.execute("SELECT %s as foo", (True,)) - self.assert_(x is True) + self.assertTrue(x is True) def testDecimal(self): s = self.execute("SELECT %s AS foo", (decimal.Decimal("19.10"),)) - self.failUnless(s - decimal.Decimal("19.10") == 0, + self.assertTrue(s - decimal.Decimal("19.10") == 0, "wrong decimal quoting: " + str(s)) s = self.execute("SELECT %s AS foo", (decimal.Decimal("NaN"),)) - self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s)) - self.failUnless(type(s) == decimal.Decimal, + self.assertTrue(str(s) == "NaN", "wrong decimal quoting: " + str(s)) + self.assertTrue(type(s) == decimal.Decimal, "wrong decimal conversion: " + repr(s)) s = self.execute("SELECT %s AS foo", (decimal.Decimal("infinity"),)) - self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s)) - self.failUnless(type(s) == decimal.Decimal, + self.assertTrue(str(s) == "NaN", "wrong decimal quoting: " + str(s)) + self.assertTrue(type(s) == decimal.Decimal, "wrong decimal conversion: " + repr(s)) s = self.execute("SELECT %s AS foo", (decimal.Decimal("-infinity"),)) - self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s)) - self.failUnless(type(s) == decimal.Decimal, + self.assertTrue(str(s) == "NaN", "wrong decimal quoting: " + str(s)) + self.assertTrue(type(s) == decimal.Decimal, "wrong decimal conversion: " + repr(s)) def testFloatNan(self): @@ -90,8 +90,8 @@ class TypesBasicTests(ConnectingTestCase): return self.skipTest("nan not available on this platform") s = self.execute("SELECT %s AS foo", (float("nan"),)) - self.failUnless(str(s) == "nan", "wrong float quoting: " + str(s)) - self.failUnless(type(s) == float, "wrong float conversion: " + repr(s)) + self.assertTrue(str(s) == "nan", "wrong float quoting: " + str(s)) + self.assertTrue(type(s) == float, "wrong float conversion: " + repr(s)) def testFloatInf(self): try: @@ -101,11 +101,11 @@ class TypesBasicTests(ConnectingTestCase): except ValueError: return self.skipTest("inf not available on this platform") s = self.execute("SELECT %s AS foo", (float("inf"),)) - self.failUnless(str(s) == "inf", "wrong float quoting: " + str(s)) - self.failUnless(type(s) == float, "wrong float conversion: " + repr(s)) + self.assertTrue(str(s) == "inf", "wrong float quoting: " + str(s)) + self.assertTrue(type(s) == float, "wrong float conversion: " + repr(s)) s = self.execute("SELECT %s AS foo", (float("-inf"),)) - self.failUnless(str(s) == "-inf", "wrong float quoting: " + str(s)) + self.assertTrue(str(s) == "-inf", "wrong float quoting: " + str(s)) def testBinary(self): s = bytes(range(256)) @@ -134,9 +134,9 @@ class TypesBasicTests(ConnectingTestCase): @skip_if_crdb("nested array") def testArray(self): s = self.execute("SELECT %s AS foo", ([[1, 2], [3, 4]],)) - self.failUnlessEqual(s, [[1, 2], [3, 4]]) + self.assertEqual(s, [[1, 2], [3, 4]]) s = self.execute("SELECT %s AS foo", (['one', 'two', 'three'],)) - self.failUnlessEqual(s, ['one', 'two', 'three']) + self.assertEqual(s, ['one', 'two', 'three']) @skip_if_crdb("nested array") def testEmptyArrayRegression(self): @@ -171,23 +171,23 @@ class TypesBasicTests(ConnectingTestCase): def testEmptyArray(self): s = self.execute("SELECT '{}'::text[] AS foo") - self.failUnlessEqual(s, []) + self.assertEqual(s, []) s = self.execute("SELECT 1 != ALL(%s)", ([],)) - self.failUnlessEqual(s, True) + self.assertEqual(s, True) # but don't break the strings :) s = self.execute("SELECT '{}'::text AS foo") - self.failUnlessEqual(s, "{}") + self.assertEqual(s, "{}") def testArrayEscape(self): ss = ['', '\\', '"', '\\\\', '\\"'] for s in ss: r = self.execute("SELECT %s AS foo", (s,)) - self.failUnlessEqual(s, r) + self.assertEqual(s, r) r = self.execute("SELECT %s AS foo", ([s],)) - self.failUnlessEqual([s], r) + self.assertEqual([s], r) r = self.execute("SELECT %s AS foo", (ss,)) - self.failUnlessEqual(ss, r) + self.assertEqual(ss, r) def testArrayMalformed(self): curs = self.conn.cursor() @@ -200,7 +200,7 @@ class TypesBasicTests(ConnectingTestCase): curs = self.conn.cursor() curs.execute("select '{a,b,c}'::text[]") x = curs.fetchone()[0] - self.assert_(isinstance(x[0], str)) + self.assertTrue(isinstance(x[0], str)) self.assertEqual(x, ['a', 'b', 'c']) def testUnicodeArray(self): @@ -209,7 +209,7 @@ class TypesBasicTests(ConnectingTestCase): curs = self.conn.cursor() curs.execute("select '{a,b,c}'::text[]") x = curs.fetchone()[0] - self.assert_(isinstance(x[0], str)) + self.assertTrue(isinstance(x[0], str)) self.assertEqual(x, ['a', 'b', 'c']) def testBytesArray(self): @@ -218,7 +218,7 @@ class TypesBasicTests(ConnectingTestCase): curs = self.conn.cursor() curs.execute("select '{a,b,c}'::text[]") x = curs.fetchone()[0] - self.assert_(isinstance(x[0], bytes)) + self.assertTrue(isinstance(x[0], bytes)) self.assertEqual(x, [b'a', b'b', b'c']) @skip_if_crdb("nested array") diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py index 77f89985..ff6936dd 100755 --- a/tests/test_types_extras.py +++ b/tests/test_types_extras.py @@ -53,10 +53,10 @@ class TypesExtrasTests(ConnectingTestCase): psycopg2.extras.register_uuid() u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350') s = self.execute("SELECT %s AS foo", (u,)) - self.failUnless(u == s) + self.assertTrue(u == s) # must survive NULL cast to a uuid s = self.execute("SELECT NULL::uuid AS foo") - self.failUnless(s is None) + self.assertTrue(s is None) @skip_if_no_uuid def testUUIDARRAY(self): @@ -64,17 +64,17 @@ class TypesExtrasTests(ConnectingTestCase): u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')] s = self.execute("SELECT %s AS foo", (u,)) - self.failUnless(u == s) + self.assertTrue(u == s) # array with a NULL element u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), None] s = self.execute("SELECT %s AS foo", (u,)) - self.failUnless(u == s) + self.assertTrue(u == s) # must survive NULL cast to a uuid[] s = self.execute("SELECT NULL::uuid[] AS foo") - self.failUnless(s is None) + self.assertTrue(s is None) # what about empty arrays? s = self.execute("SELECT '{}'::uuid[] AS foo") - self.failUnless(type(s) == list and len(s) == 0) + self.assertTrue(type(s) == list and len(s) == 0) @restore_types def testINET(self): @@ -84,10 +84,10 @@ class TypesExtrasTests(ConnectingTestCase): i = psycopg2.extras.Inet("192.168.1.0/24") s = self.execute("SELECT %s AS foo", (i,)) - self.failUnless(i.addr == s.addr) + self.assertTrue(i.addr == s.addr) # must survive NULL cast to inet s = self.execute("SELECT NULL::inet AS foo") - self.failUnless(s is None) + self.assertTrue(s is None) @restore_types def testINETARRAY(self): @@ -97,10 +97,10 @@ class TypesExtrasTests(ConnectingTestCase): i = psycopg2.extras.Inet("192.168.1.0/24") s = self.execute("SELECT %s AS foo", ([i],)) - self.failUnless(i.addr == s[0].addr) + self.assertTrue(i.addr == s[0].addr) # must survive NULL cast to inet s = self.execute("SELECT NULL::inet[] AS foo") - self.failUnless(s is None) + self.assertTrue(s is None) def test_inet_conform(self): i = Inet("192.168.1.0/24") @@ -122,13 +122,13 @@ class TypesExtrasTests(ConnectingTestCase): try: psycopg2.extensions.adapt(Foo(), ext.ISQLQuote, None) except psycopg2.ProgrammingError as err: - self.failUnless(str(err) == "can't adapt type 'Foo'") + self.assertTrue(str(err) == "can't adapt type 'Foo'") def test_point_array(self): # make sure a point array is never casted to a float array, # see https://github.com/psycopg/psycopg2/issues/613 s = self.execute("""SELECT '{"(1,2)","(3,4)"}' AS foo""") - self.failUnless(s == """{"(1,2)","(3,4)"}""") + self.assertTrue(s == """{"(1,2)","(3,4)"}""") def skip_if_no_hstore(f): @@ -156,7 +156,7 @@ class HstoreTestCase(ConnectingTestCase): a.prepare(self.conn) q = a.getquoted() - self.assert_(q.startswith(b"(("), q) + self.assertTrue(q.startswith(b"(("), q) ii = q[1:-1].split(b"||") ii.sort() @@ -181,7 +181,7 @@ class HstoreTestCase(ConnectingTestCase): q = a.getquoted() m = re.match(br'hstore\(ARRAY\[([^\]]+)\], ARRAY\[([^\]]+)\]\)', q) - self.assert_(m, repr(q)) + self.assertTrue(m, repr(q)) kk = m.group(1).split(b",") vv = m.group(2).split(b",") @@ -234,7 +234,7 @@ class HstoreTestCase(ConnectingTestCase): cur = self.conn.cursor() cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore") t = cur.fetchone() - self.assert_(t[0] is None) + self.assertTrue(t[0] is None) self.assertEqual(t[1], {}) self.assertEqual(t[2], {'a': 'b'}) @@ -244,7 +244,7 @@ class HstoreTestCase(ConnectingTestCase): register_hstore(cur) cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore") t = cur.fetchone() - self.assert_(t[0] is None) + self.assertTrue(t[0] is None) self.assertEqual(t[1], {}) self.assertEqual(t[2], {'a': 'b'}) @@ -258,7 +258,7 @@ class HstoreTestCase(ConnectingTestCase): cur2 = self.conn.cursor() cur2.execute("select 'a => b'::hstore") r = cur2.fetchone() - self.assert_(isinstance(r[0], dict)) + self.assertTrue(isinstance(r[0], dict)) finally: conn2.close() @@ -272,7 +272,7 @@ class HstoreTestCase(ConnectingTestCase): d1 = cur.fetchone()[0] self.assertEqual(len(d), len(d1)) for k in d: - self.assert_(k in d1, k) + self.assertTrue(k in d1, k) self.assertEqual(d[k], d1[k]) ok({}) @@ -300,7 +300,7 @@ class HstoreTestCase(ConnectingTestCase): register_hstore(None, globally=True, oid=oid) cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore") t = cur.fetchone() - self.assert_(t[0] is None) + self.assertTrue(t[0] is None) self.assertEqual(t[1], {}) self.assertEqual(t[2], {'a': 'b'}) @@ -347,7 +347,7 @@ class HstoreTestCase(ConnectingTestCase): select null::hstore, ''::hstore, 'a => b'::hstore, '{a=>b}'::hstore[]""") t = cur.fetchone() - self.assert_(t[0] is None) + self.assertTrue(t[0] is None) self.assertEqual(t[1], {}) self.assertEqual(t[2], {'a': 'b'}) self.assertEqual(t[3], [{'a': 'b'}]) @@ -457,7 +457,7 @@ class AdaptTypeTestCase(ConnectingTestCase): self.assertEqual(t.name, 'type_isd') self.assertEqual(t.schema, 'public') self.assertEqual(t.oid, oid) - self.assert_(issubclass(t.type, tuple)) + self.assertTrue(issubclass(t.type, tuple)) self.assertEqual(t.attnames, ['anint', 'astring', 'adate']) self.assertEqual(t.atttypes, [23, 25, 1082]) @@ -465,11 +465,11 @@ class AdaptTypeTestCase(ConnectingTestCase): r = (10, 'hello', date(2011, 1, 2)) curs.execute("select %s::type_isd;", (r,)) v = curs.fetchone()[0] - self.assert_(isinstance(v, t.type)) + self.assertTrue(isinstance(v, t.type)) self.assertEqual(v[0], 10) self.assertEqual(v[1], "hello") self.assertEqual(v[2], date(2011, 1, 2)) - self.assert_(t.type is not tuple) + self.assertTrue(t.type is not tuple) self.assertEqual(v.anint, 10) self.assertEqual(v.astring, "hello") self.assertEqual(v.adate, date(2011, 1, 2)) @@ -598,11 +598,11 @@ class AdaptTypeTestCase(ConnectingTestCase): curs.execute("select %s::type_isd[];", ([r1, r2],)) v = curs.fetchone()[0] self.assertEqual(len(v), 2) - self.assert_(isinstance(v[0], t.type)) + self.assertTrue(isinstance(v[0], t.type)) self.assertEqual(v[0][0], 10) self.assertEqual(v[0][1], "hello") self.assertEqual(v[0][2], date(2011, 1, 2)) - self.assert_(isinstance(v[1], t.type)) + self.assertTrue(isinstance(v[1], t.type)) self.assertEqual(v[1][0], 20) self.assertEqual(v[1][1], "world") self.assertEqual(v[1][2], date(2011, 1, 3)) @@ -702,7 +702,7 @@ class AdaptTypeTestCase(ConnectingTestCase): r = (10, 'hello', date(2011, 1, 2)) curs.execute("select %s::type_isd;", (r,)) v = curs.fetchone()[0] - self.assert_(isinstance(v, dict)) + self.assertTrue(isinstance(v, dict)) self.assertEqual(v['anint'], 10) self.assertEqual(v['astring'], "hello") self.assertEqual(v['adate'], date(2011, 1, 2)) @@ -846,7 +846,7 @@ class JsonTestCase(ConnectingTestCase): curs = self.conn.cursor() curs.execute("""select '{"a": 100.0, "b": null}'::json""") data = curs.fetchone()[0] - self.assert_(isinstance(data['a'], Decimal)) + self.assertTrue(isinstance(data['a'], Decimal)) self.assertEqual(data['a'], Decimal('100.0')) @skip_if_no_json_type @@ -862,7 +862,7 @@ class JsonTestCase(ConnectingTestCase): curs = self.conn.cursor() curs.execute("""select '{"a": 100.0, "b": null}'::json""") data = curs.fetchone()[0] - self.assert_(isinstance(data['a'], Decimal)) + self.assertTrue(isinstance(data['a'], Decimal)) self.assertEqual(data['a'], Decimal('100.0')) @skip_before_postgres(9, 2) @@ -875,12 +875,12 @@ class JsonTestCase(ConnectingTestCase): curs.execute("""select '{"a": 100.0, "b": null}'::json""") data = curs.fetchone()[0] - self.assert_(isinstance(data['a'], Decimal)) + self.assertTrue(isinstance(data['a'], Decimal)) self.assertEqual(data['a'], Decimal('100.0')) curs.execute("""select array['{"a": 100.0, "b": null}']::json[]""") data = curs.fetchone()[0] - self.assert_(isinstance(data[0]['a'], Decimal)) + self.assertTrue(isinstance(data[0]['a'], Decimal)) self.assertEqual(data[0]['a'], Decimal('100.0')) @skip_if_no_json_type @@ -908,10 +908,10 @@ class JsonTestCase(ConnectingTestCase): obj = {'a': [1, 2, snowman]} j = psycopg2.extensions.adapt(psycopg2.extras.Json(obj)) s = str(j) - self.assert_(isinstance(s, str)) + self.assertTrue(isinstance(s, str)) # no pesky b's - self.assert_(s.startswith("'")) - self.assert_(s.endswith("'")) + self.assertTrue(s.startswith("'")) + self.assertTrue(s.endswith("'")) @skip_before_postgres(8, 2) def test_scs(self): @@ -984,13 +984,13 @@ class JsonbTestCase(ConnectingTestCase): curs = self.conn.cursor() curs.execute("""select '{"a": 100.0, "b": null}'::jsonb""") data = curs.fetchone()[0] - self.assert_(isinstance(data['a'], Decimal)) + self.assertTrue(isinstance(data['a'], Decimal)) self.assertEqual(data['a'], Decimal('100.0')) # sure we are not mangling json too? if crdb_version(self.conn) is None: curs.execute("""select '{"a": 100.0, "b": null}'::json""") data = curs.fetchone()[0] - self.assert_(isinstance(data['a'], float)) + self.assertTrue(isinstance(data['a'], float)) self.assertEqual(data['a'], 100.0) def test_register_default(self): @@ -1003,13 +1003,13 @@ class JsonbTestCase(ConnectingTestCase): curs.execute("""select '{"a": 100.0, "b": null}'::jsonb""") data = curs.fetchone()[0] - self.assert_(isinstance(data['a'], Decimal)) + self.assertTrue(isinstance(data['a'], Decimal)) self.assertEqual(data['a'], Decimal('100.0')) if crdb_version(self.conn) is None: curs.execute("""select array['{"a": 100.0, "b": null}']::jsonb[]""") data = curs.fetchone()[0] - self.assert_(isinstance(data[0]['a'], Decimal)) + self.assertTrue(isinstance(data[0]['a'], Decimal)) self.assertEqual(data[0]['a'], Decimal('100.0')) def test_null(self): @@ -1025,34 +1025,34 @@ class RangeTestCase(unittest.TestCase): def test_noparam(self): r = Range() - self.assert_(not r.isempty) + self.assertTrue(not r.isempty) self.assertEqual(r.lower, None) self.assertEqual(r.upper, None) - self.assert_(r.lower_inf) - self.assert_(r.upper_inf) - self.assert_(not r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(r.lower_inf) + self.assertTrue(r.upper_inf) + self.assertTrue(not r.lower_inc) + self.assertTrue(not r.upper_inc) def test_empty(self): r = Range(empty=True) - self.assert_(r.isempty) + self.assertTrue(r.isempty) self.assertEqual(r.lower, None) self.assertEqual(r.upper, None) - self.assert_(not r.lower_inf) - self.assert_(not r.upper_inf) - self.assert_(not r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(not r.lower_inf) + self.assertTrue(not r.upper_inf) + self.assertTrue(not r.lower_inc) + self.assertTrue(not r.upper_inc) def test_nobounds(self): r = Range(10, 20) self.assertEqual(r.lower, 10) self.assertEqual(r.upper, 20) - self.assert_(not r.isempty) - self.assert_(not r.lower_inf) - self.assert_(not r.upper_inf) - self.assert_(r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(not r.isempty) + self.assertTrue(not r.lower_inf) + self.assertTrue(not r.upper_inf) + self.assertTrue(r.lower_inc) + self.assertTrue(not r.upper_inc) def test_bounds(self): for bounds, lower_inc, upper_inc in [ @@ -1063,9 +1063,9 @@ class RangeTestCase(unittest.TestCase): r = Range(10, 20, bounds) self.assertEqual(r.lower, 10) self.assertEqual(r.upper, 20) - self.assert_(not r.isempty) - self.assert_(not r.lower_inf) - self.assert_(not r.upper_inf) + self.assertTrue(not r.isempty) + self.assertTrue(not r.lower_inf) + self.assertTrue(not r.upper_inf) self.assertEqual(r.lower_inc, lower_inc) self.assertEqual(r.upper_inc, upper_inc) @@ -1073,20 +1073,20 @@ class RangeTestCase(unittest.TestCase): r = Range(upper=20) self.assertEqual(r.lower, None) self.assertEqual(r.upper, 20) - self.assert_(not r.isempty) - self.assert_(r.lower_inf) - self.assert_(not r.upper_inf) - self.assert_(not r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(not r.isempty) + self.assertTrue(r.lower_inf) + self.assertTrue(not r.upper_inf) + self.assertTrue(not r.lower_inc) + self.assertTrue(not r.upper_inc) r = Range(lower=10, bounds='(]') self.assertEqual(r.lower, 10) self.assertEqual(r.upper, None) - self.assert_(not r.isempty) - self.assert_(not r.lower_inf) - self.assert_(r.upper_inf) - self.assert_(not r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(not r.isempty) + self.assertTrue(not r.lower_inf) + self.assertTrue(r.upper_inf) + self.assertTrue(not r.lower_inc) + self.assertTrue(not r.upper_inc) def test_bad_bounds(self): self.assertRaises(ValueError, Range, bounds='(') @@ -1094,64 +1094,64 @@ class RangeTestCase(unittest.TestCase): def test_in(self): r = Range(empty=True) - self.assert_(10 not in r) + self.assertTrue(10 not in r) r = Range() - self.assert_(10 in r) + self.assertTrue(10 in r) r = Range(lower=10, bounds='[)') - self.assert_(9 not in r) - self.assert_(10 in r) - self.assert_(11 in r) + self.assertTrue(9 not in r) + self.assertTrue(10 in r) + self.assertTrue(11 in r) r = Range(lower=10, bounds='()') - self.assert_(9 not in r) - self.assert_(10 not in r) - self.assert_(11 in r) + self.assertTrue(9 not in r) + self.assertTrue(10 not in r) + self.assertTrue(11 in r) r = Range(upper=20, bounds='()') - self.assert_(19 in r) - self.assert_(20 not in r) - self.assert_(21 not in r) + self.assertTrue(19 in r) + self.assertTrue(20 not in r) + self.assertTrue(21 not in r) r = Range(upper=20, bounds='(]') - self.assert_(19 in r) - self.assert_(20 in r) - self.assert_(21 not in r) + self.assertTrue(19 in r) + self.assertTrue(20 in r) + self.assertTrue(21 not in r) r = Range(10, 20) - self.assert_(9 not in r) - self.assert_(10 in r) - self.assert_(11 in r) - self.assert_(19 in r) - self.assert_(20 not in r) - self.assert_(21 not in r) + self.assertTrue(9 not in r) + self.assertTrue(10 in r) + self.assertTrue(11 in r) + self.assertTrue(19 in r) + self.assertTrue(20 not in r) + self.assertTrue(21 not in r) r = Range(10, 20, '(]') - self.assert_(9 not in r) - self.assert_(10 not in r) - self.assert_(11 in r) - self.assert_(19 in r) - self.assert_(20 in r) - self.assert_(21 not in r) + self.assertTrue(9 not in r) + self.assertTrue(10 not in r) + self.assertTrue(11 in r) + self.assertTrue(19 in r) + self.assertTrue(20 in r) + self.assertTrue(21 not in r) r = Range(20, 10) - self.assert_(9 not in r) - self.assert_(10 not in r) - self.assert_(11 not in r) - self.assert_(19 not in r) - self.assert_(20 not in r) - self.assert_(21 not in r) + self.assertTrue(9 not in r) + self.assertTrue(10 not in r) + self.assertTrue(11 not in r) + self.assertTrue(19 not in r) + self.assertTrue(20 not in r) + self.assertTrue(21 not in r) def test_nonzero(self): - self.assert_(Range()) - self.assert_(Range(10, 20)) - self.assert_(not Range(empty=True)) + self.assertTrue(Range()) + self.assertTrue(Range(10, 20)) + self.assertTrue(not Range(empty=True)) def test_eq_hash(self): def assert_equal(r1, r2): - self.assert_(r1 == r2) - self.assert_(hash(r1) == hash(r2)) + self.assertTrue(r1 == r2) + self.assertTrue(hash(r1) == hash(r2)) assert_equal(Range(empty=True), Range(empty=True)) assert_equal(Range(), Range()) @@ -1161,8 +1161,8 @@ class RangeTestCase(unittest.TestCase): assert_equal(Range(10, 20, '[]'), Range(10, 20, '[]')) def assert_not_equal(r1, r2): - self.assert_(r1 != r2) - self.assert_(hash(r1) != hash(r2)) + self.assertTrue(r1 != r2) + self.assertTrue(hash(r1) != hash(r2)) assert_not_equal(Range(10, 20), Range(10, 21)) assert_not_equal(Range(10, 20), Range(11, 20)) @@ -1186,64 +1186,64 @@ class RangeTestCase(unittest.TestCase): # and consistent. def test_lt_ordering(self): - self.assert_(Range(empty=True) < Range(0, 4)) - self.assert_(not Range(1, 2) < Range(0, 4)) - self.assert_(Range(0, 4) < Range(1, 2)) - self.assert_(not Range(1, 2) < Range()) - self.assert_(Range() < Range(1, 2)) - self.assert_(not Range(1) < Range(upper=1)) - self.assert_(not Range() < Range()) - self.assert_(not Range(empty=True) < Range(empty=True)) - self.assert_(not Range(1, 2) < Range(1, 2)) + self.assertTrue(Range(empty=True) < Range(0, 4)) + self.assertTrue(not Range(1, 2) < Range(0, 4)) + self.assertTrue(Range(0, 4) < Range(1, 2)) + self.assertTrue(not Range(1, 2) < Range()) + self.assertTrue(Range() < Range(1, 2)) + self.assertTrue(not Range(1) < Range(upper=1)) + self.assertTrue(not Range() < Range()) + self.assertTrue(not Range(empty=True) < Range(empty=True)) + self.assertTrue(not Range(1, 2) < Range(1, 2)) with raises_typeerror(): - self.assert_(1 < Range(1, 2)) + self.assertTrue(1 < Range(1, 2)) with raises_typeerror(): - self.assert_(not Range(1, 2) < 1) + self.assertTrue(not Range(1, 2) < 1) def test_gt_ordering(self): - self.assert_(not Range(empty=True) > Range(0, 4)) - self.assert_(Range(1, 2) > Range(0, 4)) - self.assert_(not Range(0, 4) > Range(1, 2)) - self.assert_(Range(1, 2) > Range()) - self.assert_(not Range() > Range(1, 2)) - self.assert_(Range(1) > Range(upper=1)) - self.assert_(not Range() > Range()) - self.assert_(not Range(empty=True) > Range(empty=True)) - self.assert_(not Range(1, 2) > Range(1, 2)) + self.assertTrue(not Range(empty=True) > Range(0, 4)) + self.assertTrue(Range(1, 2) > Range(0, 4)) + self.assertTrue(not Range(0, 4) > Range(1, 2)) + self.assertTrue(Range(1, 2) > Range()) + self.assertTrue(not Range() > Range(1, 2)) + self.assertTrue(Range(1) > Range(upper=1)) + self.assertTrue(not Range() > Range()) + self.assertTrue(not Range(empty=True) > Range(empty=True)) + self.assertTrue(not Range(1, 2) > Range(1, 2)) with raises_typeerror(): - self.assert_(not 1 > Range(1, 2)) + self.assertTrue(not 1 > Range(1, 2)) with raises_typeerror(): - self.assert_(Range(1, 2) > 1) + self.assertTrue(Range(1, 2) > 1) def test_le_ordering(self): - self.assert_(Range(empty=True) <= Range(0, 4)) - self.assert_(not Range(1, 2) <= Range(0, 4)) - self.assert_(Range(0, 4) <= Range(1, 2)) - self.assert_(not Range(1, 2) <= Range()) - self.assert_(Range() <= Range(1, 2)) - self.assert_(not Range(1) <= Range(upper=1)) - self.assert_(Range() <= Range()) - self.assert_(Range(empty=True) <= Range(empty=True)) - self.assert_(Range(1, 2) <= Range(1, 2)) + self.assertTrue(Range(empty=True) <= Range(0, 4)) + self.assertTrue(not Range(1, 2) <= Range(0, 4)) + self.assertTrue(Range(0, 4) <= Range(1, 2)) + self.assertTrue(not Range(1, 2) <= Range()) + self.assertTrue(Range() <= Range(1, 2)) + self.assertTrue(not Range(1) <= Range(upper=1)) + self.assertTrue(Range() <= Range()) + self.assertTrue(Range(empty=True) <= Range(empty=True)) + self.assertTrue(Range(1, 2) <= Range(1, 2)) with raises_typeerror(): - self.assert_(1 <= Range(1, 2)) + self.assertTrue(1 <= Range(1, 2)) with raises_typeerror(): - self.assert_(not Range(1, 2) <= 1) + self.assertTrue(not Range(1, 2) <= 1) def test_ge_ordering(self): - self.assert_(not Range(empty=True) >= Range(0, 4)) - self.assert_(Range(1, 2) >= Range(0, 4)) - self.assert_(not Range(0, 4) >= Range(1, 2)) - self.assert_(Range(1, 2) >= Range()) - self.assert_(not Range() >= Range(1, 2)) - self.assert_(Range(1) >= Range(upper=1)) - self.assert_(Range() >= Range()) - self.assert_(Range(empty=True) >= Range(empty=True)) - self.assert_(Range(1, 2) >= Range(1, 2)) + self.assertTrue(not Range(empty=True) >= Range(0, 4)) + self.assertTrue(Range(1, 2) >= Range(0, 4)) + self.assertTrue(not Range(0, 4) >= Range(1, 2)) + self.assertTrue(Range(1, 2) >= Range()) + self.assertTrue(not Range() >= Range(1, 2)) + self.assertTrue(Range(1) >= Range(upper=1)) + self.assertTrue(Range() >= Range()) + self.assertTrue(Range(empty=True) >= Range(empty=True)) + self.assertTrue(Range(1, 2) >= Range(1, 2)) with raises_typeerror(): - self.assert_(not 1 >= Range(1, 2)) + self.assertTrue(not 1 >= Range(1, 2)) with raises_typeerror(): - self.assert_(Range(1, 2) >= 1) + self.assertTrue(Range(1, 2) >= 1) def test_pickling(self): r = Range(0, 4) @@ -1308,56 +1308,56 @@ class RangeCasterTestCase(ConnectingTestCase): for type in self.builtin_ranges: cur.execute(f"select 'empty'::{type}") r = cur.fetchone()[0] - self.assert_(isinstance(r, Range), type) - self.assert_(r.isempty) + self.assertTrue(isinstance(r, Range), type) + self.assertTrue(r.isempty) def test_cast_inf(self): cur = self.conn.cursor() for type in self.builtin_ranges: cur.execute(f"select '(,)'::{type}") r = cur.fetchone()[0] - self.assert_(isinstance(r, Range), type) - self.assert_(not r.isempty) - self.assert_(r.lower_inf) - self.assert_(r.upper_inf) + self.assertTrue(isinstance(r, Range), type) + self.assertTrue(not r.isempty) + self.assertTrue(r.lower_inf) + self.assertTrue(r.upper_inf) def test_cast_numbers(self): cur = self.conn.cursor() for type in ('int4range', 'int8range'): cur.execute(f"select '(10,20)'::{type}") r = cur.fetchone()[0] - self.assert_(isinstance(r, NumericRange)) - self.assert_(not r.isempty) + self.assertTrue(isinstance(r, NumericRange)) + self.assertTrue(not r.isempty) self.assertEqual(r.lower, 11) self.assertEqual(r.upper, 20) - self.assert_(not r.lower_inf) - self.assert_(not r.upper_inf) - self.assert_(r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(not r.lower_inf) + self.assertTrue(not r.upper_inf) + self.assertTrue(r.lower_inc) + self.assertTrue(not r.upper_inc) cur.execute("select '(10.2,20.6)'::numrange") r = cur.fetchone()[0] - self.assert_(isinstance(r, NumericRange)) - self.assert_(not r.isempty) + self.assertTrue(isinstance(r, NumericRange)) + self.assertTrue(not r.isempty) self.assertEqual(r.lower, Decimal('10.2')) self.assertEqual(r.upper, Decimal('20.6')) - self.assert_(not r.lower_inf) - self.assert_(not r.upper_inf) - self.assert_(not r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(not r.lower_inf) + self.assertTrue(not r.upper_inf) + self.assertTrue(not r.lower_inc) + self.assertTrue(not r.upper_inc) def test_cast_date(self): cur = self.conn.cursor() cur.execute("select '(2000-01-01,2012-12-31)'::daterange") r = cur.fetchone()[0] - self.assert_(isinstance(r, DateRange)) - self.assert_(not r.isempty) + self.assertTrue(isinstance(r, DateRange)) + self.assertTrue(not r.isempty) self.assertEqual(r.lower, date(2000, 1, 2)) self.assertEqual(r.upper, date(2012, 12, 31)) - self.assert_(not r.lower_inf) - self.assert_(not r.upper_inf) - self.assert_(r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(not r.lower_inf) + self.assertTrue(not r.upper_inf) + self.assertTrue(r.lower_inc) + self.assertTrue(not r.upper_inc) def test_cast_timestamp(self): cur = self.conn.cursor() @@ -1365,14 +1365,14 @@ class RangeCasterTestCase(ConnectingTestCase): ts2 = datetime(2000, 12, 31, 23, 59, 59, 999) cur.execute("select tsrange(%s, %s, '()')", (ts1, ts2)) r = cur.fetchone()[0] - self.assert_(isinstance(r, DateTimeRange)) - self.assert_(not r.isempty) + self.assertTrue(isinstance(r, DateTimeRange)) + self.assertTrue(not r.isempty) self.assertEqual(r.lower, ts1) self.assertEqual(r.upper, ts2) - self.assert_(not r.lower_inf) - self.assert_(not r.upper_inf) - self.assert_(not r.lower_inc) - self.assert_(not r.upper_inc) + self.assertTrue(not r.lower_inf) + self.assertTrue(not r.upper_inf) + self.assertTrue(not r.lower_inc) + self.assertTrue(not r.upper_inc) def test_cast_timestamptz(self): cur = self.conn.cursor() @@ -1381,14 +1381,14 @@ class RangeCasterTestCase(ConnectingTestCase): tzinfo=timezone(timedelta(minutes=600))) cur.execute("select tstzrange(%s, %s, '[]')", (ts1, ts2)) r = cur.fetchone()[0] - self.assert_(isinstance(r, DateTimeTZRange)) - self.assert_(not r.isempty) + self.assertTrue(isinstance(r, DateTimeTZRange)) + self.assertTrue(not r.isempty) self.assertEqual(r.lower, ts1) self.assertEqual(r.upper, ts2) - self.assert_(not r.lower_inf) - self.assert_(not r.upper_inf) - self.assert_(r.lower_inc) - self.assert_(r.upper_inc) + self.assertTrue(not r.lower_inf) + self.assertTrue(not r.upper_inf) + self.assertTrue(r.lower_inc) + self.assertTrue(r.upper_inc) def test_adapt_number_range(self): cur = self.conn.cursor() @@ -1396,26 +1396,26 @@ class RangeCasterTestCase(ConnectingTestCase): r = NumericRange(empty=True) cur.execute("select %s::int4range", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, NumericRange)) - self.assert_(r1.isempty) + self.assertTrue(isinstance(r1, NumericRange)) + self.assertTrue(r1.isempty) r = NumericRange(10, 20) cur.execute("select %s::int8range", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, NumericRange)) + self.assertTrue(isinstance(r1, NumericRange)) self.assertEqual(r1.lower, 10) self.assertEqual(r1.upper, 20) - self.assert_(r1.lower_inc) - self.assert_(not r1.upper_inc) + self.assertTrue(r1.lower_inc) + self.assertTrue(not r1.upper_inc) r = NumericRange(Decimal('10.2'), Decimal('20.5'), '(]') cur.execute("select %s::numrange", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, NumericRange)) + self.assertTrue(isinstance(r1, NumericRange)) self.assertEqual(r1.lower, Decimal('10.2')) self.assertEqual(r1.upper, Decimal('20.5')) - self.assert_(not r1.lower_inc) - self.assert_(r1.upper_inc) + self.assertTrue(not r1.lower_inc) + self.assertTrue(r1.upper_inc) def test_adapt_numeric_range(self): cur = self.conn.cursor() @@ -1423,26 +1423,26 @@ class RangeCasterTestCase(ConnectingTestCase): r = NumericRange(empty=True) cur.execute("select %s::int4range", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, NumericRange), r1) - self.assert_(r1.isempty) + self.assertTrue(isinstance(r1, NumericRange), r1) + self.assertTrue(r1.isempty) r = NumericRange(10, 20) cur.execute("select %s::int8range", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, NumericRange)) + self.assertTrue(isinstance(r1, NumericRange)) self.assertEqual(r1.lower, 10) self.assertEqual(r1.upper, 20) - self.assert_(r1.lower_inc) - self.assert_(not r1.upper_inc) + self.assertTrue(r1.lower_inc) + self.assertTrue(not r1.upper_inc) r = NumericRange(Decimal('10.2'), Decimal('20.5'), '(]') cur.execute("select %s::numrange", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, NumericRange)) + self.assertTrue(isinstance(r1, NumericRange)) self.assertEqual(r1.lower, Decimal('10.2')) self.assertEqual(r1.upper, Decimal('20.5')) - self.assert_(not r1.lower_inc) - self.assert_(r1.upper_inc) + self.assertTrue(not r1.lower_inc) + self.assertTrue(r1.upper_inc) def test_adapt_date_range(self): cur = self.conn.cursor() @@ -1452,17 +1452,17 @@ class RangeCasterTestCase(ConnectingTestCase): r = DateRange(d1, d2) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, DateRange)) + self.assertTrue(isinstance(r1, DateRange)) self.assertEqual(r1.lower, d1) self.assertEqual(r1.upper, d2) - self.assert_(r1.lower_inc) - self.assert_(not r1.upper_inc) + self.assertTrue(r1.lower_inc) + self.assertTrue(not r1.upper_inc) r = DateTimeRange(empty=True) cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, DateTimeRange)) - self.assert_(r1.isempty) + self.assertTrue(isinstance(r1, DateTimeRange)) + self.assertTrue(r1.isempty) ts1 = datetime(2000, 1, 1, tzinfo=timezone(timedelta(minutes=600))) ts2 = datetime(2000, 12, 31, 23, 59, 59, 999, @@ -1470,11 +1470,11 @@ class RangeCasterTestCase(ConnectingTestCase): r = DateTimeTZRange(ts1, ts2, '(]') cur.execute("select %s", (r,)) r1 = cur.fetchone()[0] - self.assert_(isinstance(r1, DateTimeTZRange)) + self.assertTrue(isinstance(r1, DateTimeTZRange)) self.assertEqual(r1.lower, ts1) self.assertEqual(r1.upper, ts2) - self.assert_(not r1.lower_inc) - self.assert_(r1.upper_inc) + self.assertTrue(not r1.lower_inc) + self.assertTrue(r1.upper_inc) @restore_types def test_register_range_adapter(self): @@ -1483,7 +1483,7 @@ class RangeCasterTestCase(ConnectingTestCase): rc = register_range('textrange', 'TextRange', cur) TextRange = rc.range - self.assert_(issubclass(TextRange, Range)) + self.assertTrue(issubclass(TextRange, Range)) self.assertEqual(TextRange.__name__, 'TextRange') r = TextRange('a', 'b', '(]') @@ -1491,8 +1491,8 @@ class RangeCasterTestCase(ConnectingTestCase): r1 = cur.fetchone()[0] self.assertEqual(r1.lower, 'a') self.assertEqual(r1.upper, 'b') - self.assert_(not r1.lower_inc) - self.assert_(r1.upper_inc) + self.assertTrue(not r1.lower_inc) + self.assertTrue(r1.upper_inc) cur.execute("select %s", ([r, r, r],)) rs = cur.fetchone()[0] @@ -1500,8 +1500,8 @@ class RangeCasterTestCase(ConnectingTestCase): for r1 in rs: self.assertEqual(r1.lower, 'a') self.assertEqual(r1.upper, 'b') - self.assert_(not r1.lower_inc) - self.assert_(r1.upper_inc) + self.assertTrue(not r1.lower_inc) + self.assertTrue(r1.upper_inc) def test_range_escaping(self): cur = self.conn.cursor() @@ -1541,7 +1541,7 @@ class RangeCasterTestCase(ConnectingTestCase): # ...not too many errors! in the above collate there are 17 errors: # assume in other collates we won't find more than 30 - self.assert_(errs < 30, + self.assertTrue(errs < 30, "too many collate errors. Is the test working?") cur.execute("select id, range from rangetest order by id") diff --git a/tests/test_with.py b/tests/test_with.py index f71989dc..35c54c51 100755 --- a/tests/test_with.py +++ b/tests/test_with.py @@ -47,14 +47,14 @@ class WithTestCase(ConnectingTestCase): class WithConnectionTestCase(WithTestCase): def test_with_ok(self): with self.conn as conn: - self.assert_(self.conn is conn) + self.assertTrue(self.conn is conn) self.assertEqual(conn.status, ext.STATUS_READY) curs = conn.cursor() curs.execute("insert into test_with values (1)") self.assertEqual(conn.status, ext.STATUS_BEGIN) self.assertEqual(self.conn.status, ext.STATUS_READY) - self.assert_(not self.conn.closed) + self.assertTrue(not self.conn.closed) curs = self.conn.cursor() curs.execute("select * from test_with") @@ -68,7 +68,7 @@ class WithConnectionTestCase(WithTestCase): self.assertEqual(conn.status, ext.STATUS_BEGIN) self.assertEqual(self.conn.status, ext.STATUS_READY) - self.assert_(not self.conn.closed) + self.assertTrue(not self.conn.closed) curs = self.conn.cursor() curs.execute("select * from test_with") @@ -82,7 +82,7 @@ class WithConnectionTestCase(WithTestCase): self.assertRaises(psycopg2.DataError, f) self.assertEqual(self.conn.status, ext.STATUS_READY) - self.assert_(not self.conn.closed) + self.assertTrue(not self.conn.closed) curs = self.conn.cursor() curs.execute("select * from test_with") @@ -97,7 +97,7 @@ class WithConnectionTestCase(WithTestCase): self.assertRaises(ZeroDivisionError, f) self.assertEqual(self.conn.status, ext.STATUS_READY) - self.assert_(not self.conn.closed) + self.assertTrue(not self.conn.closed) curs = self.conn.cursor() curs.execute("select * from test_with") @@ -124,7 +124,7 @@ class WithConnectionTestCase(WithTestCase): curs.execute("insert into test_with values (10)") self.assertEqual(conn.status, ext.STATUS_READY) - self.assert_(commits) + self.assertTrue(commits) curs = self.conn.cursor() curs.execute("select * from test_with") @@ -146,10 +146,10 @@ class WithConnectionTestCase(WithTestCase): except ZeroDivisionError: pass else: - self.assert_("exception not raised") + self.assertTrue("exception not raised") self.assertEqual(conn.status, ext.STATUS_READY) - self.assert_(rollbacks) + self.assertTrue(rollbacks) curs = conn.cursor() curs.execute("select * from test_with") @@ -164,7 +164,7 @@ class WithConnectionTestCase(WithTestCase): except psycopg2.ProgrammingError: raised_ok = True - self.assert_(raised_ok) + self.assertTrue(raised_ok) # Still good with self.conn: @@ -207,7 +207,7 @@ class WithConnectionTestCase(WithTestCase): except ZeroDivisionError: raised_ok = True - self.assert_(raised_ok) + self.assertTrue(raised_ok) self.assertEqual( self.conn.info.transaction_status, ext.TRANSACTION_STATUS_IDLE ) @@ -232,7 +232,7 @@ class WithConnectionTestCase(WithTestCase): except psycopg2.errors.InvalidTextRepresentation: raised_ok = True - self.assert_(raised_ok) + self.assertTrue(raised_ok) self.assertEqual( self.conn.info.transaction_status, ext.TRANSACTION_STATUS_IDLE ) @@ -248,12 +248,12 @@ class WithCursorTestCase(WithTestCase): with self.conn as conn: with conn.cursor() as curs: curs.execute("insert into test_with values (4)") - self.assert_(not curs.closed) + self.assertTrue(not curs.closed) self.assertEqual(self.conn.status, ext.STATUS_BEGIN) - self.assert_(curs.closed) + self.assertTrue(curs.closed) self.assertEqual(self.conn.status, ext.STATUS_READY) - self.assert_(not self.conn.closed) + self.assertTrue(not self.conn.closed) curs = self.conn.cursor() curs.execute("select * from test_with") @@ -269,8 +269,8 @@ class WithCursorTestCase(WithTestCase): pass self.assertEqual(self.conn.status, ext.STATUS_READY) - self.assert_(not self.conn.closed) - self.assert_(curs.closed) + self.assertTrue(not self.conn.closed) + self.assertTrue(curs.closed) curs = self.conn.cursor() curs.execute("select * from test_with") @@ -285,10 +285,10 @@ class WithCursorTestCase(WithTestCase): super().close() with self.conn.cursor(cursor_factory=MyCurs) as curs: - self.assert_(isinstance(curs, MyCurs)) + self.assertTrue(isinstance(curs, MyCurs)) - self.assert_(curs.closed) - self.assert_(closes) + self.assertTrue(curs.closed) + self.assertTrue(closes) @skip_if_crdb("named cursor") def test_exception_swallow(self): diff --git a/tests/testutils.py b/tests/testutils.py index 1384e843..793bf1ff 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -45,18 +45,6 @@ import psycopg2.extensions from .testconfig import green, dsn, repl_dsn -# Silence warnings caused by the stubbornness of the Python unittest -# maintainers -# https://bugs.python.org/issue9424 -if (not hasattr(unittest.TestCase, 'assert_') - or unittest.TestCase.assert_ is not unittest.TestCase.assertTrue): - # mavaff... - unittest.TestCase.assert_ = unittest.TestCase.assertTrue - unittest.TestCase.failUnless = unittest.TestCase.assertTrue - unittest.TestCase.assertEquals = unittest.TestCase.assertEqual - unittest.TestCase.failUnlessEqual = unittest.TestCase.assertEqual - - def assertDsnEqual(self, dsn1, dsn2, msg=None): """Check that two conninfo string have the same content""" self.assertEqual(set(dsn1.split()), set(dsn2.split()), msg)