mirror of
https://github.com/psycopg/psycopg2.git
synced 2025-06-30 17:53:06 +03:00
Check the correctness of the PID in the notify tests.
This commit is contained in:
parent
d9e49e940a
commit
e651308287
|
@ -7,7 +7,7 @@ from psycopg2 import extensions
|
||||||
import time
|
import time
|
||||||
import select
|
import select
|
||||||
import signal
|
import signal
|
||||||
from subprocess import Popen
|
from subprocess import Popen, PIPE
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
if sys.version_info < (3,):
|
if sys.version_info < (3,):
|
||||||
|
@ -43,6 +43,7 @@ import psycopg2
|
||||||
import psycopg2.extensions
|
import psycopg2.extensions
|
||||||
conn = psycopg2.connect(%(dsn)r)
|
conn = psycopg2.connect(%(dsn)r)
|
||||||
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
|
||||||
|
print conn.get_backend_pid()
|
||||||
curs = conn.cursor()
|
curs = conn.cursor()
|
||||||
curs.execute("NOTIFY " %(name)r)
|
curs.execute("NOTIFY " %(name)r)
|
||||||
curs.close()
|
curs.close()
|
||||||
|
@ -50,22 +51,24 @@ conn.close()
|
||||||
"""
|
"""
|
||||||
% { 'dsn': tests.dsn, 'sec': sec, 'name': name})
|
% { 'dsn': tests.dsn, 'sec': sec, 'name': name})
|
||||||
|
|
||||||
return Popen([sys.executable, '-c', script])
|
return Popen([sys.executable, '-c', script], stdout=PIPE)
|
||||||
|
|
||||||
def test_notifies_received_on_poll(self):
|
def test_notifies_received_on_poll(self):
|
||||||
self.autocommit(self.conn)
|
self.autocommit(self.conn)
|
||||||
self.listen('foo')
|
self.listen('foo')
|
||||||
|
|
||||||
self.notify('foo', 1);
|
proc = self.notify('foo', 1)
|
||||||
|
|
||||||
t0 = time.time()
|
t0 = time.time()
|
||||||
ready = select.select([self.conn], [], [], 5)
|
ready = select.select([self.conn], [], [], 5)
|
||||||
t1 = time.time()
|
t1 = time.time()
|
||||||
self.assert_(0.99 < t1 - t0 < 4, t1 - t0)
|
self.assert_(0.99 < t1 - t0 < 4, t1 - t0)
|
||||||
|
|
||||||
|
pid = int(proc.communicate()[0])
|
||||||
self.assertEqual(0, len(self.conn.notifies))
|
self.assertEqual(0, len(self.conn.notifies))
|
||||||
self.assertEqual(extensions.POLL_OK, self.conn.poll())
|
self.assertEqual(extensions.POLL_OK, self.conn.poll())
|
||||||
self.assertEqual(1, len(self.conn.notifies))
|
self.assertEqual(1, len(self.conn.notifies))
|
||||||
|
self.assertEqual(pid, self.conn.notifies[0][0])
|
||||||
self.assertEqual('foo', self.conn.notifies[0][1])
|
self.assertEqual('foo', self.conn.notifies[0][1])
|
||||||
|
|
||||||
def test_many_notifies(self):
|
def test_many_notifies(self):
|
||||||
|
@ -73,23 +76,27 @@ conn.close()
|
||||||
for name in ['foo', 'bar', 'baz']:
|
for name in ['foo', 'bar', 'baz']:
|
||||||
self.listen(name)
|
self.listen(name)
|
||||||
|
|
||||||
|
pids = {}
|
||||||
for name in ['foo', 'bar', 'baz', 'qux']:
|
for name in ['foo', 'bar', 'baz', 'qux']:
|
||||||
self.notify(name).wait()
|
pids[name] = int(self.notify(name).communicate()[0])
|
||||||
|
|
||||||
self.assertEqual(0, len(self.conn.notifies))
|
self.assertEqual(0, len(self.conn.notifies))
|
||||||
self.assertEqual(extensions.POLL_OK, self.conn.poll())
|
self.assertEqual(extensions.POLL_OK, self.conn.poll())
|
||||||
self.assertEqual(3, len(self.conn.notifies))
|
self.assertEqual(3, len(self.conn.notifies))
|
||||||
names = [n[1] for n in self.conn.notifies]
|
|
||||||
for name in ['foo', 'bar', 'baz']:
|
names = dict.fromkeys(['foo', 'bar', 'baz'])
|
||||||
self.assert_(name in names, name)
|
for (pid, name) in self.conn.notifies:
|
||||||
|
self.assertEqual(pids[name], pid)
|
||||||
|
names.pop(name) # raise if name found twice
|
||||||
|
|
||||||
def test_notifies_received_on_execute(self):
|
def test_notifies_received_on_execute(self):
|
||||||
self.autocommit(self.conn)
|
self.autocommit(self.conn)
|
||||||
self.listen('foo')
|
self.listen('foo')
|
||||||
self.notify('foo').wait()
|
pid = int(self.notify('foo').communicate()[0])
|
||||||
self.assertEqual(0, len(self.conn.notifies))
|
self.assertEqual(0, len(self.conn.notifies))
|
||||||
self.conn.cursor().execute('select 1;')
|
self.conn.cursor().execute('select 1;')
|
||||||
self.assertEqual(1, len(self.conn.notifies))
|
self.assertEqual(1, len(self.conn.notifies))
|
||||||
|
self.assertEqual(pid, self.conn.notifies[0][0])
|
||||||
self.assertEqual('foo', self.conn.notifies[0][1])
|
self.assertEqual('foo', self.conn.notifies[0][1])
|
||||||
|
|
||||||
def test_suite():
|
def test_suite():
|
||||||
|
|
Loading…
Reference in New Issue
Block a user