Merge branch 'test-dinosaurs'

This commit is contained in:
Daniele Varrazzo 2017-02-07 00:58:48 +00:00
commit 3ff350cd24
13 changed files with 251 additions and 119 deletions

View File

@ -1,60 +1,131 @@
#!/bin/bash
set -e
set -e -x
# Prepare the test databases in Travis CI.
#
# The script should be run with sudo.
# The script is not idempotent: it assumes the machine in a clean state
# and is designed for a sudo-enabled Trusty environment.
#
# The variables TEST_PAST, TEST_FUTURE, DONT_TEST_PRESENT can be used to test
# against unsupported Postgres versions and skip tests with supported ones.
#
# The variables can be set in the travis configuration
# (https://travis-ci.org/psycopg/psycopg2/settings)
set_param () {
# Set a parameter in a postgresql.conf file
version=$1
param=$2
value=$3
param=$1
value=$2
sed -i "s/^\s*#\?\s*$param.*/$param = $value/" \
"/etc/postgresql/$version/psycopg/postgresql.conf"
sed -i "s/^\s*#\?\s*$param.*/$param = $value/" "$DATADIR/postgresql.conf"
}
create () {
version=$1
port=$2
dbname=psycopg2_test
export VERSION=$1
export PACKAGE=${2:-$VERSION}
pg_createcluster -p $port --start-conf manual $version psycopg
# Version as number: 9.6 -> 906
export VERNUM=$(( $(echo $VERSION \
| sed 's/\(.\+\)\.\(.\+\)/100 * \1 + \2/') ))
# for two-phase commit testing
set_param "$version" max_prepared_transactions 10
# Port number: 9.6 -> 50906
export PORT=$(( 50000 + $VERNUM ))
# for replication testing
set_param "$version" max_wal_senders 5
set_param "$version" max_replication_slots 5
if [ "$version" == "9.2" -o "$version" == "9.3" ]
then
set_param "$version" wal_level hot_standby
else
set_param "$version" wal_level logical
export DATADIR="/var/lib/postgresql/$PACKAGE/psycopg"
export PGDIR="/usr/lib/postgresql/$PACKAGE"
export PGBIN="$PGDIR/bin"
# install postgres versions not available on the image
if (( "$VERNUM" < 902 || "$VERNUM" > 906 )); then
wget -O - http://initd.org/psycopg/tarballs/postgresql/postgresql-${PACKAGE}.tar.bz2 \
| sudo tar xjf - -C /usr/lib/postgresql
fi
echo "local replication travis trust" \
>> "/etc/postgresql/$version/psycopg/pg_hba.conf"
sudo -u postgres "$PGBIN/initdb" -D "$DATADIR"
set_param port "$PORT"
if (( "$VERNUM" >= 800 )); then
set_param listen_addresses "'*'"
else
set_param tcpip_socket true
fi
pg_ctlcluster "$version" psycopg start
# for two-phase commit testing
if (( "$VERNUM" >= 801 )); then set_param max_prepared_transactions 10; fi
sudo -u postgres psql -c "create user travis replication" "port=$port"
sudo -u postgres psql -c "create database $dbname" "port=$port"
sudo -u postgres psql -c "grant create on database $dbname to travis" "port=$port"
sudo -u postgres psql -c "create extension hstore" "port=$port dbname=$dbname"
# for replication testing
if (( "$VERNUM" >= 900 )); then set_param max_wal_senders 5; fi
if (( "$VERNUM" >= 904 )); then set_param max_replication_slots 5; fi
if (( "$VERNUM" >= 904 )); then
set_param wal_level logical
elif (( "$VERNUM" >= 900 )); then
set_param wal_level hot_standby
fi
if (( "$VERNUM" >= 900 )); then
echo "host replication travis 0.0.0.0/0 trust" >> "$DATADIR/pg_hba.conf"
fi
# start the server, wait for start
sudo -u postgres "$PGBIN/pg_ctl" -w -l /dev/null -D "$DATADIR" start
# create the test database
DBNAME=psycopg2_test
CONNINFO="user=postgres host=localhost port=$PORT dbname=template1"
if (( "$VERNUM" >= 901 )); then
psql -c "create user travis createdb createrole replication" "$CONNINFO"
elif (( "$VERNUM" >= 801 )); then
psql -c "create user travis createdb createrole" "$CONNINFO"
else
psql -c "create user travis createdb createuser" "$CONNINFO"
fi
psql -c "create database $DBNAME with owner travis" "$CONNINFO"
# configure global objects on the test database
CONNINFO="user=postgres host=localhost port=$PORT dbname=$DBNAME"
if (( "$VERNUM" >= 901 )); then
psql -c "create extension hstore" "$CONNINFO"
elif (( "$VERNUM" >= 803 )); then
psql -f "$PGDIR/share/contrib/hstore.sql" "$CONNINFO"
fi
if (( "$VERNUM" == 901 )); then
psql -c "create extension json" "$CONNINFO"
fi
}
# Would give a permission denied error in the travis build dir
cd /
create 9.6 54396
create 9.5 54395
create 9.4 54394
create 9.3 54393
create 9.2 54392
# Postgres versions supported by Travis CI
if [[ -z "$DONT_TEST_PRESENT" ]]; then
create 9.6
create 9.5
create 9.4
create 9.3
create 9.2
fi
# Unsupported postgres versions that we still support
# Images built by https://github.com/psycopg/psycopg2-wheels/tree/build-dinosaurs
if [[ -n "$TEST_PAST" ]]; then
create 7.4
create 8.0
create 8.1
create 8.2
create 8.3
create 8.4
create 9.0
create 9.1
fi
# Postgres built from master
if [[ -n "$TEST_FUTURE" ]]; then
create 10.0 10-master
fi

