diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 43d0fdae..a34e5ef9 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -494,6 +494,25 @@ conn_setup_cancel(connectionObject *self, PGconn *pgconn) return 0; } +/* Return 1 if the "replication" keyword is set in the DSN, 0 otherwise */ +static int +dsn_has_replication(char *pgdsn) +{ + int ret = 0; + PQconninfoOption *connopts, *ptr; + + connopts = PQconninfoParse(pgdsn, NULL); + + for(ptr = connopts; ptr->keyword != NULL; ptr++) { + if(strcmp(ptr->keyword, "replication") == 0 && ptr->val != NULL) + ret = 1; + } + + PQconninfoFree(connopts); + + return ret; +} + /* Return 1 if the server datestyle allows us to work without problems, 0 if it needs to be set to something better, e.g. ISO. */ @@ -522,28 +541,29 @@ conn_setup(connectionObject *self, PGconn *pgconn) { PGresult *pgres = NULL; char *error = NULL; + int rv = -1; self->equote = conn_get_standard_conforming_strings(pgconn); self->server_version = conn_get_server_version(pgconn); self->protocol = conn_get_protocol_version(self->pgconn); if (3 != self->protocol) { PyErr_SetString(InterfaceError, "only protocol 3 supported"); - return -1; + goto exit; } if (0 > conn_read_encoding(self, pgconn)) { - return -1; + goto exit; } if (0 > conn_setup_cancel(self, pgconn)) { - return -1; + goto exit; } Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); Py_BLOCK_THREADS; - if (!conn_is_datestyle_ok(self->pgconn)) { + if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) { int res; Py_UNBLOCK_THREADS; res = pq_set_guc_locked(self, "datestyle", "ISO", @@ -551,18 +571,23 @@ conn_setup(connectionObject *self, PGconn *pgconn) Py_BLOCK_THREADS; if (res < 0) { pq_complete_error(self, &pgres, &error); - return -1; + goto unlock; } } /* for reset */ self->autocommit = 0; + /* success */ + rv = 0; + +unlock: Py_UNBLOCK_THREADS; pthread_mutex_unlock(&self->lock); Py_END_ALLOW_THREADS; - return 0; +exit: + return rv; } /* conn_connect - execute a connection to the database */ @@ -859,8 +884,11 @@ _conn_poll_setup_async(connectionObject *self) self->autocommit = 1; /* If the datestyle is ISO or anything else good, - * we can skip the CONN_STATUS_DATESTYLE step. */ - if (!conn_is_datestyle_ok(self->pgconn)) { + * we can skip the CONN_STATUS_DATESTYLE step. + * Note that we cannot change the datestyle on a replication + * connection. + */ + if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) { Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE"); self->status = CONN_STATUS_DATESTYLE; if (0 == pq_send_query(self, psyco_datestyle)) { diff --git a/scripts/travis_prepare.sh b/scripts/travis_prepare.sh index f4e86118..2b1e12eb 100755 --- a/scripts/travis_prepare.sh +++ b/scripts/travis_prepare.sh @@ -23,10 +23,27 @@ create () { dbname=psycopg2_test pg_createcluster -p $port --start-conf manual $version psycopg + + # for two-phase commit testing set_param "$version" max_prepared_transactions 10 + + # for replication testing + set_param "$version" max_wal_senders 5 + set_param "$version" max_replication_slots 5 + if [ "$version" == "9.2" -o "$version" == "9.3" ] + then + set_param "$version" wal_level hot_standby + else + set_param "$version" wal_level logical + fi + + echo "local replication travis trust" \ + >> "/etc/postgresql/$version/psycopg/pg_hba.conf" + + pg_ctlcluster "$version" psycopg start - sudo -u postgres psql -c "create user travis" "port=$port" + sudo -u postgres psql -c "create user travis replication" "port=$port" sudo -u postgres psql -c "create database $dbname" "port=$port" sudo -u postgres psql -c "grant create on database $dbname to travis" "port=$port" sudo -u postgres psql -c "create extension hstore" "port=$port dbname=$dbname" diff --git a/scripts/travis_test.sh b/scripts/travis_test.sh index df9413a1..15783088 100755 --- a/scripts/travis_test.sh +++ b/scripts/travis_test.sh @@ -14,11 +14,13 @@ run_test () { export PSYCOPG2_TESTDB=$dbname export PSYCOPG2_TESTDB_PORT=$port export PSYCOPG2_TESTDB_USER=travis - make check + export PSYCOPG2_TEST_REPL_DSN= + + python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose printf "\n\nRunning tests against PostgreSQL $version (green mode)\n\n" export PSYCOPG2_TEST_GREEN=1 - make check + python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose } run_test 9.6 54396 diff --git a/tests/test_replication.py b/tests/test_replication.py old mode 100644 new mode 100755 index 2ccd4c77..79d1295d --- a/tests/test_replication.py +++ b/tests/test_replication.py @@ -23,14 +23,14 @@ # License for more details. import psycopg2 -import psycopg2.extensions from psycopg2.extras import ( PhysicalReplicationConnection, LogicalReplicationConnection, StopReplication) import testconfig -from testutils import unittest -from testutils import skip_before_postgres -from testutils import ConnectingTestCase +from testutils import unittest, ConnectingTestCase +from testutils import skip_before_postgres, skip_if_green + +skip_repl_if_green = skip_if_green("replication not supported in green mode") class ReplicationTestCase(ConnectingTestCase): @@ -89,6 +89,20 @@ class ReplicationTest(ReplicationTestCase): cur.execute("IDENTIFY_SYSTEM") cur.fetchall() + @skip_before_postgres(9, 0) + def test_datestyle(self): + if testconfig.repl_dsn is None: + return self.skipTest("replication tests disabled by default") + + conn = self.repl_connect( + dsn=testconfig.repl_dsn, options='-cdatestyle=german', + connection_factory=PhysicalReplicationConnection) + if conn is None: + return + cur = conn.cursor() + cur.execute("IDENTIFY_SYSTEM") + cur.fetchall() + @skip_before_postgres(9, 4) def test_logical_replication_connection(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) @@ -110,6 +124,7 @@ class ReplicationTest(ReplicationTestCase): psycopg2.ProgrammingError, self.create_replication_slot, cur) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_start_on_missing_replication_slot(self): conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) if conn is None: @@ -123,6 +138,7 @@ class ReplicationTest(ReplicationTestCase): cur.start_replication(self.slot) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_start_and_recover_from_error(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) if conn is None: @@ -144,6 +160,7 @@ class ReplicationTest(ReplicationTestCase): cur.start_replication(slot_name=self.slot) @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_stop_replication(self): conn = self.repl_connect(connection_factory=LogicalReplicationConnection) if conn is None: @@ -163,12 +180,13 @@ class ReplicationTest(ReplicationTestCase): class AsyncReplicationTest(ReplicationTestCase): @skip_before_postgres(9, 4) # slots require 9.4 + @skip_repl_if_green def test_async_replication(self): conn = self.repl_connect( connection_factory=LogicalReplicationConnection, async=1) if conn is None: return - self.wait(conn) + cur = conn.cursor() self.create_replication_slot(cur, output_plugin='test_decoding') diff --git a/tests/testutils.py b/tests/testutils.py index 1dd0c999..93477357 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -130,8 +130,17 @@ class ConnectingTestCase(unittest.TestCase): import psycopg2 try: conn = self.connect(**kwargs) + if conn.async == 1: + self.wait(conn) except psycopg2.OperationalError, e: - return self.skipTest("replication db not configured: %s" % e) + # If pgcode is not set it is a genuine connection error + # Otherwise we tried to run some bad operation in the connection + # (e.g. bug #482) and we'd rather know that. + if e.pgcode is None: + return self.skipTest("replication db not configured: %s" % e) + else: + raise + return conn def _get_conn(self):