This commit is contained in:
Jon Dufresne 2017-11-22 03:28:01 +00:00 committed by GitHub
commit 029cb95ce2
21 changed files with 567 additions and 578 deletions

View File

@ -171,7 +171,7 @@ class DatabaseAPI20Test(unittest.TestCase):
# Must exist
threadsafety = self.driver.threadsafety
# Must be a valid value
self.failUnless(threadsafety in (0,1,2,3))
self.assertTrue(threadsafety in (0,1,2,3))
except AttributeError:
self.fail("Driver doesn't define threadsafety")
@ -180,7 +180,7 @@ class DatabaseAPI20Test(unittest.TestCase):
# Must exist
paramstyle = self.driver.paramstyle
# Must be a valid value
self.failUnless(paramstyle in (
self.assertTrue(paramstyle in (
'qmark','numeric','named','format','pyformat'
))
except AttributeError:
@ -190,31 +190,31 @@ class DatabaseAPI20Test(unittest.TestCase):
# Make sure required exceptions exist, and are in the
# defined hierarchy.
if sys.version[0] == '3': #under Python 3 StardardError no longer exists
self.failUnless(issubclass(self.driver.Warning,Exception))
self.failUnless(issubclass(self.driver.Error,Exception))
self.assertTrue(issubclass(self.driver.Warning,Exception))
self.assertTrue(issubclass(self.driver.Error,Exception))
else:
self.failUnless(issubclass(self.driver.Warning,StandardError))
self.failUnless(issubclass(self.driver.Error,StandardError))
self.assertTrue(issubclass(self.driver.Warning,StandardError))
self.assertTrue(issubclass(self.driver.Error,StandardError))
self.failUnless(
self.assertTrue(
issubclass(self.driver.InterfaceError,self.driver.Error)
)
self.failUnless(
self.assertTrue(
issubclass(self.driver.DatabaseError,self.driver.Error)
)
self.failUnless(
self.assertTrue(
issubclass(self.driver.OperationalError,self.driver.Error)
)
self.failUnless(
self.assertTrue(
issubclass(self.driver.IntegrityError,self.driver.Error)
)
self.failUnless(
self.assertTrue(
issubclass(self.driver.InternalError,self.driver.Error)
)
self.failUnless(
self.assertTrue(
issubclass(self.driver.ProgrammingError,self.driver.Error)
)
self.failUnless(
self.assertTrue(
issubclass(self.driver.NotSupportedError,self.driver.Error)
)
@ -227,15 +227,15 @@ class DatabaseAPI20Test(unittest.TestCase):
# by default.
con = self._connect()
drv = self.driver
self.failUnless(con.Warning is drv.Warning)
self.failUnless(con.Error is drv.Error)
self.failUnless(con.InterfaceError is drv.InterfaceError)
self.failUnless(con.DatabaseError is drv.DatabaseError)
self.failUnless(con.OperationalError is drv.OperationalError)
self.failUnless(con.IntegrityError is drv.IntegrityError)
self.failUnless(con.InternalError is drv.InternalError)
self.failUnless(con.ProgrammingError is drv.ProgrammingError)
self.failUnless(con.NotSupportedError is drv.NotSupportedError)
self.assertTrue(con.Warning is drv.Warning)
self.assertTrue(con.Error is drv.Error)
self.assertTrue(con.InterfaceError is drv.InterfaceError)
self.assertTrue(con.DatabaseError is drv.DatabaseError)
self.assertTrue(con.OperationalError is drv.OperationalError)
self.assertTrue(con.IntegrityError is drv.IntegrityError)
self.assertTrue(con.InternalError is drv.InternalError)
self.assertTrue(con.ProgrammingError is drv.ProgrammingError)
self.assertTrue(con.NotSupportedError is drv.NotSupportedError)
def test_commit(self):
@ -327,12 +327,12 @@ class DatabaseAPI20Test(unittest.TestCase):
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
self.failUnless(cur.rowcount in (-1,1),
self.assertTrue(cur.rowcount in (-1,1),
'cursor.rowcount should == number or rows inserted, or '
'set to -1 after executing an insert statement'
)
cur.execute("select name from %sbooze" % self.table_prefix)
self.failUnless(cur.rowcount in (-1,1),
self.assertTrue(cur.rowcount in (-1,1),
'cursor.rowcount should == number of rows returned, or '
'set to -1 after executing a select statement'
)
@ -397,7 +397,7 @@ class DatabaseAPI20Test(unittest.TestCase):
cur.execute("insert into %sbooze values ('Victoria Bitter')" % (
self.table_prefix
))
self.failUnless(cur.rowcount in (-1,1))
self.assertTrue(cur.rowcount in (-1,1))
if self.driver.paramstyle == 'qmark':
cur.execute(
@ -426,7 +426,7 @@ class DatabaseAPI20Test(unittest.TestCase):
)
else:
self.fail('Invalid paramstyle')
self.failUnless(cur.rowcount in (-1,1))
self.assertTrue(cur.rowcount in (-1,1))
cur.execute('select name from %sbooze' % self.table_prefix)
res = cur.fetchall()
@ -478,7 +478,7 @@ class DatabaseAPI20Test(unittest.TestCase):
)
else:
self.fail('Unknown paramstyle')
self.failUnless(cur.rowcount in (-1,2),
self.assertTrue(cur.rowcount in (-1,2),
'insert using cursor.executemany set cursor.rowcount to '
'incorrect value %r' % cur.rowcount
)
@ -513,7 +513,7 @@ class DatabaseAPI20Test(unittest.TestCase):
'cursor.fetchone should return None if a query retrieves '
'no rows'
)
self.failUnless(cur.rowcount in (-1,0))
self.assertTrue(cur.rowcount in (-1,0))
# cursor.fetchone should raise an Error if called after
# executing a query that cannot return rows
@ -533,7 +533,7 @@ class DatabaseAPI20Test(unittest.TestCase):
self.assertEqual(cur.fetchone(),None,
'cursor.fetchone should return None if no more rows available'
)
self.failUnless(cur.rowcount in (-1,1))
self.assertTrue(cur.rowcount in (-1,1))
finally:
con.close()
@ -589,7 +589,7 @@ class DatabaseAPI20Test(unittest.TestCase):
'cursor.fetchmany should return an empty sequence after '
'results are exhausted'
)
self.failUnless(cur.rowcount in (-1,6))
self.assertTrue(cur.rowcount in (-1,6))
# Same as above, using cursor.arraysize
cur.arraysize=4
@ -602,12 +602,12 @@ class DatabaseAPI20Test(unittest.TestCase):
self.assertEqual(len(r),2)
r = cur.fetchmany() # Should be an empty sequence
self.assertEqual(len(r),0)
self.failUnless(cur.rowcount in (-1,6))
self.assertTrue(cur.rowcount in (-1,6))
cur.arraysize=6
cur.execute('select name from %sbooze' % self.table_prefix)
rows = cur.fetchmany() # Should get all rows
self.failUnless(cur.rowcount in (-1,6))
self.assertTrue(cur.rowcount in (-1,6))
self.assertEqual(len(rows),6)
self.assertEqual(len(rows),6)
rows = [r[0] for r in rows]
@ -624,7 +624,7 @@ class DatabaseAPI20Test(unittest.TestCase):
'cursor.fetchmany should return an empty sequence if '
'called after the whole result set has been fetched'
)
self.failUnless(cur.rowcount in (-1,6))
self.assertTrue(cur.rowcount in (-1,6))
self.executeDDL2(cur)
cur.execute('select name from %sbarflys' % self.table_prefix)
@ -633,7 +633,7 @@ class DatabaseAPI20Test(unittest.TestCase):
'cursor.fetchmany should return an empty sequence if '
'query retrieved no rows'
)
self.failUnless(cur.rowcount in (-1,0))
self.assertTrue(cur.rowcount in (-1,0))
finally:
con.close()
@ -657,7 +657,7 @@ class DatabaseAPI20Test(unittest.TestCase):
cur.execute('select name from %sbooze' % self.table_prefix)
rows = cur.fetchall()
self.failUnless(cur.rowcount in (-1,len(self.samples)))
self.assertTrue(cur.rowcount in (-1,len(self.samples)))
self.assertEqual(len(rows),len(self.samples),
'cursor.fetchall did not retrieve all rows'
)
@ -673,12 +673,12 @@ class DatabaseAPI20Test(unittest.TestCase):
'cursor.fetchall should return an empty list if called '
'after the whole result set has been fetched'
)
self.failUnless(cur.rowcount in (-1,len(self.samples)))
self.assertTrue(cur.rowcount in (-1,len(self.samples)))
self.executeDDL2(cur)
cur.execute('select name from %sbarflys' % self.table_prefix)
rows = cur.fetchall()
self.failUnless(cur.rowcount in (-1,0))
self.assertTrue(cur.rowcount in (-1,0))
self.assertEqual(len(rows),0,
'cursor.fetchall should return an empty list if '
'a select query returns no rows'
@ -700,7 +700,7 @@ class DatabaseAPI20Test(unittest.TestCase):
rows23 = cur.fetchmany(2)
rows4 = cur.fetchone()
rows56 = cur.fetchall()
self.failUnless(cur.rowcount in (-1,6))
self.assertTrue(cur.rowcount in (-1,6))
self.assertEqual(len(rows23),2,
'fetchmany returned incorrect number of rows'
)
@ -777,7 +777,7 @@ class DatabaseAPI20Test(unittest.TestCase):
con = self._connect()
try:
cur = con.cursor()
self.failUnless(hasattr(cur,'arraysize'),
self.assertTrue(hasattr(cur,'arraysize'),
'cursor.arraysize must be defined'
)
finally:
@ -846,27 +846,27 @@ class DatabaseAPI20Test(unittest.TestCase):
b = self.driver.Binary(str2bytes(''))
def test_STRING(self):
self.failUnless(hasattr(self.driver,'STRING'),
self.assertTrue(hasattr(self.driver,'STRING'),
'module.STRING must be defined'
)
def test_BINARY(self):
self.failUnless(hasattr(self.driver,'BINARY'),
self.assertTrue(hasattr(self.driver,'BINARY'),
'module.BINARY must be defined.'
)
def test_NUMBER(self):
self.failUnless(hasattr(self.driver,'NUMBER'),
self.assertTrue(hasattr(self.driver,'NUMBER'),
'module.NUMBER must be defined.'
)
def test_DATETIME(self):
self.failUnless(hasattr(self.driver,'DATETIME'),
self.assertTrue(hasattr(self.driver,'DATETIME'),
'module.DATETIME must be defined.'
)
def test_ROWID(self):
self.failUnless(hasattr(self.driver,'ROWID'),
self.assertTrue(hasattr(self.driver,'ROWID'),
'module.ROWID must be defined.'
)

View File

@ -28,15 +28,15 @@ class TwoPhaseCommitTests(unittest.TestCase):
except self.driver.NotSupportedError:
self.fail("Driver does not support transaction IDs.")
self.assertEquals(xid[0], 42)
self.assertEquals(xid[1], "global")
self.assertEquals(xid[2], "bqual")
self.assertEqual(xid[0], 42)
self.assertEqual(xid[1], "global")
self.assertEqual(xid[2], "bqual")
# Try some extremes for the transaction ID:
xid = con.xid(0, "", "")
self.assertEquals(tuple(xid), (0, "", ""))
self.assertEqual(tuple(xid), (0, "", ""))
xid = con.xid(0x7fffffff, "a" * 64, "b" * 64)
self.assertEquals(tuple(xid), (0x7fffffff, "a" * 64, "b" * 64))
self.assertEqual(tuple(xid), (0x7fffffff, "a" * 64, "b" * 64))
def test_tpc_begin(self):
con = self.connect()

View File

@ -71,17 +71,17 @@ class AsyncTests(ConnectingTestCase):
sync_cur = self.sync_conn.cursor()
del cur, sync_cur
self.assert_(self.conn.async_)
self.assert_(not self.sync_conn.async_)
self.assertTrue(self.conn.async_)
self.assertTrue(not self.sync_conn.async_)
# the async connection should be autocommit
self.assert_(self.conn.autocommit)
self.assertEquals(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
self.assertTrue(self.conn.autocommit)
self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
# check other properties to be found on the connection
self.assert_(self.conn.server_version)
self.assert_(self.conn.protocol_version in (2, 3))
self.assert_(self.conn.encoding in ext.encodings)
self.assertTrue(self.conn.server_version)
self.assertTrue(self.conn.protocol_version in (2, 3))
self.assertTrue(self.conn.encoding in ext.encodings)
def test_async_named_cursor(self):
self.assertRaises(psycopg2.ProgrammingError,
@ -96,7 +96,7 @@ class AsyncTests(ConnectingTestCase):
self.wait(cur)
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchone()[0], "a")
self.assertEqual(cur.fetchone()[0], "a")
@slow
@skip_before_postgres(8, 2)
@ -107,7 +107,7 @@ class AsyncTests(ConnectingTestCase):
self.wait(cur)
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchall()[0][0], '')
self.assertEqual(cur.fetchall()[0][0], '')
@slow
def test_async_after_async(self):
@ -128,7 +128,7 @@ class AsyncTests(ConnectingTestCase):
cur.execute("select * from table1")
self.wait(cur)
self.assertEquals(cur.fetchall()[0][0], 1)
self.assertEqual(cur.fetchall()[0][0], 1)
cur.execute("delete from table1")
self.wait(cur)
@ -136,7 +136,7 @@ class AsyncTests(ConnectingTestCase):
cur.execute("select * from table1")
self.wait(cur)
self.assertEquals(cur.fetchone(), None)
self.assertEqual(cur.fetchone(), None)
def test_fetch_after_async(self):
cur = self.conn.cursor()
@ -147,7 +147,7 @@ class AsyncTests(ConnectingTestCase):
cur.fetchall)
# but after waiting it should work
self.wait(cur)
self.assertEquals(cur.fetchall()[0][0], "a")
self.assertEqual(cur.fetchall()[0][0], "a")
def test_rollback_while_async(self):
cur = self.conn.cursor()
@ -176,14 +176,14 @@ class AsyncTests(ConnectingTestCase):
cur.execute("select * from table1")
self.wait(cur)
self.assertEquals(cur.fetchall()[0][0], 1)
self.assertEqual(cur.fetchall()[0][0], 1)
cur.execute("delete from table1")
self.wait(cur)
cur.execute("select * from table1")
self.wait(cur)
self.assertEquals(cur.fetchone(), None)
self.assertEqual(cur.fetchone(), None)
def test_set_parameters_while_async(self):
cur = self.conn.cursor()
@ -192,7 +192,7 @@ class AsyncTests(ConnectingTestCase):
self.assertTrue(self.conn.isexecuting())
# getting transaction status works
self.assertEquals(self.conn.get_transaction_status(),
self.assertEqual(self.conn.get_transaction_status(),
ext.TRANSACTION_STATUS_ACTIVE)
self.assertTrue(self.conn.isexecuting())
@ -230,7 +230,7 @@ class AsyncTests(ConnectingTestCase):
# but after it's done it should work
self.wait(cur)
self.assertEquals(list(cur), [(1, ), (2, ), (3, )])
self.assertEqual(list(cur), [(1, ), (2, ), (3, )])
self.assertFalse(self.conn.isexecuting())
def test_copy_while_async(self):
@ -270,7 +270,7 @@ class AsyncTests(ConnectingTestCase):
# but after it's done it should work
self.wait(cur)
cur.scroll(1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
self.assertEqual(cur.fetchall(), [(2, ), (3, )])
cur = self.conn.cursor()
cur.execute("select id from table1 order by id")
@ -286,7 +286,7 @@ class AsyncTests(ConnectingTestCase):
self.wait(cur)
cur.scroll(2)
cur.scroll(-1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
self.assertEqual(cur.fetchall(), [(2, ), (3, )])
def test_scroll(self):
cur = self.sync_conn.cursor()
@ -299,7 +299,7 @@ class AsyncTests(ConnectingTestCase):
cur.execute("select id from table1 order by id")
cur.scroll(2)
cur.scroll(-1)
self.assertEquals(cur.fetchall(), [(2, ), (3, )])
self.assertEqual(cur.fetchall(), [(2, ), (3, )])
def test_async_dont_read_all(self):
cur = self.conn.cursor()
@ -309,7 +309,7 @@ class AsyncTests(ConnectingTestCase):
self.wait(cur)
# it should be the result of the second query
self.assertEquals(cur.fetchone()[0], "b" * 10000)
self.assertEqual(cur.fetchone()[0], "b" * 10000)
def test_async_subclass(self):
class MyConn(ext.connection):
@ -317,8 +317,8 @@ class AsyncTests(ConnectingTestCase):
ext.connection.__init__(self, dsn, async_=async_)
conn = self.connect(connection_factory=MyConn, async_=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async_)
self.assertTrue(isinstance(conn, MyConn))
self.assertTrue(conn.async_)
conn.close()
@slow
@ -346,7 +346,7 @@ class AsyncTests(ConnectingTestCase):
cur.execute("select 1")
# polling with a sync query works
cur.connection.poll()
self.assertEquals(cur.fetchone()[0], 1)
self.assertEqual(cur.fetchone()[0], 1)
def test_notify(self):
cur = self.conn.cursor()
@ -357,7 +357,7 @@ class AsyncTests(ConnectingTestCase):
cur.execute("notify test_notify")
self.wait(cur)
self.assertEquals(self.sync_conn.notifies, [])
self.assertEqual(self.sync_conn.notifies, [])
pid = self.conn.get_backend_pid()
for _ in range(5):
@ -365,8 +365,8 @@ class AsyncTests(ConnectingTestCase):
if not self.sync_conn.notifies:
time.sleep(0.5)
continue
self.assertEquals(len(self.sync_conn.notifies), 1)
self.assertEquals(self.sync_conn.notifies.pop(),
self.assertEqual(len(self.sync_conn.notifies), 1)
self.assertEqual(self.sync_conn.notifies.pop(),
(pid, "test_notify"))
return
self.fail("No NOTIFY in 2.5 seconds")
@ -381,7 +381,7 @@ class AsyncTests(ConnectingTestCase):
# fetching from a cursor with no results is an error
self.assertRaises(psycopg2.ProgrammingError, cur2.fetchone)
# fetching from the correct cursor works
self.assertEquals(cur1.fetchone()[0], 1)
self.assertEqual(cur1.fetchone()[0], 1)
def test_error(self):
cur = self.conn.cursor()
@ -402,7 +402,7 @@ class AsyncTests(ConnectingTestCase):
self.wait(cur)
cur.execute("select * from table1 order by id")
self.wait(cur)
self.assertEquals(cur.fetchall(), [(1, ), (2, ), (3, )])
self.assertEqual(cur.fetchall(), [(1, ), (2, ), (3, )])
cur.execute("delete from table1")
self.wait(cur)
@ -413,7 +413,7 @@ class AsyncTests(ConnectingTestCase):
self.assertRaises(psycopg2.ProgrammingError, self.wait, cur)
cur2.execute("select 1")
self.wait(cur2)
self.assertEquals(cur2.fetchone()[0], 1)
self.assertEqual(cur2.fetchone()[0], 1)
def test_notices(self):
del self.conn.notices[:]
@ -424,7 +424,7 @@ class AsyncTests(ConnectingTestCase):
cur.execute("create temp table chatty (id serial primary key);")
self.wait(cur)
self.assertEqual("CREATE TABLE", cur.statusmessage)
self.assert_(self.conn.notices)
self.assertTrue(self.conn.notices)
def test_async_cursor_gone(self):
import gc

View File

@ -56,16 +56,16 @@ class AsyncTests(ConnectingTestCase):
sync_cur = self.sync_conn.cursor()
del cur, sync_cur
self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async)
self.assertTrue(self.conn.async)
self.assertTrue(not self.sync_conn.async)
# the async connection should be autocommit
self.assert_(self.conn.autocommit)
self.assertTrue(self.conn.autocommit)
# check other properties to be found on the connection
self.assert_(self.conn.server_version)
self.assert_(self.conn.protocol_version in (2, 3))
self.assert_(self.conn.encoding in psycopg2.extensions.encodings)
self.assertTrue(self.conn.server_version)
self.assertTrue(self.conn.protocol_version in (2, 3))
self.assertTrue(self.conn.encoding in psycopg2.extensions.encodings)
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
@ -73,8 +73,8 @@ class AsyncTests(ConnectingTestCase):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
conn = self.connect(connection_factory=MyConn, async=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async)
self.assertTrue(isinstance(conn, MyConn))
self.assertTrue(conn.async)
conn.close()
def test_async_connection_error_message(self):
@ -160,12 +160,12 @@ class ConnectTestCase(unittest.TestCase):
psycopg2.connect(database='foo', host='baz', async=1)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
self.assertTrue(self.args[2])
psycopg2.connect("dbname=foo host=baz", async=True)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
self.assertTrue(self.args[2])
class AsyncReplicationTest(ReplicationTestCase):

View File

@ -53,7 +53,7 @@ class ConnectionTests(ConnectingTestCase):
conn = self.conn
conn.close()
conn.close()
self.assert_(conn.closed)
self.assertTrue(conn.closed)
def test_cursor_closed_attribute(self):
conn = self.conn
@ -90,19 +90,19 @@ class ConnectionTests(ConnectingTestCase):
if self.conn.server_version >= 90100:
conn.deferrable = False
self.assert_(conn.autocommit)
self.assertTrue(conn.autocommit)
self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assert_(conn.readonly is True)
self.assertTrue(conn.readonly is True)
if self.conn.server_version >= 90100:
self.assert_(conn.deferrable is False)
self.assertTrue(conn.deferrable is False)
conn.reset()
# now the session characteristics should be reverted
self.assert_(not conn.autocommit)
self.assertTrue(not conn.autocommit)
self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
self.assert_(conn.readonly is None)
self.assertTrue(conn.readonly is None)
if self.conn.server_version >= 90100:
self.assert_(conn.deferrable is None)
self.assertTrue(conn.deferrable is None)
def test_notices(self):
conn = self.conn
@ -111,7 +111,7 @@ class ConnectionTests(ConnectingTestCase):
cur.execute("set client_min_messages=debug1")
cur.execute("create temp table chatty (id serial primary key);")
self.assertEqual("CREATE TABLE", cur.statusmessage)
self.assert_(conn.notices)
self.assertTrue(conn.notices)
def test_notices_consistent_order(self):
conn = self.conn
@ -127,10 +127,10 @@ class ConnectionTests(ConnectingTestCase):
create temp table table4 (id serial);
""")
self.assertEqual(4, len(conn.notices))
self.assert_('table1' in conn.notices[0])
self.assert_('table2' in conn.notices[1])
self.assert_('table3' in conn.notices[2])
self.assert_('table4' in conn.notices[3])
self.assertTrue('table1' in conn.notices[0])
self.assertTrue('table2' in conn.notices[1])
self.assertTrue('table3' in conn.notices[2])
self.assertTrue('table4' in conn.notices[3])
@slow
def test_notices_limited(self):
@ -144,7 +144,7 @@ class ConnectionTests(ConnectingTestCase):
cur.execute(sql)
self.assertEqual(50, len(conn.notices))
self.assert_('table99' in conn.notices[-1], conn.notices[-1])
self.assertTrue('table99' in conn.notices[-1], conn.notices[-1])
@slow
def test_notices_deque(self):
@ -164,10 +164,10 @@ class ConnectionTests(ConnectingTestCase):
create temp table table3 (id serial);
create temp table table4 (id serial);""")
self.assertEqual(len(conn.notices), 4)
self.assert_('table1' in conn.notices.popleft())
self.assert_('table2' in conn.notices.popleft())
self.assert_('table3' in conn.notices.popleft())
self.assert_('table4' in conn.notices.popleft())
self.assertTrue('table1' in conn.notices.popleft())
self.assertTrue('table2' in conn.notices.popleft())
self.assertTrue('table3' in conn.notices.popleft())
self.assertTrue('table4' in conn.notices.popleft())
self.assertEqual(len(conn.notices), 0)
# not limited, but no error
@ -191,10 +191,10 @@ class ConnectionTests(ConnectingTestCase):
self.assertEqual(self.conn.notices, None)
def test_server_version(self):
self.assert_(self.conn.server_version)
self.assertTrue(self.conn.server_version)
def test_protocol_version(self):
self.assert_(self.conn.protocol_version in (2, 3),
self.assertTrue(self.conn.protocol_version in (2, 3),
self.conn.protocol_version)
def test_tpc_unsupported(self):
@ -222,7 +222,7 @@ class ConnectionTests(ConnectingTestCase):
t2.start()
t1.join()
t2.join()
self.assert_(time.time() - t0 < 7,
self.assertTrue(time.time() - t0 < 7,
"something broken in concurrency")
def test_encoding_name(self):
@ -254,7 +254,7 @@ class ConnectionTests(ConnectingTestCase):
conn.close()
del conn
gc.collect()
self.assert_(w() is None)
self.assertTrue(w() is None)
@slow
def test_commit_concurrency(self):
@ -284,7 +284,7 @@ class ConnectionTests(ConnectingTestCase):
# Stop the committer thread
stop.append(True)
self.assert_(not notices, "%d notices raised" % len(notices))
self.assertTrue(not notices, "%d notices raised" % len(notices))
def test_connect_cursor_factory(self):
import psycopg2.extras
@ -330,8 +330,8 @@ class ConnectionTests(ConnectingTestCase):
pass
c = SubConnection("dbname=thereisnosuchdatabasemate password=foobar")
self.assert_(c.closed, "connection failed so it must be closed")
self.assert_('foobar' not in c.dsn, "password was not obscured")
self.assertTrue(c.closed, "connection failed so it must be closed")
self.assertTrue('foobar' not in c.dsn, "password was not obscured")
class ParseDsnTestCase(ConnectingTestCase):
@ -477,7 +477,7 @@ class MakeDsnTestCase(ConnectingTestCase):
conn = self.connect()
d = conn.get_dsn_parameters()
self.assertEqual(d['dbname'], dbname) # the only param we can check reliably
self.assert_('password' not in d, d)
self.assertTrue('password' not in d, d)
class IsolationLevelsTestCase(ConnectingTestCase):
@ -503,7 +503,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_encoding(self):
conn = self.connect()
self.assert_(conn.encoding in ext.encodings)
self.assertTrue(conn.encoding in ext.encodings)
def test_set_isolation_level(self):
conn = self.connect()
@ -543,11 +543,11 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
self.assert_(conn.autocommit)
self.assertTrue(conn.autocommit)
conn.isolation_level = 'serializable'
self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assert_(conn.autocommit)
self.assertTrue(conn.autocommit)
curs.execute('show transaction_isolation;')
self.assertEqual(curs.fetchone()[0], 'serializable')
@ -1245,11 +1245,11 @@ class TransactionControlTests(ConnectingTestCase):
self.assertRaises(ValueError, self.conn.set_session, 'whatever')
def test_set_read_only(self):
self.assert_(self.conn.readonly is None)
self.assertTrue(self.conn.readonly is None)
cur = self.conn.cursor()
self.conn.set_session(readonly=True)
self.assert_(self.conn.readonly is True)
self.assertTrue(self.conn.readonly is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
@ -1258,7 +1258,7 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.rollback()
self.conn.set_session(readonly=False)
self.assert_(self.conn.readonly is False)
self.assertTrue(self.conn.readonly is False)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
@ -1266,12 +1266,12 @@ class TransactionControlTests(ConnectingTestCase):
def test_setattr_read_only(self):
cur = self.conn.cursor()
self.conn.readonly = True
self.assert_(self.conn.readonly is True)
self.assertTrue(self.conn.readonly is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.assertRaises(self.conn.ProgrammingError,
setattr, self.conn, 'readonly', False)
self.assert_(self.conn.readonly is True)
self.assertTrue(self.conn.readonly is True)
self.conn.rollback()
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
@ -1279,13 +1279,13 @@ class TransactionControlTests(ConnectingTestCase):
cur = self.conn.cursor()
self.conn.readonly = None
self.assert_(self.conn.readonly is None)
self.assertTrue(self.conn.readonly is None)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server
self.conn.rollback()
self.conn.readonly = False
self.assert_(self.conn.readonly is False)
self.assertTrue(self.conn.readonly is False)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
@ -1308,10 +1308,10 @@ class TransactionControlTests(ConnectingTestCase):
@skip_before_postgres(9, 1)
def test_set_deferrable(self):
self.assert_(self.conn.deferrable is None)
self.assertTrue(self.conn.deferrable is None)
cur = self.conn.cursor()
self.conn.set_session(readonly=True, deferrable=True)
self.assert_(self.conn.deferrable is True)
self.assertTrue(self.conn.deferrable is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW transaction_deferrable;")
@ -1322,7 +1322,7 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.rollback()
self.conn.set_session(deferrable=False)
self.assert_(self.conn.deferrable is False)
self.assertTrue(self.conn.deferrable is False)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW transaction_deferrable;")
@ -1340,12 +1340,12 @@ class TransactionControlTests(ConnectingTestCase):
def test_setattr_deferrable(self):
cur = self.conn.cursor()
self.conn.deferrable = True
self.assert_(self.conn.deferrable is True)
self.assertTrue(self.conn.deferrable is True)
cur.execute("SHOW transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'on')
self.assertRaises(self.conn.ProgrammingError,
setattr, self.conn, 'deferrable', False)
self.assert_(self.conn.deferrable is True)
self.assertTrue(self.conn.deferrable is True)
self.conn.rollback()
cur.execute("SHOW transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'on')
@ -1353,13 +1353,13 @@ class TransactionControlTests(ConnectingTestCase):
cur = self.conn.cursor()
self.conn.deferrable = None
self.assert_(self.conn.deferrable is None)
self.assertTrue(self.conn.deferrable is None)
cur.execute("SHOW transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server
self.conn.rollback()
self.conn.deferrable = False
self.assert_(self.conn.deferrable is False)
self.assertTrue(self.conn.deferrable is False)
cur.execute("SHOW transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
@ -1393,12 +1393,12 @@ class AutocommitTests(ConnectingTestCase):
# to make it consistent with other methods; meanwhile let's just check
# it doesn't explode.
try:
self.assert_(self.conn.autocommit in (True, False))
self.assertTrue(self.conn.autocommit in (True, False))
except psycopg2.InterfaceError:
pass
def test_default_no_autocommit(self):
self.assert_(not self.conn.autocommit)
self.assertTrue(not self.conn.autocommit)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
ext.TRANSACTION_STATUS_IDLE)
@ -1416,7 +1416,7 @@ class AutocommitTests(ConnectingTestCase):
def test_set_autocommit(self):
self.conn.autocommit = True
self.assert_(self.conn.autocommit)
self.assertTrue(self.conn.autocommit)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
ext.TRANSACTION_STATUS_IDLE)
@ -1428,7 +1428,7 @@ class AutocommitTests(ConnectingTestCase):
ext.TRANSACTION_STATUS_IDLE)
self.conn.autocommit = False
self.assert_(not self.conn.autocommit)
self.assertTrue(not self.conn.autocommit)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
ext.TRANSACTION_STATUS_IDLE)
@ -1446,7 +1446,7 @@ class AutocommitTests(ConnectingTestCase):
def test_set_session_autocommit(self):
self.conn.set_session(autocommit=True)
self.assert_(self.conn.autocommit)
self.assertTrue(self.conn.autocommit)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
ext.TRANSACTION_STATUS_IDLE)
@ -1458,7 +1458,7 @@ class AutocommitTests(ConnectingTestCase):
ext.TRANSACTION_STATUS_IDLE)
self.conn.set_session(autocommit=False)
self.assert_(not self.conn.autocommit)
self.assertTrue(not self.conn.autocommit)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
ext.TRANSACTION_STATUS_IDLE)
@ -1470,7 +1470,7 @@ class AutocommitTests(ConnectingTestCase):
self.conn.rollback()
self.conn.set_session('serializable', readonly=True, autocommit=True)
self.assert_(self.conn.autocommit)
self.assertTrue(self.conn.autocommit)
cur.execute('select 1;')
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
@ -1573,7 +1573,7 @@ while True:
self.assertNotEqual(proc.returncode, 0)
# Strip [NNN refs] from output
err = re.sub(br'\[[^\]]+\]', b'', err).strip()
self.assert_(not err, err)
self.assertTrue(not err, err)
def test_suite():

View File

@ -365,7 +365,7 @@ conn.close()
try:
curs.copy_from(BrokenRead(), "tcopy")
except Exception as e:
self.assert_('ZeroDivisionError' in str(e))
self.assertTrue('ZeroDivisionError' in str(e))
def test_copy_to_propagate_error(self):
class BrokenWrite(_base):

View File

@ -39,7 +39,7 @@ class CursorTests(ConnectingTestCase):
cur = self.conn.cursor()
cur.close()
cur.close()
self.assert_(cur.closed)
self.assertTrue(cur.closed)
def test_empty_query(self):
cur = self.conn.cursor()
@ -171,7 +171,7 @@ class CursorTests(ConnectingTestCase):
del curs
import gc
gc.collect()
self.assert_(w() is None)
self.assertTrue(w() is None)
def test_null_name(self):
curs = self.conn.cursor(None)
@ -345,7 +345,7 @@ class CursorTests(ConnectingTestCase):
t1 = (i.next())[0] # the brackets work around a 2to3 bug
time.sleep(0.2)
t2 = (i.next())[0]
self.assert_((t2 - t1).microseconds * 1e-6 < 0.1,
self.assertTrue((t2 - t1).microseconds * 1e-6 < 0.1,
"named cursor records fetched in 2 roundtrips (delta: %s)"
% (t2 - t1))
@ -390,26 +390,26 @@ class CursorTests(ConnectingTestCase):
self.assertEqual(len(c), 7) # DBAPI happy
for a in ('name', 'type_code', 'display_size', 'internal_size',
'precision', 'scale', 'null_ok'):
self.assert_(hasattr(c, a), a)
self.assertTrue(hasattr(c, a), a)
c = curs.description[0]
self.assertEqual(c.name, 'pi')
self.assert_(c.type_code in psycopg2.extensions.DECIMAL.values)
self.assert_(c.internal_size > 0)
self.assertTrue(c.type_code in psycopg2.extensions.DECIMAL.values)
self.assertTrue(c.internal_size > 0)
self.assertEqual(c.precision, 10)
self.assertEqual(c.scale, 2)
c = curs.description[1]
self.assertEqual(c.name, 'hi')
self.assert_(c.type_code in psycopg2.STRING.values)
self.assert_(c.internal_size < 0)
self.assertTrue(c.type_code in psycopg2.STRING.values)
self.assertTrue(c.internal_size < 0)
self.assertEqual(c.precision, None)
self.assertEqual(c.scale, None)
c = curs.description[2]
self.assertEqual(c.name, 'now')
self.assert_(c.type_code in psycopg2.extensions.DATE.values)
self.assert_(c.internal_size > 0)
self.assertTrue(c.type_code in psycopg2.extensions.DATE.values)
self.assertTrue(c.internal_size > 0)
self.assertEqual(c.precision, None)
self.assertEqual(c.scale, None)
@ -525,7 +525,7 @@ class CursorTests(ConnectingTestCase):
# Make sure callproc works right
cur.callproc(procname, {paramname: 2})
self.assertEquals(cur.fetchone()[0], 4)
self.assertEqual(cur.fetchone()[0], 4)
# Make sure callproc fails right
failing_cases = [

View File

@ -41,7 +41,7 @@ class CommonDatetimeTestsMixin:
def test_parse_date(self):
value = self.DATE('2007-01-01', self.curs)
self.assert_(value is not None)
self.assertTrue(value is not None)
self.assertEqual(value.year, 2007)
self.assertEqual(value.month, 1)
self.assertEqual(value.day, 1)
@ -56,7 +56,7 @@ class CommonDatetimeTestsMixin:
def test_parse_time(self):
value = self.TIME('13:30:29', self.curs)
self.assert_(value is not None)
self.assertTrue(value is not None)
self.assertEqual(value.hour, 13)
self.assertEqual(value.minute, 30)
self.assertEqual(value.second, 29)
@ -71,7 +71,7 @@ class CommonDatetimeTestsMixin:
def test_parse_datetime(self):
value = self.DATETIME('2007-01-01 13:30:29', self.curs)
self.assert_(value is not None)
self.assertTrue(value is not None)
self.assertEqual(value.year, 2007)
self.assertEqual(value.month, 1)
self.assertEqual(value.day, 1)
@ -401,20 +401,20 @@ class DatetimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
from datetime import datetime
t = self.execute("select 'infinity'::timestamp")
self.assert_(t.tzinfo is None)
self.assert_(t > datetime(4000, 1, 1))
self.assertTrue(t.tzinfo is None)
self.assertTrue(t > datetime(4000, 1, 1))
t = self.execute("select '-infinity'::timestamp")
self.assert_(t.tzinfo is None)
self.assert_(t < datetime(1000, 1, 1))
self.assertTrue(t.tzinfo is None)
self.assertTrue(t < datetime(1000, 1, 1))
t = self.execute("select 'infinity'::timestamptz")
self.assert_(t.tzinfo is not None)
self.assert_(t > datetime(4000, 1, 1, tzinfo=FixedOffsetTimezone()))
self.assertTrue(t.tzinfo is not None)
self.assertTrue(t > datetime(4000, 1, 1, tzinfo=FixedOffsetTimezone()))
t = self.execute("select '-infinity'::timestamptz")
self.assert_(t.tzinfo is not None)
self.assert_(t < datetime(1000, 1, 1, tzinfo=FixedOffsetTimezone()))
self.assertTrue(t.tzinfo is not None)
self.assertTrue(t < datetime(1000, 1, 1, tzinfo=FixedOffsetTimezone()))
def test_redshift_day(self):
# Redshift is reported returning 1 day interval as microsec (bug #558)
@ -470,7 +470,7 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_bc_date(self):
value = self.DATE('00042-01-01 BC', self.curs)
self.assert_(value is not None)
self.assertTrue(value is not None)
# mx.DateTime numbers BC dates from 0 rather than 1.
self.assertEqual(value.year, -41)
self.assertEqual(value.month, 1)
@ -478,7 +478,7 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_bc_datetime(self):
value = self.DATETIME('00042-01-01 13:30:29 BC', self.curs)
self.assert_(value is not None)
self.assertTrue(value is not None)
# mx.DateTime numbers BC dates from 0 rather than 1.
self.assertEqual(value.year, -41)
self.assertEqual(value.month, 1)
@ -529,7 +529,7 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
def test_parse_interval(self):
value = self.INTERVAL('42 days 05:50:05', self.curs)
self.assert_(value is not None)
self.assertTrue(value is not None)
self.assertEqual(value.day, 42)
self.assertEqual(value.hour, 5)
self.assertEqual(value.minute, 50)
@ -553,7 +553,7 @@ class mxDateTimeTests(ConnectingTestCase, CommonDatetimeTestsMixin):
[DateTime(-41, 1, 1, 13, 30, 29.123456)])
# microsecs for BC timestamps look not available in PG < 8.4
# but more likely it's determined at compile time.
self.assert_(value in (
self.assertTrue(value in (
'0042-01-01 13:30:29.123456 BC',
'0042-01-01 13:30:29 BC'), value)
@ -651,8 +651,8 @@ class FixedOffsetTimezoneTests(unittest.TestCase):
def test_init_with_no_args(self):
tzinfo = FixedOffsetTimezone()
self.assert_(tzinfo._offset is ZERO)
self.assert_(tzinfo._name is None)
self.assertTrue(tzinfo._offset is ZERO)
self.assertTrue(tzinfo._name is None)
def test_repr_with_positive_offset(self):
tzinfo = FixedOffsetTimezone(5 * 60)
@ -670,15 +670,15 @@ class FixedOffsetTimezoneTests(unittest.TestCase):
"psycopg2.tz.FixedOffsetTimezone(offset=0, name='FOO')")
def test_instance_caching(self):
self.assert_(FixedOffsetTimezone(name="FOO")
self.assertTrue(FixedOffsetTimezone(name="FOO")
is FixedOffsetTimezone(name="FOO"))
self.assert_(FixedOffsetTimezone(7 * 60)
self.assertTrue(FixedOffsetTimezone(7 * 60)
is FixedOffsetTimezone(7 * 60))
self.assert_(FixedOffsetTimezone(-9 * 60, 'FOO')
self.assertTrue(FixedOffsetTimezone(-9 * 60, 'FOO')
is FixedOffsetTimezone(-9 * 60, 'FOO'))
self.assert_(FixedOffsetTimezone(9 * 60)
self.assertTrue(FixedOffsetTimezone(9 * 60)
is not FixedOffsetTimezone(9 * 60, 'FOO'))
self.assert_(FixedOffsetTimezone(name='FOO')
self.assertTrue(FixedOffsetTimezone(name='FOO')
is not FixedOffsetTimezone(9 * 60, 'FOO'))
def test_pickle(self):

View File

@ -36,13 +36,13 @@ class ExtrasDictCursorTests(ConnectingTestCase):
self.conn.close()
self.conn = self.connect(connection_factory=psycopg2.extras.DictConnection)
cur = self.conn.cursor()
self.assert_(isinstance(cur, psycopg2.extras.DictCursor))
self.assertTrue(isinstance(cur, psycopg2.extras.DictCursor))
self.assertEqual(cur.name, None)
# overridable
cur = self.conn.cursor('foo',
cursor_factory=psycopg2.extras.NamedTupleCursor)
self.assertEqual(cur.name, 'foo')
self.assert_(isinstance(cur, psycopg2.extras.NamedTupleCursor))
self.assertTrue(isinstance(cur, psycopg2.extras.NamedTupleCursor))
def testDictCursorWithPlainCursorFetchOne(self):
self._testWithPlainCursor(lambda curs: curs.fetchone())
@ -65,8 +65,8 @@ class ExtrasDictCursorTests(ConnectingTestCase):
def testUpdateRow(self):
row = self._testWithPlainCursor(lambda curs: curs.fetchone())
row['foo'] = 'qux'
self.failUnless(row['foo'] == 'qux')
self.failUnless(row[0] == 'qux')
self.assertTrue(row['foo'] == 'qux')
self.assertTrue(row[0] == 'qux')
@skip_before_postgres(8, 0)
def testDictCursorWithPlainCursorIterRowNumber(self):
@ -77,8 +77,8 @@ class ExtrasDictCursorTests(ConnectingTestCase):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
self.failUnless(row[0] == 'bar')
self.assertTrue(row['foo'] == 'bar')
self.assertTrue(row[0] == 'bar')
return row
def testDictCursorWithPlainCursorRealFetchOne(self):
@ -108,7 +108,7 @@ class ExtrasDictCursorTests(ConnectingTestCase):
curs = self.conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
self.assertTrue(row['foo'] == 'bar')
def testDictCursorWithNamedCursorFetchOne(self):
self._testWithNamedCursor(lambda curs: curs.fetchone())
@ -142,8 +142,8 @@ class ExtrasDictCursorTests(ConnectingTestCase):
curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
self.failUnless(row[0] == 'bar')
self.assertTrue(row['foo'] == 'bar')
self.assertTrue(row[0] == 'bar')
def testDictCursorRealWithNamedCursorFetchOne(self):
self._testWithNamedCursorReal(lambda curs: curs.fetchone())
@ -178,7 +178,7 @@ class ExtrasDictCursorTests(ConnectingTestCase):
cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("SELECT * FROM ExtrasDictCursorTests")
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
self.assertTrue(row['foo'] == 'bar')
def _testNamedCursorNotGreedy(self, curs):
curs.itersize = 2
@ -189,8 +189,8 @@ class ExtrasDictCursorTests(ConnectingTestCase):
recs.append(t)
# check that the dataset was not fetched in a single gulp
self.assert_(recs[1]['ts'] - recs[0]['ts'] < timedelta(seconds=0.005))
self.assert_(recs[2]['ts'] - recs[1]['ts'] > timedelta(seconds=0.0099))
self.assertTrue(recs[1]['ts'] - recs[0]['ts'] < timedelta(seconds=0.005))
self.assertTrue(recs[2]['ts'] - recs[1]['ts'] > timedelta(seconds=0.0099))
def _testIterRowNumber(self, curs):
# Only checking for dataset < itersize:
@ -249,7 +249,7 @@ class NamedTupleCursorTest(ConnectingTestCase):
def test_cursor_args(self):
cur = self.conn.cursor('foo', cursor_factory=psycopg2.extras.DictCursor)
self.assertEqual(cur.name, 'foo')
self.assert_(isinstance(cur, psycopg2.extras.DictCursor))
self.assertTrue(isinstance(cur, psycopg2.extras.DictCursor))
@skip_if_no_namedtuple
def test_fetchone(self):
@ -460,8 +460,8 @@ class NamedTupleCursorTest(ConnectingTestCase):
recs.append(t)
# check that the dataset was not fetched in a single gulp
self.assert_(recs[1].ts - recs[0].ts < timedelta(seconds=0.005))
self.assert_(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099))
self.assertTrue(recs[1].ts - recs[0].ts < timedelta(seconds=0.005))
self.assertTrue(recs[2].ts - recs[1].ts > timedelta(seconds=0.0099))
@skip_if_no_namedtuple
@skip_before_postgres(8, 0)

View File

@ -224,8 +224,8 @@ class TestExecuteValues(FastExecuteTestMixin, testutils.ConnectingTestCase):
psycopg2.extras.execute_values(cur,
"insert into testfast (id, data) values %s -- a%%b",
[(1, 'hi')])
self.assert_(b'a%%b' not in cur.query)
self.assert_(b'a%b' in cur.query)
self.assertTrue(b'a%%b' not in cur.query)
self.assertTrue(b'a%b' in cur.query)
cur.execute("select id, data from testfast")
self.assertEqual(cur.fetchall(), [(1, 'hi')])

View File

@ -95,7 +95,7 @@ class GreenTestCase(ConnectingTestCase):
psycopg2.extensions.set_wait_callback(lambda conn: 1 // 0)
self.assertRaises(ZeroDivisionError, curs.execute, "select 2")
self.assert_(conn.closed)
self.assertTrue(conn.closed)
def test_dont_freak_out(self):
# if there is an error in a green query, don't freak out and close
@ -106,7 +106,7 @@ class GreenTestCase(ConnectingTestCase):
curs.execute, "select the unselectable")
# check that the connection is left in an usable state
self.assert_(not conn.closed)
self.assertTrue(not conn.closed)
conn.rollback()
curs.execute("select 1")
self.assertEqual(curs.fetchone()[0], 1)

View File

@ -47,17 +47,17 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
psycopg2.extras.register_ipaddress(cur)
cur.execute("select null::inet")
self.assert_(cur.fetchone()[0] is None)
self.assertTrue(cur.fetchone()[0] is None)
cur.execute("select '127.0.0.1/24'::inet")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv4Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('127.0.0.1/24'))
self.assertTrue(isinstance(obj, ip.IPv4Interface), repr(obj))
self.assertEqual(obj, ip.ip_interface('127.0.0.1/24'))
cur.execute("select '::ffff:102:300/128'::inet")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv6Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('::ffff:102:300/128'))
self.assertTrue(isinstance(obj, ip.IPv6Interface), repr(obj))
self.assertEqual(obj, ip.ip_interface('::ffff:102:300/128'))
@testutils.skip_before_postgres(8, 2)
def test_inet_array_cast(self):
@ -66,11 +66,11 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
l = cur.fetchone()[0]
self.assert_(l[0] is None)
self.assertEquals(l[1], ip.ip_interface('127.0.0.1'))
self.assertEquals(l[2], ip.ip_interface('::ffff:102:300/128'))
self.assert_(isinstance(l[1], ip.IPv4Interface), l)
self.assert_(isinstance(l[2], ip.IPv6Interface), l)
self.assertTrue(l[0] is None)
self.assertEqual(l[1], ip.ip_interface('127.0.0.1'))
self.assertEqual(l[2], ip.ip_interface('::ffff:102:300/128'))
self.assertTrue(isinstance(l[1], ip.IPv4Interface), l)
self.assertTrue(isinstance(l[2], ip.IPv6Interface), l)
def test_inet_adapt(self):
import ipaddress as ip
@ -78,10 +78,10 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
psycopg2.extras.register_ipaddress(cur)
cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')])
self.assertEquals(cur.fetchone()[0], '127.0.0.1/24')
self.assertEqual(cur.fetchone()[0], '127.0.0.1/24')
cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128')
def test_cidr_cast(self):
import ipaddress as ip
@ -89,17 +89,17 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
psycopg2.extras.register_ipaddress(cur)
cur.execute("select null::cidr")
self.assert_(cur.fetchone()[0] is None)
self.assertTrue(cur.fetchone()[0] is None)
cur.execute("select '127.0.0.0/24'::cidr")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv4Network), repr(obj))
self.assertEquals(obj, ip.ip_network('127.0.0.0/24'))
self.assertTrue(isinstance(obj, ip.IPv4Network), repr(obj))
self.assertEqual(obj, ip.ip_network('127.0.0.0/24'))
cur.execute("select '::ffff:102:300/128'::cidr")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv6Network), repr(obj))
self.assertEquals(obj, ip.ip_network('::ffff:102:300/128'))
self.assertTrue(isinstance(obj, ip.IPv6Network), repr(obj))
self.assertEqual(obj, ip.ip_network('::ffff:102:300/128'))
@testutils.skip_before_postgres(8, 2)
def test_cidr_array_cast(self):
@ -108,11 +108,11 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]")
l = cur.fetchone()[0]
self.assert_(l[0] is None)
self.assertEquals(l[1], ip.ip_network('127.0.0.1'))
self.assertEquals(l[2], ip.ip_network('::ffff:102:300/128'))
self.assert_(isinstance(l[1], ip.IPv4Network), l)
self.assert_(isinstance(l[2], ip.IPv6Network), l)
self.assertTrue(l[0] is None)
self.assertEqual(l[1], ip.ip_network('127.0.0.1'))
self.assertEqual(l[2], ip.ip_network('::ffff:102:300/128'))
self.assertTrue(isinstance(l[1], ip.IPv4Network), l)
self.assertTrue(isinstance(l[2], ip.IPv6Network), l)
def test_cidr_adapt(self):
import ipaddress as ip
@ -120,10 +120,10 @@ class NetworkingTestCase(testutils.ConnectingTestCase):
psycopg2.extras.register_ipaddress(cur)
cur.execute("select %s", [ip.ip_network('127.0.0.0/24')])
self.assertEquals(cur.fetchone()[0], '127.0.0.0/24')
self.assertEqual(cur.fetchone()[0], '127.0.0.0/24')
cur.execute("select %s", [ip.ip_network('::ffff:102:300/128')])
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
self.assertEqual(cur.fetchone()[0], '::ffff:102:300/128')
testutils.decorate_all_tests(NetworkingTestCase, skip_if_no_ipaddress)

View File

@ -137,7 +137,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.assertRaises(psycopg2.OperationalError,
self.conn.lobject, 0, "w", lo.oid)
self.assert_(not self.conn.closed)
self.assertTrue(not self.conn.closed)
def test_import(self):
self.tmpdir = tempfile.mkdtemp()
@ -209,7 +209,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.assertEqual(lo.read(4), "some")
data1 = lo.read()
# avoid dumping megacraps in the console in case of error
self.assert_(data == data1,
self.assertTrue(data == data1,
"%r... != %r..." % (data[:100], data1[:100]))
def test_seek_tell(self):
@ -240,7 +240,7 @@ class LargeObjectTests(LargeObjectTestCase):
# the object doesn't exist now, so we can't reopen it.
self.assertRaises(psycopg2.OperationalError, self.conn.lobject, lo.oid)
# And the object has been closed.
self.assertEquals(lo.closed, True)
self.assertEqual(lo.closed, True)
def test_export(self):
lo = self.conn.lobject()
@ -394,7 +394,7 @@ class LargeObjectTests(LargeObjectTestCase):
pass
lo = self.conn.lobject(lobject_factory=lobject_subclass)
self.assert_(isinstance(lo, lobject_subclass))
self.assertTrue(isinstance(lo, lobject_subclass))
decorate_all_tests(LargeObjectTests, skip_if_no_lo, skip_lo_if_green)

View File

@ -78,10 +78,10 @@ class ConnectTestCase(unittest.TestCase):
psycopg2.connect(database='foo',
user='postgres', password='secret', port=5432)
self.assert_('dbname=foo' in self.args[0])
self.assert_('user=postgres' in self.args[0])
self.assert_('password=secret' in self.args[0])
self.assert_('port=5432' in self.args[0])
self.assertTrue('dbname=foo' in self.args[0])
self.assertTrue('user=postgres' in self.args[0])
self.assertTrue('password=secret' in self.args[0])
self.assertTrue('port=5432' in self.args[0])
self.assertEqual(len(self.args[0].split()), 4)
def test_generic_keywords(self):
@ -106,18 +106,18 @@ class ConnectTestCase(unittest.TestCase):
psycopg2.connect(database='foo', host='baz', async_=1)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
self.assertTrue(self.args[2])
psycopg2.connect("dbname=foo host=baz", async_=True)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
self.assertTrue(self.args[2])
def test_int_port_param(self):
psycopg2.connect(database='sony', port=6543)
dsn = " %s " % self.args[0]
self.assert_(" dbname=sony " in dsn, dsn)
self.assert_(" port=6543 " in dsn, dsn)
self.assertTrue(" dbname=sony " in dsn, dsn)
self.assertTrue(" port=6543 " in dsn, dsn)
def test_empty_param(self):
psycopg2.connect(database='sony', password='')
@ -156,8 +156,8 @@ class ExceptionsTestCase(ConnectingTestCase):
e = exc
self.assertEqual(e.pgcode, '42P01')
self.assert_(e.pgerror)
self.assert_(e.cursor is cur)
self.assertTrue(e.pgerror)
self.assertTrue(e.cursor is cur)
def test_diagnostics_attributes(self):
cur = self.conn.cursor()
@ -167,7 +167,7 @@ class ExceptionsTestCase(ConnectingTestCase):
e = exc
diag = e.diag
self.assert_(isinstance(diag, psycopg2.extensions.Diagnostics))
self.assertTrue(isinstance(diag, psycopg2.extensions.Diagnostics))
for attr in [
'column_name', 'constraint_name', 'context', 'datatype_name',
'internal_position', 'internal_query', 'message_detail',
@ -176,7 +176,7 @@ class ExceptionsTestCase(ConnectingTestCase):
'statement_position', 'table_name', ]:
v = getattr(diag, attr)
if v is not None:
self.assert_(isinstance(v, str))
self.assertTrue(isinstance(v, str))
def test_diagnostics_values(self):
cur = self.conn.cursor()
@ -289,7 +289,7 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(e.pgerror, e1.pgerror)
self.assertEqual(e.pgcode, e1.pgcode)
self.assert_(e1.cursor is None)
self.assertTrue(e1.cursor is None)
@skip_before_python(2, 5)
def test_pickle_connection_error(self):
@ -304,7 +304,7 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(e.pgerror, e1.pgerror)
self.assertEqual(e.pgcode, e1.pgcode)
self.assert_(e1.cursor is None)
self.assertTrue(e1.cursor is None)
class TestExtensionModule(unittest.TestCase):
@ -316,7 +316,7 @@ class TestExtensionModule(unittest.TestCase):
# required in ticket #201.
pkgdir = os.path.dirname(psycopg2.__file__)
pardir = os.path.dirname(pkgdir)
self.assert_(pardir in sys.path)
self.assertTrue(pardir in sys.path)
script = ("""
import sys
sys.path.remove(%r)

View File

@ -82,7 +82,7 @@ conn.close()
t0 = time.time()
select.select([self.conn], [], [], 5)
t1 = time.time()
self.assert_(0.99 < t1 - t0 < 4, t1 - t0)
self.assertTrue(0.99 < t1 - t0 < 4, t1 - t0)
pid = int(proc.communicate()[0])
self.assertEqual(0, len(self.conn.notifies))
@ -130,7 +130,7 @@ conn.close()
time.sleep(0.5)
self.conn.poll()
notify = self.conn.notifies[0]
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
self.assertTrue(isinstance(notify, psycopg2.extensions.Notify))
@slow
def test_notify_attributes(self):
@ -171,7 +171,7 @@ conn.close()
time.sleep(0.5)
self.conn.poll()
notify = self.conn.notifies.popleft()
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
self.assertTrue(isinstance(notify, psycopg2.extensions.Notify))
self.assertEqual(len(self.conn.notifies), 0)
@slow

View File

@ -59,7 +59,7 @@ class QuotingTestCase(ConnectingTestCase):
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assert_(not self.conn.notices)
self.assertTrue(not self.conn.notices)
def test_string_null_terminator(self):
curs = self.conn.cursor()
@ -68,7 +68,7 @@ class QuotingTestCase(ConnectingTestCase):
try:
curs.execute("SELECT %s", (data,))
except ValueError as e:
self.assertEquals(str(e),
self.assertEqual(str(e),
'A string literal cannot contain NUL (0x00) characters.')
else:
self.fail("ValueError not raised")
@ -94,7 +94,7 @@ class QuotingTestCase(ConnectingTestCase):
"bytea broken with server >= 9.0, libpq < 9")
self.assertEqual(res, data)
self.assert_(not self.conn.notices)
self.assertTrue(not self.conn.notices)
def test_unicode(self):
curs = self.conn.cursor()
@ -117,7 +117,7 @@ class QuotingTestCase(ConnectingTestCase):
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assert_(not self.conn.notices)
self.assertTrue(not self.conn.notices)
def test_latin1(self):
self.conn.set_client_encoding('LATIN1')
@ -131,7 +131,7 @@ class QuotingTestCase(ConnectingTestCase):
curs.execute("SELECT %s::text;", (data,))
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assert_(not self.conn.notices)
self.assertTrue(not self.conn.notices)
# as unicode
if sys.version_info[0] < 3:
@ -141,7 +141,7 @@ class QuotingTestCase(ConnectingTestCase):
curs.execute("SELECT %s::text;", (data,))
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assert_(not self.conn.notices)
self.assertTrue(not self.conn.notices)
def test_koi8(self):
self.conn.set_client_encoding('KOI8')
@ -155,7 +155,7 @@ class QuotingTestCase(ConnectingTestCase):
curs.execute("SELECT %s::text;", (data,))
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assert_(not self.conn.notices)
self.assertTrue(not self.conn.notices)
# as unicode
if sys.version_info[0] < 3:
@ -165,7 +165,7 @@ class QuotingTestCase(ConnectingTestCase):
curs.execute("SELECT %s::text;", (data,))
res = curs.fetchone()[0]
self.assertEqual(res, data)
self.assert_(not self.conn.notices)
self.assertTrue(not self.conn.notices)
class TestQuotedString(ConnectingTestCase):

View File

@ -37,34 +37,34 @@ class SqlFormatTests(ConnectingTestCase):
s = sql.SQL("select {} from {}").format(
sql.Identifier('field'), sql.Identifier('table'))
s1 = s.as_string(self.conn)
self.assert_(isinstance(s1, str))
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
def test_pos_spec(self):
s = sql.SQL("select {0} from {1}").format(
sql.Identifier('field'), sql.Identifier('table'))
s1 = s.as_string(self.conn)
self.assert_(isinstance(s1, str))
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
s = sql.SQL("select {1} from {0}").format(
sql.Identifier('table'), sql.Identifier('field'))
s1 = s.as_string(self.conn)
self.assert_(isinstance(s1, str))
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
def test_dict(self):
s = sql.SQL("select {f} from {t}").format(
f=sql.Identifier('field'), t=sql.Identifier('table'))
s1 = s.as_string(self.conn)
self.assert_(isinstance(s1, str))
self.assertTrue(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"')
def test_unicode(self):
s = sql.SQL(u"select {0} from {1}").format(
sql.Identifier(u'field'), sql.Identifier('table'))
s1 = s.as_string(self.conn)
self.assert_(isinstance(s1, unicode))
self.assertTrue(isinstance(s1, unicode))
self.assertEqual(s1, u'select "field" from "table"')
def test_compose_literal(self):
@ -176,11 +176,11 @@ class SqlFormatTests(ConnectingTestCase):
class IdentifierTests(ConnectingTestCase):
def test_class(self):
self.assert_(issubclass(sql.Identifier, sql.Composable))
self.assertTrue(issubclass(sql.Identifier, sql.Composable))
def test_init(self):
self.assert_(isinstance(sql.Identifier('foo'), sql.Identifier))
self.assert_(isinstance(sql.Identifier(u'foo'), sql.Identifier))
self.assertTrue(isinstance(sql.Identifier('foo'), sql.Identifier))
self.assertTrue(isinstance(sql.Identifier(u'foo'), sql.Identifier))
self.assertRaises(TypeError, sql.Identifier, 10)
self.assertRaises(TypeError, sql.Identifier, dt.date(2016, 12, 31))
@ -193,29 +193,29 @@ class IdentifierTests(ConnectingTestCase):
self.assertEqual(repr(obj), str(obj))
def test_eq(self):
self.assert_(sql.Identifier('foo') == sql.Identifier('foo'))
self.assert_(sql.Identifier('foo') != sql.Identifier('bar'))
self.assert_(sql.Identifier('foo') != 'foo')
self.assert_(sql.Identifier('foo') != sql.SQL('foo'))
self.assertTrue(sql.Identifier('foo') == sql.Identifier('foo'))
self.assertTrue(sql.Identifier('foo') != sql.Identifier('bar'))
self.assertTrue(sql.Identifier('foo') != 'foo')
self.assertTrue(sql.Identifier('foo') != sql.SQL('foo'))
def test_as_str(self):
self.assertEqual(sql.Identifier('foo').as_string(self.conn), '"foo"')
self.assertEqual(sql.Identifier("fo'o").as_string(self.conn), '"fo\'o"')
def test_join(self):
self.assert_(not hasattr(sql.Identifier('foo'), 'join'))
self.assertTrue(not hasattr(sql.Identifier('foo'), 'join'))
class LiteralTests(ConnectingTestCase):
def test_class(self):
self.assert_(issubclass(sql.Literal, sql.Composable))
self.assertTrue(issubclass(sql.Literal, sql.Composable))
def test_init(self):
self.assert_(isinstance(sql.Literal('foo'), sql.Literal))
self.assert_(isinstance(sql.Literal(u'foo'), sql.Literal))
self.assert_(isinstance(sql.Literal(b'foo'), sql.Literal))
self.assert_(isinstance(sql.Literal(42), sql.Literal))
self.assert_(isinstance(
self.assertTrue(isinstance(sql.Literal('foo'), sql.Literal))
self.assertTrue(isinstance(sql.Literal(u'foo'), sql.Literal))
self.assertTrue(isinstance(sql.Literal(b'foo'), sql.Literal))
self.assertTrue(isinstance(sql.Literal(42), sql.Literal))
self.assertTrue(isinstance(
sql.Literal(dt.date(2016, 12, 31)), sql.Literal))
def test_wrapped(self):
@ -231,10 +231,10 @@ class LiteralTests(ConnectingTestCase):
"'2017-01-01'::date")
def test_eq(self):
self.assert_(sql.Literal('foo') == sql.Literal('foo'))
self.assert_(sql.Literal('foo') != sql.Literal('bar'))
self.assert_(sql.Literal('foo') != 'foo')
self.assert_(sql.Literal('foo') != sql.SQL('foo'))
self.assertTrue(sql.Literal('foo') == sql.Literal('foo'))
self.assertTrue(sql.Literal('foo') != sql.Literal('bar'))
self.assertTrue(sql.Literal('foo') != 'foo')
self.assertTrue(sql.Literal('foo') != sql.SQL('foo'))
def test_must_be_adaptable(self):
class Foo(object):
@ -246,11 +246,11 @@ class LiteralTests(ConnectingTestCase):
class SQLTests(ConnectingTestCase):
def test_class(self):
self.assert_(issubclass(sql.SQL, sql.Composable))
self.assertTrue(issubclass(sql.SQL, sql.Composable))
def test_init(self):
self.assert_(isinstance(sql.SQL('foo'), sql.SQL))
self.assert_(isinstance(sql.SQL(u'foo'), sql.SQL))
self.assertTrue(isinstance(sql.SQL('foo'), sql.SQL))
self.assertTrue(isinstance(sql.SQL(u'foo'), sql.SQL))
self.assertRaises(TypeError, sql.SQL, 10)
self.assertRaises(TypeError, sql.SQL, dt.date(2016, 12, 31))
@ -263,36 +263,36 @@ class SQLTests(ConnectingTestCase):
self.assertEqual(sql.SQL("foo").as_string(self.conn), "foo")
def test_eq(self):
self.assert_(sql.SQL('foo') == sql.SQL('foo'))
self.assert_(sql.SQL('foo') != sql.SQL('bar'))
self.assert_(sql.SQL('foo') != 'foo')
self.assert_(sql.SQL('foo') != sql.Literal('foo'))
self.assertTrue(sql.SQL('foo') == sql.SQL('foo'))
self.assertTrue(sql.SQL('foo') != sql.SQL('bar'))
self.assertTrue(sql.SQL('foo') != 'foo')
self.assertTrue(sql.SQL('foo') != sql.Literal('foo'))
def test_sum(self):
obj = sql.SQL("foo") + sql.SQL("bar")
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foobar")
def test_sum_inplace(self):
obj = sql.SQL("foo")
obj += sql.SQL("bar")
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foobar")
def test_multiply(self):
obj = sql.SQL("foo") * 3
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foofoofoo")
def test_join(self):
obj = sql.SQL(", ").join(
[sql.Identifier('foo'), sql.SQL('bar'), sql.Literal(42)])
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), '"foo", bar, 42')
obj = sql.SQL(", ").join(
sql.Composed([sql.Identifier('foo'), sql.SQL('bar'), sql.Literal(42)]))
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), '"foo", bar, 42')
obj = sql.SQL(", ").join([])
@ -301,7 +301,7 @@ class SQLTests(ConnectingTestCase):
class ComposedTest(ConnectingTestCase):
def test_class(self):
self.assert_(issubclass(sql.Composed, sql.Composable))
self.assertTrue(issubclass(sql.Composed, sql.Composable))
def test_repr(self):
obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")])
@ -316,31 +316,31 @@ class ComposedTest(ConnectingTestCase):
def test_eq(self):
l = [sql.Literal("foo"), sql.Identifier("b'ar")]
l2 = [sql.Literal("foo"), sql.Literal("b'ar")]
self.assert_(sql.Composed(l) == sql.Composed(list(l)))
self.assert_(sql.Composed(l) != l)
self.assert_(sql.Composed(l) != sql.Composed(l2))
self.assertTrue(sql.Composed(l) == sql.Composed(list(l)))
self.assertTrue(sql.Composed(l) != l)
self.assertTrue(sql.Composed(l) != sql.Composed(l2))
def test_join(self):
obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")])
obj = obj.join(", ")
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertQuotedEqual(obj.as_string(self.conn), "'foo', \"b'ar\"")
def test_sum(self):
obj = sql.Composed([sql.SQL("foo ")])
obj = obj + sql.Literal("bar")
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
def test_sum_inplace(self):
obj = sql.Composed([sql.SQL("foo ")])
obj += sql.Literal("bar")
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
obj = sql.Composed([sql.SQL("foo ")])
obj += sql.Composed([sql.Literal("bar")])
self.assert_(isinstance(obj, sql.Composed))
self.assertTrue(isinstance(obj, sql.Composed))
self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
def test_iter(self):
@ -355,41 +355,41 @@ class ComposedTest(ConnectingTestCase):
class PlaceholderTest(ConnectingTestCase):
def test_class(self):
self.assert_(issubclass(sql.Placeholder, sql.Composable))
self.assertTrue(issubclass(sql.Placeholder, sql.Composable))
def test_name(self):
self.assertEqual(sql.Placeholder().name, None)
self.assertEqual(sql.Placeholder('foo').name, 'foo')
def test_repr(self):
self.assert_(str(sql.Placeholder()), 'Placeholder()')
self.assert_(repr(sql.Placeholder()), 'Placeholder()')
self.assert_(sql.Placeholder().as_string(self.conn), '%s')
self.assertTrue(str(sql.Placeholder()), 'Placeholder()')
self.assertTrue(repr(sql.Placeholder()), 'Placeholder()')
self.assertTrue(sql.Placeholder().as_string(self.conn), '%s')
def test_repr_name(self):
self.assert_(str(sql.Placeholder('foo')), "Placeholder('foo')")
self.assert_(repr(sql.Placeholder('foo')), "Placeholder('foo')")
self.assert_(sql.Placeholder('foo').as_string(self.conn), '%(foo)s')
self.assertTrue(str(sql.Placeholder('foo')), "Placeholder('foo')")
self.assertTrue(repr(sql.Placeholder('foo')), "Placeholder('foo')")
self.assertTrue(sql.Placeholder('foo').as_string(self.conn), '%(foo)s')
def test_bad_name(self):
self.assertRaises(ValueError, sql.Placeholder, ')')
def test_eq(self):
self.assert_(sql.Placeholder('foo') == sql.Placeholder('foo'))
self.assert_(sql.Placeholder('foo') != sql.Placeholder('bar'))
self.assert_(sql.Placeholder('foo') != 'foo')
self.assert_(sql.Placeholder() == sql.Placeholder())
self.assert_(sql.Placeholder('foo') != sql.Placeholder())
self.assert_(sql.Placeholder('foo') != sql.Literal('foo'))
self.assertTrue(sql.Placeholder('foo') == sql.Placeholder('foo'))
self.assertTrue(sql.Placeholder('foo') != sql.Placeholder('bar'))
self.assertTrue(sql.Placeholder('foo') != 'foo')
self.assertTrue(sql.Placeholder() == sql.Placeholder())
self.assertTrue(sql.Placeholder('foo') != sql.Placeholder())
self.assertTrue(sql.Placeholder('foo') != sql.Literal('foo'))
class ValuesTest(ConnectingTestCase):
def test_null(self):
self.assert_(isinstance(sql.NULL, sql.SQL))
self.assertTrue(isinstance(sql.NULL, sql.SQL))
self.assertEqual(sql.NULL.as_string(self.conn), "NULL")
def test_default(self):
self.assert_(isinstance(sql.DEFAULT, sql.SQL))
self.assertTrue(isinstance(sql.DEFAULT, sql.SQL))
self.assertEqual(sql.DEFAULT.as_string(self.conn), "DEFAULT")

View File

@ -42,41 +42,41 @@ class TypesBasicTests(ConnectingTestCase):
def testQuoting(self):
s = "Quote'this\\! ''ok?''"
self.failUnless(self.execute("SELECT %s AS foo", (s,)) == s,
self.assertTrue(self.execute("SELECT %s AS foo", (s,)) == s,
"wrong quoting: " + s)
def testUnicode(self):
s = u"Quote'this\\! ''ok?''"
self.failUnless(self.execute("SELECT %s AS foo", (s,)) == s,
self.assertTrue(self.execute("SELECT %s AS foo", (s,)) == s,
"wrong unicode quoting: " + s)
def testNumber(self):
s = self.execute("SELECT %s AS foo", (1971,))
self.failUnless(s == 1971, "wrong integer quoting: " + str(s))
self.assertTrue(s == 1971, "wrong integer quoting: " + str(s))
s = self.execute("SELECT %s AS foo", (1971L,))
self.failUnless(s == 1971L, "wrong integer quoting: " + str(s))
self.assertTrue(s == 1971L, "wrong integer quoting: " + str(s))
def testBoolean(self):
x = self.execute("SELECT %s as foo", (False,))
self.assert_(x is False)
self.assertTrue(x is False)
x = self.execute("SELECT %s as foo", (True,))
self.assert_(x is True)
self.assertTrue(x is True)
def testDecimal(self):
s = self.execute("SELECT %s AS foo", (decimal.Decimal("19.10"),))
self.failUnless(s - decimal.Decimal("19.10") == 0,
self.assertTrue(s - decimal.Decimal("19.10") == 0,
"wrong decimal quoting: " + str(s))
s = self.execute("SELECT %s AS foo", (decimal.Decimal("NaN"),))
self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s))
self.failUnless(type(s) == decimal.Decimal,
self.assertTrue(str(s) == "NaN", "wrong decimal quoting: " + str(s))
self.assertTrue(type(s) == decimal.Decimal,
"wrong decimal conversion: " + repr(s))
s = self.execute("SELECT %s AS foo", (decimal.Decimal("infinity"),))
self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s))
self.failUnless(type(s) == decimal.Decimal,
self.assertTrue(str(s) == "NaN", "wrong decimal quoting: " + str(s))
self.assertTrue(type(s) == decimal.Decimal,
"wrong decimal conversion: " + repr(s))
s = self.execute("SELECT %s AS foo", (decimal.Decimal("-infinity"),))
self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s))
self.failUnless(type(s) == decimal.Decimal,
self.assertTrue(str(s) == "NaN", "wrong decimal quoting: " + str(s))
self.assertTrue(type(s) == decimal.Decimal,
"wrong decimal conversion: " + repr(s))
def testFloatNan(self):
@ -86,8 +86,8 @@ class TypesBasicTests(ConnectingTestCase):
return self.skipTest("nan not available on this platform")
s = self.execute("SELECT %s AS foo", (float("nan"),))
self.failUnless(str(s) == "nan", "wrong float quoting: " + str(s))
self.failUnless(type(s) == float, "wrong float conversion: " + repr(s))
self.assertTrue(str(s) == "nan", "wrong float quoting: " + str(s))
self.assertTrue(type(s) == float, "wrong float conversion: " + repr(s))
def testFloatInf(self):
try:
@ -97,11 +97,11 @@ class TypesBasicTests(ConnectingTestCase):
except ValueError:
return self.skipTest("inf not available on this platform")
s = self.execute("SELECT %s AS foo", (float("inf"),))
self.failUnless(str(s) == "inf", "wrong float quoting: " + str(s))
self.failUnless(type(s) == float, "wrong float conversion: " + repr(s))
self.assertTrue(str(s) == "inf", "wrong float quoting: " + str(s))
self.assertTrue(type(s) == float, "wrong float conversion: " + repr(s))
s = self.execute("SELECT %s AS foo", (float("-inf"),))
self.failUnless(str(s) == "-inf", "wrong float quoting: " + str(s))
self.assertTrue(str(s) == "-inf", "wrong float quoting: " + str(s))
def testBinary(self):
if sys.version_info[0] < 3:
@ -145,9 +145,9 @@ class TypesBasicTests(ConnectingTestCase):
def testArray(self):
s = self.execute("SELECT %s AS foo", ([[1, 2], [3, 4]],))
self.failUnlessEqual(s, [[1, 2], [3, 4]])
self.assertEqual(s, [[1, 2], [3, 4]])
s = self.execute("SELECT %s AS foo", (['one', 'two', 'three'],))
self.failUnlessEqual(s, ['one', 'two', 'three'])
self.assertEqual(s, ['one', 'two', 'three'])
def testEmptyArrayRegression(self):
# ticket #42
@ -174,23 +174,23 @@ class TypesBasicTests(ConnectingTestCase):
def testEmptyArray(self):
s = self.execute("SELECT '{}'::text[] AS foo")
self.failUnlessEqual(s, [])
self.assertEqual(s, [])
s = self.execute("SELECT 1 != ALL(%s)", ([],))
self.failUnlessEqual(s, True)
self.assertEqual(s, True)
# but don't break the strings :)
s = self.execute("SELECT '{}'::text AS foo")
self.failUnlessEqual(s, "{}")
self.assertEqual(s, "{}")
def testArrayEscape(self):
ss = ['', '\\', '"', '\\\\', '\\"']
for s in ss:
r = self.execute("SELECT %s AS foo", (s,))
self.failUnlessEqual(s, r)
self.assertEqual(s, r)
r = self.execute("SELECT %s AS foo", ([s],))
self.failUnlessEqual([s], r)
self.assertEqual([s], r)
r = self.execute("SELECT %s AS foo", (ss,))
self.failUnlessEqual(ss, r)
self.assertEqual(ss, r)
def testArrayMalformed(self):
curs = self.conn.cursor()

View File

@ -45,10 +45,10 @@ class TypesExtrasTests(ConnectingTestCase):
psycopg2.extras.register_uuid()
u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
self.assertTrue(u == s)
# must survive NULL cast to a uuid
s = self.execute("SELECT NULL::uuid AS foo")
self.failUnless(s is None)
self.assertTrue(s is None)
@skip_if_no_uuid
def testUUIDARRAY(self):
@ -57,17 +57,17 @@ class TypesExtrasTests(ConnectingTestCase):
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'),
uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
self.assertTrue(u == s)
# array with a NULL element
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), None]
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
self.assertTrue(u == s)
# must survive NULL cast to a uuid[]
s = self.execute("SELECT NULL::uuid[] AS foo")
self.failUnless(s is None)
self.assertTrue(s is None)
# what about empty arrays?
s = self.execute("SELECT '{}'::uuid[] AS foo")
self.failUnless(type(s) == list and len(s) == 0)
self.assertTrue(type(s) == list and len(s) == 0)
def testINET(self):
with warnings.catch_warnings():
@ -76,10 +76,10 @@ class TypesExtrasTests(ConnectingTestCase):
i = psycopg2.extras.Inet("192.168.1.0/24")
s = self.execute("SELECT %s AS foo", (i,))
self.failUnless(i.addr == s.addr)
self.assertTrue(i.addr == s.addr)
# must survive NULL cast to inet
s = self.execute("SELECT NULL::inet AS foo")
self.failUnless(s is None)
self.assertTrue(s is None)
def testINETARRAY(self):
with warnings.catch_warnings():
@ -88,10 +88,10 @@ class TypesExtrasTests(ConnectingTestCase):
i = psycopg2.extras.Inet("192.168.1.0/24")
s = self.execute("SELECT %s AS foo", ([i],))
self.failUnless(i.addr == s[0].addr)
self.assertTrue(i.addr == s[0].addr)
# must survive NULL cast to inet
s = self.execute("SELECT NULL::inet[] AS foo")
self.failUnless(s is None)
self.assertTrue(s is None)
def test_inet_conform(self):
from psycopg2.extras import Inet
@ -114,13 +114,13 @@ class TypesExtrasTests(ConnectingTestCase):
try:
psycopg2.extensions.adapt(Foo(), ext.ISQLQuote, None)
except psycopg2.ProgrammingError as err:
self.failUnless(str(err) == "can't adapt type 'Foo'")
self.assertTrue(str(err) == "can't adapt type 'Foo'")
def test_point_array(self):
# make sure a point array is never casted to a float array,
# see https://github.com/psycopg/psycopg2/issues/613
s = self.execute("""SELECT '{"(1,2)","(3,4)"}' AS foo""")
self.failUnless(s == """{"(1,2)","(3,4)"}""")
self.assertTrue(s == """{"(1,2)","(3,4)"}""")
def skip_if_no_hstore(f):
@wraps(f)
@ -149,7 +149,7 @@ class HstoreTestCase(ConnectingTestCase):
a.prepare(self.conn)
q = a.getquoted()
self.assert_(q.startswith(b"(("), q)
self.assertTrue(q.startswith(b"(("), q)
ii = q[1:-1].split(b"||")
ii.sort()
@ -176,7 +176,7 @@ class HstoreTestCase(ConnectingTestCase):
q = a.getquoted()
m = re.match(br'hstore\(ARRAY\[([^\]]+)\], ARRAY\[([^\]]+)\]\)', q)
self.assert_(m, repr(q))
self.assertTrue(m, repr(q))
kk = m.group(1).split(b", ")
vv = m.group(2).split(b", ")
@ -233,7 +233,7 @@ class HstoreTestCase(ConnectingTestCase):
cur = self.conn.cursor()
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertTrue(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
@ -245,7 +245,7 @@ class HstoreTestCase(ConnectingTestCase):
register_hstore(cur)
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertTrue(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
@ -257,11 +257,11 @@ class HstoreTestCase(ConnectingTestCase):
cur = self.conn.cursor()
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertTrue(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {u'a': u'b'})
self.assert_(isinstance(t[2].keys()[0], unicode))
self.assert_(isinstance(t[2].values()[0], unicode))
self.assertTrue(isinstance(t[2].keys()[0], unicode))
self.assertTrue(isinstance(t[2].values()[0], unicode))
@skip_if_no_hstore
def test_register_globally(self):
@ -275,7 +275,7 @@ class HstoreTestCase(ConnectingTestCase):
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assert_(isinstance(r[0], dict))
self.assertTrue(isinstance(r[0], dict))
finally:
conn2.close()
finally:
@ -285,7 +285,7 @@ class HstoreTestCase(ConnectingTestCase):
cur = self.conn.cursor()
cur.execute("select 'a => b'::hstore")
r = cur.fetchone()
self.assert_(isinstance(r[0], str))
self.assertTrue(isinstance(r[0], str))
@skip_if_no_hstore
def test_roundtrip(self):
@ -298,7 +298,7 @@ class HstoreTestCase(ConnectingTestCase):
d1 = cur.fetchone()[0]
self.assertEqual(len(d), len(d1))
for k in d:
self.assert_(k in d1, k)
self.assertTrue(k in d1, k)
self.assertEqual(d[k], d1[k])
ok({})
@ -328,10 +328,10 @@ class HstoreTestCase(ConnectingTestCase):
d1 = cur.fetchone()[0]
self.assertEqual(len(d), len(d1))
for k, v in d1.iteritems():
self.assert_(k in d, k)
self.assertTrue(k in d, k)
self.assertEqual(d[k], v)
self.assert_(isinstance(k, unicode))
self.assert_(v is None or isinstance(v, unicode))
self.assertTrue(isinstance(k, unicode))
self.assertTrue(v is None or isinstance(v, unicode))
ok({})
ok({'a': 'b', 'c': None, 'd': u'\u20ac', u'\u2603': 'e'})
@ -353,7 +353,7 @@ class HstoreTestCase(ConnectingTestCase):
try:
cur.execute("select null::hstore, ''::hstore, 'a => b'::hstore")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertTrue(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
@ -411,7 +411,7 @@ class HstoreTestCase(ConnectingTestCase):
select null::hstore, ''::hstore,
'a => b'::hstore, '{a=>b}'::hstore[]""")
t = cur.fetchone()
self.assert_(t[0] is None)
self.assertTrue(t[0] is None)
self.assertEqual(t[1], {})
self.assertEqual(t[2], {'a': 'b'})
self.assertEqual(t[3], [{'a': 'b'}])
@ -529,7 +529,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
self.assertEqual(t.name, 'type_isd')
self.assertEqual(t.schema, 'public')
self.assertEqual(t.oid, oid)
self.assert_(issubclass(t.type, tuple))
self.assertTrue(issubclass(t.type, tuple))
self.assertEqual(t.attnames, ['anint', 'astring', 'adate'])
self.assertEqual(t.atttypes, [23, 25, 1082])
@ -537,7 +537,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
r = (10, 'hello', date(2011, 1, 2))
curs.execute("select %s::type_isd;", (r,))
v = curs.fetchone()[0]
self.assert_(isinstance(v, t.type))
self.assertTrue(isinstance(v, t.type))
self.assertEqual(v[0], 10)
self.assertEqual(v[1], "hello")
self.assertEqual(v[2], date(2011, 1, 2))
@ -547,7 +547,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
except ImportError:
pass
else:
self.assert_(t.type is not tuple)
self.assertTrue(t.type is not tuple)
self.assertEqual(v.anint, 10)
self.assertEqual(v.astring, "hello")
self.assertEqual(v.adate, date(2011, 1, 2))
@ -689,11 +689,11 @@ class AdaptTypeTestCase(ConnectingTestCase):
curs.execute("select %s::type_isd[];", ([r1, r2],))
v = curs.fetchone()[0]
self.assertEqual(len(v), 2)
self.assert_(isinstance(v[0], t.type))
self.assertTrue(isinstance(v[0], t.type))
self.assertEqual(v[0][0], 10)
self.assertEqual(v[0][1], "hello")
self.assertEqual(v[0][2], date(2011, 1, 2))
self.assert_(isinstance(v[1], t.type))
self.assertTrue(isinstance(v[1], t.type))
self.assertEqual(v[1][0], 20)
self.assertEqual(v[1][1], "world")
self.assertEqual(v[1][2], date(2011, 1, 3))
@ -798,7 +798,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
r = (10, 'hello', date(2011, 1, 2))
curs.execute("select %s::type_isd;", (r,))
v = curs.fetchone()[0]
self.assert_(isinstance(v, dict))
self.assertTrue(isinstance(v, dict))
self.assertEqual(v['anint'], 10)
self.assertEqual(v['astring'], "hello")
self.assertEqual(v['adate'], date(2011, 1, 2))
@ -1008,7 +1008,7 @@ class JsonTestCase(ConnectingTestCase):
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertTrue(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
@skip_if_no_json_module
@ -1028,7 +1028,7 @@ class JsonTestCase(ConnectingTestCase):
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertTrue(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
finally:
psycopg2.extensions.string_types.pop(new.values[0])
@ -1049,12 +1049,12 @@ class JsonTestCase(ConnectingTestCase):
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertTrue(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
curs.execute("""select array['{"a": 100.0, "b": null}']::json[]""")
data = curs.fetchone()[0]
self.assert_(isinstance(data[0]['a'], Decimal))
self.assertTrue(isinstance(data[0]['a'], Decimal))
self.assertEqual(data[0]['a'], Decimal('100.0'))
@skip_if_no_json_module
@ -1085,10 +1085,10 @@ class JsonTestCase(ConnectingTestCase):
obj = {'a': [1, 2, snowman]}
j = psycopg2.extensions.adapt(psycopg2.extras.Json(obj))
s = str(j)
self.assert_(isinstance(s, str))
self.assertTrue(isinstance(s, str))
# no pesky b's
self.assert_(s.startswith("'"))
self.assert_(s.endswith("'"))
self.assertTrue(s.startswith("'"))
self.assertTrue(s.endswith("'"))
@skip_if_no_json_module
@skip_before_postgres(8, 2)
@ -1170,12 +1170,12 @@ class JsonbTestCase(ConnectingTestCase):
curs = self.conn.cursor()
curs.execute("""select '{"a": 100.0, "b": null}'::jsonb""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertTrue(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
# sure we are not manling json too?
curs.execute("""select '{"a": 100.0, "b": null}'::json""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], float))
self.assertTrue(isinstance(data['a'], float))
self.assertEqual(data['a'], 100.0)
def test_register_default(self):
@ -1188,12 +1188,12 @@ class JsonbTestCase(ConnectingTestCase):
curs.execute("""select '{"a": 100.0, "b": null}'::jsonb""")
data = curs.fetchone()[0]
self.assert_(isinstance(data['a'], Decimal))
self.assertTrue(isinstance(data['a'], Decimal))
self.assertEqual(data['a'], Decimal('100.0'))
curs.execute("""select array['{"a": 100.0, "b": null}']::jsonb[]""")
data = curs.fetchone()[0]
self.assert_(isinstance(data[0]['a'], Decimal))
self.assertTrue(isinstance(data[0]['a'], Decimal))
self.assertEqual(data[0]['a'], Decimal('100.0'))
def test_null(self):
@ -1212,36 +1212,36 @@ class RangeTestCase(unittest.TestCase):
from psycopg2.extras import Range
r = Range()
self.assert_(not r.isempty)
self.assertTrue(not r.isempty)
self.assertEqual(r.lower, None)
self.assertEqual(r.upper, None)
self.assert_(r.lower_inf)
self.assert_(r.upper_inf)
self.assert_(not r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(r.lower_inf)
self.assertTrue(r.upper_inf)
self.assertTrue(not r.lower_inc)
self.assertTrue(not r.upper_inc)
def test_empty(self):
from psycopg2.extras import Range
r = Range(empty=True)
self.assert_(r.isempty)
self.assertTrue(r.isempty)
self.assertEqual(r.lower, None)
self.assertEqual(r.upper, None)
self.assert_(not r.lower_inf)
self.assert_(not r.upper_inf)
self.assert_(not r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(not r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertTrue(not r.lower_inc)
self.assertTrue(not r.upper_inc)
def test_nobounds(self):
from psycopg2.extras import Range
r = Range(10, 20)
self.assertEqual(r.lower, 10)
self.assertEqual(r.upper, 20)
self.assert_(not r.isempty)
self.assert_(not r.lower_inf)
self.assert_(not r.upper_inf)
self.assert_(r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(not r.isempty)
self.assertTrue(not r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertTrue(r.lower_inc)
self.assertTrue(not r.upper_inc)
def test_bounds(self):
from psycopg2.extras import Range
@ -1253,9 +1253,9 @@ class RangeTestCase(unittest.TestCase):
r = Range(10, 20, bounds)
self.assertEqual(r.lower, 10)
self.assertEqual(r.upper, 20)
self.assert_(not r.isempty)
self.assert_(not r.lower_inf)
self.assert_(not r.upper_inf)
self.assertTrue(not r.isempty)
self.assertTrue(not r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertEqual(r.lower_inc, lower_inc)
self.assertEqual(r.upper_inc, upper_inc)
@ -1264,20 +1264,20 @@ class RangeTestCase(unittest.TestCase):
r = Range(upper=20)
self.assertEqual(r.lower, None)
self.assertEqual(r.upper, 20)
self.assert_(not r.isempty)
self.assert_(r.lower_inf)
self.assert_(not r.upper_inf)
self.assert_(not r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(not r.isempty)
self.assertTrue(r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertTrue(not r.lower_inc)
self.assertTrue(not r.upper_inc)
r = Range(lower=10, bounds='(]')
self.assertEqual(r.lower, 10)
self.assertEqual(r.upper, None)
self.assert_(not r.isempty)
self.assert_(not r.lower_inf)
self.assert_(r.upper_inf)
self.assert_(not r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(not r.isempty)
self.assertTrue(not r.lower_inf)
self.assertTrue(r.upper_inf)
self.assertTrue(not r.lower_inc)
self.assertTrue(not r.upper_inc)
def test_bad_bounds(self):
from psycopg2.extras import Range
@ -1287,65 +1287,65 @@ class RangeTestCase(unittest.TestCase):
def test_in(self):
from psycopg2.extras import Range
r = Range(empty=True)
self.assert_(10 not in r)
self.assertTrue(10 not in r)
r = Range()
self.assert_(10 in r)
self.assertTrue(10 in r)
r = Range(lower=10, bounds='[)')
self.assert_(9 not in r)
self.assert_(10 in r)
self.assert_(11 in r)
self.assertTrue(9 not in r)
self.assertTrue(10 in r)
self.assertTrue(11 in r)
r = Range(lower=10, bounds='()')
self.assert_(9 not in r)
self.assert_(10 not in r)
self.assert_(11 in r)
self.assertTrue(9 not in r)
self.assertTrue(10 not in r)
self.assertTrue(11 in r)
r = Range(upper=20, bounds='()')
self.assert_(19 in r)
self.assert_(20 not in r)
self.assert_(21 not in r)
self.assertTrue(19 in r)
self.assertTrue(20 not in r)
self.assertTrue(21 not in r)
r = Range(upper=20, bounds='(]')
self.assert_(19 in r)
self.assert_(20 in r)
self.assert_(21 not in r)
self.assertTrue(19 in r)
self.assertTrue(20 in r)
self.assertTrue(21 not in r)
r = Range(10, 20)
self.assert_(9 not in r)
self.assert_(10 in r)
self.assert_(11 in r)
self.assert_(19 in r)
self.assert_(20 not in r)
self.assert_(21 not in r)
self.assertTrue(9 not in r)
self.assertTrue(10 in r)
self.assertTrue(11 in r)
self.assertTrue(19 in r)
self.assertTrue(20 not in r)
self.assertTrue(21 not in r)
r = Range(10, 20, '(]')
self.assert_(9 not in r)
self.assert_(10 not in r)
self.assert_(11 in r)
self.assert_(19 in r)
self.assert_(20 in r)
self.assert_(21 not in r)
self.assertTrue(9 not in r)
self.assertTrue(10 not in r)
self.assertTrue(11 in r)
self.assertTrue(19 in r)
self.assertTrue(20 in r)
self.assertTrue(21 not in r)
r = Range(20, 10)
self.assert_(9 not in r)
self.assert_(10 not in r)
self.assert_(11 not in r)
self.assert_(19 not in r)
self.assert_(20 not in r)
self.assert_(21 not in r)
self.assertTrue(9 not in r)
self.assertTrue(10 not in r)
self.assertTrue(11 not in r)
self.assertTrue(19 not in r)
self.assertTrue(20 not in r)
self.assertTrue(21 not in r)
def test_nonzero(self):
from psycopg2.extras import Range
self.assert_(Range())
self.assert_(Range(10, 20))
self.assert_(not Range(empty=True))
self.assertTrue(Range())
self.assertTrue(Range(10, 20))
self.assertTrue(not Range(empty=True))
def test_eq_hash(self):
def assert_equal(r1, r2):
self.assert_(r1 == r2)
self.assert_(hash(r1) == hash(r2))
self.assertTrue(r1 == r2)
self.assertTrue(hash(r1) == hash(r2))
from psycopg2.extras import Range
assert_equal(Range(empty=True), Range(empty=True))
@ -1356,8 +1356,8 @@ class RangeTestCase(unittest.TestCase):
assert_equal(Range(10, 20, '[]'), Range(10, 20, '[]'))
def assert_not_equal(r1, r2):
self.assert_(r1 != r2)
self.assert_(hash(r1) != hash(r2))
self.assertTrue(r1 != r2)
self.assertTrue(hash(r1) != hash(r2))
assert_not_equal(Range(10, 20), Range(10, 21))
assert_not_equal(Range(10, 20), Range(11, 20))
@ -1385,67 +1385,67 @@ class RangeTestCase(unittest.TestCase):
def test_lt_ordering(self):
from psycopg2.extras import Range
self.assert_(Range(empty=True) < Range(0, 4))
self.assert_(not Range(1, 2) < Range(0, 4))
self.assert_(Range(0, 4) < Range(1, 2))
self.assert_(not Range(1, 2) < Range())
self.assert_(Range() < Range(1, 2))
self.assert_(not Range(1) < Range(upper=1))
self.assert_(not Range() < Range())
self.assert_(not Range(empty=True) < Range(empty=True))
self.assert_(not Range(1, 2) < Range(1, 2))
self.assertTrue(Range(empty=True) < Range(0, 4))
self.assertTrue(not Range(1, 2) < Range(0, 4))
self.assertTrue(Range(0, 4) < Range(1, 2))
self.assertTrue(not Range(1, 2) < Range())
self.assertTrue(Range() < Range(1, 2))
self.assertTrue(not Range(1) < Range(upper=1))
self.assertTrue(not Range() < Range())
self.assertTrue(not Range(empty=True) < Range(empty=True))
self.assertTrue(not Range(1, 2) < Range(1, 2))
with py3_raises_typeerror():
self.assert_(1 < Range(1, 2))
self.assertTrue(1 < Range(1, 2))
with py3_raises_typeerror():
self.assert_(not Range(1, 2) < 1)
self.assertTrue(not Range(1, 2) < 1)
def test_gt_ordering(self):
from psycopg2.extras import Range
self.assert_(not Range(empty=True) > Range(0, 4))
self.assert_(Range(1, 2) > Range(0, 4))
self.assert_(not Range(0, 4) > Range(1, 2))
self.assert_(Range(1, 2) > Range())
self.assert_(not Range() > Range(1, 2))
self.assert_(Range(1) > Range(upper=1))
self.assert_(not Range() > Range())
self.assert_(not Range(empty=True) > Range(empty=True))
self.assert_(not Range(1, 2) > Range(1, 2))
self.assertTrue(not Range(empty=True) > Range(0, 4))
self.assertTrue(Range(1, 2) > Range(0, 4))
self.assertTrue(not Range(0, 4) > Range(1, 2))
self.assertTrue(Range(1, 2) > Range())
self.assertTrue(not Range() > Range(1, 2))
self.assertTrue(Range(1) > Range(upper=1))
self.assertTrue(not Range() > Range())
self.assertTrue(not Range(empty=True) > Range(empty=True))
self.assertTrue(not Range(1, 2) > Range(1, 2))
with py3_raises_typeerror():
self.assert_(not 1 > Range(1, 2))
self.assertTrue(not 1 > Range(1, 2))
with py3_raises_typeerror():
self.assert_(Range(1, 2) > 1)
self.assertTrue(Range(1, 2) > 1)
def test_le_ordering(self):
from psycopg2.extras import Range
self.assert_(Range(empty=True) <= Range(0, 4))
self.assert_(not Range(1, 2) <= Range(0, 4))
self.assert_(Range(0, 4) <= Range(1, 2))
self.assert_(not Range(1, 2) <= Range())
self.assert_(Range() <= Range(1, 2))
self.assert_(not Range(1) <= Range(upper=1))
self.assert_(Range() <= Range())
self.assert_(Range(empty=True) <= Range(empty=True))
self.assert_(Range(1, 2) <= Range(1, 2))
self.assertTrue(Range(empty=True) <= Range(0, 4))
self.assertTrue(not Range(1, 2) <= Range(0, 4))
self.assertTrue(Range(0, 4) <= Range(1, 2))
self.assertTrue(not Range(1, 2) <= Range())
self.assertTrue(Range() <= Range(1, 2))
self.assertTrue(not Range(1) <= Range(upper=1))
self.assertTrue(Range() <= Range())
self.assertTrue(Range(empty=True) <= Range(empty=True))
self.assertTrue(Range(1, 2) <= Range(1, 2))
with py3_raises_typeerror():
self.assert_(1 <= Range(1, 2))
self.assertTrue(1 <= Range(1, 2))
with py3_raises_typeerror():
self.assert_(not Range(1, 2) <= 1)
self.assertTrue(not Range(1, 2) <= 1)
def test_ge_ordering(self):
from psycopg2.extras import Range
self.assert_(not Range(empty=True) >= Range(0, 4))
self.assert_(Range(1, 2) >= Range(0, 4))
self.assert_(not Range(0, 4) >= Range(1, 2))
self.assert_(Range(1, 2) >= Range())
self.assert_(not Range() >= Range(1, 2))
self.assert_(Range(1) >= Range(upper=1))
self.assert_(Range() >= Range())
self.assert_(Range(empty=True) >= Range(empty=True))
self.assert_(Range(1, 2) >= Range(1, 2))
self.assertTrue(not Range(empty=True) >= Range(0, 4))
self.assertTrue(Range(1, 2) >= Range(0, 4))
self.assertTrue(not Range(0, 4) >= Range(1, 2))
self.assertTrue(Range(1, 2) >= Range())
self.assertTrue(not Range() >= Range(1, 2))
self.assertTrue(Range(1) >= Range(upper=1))
self.assertTrue(Range() >= Range())
self.assertTrue(Range(empty=True) >= Range(empty=True))
self.assertTrue(Range(1, 2) >= Range(1, 2))
with py3_raises_typeerror():
self.assert_(not 1 >= Range(1, 2))
self.assertTrue(not 1 >= Range(1, 2))
with py3_raises_typeerror():
self.assert_(Range(1, 2) >= 1)
self.assertTrue(Range(1, 2) >= 1)
def test_pickling(self):
from psycopg2.extras import Range
@ -1485,8 +1485,8 @@ class RangeCasterTestCase(ConnectingTestCase):
for type in self.builtin_ranges:
cur.execute("select 'empty'::%s" % type)
r = cur.fetchone()[0]
self.assert_(isinstance(r, Range), type)
self.assert_(r.isempty)
self.assertTrue(isinstance(r, Range), type)
self.assertTrue(r.isempty)
def test_cast_inf(self):
from psycopg2.extras import Range
@ -1494,10 +1494,10 @@ class RangeCasterTestCase(ConnectingTestCase):
for type in self.builtin_ranges:
cur.execute("select '(,)'::%s" % type)
r = cur.fetchone()[0]
self.assert_(isinstance(r, Range), type)
self.assert_(not r.isempty)
self.assert_(r.lower_inf)
self.assert_(r.upper_inf)
self.assertTrue(isinstance(r, Range), type)
self.assertTrue(not r.isempty)
self.assertTrue(r.lower_inf)
self.assertTrue(r.upper_inf)
def test_cast_numbers(self):
from psycopg2.extras import NumericRange
@ -1505,39 +1505,39 @@ class RangeCasterTestCase(ConnectingTestCase):
for type in ('int4range', 'int8range'):
cur.execute("select '(10,20)'::%s" % type)
r = cur.fetchone()[0]
self.assert_(isinstance(r, NumericRange))
self.assert_(not r.isempty)
self.assertTrue(isinstance(r, NumericRange))
self.assertTrue(not r.isempty)
self.assertEqual(r.lower, 11)
self.assertEqual(r.upper, 20)
self.assert_(not r.lower_inf)
self.assert_(not r.upper_inf)
self.assert_(r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(not r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertTrue(r.lower_inc)
self.assertTrue(not r.upper_inc)
cur.execute("select '(10.2,20.6)'::numrange")
r = cur.fetchone()[0]
self.assert_(isinstance(r, NumericRange))
self.assert_(not r.isempty)
self.assertTrue(isinstance(r, NumericRange))
self.assertTrue(not r.isempty)
self.assertEqual(r.lower, Decimal('10.2'))
self.assertEqual(r.upper, Decimal('20.6'))
self.assert_(not r.lower_inf)
self.assert_(not r.upper_inf)
self.assert_(not r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(not r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertTrue(not r.lower_inc)
self.assertTrue(not r.upper_inc)
def test_cast_date(self):
from psycopg2.extras import DateRange
cur = self.conn.cursor()
cur.execute("select '(2000-01-01,2012-12-31)'::daterange")
r = cur.fetchone()[0]
self.assert_(isinstance(r, DateRange))
self.assert_(not r.isempty)
self.assertTrue(isinstance(r, DateRange))
self.assertTrue(not r.isempty)
self.assertEqual(r.lower, date(2000, 1, 2))
self.assertEqual(r.upper, date(2012, 12, 31))
self.assert_(not r.lower_inf)
self.assert_(not r.upper_inf)
self.assert_(r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(not r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertTrue(r.lower_inc)
self.assertTrue(not r.upper_inc)
def test_cast_timestamp(self):
from psycopg2.extras import DateTimeRange
@ -1546,14 +1546,14 @@ class RangeCasterTestCase(ConnectingTestCase):
ts2 = datetime(2000, 12, 31, 23, 59, 59, 999)
cur.execute("select tsrange(%s, %s, '()')", (ts1, ts2))
r = cur.fetchone()[0]
self.assert_(isinstance(r, DateTimeRange))
self.assert_(not r.isempty)
self.assertTrue(isinstance(r, DateTimeRange))
self.assertTrue(not r.isempty)
self.assertEqual(r.lower, ts1)
self.assertEqual(r.upper, ts2)
self.assert_(not r.lower_inf)
self.assert_(not r.upper_inf)
self.assert_(not r.lower_inc)
self.assert_(not r.upper_inc)
self.assertTrue(not r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertTrue(not r.lower_inc)
self.assertTrue(not r.upper_inc)
def test_cast_timestamptz(self):
from psycopg2.extras import DateTimeTZRange
@ -1564,14 +1564,14 @@ class RangeCasterTestCase(ConnectingTestCase):
tzinfo=FixedOffsetTimezone(600))
cur.execute("select tstzrange(%s, %s, '[]')", (ts1, ts2))
r = cur.fetchone()[0]
self.assert_(isinstance(r, DateTimeTZRange))
self.assert_(not r.isempty)
self.assertTrue(isinstance(r, DateTimeTZRange))
self.assertTrue(not r.isempty)
self.assertEqual(r.lower, ts1)
self.assertEqual(r.upper, ts2)
self.assert_(not r.lower_inf)
self.assert_(not r.upper_inf)
self.assert_(r.lower_inc)
self.assert_(r.upper_inc)
self.assertTrue(not r.lower_inf)
self.assertTrue(not r.upper_inf)
self.assertTrue(r.lower_inc)
self.assertTrue(r.upper_inc)
def test_adapt_number_range(self):
from psycopg2.extras import NumericRange
@ -1580,26 +1580,26 @@ class RangeCasterTestCase(ConnectingTestCase):
r = NumericRange(empty=True)
cur.execute("select %s::int4range", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, NumericRange))
self.assert_(r1.isempty)
self.assertTrue(isinstance(r1, NumericRange))
self.assertTrue(r1.isempty)
r = NumericRange(10, 20)
cur.execute("select %s::int8range", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, NumericRange))
self.assertTrue(isinstance(r1, NumericRange))
self.assertEqual(r1.lower, 10)
self.assertEqual(r1.upper, 20)
self.assert_(r1.lower_inc)
self.assert_(not r1.upper_inc)
self.assertTrue(r1.lower_inc)
self.assertTrue(not r1.upper_inc)
r = NumericRange(Decimal('10.2'), Decimal('20.5'), '(]')
cur.execute("select %s::numrange", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, NumericRange))
self.assertTrue(isinstance(r1, NumericRange))
self.assertEqual(r1.lower, Decimal('10.2'))
self.assertEqual(r1.upper, Decimal('20.5'))
self.assert_(not r1.lower_inc)
self.assert_(r1.upper_inc)
self.assertTrue(not r1.lower_inc)
self.assertTrue(r1.upper_inc)
def test_adapt_numeric_range(self):
from psycopg2.extras import NumericRange
@ -1608,26 +1608,26 @@ class RangeCasterTestCase(ConnectingTestCase):
r = NumericRange(empty=True)
cur.execute("select %s::int4range", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, NumericRange), r1)
self.assert_(r1.isempty)
self.assertTrue(isinstance(r1, NumericRange), r1)
self.assertTrue(r1.isempty)
r = NumericRange(10, 20)
cur.execute("select %s::int8range", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, NumericRange))
self.assertTrue(isinstance(r1, NumericRange))
self.assertEqual(r1.lower, 10)
self.assertEqual(r1.upper, 20)
self.assert_(r1.lower_inc)
self.assert_(not r1.upper_inc)
self.assertTrue(r1.lower_inc)
self.assertTrue(not r1.upper_inc)
r = NumericRange(Decimal('10.2'), Decimal('20.5'), '(]')
cur.execute("select %s::numrange", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, NumericRange))
self.assertTrue(isinstance(r1, NumericRange))
self.assertEqual(r1.lower, Decimal('10.2'))
self.assertEqual(r1.upper, Decimal('20.5'))
self.assert_(not r1.lower_inc)
self.assert_(r1.upper_inc)
self.assertTrue(not r1.lower_inc)
self.assertTrue(r1.upper_inc)
def test_adapt_date_range(self):
from psycopg2.extras import DateRange, DateTimeRange, DateTimeTZRange
@ -1639,17 +1639,17 @@ class RangeCasterTestCase(ConnectingTestCase):
r = DateRange(d1, d2)
cur.execute("select %s", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, DateRange))
self.assertTrue(isinstance(r1, DateRange))
self.assertEqual(r1.lower, d1)
self.assertEqual(r1.upper, d2)
self.assert_(r1.lower_inc)
self.assert_(not r1.upper_inc)
self.assertTrue(r1.lower_inc)
self.assertTrue(not r1.upper_inc)
r = DateTimeRange(empty=True)
cur.execute("select %s", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, DateTimeRange))
self.assert_(r1.isempty)
self.assertTrue(isinstance(r1, DateTimeRange))
self.assertTrue(r1.isempty)
ts1 = datetime(2000, 1, 1, tzinfo=FixedOffsetTimezone(600))
ts2 = datetime(2000, 12, 31, 23, 59, 59, 999,
@ -1657,11 +1657,11 @@ class RangeCasterTestCase(ConnectingTestCase):
r = DateTimeTZRange(ts1, ts2, '(]')
cur.execute("select %s", (r,))
r1 = cur.fetchone()[0]
self.assert_(isinstance(r1, DateTimeTZRange))
self.assertTrue(isinstance(r1, DateTimeTZRange))
self.assertEqual(r1.lower, ts1)
self.assertEqual(r1.upper, ts2)
self.assert_(not r1.lower_inc)
self.assert_(r1.upper_inc)
self.assertTrue(not r1.lower_inc)
self.assertTrue(r1.upper_inc)
def test_register_range_adapter(self):
from psycopg2.extras import Range, register_range
@ -1670,7 +1670,7 @@ class RangeCasterTestCase(ConnectingTestCase):
rc = register_range('textrange', 'TextRange', cur)
TextRange = rc.range
self.assert_(issubclass(TextRange, Range))
self.assertTrue(issubclass(TextRange, Range))
self.assertEqual(TextRange.__name__, 'TextRange')
r = TextRange('a', 'b', '(]')
@ -1678,8 +1678,8 @@ class RangeCasterTestCase(ConnectingTestCase):
r1 = cur.fetchone()[0]
self.assertEqual(r1.lower, 'a')
self.assertEqual(r1.upper, 'b')
self.assert_(not r1.lower_inc)
self.assert_(r1.upper_inc)
self.assertTrue(not r1.lower_inc)
self.assertTrue(r1.upper_inc)
cur.execute("select %s", ([r, r, r],))
rs = cur.fetchone()[0]
@ -1687,8 +1687,8 @@ class RangeCasterTestCase(ConnectingTestCase):
for r1 in rs:
self.assertEqual(r1.lower, 'a')
self.assertEqual(r1.upper, 'b')
self.assert_(not r1.lower_inc)
self.assert_(r1.upper_inc)
self.assertTrue(not r1.lower_inc)
self.assertTrue(r1.upper_inc)
# clear the adapters to allow precise count by scripts/refcounter.py
del ext.adapters[rc.range, ext.ISQLQuote]
@ -1732,7 +1732,7 @@ class RangeCasterTestCase(ConnectingTestCase):
# ...not too many errors! in the above collate there are 17 errors:
# assume in other collates we won't find more than 30
self.assert_(errs < 30,
self.assertTrue(errs < 30,
"too many collate errors. Is the test working?")
cur.execute("select id, range from rangetest order by id")

View File

@ -48,14 +48,14 @@ class WithTestCase(ConnectingTestCase):
class WithConnectionTestCase(WithTestCase):
def test_with_ok(self):
with self.conn as conn:
self.assert_(self.conn is conn)
self.assertTrue(self.conn is conn)
self.assertEqual(conn.status, ext.STATUS_READY)
curs = conn.cursor()
curs.execute("insert into test_with values (1)")
self.assertEqual(conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assert_(not self.conn.closed)
self.assertTrue(not self.conn.closed)
curs = self.conn.cursor()
curs.execute("select * from test_with")
@ -69,7 +69,7 @@ class WithConnectionTestCase(WithTestCase):
self.assertEqual(conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assert_(not self.conn.closed)
self.assertTrue(not self.conn.closed)
curs = self.conn.cursor()
curs.execute("select * from test_with")
@ -83,7 +83,7 @@ class WithConnectionTestCase(WithTestCase):
self.assertRaises(psycopg2.DataError, f)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assert_(not self.conn.closed)
self.assertTrue(not self.conn.closed)
curs = self.conn.cursor()
curs.execute("select * from test_with")
@ -98,7 +98,7 @@ class WithConnectionTestCase(WithTestCase):
self.assertRaises(ZeroDivisionError, f)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assert_(not self.conn.closed)
self.assertTrue(not self.conn.closed)
curs = self.conn.cursor()
curs.execute("select * from test_with")
@ -125,7 +125,7 @@ class WithConnectionTestCase(WithTestCase):
curs.execute("insert into test_with values (10)")
self.assertEqual(conn.status, ext.STATUS_READY)
self.assert_(commits)
self.assertTrue(commits)
curs = self.conn.cursor()
curs.execute("select * from test_with")
@ -147,10 +147,10 @@ class WithConnectionTestCase(WithTestCase):
except ZeroDivisionError:
pass
else:
self.assert_("exception not raised")
self.assertTrue("exception not raised")
self.assertEqual(conn.status, ext.STATUS_READY)
self.assert_(rollbacks)
self.assertTrue(rollbacks)
curs = conn.cursor()
curs.execute("select * from test_with")
@ -162,12 +162,12 @@ class WithCursorTestCase(WithTestCase):
with self.conn as conn:
with conn.cursor() as curs:
curs.execute("insert into test_with values (4)")
self.assert_(not curs.closed)
self.assertTrue(not curs.closed)
self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assert_(curs.closed)
self.assertTrue(curs.closed)
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assert_(not self.conn.closed)
self.assertTrue(not self.conn.closed)
curs = self.conn.cursor()
curs.execute("select * from test_with")
@ -183,8 +183,8 @@ class WithCursorTestCase(WithTestCase):
pass
self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assert_(not self.conn.closed)
self.assert_(curs.closed)
self.assertTrue(not self.conn.closed)
self.assertTrue(curs.closed)
curs = self.conn.cursor()
curs.execute("select * from test_with")
@ -199,10 +199,10 @@ class WithCursorTestCase(WithTestCase):
super(MyCurs, self).close()
with self.conn.cursor(cursor_factory=MyCurs) as curs:
self.assert_(isinstance(curs, MyCurs))
self.assertTrue(isinstance(curs, MyCurs))
self.assert_(curs.closed)
self.assert_(closes)
self.assertTrue(curs.closed)
self.assertTrue(closes)
def test_exception_swallow(self):
# bug #262: __exit__ calls cur.close() that hides the exception

View File

@ -64,17 +64,6 @@ else:
unittest.TestCase.skipTest = skipTest
# Silence warnings caused by the stubbornness of the Python unittest
# maintainers
# http://bugs.python.org/issue9424
if (not hasattr(unittest.TestCase, 'assert_')
or unittest.TestCase.assert_ is not unittest.TestCase.assertTrue):
# mavaff...
unittest.TestCase.assert_ = unittest.TestCase.assertTrue
unittest.TestCase.failUnless = unittest.TestCase.assertTrue
unittest.TestCase.assertEquals = unittest.TestCase.assertEqual
unittest.TestCase.failUnlessEqual = unittest.TestCase.assertEqual
def assertDsnEqual(self, dsn1, dsn2, msg=None):
"""Check that two conninfo string have the same content"""