mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-22 08:56:34 +03:00
Implementation of make_dsn in Python
This is equivalent to what proposed in #363, but with a much simpler implementation.
This commit is contained in:
parent
d40f81865f
commit
1c4523f0ac
|
@ -80,28 +80,9 @@ else:
|
||||||
_ext.register_adapter(Decimal, Adapter)
|
_ext.register_adapter(Decimal, Adapter)
|
||||||
del Decimal, Adapter
|
del Decimal, Adapter
|
||||||
|
|
||||||
import re
|
|
||||||
|
|
||||||
def _param_escape(s,
|
def connect(dsn=None, connection_factory=None, cursor_factory=None,
|
||||||
re_escape=re.compile(r"([\\'])"),
|
async=False, **kwargs):
|
||||||
re_space=re.compile(r'\s')):
|
|
||||||
"""
|
|
||||||
Apply the escaping rule required by PQconnectdb
|
|
||||||
"""
|
|
||||||
if not s: return "''"
|
|
||||||
|
|
||||||
s = re_escape.sub(r'\\\1', s)
|
|
||||||
if re_space.search(s):
|
|
||||||
s = "'" + s + "'"
|
|
||||||
|
|
||||||
return s
|
|
||||||
|
|
||||||
del re
|
|
||||||
|
|
||||||
|
|
||||||
def connect(dsn=None,
|
|
||||||
database=None, user=None, password=None, host=None, port=None,
|
|
||||||
connection_factory=None, cursor_factory=None, async=False, **kwargs):
|
|
||||||
"""
|
"""
|
||||||
Create a new database connection.
|
Create a new database connection.
|
||||||
|
|
||||||
|
@ -115,7 +96,7 @@ def connect(dsn=None,
|
||||||
|
|
||||||
The basic connection parameters are:
|
The basic connection parameters are:
|
||||||
|
|
||||||
- *dbname*: the database name (only in dsn string)
|
- *dbname*: the database name
|
||||||
- *database*: the database name (only as keyword argument)
|
- *database*: the database name (only as keyword argument)
|
||||||
- *user*: user name used to authenticate
|
- *user*: user name used to authenticate
|
||||||
- *password*: password used to authenticate
|
- *password*: password used to authenticate
|
||||||
|
@ -135,32 +116,10 @@ def connect(dsn=None,
|
||||||
library: the list of supported parameters depends on the library version.
|
library: the list of supported parameters depends on the library version.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
items = []
|
if dsn is None and not kwargs:
|
||||||
if database is not None:
|
raise TypeError('missing dsn and no parameters')
|
||||||
items.append(('dbname', database))
|
|
||||||
if user is not None:
|
|
||||||
items.append(('user', user))
|
|
||||||
if password is not None:
|
|
||||||
items.append(('password', password))
|
|
||||||
if host is not None:
|
|
||||||
items.append(('host', host))
|
|
||||||
if port is not None:
|
|
||||||
items.append(('port', port))
|
|
||||||
|
|
||||||
items.extend([(k, v) for (k, v) in kwargs.iteritems() if v is not None])
|
|
||||||
|
|
||||||
if dsn is not None and items:
|
|
||||||
raise TypeError(
|
|
||||||
"'%s' is an invalid keyword argument when the dsn is specified"
|
|
||||||
% items[0][0])
|
|
||||||
|
|
||||||
if dsn is None:
|
|
||||||
if not items:
|
|
||||||
raise TypeError('missing dsn and no parameters')
|
|
||||||
else:
|
|
||||||
dsn = " ".join(["%s=%s" % (k, _param_escape(str(v)))
|
|
||||||
for (k, v) in items])
|
|
||||||
|
|
||||||
|
dsn = _ext.make_dsn(dsn, **kwargs)
|
||||||
conn = _connect(dsn, connection_factory=connection_factory, async=async)
|
conn = _connect(dsn, connection_factory=connection_factory, async=async)
|
||||||
if cursor_factory is not None:
|
if cursor_factory is not None:
|
||||||
conn.cursor_factory = cursor_factory
|
conn.cursor_factory = cursor_factory
|
||||||
|
|
|
@ -7,7 +7,7 @@ This module holds all the extensions to the DBAPI-2.0 provided by psycopg.
|
||||||
- `lobject` -- the new-type inheritable large object class
|
- `lobject` -- the new-type inheritable large object class
|
||||||
- `adapt()` -- exposes the PEP-246_ compatible adapting mechanism used
|
- `adapt()` -- exposes the PEP-246_ compatible adapting mechanism used
|
||||||
by psycopg to adapt Python types to PostgreSQL ones
|
by psycopg to adapt Python types to PostgreSQL ones
|
||||||
|
|
||||||
.. _PEP-246: http://www.python.org/peps/pep-0246.html
|
.. _PEP-246: http://www.python.org/peps/pep-0246.html
|
||||||
"""
|
"""
|
||||||
# psycopg/extensions.py - DBAPI-2.0 extensions specific to psycopg
|
# psycopg/extensions.py - DBAPI-2.0 extensions specific to psycopg
|
||||||
|
@ -32,6 +32,9 @@ This module holds all the extensions to the DBAPI-2.0 provided by psycopg.
|
||||||
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
|
||||||
# License for more details.
|
# License for more details.
|
||||||
|
|
||||||
|
import re as _re
|
||||||
|
import sys as _sys
|
||||||
|
|
||||||
from psycopg2._psycopg import UNICODE, INTEGER, LONGINTEGER, BOOLEAN, FLOAT
|
from psycopg2._psycopg import UNICODE, INTEGER, LONGINTEGER, BOOLEAN, FLOAT
|
||||||
from psycopg2._psycopg import TIME, DATE, INTERVAL, DECIMAL
|
from psycopg2._psycopg import TIME, DATE, INTERVAL, DECIMAL
|
||||||
from psycopg2._psycopg import BINARYARRAY, BOOLEANARRAY, DATEARRAY, DATETIMEARRAY
|
from psycopg2._psycopg import BINARYARRAY, BOOLEANARRAY, DATEARRAY, DATETIMEARRAY
|
||||||
|
@ -56,7 +59,8 @@ try:
|
||||||
except ImportError:
|
except ImportError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
from psycopg2._psycopg import adapt, adapters, encodings, connection, cursor, lobject, Xid, libpq_version, parse_dsn, quote_ident
|
from psycopg2._psycopg import adapt, adapters, encodings, connection, cursor
|
||||||
|
from psycopg2._psycopg import lobject, Xid, libpq_version, parse_dsn, quote_ident
|
||||||
from psycopg2._psycopg import string_types, binary_types, new_type, new_array_type, register_type
|
from psycopg2._psycopg import string_types, binary_types, new_type, new_array_type, register_type
|
||||||
from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column
|
from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column
|
||||||
|
|
||||||
|
@ -98,7 +102,6 @@ TRANSACTION_STATUS_INTRANS = 2
|
||||||
TRANSACTION_STATUS_INERROR = 3
|
TRANSACTION_STATUS_INERROR = 3
|
||||||
TRANSACTION_STATUS_UNKNOWN = 4
|
TRANSACTION_STATUS_UNKNOWN = 4
|
||||||
|
|
||||||
import sys as _sys
|
|
||||||
|
|
||||||
# Return bytes from a string
|
# Return bytes from a string
|
||||||
if _sys.version_info[0] < 3:
|
if _sys.version_info[0] < 3:
|
||||||
|
@ -108,6 +111,7 @@ else:
|
||||||
def b(s):
|
def b(s):
|
||||||
return s.encode('utf8')
|
return s.encode('utf8')
|
||||||
|
|
||||||
|
|
||||||
def register_adapter(typ, callable):
|
def register_adapter(typ, callable):
|
||||||
"""Register 'callable' as an ISQLQuote adapter for type 'typ'."""
|
"""Register 'callable' as an ISQLQuote adapter for type 'typ'."""
|
||||||
adapters[(typ, ISQLQuote)] = callable
|
adapters[(typ, ISQLQuote)] = callable
|
||||||
|
@ -151,6 +155,41 @@ class NoneAdapter(object):
|
||||||
return _null
|
return _null
|
||||||
|
|
||||||
|
|
||||||
|
def make_dsn(dsn=None, **kwargs):
|
||||||
|
"""Convert a set of keywords into a connection strings."""
|
||||||
|
# Override the dsn with the parameters
|
||||||
|
if 'database' in kwargs:
|
||||||
|
if 'dbname' in kwargs:
|
||||||
|
raise TypeError(
|
||||||
|
"you can't specify both 'database' and 'dbname' arguments")
|
||||||
|
kwargs['dbname'] = kwargs.pop('database')
|
||||||
|
|
||||||
|
if dsn is not None:
|
||||||
|
tmp = parse_dsn(dsn)
|
||||||
|
tmp.update(kwargs)
|
||||||
|
kwargs = tmp
|
||||||
|
|
||||||
|
dsn = " ".join(["%s=%s" % (k, _param_escape(str(v)))
|
||||||
|
for (k, v) in kwargs.iteritems()])
|
||||||
|
return dsn
|
||||||
|
|
||||||
|
|
||||||
|
def _param_escape(s,
|
||||||
|
re_escape=_re.compile(r"([\\'])"),
|
||||||
|
re_space=_re.compile(r'\s')):
|
||||||
|
"""
|
||||||
|
Apply the escaping rule required by PQconnectdb
|
||||||
|
"""
|
||||||
|
if not s:
|
||||||
|
return "''"
|
||||||
|
|
||||||
|
s = re_escape.sub(r'\\\1', s)
|
||||||
|
if re_space.search(s):
|
||||||
|
s = "'" + s + "'"
|
||||||
|
|
||||||
|
return s
|
||||||
|
|
||||||
|
|
||||||
# Create default json typecasters for PostgreSQL 9.2 oids
|
# Create default json typecasters for PostgreSQL 9.2 oids
|
||||||
from psycopg2._json import register_default_json, register_default_jsonb
|
from psycopg2._json import register_default_json, register_default_jsonb
|
||||||
|
|
||||||
|
|
|
@ -43,6 +43,9 @@ class ConnectTestCase(unittest.TestCase):
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
psycopg2._connect = self._connect_orig
|
psycopg2._connect = self._connect_orig
|
||||||
|
|
||||||
|
def assertDsnEqual(self, dsn1, dsn2):
|
||||||
|
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
|
||||||
|
|
||||||
def test_there_has_to_be_something(self):
|
def test_there_has_to_be_something(self):
|
||||||
self.assertRaises(TypeError, psycopg2.connect)
|
self.assertRaises(TypeError, psycopg2.connect)
|
||||||
self.assertRaises(TypeError, psycopg2.connect,
|
self.assertRaises(TypeError, psycopg2.connect,
|
||||||
|
@ -57,8 +60,8 @@ class ConnectTestCase(unittest.TestCase):
|
||||||
self.assertEqual(self.args[2], False)
|
self.assertEqual(self.args[2], False)
|
||||||
|
|
||||||
def test_dsn(self):
|
def test_dsn(self):
|
||||||
psycopg2.connect('dbname=blah x=y')
|
psycopg2.connect('dbname=blah application_name=y')
|
||||||
self.assertEqual(self.args[0], 'dbname=blah x=y')
|
self.assertDsnEqual(self.args[0], 'dbname=blah application_name=y')
|
||||||
self.assertEqual(self.args[1], None)
|
self.assertEqual(self.args[1], None)
|
||||||
self.assertEqual(self.args[2], False)
|
self.assertEqual(self.args[2], False)
|
||||||
|
|
||||||
|
@ -90,30 +93,30 @@ class ConnectTestCase(unittest.TestCase):
|
||||||
def f(dsn, async=False):
|
def f(dsn, async=False):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
psycopg2.connect(database='foo', bar='baz', connection_factory=f)
|
psycopg2.connect(database='foo', application_name='baz', connection_factory=f)
|
||||||
self.assertEqual(self.args[0], 'dbname=foo bar=baz')
|
self.assertDsnEqual(self.args[0], 'dbname=foo application_name=baz')
|
||||||
self.assertEqual(self.args[1], f)
|
self.assertEqual(self.args[1], f)
|
||||||
self.assertEqual(self.args[2], False)
|
self.assertEqual(self.args[2], False)
|
||||||
|
|
||||||
psycopg2.connect("dbname=foo bar=baz", connection_factory=f)
|
psycopg2.connect("dbname=foo application_name=baz", connection_factory=f)
|
||||||
self.assertEqual(self.args[0], 'dbname=foo bar=baz')
|
self.assertDsnEqual(self.args[0], 'dbname=foo application_name=baz')
|
||||||
self.assertEqual(self.args[1], f)
|
self.assertEqual(self.args[1], f)
|
||||||
self.assertEqual(self.args[2], False)
|
self.assertEqual(self.args[2], False)
|
||||||
|
|
||||||
def test_async(self):
|
def test_async(self):
|
||||||
psycopg2.connect(database='foo', bar='baz', async=1)
|
psycopg2.connect(database='foo', application_name='baz', async=1)
|
||||||
self.assertEqual(self.args[0], 'dbname=foo bar=baz')
|
self.assertDsnEqual(self.args[0], 'dbname=foo application_name=baz')
|
||||||
self.assertEqual(self.args[1], None)
|
self.assertEqual(self.args[1], None)
|
||||||
self.assert_(self.args[2])
|
self.assert_(self.args[2])
|
||||||
|
|
||||||
psycopg2.connect("dbname=foo bar=baz", async=True)
|
psycopg2.connect("dbname=foo application_name=baz", async=True)
|
||||||
self.assertEqual(self.args[0], 'dbname=foo bar=baz')
|
self.assertDsnEqual(self.args[0], 'dbname=foo application_name=baz')
|
||||||
self.assertEqual(self.args[1], None)
|
self.assertEqual(self.args[1], None)
|
||||||
self.assert_(self.args[2])
|
self.assert_(self.args[2])
|
||||||
|
|
||||||
def test_empty_param(self):
|
def test_empty_param(self):
|
||||||
psycopg2.connect(database='sony', password='')
|
psycopg2.connect(database='sony', password='')
|
||||||
self.assertEqual(self.args[0], "dbname=sony password=''")
|
self.assertDsnEqual(self.args[0], "dbname=sony password=''")
|
||||||
|
|
||||||
def test_escape(self):
|
def test_escape(self):
|
||||||
psycopg2.connect(database='hello world')
|
psycopg2.connect(database='hello world')
|
||||||
|
@ -131,13 +134,12 @@ class ConnectTestCase(unittest.TestCase):
|
||||||
psycopg2.connect(database=r"\every thing'")
|
psycopg2.connect(database=r"\every thing'")
|
||||||
self.assertEqual(self.args[0], r"dbname='\\every thing\''")
|
self.assertEqual(self.args[0], r"dbname='\\every thing\''")
|
||||||
|
|
||||||
def test_no_kwargs_swallow(self):
|
def test_params_merging(self):
|
||||||
self.assertRaises(TypeError,
|
psycopg2.connect('dbname=foo', database='bar')
|
||||||
psycopg2.connect, 'dbname=foo', database='foo')
|
self.assertEqual(self.args[0], 'dbname=bar')
|
||||||
self.assertRaises(TypeError,
|
|
||||||
psycopg2.connect, 'dbname=foo', user='postgres')
|
psycopg2.connect('dbname=foo', user='postgres')
|
||||||
self.assertRaises(TypeError,
|
self.assertDsnEqual(self.args[0], 'dbname=foo user=postgres')
|
||||||
psycopg2.connect, 'dbname=foo', no_such_param='meh')
|
|
||||||
|
|
||||||
|
|
||||||
class ExceptionsTestCase(ConnectingTestCase):
|
class ExceptionsTestCase(ConnectingTestCase):
|
||||||
|
|
Loading…
Reference in New Issue
Block a user