View File

@ -2,29 +2,71 @@
# Run the tests in all the databases
# The script is designed for a Trusty environment.
#
# The variables TEST_PAST, TEST_FUTURE, DONT_TEST_PRESENT can be used to test
# against unsupported Postgres versions and skip tests with supported ones.
#
# The variables TEST_VERBOSE enables verbose test log.
#
# The variables can be set in the travis configuration
# (https://travis-ci.org/psycopg/psycopg2/settings)
set -e
set -e -x
run_test () {
version=$1
port=$2
dbname=psycopg2_test
VERSION=$1
DBNAME=psycopg2_test
if [[ -n "$TEST_VERBOSE" ]]; then
VERBOSE=--verbose
else
VERBOSE=
fi
printf "\n\nRunning tests against PostgreSQL $version\n\n"
export PSYCOPG2_TESTDB=$dbname
# Port number: 9.6 -> 50906
port=$(( 50000 + $(echo $VERSION \
| sed 's/\(.\+\)\.\(.\+\)/100 * \1 + \2/') ))
printf "\n\nRunning tests against PostgreSQL $VERSION (port $port)\n\n"
export PSYCOPG2_TESTDB=$DBNAME
export PSYCOPG2_TESTDB_HOST=localhost
export PSYCOPG2_TESTDB_PORT=$port
export PSYCOPG2_TESTDB_USER=travis
export PSYCOPG2_TEST_REPL_DSN=
unset PSYCOPG2_TEST_GREEN
python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')"
python -c \
"from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" \
$VERBOSE
printf "\n\nRunning tests against PostgreSQL $version (green mode)\n\n"
printf "\n\nRunning tests against PostgreSQL $VERSION (green mode)\n\n"
export PSYCOPG2_TEST_GREEN=1
python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')"
python -c \
"from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" \
$VERBOSE
}
run_test 9.6 54396
run_test 9.5 54395
run_test 9.4 54394
run_test 9.3 54393
run_test 9.2 54392
# Postgres versions supported by Travis CI
if [[ -z "$DONT_TEST_PRESENT" ]]; then
run_test 9.6
run_test 9.5
run_test 9.4
run_test 9.3
run_test 9.2
fi
# Unsupported postgres versions that we still support
# Images built by https://github.com/psycopg/psycopg2-wheels/tree/build-dinosaurs
if [[ -n "$TEST_PAST" ]]; then
run_test 7.4
run_test 8.0
run_test 8.1
run_test 8.2
run_test 8.3
run_test 8.4
run_test 9.0
run_test 9.1
fi
# Postgres built from master
if [[ -n "$TEST_FUTURE" ]]; then
run_test 10.0
fi

