Added async_ as an alias for async

Added in argument for psycopg2.connect() and connection.__init__, and
for the connection.async attribute.
This commit is contained in:
Daniele Varrazzo 2017-02-03 04:28:27 +00:00
parent 8baf6aa372
commit ce9be69615
11 changed files with 282 additions and 50 deletions

View File

@ -82,8 +82,7 @@ else:
del Decimal, Adapter del Decimal, Adapter
def connect(dsn=None, connection_factory=None, cursor_factory=None, def connect(dsn=None, connection_factory=None, cursor_factory=None, **kwargs):
async=False, **kwargs):
""" """
Create a new database connection. Create a new database connection.
@ -111,17 +110,24 @@ def connect(dsn=None, connection_factory=None, cursor_factory=None,
Using the *cursor_factory* parameter, a new default cursor factory will be Using the *cursor_factory* parameter, a new default cursor factory will be
used by cursor(). used by cursor().
Using *async*=True an asynchronous connection will be created. Using *async*=True an asynchronous connection will be created. *async_* is
a valid alias (for Python versions where ``async`` is a keyword).
Any other keyword parameter will be passed to the underlying client Any other keyword parameter will be passed to the underlying client
library: the list of supported parameters depends on the library version. library: the list of supported parameters depends on the library version.
""" """
kwasync = {}
if 'async' in kwargs:
kwasync['async'] = kwargs.pop('async')
if 'async_' in kwargs:
kwasync['async_'] = kwargs.pop('async_')
if dsn is None and not kwargs: if dsn is None and not kwargs:
raise TypeError('missing dsn and no parameters') raise TypeError('missing dsn and no parameters')
dsn = _ext.make_dsn(dsn, **kwargs) dsn = _ext.make_dsn(dsn, **kwargs)
conn = _connect(dsn, connection_factory=connection_factory, async=async) conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
if cursor_factory is not None: if cursor_factory is not None:
conn.cursor_factory = cursor_factory conn.cursor_factory = cursor_factory

View File

@ -1040,6 +1040,8 @@ static struct PyMemberDef connectionObject_members[] = {
"The current connection string."}, "The current connection string."},
{"async", T_LONG, offsetof(connectionObject, async), READONLY, {"async", T_LONG, offsetof(connectionObject, async), READONLY,
"True if the connection is asynchronous."}, "True if the connection is asynchronous."},
{"async_", T_LONG, offsetof(connectionObject, async), READONLY,
"True if the connection is asynchronous."},
{"status", T_INT, {"status", T_INT,
offsetof(connectionObject, status), READONLY, offsetof(connectionObject, status), READONLY,
"The current transaction status."}, "The current transaction status."},
@ -1186,12 +1188,14 @@ static int
connection_init(PyObject *obj, PyObject *args, PyObject *kwds) connection_init(PyObject *obj, PyObject *args, PyObject *kwds)
{ {
const char *dsn; const char *dsn;
long int async = 0; long int async = 0, async_ = 0;
static char *kwlist[] = {"dsn", "async", NULL}; static char *kwlist[] = {"dsn", "async", "async_", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|l", kwlist, &dsn, &async)) if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|ll", kwlist,
&dsn, &async, &async_))
return -1; return -1;
if (async_) { async = async_; }
return connection_setup((connectionObject *)obj, dsn, async); return connection_setup((connectionObject *)obj, dsn, async);
} }

View File

@ -82,15 +82,17 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
PyObject *conn = NULL; PyObject *conn = NULL;
PyObject *factory = NULL; PyObject *factory = NULL;
const char *dsn = NULL; const char *dsn = NULL;
int async = 0; int async = 0, async_ = 0;
static char *kwlist[] = {"dsn", "connection_factory", "async", NULL}; static char *kwlist[] = {"dsn", "connection_factory", "async", "async_", NULL};
if (!PyArg_ParseTupleAndKeywords(args, keywds, "s|Oi", kwlist, if (!PyArg_ParseTupleAndKeywords(args, keywds, "s|Oii", kwlist,
&dsn, &factory, &async)) { &dsn, &factory, &async, &async_)) {
return NULL; return NULL;
} }
if (async_) { async = async_; }
Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async); Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async);
/* allocate connection, fill with errors and return it */ /* allocate connection, fill with errors and return it */

