Merge branch 'async-keyword'

Close #495
This commit is contained in:
Daniele Varrazzo 2017-02-03 04:45:17 +00:00
commit 1911b250e3
18 changed files with 323 additions and 56 deletions

2
NEWS
View File

@ -35,6 +35,8 @@ Bug fixes:
- Fixed error caused by missing decoding `~psycopg2.extras.LoggingConnection`
(:ticket:`#483`).
- Added ``async_`` as an alias for ``async`` to support Python 3.7 where
``async`` will become a keyword (:ticket:`#495`).
Other changes:

View File

@ -706,9 +706,13 @@ The ``connection`` class
.. attribute:: async
async_
Read only attribute: 1 if the connection is asynchronous, 0 otherwise.
.. versionchanged:: 2.7 added the `!async_` alias for Python versions
where `!async` is a keyword.
.. method:: poll()

View File

@ -29,6 +29,8 @@ introspection etc.
For a complete description of the class, see `connection`.
.. versionchanged:: 2.7
*async_* can be used as alias for *async*.
.. class:: cursor(conn, name=None)

View File

@ -64,7 +64,8 @@ The module interface respects the standard defined in the |DBAPI|_.
cursors you can use this parameter instead of subclassing a connection.
Using *async*\=\ `!True` an asynchronous connection will be created: see
:ref:`async-support` to know about advantages and limitations.
:ref:`async-support` to know about advantages and limitations. *async_* is
a valid alias for the Python version where ``async`` is a keyword.
.. versionchanged:: 2.4.3
any keyword argument is passed to the connection. Previously only the
@ -76,6 +77,9 @@ The module interface respects the standard defined in the |DBAPI|_.
.. versionchanged:: 2.7
both *dsn* and keyword arguments can be specified.
.. versionchanged:: 2.7
added *async_* alias.
.. seealso::
- `~psycopg2.extensions.parse_dsn`

View File

@ -82,8 +82,7 @@ else:
del Decimal, Adapter
def connect(dsn=None, connection_factory=None, cursor_factory=None,
async=False, **kwargs):
def connect(dsn=None, connection_factory=None, cursor_factory=None, **kwargs):
"""
Create a new database connection.
@ -111,17 +110,24 @@ def connect(dsn=None, connection_factory=None, cursor_factory=None,
Using the *cursor_factory* parameter, a new default cursor factory will be
used by cursor().
Using *async*=True an asynchronous connection will be created.
Using *async*=True an asynchronous connection will be created. *async_* is
a valid alias (for Python versions where ``async`` is a keyword).
Any other keyword parameter will be passed to the underlying client
library: the list of supported parameters depends on the library version.
"""
kwasync = {}
if 'async' in kwargs:
kwasync['async'] = kwargs.pop('async')
if 'async_' in kwargs:
kwasync['async_'] = kwargs.pop('async_')
if dsn is None and not kwargs:
raise TypeError('missing dsn and no parameters')
dsn = _ext.make_dsn(dsn, **kwargs)
conn = _connect(dsn, connection_factory=connection_factory, async=async)
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
if cursor_factory is not None:
conn.cursor_factory = cursor_factory

View File

@ -1040,6 +1040,8 @@ static struct PyMemberDef connectionObject_members[] = {
"The current connection string."},
{"async", T_LONG, offsetof(connectionObject, async), READONLY,
"True if the connection is asynchronous."},
{"async_", T_LONG, offsetof(connectionObject, async), READONLY,
"True if the connection is asynchronous."},
{"status", T_INT,
offsetof(connectionObject, status), READONLY,
"The current transaction status."},
@ -1186,12 +1188,14 @@ static int
connection_init(PyObject *obj, PyObject *args, PyObject *kwds)
{
const char *dsn;
long int async = 0;
static char *kwlist[] = {"dsn", "async", NULL};
long int async = 0, async_ = 0;
static char *kwlist[] = {"dsn", "async", "async_", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|l", kwlist, &dsn, &async))
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|ll", kwlist,
&dsn, &async, &async_))
return -1;
if (async_) { async = async_; }
return connection_setup((connectionObject *)obj, dsn, async);
}

View File

@ -82,15 +82,17 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
PyObject *conn = NULL;
PyObject *factory = NULL;
const char *dsn = NULL;
int async = 0;
int async = 0, async_ = 0;
static char *kwlist[] = {"dsn", "connection_factory", "async", NULL};
static char *kwlist[] = {"dsn", "connection_factory", "async", "async_", NULL};
if (!PyArg_ParseTupleAndKeywords(args, keywds, "s|Oi", kwlist,
&dsn, &factory, &async)) {
if (!PyArg_ParseTupleAndKeywords(args, keywds, "s|Oii", kwlist,
&dsn, &factory, &async, &async_)) {
return NULL;
}
if (async_) { async = async_; }
Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async);
/* allocate connection, fill with errors and return it */

