From 1f330e9cac9c5d40c33f4f58d0dbfc0109c62edc Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 2 Jun 2015 17:02:04 +0100 Subject: [PATCH] Allow connection.notices and notifies to be replaced. Close #326 --- NEWS | 3 ++ doc/src/advanced.rst | 4 +++ doc/src/connection.rst | 18 ++++++++-- psycopg/connection_int.c | 69 +++++++++++++++++++++++++++++---------- psycopg/connection_type.c | 4 +-- tests/test_connection.py | 36 ++++++++++++++++++++ tests/test_notify.py | 22 +++++++++++++ 7 files changed, 134 insertions(+), 22 deletions(-) diff --git a/NEWS b/NEWS index 4aebbff3..fe6cea41 100644 --- a/NEWS +++ b/NEWS @@ -10,6 +10,9 @@ New features: `~psycopg2.extensions.libpq_version()` to inspect the version of the ``libpq`` library the module was compiled/loaded with (:tickets:`#35, #323`). +- The attributes `~connection.notices` and `~connection.notifies` can be + customized replacing them with any object exposing an `!append()` method + (:ticket:`#326`). What's new in psycopg 2.6.1 diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index eecbcfda..f0483cea 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -312,6 +312,10 @@ received from a previous version server will have the Added `~psycopg2.extensions.Notify` object and handling notification payload. +.. versionchanged:: 2.7 + The `~connection.notifies` attribute is writable: it is possible to + replace it with any object exposing an `!append()` method. An useful + example would be to use a `~collections.deque` object. .. index:: diff --git a/doc/src/connection.rst b/doc/src/connection.rst index 92178f34..cceef1e5 100644 --- a/doc/src/connection.rst +++ b/doc/src/connection.rst @@ -483,13 +483,21 @@ The ``connection`` class ['NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"\n', 'NOTICE: CREATE TABLE will create implicit sequence "foo_id_seq" for serial column "foo.id"\n'] + .. versionchanged:: 2.7 + The `!notices` attribute is writable: the user may replace it + with any Python object exposing an `!append()` method. If + appending raises an exception the notice is silently + dropped. + To avoid a leak in case excessive notices are generated, only the last - 50 messages are kept. + 50 messages are kept. This check is only in place if the `!notices` + attribute is a list: if any other object is used it will be up to the + user to guard from leakage. You can configure what messages to receive using `PostgreSQL logging configuration parameters`__ such as ``log_statement``, ``client_min_messages``, ``log_min_duration_statement`` etc. - + .. __: http://www.postgresql.org/docs/current/static/runtime-config-logging.html @@ -506,6 +514,12 @@ The ``connection`` class the payload was not accessible. To keep backward compatibility, `!Notify` objects can still be accessed as 2 items tuples. + .. versionchanged:: 2.7 + The `!notifies` attribute is writable: the user may replace it + with any Python object exposing an `!append()` method. If + appending raises an exception the notification is silently + dropped. + .. attribute:: cursor_factory diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 40f7e6ca..43d0fdae 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -111,41 +111,60 @@ void conn_notice_process(connectionObject *self) { struct connectionObject_notice *notice; - Py_ssize_t nnotices; + PyObject *msg = NULL; + PyObject *tmp = NULL; + static PyObject *append; if (NULL == self->notice_pending) { return; } - notice = self->notice_pending; + if (!append) { + if (!(append = Text_FromUTF8("append"))) { + goto error; + } + } + notice = self->notice_pending; while (notice != NULL) { - PyObject *msg; - msg = conn_text_from_chars(self, notice->message); Dprintf("conn_notice_process: %s", notice->message); - if (msg) { - PyList_Append(self->notice_list, msg); - Py_DECREF(msg); - } - else { - /* We don't really have a way to report errors, so gulp it. - * The function should only fail for out of memory, so we are - * likely going to die anyway. */ - PyErr_Clear(); + if (!(msg = conn_text_from_chars(self, notice->message))) { goto error; } + + if (!(tmp = PyObject_CallMethodObjArgs( + self->notice_list, append, msg, NULL))) { + + goto error; } + Py_DECREF(tmp); tmp = NULL; + Py_DECREF(msg); msg = NULL; + notice = notice->next; } /* Remove the oldest item if the queue is getting too long. */ - nnotices = PyList_GET_SIZE(self->notice_list); - if (nnotices > CONN_NOTICES_LIMIT) { - PySequence_DelSlice(self->notice_list, - 0, nnotices - CONN_NOTICES_LIMIT); + if (PyList_Check(self->notice_list)) { + Py_ssize_t nnotices; + nnotices = PyList_GET_SIZE(self->notice_list); + if (nnotices > CONN_NOTICES_LIMIT) { + if (-1 == PySequence_DelSlice(self->notice_list, + 0, nnotices - CONN_NOTICES_LIMIT)) { + PyErr_Clear(); + } + } } conn_notice_clean(self); + return; + +error: + Py_XDECREF(tmp); + Py_XDECREF(msg); + conn_notice_clean(self); + + /* TODO: the caller doesn't expects errors from us */ + PyErr_Clear(); } void @@ -177,6 +196,15 @@ conn_notifies_process(connectionObject *self) PGnotify *pgn = NULL; PyObject *notify = NULL; PyObject *pid = NULL, *channel = NULL, *payload = NULL; + PyObject *tmp = NULL; + + static PyObject *append; + + if (!append) { + if (!(append = Text_FromUTF8("append"))) { + goto error; + } + } while ((pgn = PQnotifies(self->pgconn)) != NULL) { @@ -196,7 +224,11 @@ conn_notifies_process(connectionObject *self) Py_DECREF(channel); channel = NULL; Py_DECREF(payload); payload = NULL; - PyList_Append(self->notifies, (PyObject *)notify); + if (!(tmp = PyObject_CallMethodObjArgs( + self->notifies, append, notify, NULL))) { + goto error; + } + Py_DECREF(tmp); tmp = NULL; Py_DECREF(notify); notify = NULL; PQfreemem(pgn); pgn = NULL; @@ -205,6 +237,7 @@ conn_notifies_process(connectionObject *self) error: if (pgn) { PQfreemem(pgn); } + Py_XDECREF(tmp); Py_XDECREF(notify); Py_XDECREF(pid); Py_XDECREF(channel); diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 9931399b..2c1dddf2 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -1001,8 +1001,8 @@ static struct PyMemberDef connectionObject_members[] = { "True if the connection is closed."}, {"encoding", T_STRING, offsetof(connectionObject, encoding), READONLY, "The current client encoding."}, - {"notices", T_OBJECT, offsetof(connectionObject, notice_list), READONLY}, - {"notifies", T_OBJECT, offsetof(connectionObject, notifies), READONLY}, + {"notices", T_OBJECT, offsetof(connectionObject, notice_list), 0}, + {"notifies", T_OBJECT, offsetof(connectionObject, notifies), 0}, {"dsn", T_STRING, offsetof(connectionObject, dsn), READONLY, "The current connection string."}, {"async", T_LONG, offsetof(connectionObject, async), READONLY, diff --git a/tests/test_connection.py b/tests/test_connection.py index 340693e2..fa78eb37 100755 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -129,6 +129,42 @@ class ConnectionTests(ConnectingTestCase): self.assertEqual(50, len(conn.notices)) self.assert_('table99' in conn.notices[-1], conn.notices[-1]) + def test_notices_deque(self): + from collections import deque + + conn = self.conn + self.conn.notices = deque() + cur = conn.cursor() + if self.conn.server_version >= 90300: + cur.execute("set client_min_messages=debug1") + + cur.execute("create temp table table1 (id serial); create temp table table2 (id serial);") + cur.execute("create temp table table3 (id serial); create temp table table4 (id serial);") + self.assertEqual(len(conn.notices), 4) + self.assert_('table1' in conn.notices.popleft()) + self.assert_('table2' in conn.notices.popleft()) + self.assert_('table3' in conn.notices.popleft()) + self.assert_('table4' in conn.notices.popleft()) + self.assertEqual(len(conn.notices), 0) + + # not limited, but no error + for i in range(0, 100, 10): + sql = " ".join(["create temp table table2_%d (id serial);" % j for j in range(i, i+10)]) + cur.execute(sql) + + self.assertEqual(100, len(conn.notices)) + + def test_notices_noappend(self): + conn = self.conn + self.conn.notices = None # will make an error swallowes ok + cur = conn.cursor() + if self.conn.server_version >= 90300: + cur.execute("set client_min_messages=debug1") + + cur.execute("create temp table table1 (id serial);") + + self.assertEqual(self.conn.notices, None) + def test_server_version(self): self.assert_(self.conn.server_version) diff --git a/tests/test_notify.py b/tests/test_notify.py index f8383899..fc6224d7 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -155,6 +155,27 @@ conn.close() self.assertEqual('foo', notify.channel) self.assertEqual('Hello, world!', notify.payload) + def test_notify_deque(self): + from collections import deque + self.autocommit(self.conn) + self.conn.notifies = deque() + self.listen('foo') + self.notify('foo').communicate() + time.sleep(0.5) + self.conn.poll() + notify = self.conn.notifies.popleft() + self.assert_(isinstance(notify, psycopg2.extensions.Notify)) + self.assertEqual(len(self.conn.notifies), 0) + + def test_notify_noappend(self): + self.autocommit(self.conn) + self.conn.notifies = None + self.listen('foo') + self.notify('foo').communicate() + time.sleep(0.5) + self.conn.poll() + self.assertEqual(self.conn.notifies, None) + def test_notify_init(self): n = psycopg2.extensions.Notify(10, 'foo') self.assertEqual(10, n.pid) @@ -192,6 +213,7 @@ conn.close() self.assertNotEqual(hash(Notify(10, 'foo', 'bar')), hash(Notify(10, 'foo'))) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)