View File

@ -54,6 +54,9 @@ import test_types_basic
import test_types_extras import test_types_extras
import test_with import test_with
if sys.version_info[:2] < (3, 6):
import test_async_keyword
def test_suite(): def test_suite():
# If connection to test db fails, bail out early. # If connection to test db fails, bail out early.
@ -69,6 +72,8 @@ def test_suite():
suite = unittest.TestSuite() suite = unittest.TestSuite()
suite.addTest(test_async.test_suite()) suite.addTest(test_async.test_suite())
if sys.version_info[:2] < (3, 6):
suite.addTest(test_async_keyword.test_suite())
suite.addTest(test_bugX000.test_suite()) suite.addTest(test_bugX000.test_suite())
suite.addTest(test_bug_gc.test_suite()) suite.addTest(test_bug_gc.test_suite())
suite.addTest(test_cancel.test_suite()) suite.addTest(test_cancel.test_suite())

View File

@ -55,7 +55,7 @@ class AsyncTests(ConnectingTestCase):
ConnectingTestCase.setUp(self) ConnectingTestCase.setUp(self)
self.sync_conn = self.conn self.sync_conn = self.conn
self.conn = self.connect(async=True) self.conn = self.connect(async_=True)
self.wait(self.conn) self.wait(self.conn)
@ -71,8 +71,8 @@ class AsyncTests(ConnectingTestCase):
sync_cur = self.sync_conn.cursor() sync_cur = self.sync_conn.cursor()
del cur, sync_cur del cur, sync_cur
self.assert_(self.conn.async) self.assert_(self.conn.async_)
self.assert_(not self.sync_conn.async) self.assert_(not self.sync_conn.async_)
# the async connection should be in isolevel 0 # the async connection should be in isolevel 0
self.assertEquals(self.conn.isolation_level, 0) self.assertEquals(self.conn.isolation_level, 0)
@ -310,12 +310,12 @@ class AsyncTests(ConnectingTestCase):
def test_async_subclass(self): def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection): class MyConn(psycopg2.extensions.connection):
def __init__(self, dsn, async=0): def __init__(self, dsn, async_=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async) psycopg2.extensions.connection.__init__(self, dsn, async_=async_)
conn = self.connect(connection_factory=MyConn, async=True) conn = self.connect(connection_factory=MyConn, async_=True)
self.assert_(isinstance(conn, MyConn)) self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async) self.assert_(conn.async_)
conn.close() conn.close()
def test_flush_on_write(self): def test_flush_on_write(self):
@ -438,7 +438,7 @@ class AsyncTests(ConnectingTestCase):
def test_async_connection_error_message(self): def test_async_connection_error_message(self):
try: try:
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True) cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async_=True)
self.wait(cnn) self.wait(cnn)
except psycopg2.Error, e: except psycopg2.Error, e:
self.assertNotEqual(str(e), "asynchronous connection failed", self.assertNotEqual(str(e), "asynchronous connection failed",

217
tests/test_async_keyword.py Executable file
View File

@ -0,0 +1,217 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# test_async_keyword.py - test for objects using 'async' as attribute/param
#
# Copyright (C) 2017 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import psycopg2
from psycopg2 import extras
from testconfig import dsn
from testutils import (ConnectingTestCase, unittest, skip_before_postgres,
assertDsnEqual)
from test_replication import ReplicationTestCase, skip_repl_if_green
from psycopg2.extras import LogicalReplicationConnection, StopReplication
class AsyncTests(ConnectingTestCase):
def setUp(self):
ConnectingTestCase.setUp(self)
self.sync_conn = self.conn
self.conn = self.connect(async=True)
self.wait(self.conn)
curs = self.conn.cursor()
curs.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.wait(curs)
def test_connection_setup(self):
cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor()
del cur, sync_cur
self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async)
# the async connection should be in isolevel 0
self.assertEquals(self.conn.isolation_level, 0)
# check other properties to be found on the connection
self.assert_(self.conn.server_version)
self.assert_(self.conn.protocol_version in (2, 3))
self.assert_(self.conn.encoding in psycopg2.extensions.encodings)
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
def __init__(self, dsn, async=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
conn = self.connect(connection_factory=MyConn, async=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async)
conn.close()
def test_async_connection_error_message(self):
try:
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True)
self.wait(cnn)
except psycopg2.Error, e:
self.assertNotEqual(str(e), "asynchronous connection failed",
"connection error reason lost")
else:
self.fail("no exception raised")
class CancelTests(ConnectingTestCase):
def setUp(self):
ConnectingTestCase.setUp(self)
cur = self.conn.cursor()
cur.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.conn.commit()
@skip_before_postgres(8, 2)
def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
cur.execute("select pg_sleep(10000)")
self.assertTrue(async_conn.isexecuting())
async_conn.cancel()
self.assertRaises(psycopg2.extensions.QueryCanceledError,
extras.wait_select, async_conn)
cur.execute("select 1")
extras.wait_select(async_conn)
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
async_conn.close()
self.assertTrue(async_conn.closed)
class ConnectTestCase(unittest.TestCase):
def setUp(self):
self.args = None
def connect_stub(dsn, connection_factory=None, async=False):
self.args = (dsn, connection_factory, async)
self._connect_orig = psycopg2._connect
psycopg2._connect = connect_stub
def tearDown(self):
psycopg2._connect = self._connect_orig
def test_there_has_to_be_something(self):
self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect,
connection_factory=lambda dsn, async=False: None)
self.assertRaises(TypeError, psycopg2.connect,
async=True)
def test_factory(self):
def f(dsn, async=False):
pass
psycopg2.connect(database='foo', host='baz', connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
psycopg2.connect(database='foo', host='baz', async=1)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
psycopg2.connect("dbname=foo host=baz", async=True)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
class AsyncReplicationTest(ReplicationTestCase):
@skip_before_postgres(9, 4) # slots require 9.4
@skip_repl_if_green
def test_async_replication(self):
conn = self.repl_connect(
connection_factory=LogicalReplicationConnection, async=1)
if conn is None:
return
cur = conn.cursor()
self.create_replication_slot(cur, output_plugin='test_decoding')
self.wait(cur)
cur.start_replication(self.slot)
self.wait(cur)
self.make_replication_events()
self.msg_count = 0
def consume(msg):
# just check the methods
"%s: %s" % (cur.io_timestamp, repr(msg))
self.msg_count += 1
if self.msg_count > 3:
cur.send_feedback(reply=True)
raise StopReplication()
cur.send_feedback(flush_lsn=msg.data_start)
# cannot be used in asynchronous mode
self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
def process_stream():
from select import select
while True:
msg = cur.read_message()
if msg:
consume(msg)
else:
select([cur], [], [])
self.assertRaises(StopReplication, process_stream)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()

View File

@ -87,7 +87,7 @@ class CancelTests(ConnectingTestCase):
@skip_before_postgres(8, 2) @skip_before_postgres(8, 2)
def test_async_cancel(self): def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async=True) async_conn = psycopg2.connect(dsn, async_=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel) self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn) extras.wait_select(async_conn)
cur = async_conn.cursor() cur = async_conn.cursor()
@ -101,7 +101,7 @@ class CancelTests(ConnectingTestCase):
self.assertEqual(cur.fetchall(), [(1, )]) self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self): def test_async_connection_cancel(self):
async_conn = psycopg2.connect(dsn, async=True) async_conn = psycopg2.connect(dsn, async_=True)
async_conn.close() async_conn.close()
self.assertTrue(async_conn.closed) self.assertTrue(async_conn.closed)

