Merge branch 'networking-improvement'

This commit is contained in:
Daniele Varrazzo 2016-10-11 04:55:09 +01:00
commit 51aa166d52
10 changed files with 281 additions and 15 deletions

3
NEWS
View File

@ -19,6 +19,9 @@ New features:
- The attributes `~connection.notices` and `~connection.notifies` can be
customized replacing them with any object exposing an `!append()` method
(:ticket:`#326`).
- Adapt network types to `ipaddress` objects when available. When not
enabled, convert arrays of network types to lists by default. The old `!Inet`
adapter is deprecated (:tickets:`#317, #343, #387`).
- Added `~psycopg2.extensions.quote_ident()` function (:ticket:`#359`).
- Added `~connection.get_dsn_parameters()` connection method (:ticket:`#364`).

View File

@ -62,7 +62,7 @@ except ImportError:
intersphinx_mapping = {
'py': ('http://docs.python.org/', None),
'py3': ('http://docs.python.org/3.2', None),
'py3': ('http://docs.python.org/3.4', None),
}
# Pattern to generate links to the bug tracker

View File

@ -930,12 +930,29 @@ UUID data type
.. index::
pair: INET; Data types
pair: CIDR; Data types
pair: MACADDR; Data types
:sql:`inet` data type
^^^^^^^^^^^^^^^^^^^^^^
.. _adapt-network:
.. versionadded:: 2.0.9
.. versionchanged:: 2.4.5 added inet array support.
Networking data types
^^^^^^^^^^^^^^^^^^^^^
By default Psycopg casts the PostgreSQL networking data types (:sql:`inet`,
:sql:`cidr`, :sql:`macaddr`) into ordinary strings; array of such types are
converted into lists of strings.
.. versionchanged:: 2.7
in previous version array of networking types were not treated as arrays.
.. autofunction:: register_ipaddress
.. autofunction:: register_inet
.. deprecated:: 2.7
this function will not receive further development and disappear in
future versions.
.. doctest::
@ -950,10 +967,11 @@ UUID data type
'192.168.0.1/24'
.. autofunction:: register_inet
.. autoclass:: Inet
.. deprecated:: 2.7
this object will not receive further development and may disappear in
future versions.
.. index::

View File

@ -264,7 +264,10 @@ types:
+--------------------+-------------------------+--------------------------+
| Anything\ |tm| | :sql:`json` | :ref:`adapt-json` |
+--------------------+-------------------------+--------------------------+
| `uuid` | :sql:`uuid` | :ref:`adapt-uuid` |
| `~uuid.UUID` | :sql:`uuid` | :ref:`adapt-uuid` |
+--------------------+-------------------------+--------------------------+
| `ipaddress` | | :sql:`inet` | :ref:`adapt-network` |
| objects | | :sql:`cidr` | |
+--------------------+-------------------------+--------------------------+
.. |tm| unicode:: U+2122

89
lib/_ipaddress.py Normal file
View File

@ -0,0 +1,89 @@
"""Implementation of the ipaddres-based network types adaptation
"""
# psycopg/_ipaddress.py - Ipaddres-based network types adaptation
#
# Copyright (C) 2016 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
from psycopg2.extensions import (
new_type, new_array_type, register_type, register_adapter, QuotedString)
# The module is imported on register_ipaddress
ipaddress = None
# The typecasters are created only once
_casters = None
def register_ipaddress(conn_or_curs=None):
"""
Register conversion support between `ipaddress` objects and `network types`__.
:param conn_or_curs: the scope where to register the type casters.
If `!None` register them globally.
After the function is called, PostgreSQL :sql:`inet` values will be
converted into `~ipaddress.IPv4Interface` or `~ipaddress.IPv6Interface`
objects, :sql:`cidr` values into into `~ipaddress.IPv4Network` or
`~ipaddress.IPv6Network`.
.. __: https://www.postgresql.org/docs/current/static/datatype-net-types.html
"""
global ipaddress
import ipaddress
global _casters
if _casters is None:
_casters = _make_casters()
for c in _casters:
register_type(c, conn_or_curs)
for t in [ipaddress.IPv4Interface, ipaddress.IPv6Interface,
ipaddress.IPv4Network, ipaddress.IPv6Network]:
register_adapter(t, adapt_ipaddress)
def _make_casters():
inet = new_type((869,), 'INET', cast_interface)
ainet = new_array_type((1041,), 'INET[]', inet)
cidr = new_type((650,), 'CIDR', cast_network)
acidr = new_array_type((651,), 'CIDR[]', cidr)
return [inet, ainet, cidr, acidr]
def cast_interface(s, cur=None):
if s is None:
return None
# Py2 version force the use of unicode. meh.
return ipaddress.ip_interface(unicode(s))
def cast_network(s, cur=None):
if s is None:
return None
return ipaddress.ip_network(unicode(s))
def adapt_ipaddress(obj):
return QuotedString(str(obj))

