diff --git a/NEWS b/NEWS index e62ebfe1..d293f375 100644 --- a/NEWS +++ b/NEWS @@ -5,6 +5,7 @@ What's new in psycopg 2.9.10 (unreleased) ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ - Add support for Python 3.13. +- Receive notifications on commit (:ticket:`#1728`). - Drop support for Python 3.7. - `~psycopg2.errorcodes` map and `~psycopg2.errors` classes updated to PostgreSQL 17. diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 8365e048..0584b390 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -1344,6 +1344,11 @@ conn_set_session(connectionObject *self, int autocommit, } } + Py_BLOCK_THREADS; + conn_notifies_process(self); + conn_notice_process(self); + Py_UNBLOCK_THREADS; + if (autocommit != SRV_STATE_UNCHANGED) { self->autocommit = autocommit; } @@ -1408,6 +1413,11 @@ conn_set_client_encoding(connectionObject *self, const char *pgenc) goto endlock; } + Py_BLOCK_THREADS; + conn_notifies_process(self); + conn_notice_process(self); + Py_UNBLOCK_THREADS; + endlock: pthread_mutex_unlock(&self->lock); Py_END_ALLOW_THREADS; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 83ab91f2..59c4070d 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -412,6 +412,7 @@ pq_commit(connectionObject *conn) } Py_BLOCK_THREADS; + conn_notifies_process(conn); conn_notice_process(conn); Py_UNBLOCK_THREADS; @@ -468,6 +469,7 @@ pq_abort(connectionObject *conn) retvalue = pq_abort_locked(conn, &_save); Py_BLOCK_THREADS; + conn_notifies_process(conn); conn_notice_process(conn); Py_UNBLOCK_THREADS; @@ -538,6 +540,7 @@ pq_reset(connectionObject *conn) Py_BLOCK_THREADS; conn_notice_process(conn); + conn_notifies_process(conn); Py_UNBLOCK_THREADS; pthread_mutex_unlock(&conn->lock); diff --git a/tests/test_notify.py b/tests/test_notify.py index 03ab4cde..873a419b 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -23,13 +23,15 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import os import unittest from collections import deque +from functools import partial import psycopg2 from psycopg2 import extensions from psycopg2.extensions import Notify -from .testutils import ConnectingTestCase, skip_if_crdb, slow +from .testutils import ConnectingTestCase, skip_if_crdb, skip_if_windows, slow from .testconfig import dsn import sys @@ -74,7 +76,9 @@ conn.close() module=psycopg2.__name__, dsn=dsn, sec=sec, name=name, payload=payload)) - return Popen([sys.executable, '-c', script], stdout=PIPE) + env = os.environ.copy() + env.pop("PSYCOPG_DEBUG", None) + return Popen([sys.executable, '-c', script], stdout=PIPE, env=env) @slow def test_notifies_received_on_poll(self): @@ -126,6 +130,52 @@ conn.close() self.assertEqual(pid, self.conn.notifies[0][0]) self.assertEqual('foo', self.conn.notifies[0][1]) + def _test_notifies_received_on_operation(self, operation, execute_query=True): + self.listen('foo') + self.conn.commit() + if execute_query: + self.conn.cursor().execute('select 1;') + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + operation() + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_commit(self): + self._test_notifies_received_on_operation(self.conn.commit) + + @slow + @skip_if_windows + def test_notifies_received_on_rollback(self): + self._test_notifies_received_on_operation(self.conn.rollback) + + @slow + @skip_if_windows + def test_notifies_received_on_reset(self): + self._test_notifies_received_on_operation(self.conn.reset, execute_query=False) + + @slow + @skip_if_windows + def test_notifies_received_on_set_session(self): + self._test_notifies_received_on_operation( + partial(self.conn.set_session, autocommit=True, readonly=True), + execute_query=False, + ) + + @slow + @skip_if_windows + def test_notifies_received_on_set_client_encoding(self): + self._test_notifies_received_on_operation( + partial( + self.conn.set_client_encoding, + 'LATIN1' if self.conn.encoding != 'LATIN1' else 'UTF8' + ), + execute_query=False, + ) + @slow def test_notify_object(self): self.autocommit(self.conn)