Notifications are automatically read after each query.

Added tests for basic notifications process.
This commit is contained in:
Daniele Varrazzo 2010-04-20 18:17:27 +01:00
parent 12ef826d50
commit 02a28ff028
7 changed files with 145 additions and 27 deletions

View File

@ -2,6 +2,8 @@
* lib/pqpath.c: Fixed reference leak in notify reception.
* Notifies are collected if available after every query execution.
2010-04-13 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* lib/extensions.py: DECIMAL typecaster imported from _psycopg.

View File

@ -227,11 +227,16 @@ manner.
.. |NOTIFY| replace:: :sql:`NOTIFY`
.. _NOTIFY: http://www.postgresql.org/docs/8.4/static/sql-notify.html
Notification are received using the `~connection.poll()` method. A simple
application could poll the connection from time to time to check if something
new has arrived. A better strategy is to use some I/O completion function such
as |select()|_ to sleep until awaken from the kernel when there is some data to
read on the connection, thereby using no CPU unless there is something to read::
Notification are received after every query execution. If the user is interested
in receiveing notification but not in performing any query, the
`~connection.poll()` method can be used to check for notification without
wasting resources.
A simple application could poll the connection from time to time to check if
something new has arrived. A better strategy is to use some I/O completion
function such as |select()|_ to sleep until awaken from the kernel when there is
some data to read on the connection, thereby using no CPU unless there is
something to read::
import select
import psycopg2

View File

@ -123,6 +123,7 @@ HIDDEN int conn_get_isolation_level(PGresult *pgres);
HIDDEN int conn_get_protocol_version(PGconn *pgconn);
HIDDEN void conn_notice_process(connectionObject *self);
HIDDEN void conn_notice_clean(connectionObject *self);
HIDDEN void conn_notifies_process(connectionObject *self);
HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn);
HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self);

View File

@ -123,6 +123,33 @@ conn_notice_clean(connectionObject *self)
Py_END_ALLOW_THREADS;
}
/* conn_notifies_process - make received notification available
*
* The function should be called with the connection lock and holding the GIL.
*/
void
conn_notifies_process(connectionObject *self)
{
PGnotify *pgn;
while ((pgn = PQnotifies(self->pgconn)) != NULL) {
PyObject *notify;
Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s",
(int) pgn->be_pid, pgn->relname);
notify = PyTuple_New(2);
PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid));
PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname));
PyList_Append(self->notifies, notify);
Py_DECREF(notify);
PQfreemem(pgn);
}
}
/*
* the conn_get_* family of functions makes it easier to obtain the connection
* parameters from query results or by interrogating the connection itself

View File

@ -603,8 +603,6 @@ int
pq_is_busy(connectionObject *conn)
{
int res;
PGnotify *pgn;
Dprintf("pq_is_busy: consuming input");
Py_BEGIN_ALLOW_THREADS;
@ -618,30 +616,13 @@ pq_is_busy(connectionObject *conn)
return -1;
}
/* now check for notifies */
while ((pgn = PQnotifies(conn->pgconn)) != NULL) {
PyObject *notify;
Dprintf("curs_is_busy: got NOTIFY from pid %d, msg = %s",
(int) pgn->be_pid, pgn->relname);
Py_BLOCK_THREADS;
notify = PyTuple_New(2);
PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid));
PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname));
PyList_Append(conn->notifies, notify);
Py_DECREF(notify);
Py_UNBLOCK_THREADS;
PQfreemem(pgn);
}
res = PQisBusy(conn->pgconn);
pthread_mutex_unlock(&(conn->lock));
Py_END_ALLOW_THREADS;
conn_notice_process(conn);
conn_notifies_process(conn);
return res;
}
@ -1327,13 +1308,13 @@ pq_fetch(cursorObject *curs)
break;
}
Dprintf("pq_fetch: fetching done; check for critical errors");
conn_notice_process(curs->conn);
conn_notifies_process(curs->conn);
/* error checking, close the connection if necessary (some critical errors
are not really critical, like a COPY FROM error: if that's the case we
raise the exception but we avoid to close the connection) */
Dprintf("pq_fetch: fetching done; check for critical errors");
if (curs->conn->critical) {
if (ex == -1) {
pq_resolve_critical(curs->conn, 1);

View File

@ -28,6 +28,7 @@ import types_basic
import types_extras
import test_lobject
import test_copy
import test_notify
import test_async
def test_suite():
@ -43,6 +44,7 @@ def test_suite():
suite.addTest(types_extras.test_suite())
suite.addTest(test_lobject.test_suite())
suite.addTest(test_copy.test_suite())
suite.addTest(test_notify.test_suite())
suite.addTest(test_async.test_suite())
return suite

100
tests/test_notify.py Executable file
View File

@ -0,0 +1,100 @@
#!/usr/bin/env python
import unittest
import psycopg2
from psycopg2 import extensions
import time
import select
import signal
from subprocess import Popen
import sys
if sys.version_info < (3,):
import tests
else:
import py3tests as tests
class NotifiesTests(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def autocommit(self, conn):
"""Set a connection in autocommit mode."""
conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT)
def listen(self, name):
"""Start listening for a name on self.conn."""
curs = self.conn.cursor()
curs.execute("LISTEN " + name)
curs.close()
def notify(self, name, sec=0):
"""Send a notification to the database, eventually after some time."""
script = ("""\
import time
time.sleep(%(sec)s)
import psycopg2
import psycopg2.extensions
conn = psycopg2.connect(%(dsn)r)
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
curs.execute("NOTIFY " %(name)r)
curs.close()
conn.close()
"""
% { 'dsn': tests.dsn, 'sec': sec, 'name': name})
return Popen([sys.executable, '-c', script])
def test_notifies_received_on_poll(self):
self.autocommit(self.conn)
self.listen('foo')
self.notify('foo', 1);
t0 = time.time()
ready = select.select([self.conn], [], [], 2)
t1 = time.time()
self.assert_(0.99 < t1 - t0 < 1.2, t1 - t0)
self.assertEqual(0, len(self.conn.notifies))
self.assertEqual(extensions.POLL_OK, self.conn.poll())
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual('foo', self.conn.notifies[0][1])
def test_many_notifies(self):
self.autocommit(self.conn)
for name in ['foo', 'bar', 'baz']:
self.listen(name)
for name in ['foo', 'bar', 'baz', 'qux']:
self.notify(name).wait()
self.assertEqual(0, len(self.conn.notifies))
self.assertEqual(extensions.POLL_OK, self.conn.poll())
self.assertEqual(3, len(self.conn.notifies))
names = [n[1] for n in self.conn.notifies]
for name in ['foo', 'bar', 'baz']:
self.assert_(name in names, name)
def test_notifies_received_on_execute(self):
self.autocommit(self.conn)
self.listen('foo')
self.notify('foo').wait()
self.assertEqual(0, len(self.conn.notifies))
self.conn.cursor().execute('select 1;')
self.assertEqual(1, len(self.conn.notifies))
self.assertEqual('foo', self.conn.notifies[0][1])
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()