Merge branch 'dont_set_datestyle_in_replication_mode'

This commit is contained in:
Daniele Varrazzo 2016-12-25 19:28:23 +01:00
commit c9798ecb15
5 changed files with 91 additions and 17 deletions

View File

@ -494,6 +494,25 @@ conn_setup_cancel(connectionObject *self, PGconn *pgconn)
return 0; return 0;
} }
/* Return 1 if the "replication" keyword is set in the DSN, 0 otherwise */
static int
dsn_has_replication(char *pgdsn)
{
int ret = 0;
PQconninfoOption *connopts, *ptr;
connopts = PQconninfoParse(pgdsn, NULL);
for(ptr = connopts; ptr->keyword != NULL; ptr++) {
if(strcmp(ptr->keyword, "replication") == 0 && ptr->val != NULL)
ret = 1;
}
PQconninfoFree(connopts);
return ret;
}
/* Return 1 if the server datestyle allows us to work without problems, /* Return 1 if the server datestyle allows us to work without problems,
0 if it needs to be set to something better, e.g. ISO. */ 0 if it needs to be set to something better, e.g. ISO. */
@ -522,28 +541,29 @@ conn_setup(connectionObject *self, PGconn *pgconn)
{ {
PGresult *pgres = NULL; PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int rv = -1;
self->equote = conn_get_standard_conforming_strings(pgconn); self->equote = conn_get_standard_conforming_strings(pgconn);
self->server_version = conn_get_server_version(pgconn); self->server_version = conn_get_server_version(pgconn);
self->protocol = conn_get_protocol_version(self->pgconn); self->protocol = conn_get_protocol_version(self->pgconn);
if (3 != self->protocol) { if (3 != self->protocol) {
PyErr_SetString(InterfaceError, "only protocol 3 supported"); PyErr_SetString(InterfaceError, "only protocol 3 supported");
return -1; goto exit;
} }
if (0 > conn_read_encoding(self, pgconn)) { if (0 > conn_read_encoding(self, pgconn)) {
return -1; goto exit;
} }
if (0 > conn_setup_cancel(self, pgconn)) { if (0 > conn_setup_cancel(self, pgconn)) {
return -1; goto exit;
} }
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock); pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
if (!conn_is_datestyle_ok(self->pgconn)) { if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) {
int res; int res;
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
res = pq_set_guc_locked(self, "datestyle", "ISO", res = pq_set_guc_locked(self, "datestyle", "ISO",
@ -551,18 +571,23 @@ conn_setup(connectionObject *self, PGconn *pgconn)
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
if (res < 0) { if (res < 0) {
pq_complete_error(self, &pgres, &error); pq_complete_error(self, &pgres, &error);
return -1; goto unlock;
} }
} }
/* for reset */ /* for reset */
self->autocommit = 0; self->autocommit = 0;
/* success */
rv = 0;
unlock:
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock); pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
return 0; exit:
return rv;
} }
/* conn_connect - execute a connection to the database */ /* conn_connect - execute a connection to the database */
@ -859,8 +884,11 @@ _conn_poll_setup_async(connectionObject *self)
self->autocommit = 1; self->autocommit = 1;
/* If the datestyle is ISO or anything else good, /* If the datestyle is ISO or anything else good,
* we can skip the CONN_STATUS_DATESTYLE step. */ * we can skip the CONN_STATUS_DATESTYLE step.
if (!conn_is_datestyle_ok(self->pgconn)) { * Note that we cannot change the datestyle on a replication
* connection.
*/
if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) {
Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE"); Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
self->status = CONN_STATUS_DATESTYLE; self->status = CONN_STATUS_DATESTYLE;
if (0 == pq_send_query(self, psyco_datestyle)) { if (0 == pq_send_query(self, psyco_datestyle)) {

View File

@ -23,10 +23,27 @@ create () {
dbname=psycopg2_test dbname=psycopg2_test
pg_createcluster -p $port --start-conf manual $version psycopg pg_createcluster -p $port --start-conf manual $version psycopg
# for two-phase commit testing
set_param "$version" max_prepared_transactions 10 set_param "$version" max_prepared_transactions 10
# for replication testing
set_param "$version" max_wal_senders 5
set_param "$version" max_replication_slots 5
if [ "$version" == "9.2" -o "$version" == "9.3" ]
then
set_param "$version" wal_level hot_standby
else
set_param "$version" wal_level logical
fi
echo "local replication travis trust" \
>> "/etc/postgresql/$version/psycopg/pg_hba.conf"
pg_ctlcluster "$version" psycopg start pg_ctlcluster "$version" psycopg start
sudo -u postgres psql -c "create user travis" "port=$port" sudo -u postgres psql -c "create user travis replication" "port=$port"
sudo -u postgres psql -c "create database $dbname" "port=$port" sudo -u postgres psql -c "create database $dbname" "port=$port"
sudo -u postgres psql -c "grant create on database $dbname to travis" "port=$port" sudo -u postgres psql -c "grant create on database $dbname to travis" "port=$port"
sudo -u postgres psql -c "create extension hstore" "port=$port dbname=$dbname" sudo -u postgres psql -c "create extension hstore" "port=$port dbname=$dbname"

View File

@ -14,11 +14,13 @@ run_test () {
export PSYCOPG2_TESTDB=$dbname export PSYCOPG2_TESTDB=$dbname
export PSYCOPG2_TESTDB_PORT=$port export PSYCOPG2_TESTDB_PORT=$port
export PSYCOPG2_TESTDB_USER=travis export PSYCOPG2_TESTDB_USER=travis
make check export PSYCOPG2_TEST_REPL_DSN=
python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose
printf "\n\nRunning tests against PostgreSQL $version (green mode)\n\n" printf "\n\nRunning tests against PostgreSQL $version (green mode)\n\n"
export PSYCOPG2_TEST_GREEN=1 export PSYCOPG2_TEST_GREEN=1
make check python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose
} }
run_test 9.6 54396 run_test 9.6 54396

28
tests/test_replication.py Normal file → Executable file
View File

@ -23,14 +23,14 @@
# License for more details. # License for more details.
import psycopg2 import psycopg2
import psycopg2.extensions
from psycopg2.extras import ( from psycopg2.extras import (
PhysicalReplicationConnection, LogicalReplicationConnection, StopReplication) PhysicalReplicationConnection, LogicalReplicationConnection, StopReplication)
import testconfig import testconfig
from testutils import unittest from testutils import unittest, ConnectingTestCase
from testutils import skip_before_postgres from testutils import skip_before_postgres, skip_if_green
from testutils import ConnectingTestCase
skip_repl_if_green = skip_if_green("replication not supported in green mode")
class ReplicationTestCase(ConnectingTestCase): class ReplicationTestCase(ConnectingTestCase):
@ -89,6 +89,20 @@ class ReplicationTest(ReplicationTestCase):
cur.execute("IDENTIFY_SYSTEM") cur.execute("IDENTIFY_SYSTEM")
cur.fetchall() cur.fetchall()
@skip_before_postgres(9, 0)
def test_datestyle(self):
if testconfig.repl_dsn is None:
return self.skipTest("replication tests disabled by default")
conn = self.repl_connect(
dsn=testconfig.repl_dsn, options='-cdatestyle=german',
connection_factory=PhysicalReplicationConnection)
if conn is None:
return
cur = conn.cursor()
cur.execute("IDENTIFY_SYSTEM")
cur.fetchall()
@skip_before_postgres(9, 4) @skip_before_postgres(9, 4)
def test_logical_replication_connection(self): def test_logical_replication_connection(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection) conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
@ -110,6 +124,7 @@ class ReplicationTest(ReplicationTestCase):
psycopg2.ProgrammingError, self.create_replication_slot, cur) psycopg2.ProgrammingError, self.create_replication_slot, cur)
@skip_before_postgres(9, 4) # slots require 9.4 @skip_before_postgres(9, 4) # slots require 9.4
@skip_repl_if_green
def test_start_on_missing_replication_slot(self): def test_start_on_missing_replication_slot(self):
conn = self.repl_connect(connection_factory=PhysicalReplicationConnection) conn = self.repl_connect(connection_factory=PhysicalReplicationConnection)
if conn is None: if conn is None:
@ -123,6 +138,7 @@ class ReplicationTest(ReplicationTestCase):
cur.start_replication(self.slot) cur.start_replication(self.slot)
@skip_before_postgres(9, 4) # slots require 9.4 @skip_before_postgres(9, 4) # slots require 9.4
@skip_repl_if_green
def test_start_and_recover_from_error(self): def test_start_and_recover_from_error(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection) conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
if conn is None: if conn is None:
@ -144,6 +160,7 @@ class ReplicationTest(ReplicationTestCase):
cur.start_replication(slot_name=self.slot) cur.start_replication(slot_name=self.slot)
@skip_before_postgres(9, 4) # slots require 9.4 @skip_before_postgres(9, 4) # slots require 9.4
@skip_repl_if_green
def test_stop_replication(self): def test_stop_replication(self):
conn = self.repl_connect(connection_factory=LogicalReplicationConnection) conn = self.repl_connect(connection_factory=LogicalReplicationConnection)
if conn is None: if conn is None:
@ -163,12 +180,13 @@ class ReplicationTest(ReplicationTestCase):
class AsyncReplicationTest(ReplicationTestCase): class AsyncReplicationTest(ReplicationTestCase):
@skip_before_postgres(9, 4) # slots require 9.4 @skip_before_postgres(9, 4) # slots require 9.4
@skip_repl_if_green
def test_async_replication(self): def test_async_replication(self):
conn = self.repl_connect( conn = self.repl_connect(
connection_factory=LogicalReplicationConnection, async=1) connection_factory=LogicalReplicationConnection, async=1)
if conn is None: if conn is None:
return return
self.wait(conn)
cur = conn.cursor() cur = conn.cursor()
self.create_replication_slot(cur, output_plugin='test_decoding') self.create_replication_slot(cur, output_plugin='test_decoding')

View File

@ -130,8 +130,17 @@ class ConnectingTestCase(unittest.TestCase):
import psycopg2 import psycopg2
try: try:
conn = self.connect(**kwargs) conn = self.connect(**kwargs)
if conn.async == 1:
self.wait(conn)
except psycopg2.OperationalError, e: except psycopg2.OperationalError, e:
# If pgcode is not set it is a genuine connection error
# Otherwise we tried to run some bad operation in the connection
# (e.g. bug #482) and we'd rather know that.
if e.pgcode is None:
return self.skipTest("replication db not configured: %s" % e) return self.skipTest("replication db not configured: %s" % e)
else:
raise
return conn return conn
def _get_conn(self): def _get_conn(self):