Fix ReplicationTest: no NotSupportedError now.

This commit is contained in:
Oleksandr Shulgin 2015-10-14 12:43:26 +02:00
parent 6ad299945f
commit 54079072db
2 changed files with 14 additions and 8 deletions

View File

@ -1180,14 +1180,22 @@ class AutocommitTests(ConnectingTestCase):
class ReplicationTest(ConnectingTestCase):
@skip_before_postgres(9, 0)
def test_replication_not_supported(self):
conn = self.repl_connect()
def test_physical_replication_connection(self):
import psycopg2.extras
conn = self.repl_connect(connection_factory=psycopg2.extras.PhysicalReplicationConnection)
if conn is None: return
cur = conn.cursor()
f = StringIO()
self.assertRaises(psycopg2.NotSupportedError,
cur.copy_expert, "START_REPLICATION 0/0", f)
cur.execute("IDENTIFY_SYSTEM")
cur.fetchall()
@skip_before_postgres(9, 4)
def test_logical_replication_connection(self):
import psycopg2.extras
conn = self.repl_connect(connection_factory=psycopg2.extras.LogicalReplicationConnection)
if conn is None: return
cur = conn.cursor()
cur.execute("IDENTIFY_SYSTEM")
cur.fetchall()
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -7,8 +7,6 @@ dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None)
dbport = os.environ.get('PSYCOPG2_TESTDB_PORT', None)
dbuser = os.environ.get('PSYCOPG2_TESTDB_USER', None)
dbpass = os.environ.get('PSYCOPG2_TESTDB_PASSWORD', None)
repl_dsn = os.environ.get('PSYCOPG2_TEST_REPL_DSN',
"dbname=psycopg2_test replication=1")
# Check if we want to test psycopg's green path.
green = os.environ.get('PSYCOPG2_TEST_GREEN', None)
@ -35,4 +33,4 @@ if dbuser is not None:
if dbpass is not None:
dsn += ' password=%s' % dbpass
repl_dsn = os.environ.get('PSYCOPG2_TEST_REPL_DSN', dsn)