Merge branch 'make_dsn'

Close issue #363 instead of the proposed merge request.
This commit is contained in:
Daniele Varrazzo 2016-03-03 17:10:39 +00:00
commit d43b74681f
8 changed files with 236 additions and 125 deletions

5
NEWS
View File

@ -6,7 +6,10 @@ What's new in psycopg 2.7
New features:
- Added `~psycopg2.extensions.parse_dsn()` function (:ticket:`#321`).
- Added `~psycopg2.extensions.parse_dsn()` and
`~psycopg2.extensions.make_dsn()` functions (:tickets:`#321, #363`).
`~psycopg2.connect()` now can take both *dsn* and keyword arguments, merging
them together.
- Added `~psycopg2.__libpq_version__` and
`~psycopg2.extensions.libpq_version()` to inspect the version of the
``libpq`` library the module was compiled/loaded with

View File

@ -491,20 +491,53 @@ Other functions
.. __: http://www.postgresql.org/docs/current/static/libpq-misc.html#LIBPQ-PQLIBVERSION
.. function:: make_dsn(dsn=None, \*\*kwargs)
Create a valid connection string from arguments.
Put together the arguments in *kwargs* into a connection string. If *dsn*
is specified too, merge the arguments coming from both the sources. If the
same argument name is specified in both the sources, the *kwargs* value
overrides the *dsn* value.
The input arguments are validated: the output should always be a valid
connection string (as far as `parse_dsn()` is concerned). If not raise
`~psycopg2.ProgrammingError`.
Example::
>>> from psycopg2.extensions import make_dsn
>>> make_dsn('dbname=foo host=example.com', password="s3cr3t")
'host=example.com password=s3cr3t dbname=foo'
.. versionadded:: 2.7
.. function:: parse_dsn(dsn)
Parse connection string into a dictionary of keywords and values.
Uses libpq's ``PQconninfoParse`` to parse the string according to
accepted format(s) and check for supported keywords.
Parsing is delegated to the libpq: different versions of the client
library may support different formats or parameters (for example,
`connection URIs`__ are only supported from libpq 9.2). Raise
`~psycopg2.ProgrammingError` if the *dsn* is not valid.
.. __: http://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING
Example::
>>> psycopg2.extensions.parse_dsn('dbname=test user=postgres password=secret')
>>> from psycopg2.extensions import parse_dsn
>>> parse_dsn('dbname=test user=postgres password=secret')
{'password': 'secret', 'user': 'postgres', 'dbname': 'test'}
>>> parse_dsn("postgresql://someone@example.com/somedb?connect_timeout=10")
{'host': 'example.com', 'user': 'someone', 'dbname': 'somedb', 'connect_timeout': '10'}
.. versionadded:: 2.7
.. seealso:: libpq docs for `PQconninfoParse()`__.
.. __: http://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PQCONNINFOPARSE
.. function:: quote_ident(str, scope)

View File