View File

@ -50,7 +50,11 @@ else:
# workaround subclass for ticket #153
pass
sys.path.insert(0, 'scripts')
# Configure distutils to run our custom 2to3 fixers as well
from lib2to3.refactor import get_fixers_from_package
build_py.fixer_names = [f for f in get_fixers_from_package('lib2to3.fixes')
# creates a pending deprecation warning on py 3.4
if not f.endswith('.fix_reload')]
try:
import configparser

View File

@ -22,6 +22,11 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# Convert warnings into errors here. We can't do it with -W because on
# Travis importing site raises a warning.
import warnings
warnings.simplefilter('error') # noqa
import sys
from testconfig import dsn
from testutils import unittest
@ -50,6 +55,9 @@ import test_types_basic
import test_types_extras
import test_with
if sys.version_info[:2] < (3, 6):
import test_async_keyword
def test_suite():
# If connection to test db fails, bail out early.
@ -65,6 +73,8 @@ def test_suite():
suite = unittest.TestSuite()
suite.addTest(test_async.test_suite())
if sys.version_info[:2] < (3, 6):
suite.addTest(test_async_keyword.test_suite())
suite.addTest(test_bugX000.test_suite())
suite.addTest(test_bug_gc.test_suite())
suite.addTest(test_cancel.test_suite())

View File