View File

@ -33,7 +33,7 @@ import psycopg2.errorcodes
from psycopg2 import extensions as ext from psycopg2 import extensions as ext
from testutils import ( from testutils import (
unittest, decorate_all_tests, skip_if_no_superuser, unittest, assertDsnEqual, decorate_all_tests, skip_if_no_superuser,
skip_before_postgres, skip_after_postgres, skip_before_libpq, skip_before_postgres, skip_after_postgres, skip_before_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows) ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows)
@ -392,9 +392,6 @@ class ParseDsnTestCase(ConnectingTestCase):
class MakeDsnTestCase(ConnectingTestCase): class MakeDsnTestCase(ConnectingTestCase):
def assertDsnEqual(self, dsn1, dsn2):
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
def test_empty_arguments(self): def test_empty_arguments(self):
self.assertEqual(ext.make_dsn(), '') self.assertEqual(ext.make_dsn(), '')
@ -412,7 +409,7 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_empty_param(self): def test_empty_param(self):
dsn = ext.make_dsn(dbname='sony', password='') dsn = ext.make_dsn(dbname='sony', password='')
self.assertDsnEqual(dsn, "dbname=sony password=''") assertDsnEqual(self, dsn, "dbname=sony password=''")
def test_escape(self): def test_escape(self):
dsn = ext.make_dsn(dbname='hello world') dsn = ext.make_dsn(dbname='hello world')
@ -435,10 +432,10 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_params_merging(self): def test_params_merging(self):
dsn = ext.make_dsn('dbname=foo host=bar', host='baz') dsn = ext.make_dsn('dbname=foo host=bar', host='baz')
self.assertDsnEqual(dsn, 'dbname=foo host=baz') assertDsnEqual(self, dsn, 'dbname=foo host=baz')
dsn = ext.make_dsn('dbname=foo', user='postgres') dsn = ext.make_dsn('dbname=foo', user='postgres')
self.assertDsnEqual(dsn, 'dbname=foo user=postgres') assertDsnEqual(self, dsn, 'dbname=foo user=postgres')
def test_no_dsn_munging(self): def test_no_dsn_munging(self):
dsnin = 'dbname=a host=b user=c password=d' dsnin = 'dbname=a host=b user=c password=d'
@ -452,7 +449,7 @@ class MakeDsnTestCase(ConnectingTestCase):
self.assertEqual(dsn, url) self.assertEqual(dsn, url)
dsn = ext.make_dsn(url, application_name='woot') dsn = ext.make_dsn(url, application_name='woot')
self.assertDsnEqual(dsn, assertDsnEqual(self, dsn,
'dbname=test user=tester password=secret application_name=woot') 'dbname=test user=tester password=secret application_name=woot')
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,

