2010-04-20 21:17:27 +04:00
|
|
|
#!/usr/bin/env python
|
|
|
|
import unittest
|
2010-10-15 12:42:44 +04:00
|
|
|
import warnings
|
2010-04-20 21:17:27 +04:00
|
|
|
|
|
|
|
import psycopg2
|
|
|
|
from psycopg2 import extensions
|
|
|
|
|
|
|
|
import time
|
|
|
|
import select
|
|
|
|
import signal
|
2010-10-15 20:14:47 +04:00
|
|
|
from subprocess import Popen, PIPE
|
2010-04-20 21:17:27 +04:00
|
|
|
|
|
|
|
import sys
|
|
|
|
if sys.version_info < (3,):
|
|
|
|
import tests
|
|
|
|
else:
|
|
|
|
import py3tests as tests
|
|
|
|
|
|
|
|
|
|
|
|
class NotifiesTests(unittest.TestCase):
|
|
|
|
|
|
|
|
def setUp(self):
|
|
|
|
self.conn = psycopg2.connect(tests.dsn)
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
|
|
self.conn.close()
|
|
|
|
|
|
|
|
def autocommit(self, conn):
|
|
|
|
"""Set a connection in autocommit mode."""
|
|
|
|
conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
|
|
|
|
|
|
|
def listen(self, name):
|
|
|
|
"""Start listening for a name on self.conn."""
|
|
|
|
curs = self.conn.cursor()
|
|
|
|
curs.execute("LISTEN " + name)
|
|
|
|
curs.close()
|
|
|
|
|
2010-10-15 12:42:44 +04:00
|
|
|
def notify(self, name, sec=0, payload=None):
|
2010-04-20 21:17:27 +04:00
|
|
|
"""Send a notification to the database, eventually after some time."""
|
2010-10-15 12:42:44 +04:00
|
|
|
if payload is None:
|
|
|
|
payload = ''
|
|
|
|
else:
|
|
|
|
payload = ", %r" % payload
|
|
|
|
|
2010-04-20 21:17:27 +04:00
|
|
|
script = ("""\
|
|
|
|
import time
|
|
|
|
time.sleep(%(sec)s)
|
|
|
|
import psycopg2
|
|
|
|
import psycopg2.extensions
|
|
|
|
conn = psycopg2.connect(%(dsn)r)
|
|
|
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
2010-10-15 20:14:47 +04:00
|
|
|
print conn.get_backend_pid()
|
2010-04-20 21:17:27 +04:00
|
|
|
curs = conn.cursor()
|
2010-10-15 12:42:44 +04:00
|
|
|
curs.execute("NOTIFY " %(name)r %(payload)r)
|
2010-04-20 21:17:27 +04:00
|
|
|
curs.close()
|
|
|
|
conn.close()
|
|
|
|
"""
|
2010-10-15 12:42:44 +04:00
|
|
|
% { 'dsn': tests.dsn, 'sec': sec, 'name': name, 'payload': payload})
|
2010-04-20 21:17:27 +04:00
|
|
|
|
2010-10-15 20:14:47 +04:00
|
|
|
return Popen([sys.executable, '-c', script], stdout=PIPE)
|
2010-04-20 21:17:27 +04:00
|
|
|
|
|
|
|
def test_notifies_received_on_poll(self):
|
|
|
|
self.autocommit(self.conn)
|
|
|
|
self.listen('foo')
|
|
|
|
|
2010-10-15 20:14:47 +04:00
|
|
|
proc = self.notify('foo', 1)
|
2010-04-20 21:17:27 +04:00
|
|
|
|
|
|
|
t0 = time.time()
|
2010-04-22 20:43:50 +04:00
|
|
|
ready = select.select([self.conn], [], [], 5)
|
2010-04-20 21:17:27 +04:00
|
|
|
t1 = time.time()
|
2010-04-22 20:43:50 +04:00
|
|
|
self.assert_(0.99 < t1 - t0 < 4, t1 - t0)
|
2010-04-20 21:17:27 +04:00
|
|
|
|
2010-10-15 20:14:47 +04:00
|
|
|
pid = int(proc.communicate()[0])
|
2010-04-20 21:17:27 +04:00
|
|
|
self.assertEqual(0, len(self.conn.notifies))
|
|
|
|
self.assertEqual(extensions.POLL_OK, self.conn.poll())
|
|
|
|
self.assertEqual(1, len(self.conn.notifies))
|
2010-10-15 20:14:47 +04:00
|
|
|
self.assertEqual(pid, self.conn.notifies[0][0])
|
2010-04-20 21:17:27 +04:00
|
|
|
self.assertEqual('foo', self.conn.notifies[0][1])
|
|
|
|
|
|
|
|
def test_many_notifies(self):
|
|
|
|
self.autocommit(self.conn)
|
|
|
|
for name in ['foo', 'bar', 'baz']:
|
|
|
|
self.listen(name)
|
|
|
|
|
2010-10-15 20:14:47 +04:00
|
|
|
pids = {}
|
2010-04-20 21:17:27 +04:00
|
|
|
for name in ['foo', 'bar', 'baz', 'qux']:
|
2010-10-15 20:14:47 +04:00
|
|
|
pids[name] = int(self.notify(name).communicate()[0])
|
2010-04-20 21:17:27 +04:00
|
|
|
|
|
|
|
self.assertEqual(0, len(self.conn.notifies))
|
|
|
|
self.assertEqual(extensions.POLL_OK, self.conn.poll())
|
|
|
|
self.assertEqual(3, len(self.conn.notifies))
|
2010-10-15 20:14:47 +04:00
|
|
|
|
|
|
|
names = dict.fromkeys(['foo', 'bar', 'baz'])
|
|
|
|
for (pid, name) in self.conn.notifies:
|
|
|
|
self.assertEqual(pids[name], pid)
|
|
|
|
names.pop(name) # raise if name found twice
|
2010-04-20 21:17:27 +04:00
|
|
|
|
|
|
|
def test_notifies_received_on_execute(self):
|
|
|
|
self.autocommit(self.conn)
|
|
|
|
self.listen('foo')
|
2010-10-15 20:14:47 +04:00
|
|
|
pid = int(self.notify('foo').communicate()[0])
|
2010-04-20 21:17:27 +04:00
|
|
|
self.assertEqual(0, len(self.conn.notifies))
|
|
|
|
self.conn.cursor().execute('select 1;')
|
|
|
|
self.assertEqual(1, len(self.conn.notifies))
|
2010-10-15 20:14:47 +04:00
|
|
|
self.assertEqual(pid, self.conn.notifies[0][0])
|
2010-04-20 21:17:27 +04:00
|
|
|
self.assertEqual('foo', self.conn.notifies[0][1])
|
|
|
|
|
2010-10-16 02:09:47 +04:00
|
|
|
def test_notify_object(self):
|
|
|
|
self.autocommit(self.conn)
|
|
|
|
self.listen('foo')
|
|
|
|
self.notify('foo').communicate()
|
|
|
|
self.conn.poll()
|
|
|
|
notify = self.conn.notifies[0]
|
|
|
|
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
|
|
|
|
|
2010-10-15 12:42:44 +04:00
|
|
|
def test_notify_attributes(self):
|
|
|
|
self.autocommit(self.conn)
|
|
|
|
self.listen('foo')
|
|
|
|
pid = int(self.notify('foo').communicate()[0])
|
|
|
|
self.conn.poll()
|
|
|
|
self.assertEqual(1, len(self.conn.notifies))
|
|
|
|
notify = self.conn.notifies[0]
|
|
|
|
self.assertEqual(pid, notify.pid)
|
|
|
|
self.assertEqual('foo', notify.channel)
|
|
|
|
self.assertEqual('', notify.payload)
|
|
|
|
|
|
|
|
def test_notify_payload(self):
|
|
|
|
if self.conn.server_version < 90000:
|
|
|
|
warnings.warn("server version %s doesn't support notify payload: skipping test"
|
|
|
|
% self.conn.server_version)
|
|
|
|
return
|
|
|
|
self.autocommit(self.conn)
|
|
|
|
self.listen('foo')
|
|
|
|
pid = int(self.notify('foo', payload="Hello, world!").communicate()[0])
|
|
|
|
self.conn.poll()
|
|
|
|
self.assertEqual(1, len(self.conn.notifies))
|
|
|
|
notify = self.conn.notifies[0]
|
|
|
|
self.assertEqual(pid, notify.pid)
|
|
|
|
self.assertEqual('foo', notify.channel)
|
|
|
|
self.assertEqual('Hello, world!', notify.payload)
|
|
|
|
|
2010-10-16 02:09:47 +04:00
|
|
|
def test_notify_init(self):
|
|
|
|
n = psycopg2.extensions.Notify(10, 'foo')
|
|
|
|
self.assertEqual(10, n.pid)
|
|
|
|
self.assertEqual('foo', n.channel)
|
|
|
|
self.assertEqual(None, n.payload)
|
|
|
|
(pid, channel) = n
|
|
|
|
self.assertEqual((pid, channel), (10, 'foo'))
|
|
|
|
|
|
|
|
n = psycopg2.extensions.Notify(42, 'bar', 'baz')
|
|
|
|
self.assertEqual(42, n.pid)
|
|
|
|
self.assertEqual('bar', n.channel)
|
|
|
|
self.assertEqual('baz', n.payload)
|
|
|
|
(pid, channel) = n
|
|
|
|
self.assertEqual((pid, channel), (42, 'bar'))
|
|
|
|
|
2010-10-15 12:42:44 +04:00
|
|
|
|
2010-04-20 21:17:27 +04:00
|
|
|
def test_suite():
|
|
|
|
return unittest.TestLoader().loadTestsFromName(__name__)
|
|
|
|
|
|
|
|
if __name__ == "__main__":
|
|
|
|
unittest.main()
|
|
|
|
|