Merge branch 'master' into sql-compose

Daniele Varrazzo 2017-02-03 04:56:02 +00:00
commit de8b335d80
34 changed files with 921 additions and 104 deletions

.gitignore

@ -13,3 +13,4 @@ scripts/pypi_docs_upload.py
env
.tox
/rel
/wheels


@ -6,7 +6,7 @@ language: python
python:
- 2.7
- 3.6-dev
- 3.6
- 2.6
- 3.5
- 3.4

NEWS

@ -28,6 +28,14 @@ New features:
- `~cursor.callproc()` now accepts a dictionary of parameters (:ticket:`#381`).
- Using Python C API decoding functions and codecs caching for faster
unicode encoding/decoding (:ticket:`#473`).
- `~cursor.executemany()` slowness addressed by
`~psycopg2.extras.execute_batch()` and `~psycopg2.extras.execute_values()`
(:ticket:`#491`).
- Added ``async_`` as an alias for ``async`` to support Python 3.7 where
``async`` will become a keyword (:ticket:`#495`).
Bug fixes:
- Fixed error caused by missing decoding `~psycopg2.extras.LoggingConnection`
(:ticket:`#483`).


@ -1,4 +1,4 @@
@import url("default.css");
@import url("classic.css");
blockquote {
font-style: italic;
@ -14,11 +14,13 @@ div.dbapi-extension {
border: 1px solid #aaf;
}
code.sql,
tt.sql {
font-size: 1em;
background-color: transparent;
}
a > code.sql:hover,
a > tt.sql:hover {
text-decoration: underline;
}


@ -127,7 +127,7 @@ rst_epilog = """
# The theme to use for HTML and HTML Help pages. Major themes that come with
# Sphinx are currently 'default' and 'sphinxdoc'.
html_theme = 'default'
html_theme = 'classic'
# The stylesheet to use with HTML output: this will include the original one
# adding a few classes.


@ -706,9 +706,13 @@ The ``connection`` class
.. attribute:: async
async_
Read only attribute: 1 if the connection is asynchronous, 0 otherwise.
.. versionchanged:: 2.7 added the `!async_` alias for Python versions
where `!async` is a keyword.
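A minimal sketch of requesting an asynchronous connection with the alias and driving it with `poll()` (the DSN is a placeholder; the wait loop follows the pattern suggested in :ref:`async-support`)::

    import select
    import psycopg2
    from psycopg2 import extensions

    def wait(conn):
        # loop until the connection (or the running query) is ready
        while True:
            state = conn.poll()
            if state == extensions.POLL_OK:
                break
            elif state == extensions.POLL_WRITE:
                select.select([], [conn.fileno()], [])
            elif state == extensions.POLL_READ:
                select.select([conn.fileno()], [], [])
            else:
                raise psycopg2.OperationalError("poll() returned %s" % state)

    aconn = psycopg2.connect("dbname=test", async_=1)
    wait(aconn)
    assert aconn.async_ == 1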
.. method:: poll()


@ -196,6 +196,11 @@ The ``cursor`` class
Parameters are bound to the query using the same rules described in
the `~cursor.execute()` method.
.. warning::
In its current implementation this method is not faster than
executing `~cursor.execute()` in a loop. For better performance
you can use the functions described in :ref:`fast-exec`.
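For instance (the table is hypothetical), a call such as the one below can be rewritten almost verbatim with `~psycopg2.extras.execute_batch()`, which joins the statements into fewer server roundtrips::

    # one server roundtrip per parameters set
    cur.executemany(
        "INSERT INTO test (id, num) VALUES (%s, %s)",
        [(1, 10), (2, 20), (3, 30)])

    # the same statements joined into fewer multi-statement commands
    from psycopg2.extras import execute_batch
    execute_batch(cur,
        "INSERT INTO test (id, num) VALUES (%s, %s)",
        [(1, 10), (2, 20), (3, 30)])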
.. method:: callproc(procname [, parameters])


@ -29,6 +29,8 @@ introspection etc.
For a complete description of the class, see `connection`.
.. versionchanged:: 2.7
*async_* can be used as an alias for *async*.
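For example (placeholder DSN), a subclass accepting the alias can be passed as *connection_factory*, mirroring the test suite further below::

    import psycopg2
    import psycopg2.extensions

    class MyConn(psycopg2.extensions.connection):
        def __init__(self, dsn, async_=0):
            psycopg2.extensions.connection.__init__(self, dsn, async_=async_)

    conn = psycopg2.connect("dbname=test", connection_factory=MyConn, async_=True)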
.. class:: cursor(conn, name=None)


@ -974,6 +974,63 @@ converted into lists of strings.
future versions.
.. _fast-exec:
Fast execution helpers
----------------------
The current implementation of `~cursor.executemany()` is (using an extremely
charitable understatement) not particularly performant. These functions can
be used to speed up the repeated execution of a statement against a set of
parameters. By reducing the number of server roundtrips the performance can be
`orders of magnitude better`__ than using `!executemany()`.
.. __: https://github.com/psycopg/psycopg2/issues/491#issuecomment-276551038
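For example (hypothetical table), a bulk insert can be collapsed into a single :sql:`INSERT ... VALUES` command per page of parameters using `execute_values()`, documented below::

    from psycopg2.extras import execute_values
    execute_values(cur,
        "INSERT INTO test (id, v1, v2) VALUES %s",
        [(1, 2, 3), (4, 5, 6), (7, 8, 9)])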
.. autofunction:: execute_batch
.. versionadded:: 2.7
.. note::
`!execute_batch()` can also be used in conjunction with PostgreSQL
prepared statements using |PREPARE|_, |EXECUTE|_, |DEALLOCATE|_.
Instead of executing::
execute_batch(cur,
"big and complex SQL with %s %s params",
params_list)
it is possible to execute something like::
cur.execute("PREPARE stmt AS big and complex SQL with $1 $2 params")
execute_batch(cur, "EXECUTE stmt (%s, %s)", params_list)
cur.execute("DEALLOCATE stmt")
which may bring further performance benefits: if the operation to perform
is complex, every single execution will be faster as the query plan is
already cached; furthermore the amount of data to send to the server will
be smaller (one |EXECUTE| per param set instead of the whole, likely
longer, statement).
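A complete sketch of this pattern, with a concrete (made-up) statement standing in for the "big and complex SQL"::

    cur.execute("PREPARE stmt AS INSERT INTO test (id, num) VALUES ($1, $2)")
    execute_batch(cur, "EXECUTE stmt (%s, %s)", [(1, 10), (2, 20), (3, 30)])
    cur.execute("DEALLOCATE stmt")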
.. |PREPARE| replace:: :sql:`PREPARE`
.. _PREPARE: https://www.postgresql.org/docs/current/static/sql-prepare.html
.. |EXECUTE| replace:: :sql:`EXECUTE`
.. _EXECUTE: https://www.postgresql.org/docs/current/static/sql-execute.html
.. |DEALLOCATE| replace:: :sql:`DEALLOCATE`
.. _DEALLOCATE: https://www.postgresql.org/docs/current/static/sql-deallocate.html
.. autofunction:: execute_values
.. versionadded:: 2.7
.. index::
single: Time zones; Fractional


@ -18,8 +18,8 @@ The current `!psycopg2` implementation supports:
NOTE: keep consistent with setup.py and the /features/ page.
- Python 2 versions from 2.6 to 2.7
- Python 3 versions from 3.1 to 3.5
- PostgreSQL server versions from 7.4 to 9.5
- Python 3 versions from 3.1 to 3.6
- PostgreSQL server versions from 7.4 to 9.6
- PostgreSQL client library version from 9.1
.. _PostgreSQL: http://www.postgresql.org/


@ -64,7 +64,8 @@ The module interface respects the standard defined in the |DBAPI|_.
cursors you can use this parameter instead of subclassing a connection.
Using *async*\=\ `!True` an asynchronous connection will be created: see
:ref:`async-support` to know about advantages and limitations.
:ref:`async-support` to know about advantages and limitations. *async_* is
a valid alias for Python versions where ``async`` is a keyword.
.. versionchanged:: 2.4.3
any keyword argument is passed to the connection. Previously only the
@ -76,6 +77,9 @@ The module interface respects the standard defined in the |DBAPI|_.
.. versionchanged:: 2.7
both *dsn* and keyword arguments can be specified.
.. versionchanged:: 2.7
added *async_* alias.
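For instance (placeholder DSN), a connection requested with the alias can be completed with `~psycopg2.extras.wait_select()`::

    from psycopg2 import extras

    aconn = psycopg2.connect("dbname=test", async_=True)
    extras.wait_select(aconn)   # block until the connection is established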
.. seealso::
- `~psycopg2.extensions.parse_dsn`


@ -82,8 +82,7 @@ else:
del Decimal, Adapter
def connect(dsn=None, connection_factory=None, cursor_factory=None,
async=False, **kwargs):
def connect(dsn=None, connection_factory=None, cursor_factory=None, **kwargs):
"""
Create a new database connection.
@ -111,17 +110,24 @@ def connect(dsn=None, connection_factory=None, cursor_factory=None,
Using the *cursor_factory* parameter, a new default cursor factory will be
used by cursor().
Using *async*=True an asynchronous connection will be created.
Using *async*=True an asynchronous connection will be created. *async_* is
a valid alias (for Python versions where ``async`` is a keyword).
Any other keyword parameter will be passed to the underlying client
library: the list of supported parameters depends on the library version.
"""
kwasync = {}
if 'async' in kwargs:
kwasync['async'] = kwargs.pop('async')
if 'async_' in kwargs:
kwasync['async_'] = kwargs.pop('async_')
if dsn is None and not kwargs:
raise TypeError('missing dsn and no parameters')
dsn = _ext.make_dsn(dsn, **kwargs)
conn = _connect(dsn, connection_factory=connection_factory, async=async)
conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
if cursor_factory is not None:
conn.cursor_factory = cursor_factory


@ -106,6 +106,7 @@ class DictCursorBase(_cursor):
return res
def __iter__(self):
try:
if self._prefetch:
res = super(DictCursorBase, self).__iter__()
first = res.next()
@ -118,6 +119,8 @@ class DictCursorBase(_cursor):
yield first
while 1:
yield res.next()
except StopIteration:
return
class DictConnection(_connection):
@ -343,6 +346,7 @@ class NamedTupleCursor(_cursor):
return map(nt._make, ts)
def __iter__(self):
try:
it = super(NamedTupleCursor, self).__iter__()
t = it.next()
@ -354,6 +358,8 @@ class NamedTupleCursor(_cursor):
while 1:
yield nt._make(it.next())
except StopIteration:
return
try:
from collections import namedtuple
@ -1135,3 +1141,142 @@ def register_composite(name, conn_or_curs, globally=False, factory=None):
caster.array_typecaster, not globally and conn_or_curs or None)
return caster
def _paginate(seq, page_size):
"""Consume an iterable and return it in chunks.
Every chunk contains at most `page_size` items. Never return an empty chunk.
"""
page = []
it = iter(seq)
while 1:
try:
for i in xrange(page_size):
page.append(it.next())
yield page
page = []
except StopIteration:
if page:
yield page
return
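For illustration (not part of the module), the chunking behaviour matches the tests further below:

    list(_paginate(range(5), page_size=2))  # -> [[0, 1], [2, 3], [4]]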
def execute_batch(cur, sql, argslist, page_size=100):
"""Execute groups of statements in fewer server roundtrips.
Execute *sql* several times, against all parameters set (sequences or
mappings) found in *argslist*.
The function is semantically similar to
.. parsed-literal::
*cur*\.\ `~cursor.executemany`\ (\ *sql*\ , *argslist*\ )
but has a different implementation: Psycopg will join the statements into
fewer multi-statement commands, each one containing at most *page_size*
statements, resulting in a reduced number of server roundtrips.
"""
for page in _paginate(argslist, page_size=page_size):
sqls = [cur.mogrify(sql, args) for args in page]
cur.execute(b";".join(sqls))
def execute_values(cur, sql, argslist, template=None, page_size=100):
'''Execute a statement using :sql:`VALUES` with a sequence of parameters.
:param cur: the cursor to use to execute the query.
:param sql: the query to execute. It must contain a single ``%s``
placeholder, which will be replaced by a `VALUES list`__.
Example: ``"INSERT INTO mytable (id, f1, f2) VALUES %s"``.
:param argslist: sequence of sequences or dictionaries with the arguments
to send to the query. The type and content must be consistent with
*template*.
:param template: the snippet to merge to every item in *argslist* to
compose the query. If *argslist* items are sequences it should contain
positional placeholders (e.g. ``"(%s, %s, %s)"``, or ``"(%s, %s, 42)"``
if there are constant values); if *argslist* items are mappings
it should contain named placeholders (e.g. ``"(%(id)s, %(f1)s, 42)"``).
If not specified, assume the arguments are sequences and use a simple
positional template (i.e. ``(%s, %s, ...)``), with the number of
placeholders sniffed from the first element in *argslist*.
:param page_size: maximum number of *argslist* items to include in every
statement. If there are more items the function will execute more than
one statement.
.. __: https://www.postgresql.org/docs/current/static/queries-values.html
While :sql:`INSERT` is an obvious candidate for this function, it is
possible to use it with other statements, for example::
>>> cur.execute(
... "create table test (id int primary key, v1 int, v2 int)")
>>> execute_values(cur,
... "INSERT INTO test (id, v1, v2) VALUES %s",
... [(1, 2, 3), (4, 5, 6), (7, 8, 9)])
>>> execute_values(cur,
... """UPDATE test SET v1 = data.v1 FROM (VALUES %s) AS data (id, v1)
... WHERE test.id = data.id""",
... [(1, 20), (4, 50)])
>>> cur.execute("select * from test order by id")
>>> cur.fetchall()
[(1, 20, 3), (4, 50, 6), (7, 8, 9)]
'''
# we can't just use sql % vals because vals is bytes: if sql is bytes
# there will be some decoding error because of the codec used, and Py3
# doesn't implement % on bytes.
if not isinstance(sql, bytes):
sql = sql.encode(_ext.encodings[cur.connection.encoding])
pre, post = _split_sql(sql)
for page in _paginate(argslist, page_size=page_size):
if template is None:
template = b'(' + b','.join([b'%s'] * len(page[0])) + b')'
parts = pre[:]
for args in page:
parts.append(cur.mogrify(template, args))
parts.append(b',')
parts[-1:] = post
cur.execute(b''.join(parts))
def _split_sql(sql):
"""Split *sql* on a single ``%s`` placeholder.
Split on the %s, perform %% replacement and return pre, post lists of
snippets.
"""
curr = pre = []
post = []
tokens = _re.split(br'(%.)', sql)
for token in tokens:
if len(token) != 2 or token[:1] != b'%':
curr.append(token)
continue
if token[1:] == b's':
if curr is pre:
curr = post
else:
raise ValueError(
"the query contains more than one '%s' placeholder")
elif token[1:] == b'%':
curr.append(b'%')
else:
raise ValueError("unsupported format character: '%s'"
% token[1:].decode('ascii', 'replace'))
if curr is pre:
raise ValueError("the query doesn't contain any '%s' placeholder")
return pre, post
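For example (an illustrative query), the split keeps everything before the placeholder in *pre* and the rest, with ``%%`` collapsed to ``%``, in *post*:

    pre, post = _split_sql(b"select %s -- 100%% sure")
    # pre  == [b'select ']
    # post == [b' -- 100', b'%', b' sure']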


@ -1040,6 +1040,8 @@ static struct PyMemberDef connectionObject_members[] = {
"The current connection string."},
{"async", T_LONG, offsetof(connectionObject, async), READONLY,
"True if the connection is asynchronous."},
{"async_", T_LONG, offsetof(connectionObject, async), READONLY,
"True if the connection is asynchronous."},
{"status", T_INT,
offsetof(connectionObject, status), READONLY,
"The current transaction status."},
@ -1186,12 +1188,14 @@ static int
connection_init(PyObject *obj, PyObject *args, PyObject *kwds)
{
const char *dsn;
long int async = 0;
static char *kwlist[] = {"dsn", "async", NULL};
long int async = 0, async_ = 0;
static char *kwlist[] = {"dsn", "async", "async_", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|l", kwlist, &dsn, &async))
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|ll", kwlist,
&dsn, &async, &async_))
return -1;
if (async_) { async = async_; }
return connection_setup((connectionObject *)obj, dsn, async);
}


@ -82,15 +82,17 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds)
PyObject *conn = NULL;
PyObject *factory = NULL;
const char *dsn = NULL;
int async = 0;
int async = 0, async_ = 0;
static char *kwlist[] = {"dsn", "connection_factory", "async", NULL};
static char *kwlist[] = {"dsn", "connection_factory", "async", "async_", NULL};
if (!PyArg_ParseTupleAndKeywords(args, keywds, "s|Oi", kwlist,
&dsn, &factory, &async)) {
if (!PyArg_ParseTupleAndKeywords(args, keywds, "s|Oii", kwlist,
&dsn, &factory, &async, &async_)) {
return NULL;
}
if (async_) { async = async_; }
Dprintf("psyco_connect: dsn = '%s', async = %d", dsn, async);
/* allocate connection, fill with errors and return it */

scripts/build-manylinux.sh

@ -0,0 +1,50 @@
#!/bin/bash
# Create manylinux1 wheels for psycopg2
#
# Run this script with something like:
#
# docker run --rm -v `pwd`:/psycopg2 quay.io/pypa/manylinux1_x86_64 /psycopg2/scripts/build-manylinux.sh
# docker run --rm -v `pwd`:/psycopg2 quay.io/pypa/manylinux1_i686 linux32 /psycopg2/scripts/build-manylinux.sh
#
# Tests run against a postgres on the host. Use -e PSYCOPG_TESTDB_USER=... etc
# to configure the test run.
set -e -x
# Install postgres packages for build and testing
# This doesn't work:
# rpm -Uvh "http://yum.postgresql.org/9.5/redhat/rhel-5-x86_64/pgdg-redhat95-9.5-3.noarch.rpm"
wget -O "/tmp/pgdg.rpm" "https://download.postgresql.org/pub/repos/yum/9.5/redhat/rhel-5-x86_64/pgdg-centos95-9.5-3.noarch.rpm"
rpm -Uv "/tmp/pgdg.rpm"
yum install -y postgresql95-devel
# Make pg_config available
export PGPATH=/usr/pgsql-9.5/bin/
export PATH="$PGPATH:$PATH"
# Find psycopg version
export VERSION=$(grep -e ^PSYCOPG_VERSION /psycopg2/setup.py | sed "s/.*'\(.*\)'/\1/")
export WHEELSDIR="/psycopg2/wheels/psycopg2-$VERSION"
# Create the wheel packages
for PYBIN in /opt/python/*/bin; do
"${PYBIN}/pip" wheel /psycopg2/ -w "$WHEELSDIR"
done
# Bundle external shared libraries into the wheels
for WHL in "$WHEELSDIR"/*.whl; do
auditwheel repair "$WHL" -w "$WHEELSDIR"
done
# Make sure libpq is not in the system
yum remove -y postgresql95-devel
# Connect to the host to test. Use 'docker -e' to pass other variables
export PSYCOPG2_TESTDB_HOST=$(ip route show | awk '/default/ {print $3}')
# Install packages and test
for PYBIN in /opt/python/*/bin; do
"${PYBIN}/pip" install psycopg2 --no-index -f "$WHEELSDIR"
"${PYBIN}/python" -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')"
done


@ -50,7 +50,11 @@ else:
# workaround subclass for ticket #153
pass
sys.path.insert(0, 'scripts')
# Configure distutils to run our custom 2to3 fixers as well
from lib2to3.refactor import get_fixers_from_package
build_py.fixer_names = [f for f in get_fixers_from_package('lib2to3.fixes')
# creates a pending deprecation warning on py 3.4
if not f.endswith('.fix_reload')]
try:
import configparser
@ -79,6 +83,7 @@ Programming Language :: Python :: 3.2
Programming Language :: Python :: 3.3
Programming Language :: Python :: 3.4
Programming Language :: Python :: 3.5
Programming Language :: Python :: 3.6
Programming Language :: C
Programming Language :: SQL
Topic :: Database


@ -22,6 +22,11 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
# Convert warnings into errors here. We can't do it with -W because on
# Travis importing site raises a warning.
import warnings
warnings.simplefilter('error') # noqa
import sys
from testconfig import dsn
from testutils import unittest
@ -36,6 +41,7 @@ import test_cursor
import test_dates
import test_errcodes
import test_extras_dictcursor
import test_fast_executemany
import test_green
import test_ipaddress
import test_lobject
@ -50,6 +56,9 @@ import test_types_basic
import test_types_extras
import test_with
if sys.version_info[:2] < (3, 6):
import test_async_keyword
def test_suite():
# If connection to test db fails, bail out early.
@ -65,6 +74,8 @@ def test_suite():
suite = unittest.TestSuite()
suite.addTest(test_async.test_suite())
if sys.version_info[:2] < (3, 6):
suite.addTest(test_async_keyword.test_suite())
suite.addTest(test_bugX000.test_suite())
suite.addTest(test_bug_gc.test_suite())
suite.addTest(test_cancel.test_suite())
@ -74,6 +85,7 @@ def test_suite():
suite.addTest(test_dates.test_suite())
suite.addTest(test_errcodes.test_suite())
suite.addTest(test_extras_dictcursor.test_suite())
suite.addTest(test_fast_executemany.test_suite())
suite.addTest(test_green.test_suite())
suite.addTest(test_ipaddress.test_suite())
suite.addTest(test_lobject.test_suite())


@ -23,7 +23,7 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
from testutils import unittest, skip_before_postgres
from testutils import unittest, skip_before_postgres, slow
import psycopg2
from psycopg2 import extensions
@ -55,7 +55,7 @@ class AsyncTests(ConnectingTestCase):
ConnectingTestCase.setUp(self)
self.sync_conn = self.conn
self.conn = self.connect(async=True)
self.conn = self.connect(async_=True)
self.wait(self.conn)
@ -71,8 +71,8 @@ class AsyncTests(ConnectingTestCase):
sync_cur = self.sync_conn.cursor()
del cur, sync_cur
self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async)
self.assert_(self.conn.async_)
self.assert_(not self.sync_conn.async_)
# the async connection should be in isolevel 0
self.assertEquals(self.conn.isolation_level, 0)
@ -97,6 +97,7 @@ class AsyncTests(ConnectingTestCase):
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchone()[0], "a")
@slow
@skip_before_postgres(8, 2)
def test_async_callproc(self):
cur = self.conn.cursor()
@ -107,6 +108,7 @@ class AsyncTests(ConnectingTestCase):
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchall()[0][0], '')
@slow
def test_async_after_async(self):
cur = self.conn.cursor()
cur2 = self.conn.cursor()
@ -310,14 +312,15 @@ class AsyncTests(ConnectingTestCase):
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
def __init__(self, dsn, async=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
def __init__(self, dsn, async_=0):
psycopg2.extensions.connection.__init__(self, dsn, async_=async_)
conn = self.connect(connection_factory=MyConn, async=True)
conn = self.connect(connection_factory=MyConn, async_=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async)
self.assert_(conn.async_)
conn.close()
@slow
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
curs = self.conn.cursor()
@ -438,7 +441,7 @@ class AsyncTests(ConnectingTestCase):
def test_async_connection_error_message(self):
try:
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True)
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async_=True)
self.wait(cnn)
except psycopg2.Error, e:
self.assertNotEqual(str(e), "asynchronous connection failed",

tests/test_async_keyword.py

@ -0,0 +1,217 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# test_async_keyword.py - test for objects using 'async' as attribute/param
#
# Copyright (C) 2017 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# In addition, as a special exception, the copyright holders give
# permission to link this program with the OpenSSL library (or with
# modified versions of OpenSSL that use the same license as OpenSSL),
# and distribute linked combinations including the two.
#
# You must obey the GNU Lesser General Public License in all respects for
# all of the code used other than OpenSSL.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import psycopg2
from psycopg2 import extras
from testconfig import dsn
from testutils import (ConnectingTestCase, unittest, skip_before_postgres,
assertDsnEqual)
from test_replication import ReplicationTestCase, skip_repl_if_green
from psycopg2.extras import LogicalReplicationConnection, StopReplication
class AsyncTests(ConnectingTestCase):
def setUp(self):
ConnectingTestCase.setUp(self)
self.sync_conn = self.conn
self.conn = self.connect(async=True)
self.wait(self.conn)
curs = self.conn.cursor()
curs.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.wait(curs)
def test_connection_setup(self):
cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor()
del cur, sync_cur
self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async)
# the async connection should be in isolevel 0
self.assertEquals(self.conn.isolation_level, 0)
# check other properties to be found on the connection
self.assert_(self.conn.server_version)
self.assert_(self.conn.protocol_version in (2, 3))
self.assert_(self.conn.encoding in psycopg2.extensions.encodings)
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
def __init__(self, dsn, async=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
conn = self.connect(connection_factory=MyConn, async=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(conn.async)
conn.close()
def test_async_connection_error_message(self):
try:
cnn = psycopg2.connect('dbname=thisdatabasedoesntexist', async=True)
self.wait(cnn)
except psycopg2.Error, e:
self.assertNotEqual(str(e), "asynchronous connection failed",
"connection error reason lost")
else:
self.fail("no exception raised")
class CancelTests(ConnectingTestCase):
def setUp(self):
ConnectingTestCase.setUp(self)
cur = self.conn.cursor()
cur.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.conn.commit()
@skip_before_postgres(8, 2)
def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
cur.execute("select pg_sleep(10000)")
self.assertTrue(async_conn.isexecuting())
async_conn.cancel()
self.assertRaises(psycopg2.extensions.QueryCanceledError,
extras.wait_select, async_conn)
cur.execute("select 1")
extras.wait_select(async_conn)
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
async_conn.close()
self.assertTrue(async_conn.closed)
class ConnectTestCase(unittest.TestCase):
def setUp(self):
self.args = None
def connect_stub(dsn, connection_factory=None, async=False):
self.args = (dsn, connection_factory, async)
self._connect_orig = psycopg2._connect
psycopg2._connect = connect_stub
def tearDown(self):
psycopg2._connect = self._connect_orig
def test_there_has_to_be_something(self):
self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect,
connection_factory=lambda dsn, async=False: None)
self.assertRaises(TypeError, psycopg2.connect,
async=True)
def test_factory(self):
def f(dsn, async=False):
pass
psycopg2.connect(database='foo', host='baz', connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
psycopg2.connect(database='foo', host='baz', async=1)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
psycopg2.connect("dbname=foo host=baz", async=True)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
class AsyncReplicationTest(ReplicationTestCase):
@skip_before_postgres(9, 4) # slots require 9.4
@skip_repl_if_green
def test_async_replication(self):
conn = self.repl_connect(
connection_factory=LogicalReplicationConnection, async=1)
if conn is None:
return
cur = conn.cursor()
self.create_replication_slot(cur, output_plugin='test_decoding')
self.wait(cur)
cur.start_replication(self.slot)
self.wait(cur)
self.make_replication_events()
self.msg_count = 0
def consume(msg):
# just check the methods
"%s: %s" % (cur.io_timestamp, repr(msg))
self.msg_count += 1
if self.msg_count > 3:
cur.send_feedback(reply=True)
raise StopReplication()
cur.send_feedback(flush_lsn=msg.data_start)
# cannot be used in asynchronous mode
self.assertRaises(psycopg2.ProgrammingError, cur.consume_stream, consume)
def process_stream():
from select import select
while True:
msg = cur.read_message()
if msg:
consume(msg)
else:
select([cur], [], [])
self.assertRaises(StopReplication, process_stream)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()


@ -30,7 +30,7 @@ import psycopg2.extensions
from psycopg2 import extras
from testconfig import dsn
from testutils import unittest, ConnectingTestCase, skip_before_postgres
from testutils import unittest, ConnectingTestCase, skip_before_postgres, slow
class CancelTests(ConnectingTestCase):
@ -48,6 +48,7 @@ class CancelTests(ConnectingTestCase):
def test_empty_cancel(self):
self.conn.cancel()
@slow
@skip_before_postgres(8, 2)
def test_cancel(self):
errors = []
@ -87,7 +88,7 @@ class CancelTests(ConnectingTestCase):
@skip_before_postgres(8, 2)
def test_async_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
async_conn = psycopg2.connect(dsn, async_=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
@ -101,7 +102,7 @@ class CancelTests(ConnectingTestCase):
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
async_conn = psycopg2.connect(dsn, async=True)
async_conn = psycopg2.connect(dsn, async_=True)
async_conn.close()
self.assertTrue(async_conn.closed)


@ -33,9 +33,9 @@ import psycopg2.errorcodes
from psycopg2 import extensions as ext
from testutils import (
unittest, decorate_all_tests, skip_if_no_superuser,
unittest, assertDsnEqual, decorate_all_tests, skip_if_no_superuser,
skip_before_postgres, skip_after_postgres, skip_before_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows)
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
from testconfig import dsn, dbname
@ -125,6 +125,7 @@ class ConnectionTests(ConnectingTestCase):
self.assert_('table3' in conn.notices[2])
self.assert_('table4' in conn.notices[3])
@slow
def test_notices_limited(self):
conn = self.conn
cur = conn.cursor()
@ -138,6 +139,7 @@ class ConnectionTests(ConnectingTestCase):
self.assertEqual(50, len(conn.notices))
self.assert_('table99' in conn.notices[-1], conn.notices[-1])
@slow
def test_notices_deque(self):
from collections import deque
@ -196,6 +198,7 @@ class ConnectionTests(ConnectingTestCase):
self.assertRaises(psycopg2.NotSupportedError,
cnn.xid, 42, "foo", "bar")
@slow
@skip_before_postgres(8, 2)
def test_concurrent_execution(self):
def slave():
@ -246,6 +249,7 @@ class ConnectionTests(ConnectingTestCase):
gc.collect()
self.assert_(w() is None)
@slow
def test_commit_concurrency(self):
# The problem is the one reported in ticket #103. Because of bad
# status check, we commit even when a commit is already on its way.
@ -392,9 +396,6 @@ class ParseDsnTestCase(ConnectingTestCase):
class MakeDsnTestCase(ConnectingTestCase):
def assertDsnEqual(self, dsn1, dsn2):
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
def test_empty_arguments(self):
self.assertEqual(ext.make_dsn(), '')
@ -412,7 +413,7 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_empty_param(self):
dsn = ext.make_dsn(dbname='sony', password='')
self.assertDsnEqual(dsn, "dbname=sony password=''")
assertDsnEqual(self, dsn, "dbname=sony password=''")
def test_escape(self):
dsn = ext.make_dsn(dbname='hello world')
@ -435,10 +436,10 @@ class MakeDsnTestCase(ConnectingTestCase):
def test_params_merging(self):
dsn = ext.make_dsn('dbname=foo host=bar', host='baz')
self.assertDsnEqual(dsn, 'dbname=foo host=baz')
assertDsnEqual(self, dsn, 'dbname=foo host=baz')
dsn = ext.make_dsn('dbname=foo', user='postgres')
self.assertDsnEqual(dsn, 'dbname=foo user=postgres')
assertDsnEqual(self, dsn, 'dbname=foo user=postgres')
def test_no_dsn_munging(self):
dsnin = 'dbname=a host=b user=c password=d'
@ -452,7 +453,7 @@ class MakeDsnTestCase(ConnectingTestCase):
self.assertEqual(dsn, url)
dsn = ext.make_dsn(url, application_name='woot')
self.assertDsnEqual(dsn,
assertDsnEqual(self, dsn,
'dbname=test user=tester password=secret application_name=woot')
self.assertRaises(psycopg2.ProgrammingError,
@ -899,6 +900,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
(dbname,))
self.assertEqual('42_Z3RyaWQ=_YnF1YWw=', cur.fetchone()[0])
@slow
def test_xid_roundtrip(self):
for fid, gtrid, bqual in [
(0, "", ""),
@ -921,6 +923,7 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
cnn.tpc_rollback(xid)
@slow
def test_unparsed_roundtrip(self):
for tid in [
'',


@ -24,8 +24,8 @@
import sys
import string
from testutils import unittest, ConnectingTestCase, decorate_all_tests
from testutils import skip_if_no_iobase, skip_before_postgres
from testutils import (unittest, ConnectingTestCase, decorate_all_tests,
skip_if_no_iobase, skip_before_postgres, slow)
from cStringIO import StringIO
from itertools import cycle, izip
from subprocess import Popen, PIPE
@ -77,6 +77,7 @@ class CopyTests(ConnectingTestCase):
data text
)''')
@slow
def test_copy_from(self):
curs = self.conn.cursor()
try:
@ -84,6 +85,7 @@ class CopyTests(ConnectingTestCase):
finally:
curs.close()
@slow
def test_copy_from_insane_size(self):
# Trying to trigger a "would block" error
curs = self.conn.cursor()
@ -120,6 +122,7 @@ class CopyTests(ConnectingTestCase):
self.assertRaises(ZeroDivisionError,
curs.copy_from, MinimalRead(f), "tcopy", columns=cols())
@slow
def test_copy_to(self):
curs = self.conn.cursor()
try:
@ -309,6 +312,7 @@ class CopyTests(ConnectingTestCase):
curs.copy_from, StringIO('aaa\nbbb\nccc\n'), 'tcopy')
self.assertEqual(curs.rowcount, -1)
@slow
def test_copy_from_segfault(self):
# issue #219
script = ("""\
@ -327,6 +331,7 @@ conn.close()
proc.communicate()
self.assertEqual(0, proc.returncode)
@slow
def test_copy_to_segfault(self):
# issue #219
script = ("""\


@ -26,8 +26,8 @@ import time
import pickle
import psycopg2
import psycopg2.extensions
from testutils import unittest, ConnectingTestCase, skip_before_postgres
from testutils import skip_if_no_namedtuple, skip_if_no_getrefcount
from testutils import (unittest, ConnectingTestCase, skip_before_postgres,
skip_if_no_namedtuple, skip_if_no_getrefcount, slow)
class CursorTests(ConnectingTestCase):
@ -331,6 +331,7 @@ class CursorTests(ConnectingTestCase):
curs.scroll(2)
self.assertRaises(psycopg2.OperationalError, curs.scroll, -1)
@slow
@skip_before_postgres(8, 2)
def test_iter_named_cursor_efficient(self):
curs = self.conn.cursor('tmp')


@ -22,11 +22,14 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
from testutils import unittest, ConnectingTestCase
from testutils import unittest, ConnectingTestCase, slow
try:
reload
except NameError:
try:
from importlib import reload
except ImportError:
from imp import reload
from threading import Thread
@ -34,6 +37,7 @@ from psycopg2 import errorcodes
class ErrocodeTests(ConnectingTestCase):
@slow
def test_lookup_threadsafe(self):
# Increase if it does not fail with KeyError

tests/test_fast_executemany.py

@ -0,0 +1,237 @@
#!/usr/bin/env python
#
# test_fast_executemany.py - tests for fast executemany implementations
#
# Copyright (C) 2017 Daniele Varrazzo <daniele.varrazzo@gmail.com>
#
# psycopg2 is free software: you can redistribute it and/or modify it
# under the terms of the GNU Lesser General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# psycopg2 is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import unittest
from datetime import date
from testutils import ConnectingTestCase
import psycopg2
import psycopg2.extras
import psycopg2.extensions as ext
class TestPaginate(unittest.TestCase):
def test_paginate(self):
def pag(seq):
return psycopg2.extras._paginate(seq, 100)
self.assertEqual(list(pag([])), [])
self.assertEqual(list(pag([1])), [[1]])
self.assertEqual(list(pag(range(99))), [list(range(99))])
self.assertEqual(list(pag(range(100))), [list(range(100))])
self.assertEqual(list(pag(range(101))), [list(range(100)), [100]])
self.assertEqual(
list(pag(range(200))), [list(range(100)), list(range(100, 200))])
self.assertEqual(
list(pag(range(1000))),
[list(range(i * 100, (i + 1) * 100)) for i in range(10)])
class FastExecuteTestMixin(object):
def setUp(self):
super(FastExecuteTestMixin, self).setUp()
cur = self.conn.cursor()
cur.execute("""create table testfast (
id serial primary key, date date, val int, data text)""")
class TestExecuteBatch(FastExecuteTestMixin, ConnectingTestCase):
def test_empty(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
[])
cur.execute("select * from testfast order by id")
self.assertEqual(cur.fetchall(), [])
def test_one(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
iter([(1, 10)]))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(1, 10)])
def test_tuples(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, date, val) values (%s, %s, %s)",
((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_many(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
((i, i * 10) for i in range(1000)))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
def test_pages(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
((i, i * 10) for i in range(25)),
page_size=10)
# last command was 5 statements
self.assertEqual(sum(c == u';' for c in cur.query.decode('ascii')), 4)
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
def test_unicode(self):
cur = self.conn.cursor()
ext.register_type(ext.UNICODE, cur)
snowman = u"\u2603"
# unicode in statement
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman,
[(1, 'x')])
cur.execute("select id, data from testfast where id = 1")
self.assertEqual(cur.fetchone(), (1, 'x'))
# unicode in data
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, data) values (%s, %s)",
[(2, snowman)])
cur.execute("select id, data from testfast where id = 2")
self.assertEqual(cur.fetchone(), (2, snowman))
# unicode in both
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman,
[(3, snowman)])
cur.execute("select id, data from testfast where id = 3")
self.assertEqual(cur.fetchone(), (3, snowman))
class TestExecuteValues(FastExecuteTestMixin, ConnectingTestCase):
def test_empty(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
[])
cur.execute("select * from testfast order by id")
self.assertEqual(cur.fetchall(), [])
def test_one(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
iter([(1, 10)]))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(1, 10)])
def test_tuples(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, date, val) values %s",
((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_dicts(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, date, val) values %s",
(dict(id=i, date=date(2017, 1, i + 1), val=i * 10, foo="bar")
for i in range(10)),
template='(%(id)s, %(date)s, %(val)s)')
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_many(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
((i, i * 10) for i in range(1000)))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
def test_pages(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
((i, i * 10) for i in range(25)),
page_size=10)
# last statement was 5 tuples (one extra paren is for the fields list)
self.assertEqual(sum(c == '(' for c in cur.query.decode('ascii')), 6)
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
def test_unicode(self):
cur = self.conn.cursor()
ext.register_type(ext.UNICODE, cur)
snowman = u"\u2603"
# unicode in statement
psycopg2.extras.execute_values(cur,
"insert into testfast (id, data) values %%s -- %s" % snowman,
[(1, 'x')])
cur.execute("select id, data from testfast where id = 1")
self.assertEqual(cur.fetchone(), (1, 'x'))
# unicode in data
psycopg2.extras.execute_values(cur,
"insert into testfast (id, data) values %s",
[(2, snowman)])
cur.execute("select id, data from testfast where id = 2")
self.assertEqual(cur.fetchone(), (2, snowman))
# unicode in both
psycopg2.extras.execute_values(cur,
"insert into testfast (id, data) values %%s -- %s" % snowman,
[(3, snowman)])
cur.execute("select id, data from testfast where id = 3")
self.assertEqual(cur.fetchone(), (3, snowman))
def test_invalid_sql(self):
cur = self.conn.cursor()
self.assertRaises(ValueError, psycopg2.extras.execute_values, cur,
"insert", [])
self.assertRaises(ValueError, psycopg2.extras.execute_values, cur,
"insert %s and %s", [])
self.assertRaises(ValueError, psycopg2.extras.execute_values, cur,
"insert %f", [])
self.assertRaises(ValueError, psycopg2.extras.execute_values, cur,
"insert %f %s", [])
def test_percent_escape(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, data) values %s -- a%%b",
[(1, 'hi')])
self.assert_(b'a%%b' not in cur.query)
self.assert_(b'a%b' in cur.query)
cur.execute("select id, data from testfast")
self.assertEqual(cur.fetchall(), [(1, 'hi')])
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()


@ -27,7 +27,7 @@ import psycopg2
import psycopg2.extensions
import psycopg2.extras
from testutils import ConnectingTestCase
from testutils import ConnectingTestCase, slow
class ConnectionStub(object):
@ -61,6 +61,7 @@ class GreenTestCase(ConnectingTestCase):
lambda conn: psycopg2.extras.wait_select(stub))
return stub
@slow
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
conn = self.conn


@ -29,8 +29,8 @@ from functools import wraps
import psycopg2
import psycopg2.extensions
from testutils import unittest, decorate_all_tests, skip_if_tpc_disabled
from testutils import ConnectingTestCase, skip_if_green
from testutils import (unittest, decorate_all_tests, skip_if_tpc_disabled,
ConnectingTestCase, skip_if_green, slow)
def skip_if_no_lo(f):
@ -191,6 +191,7 @@ class LargeObjectTests(LargeObjectTestCase):
self.assertEqual(x, u"some")
self.assertEqual(lo.read(), u" data " + snowman)
@slow
def test_read_large(self):
lo = self.conn.lobject()
data = "data" * 1000000


@ -26,8 +26,8 @@ import os
import sys
from subprocess import Popen
from testutils import unittest, skip_before_python, skip_before_postgres
from testutils import ConnectingTestCase, skip_copy_if_green, script_to_py3
from testutils import (unittest, skip_before_python, skip_before_postgres,
ConnectingTestCase, skip_copy_if_green, script_to_py3, assertDsnEqual, slow)
import psycopg2
@ -36,24 +36,21 @@ class ConnectTestCase(unittest.TestCase):
def setUp(self):
self.args = None
def conect_stub(dsn, connection_factory=None, async=False):
self.args = (dsn, connection_factory, async)
def connect_stub(dsn, connection_factory=None, async_=False):
self.args = (dsn, connection_factory, async_)
self._connect_orig = psycopg2._connect
psycopg2._connect = conect_stub
psycopg2._connect = connect_stub
def tearDown(self):
psycopg2._connect = self._connect_orig
def assertDsnEqual(self, dsn1, dsn2):
self.assertEqual(set(dsn1.split()), set(dsn2.split()))
def test_there_has_to_be_something(self):
self.assertRaises(TypeError, psycopg2.connect)
self.assertRaises(TypeError, psycopg2.connect,
connection_factory=lambda dsn, async=False: None)
connection_factory=lambda dsn, async_=False: None)
self.assertRaises(TypeError, psycopg2.connect,
async=True)
async_=True)
def test_no_keywords(self):
psycopg2.connect('')
@ -92,27 +89,27 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'options=stuff')
def test_factory(self):
def f(dsn, async=False):
def f(dsn, async_=False):
pass
psycopg2.connect(database='foo', host='baz', connection_factory=f)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
psycopg2.connect("dbname=foo host=baz", connection_factory=f)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], f)
self.assertEqual(self.args[2], False)
def test_async(self):
psycopg2.connect(database='foo', host='baz', async=1)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
psycopg2.connect(database='foo', host='baz', async_=1)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
psycopg2.connect("dbname=foo host=baz", async=True)
self.assertDsnEqual(self.args[0], 'dbname=foo host=baz')
psycopg2.connect("dbname=foo host=baz", async_=True)
assertDsnEqual(self, self.args[0], 'dbname=foo host=baz')
self.assertEqual(self.args[1], None)
self.assert_(self.args[2])
@ -124,7 +121,7 @@ class ConnectTestCase(unittest.TestCase):
def test_empty_param(self):
psycopg2.connect(database='sony', password='')
self.assertDsnEqual(self.args[0], "dbname=sony password=''")
assertDsnEqual(self, self.args[0], "dbname=sony password=''")
def test_escape(self):
psycopg2.connect(database='hello world')
@ -147,7 +144,7 @@ class ConnectTestCase(unittest.TestCase):
self.assertEqual(self.args[0], 'dbname=bar')
psycopg2.connect('dbname=foo', user='postgres')
self.assertDsnEqual(self.args[0], 'dbname=foo user=postgres')
assertDsnEqual(self, self.args[0], 'dbname=foo user=postgres')
class ExceptionsTestCase(ConnectingTestCase):
@ -311,6 +308,7 @@ class ExceptionsTestCase(ConnectingTestCase):
class TestExtensionModule(unittest.TestCase):
@slow
def test_import_internal(self):
# check that the internal package can be imported "naked"
# we may break this property if there is a compelling reason to do so,


@ -26,7 +26,7 @@ from testutils import unittest
import psycopg2
from psycopg2 import extensions
from testutils import ConnectingTestCase, script_to_py3
from testutils import ConnectingTestCase, script_to_py3, slow
from testconfig import dsn
import sys
@ -72,6 +72,7 @@ conn.close()
return Popen([sys.executable, '-c', script_to_py3(script)], stdout=PIPE)
@slow
def test_notifies_received_on_poll(self):
self.autocommit(self.conn)
self.listen('foo')
@ -90,6 +91,7 @@ conn.close()
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])
@slow
def test_many_notifies(self):
self.autocommit(self.conn)
for name in ['foo', 'bar', 'baz']:
@ -109,6 +111,7 @@ conn.close()
self.assertEqual(pids[name], pid)
names.pop(name) # raise if name found twice
@slow
def test_notifies_received_on_execute(self):
self.autocommit(self.conn)
self.listen('foo')
@ -119,6 +122,7 @@ conn.close()
self.assertEqual(pid, self.conn.notifies[0][0])
self.assertEqual('foo', self.conn.notifies[0][1])
@slow
def test_notify_object(self):
self.autocommit(self.conn)
self.listen('foo')
@ -128,6 +132,7 @@ conn.close()
notify = self.conn.notifies[0]
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
@slow
def test_notify_attributes(self):
self.autocommit(self.conn)
self.listen('foo')
@ -140,6 +145,7 @@ conn.close()
self.assertEqual('foo', notify.channel)
self.assertEqual('', notify.payload)
@slow
def test_notify_payload(self):
if self.conn.server_version < 90000:
return self.skipTest("server version %s doesn't support notify payload"
@ -155,6 +161,7 @@ conn.close()
self.assertEqual('foo', notify.channel)
self.assertEqual('Hello, world!', notify.payload)
@slow
def test_notify_deque(self):
from collections import deque
self.autocommit(self.conn)
@ -167,6 +174,7 @@ conn.close()
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
self.assertEqual(len(self.conn.notifies), 0)
@slow
def test_notify_noappend(self):
self.autocommit(self.conn)
self.conn.notifies = None


@ -183,7 +183,7 @@ class AsyncReplicationTest(ReplicationTestCase):
@skip_repl_if_green
def test_async_replication(self):
conn = self.repl_connect(
connection_factory=LogicalReplicationConnection, async=1)
connection_factory=LogicalReplicationConnection, async_=1)
if conn is None:
return


@ -23,7 +23,7 @@
# License for more details.
import threading
from testutils import unittest, ConnectingTestCase, skip_before_postgres
from testutils import unittest, ConnectingTestCase, skip_before_postgres, slow
import psycopg2
from psycopg2.extensions import (
@ -131,6 +131,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
ConnectingTestCase.tearDown(self)
@slow
def test_deadlock(self):
self.thread1_error = self.thread2_error = None
step1 = threading.Event()
@ -178,6 +179,7 @@ class DeadlockSerializationTests(ConnectingTestCase):
self.assertTrue(isinstance(
error, psycopg2.extensions.TransactionRollbackError))
@slow
def test_serialisation_failure(self):
self.thread1_error = self.thread2_error = None
step1 = threading.Event()


@ -17,14 +17,14 @@ from __future__ import with_statement
import re
import sys
import warnings
from decimal import Decimal
from datetime import date, datetime
from functools import wraps
from pickle import dumps, loads
from testutils import unittest, skip_if_no_uuid, skip_before_postgres
from testutils import ConnectingTestCase, decorate_all_tests
from testutils import py3_raises_typeerror
from testutils import (unittest, skip_if_no_uuid, skip_before_postgres,
ConnectingTestCase, decorate_all_tests, py3_raises_typeerror, slow)
import psycopg2
import psycopg2.extras
@ -77,7 +77,10 @@ class TypesExtrasTests(ConnectingTestCase):
self.failUnless(type(s) == list and len(s) == 0)
def testINET(self):
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
psycopg2.extras.register_inet()
i = psycopg2.extras.Inet("192.168.1.0/24")
s = self.execute("SELECT %s AS foo", (i,))
self.failUnless(i.addr == s.addr)
@ -86,7 +89,10 @@ class TypesExtrasTests(ConnectingTestCase):
self.failUnless(s is None)
def testINETARRAY(self):
with warnings.catch_warnings():
warnings.simplefilter('ignore', DeprecationWarning)
psycopg2.extras.register_inet()
i = psycopg2.extras.Inet("192.168.1.0/24")
s = self.execute("SELECT %s AS foo", ([i],))
self.failUnless(i.addr == s[0].addr)
@ -708,6 +714,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
curs.execute("select (1,2)::type_ii")
self.assertRaises(psycopg2.DataError, curs.fetchone)
@slow
@skip_if_no_composite
@skip_before_postgres(8, 4)
def test_from_tables(self):


@ -50,6 +50,8 @@ else:
@wraps(f)
def skipIf__(self):
if cond:
with warnings.catch_warnings():
warnings.simplefilter('always', UserWarning)
warnings.warn(msg)
return
else:
@ -61,6 +63,8 @@ else:
return skipIf(True, msg)
def skipTest(self, msg):
with warnings.catch_warnings():
warnings.simplefilter('always', UserWarning)
warnings.warn(msg)
return
@ -130,7 +134,7 @@ class ConnectingTestCase(unittest.TestCase):
import psycopg2
try:
conn = self.connect(**kwargs)
if conn.async == 1:
if conn.async_ == 1:
self.wait(conn)
except psycopg2.OperationalError, e:
# If pgcode is not set it is a genuine connection error
@ -447,7 +451,6 @@ def script_to_py3(script):
class py3_raises_typeerror(object):
def __enter__(self):
pass
@ -455,3 +458,22 @@ class py3_raises_typeerror(object):
if sys.version_info[0] >= 3:
assert type is TypeError
return True
def slow(f):
"""Decorator to mark slow tests we may want to skip
Note: in order to find slow tests you can run:
make check 2>&1 | ts -i "%.s" | sort -n
"""
@wraps(f)
def slow_(self):
if os.environ.get('PSYCOPG2_TEST_FAST'):
return self.skipTest("slow test")
return f(self)
return slow_
def assertDsnEqual(testsuite, dsn1, dsn2):
testsuite.assertEqual(set(dsn1.split()), set(dsn2.split()))