View File

@ -23,12 +23,14 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import time
import psycopg2
from psycopg2 import extras
from testconfig import dsn
from testutils import (ConnectingTestCase, unittest, skip_before_postgres,
assertDsnEqual)
from testutils import ConnectingTestCase, unittest, skip_before_postgres, slow
from test_replication import ReplicationTestCase, skip_repl_if_green
from psycopg2.extras import LogicalReplicationConnection, StopReplication
@ -97,13 +99,15 @@ class CancelTests(ConnectingTestCase):
)''')
self.conn.commit()
@slow
@skip_before_postgres(8, 2)
def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
cur.execute("select pg_sleep(10000)")
cur.execute("select pg_sleep(10)")
time.sleep(1)
self.assertTrue(async_conn.isexecuting())
async_conn.cancel()
self.assertRaises(psycopg2.extensions.QueryCanceledError,
@ -143,23 +147,23 @@ class ConnectTestCase(unittest.TestCase):
pass
psycopg2.connect(database='foo', host='baz', connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
psycopg2.connect(database='foo', host='baz', async=1)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
psycopg2.connect("dbname=foo host=baz", async=True)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])

View File

@ -23,6 +23,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import time
import threading
import psycopg2
@ -86,13 +87,15 @@ class CancelTests(ConnectingTestCase):
self.assertEqual(errors, [])
@slow
@skip_before_postgres(8, 2)
def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async_=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
cur.execute("select pg_sleep(2)")
cur.execute("select pg_sleep(10)")
time.sleep(1)
self.assertTrue(async_conn.isexecuting())
async_conn.cancel()
self.assertRaises(psycopg2.extensions.QueryCanceledError,

View File

@ -33,7 +33,7 @@ import psycopg2.errorcodes
from psycopg2 import extensions as ext
from testutils import (
unittest, assertDsnEqual, decorate_all_tests, skip_if_no_superuser,
unittest, decorate_all_tests, skip_if_no_superuser,
skip_before_postgres, skip_after_postgres, skip_before_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
@ -413,7 +413,7 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_empty_param(self):
dsn = ext.make_dsn(dbname='sony', password='')
assertDsnEqual(self, dsn, "dbname=sony password=''")
self.assertDsnEqual(dsn, "dbname=sony password=''")
def test_escape(self):
dsn = ext.make_dsn(dbname='hello world')
@ -436,10 +436,10 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_params_merging(self):
dsn = ext.make_dsn('dbname=foo host=bar', host='baz')
assertDsnEqual(self, dsn, 'dbname=foo host=baz')
self.assertDsnEqual(dsn, 'dbname=foo host=baz')
dsn = ext.make_dsn('dbname=foo', user='postgres')
assertDsnEqual(self, dsn, 'dbname=foo user=postgres')
self.assertDsnEqual(dsn, 'dbname=foo user=postgres')
def test_no_dsn_munging(self):
dsnin = 'dbname=a host=b user=c password=d'
@ -453,7 +453,7 @@ class MakeDsnTestCase(ConnectingTestCase):
self.assertEqual(dsn, url)
dsn = ext.make_dsn(url, application_name='woot')
assertDsnEqual(self, dsn,
self.assertDsnEqual(dsn,
'dbname=test user=tester password=secret application_name=woot')
self.assertRaises(psycopg2.ProgrammingError,

View File

@ -79,20 +79,20 @@ class CursorTests(ConnectingTestCase):
# unicode query with non-ascii data
cur.execute(u"SELECT '%s';" % snowman)
self.assertEqual(snowman.encode('utf8'), b(cur.fetchone()[0]))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify(u"SELECT '%s';" % snowman).replace(b"E'", b"'"))
self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify(u"SELECT '%s';" % snowman))
# unicode args
cur.execute("SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify("SELECT %s;", (snowman,)).replace(b"E'", b"'"))
self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify("SELECT %s;", (snowman,)))
# unicode query and args
cur.execute(u"SELECT %s;", (snowman,))
self.assertEqual(snowman.encode("utf-8"), b(cur.fetchone()[0]))
self.assertEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify(u"SELECT %s;", (snowman,)).replace(b"E'", b"'"))
self.assertQuotedEqual(("SELECT '%s';" % snowman).encode('utf8'),
cur.mogrify(u"SELECT %s;", (snowman,)))
def test_mogrify_decimal_explodes(self):
# issue #7: explodes on windows with python 2.5 and psycopg 2.2.2

View File

@ -14,10 +14,10 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import unittest
from datetime import date
from testutils import ConnectingTestCase
import testutils
from testutils import unittest
import psycopg2
import psycopg2.extras
@ -49,7 +49,7 @@ class FastExecuteTestMixin(object):
id serial primary key, date date, val int, data text)""")
class TestExecuteBatch(FastExecuteTestMixin, ConnectingTestCase):
class TestExecuteBatch(FastExecuteTestMixin, testutils.ConnectingTestCase):
def test_empty(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
@ -96,6 +96,7 @@ class TestExecuteBatch(FastExecuteTestMixin, ConnectingTestCase):
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
@testutils.skip_before_postgres(8, 0)
def test_unicode(self):
cur = self.conn.cursor()
ext.register_type(ext.UNICODE, cur)
@ -123,7 +124,7 @@ class TestExecuteBatch(FastExecuteTestMixin, ConnectingTestCase):
self.assertEqual(cur.fetchone(), (3, snowman))
class TestExecuteValuse(FastExecuteTestMixin, ConnectingTestCase):
class TestExecuteValues(FastExecuteTestMixin, testutils.ConnectingTestCase):
def test_empty(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
@ -230,6 +231,10 @@ class TestExecuteValuse(FastExecuteTestMixin, ConnectingTestCase):
self.assertEqual(cur.fetchall(), [(1, 'hi')])
testutils.decorate_all_tests(TestExecuteValues,
testutils.skip_before_postgres(8, 2))
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -19,7 +19,8 @@ from __future__ import unicode_literals
import sys
from functools import wraps
from testutils import unittest, ConnectingTestCase, decorate_all_tests
import testutils
from testutils import unittest
import psycopg2
import psycopg2.extras
@ -39,7 +40,7 @@ def skip_if_no_ipaddress(f):
return skip_if_no_ipaddress_
class NetworkingTestCase(ConnectingTestCase):
class NetworkingTestCase(testutils.ConnectingTestCase):
def test_inet_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
@ -58,6 +59,7 @@ class NetworkingTestCase(ConnectingTestCase):
self.assert_(isinstance(obj, ip.IPv6Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('::ffff:102:300/128'))
@testutils.skip_before_postgres(8, 2)
def test_inet_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
@ -99,6 +101,7 @@ class NetworkingTestCase(ConnectingTestCase):
self.assert_(isinstance(obj, ip.IPv6Network), repr(obj))
self.assertEquals(obj, ip.ip_network('::ffff:102:300/128'))
@testutils.skip_before_postgres(8, 2)
def test_cidr_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
@ -122,7 +125,7 @@ class NetworkingTestCase(ConnectingTestCase):
cur.execute("select %s", [ip.ip_network('::ffff:102:300/128')])
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
decorate_all_tests(NetworkingTestCase, skip_if_no_ipaddress)
testutils.decorate_all_tests(NetworkingTestCase, skip_if_no_ipaddress)
def test_suite():

View File

@ -27,7 +27,7 @@ import sys
from subprocess import Popen
from testutils import (unittest, skip_before_python, skip_before_postgres,
ConnectingTestCase, skip_copy_if_green, script_to_py3, assertDsnEqual, slow)
ConnectingTestCase, skip_copy_if_green, script_to_py3, slow)
import psycopg2
@ -93,23 +93,23 @@ class ConnectTestCase(unittest.TestCase):
pass
psycopg2.connect(database='foo', host='baz', connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
psycopg2.connect(database='foo', host='baz', async_=1)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
psycopg2.connect("dbname=foo host=baz", async_=True)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
@ -121,7 +121,7 @@ class ConnectTestCase(unittest.TestCase):
def test_empty_param(self):
psycopg2.connect(database='sony', password='')
assertDsnEqual(self, self.args[0], "dbname=sony password=''")
self.assertDsnEqual(self.args[0], "dbname=sony password=''")
def test_escape(self):
psycopg2.connect(database='hello world')
@ -144,7 +144,7 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'dbname=bar')
psycopg2.connect('dbname=foo', user='postgres')
assertDsnEqual(self, self.args[0], 'dbname=foo user=postgres')
self.assertDsnEqual(self.args[0], 'dbname=foo user=postgres')
class ExceptionsTestCase(ConnectingTestCase):

View File

@ -185,6 +185,7 @@ class TestQuotedIdentifier(ConnectingTestCase):
self.assertEqual(quote_ident('blah-blah', self.conn), '"blah-blah"')
self.assertEqual(quote_ident('quote"inside', self.conn), '"quote""inside"')
@testutils.skip_before_postgres(8, 0)
@testutils.skip_before_libpq(9, 0)
def test_unicode_ident(self):
from psycopg2.extensions import quote_ident
@ -236,7 +237,7 @@ class TestStringAdapter(ConnectingTestCase):
a.prepare(self.conn)
self.assertEqual(a.encoding, 'utf_8')
self.assertEqual(a.getquoted(), b"'\xe2\x98\x83'")
self.assertQuotedEqual(a.getquoted(), b"'\xe2\x98\x83'")
@testutils.skip_before_python(3)
def test_adapt_bytes(self):
@ -244,7 +245,7 @@ class TestStringAdapter(ConnectingTestCase):
self.conn.set_client_encoding('utf8')
a = psycopg2.extensions.QuotedString(snowman.encode('utf8'))
a.prepare(self.conn)
self.assertEqual(a.getquoted(), b"'\xe2\x98\x83'")
self.assertQuotedEqual(a.getquoted(), b"'\xe2\x98\x83'")
def test_suite():

View File

@ -200,9 +200,7 @@ class LiteralTests(ConnectingTestCase):
def test_repr(self):
self.assertEqual(repr(sql.Literal("foo")), "Literal('foo')")
self.assertEqual(str(sql.Literal("foo")), "Literal('foo')")
self.assertEqual(
sql.Literal("foo").as_string(self.conn).replace("E'", "'"),
"'foo'")
self.assertQuotedEqual(sql.Literal("foo").as_string(self.conn), "'foo'")
self.assertEqual(sql.Literal(42).as_string(self.conn), "42")
self.assertEqual(
sql.Literal(dt.date(2017, 1, 1)).as_string(self.conn),
@ -302,24 +300,24 @@ class ComposedTest(ConnectingTestCase):
obj = sql.Composed([sql.Literal("foo"), sql.Identifier("b'ar")])
obj = obj.join(", ")
self.assert_(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "'foo', \"b'ar\"")
self.assertQuotedEqual(obj.as_string(self.conn), "'foo', \"b'ar\"")
def test_sum(self):
obj = sql.Composed([sql.SQL("foo ")])
obj = obj + sql.Literal("bar")
self.assert_(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foo 'bar'")
self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
def test_sum_inplace(self):
obj = sql.Composed([sql.SQL("foo ")])
obj += sql.Literal("bar")
self.assert_(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foo 'bar'")
self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
obj = sql.Composed([sql.SQL("foo ")])
obj += sql.Composed([sql.Literal("bar")])
self.assert_(isinstance(obj, sql.Composed))
self.assertEqual(obj.as_string(self.conn), "foo 'bar'")
self.assertQuotedEqual(obj.as_string(self.conn), "foo 'bar'")
def test_iter(self):
obj = sql.Composed([sql.SQL("foo"), sql.SQL('bar')])

View File

@ -31,13 +31,6 @@ import psycopg2.extras
import psycopg2.extensions as ext
def filter_scs(conn, s):
if conn.get_parameter_status("standard_conforming_strings") == 'off':
return s
else:
return s.replace(b"E'", b"'")
class TypesExtrasTests(ConnectingTestCase):
"""Test that all type conversions are working."""
@ -105,17 +98,13 @@ class TypesExtrasTests(ConnectingTestCase):
i = Inet("192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
self.assertEqual(
filter_scs(self.conn, b"E'192.168.1.0/24'::inet"),
a.getquoted())
self.assertQuotedEqual(a.getquoted(), b"'192.168.1.0/24'::inet")
# adapts ok with unicode too
i = Inet(u"192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
self.assertEqual(
filter_scs(self.conn, b"E'192.168.1.0/24'::inet"),
a.getquoted())
self.assertQuotedEqual(a.getquoted(), b"'192.168.1.0/24'::inet")
def test_adapt_fail(self):
class Foo(object):
@ -160,13 +149,12 @@ class HstoreTestCase(ConnectingTestCase):
ii.sort()
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], filter_scs(self.conn, b"(E'a' => E'1')"))
self.assertEqual(ii[1], filter_scs(self.conn, b"(E'b' => E'''')"))
self.assertEqual(ii[2], filter_scs(self.conn, b"(E'c' => NULL)"))
self.assertQuotedEqual(ii[0], b"('a' => '1')")
self.assertQuotedEqual(ii[1], b"('b' => '''')")
self.assertQuotedEqual(ii[2], b"('c' => NULL)")
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3],
filter_scs(self.conn, b"(E'd' => E'" + encc + b"')"))
self.assertQuotedEqual(ii[3], b"('d' => '" + encc + b"')")
def test_adapt_9(self):
if self.conn.server_version < 90000:
@ -190,16 +178,17 @@ class HstoreTestCase(ConnectingTestCase):
ii = zip(kk, vv)
ii.sort()
def f(*args):
return tuple([filter_scs(self.conn, s) for s in args])
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], f(b"E'a'", b"E'1'"))
self.assertEqual(ii[1], f(b"E'b'", b"E''''"))
self.assertEqual(ii[2], f(b"E'c'", b"NULL"))
self.assertQuotedEqual(ii[0][0], b"'a'")
self.assertQuotedEqual(ii[0][1], b"'1'")
self.assertQuotedEqual(ii[1][0], b"'b'")
self.assertQuotedEqual(ii[1][1], b"''''")
self.assertQuotedEqual(ii[2][0], b"'c'")
self.assertQuotedEqual(ii[2][1], b"NULL")
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], f(b"E'd'", b"E'" + encc + b"'"))
self.assertQuotedEqual(ii[3][0], b"'d'")
self.assertQuotedEqual(ii[3][1], b"'" + encc + b"'")
def test_parse(self):
from psycopg2.extras import HstoreAdapter

