Test notifies are received ok polling an async cur

This commit is contained in:
Daniele Varrazzo 2019-04-05 18:24:19 +01:00
parent 755a128ffb
commit 46106e1b78

View File

@ -497,8 +497,29 @@ class AsyncTests(ConnectingTestCase):
def test_poll_noop(self): def test_poll_noop(self):
self.conn.poll() self.conn.poll()
@skip_before_postgres(9, 0)
def test_poll_conn_for_notification(self):
with self.conn.cursor() as cur:
cur.execute("listen test")
self.wait(cur)
with self.sync_conn.cursor() as cur:
cur.execute("notify test, 'hello'")
self.sync_conn.commit()
for i in range(10):
self.conn.poll() self.conn.poll()
if self.conn.notifies:
n = self.conn.notifies.pop()
self.assertEqual(n.channel, 'test')
self.assertEqual(n.payload, 'hello')
break
time.sleep(0.1)
else:
self.fail("No notification received")
def test_suite(): def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__) return unittest.TestLoader().loadTestsFromName(__name__)