From 282360dd048eea5c7f3021dd9eb542f97c753c9a Mon Sep 17 00:00:00 2001 From: Roman Konoval Date: Wed, 11 Sep 2024 16:29:22 +0200 Subject: [PATCH] adds notifications processing after every PQexec --- psycopg/connection_int.c | 10 ++++++ psycopg/pqpath.c | 2 ++ tests/test_notify.py | 67 ++++++++++++++++++++++++++++++++++++---- 3 files changed, 73 insertions(+), 6 deletions(-) diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 8365e048..0584b390 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -1344,6 +1344,11 @@ conn_set_session(connectionObject *self, int autocommit, } } + Py_BLOCK_THREADS; + conn_notifies_process(self); + conn_notice_process(self); + Py_UNBLOCK_THREADS; + if (autocommit != SRV_STATE_UNCHANGED) { self->autocommit = autocommit; } @@ -1408,6 +1413,11 @@ conn_set_client_encoding(connectionObject *self, const char *pgenc) goto endlock; } + Py_BLOCK_THREADS; + conn_notifies_process(self); + conn_notice_process(self); + Py_UNBLOCK_THREADS; + endlock: pthread_mutex_unlock(&self->lock); Py_END_ALLOW_THREADS; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 7b5342b9..59c4070d 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -469,6 +469,7 @@ pq_abort(connectionObject *conn) retvalue = pq_abort_locked(conn, &_save); Py_BLOCK_THREADS; + conn_notifies_process(conn); conn_notice_process(conn); Py_UNBLOCK_THREADS; @@ -539,6 +540,7 @@ pq_reset(connectionObject *conn) Py_BLOCK_THREADS; conn_notice_process(conn); + conn_notifies_process(conn); Py_UNBLOCK_THREADS; pthread_mutex_unlock(&conn->lock); diff --git a/tests/test_notify.py b/tests/test_notify.py index 27bcba61..e3bbccd0 100755 --- a/tests/test_notify.py +++ b/tests/test_notify.py @@ -23,13 +23,14 @@ # FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public # License for more details. +import os import unittest from collections import deque import psycopg2 from psycopg2 import extensions from psycopg2.extensions import Notify -from .testutils import ConnectingTestCase, skip_if_crdb, slow +from .testutils import ConnectingTestCase, skip_if_crdb, skip_if_windows, slow from .testconfig import dsn import sys @@ -74,7 +75,9 @@ conn.close() module=psycopg2.__name__, dsn=dsn, sec=sec, name=name, payload=payload)) - return Popen([sys.executable, '-c', script], stdout=PIPE) + env = os.environ.copy() + env.pop("PSYCOPG_DEBUG", None) + return Popen([sys.executable, '-c', script], stdout=PIPE, env=env) @slow def test_notifies_received_on_poll(self): @@ -127,16 +130,68 @@ conn.close() self.assertEqual('foo', self.conn.notifies[0][1]) @slow + @skip_if_windows def test_notifies_received_on_commit(self): - self.listen("foo") + self.listen('foo') self.conn.commit() - self.conn.cursor().execute("select 1;") - pid = int(self.notify("foo").communicate()[0]) + self.conn.cursor().execute('select 1;') + pid = int(self.notify('foo').communicate()[0]) self.assertEqual(0, len(self.conn.notifies)) self.conn.commit() self.assertEqual(1, len(self.conn.notifies)) self.assertEqual(pid, self.conn.notifies[0][0]) - self.assertEqual("foo", self.conn.notifies[0][1]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_rollback(self): + self.listen('foo') + self.conn.commit() + self.conn.cursor().execute('select 1;') + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.rollback() + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_reset(self): + self.listen('foo') + self.conn.commit() + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.reset() + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_set_session(self): + self.listen('foo') + self.conn.commit() + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.set_session(autocommit=True, readonly=True) + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) + + @slow + @skip_if_windows + def test_notifies_received_on_set_client_encoding(self): + self.listen('foo') + self.conn.commit() + pid = int(self.notify('foo').communicate()[0]) + self.assertEqual(0, len(self.conn.notifies)) + self.conn.set_client_encoding( + 'LATIN1' if self.conn.encoding != 'LATIN1' else 'UTF8' + ) + self.assertEqual(1, len(self.conn.notifies)) + self.assertEqual(pid, self.conn.notifies[0][0]) + self.assertEqual('foo', self.conn.notifies[0][1]) @slow def test_notify_object(self):