View File

@ -59,6 +59,10 @@ from psycopg2._range import ( # noqa
register_range, RangeAdapter, RangeCaster)
# Expose ipaddress-related objects
from psycopg2._ipaddress import register_ipaddress # noqa
class DictCursorBase(_cursor):
"""Base class for all dict-like cursors."""
@ -686,6 +690,11 @@ def register_inet(oid=None, conn_or_curs=None):
:param conn_or_curs: where to register the typecaster. If not specified,
register it globally.
"""
import warnings
warnings.warn(
"the inet adapter is deprecated, it's not very useful",
DeprecationWarning)
if not oid:
oid1 = 869
oid2 = 1041

View File

@ -25,6 +25,9 @@ static long int typecast_DATEARRAY_types[] = {1182, 0};
static long int typecast_INTERVALARRAY_types[] = {1187, 0};
static long int typecast_BINARYARRAY_types[] = {1001, 0};
static long int typecast_ROWIDARRAY_types[] = {1028, 1013, 0};
static long int typecast_INETARRAY_types[] = {1041, 0};
static long int typecast_CIDRARRAY_types[] = {651, 0};
static long int typecast_MACADDRARRAY_types[] = {1040, 0};
static long int typecast_UNKNOWN_types[] = {705, 0};
@ -57,6 +60,9 @@ static typecastObject_initlist typecast_builtins[] = {
{"BINARYARRAY", typecast_BINARYARRAY_types, typecast_BINARYARRAY_cast, "BINARY"},
{"ROWIDARRAY", typecast_ROWIDARRAY_types, typecast_ROWIDARRAY_cast, "ROWID"},
{"UNKNOWN", typecast_UNKNOWN_types, typecast_UNKNOWN_cast, NULL},
{"INETARRAY", typecast_INETARRAY_types, typecast_STRINGARRAY_cast, "STRING"},
{"CIDRARRAY", typecast_CIDRARRAY_types, typecast_STRINGARRAY_cast, "STRING"},
{"MACADDRARRAY", typecast_MACADDRARRAY_types, typecast_STRINGARRAY_cast, "STRING"},
{NULL, NULL, NULL, NULL}
};

View File

@ -38,6 +38,7 @@ import test_dates
import test_errcodes
import test_extras_dictcursor
import test_green
import test_ipaddress
import test_lobject
import test_module
import test_notify
@ -46,11 +47,7 @@ import test_quote
import test_transaction
import test_types_basic
import test_types_extras
if sys.version_info[:2] >= (2, 5):
import test_with
else:
test_with = None
import test_with
def test_suite():
@ -78,6 +75,7 @@ def test_suite():
suite.addTest(test_errcodes.test_suite())
suite.addTest(test_extras_dictcursor.test_suite())
suite.addTest(test_green.test_suite())
suite.addTest(test_ipaddress.test_suite())
suite.addTest(test_lobject.test_suite())
suite.addTest(test_module.test_suite())
suite.addTest(test_notify.test_suite())
@ -86,8 +84,7 @@ def test_suite():
suite.addTest(test_transaction.test_suite())
suite.addTest(test_types_basic.test_suite())
suite.addTest(test_types_extras.test_suite())
if test_with:
suite.addTest(test_with.test_suite())
suite.addTest(test_with.test_suite())
return suite
if __name__ == '__main__':

131
tests/test_ipaddress.py Executable file
View File

@ -0,0 +1,131 @@
#!/usr/bin/env python
# # test_ipaddress.py - tests for ipaddress support #
# Copyright (C) 2016 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
from __future__ import unicode_literals
import sys
from functools import wraps
from testutils import unittest, ConnectingTestCase, decorate_all_tests
import psycopg2
import psycopg2.extras
def skip_if_no_ipaddress(f):
@wraps(f)
def skip_if_no_ipaddress_(self):
if sys.version_info[:2] < (3, 3):
try:
import ipaddress # noqa
except ImportError:
return self.skipTest("'ipaddress' module not available")
return f(self)
return skip_if_no_ipaddress_
class NetworkingTestCase(ConnectingTestCase):
def test_inet_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select null::inet")
self.assert_(cur.fetchone()[0] is None)
cur.execute("select '127.0.0.1/24'::inet")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv4Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('127.0.0.1/24'))
cur.execute("select '::ffff:102:300/128'::inet")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv6Interface), repr(obj))
self.assertEquals(obj, ip.ip_interface('::ffff:102:300/128'))
def test_inet_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::inet[]")
l = cur.fetchone()[0]
self.assert_(l[0] is None)
self.assertEquals(l[1], ip.ip_interface('127.0.0.1'))
self.assertEquals(l[2], ip.ip_interface('::ffff:102:300/128'))
self.assert_(isinstance(l[1], ip.IPv4Interface), l)
self.assert_(isinstance(l[2], ip.IPv6Interface), l)
def test_inet_adapt(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select %s", [ip.ip_interface('127.0.0.1/24')])
self.assertEquals(cur.fetchone()[0], '127.0.0.1/24')
cur.execute("select %s", [ip.ip_interface('::ffff:102:300/128')])
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
def test_cidr_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select null::cidr")
self.assert_(cur.fetchone()[0] is None)
cur.execute("select '127.0.0.0/24'::cidr")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv4Network), repr(obj))
self.assertEquals(obj, ip.ip_network('127.0.0.0/24'))
cur.execute("select '::ffff:102:300/128'::cidr")
obj = cur.fetchone()[0]
self.assert_(isinstance(obj, ip.IPv6Network), repr(obj))
self.assertEquals(obj, ip.ip_network('::ffff:102:300/128'))
def test_cidr_array_cast(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select '{NULL,127.0.0.1,::ffff:102:300/128}'::cidr[]")
l = cur.fetchone()[0]
self.assert_(l[0] is None)
self.assertEquals(l[1], ip.ip_network('127.0.0.1'))
self.assertEquals(l[2], ip.ip_network('::ffff:102:300/128'))
self.assert_(isinstance(l[1], ip.IPv4Network), l)
self.assert_(isinstance(l[2], ip.IPv6Network), l)
def test_cidr_adapt(self):
import ipaddress as ip
cur = self.conn.cursor()
psycopg2.extras.register_ipaddress(cur)
cur.execute("select %s", [ip.ip_network('127.0.0.0/24')])
self.assertEquals(cur.fetchone()[0], '127.0.0.0/24')
cur.execute("select %s", [ip.ip_network('::ffff:102:300/128')])
self.assertEquals(cur.fetchone()[0], '::ffff:102:300/128')
decorate_all_tests(NetworkingTestCase, skip_if_no_ipaddress)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()

View File

@ -349,6 +349,16 @@ class TypesBasicTests(ConnectingTestCase):
a = self.execute("select '{1, 2, NULL}'::int4[]")
self.assertEqual(a, [2, 4, 'nada'])
@testutils.skip_before_postgres(8, 2)
def testNetworkArray(self):
# we don't know these types, but we know their arrays
a = self.execute("select '{192.168.0.1/24}'::inet[]")
self.assertEqual(a, ['192.168.0.1/24'])
a = self.execute("select '{192.168.0.0/24}'::cidr[]")
self.assertEqual(a, ['192.168.0.0/24'])
a = self.execute("select '{10:20:30:40:50:60}'::macaddr[]")
self.assertEqual(a, ['10:20:30:40:50:60'])
class AdaptSubclassTest(unittest.TestCase):
def test_adapt_subtype(self):