View File

@ -26,8 +26,8 @@ import os
import sys import sys
from subprocess import Popen from subprocess import Popen
from testutils import unittest, skip_before_python, skip_before_postgres from testutils import (unittest, skip_before_python, skip_before_postgres,
from testutils import ConnectingTestCase, skip_copy_if_green, script_to_py3 ConnectingTestCase, skip_copy_if_green, script_to_py3, assertDsnEqual)
import psycopg2 import psycopg2
@ -36,24 +36,21 @@ class ConnectTestCase(unittest.TestCase):
def setUp(self): def setUp(self):
self.args = None self.args = None
def conect_stub(dsn, connection_factory=None, async=False): def connect_stub(dsn, connection_factory=None, async_=False):
self.args = (dsn, connection_factory, async) self.args = (dsn, connection_factory, async_)
self._connect_orig = psycopg2._connect self._connect_orig = psycopg2._connect
psycopg2._connect = conect_stub psycopg2._connect = connect_stub
def tearDown(self): def tearDown(self):
psycopg2._connect = self._connect_orig psycopg2._connect = self._connect_orig
def assertDsnEqual(self, dsn1, dsn2):
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
def test_there_has_to_be_something(self): def test_there_has_to_be_something(self):
self.assertRaises(TypeError, psycopg2.connect) self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect, self.assertRaises(TypeError, psycopg2.connect,
connection_factory=lambda dsn, async=False: None) connection_factory=lambda dsn, async_=False: None)
self.assertRaises(TypeError, psycopg2.connect, self.assertRaises(TypeError, psycopg2.connect,
async=True) async_=True)
def test_no_keywords(self): def test_no_keywords(self):
psycopg2.connect('') psycopg2.connect('')
@ -92,27 +89,27 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'options=stuff') self.assertEqual(self.args[0], 'options=stuff')
def test_factory(self): def test_factory(self):
def f(dsn, async=False): def f(dsn, async_=False):
pass pass
psycopg2.connect(database='foo', host='baz', connection_factory=f) psycopg2.connect(database='foo', host='baz', connection_factory=f)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz') assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f) self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False) self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f) psycopg2.connect("dbname=foo host=baz", connection_factory=f)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz') assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f) self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False) self.assertEqual(self.args[2], False)
def test_async(self): def test_async(self):
psycopg2.connect(database='foo', host='baz', async=1) psycopg2.connect(database='foo', host='baz', async_=1)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz') assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None) self.assertEqual(self.args[1], None)
self.assert_(self.args[2]) self.assert_(self.args[2])
psycopg2.connect("dbname=foo host=baz", async=True) psycopg2.connect("dbname=foo host=baz", async_=True)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz') assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None) self.assertEqual(self.args[1], None)
self.assert_(self.args[2]) self.assert_(self.args[2])
@ -124,7 +121,7 @@ class ConnectTestCase(unittest.TestCase):
def test_empty_param(self): def test_empty_param(self):
psycopg2.connect(database='sony', password='') psycopg2.connect(database='sony', password='')
self.assertDsnEqual(self.args[0], "dbname=sony password=''") assertDsnEqual(self, self.args[0], "dbname=sony password=''")
def test_escape(self): def test_escape(self):
psycopg2.connect(database='hello world') psycopg2.connect(database='hello world')
@ -147,7 +144,7 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'dbname=bar') self.assertEqual(self.args[0], 'dbname=bar')
psycopg2.connect('dbname=foo', user='postgres') psycopg2.connect('dbname=foo', user='postgres')
self.assertDsnEqual(self.args[0], 'dbname=foo user=postgres') assertDsnEqual(self, self.args[0], 'dbname=foo user=postgres')
class ExceptionsTestCase(ConnectingTestCase): class ExceptionsTestCase(ConnectingTestCase):

View File

@ -183,7 +183,7 @@ class AsyncReplicationTest(ReplicationTestCase):
@skip_repl_if_green @skip_repl_if_green
def test_async_replication(self): def test_async_replication(self):
conn = self.repl_connect( conn = self.repl_connect(
connection_factory=LogicalReplicationConnection, async=1) connection_factory=LogicalReplicationConnection, async_=1)
if conn is None: if conn is None:
return return

View File

@ -134,7 +134,7 @@ class ConnectingTestCase(unittest.TestCase):
import psycopg2 import psycopg2
try: try:
conn = self.connect(**kwargs) conn = self.connect(**kwargs)
if conn.async == 1: if conn.async_ == 1:
self.wait(conn) self.wait(conn)
except psycopg2.OperationalError, e: except psycopg2.OperationalError, e:
# If pgcode is not set it is a genuine connection error # If pgcode is not set it is a genuine connection error
@ -459,3 +459,7 @@ class py3_raises_typeerror(object):
if sys.version_info[0] >= 3: if sys.version_info[0] >= 3:
assert type is TypeError assert type is TypeError
return True return True
def assertDsnEqual(testsuite, dsn1, dsn2):
testsuite.assertEqual(set(dsn1.split()), set(dsn2.split()))