mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-12-02 13:43:44 +03:00
12ef826d50
If the connection is sync, notices will be processed by pq_fetch() downstream. If the connection is async, here we have only sent the query: no result is ready yet, and neither notices have had a chance to arrive: they will be retrieved later by pq_is_busy(). Added tests to check the above statement don't break.
52 lines
1.4 KiB
Python
52 lines
1.4 KiB
Python
#!/usr/bin/env python
|
|
|
|
import unittest
|
|
import psycopg2
|
|
import tests
|
|
|
|
class ConnectionTests(unittest.TestCase):
|
|
|
|
def connect(self):
|
|
return psycopg2.connect(tests.dsn)
|
|
|
|
def test_closed_attribute(self):
|
|
conn = self.connect()
|
|
self.assertEqual(conn.closed, False)
|
|
conn.close()
|
|
self.assertEqual(conn.closed, True)
|
|
|
|
def test_cursor_closed_attribute(self):
|
|
conn = self.connect()
|
|
curs = conn.cursor()
|
|
self.assertEqual(curs.closed, False)
|
|
curs.close()
|
|
self.assertEqual(curs.closed, True)
|
|
|
|
# Closing the connection closes the cursor:
|
|
curs = conn.cursor()
|
|
conn.close()
|
|
self.assertEqual(curs.closed, True)
|
|
|
|
def test_reset(self):
|
|
conn = self.connect()
|
|
# switch isolation level, then reset
|
|
level = conn.isolation_level
|
|
conn.set_isolation_level(0)
|
|
self.assertEqual(conn.isolation_level, 0)
|
|
conn.reset()
|
|
# now the isolation level should be equal to saved one
|
|
self.assertEqual(conn.isolation_level, level)
|
|
|
|
def test_notices(self):
|
|
conn = self.connect()
|
|
cur = conn.cursor()
|
|
cur.execute("create temp table chatty (id serial primary key);")
|
|
self.assertEqual("CREATE TABLE", cur.statusmessage)
|
|
self.assert_(conn.notices)
|
|
conn.close()
|
|
|
|
def test_suite():
|
|
return unittest.TestLoader().loadTestsFromName(__name__)
|
|
|
|
if __name__ == "__main__":
|
|
unittest.main() |