From fb1dbc2a9b308dafa1d8d8e21ef39722d4c6473c Mon Sep 17 00:00:00 2001 From: Christoph Moench-Tegeder Date: Fri, 21 Oct 2016 15:32:11 +0200 Subject: [PATCH 1/6] do not "SET datestyle" on replication connections A replication connection - marked by the use of the keyword "replication" in the DSN - does not support SET commands. Trying to sent "SET datestyle" will result in an exception. --- psycopg/connection_int.c | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 43d0fdae..c8880b16 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -494,6 +494,26 @@ conn_setup_cancel(connectionObject *self, PGconn *pgconn) return 0; } +/* Return 1 if the "replication" keyword is set in the DSN, 0 otherwise */ +static int +dsn_has_replication(char *pgdsn) +{ + int ret = 0; + PQconninfoOption *connopts, *ptr; + + connopts = PQconninfoParse(pgdsn, NULL); + + for(ptr = connopts; ptr->keyword != NULL; ptr++) { + printf("keyword %s val %s\n", ptr->keyword, ptr->val); + if(strcmp(ptr->keyword, "replication") == 0 && ptr->val != NULL) + ret = 1; + } + + PQconninfoFree(connopts); + + return ret; +} + /* Return 1 if the server datestyle allows us to work without problems, 0 if it needs to be set to something better, e.g. ISO. */ @@ -543,7 +563,7 @@ conn_setup(connectionObject *self, PGconn *pgconn) pthread_mutex_lock(&self->lock); Py_BLOCK_THREADS; - if (!conn_is_datestyle_ok(self->pgconn)) { + if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) { int res; Py_UNBLOCK_THREADS; res = pq_set_guc_locked(self, "datestyle", "ISO", @@ -859,8 +879,11 @@ _conn_poll_setup_async(connectionObject *self) self->autocommit = 1; /* If the datestyle is ISO or anything else good, - * we can skip the CONN_STATUS_DATESTYLE step. */ - if (!conn_is_datestyle_ok(self->pgconn)) { + * we can skip the CONN_STATUS_DATESTYLE step. + * Note that we cannot change the datestyle on a replication + * connection. + */ + if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) { Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE"); self->status = CONN_STATUS_DATESTYLE; if (0 == pq_send_query(self, psyco_datestyle)) { From c2d405116b7b68808930eebf7e7b076d8dd17030 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 25 Dec 2016 17:44:25 +0100 Subject: [PATCH 2/6] Dropped testing print --- psycopg/connection_int.c | 1 - 1 file changed, 1 deletion(-) diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index c8880b16..e5c6579f 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -504,7 +504,6 @@ dsn_has_replication(char *pgdsn) connopts = PQconninfoParse(pgdsn, NULL); for(ptr = connopts; ptr->keyword != NULL; ptr++) { - printf("keyword %s val %s\n", ptr->keyword, ptr->val); if(strcmp(ptr->keyword, "replication") == 0 && ptr->val != NULL) ret = 1; } From e27579292aff953dacdc1892f00dc32bb73a29c1 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 25 Dec 2016 17:45:01 +0100 Subject: [PATCH 3/6] Avoid deadlock on close if set datestyle failed --- psycopg/connection_int.c | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index e5c6579f..a34e5ef9 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -541,21 +541,22 @@ conn_setup(connectionObject *self, PGconn *pgconn) { PGresult *pgres = NULL; char *error = NULL; + int rv = -1; self->equote = conn_get_standard_conforming_strings(pgconn); self->server_version = conn_get_server_version(pgconn); self->protocol = conn_get_protocol_version(self->pgconn); if (3 != self->protocol) { PyErr_SetString(InterfaceError, "only protocol 3 supported"); - return -1; + goto exit; } if (0 > conn_read_encoding(self, pgconn)) { - return -1; + goto exit; } if (0 > conn_setup_cancel(self, pgconn)) { - return -1; + goto exit; } Py_BEGIN_ALLOW_THREADS; @@ -570,18 +571,23 @@ conn_setup(connectionObject *self, PGconn *pgconn) Py_BLOCK_THREADS; if (res < 0) { pq_complete_error(self, &pgres, &error); - return -1; + goto unlock; } } /* for reset */ self->autocommit = 0; + /* success */ + rv = 0; + +unlock: Py_UNBLOCK_THREADS; pthread_mutex_unlock(&self->lock); Py_END_ALLOW_THREADS; - return 0; +exit: + return rv; } /* conn_connect - execute a connection to the database */ From b73115ac41559c31fc3a2a3fdb0893046c08d1a5 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 25 Dec 2016 17:46:11 +0100 Subject: [PATCH 4/6] Added test to verify bug #482 --- tests/test_replication.py | 17 +++++++++++++++-- tests/testutils.py | 11 ++++++++++- 2 files changed, 25 insertions(+), 3 deletions(-) mode change 100644 => 100755 tests/test_replication.py diff --git a/tests/test_replication.py b/tests/test_replication.py old mode 100644 new mode 100755 index 2ccd4c77..33a8065a --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -23,7 +23,6 @@ # License for more details. import psycopg2 -import psycopg2.extensions from psycopg2.extras import ( PhysicalReplicationConnection, LogicalReplicationConnection, StopReplication) @@ -89,6 +88,20 @@ class ReplicationTest(ReplicationTestCase): cur.execute("IDENTIFY_SYSTEM") cur.fetchall() + @skip_before_postgres(9, 0) + def test_datestyle(self): + if testconfig.repl_dsn is None: + return self.skipTest("replication tests disabled by default") + + conn = self.repl_connect( + dsn=testconfig.repl_dsn, options='-cdatestyle=german', + connection_factory=PhysicalReplicationConnection) + if conn is None: + return + cur = conn.cursor() + cur.execute("IDENTIFY_SYSTEM") + cur.fetchall() + @skip_before_postgres(9, 4) def test_logical_replication_connection(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) @@ -168,7 +181,7 @@ class AsyncReplicationTest(ReplicationTestCase): connection_factory=LogicalReplicationConnection, async=1) if conn is None: return - self.wait(conn) + cur = conn.cursor() self.create_replication_slot(cur, output_plugin='test_decoding') diff --git a/tests/testutils.py b/tests/testutils.py index 1dd0c999..93477357 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -130,8 +130,17 @@ class ConnectingTestCase(unittest.TestCase): import psycopg2 try: conn = self.connect(**kwargs) + if conn.async == 1: + self.wait(conn) except psycopg2.OperationalError, e: - return self.skipTest("replication db not configured: %s" % e) + # If pgcode is not set it is a genuine connection error + # Otherwise we tried to run some bad operation in the connection + # (e.g. bug #482) and we'd rather know that. + if e.pgcode is None: + return self.skipTest("replication db not configured: %s" % e) + else: + raise + return conn def _get_conn(self): From 874705db429de5cc23a20c5e5cb85287c163f037 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 25 Dec 2016 17:49:58 +0100 Subject: [PATCH 5/6] Configure Travis to test replication --- scripts/travis_prepare.sh | 19 ++++++++++++++++++- scripts/travis_test.sh | 6 ++++-- 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/scripts/travis_prepare.sh b/scripts/travis_prepare.sh index f4e86118..2b1e12eb 100755 --- a/scripts/travis_prepare.sh +++ b/scripts/travis_prepare.sh @@ -23,10 +23,27 @@ create () { dbname=psycopg2_test pg_createcluster -p $port --start-conf manual $version psycopg + + # for two-phase commit testing set_param "$version" max_prepared_transactions 10 + + # for replication testing + set_param "$version" max_wal_senders 5 + set_param "$version" max_replication_slots 5 + if [ "$version" == "9.2" -o "$version" == "9.3" ] + then + set_param "$version" wal_level hot_standby + else + set_param "$version" wal_level logical + fi + + echo "local replication travis trust" \ + >> "/etc/postgresql/$version/psycopg/pg_hba.conf" + + pg_ctlcluster "$version" psycopg start - sudo -u postgres psql -c "create user travis" "port=$port" + sudo -u postgres psql -c "create user travis replication" "port=$port" sudo -u postgres psql -c "create database $dbname" "port=$port" sudo -u postgres psql -c "grant create on database $dbname to travis" "port=$port" sudo -u postgres psql -c "create extension hstore" "port=$port dbname=$dbname" diff --git a/scripts/travis_test.sh b/scripts/travis_test.sh index df9413a1..15783088 100755 --- a/scripts/travis_test.sh +++ b/scripts/travis_test.sh @@ -14,11 +14,13 @@ run_test () { export PSYCOPG2_TESTDB=$dbname export PSYCOPG2_TESTDB_PORT=$port export PSYCOPG2_TESTDB_USER=travis - make check + export PSYCOPG2_TEST_REPL_DSN= + + python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose printf "\n\nRunning tests against PostgreSQL $version (green mode)\n\n" export PSYCOPG2_TEST_GREEN=1 - make check + python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose } run_test 9.6 54396 From c22093ddd49ea6045e05b9eaafafc7a001bac1a5 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Sun, 25 Dec 2016 19:00:30 +0100 Subject: [PATCH 6/6] Skip replication tests in green mode --- tests/test_replication.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/tests/test_replication.py b/tests/test_replication.py index 33a8065a..79d1295d 100755 --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -27,9 +27,10 @@ from psycopg2.extras import ( PhysicalReplicationConnection, LogicalReplicationConnection, StopReplication) import testconfig -from testutils import unittest -from testutils import skip_before_postgres -from testutils import ConnectingTestCase +from testutils import unittest, ConnectingTestCase +from testutils import skip_before_postgres, skip_if_green + +skip_repl_if_green = skip_if_green("replication not supported in green mode") class ReplicationTestCase(ConnectingTestCase): @@ -123,6 +124,7 @@ class ReplicationTest(ReplicationTestCase): psycopg2.ProgrammingError, self.create_replication_slot, cur) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_start_on_missing_replication_slot(self): conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) if conn is None: @@ -136,6 +138,7 @@ class ReplicationTest(ReplicationTestCase): cur.start_replication(self.slot) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_start_and_recover_from_error(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) if conn is None: @@ -157,6 +160,7 @@ class ReplicationTest(ReplicationTestCase): cur.start_replication(slot_name=self.slot) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_stop_replication(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) if conn is None: @@ -176,6 +180,7 @@ class ReplicationTest(ReplicationTestCase): class AsyncReplicationTest(ReplicationTestCase): @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_async_replication(self): conn = self.repl_connect( connection_factory=LogicalReplicationConnection, async=1)