@ -55,7 +55,7 @@ class AsyncTests(ConnectingTestCase):
ConnectingTestCase.setUp(self)
self.sync_conn = self.conn
self.conn = self.connect(async=True)
self.conn = self.connect(async_=True)
self.wait(self.conn)
@ -71,8 +71,8 @@ class AsyncTests(ConnectingTestCase):
sync_cur = self.sync_conn.cursor()
del cur, sync_cur
self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async)
self.assert_(self.conn.async_)
self.assert_(not self.sync_conn.async_)
# the async connection should be in isolevel 0
self.assertEquals(self.conn.isolation_level, 0)
@ -312,12 +312,12 @@ class AsyncTests(ConnectingTestCase):
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
def __init__(self, dsn, async=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
def __init__(self, dsn, async_=0):
psycopg2.extensions.connection.__init__(self, dsn, async_=async_)
conn = self.connect(connection_factory=MyConn, async=True)
conn = self.connect(connection_factory=MyConn, async_=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async)
self.assert_(conn.async_)
conn.close()
@slow
@ -441,7 +441,7 @@ class AsyncTests(ConnectingTestCase):
def test_async_connection_error_message(self):
try:
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True)
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async_=True)
self.wait(cnn)
except psycopg2.Error, e:
self.assertNotEqual(str(e), "asynchronous connection failed",

217
tests/test_async_keyword.py Executable file
View File

@ -0,0 +1,217 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# test_async_keyword.py - test for objects using 'async' as attribute/param
#
# Copyright (C) 2017 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import psycopg2
from psycopg2 import extras
from testconfig import dsn
from testutils import (ConnectingTestCase, unittest, skip_before_postgres,
assertDsnEqual)
from test_replication import ReplicationTestCase, skip_repl_if_green
from psycopg2.extras import LogicalReplicationConnection, StopReplication
class AsyncTests(ConnectingTestCase):
def setUp(self):
ConnectingTestCase.setUp(self)
self.sync_conn = self.conn
self.conn = self.connect(async=True)
self.wait(self.conn)
curs = self.conn.cursor()
curs.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.wait(curs)
def test_connection_setup(self):
cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor()
del cur, sync_cur
self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async)
# the async connection should be in isolevel 0
self.assertEquals(self.conn.isolation_level, 0)
# check other properties to be found on the connection
self.assert_(self.conn.server_version)
self.assert_(self.conn.protocol_version in (2, 3))
self.assert_(self.conn.encoding in psycopg2.extensions.encodings)
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
def __init__(self, dsn, async=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
conn = self.connect(connection_factory=MyConn, async=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async)
conn.close()
def test_async_connection_error_message(self):
try:
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True)
self.wait(cnn)
except psycopg2.Error, e:
self.assertNotEqual(str(e), "asynchronous connection failed",
"connection error reason lost")
else:
self.fail("no exception raised")
class CancelTests(ConnectingTestCase):
def setUp(self):
ConnectingTestCase.setUp(self)
cur = self.conn.cursor()
cur.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.conn.commit()
@skip_before_postgres(8, 2)
def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
cur.execute("select pg_sleep(10000)")
self.assertTrue(async_conn.isexecuting())
async_conn.cancel()
self.assertRaises(psycopg2.extensions.QueryCanceledError,
extras.wait_select, async_conn)
cur.execute("select 1")
extras.wait_select(async_conn)
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
async_conn.close()
self.assertTrue(async_conn.closed)
class ConnectTestCase(unittest.TestCase):
def setUp(self):
self.args = None
def connect_stub(dsn, connection_factory=None, async=False):
self.args = (dsn, connection_factory, async)
self._connect_orig = psycopg2._connect
psycopg2._connect = connect_stub
def tearDown(self):
psycopg2._connect = self._connect_orig
def test_there_has_to_be_something(self):
self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect,
connection_factory=lambda dsn, async=False: None)
self.assertRaises(TypeError, psycopg2.connect,
async=True)
def test_factory(self):
def f(dsn, async=False):
pass
psycopg2.connect(database='foo', host='baz', connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
psycopg2.connect(database='foo', host='baz', async=1)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
psycopg2.connect("dbname=foo host=baz", async=True)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
class AsyncReplicationTest(ReplicationTestCase):
@skip_before_postgres(9, 4) # slots require 9.4
@skip_repl_if_green
def test_async_replication(self):
conn = self.repl_connect(
connection_factory=LogicalReplicationConnection, async=1)
if conn is None:
return
cur = conn.cursor()
self.create_replication_slot(cur, output_plugin='test_decoding')
self.wait(cur)
cur.start_replication(self.slot)
self.wait(cur)
self.make_replication_events()
self.msg_count = 0
def consume(msg):
# just check the methods
"%s: %s" % (cur.io_timestamp, repr(msg))
self.msg_count += 1
if self.msg_count > 3:
cur.send_feedback(reply=True)
raise StopReplication()
cur.send_feedback(flush_lsn=msg.data_start)
# cannot be used in asynchronous mode
self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
def process_stream():
from select import select
while True:
msg = cur.read_message()
if msg:
consume(msg)
else:
select([cur], [], [])
self.assertRaises(StopReplication, process_stream)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()

View File

@ -88,7 +88,7 @@ class CancelTests(ConnectingTestCase):
@skip_before_postgres(8, 2)
def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
async_conn = psycopg2.connect(dsn, async_=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
@ -102,7 +102,7 @@ class CancelTests(ConnectingTestCase):
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
async_conn = psycopg2.connect(dsn, async_=True)
async_conn.close()
self.assertTrue(async_conn.closed)

View File

@ -33,7 +33,7 @@ import psycopg2.errorcodes
from psycopg2 import extensions as ext
from testutils import (
unittest, decorate_all_tests, skip_if_no_superuser,
unittest, assertDsnEqual, decorate_all_tests, skip_if_no_superuser,
skip_before_postgres, skip_after_postgres, skip_before_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
@ -396,9 +396,6 @@ class ParseDsnTestCase(ConnectingTestCase):
class MakeDsnTestCase(ConnectingTestCase):
def assertDsnEqual(self, dsn1, dsn2):
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
def test_empty_arguments(self):
self.assertEqual(ext.make_dsn(), '')
@ -416,7 +413,7 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_empty_param(self):
dsn = ext.make_dsn(dbname='sony', password='')
self.assertDsnEqual(dsn, "dbname=sony password=''")
assertDsnEqual(self, dsn, "dbname=sony password=''")
def test_escape(self):
dsn = ext.make_dsn(dbname='hello world')
@ -439,10 +436,10 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_params_merging(self):
dsn = ext.make_dsn('dbname=foo host=bar', host='baz')
self.assertDsnEqual(dsn, 'dbname=foo host=baz')
assertDsnEqual(self, dsn, 'dbname=foo host=baz')
dsn = ext.make_dsn('dbname=foo', user='postgres')
self.assertDsnEqual(dsn, 'dbname=foo user=postgres')
assertDsnEqual(self, dsn, 'dbname=foo user=postgres')
def test_no_dsn_munging(self):
dsnin = 'dbname=a host=b user=c password=d'
@ -456,7 +453,7 @@ class MakeDsnTestCase(ConnectingTestCase):
self.assertEqual(dsn, url)
dsn = ext.make_dsn(url, application_name='woot')
self.assertDsnEqual(dsn,
assertDsnEqual(self, dsn,
'dbname=test user=tester password=secret application_name=woot')
self.assertRaises(psycopg2.ProgrammingError,

View File

@ -27,6 +27,9 @@ from testutils import unittest, ConnectingTestCase, slow
try:
reload
except NameError:
try:
from importlib import reload
except ImportError:
from imp import reload
from threading import Thread

View File

@ -27,7 +27,7 @@ import sys
from subprocess import Popen
from testutils import (unittest, skip_before_python, skip_before_postgres,
ConnectingTestCase, skip_copy_if_green, script_to_py3, slow)
ConnectingTestCase, skip_copy_if_green, script_to_py3, assertDsnEqual, slow)
import psycopg2
@ -36,24 +36,21 @@ class ConnectTestCase(unittest.TestCase):
def setUp(self):
self.args = None
def conect_stub(dsn, connection_factory=None, async=False):
self.args = (dsn, connection_factory, async)
def connect_stub(dsn, connection_factory=None, async_=False):
self.args = (dsn, connection_factory, async_)
self._connect_orig = psycopg2._connect
psycopg2._connect = conect_stub
psycopg2._connect = connect_stub
def tearDown(self):
psycopg2._connect = self._connect_orig
def assertDsnEqual(self, dsn1, dsn2):
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
def test_there_has_to_be_something(self):
self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect,
connection_factory=lambda dsn, async=False: None)
connection_factory=lambda dsn, async_=False: None)
self.assertRaises(TypeError, psycopg2.connect,
async=True)
async_=True)
def test_no_keywords(self):
psycopg2.connect('')
@ -92,27 +89,27 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'options=stuff')
def test_factory(self):
def f(dsn, async=False):
def f(dsn, async_=False):
pass
psycopg2.connect(database='foo', host='baz', connection_factory=f)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
psycopg2.connect(database='foo', host='baz', async=1)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
psycopg2.connect(database='foo', host='baz', async_=1)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
psycopg2.connect("dbname=foo host=baz", async=True)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
psycopg2.connect("dbname=foo host=baz", async_=True)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
@ -124,7 +121,7 @@ class ConnectTestCase(unittest.TestCase):
def test_empty_param(self):
psycopg2.connect(database='sony', password='')
self.assertDsnEqual(self.args[0], "dbname=sony password=''")
assertDsnEqual(self, self.args[0], "dbname=sony password=''")
def test_escape(self):
psycopg2.connect(database='hello world')
@ -147,7 +144,7 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'dbname=bar')
psycopg2.connect('dbname=foo', user='postgres')
self.assertDsnEqual(self.args[0], 'dbname=foo user=postgres')
assertDsnEqual(self, self.args[0], 'dbname=foo user=postgres')
class ExceptionsTestCase(ConnectingTestCase):

