From 02a28ff028b6f06625d2bb1dc4dadd639fbe3b43 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 20 Apr 2010 18:17:27 +0100 Subject: [PATCH] Notifications are automatically read after each query. Added tests for basic notifications process. --- ChangeLog | 2 + doc/src/advanced.rst | 15 ++++-- psycopg/connection.h | 1 + psycopg/connection_int.c | 27 +++++++++++ psycopg/pqpath.c | 25 ++-------- tests/__init__.py | 2 + tests/test_notify.py | 100 +++++++++++++++++++++++++++++++++++++++ 7 files changed, 145 insertions(+), 27 deletions(-) create mode 100755 tests/test_notify.py diff --git a/ChangeLog b/ChangeLog index 613a1347..12cc6ab3 100644 --- a/ChangeLog +++ b/ChangeLog @@ -2,6 +2,8 @@ * lib/pqpath.c: Fixed reference leak in notify reception. + * Notifies are collected if available after every query execution. + 2010-04-13 Daniele Varrazzo * lib/extensions.py: DECIMAL typecaster imported from _psycopg. diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index a601339c..b8ed5375 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -227,11 +227,16 @@ manner. .. |NOTIFY| replace:: :sql:`NOTIFY` .. _NOTIFY: http://www.postgresql.org/docs/8.4/static/sql-notify.html -Notification are received using the `~connection.poll()` method. A simple -application could poll the connection from time to time to check if something -new has arrived. A better strategy is to use some I/O completion function such -as |select()|_ to sleep until awaken from the kernel when there is some data to -read on the connection, thereby using no CPU unless there is something to read:: +Notification are received after every query execution. If the user is interested +in receiveing notification but not in performing any query, the +`~connection.poll()` method can be used to check for notification without +wasting resources. + +A simple application could poll the connection from time to time to check if +something new has arrived. A better strategy is to use some I/O completion +function such as |select()|_ to sleep until awaken from the kernel when there is +some data to read on the connection, thereby using no CPU unless there is +something to read:: import select import psycopg2 diff --git a/psycopg/connection.h b/psycopg/connection.h index 6b74ca7b..3a57e8da 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -123,6 +123,7 @@ HIDDEN int conn_get_isolation_level(PGresult *pgres); HIDDEN int conn_get_protocol_version(PGconn *pgconn); HIDDEN void conn_notice_process(connectionObject *self); HIDDEN void conn_notice_clean(connectionObject *self); +HIDDEN void conn_notifies_process(connectionObject *self); HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn); HIDDEN int conn_connect(connectionObject *self, long int async); HIDDEN void conn_close(connectionObject *self); diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 81eae8ac..66a22524 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -123,6 +123,33 @@ conn_notice_clean(connectionObject *self) Py_END_ALLOW_THREADS; } + +/* conn_notifies_process - make received notification available + * + * The function should be called with the connection lock and holding the GIL. + */ + +void +conn_notifies_process(connectionObject *self) +{ + PGnotify *pgn; + + while ((pgn = PQnotifies(self->pgconn)) != NULL) { + PyObject *notify; + + Dprintf("conn_notifies_process: got NOTIFY from pid %d, msg = %s", + (int) pgn->be_pid, pgn->relname); + + notify = PyTuple_New(2); + PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid)); + PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname)); + PyList_Append(self->notifies, notify); + Py_DECREF(notify); + PQfreemem(pgn); + } +} + + /* * the conn_get_* family of functions makes it easier to obtain the connection * parameters from query results or by interrogating the connection itself diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index a229943a..a79533db 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -603,8 +603,6 @@ int pq_is_busy(connectionObject *conn) { int res; - PGnotify *pgn; - Dprintf("pq_is_busy: consuming input"); Py_BEGIN_ALLOW_THREADS; @@ -618,30 +616,13 @@ pq_is_busy(connectionObject *conn) return -1; } - - /* now check for notifies */ - while ((pgn = PQnotifies(conn->pgconn)) != NULL) { - PyObject *notify; - - Dprintf("curs_is_busy: got NOTIFY from pid %d, msg = %s", - (int) pgn->be_pid, pgn->relname); - - Py_BLOCK_THREADS; - notify = PyTuple_New(2); - PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid)); - PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname)); - PyList_Append(conn->notifies, notify); - Py_DECREF(notify); - Py_UNBLOCK_THREADS; - PQfreemem(pgn); - } - res = PQisBusy(conn->pgconn); pthread_mutex_unlock(&(conn->lock)); Py_END_ALLOW_THREADS; conn_notice_process(conn); + conn_notifies_process(conn); return res; } @@ -1327,13 +1308,13 @@ pq_fetch(cursorObject *curs) break; } - Dprintf("pq_fetch: fetching done; check for critical errors"); - conn_notice_process(curs->conn); + conn_notifies_process(curs->conn); /* error checking, close the connection if necessary (some critical errors are not really critical, like a COPY FROM error: if that's the case we raise the exception but we avoid to close the connection) */ + Dprintf("pq_fetch: fetching done; check for critical errors"); if (curs->conn->critical) { if (ex == -1) { pq_resolve_critical(curs->conn, 1); diff --git a/tests/__init__.py b/tests/__init__.py index cb15389b..5863de61 100755 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -28,6 +28,7 @@ import types_basic import types_extras import test_lobject import test_copy +import test_notify import test_async def test_suite(): @@ -43,6 +44,7 @@ def test_suite(): suite.addTest(types_extras.test_suite()) suite.addTest(test_lobject.test_suite()) suite.addTest(test_copy.test_suite()) + suite.addTest(test_notify.test_suite()) suite.addTest(test_async.test_suite()) return suite diff --git a/tests/test_notify.py b/tests/test_notify.py new file mode 100755 index 00000000..722aecf5 --- /dev/null +++ b/tests/test_notify.py @@ -0,0 +1,100 @@ +#!/usr/bin/env python +import unittest + +import psycopg2 +from psycopg2 import extensions + +import time +import select +import signal +from subprocess import Popen + +import sys +if sys.version_info < (3,): + import tests +else: + import py3tests as tests + + +class NotifiesTests(unittest.TestCase): + + def setUp(self): + self.conn = psycopg2.connect(tests.dsn) + + def tearDown(self): + self.conn.close() + + def autocommit(self, conn): + """Set a connection in autocommit mode.""" + conn.set_isolation_level(extensions.ISOLATION_LEVEL_AUTOCOMMIT) + + def listen(self, name): + """Start listening for a name on self.conn.""" + curs = self.conn.cursor() + curs.execute("LISTEN " + name) + curs.close() + + def notify(self, name, sec=0): + """Send a notification to the database, eventually after some time.""" + script = ("""\ +import time +time.sleep(%(sec)s) +import psycopg2 +import psycopg2.extensions +conn = psycopg2.connect(%(dsn)r) +conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) +curs = conn.cursor() +curs.execute("NOTIFY " %(name)r) +curs.close() +conn.close() +""" + % { 'dsn': tests.dsn, 'sec': sec, 'name': name}) + + return Popen([sys.executable, '-c', script]) + + def test_notifies_received_on_poll(self): + self.autocommit(self.conn) + self.listen('foo') + + self.notify('foo', 1); + + t0 = time.time() + ready = select.select([self.conn], [], [], 2) + t1 = time.time() + self.assert_(0.99 < t1 - t0 < 1.2, t1 - t0) + + self.assertEqual(0, len(self.conn.notifies)) + self.assertEqual(extensions.POLL_OK, self.conn.poll()) + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual('foo', self.conn.notifies[0][1]) + + def test_many_notifies(self): + self.autocommit(self.conn) + for name in ['foo', 'bar', 'baz']: + self.listen(name) + + for name in ['foo', 'bar', 'baz', 'qux']: + self.notify(name).wait() + + self.assertEqual(0, len(self.conn.notifies)) + self.assertEqual(extensions.POLL_OK, self.conn.poll()) + self.assertEqual(3, len(self.conn.notifies)) + names = [n[1] for n in self.conn.notifies] + for name in ['foo', 'bar', 'baz']: + self.assert_(name in names, name) + + def test_notifies_received_on_execute(self): + self.autocommit(self.conn) + self.listen('foo') + self.notify('foo').wait() + self.assertEqual(0, len(self.conn.notifies)) + self.conn.cursor().execute('select 1;') + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual('foo', self.conn.notifies[0][1]) + +def test_suite(): + return unittest.TestLoader().loadTestsFromName(__name__) + +if __name__ == "__main__": + unittest.main() +