View File

@ -24,10 +24,11 @@
# Use unittest2 if available. Otherwise mock a skip facility with warnings.
import re
import os
import platform
import sys
import select
import platform
from functools import wraps
from testconfig import dsn, repl_dsn
@ -82,6 +83,13 @@ if (not hasattr(unittest.TestCase, 'assert_')
unittest.TestCase.failUnlessEqual = unittest.TestCase.assertEqual
def assertDsnEqual(self, dsn1, dsn2, msg=None):
"""Check that two conninfo string have the same content"""
self.assertEqual(set(dsn1.split()), set(dsn2.split()), msg)
unittest.TestCase.assertDsnEqual = assertDsnEqual
class ConnectingTestCase(unittest.TestCase):
"""A test case providing connections for tests.
@ -100,6 +108,18 @@ class ConnectingTestCase(unittest.TestCase):
if not conn.closed:
conn.close()
def assertQuotedEqual(self, first, second, msg=None):
"""Compare two quoted strings disregarding eventual E'' quotes"""
def f(s):
if isinstance(s, unicode):
return re.sub(r"\bE'", "'", s)
elif isinstance(first, bytes):
return re.sub(br"\bE'", b"'", s)
else:
return s
return self.assertEqual(f(first), f(second), msg)
def connect(self, **kwargs):
try:
self._conns
@ -473,7 +493,3 @@ def slow(f):
return self.skipTest("slow test")
return f(self)
return slow_
def assertDsnEqual(testsuite, dsn1, dsn2):
testsuite.assertEqual(set(dsn1.split()), set(dsn2.split()))