View File

@ -183,7 +183,7 @@ class AsyncReplicationTest(ReplicationTestCase):
@skip_repl_if_green
def test_async_replication(self):
conn = self.repl_connect(
connection_factory=LogicalReplicationConnection, async=1)
connection_factory=LogicalReplicationConnection, async_=1)
if conn is None:
return

View File

@ -17,6 +17,7 @@ from __future__ import with_statement
import re
import sys
import warnings
from decimal import Decimal
from datetime import date, datetime
from functools import wraps
@ -76,7 +77,10 @@ class TypesExtrasTests(ConnectingTestCase):
self.failUnless(type(s) == list and len(s) == 0)
def testINET(self):
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
psycopg2.extras.register_inet()
i = psycopg2.extras.Inet("192.168.1.0/24")
s = self.execute("SELECT %s AS foo", (i,))
self.failUnless(i.addr == s.addr)
@ -85,7 +89,10 @@ class TypesExtrasTests(ConnectingTestCase):
self.failUnless(s is None)
def testINETARRAY(self):
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
psycopg2.extras.register_inet()
i = psycopg2.extras.Inet("192.168.1.0/24")
s = self.execute("SELECT %s AS foo", ([i],))
self.failUnless(i.addr == s[0].addr)

View File

@ -50,6 +50,8 @@ else:
@wraps(f)
def skipIf__(self):
if cond:
with warnings.catch_warnings():
warnings.simplefilter('always', UserWarning)
warnings.warn(msg)
return
else:
@ -61,6 +63,8 @@ else:
return skipIf(True, msg)
def skipTest(self, msg):
with warnings.catch_warnings():
warnings.simplefilter('always', UserWarning)
warnings.warn(msg)
return
@ -130,7 +134,7 @@ class ConnectingTestCase(unittest.TestCase):
import psycopg2
try:
conn = self.connect(**kwargs)
if conn.async == 1:
if conn.async_ == 1:
self.wait(conn)
except psycopg2.OperationalError, e:
# If pgcode is not set it is a genuine connection error
@ -469,3 +473,7 @@ def slow(f):
return self.skipTest("slow test")
return f(self)
return slow_
def assertDsnEqual(testsuite, dsn1, dsn2):
testsuite.assertEqual(set(dsn1.split()), set(dsn2.split()))