@ -17,30 +17,25 @@ The module interface respects the standard defined in the |DBAPI|_.
single: DSN (Database Source Name)
.. function::
connect(dsn, connection_factory=None, cursor_factory=None, async=False)
connect(\*\*kwargs, connection_factory=None, cursor_factory=None, async=False)
connect(dsn, connection_factory=None, cursor_factory=None, async=False, \*\*kwargs)
Create a new database session and return a new `connection` object.
The connection parameters can be specified either as a `libpq connection
The connection parameters can be specified as a `libpq connection
string`__ using the *dsn* parameter::
conn = psycopg2.connect("dbname=test user=postgres password=secret")
or using a set of keyword arguments::
conn = psycopg2.connect(database="test", user="postgres", password="secret")
conn = psycopg2.connect(dbname"test", user="postgres", password="secret")
The two call styles are mutually exclusive: you cannot specify connection
parameters as keyword arguments together with a connection string; only
the parameters not needed for the database connection (*i.e.*
*connection_factory*, *cursor_factory*, and *async*) are supported
together with the *dsn* argument.
or using a mix of both: if the same parameter name is specified in both
sources the *kwargs* value will have precedence over the *dsn* value.
The basic connection parameters are:
- `!dbname` -- the database name (only in the *dsn* string)
- `!database` -- the database name (only as keyword argument)
- `!dbname` -- the database name (`!database` is a deprecated alias)
- `!user` -- user name used to authenticate
- `!password` -- password used to authenticate
- `!host` -- database host address (defaults to UNIX socket if not provided)
@ -76,6 +71,9 @@ The module interface respects the standard defined in the |DBAPI|_.
.. versionchanged:: 2.5
added the *cursor_factory* parameter.
.. versionchanged:: 2.7
both *dsn* and keyword arguments can be specified.
.. seealso::
- `~psycopg2.extensions.parse_dsn`

View File

@ -80,32 +80,13 @@ else:
_ext.register_adapter(Decimal, Adapter)
del Decimal, Adapter
import re
def _param_escape(s,
re_escape=re.compile(r"([\\'])"),
re_space=re.compile(r'\s')):
"""
Apply the escaping rule required by PQconnectdb
"""
if not s: return "''"
s = re_escape.sub(r'\\\1', s)
if re_space.search(s):
s = "'" + s + "'"
return s
del re
def connect(dsn=None,
database=None, user=None, password=None, host=None, port=None,
connection_factory=None, cursor_factory=None, async=False, **kwargs):
def connect(dsn=None, connection_factory=None, cursor_factory=None,
async=False, **kwargs):
"""
Create a new database connection.
The connection parameters can be specified either as a string:
The connection parameters can be specified as a string:
conn = psycopg2.connect("dbname=test user=postgres password=secret")
@ -113,9 +94,9 @@ def connect(dsn=None,
conn = psycopg2.connect(database="test", user="postgres", password="secret")
The basic connection parameters are:
Or as a mix of both. The basic connection parameters are:
- *dbname*: the database name (only in dsn string)
- *dbname*: the database name
- *database*: the database name (only as keyword argument)
- *user*: user name used to authenticate
- *password*: password used to authenticate
@ -135,32 +116,10 @@ def connect(dsn=None,
library: the list of supported parameters depends on the library version.
"""
items = []
if database is not None:
items.append(('dbname', database))
if user is not None:
items.append(('user', user))
if password is not None:
items.append(('password', password))
if host is not None:
items.append(('host', host))
if port is not None:
items.append(('port', port))
items.extend([(k, v) for (k, v) in kwargs.iteritems() if v is not None])
if dsn is not None and items:
raise TypeError(
"'%s' is an invalid keyword argument when the dsn is specified"
% items[0][0])
if dsn is None:
if not items:
raise TypeError('missing dsn and no parameters')
else:
dsn = " ".join(["%s=%s" % (k, _param_escape(str(v)))
for (k, v) in items])
if dsn is None and not kwargs:
raise TypeError('missing dsn and no parameters')
dsn = _ext.make_dsn(dsn, **kwargs)
conn = _connect(dsn, connection_factory=connection_factory, async=async)
if cursor_factory is not None:
conn.cursor_factory = cursor_factory

View File

@ -7,7 +7,7 @@ This module holds all the extensions to the DBAPI-2.0 provided by psycopg.
- `lobject` -- the new-type inheritable large object class
- `adapt()` -- exposes the PEP-246_ compatible adapting mechanism used
by psycopg to adapt Python types to PostgreSQL ones
.. _PEP-246: http://www.python.org/peps/pep-0246.html
"""
# psycopg/extensions.py - DBAPI-2.0 extensions specific to psycopg
@ -32,6 +32,9 @@ This module holds all the extensions to the DBAPI-2.0 provided by psycopg.
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import re as _re
import sys as _sys
from psycopg2._psycopg import UNICODE, INTEGER, LONGINTEGER, BOOLEAN, FLOAT
from psycopg2._psycopg import TIME, DATE, INTERVAL, DECIMAL
from psycopg2._psycopg import BINARYARRAY, BOOLEANARRAY, DATEARRAY, DATETIMEARRAY
@ -56,7 +59,8 @@ try:
except ImportError:
pass
from psycopg2._psycopg import adapt, adapters, encodings, connection, cursor, lobject, Xid, libpq_version, parse_dsn, quote_ident
from psycopg2._psycopg import adapt, adapters, encodings, connection, cursor
from psycopg2._psycopg import lobject, Xid, libpq_version, parse_dsn, quote_ident
from psycopg2._psycopg import string_types, binary_types, new_type, new_array_type, register_type
from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column
@ -98,7 +102,6 @@ TRANSACTION_STATUS_INTRANS = 2
TRANSACTION_STATUS_INERROR = 3
TRANSACTION_STATUS_UNKNOWN = 4
import sys as _sys
# Return bytes from a string
if _sys.version_info[0] < 3:
@ -108,6 +111,7 @@ else:
def b(s):
return s.encode('utf8')
def register_adapter(typ, callable):
"""Register 'callable' as an ISQLQuote adapter for type 'typ'."""
adapters[(typ, ISQLQuote)] = callable
@ -151,6 +155,53 @@ class NoneAdapter(object):
return _null
def make_dsn(dsn=None, **kwargs):
"""Convert a set of keywords into a connection strings."""
if dsn is None and not kwargs:
return ''
# If no kwarg is specified don't mung the dsn, but verify it
if not kwargs:
parse_dsn(dsn)
return dsn
# Override the dsn with the parameters
if 'database' in kwargs:
if 'dbname' in kwargs:
raise TypeError(
"you can't specify both 'database' and 'dbname' arguments")
kwargs['dbname'] = kwargs.pop('database')
if dsn is not None:
tmp = parse_dsn(dsn)
tmp.update(kwargs)
kwargs = tmp
dsn = " ".join(["%s=%s" % (k, _param_escape(str(v)))
for (k, v) in kwargs.iteritems()])
# verify that the returned dsn is valid
parse_dsn(dsn)
return dsn
def _param_escape(s,
re_escape=_re.compile(r"([\\'])"),
re_space=_re.compile(r'\s')):
"""
Apply the escaping rule required by PQconnectdb
"""
if not s:
return "''"
s = re_escape.sub(r'\\\1', s)
if re_space.search(s):
s = "'" + s + "'"
return s
# Create default json typecasters for PostgreSQL 9.2 oids
from psycopg2._json import register_default_json, register_default_jsonb

View File

@ -133,7 +133,7 @@ psyco_parse_dsn(PyObject *self, PyObject *args, PyObject *kwargs)
options = PQconninfoParse(Bytes_AS_STRING(dsn), &err);
if (options == NULL) {
if (err != NULL) {
PyErr_Format(ProgrammingError, "error parsing the dsn: %s", err);
PyErr_Format(ProgrammingError, "invalid dsn: %s", err);
PQfreemem(err);
} else {
PyErr_SetString(OperationalError, "PQconninfoParse() failed");

View File

@ -32,6 +32,7 @@ from StringIO import StringIO
import psycopg2
import psycopg2.errorcodes
import psycopg2.extensions
ext = psycopg2.extensions
from testutils import unittest, decorate_all_tests, skip_if_no_superuser
from testutils import skip_before_postgres, skip_after_postgres, skip_before_libpq
@ -125,7 +126,7 @@ class ConnectionTests(ConnectingTestCase):
if self.conn.server_version >= 90300:
cur.execute("set client_min_messages=debug1")
for i in range(0, 100, 10):
sql = " ".join(["create temp table table%d (id serial);" % j for j in range(i, i+10)])
sql = " ".join(["create temp table table%d (id serial);" % j for j in range(i, i + 10)])
cur.execute(sql)
self.assertEqual(50, len(conn.notices))
@ -151,7 +152,7 @@ class ConnectionTests(ConnectingTestCase):
# not limited, but no error
for i in range(0, 100, 10):
sql = " ".join(["create temp table table2_%d (id serial);" % j for j in range(i, i+10)])
sql = " ".join(["create temp table table2_%d (id serial);" % j for j in range(i, i + 10)])
cur.execute(sql)
self.assertEqual(len([n for n in conn.notices if 'CREATE TABLE' in n]),
@ -172,7 +173,7 @@ class ConnectionTests(ConnectingTestCase):
self.assert_(self.conn.server_version)
def test_protocol_version(self):
self.assert_(self.conn.protocol_version in (2,3),
self.assert_(self.conn.protocol_version in (2, 3),
self.conn.protocol_version)
def test_tpc_unsupported(self):
@ -252,7 +253,7 @@ class ConnectionTests(ConnectingTestCase):
t1.start()
i = 1
for i in range(1000):
cur.execute("select %s;",(i,))
cur.execute("select %s;", (i,))
conn.commit()
while conn.notices:
notices.append((1, conn.notices.pop()))
@ -313,16 +314,15 @@ class ConnectionTests(ConnectingTestCase):
class ParseDsnTestCase(ConnectingTestCase):
def test_parse_dsn(self):
from psycopg2 import ProgrammingError
from psycopg2.extensions import parse_dsn
self.assertEqual(parse_dsn('dbname=test user=tester password=secret'),
self.assertEqual(ext.parse_dsn('dbname=test user=tester password=secret'),
dict(user='tester', password='secret', dbname='test'),
"simple DSN parsed")
self.assertRaises(ProgrammingError, parse_dsn,
self.assertRaises(ProgrammingError, ext.parse_dsn,
"dbname=test 2 user=tester password=secret")
self.assertEqual(parse_dsn("dbname='test 2' user=tester password=secret"),
self.assertEqual(ext.parse_dsn("dbname='test 2' user=tester password=secret"),
dict(user='tester', password='secret', dbname='test 2'),
"DSN with quoting parsed")
@ -332,7 +332,7 @@ class ParseDsnTestCase(ConnectingTestCase):
raised = False
try:
# unterminated quote after dbname:
parse_dsn("dbname='test 2 user=tester password=secret")
ext.parse_dsn("dbname='test 2 user=tester password=secret")
except ProgrammingError, e:
raised = True
self.assertTrue(str(e).find('secret') < 0,
@ -343,16 +343,14 @@ class ParseDsnTestCase(ConnectingTestCase):
@skip_before_libpq(9, 2)
def test_parse_dsn_uri(self):
from psycopg2.extensions import parse_dsn
self.assertEqual(parse_dsn('postgresql://tester:secret@/test'),
self.assertEqual(ext.parse_dsn('postgresql://tester:secret@/test'),
dict(user='tester', password='secret', dbname='test'),
"valid URI dsn parsed")
raised = False
try:
# extra '=' after port value
parse_dsn(dsn='postgresql://tester:secret@/test?port=1111=x')
ext.parse_dsn(dsn='postgresql://tester:secret@/test?port=1111=x')
except psycopg2.ProgrammingError, e:
raised = True
self.assertTrue(str(e).find('secret') < 0,
@ -362,24 +360,91 @@ class ParseDsnTestCase(ConnectingTestCase):
self.assertTrue(raised, "ProgrammingError raised due to invalid URI")
def test_unicode_value(self):
from psycopg2.extensions import parse_dsn
snowman = u"\u2603"
d = parse_dsn('dbname=' + snowman)
d = ext.parse_dsn('dbname=' + snowman)
if sys.version_info[0] < 3:
self.assertEqual(d['dbname'], snowman.encode('utf8'))
else:
self.assertEqual(d['dbname'], snowman)
def test_unicode_key(self):
from psycopg2.extensions import parse_dsn
snowman = u"\u2603"
self.assertRaises(psycopg2.ProgrammingError, parse_dsn,
self.assertRaises(psycopg2.ProgrammingError, ext.parse_dsn,
snowman + '=' + snowman)
def test_bad_param(self):
from psycopg2.extensions import parse_dsn
self.assertRaises(TypeError, parse_dsn, None)
self.assertRaises(TypeError, parse_dsn, 42)
self.assertRaises(TypeError, ext.parse_dsn, None)
self.assertRaises(TypeError, ext.parse_dsn, 42)
class MakeDsnTestCase(ConnectingTestCase):
def assertDsnEqual(self, dsn1, dsn2):
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
def test_empty_arguments(self):
self.assertEqual(ext.make_dsn(), '')
def test_empty_string(self):
dsn = ext.make_dsn('')
self.assertEqual(dsn, '')
def test_params_validation(self):
self.assertRaises(psycopg2.ProgrammingError,
ext.make_dsn, 'dbnamo=a')
self.assertRaises(psycopg2.ProgrammingError,
ext.make_dsn, dbnamo='a')
self.assertRaises(psycopg2.ProgrammingError,
ext.make_dsn, 'dbname=a', nosuchparam='b')
def test_empty_param(self):
dsn = ext.make_dsn(dbname='sony', password='')
self.assertDsnEqual(dsn, "dbname=sony password=''")
def test_escape(self):
dsn = ext.make_dsn(dbname='hello world')
self.assertEqual(dsn, "dbname='hello world'")
dsn = ext.make_dsn(dbname=r'back\slash')
self.assertEqual(dsn, r"dbname=back\\slash")
dsn = ext.make_dsn(dbname="quo'te")
self.assertEqual(dsn, r"dbname=quo\'te")
dsn = ext.make_dsn(dbname="with\ttab")
self.assertEqual(dsn, "dbname='with\ttab'")
dsn = ext.make_dsn(dbname=r"\every thing'")
self.assertEqual(dsn, r"dbname='\\every thing\''")
def test_database_is_a_keyword(self):
self.assertEqual(ext.make_dsn(database='sigh'), "dbname=sigh")
def test_params_merging(self):
dsn = ext.make_dsn('dbname=foo host=bar', host='baz')
self.assertDsnEqual(dsn, 'dbname=foo host=baz')
dsn = ext.make_dsn('dbname=foo', user='postgres')
self.assertDsnEqual(dsn, 'dbname=foo user=postgres')
def test_no_dsn_munging(self):
dsnin = 'dbname=a host=b user=c password=d'
dsn = ext.make_dsn(dsnin)
self.assertEqual(dsn, dsnin)
@skip_before_libpq(9, 2)
def test_url_is_cool(self):
url = 'postgresql://tester:secret@/test?application_name=wat'
dsn = ext.make_dsn(url)
self.assertEqual(dsn, url)
dsn = ext.make_dsn(url, application_name='woot')
self.assertDsnEqual(dsn,
'dbname=test user=tester password=secret application_name=woot')
self.assertRaises(psycopg2.ProgrammingError,
ext.make_dsn, 'postgresql://tester:secret@/test?nosuch=param')
self.assertRaises(psycopg2.ProgrammingError,
ext.make_dsn, url, nosuch="param")
class IsolationLevelsTestCase(ConnectingTestCase):
@ -587,7 +652,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.close()
return
gids = [ r[0] for r in cur ]
gids = [r[0] for r in cur]
for gid in gids:
cur.execute("rollback prepared %s;", (gid,))
cnn.close()
@ -761,13 +826,13 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
def test_status_after_recover(self):
cnn = self.connect()
self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status)
xns = cnn.tpc_recover()
cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status)
cur = cnn.cursor()
cur.execute("select 1")
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
xns = cnn.tpc_recover()
cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
def test_recovered_xids(self):
@ -789,12 +854,12 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn = self.connect()
xids = cnn.tpc_recover()
xids = [ xid for xid in xids if xid.database == dbname ]
xids = [xid for xid in xids if xid.database == dbname]
xids.sort(key=attrgetter('gtrid'))
# check the values returned
self.assertEqual(len(okvals), len(xids))
for (xid, (gid, prepared, owner, database)) in zip (xids, okvals):
for (xid, (gid, prepared, owner, database)) in zip(xids, okvals):
self.assertEqual(xid.gtrid, gid)
self.assertEqual(xid.prepared, prepared)
self.assertEqual(xid.owner, owner)
@ -825,8 +890,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.close()
cnn = self.connect()
xids = [ xid for xid in cnn.tpc_recover()
if xid.database == dbname ]
xids = [x for x in cnn.tpc_recover() if x.database == dbname]
self.assertEqual(1, len(xids))
xid = xids[0]
self.assertEqual(xid.format_id, fid)
@ -847,8 +911,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.close()
cnn = self.connect()
xids = [ xid for xid in cnn.tpc_recover()
if xid.database == dbname ]
xids = [x for x in cnn.tpc_recover() if x.database == dbname]
self.assertEqual(1, len(xids))
xid = xids[0]
self.assertEqual(xid.format_id, None)
@ -893,8 +956,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.tpc_begin(x1)
cnn.tpc_prepare()
cnn.reset()
xid = [ xid for xid in cnn.tpc_recover()
if xid.database == dbname ][0]
xid = [x for x in cnn.tpc_recover() if x.database == dbname][0]
self.assertEqual(10, xid.format_id)
self.assertEqual('uni', xid.gtrid)
self.assertEqual('code', xid.bqual)
@ -909,8 +971,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.tpc_prepare()
cnn.reset()
xid = [ xid for xid in cnn.tpc_recover()
if xid.database == dbname ][0]
xid = [x for x in cnn.tpc_recover() if x.database == dbname][0]
self.assertEqual(None, xid.format_id)
self.assertEqual('transaction-id', xid.gtrid)
self.assertEqual(None, xid.bqual)
@ -929,7 +990,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.reset()
xids = cnn.tpc_recover()
xid = [ xid for xid in xids if xid.database == dbname ][0]
xid = [x for x in xids if x.database == dbname][0]
self.assertEqual(None, xid.format_id)
self.assertEqual('dict-connection', xid.gtrid)
self.assertEqual(None, xid.bqual)
@ -1182,7 +1243,8 @@ class ReplicationTest(ConnectingTestCase):
@skip_before_postgres(9, 0)
def test_replication_not_supported(self):
conn = self.repl_connect()
if conn is None: return
if conn is None:
return
cur = conn.cursor()
f = StringIO()
self.assertRaises(psycopg2.NotSupportedError,

View File

@ -31,9 +31,11 @@ from testutils import ConnectingTestCase, skip_copy_if_green, script_to_py3
import psycopg2
class ConnectTestCase(unittest.TestCase):
def setUp(self):
self.args = None
def conect_stub(dsn, connection_factory=None, async=False):
self.args = (dsn, connection_factory, async)
@ -43,6 +45,9 @@ class ConnectTestCase(unittest.TestCase):
def tearDown(self):
psycopg2._connect = self._connect_orig
def assertDsnEqual(self, dsn1, dsn2):
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
def test_there_has_to_be_something(self):
self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect,
@ -57,8 +62,8 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[2], False)
def test_dsn(self):
psycopg2.connect('dbname=blah x=y')
self.assertEqual(self.args[0], 'dbname=blah x=y')
psycopg2.connect('dbname=blah host=y')
self.assertEqual(self.args[0], 'dbname=blah host=y')
self.assertEqual(self.args[1], None)
self.assertEqual(self.args[2], False)
@ -83,37 +88,37 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(len(self.args[0].split()), 4)
def test_generic_keywords(self):
psycopg2.connect(foo='bar')
self.assertEqual(self.args[0], 'foo=bar')
psycopg2.connect(options='stuff')
self.assertEqual(self.args[0], 'options=stuff')
def test_factory(self):
def f(dsn, async=False):
pass
psycopg2.connect(database='foo', bar='baz', connection_factory=f)
self.assertEqual(self.args[0], 'dbname=foo bar=baz')
psycopg2.connect(database='foo', host='baz', connection_factory=f)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo bar=baz", connection_factory=f)
self.assertEqual(self.args[0], 'dbname=foo bar=baz')
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
psycopg2.connect(database='foo', bar='baz', async=1)
self.assertEqual(self.args[0], 'dbname=foo bar=baz')
psycopg2.connect(database='foo', host='baz', async=1)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
psycopg2.connect("dbname=foo bar=baz", async=True)
self.assertEqual(self.args[0], 'dbname=foo bar=baz')
psycopg2.connect("dbname=foo host=baz", async=True)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
def test_empty_param(self):
psycopg2.connect(database='sony', password='')
self.assertEqual(self.args[0], "dbname=sony password=''")
self.assertDsnEqual(self.args[0], "dbname=sony password=''")
def test_escape(self):
psycopg2.connect(database='hello world')
@ -131,13 +136,12 @@ class ConnectTestCase(unittest.TestCase):
psycopg2.connect(database=r"\every thing'")
self.assertEqual(self.args[0], r"dbname='\\every thing\''")
def test_no_kwargs_swallow(self):
self.assertRaises(TypeError,
psycopg2.connect, 'dbname=foo', database='foo')
self.assertRaises(TypeError,
psycopg2.connect, 'dbname=foo', user='postgres')
self.assertRaises(TypeError,
psycopg2.connect, 'dbname=foo', no_such_param='meh')
def test_params_merging(self):
psycopg2.connect('dbname=foo', database='bar')
self.assertEqual(self.args[0], 'dbname=bar')
psycopg2.connect('dbname=foo', user='postgres')
self.assertDsnEqual(self.args[0], 'dbname=foo user=postgres')
class ExceptionsTestCase(ConnectingTestCase):
@ -203,7 +207,8 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(diag.sqlstate, '42P01')
del diag
gc.collect(); gc.collect()
gc.collect()
gc.collect()
assert(w() is None)
@skip_copy_if_green
@ -325,7 +330,7 @@ class TestVersionDiscovery(unittest.TestCase):
self.assertTrue(type(psycopg2.__libpq_version__) is int)
try:
self.assertTrue(type(psycopg2.extensions.libpq_version()) is int)
except NotSupportedError:
except psycopg2.NotSupportedError:
self.assertTrue(psycopg2.__libpq_version__ < 90100)