Merge pull request #644 from jdufresne/noinstall-tests

Avoid installing tests to site-packages
This commit is contained in:
Daniele Varrazzo 2017-12-11 02:20:56 +00:00 committed by GitHub
commit f3d21c24fc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
77 changed files with 389 additions and 362 deletions

View File

@ -238,6 +238,7 @@ build_script:
- "%PYTHON%\\python.exe setup.py build_ext --have-ssl --pg-config %PGTOP%\\bin\\pg_config.exe -l libpgcommon -l libpgport -L %OPENSSLTOP%\\lib -I %OPENSSLTOP%\\include"
- "%PYTHON%\\python.exe setup.py build"
- "%PYTHON%\\python.exe setup.py install"
- RD /S /Q psycopg2.egg-info
#after_build:
@ -251,4 +252,4 @@ test_script:
- "%PYTHON%\\python.exe -c \"import psycopg2; print(psycopg2.__version__)\""
- "%PYTHON%\\python.exe -c \"import psycopg2; print(psycopg2.__libpq_version__)\""
- "%PYTHON%\\python.exe -c \"import psycopg2; print(psycopg2.extensions.libpq_version())\""
- "%PYTHON%\\python.exe -c \"from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')\" --verbose"
- "%PYTHON%\\python.exe -c \"import tests; tests.unittest.main(defaultTest='tests.test_suite')\" --verbose"

View File

@ -12,6 +12,7 @@ python:
install:
- python setup.py install
- rm -rf psycopg2.egg-info
- sudo scripts/travis_prepare.sh
script:

View File

@ -29,8 +29,7 @@ SOURCE := $(SOURCE_C) $(SOURCE_PY) $(SOURCE_TESTS) $(SOURCE_DOC)
PACKAGE := $(BUILD_DIR)/psycopg2
PLATLIB := $(PACKAGE)/_psycopg.so
PURELIB := $(patsubst lib/%,$(PACKAGE)/%,$(SOURCE_PY)) \
$(patsubst tests/%,$(PACKAGE)/tests/%,$(SOURCE_TESTS))
PURELIB := $(patsubst lib/%,$(PACKAGE)/%,$(SOURCE_PY))
BUILD_OPT := --build-lib=$(BUILD_DIR)
BUILD_EXT_OPT := --build-lib=$(BUILD_DIR)
@ -66,7 +65,7 @@ env:
$(MAKE) -C doc $@
check:
PYTHONPATH=$(BUILD_DIR):$(PYTHONPATH) $(PYTHON) -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose
PYTHONPATH=$(BUILD_DIR):$(PYTHONPATH) $(PYTHON) -c "import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose
testdb:
@echo "* Creating $(TESTDB)"

3
NEWS
View File

@ -9,6 +9,9 @@ Other changes:
- Dropped support for Python 2.6, 3.2, 3.3.
- Dropped `psycopg1` module.
- Dropped deprecated ``register_tstz_w_secs()`` (was previously a no-op).
- The ``psycopg2.test`` package is no longer installed by ``python setup.py
install``. The test source files now are compatible with Python 2 and 3
without using 2to3.
What's new in psycopg 2.7.4

View File

@ -57,7 +57,7 @@ try:
release = psycopg2.__version__.split()[0]
version = '.'.join(release.split('.')[:2])
except ImportError:
print "WARNING: couldn't import psycopg to read version."
print("WARNING: couldn't import psycopg to read version.")
release = version
intersphinx_mapping = {

View File

@ -267,11 +267,11 @@ Running the test suite
----------------------
Once `!psycopg2` is installed you can run the test suite to verify it is
working correctly. You can run:
working correctly. From the source directory, you can run:
.. code-block:: console
$ python -c "from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose
$ python -c "import tests; tests.unittest.main(defaultTest='tests.test_suite')" --verbose
The tests run against a database called ``psycopg2_test`` on UNIX socket and
the standard port. You can configure a different database to run the test by

View File

@ -17,6 +17,7 @@
DSN = 'dbname=test'
## don't modify anything below this line (except for experimenting)
from __future__ import print_function
import sys
import psycopg2
@ -24,9 +25,9 @@ import psycopg2
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
curs = conn.cursor()
try:
@ -52,20 +53,20 @@ curs.execute("""INSERT INTO test_binary
# now we try to extract the images as simple text strings
print "Extracting the images as strings..."
print("Extracting the images as strings...")
curs.execute("SELECT * FROM test_binary")
for row in curs.fetchall():
name, ext = row[1].split('.')
new_name = name + '_S.' + ext
print " writing %s to %s ..." % (name+'.'+ext, new_name),
print(" writing %s to %s ..." % (name+'.'+ext, new_name), end=' ')
open(new_name, 'wb').write(row[2])
print "done"
print " python type of image data is", type(row[2])
print("done")
print(" python type of image data is", type(row[2]))
# extract exactly the same data but using a binary cursor
print "Extracting the images using a binary cursor:"
print("Extracting the images using a binary cursor:")
curs.execute("""DECLARE zot CURSOR FOR
SELECT img, name FROM test_binary FOR READ ONLY""")
@ -74,10 +75,10 @@ curs.execute("""FETCH ALL FROM zot""")
for row in curs.fetchall():
name, ext = row[1].split('.')
new_name = name + '_B.' + ext
print " writing %s to %s ..." % (name+'.'+ext, new_name),
print(" writing %s to %s ..." % (name+'.'+ext, new_name), end=' ')
open(new_name, 'wb').write(row[0])
print "done"
print " python type of image data is", type(row[0])
print("done")
print(" python type of image data is", type(row[0]))
# this rollback is required because we can't drop a table with a binary cursor
# declared and still open
@ -86,4 +87,4 @@ conn.rollback()
curs.execute("DROP TABLE test_binary")
conn.commit()
print "\nNow try to load the new images, to check it worked!"
print("\nNow try to load the new images, to check it worked!")

View File

@ -18,6 +18,7 @@
DSN = 'dbname=test'
## don't modify anything below this line (except for experimenting)
from __future__ import print_function
import sys
import os
@ -27,9 +28,9 @@ import psycopg2
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
curs = conn.cursor()
try:
@ -51,52 +52,52 @@ conn.commit()
# copy_to using defaults
io = open('copy_to.txt', 'w')
curs.copy_to(io, 'test_copy')
print "1) Copy %d records into file object using defaults: " % len (data) + \
"sep = \\t and null = \\N"
print("1) Copy %d records into file object using defaults: " % len (data) + \
"sep = \\t and null = \\N")
io.close()
rows = open('copy_to.txt', 'r').readlines()
print " File has %d rows:" % len(rows)
print(" File has %d rows:" % len(rows))
for r in rows:
print " ", r,
print(" ", r, end=' ')
# copy_to using custom separator
io = open('copy_to.txt', 'w')
curs.copy_to(io, 'test_copy', ':')
print "2) Copy %d records into file object using sep = :" % len(data)
print("2) Copy %d records into file object using sep = :" % len(data))
io.close()
rows = open('copy_to.txt', 'r').readlines()
print " File has %d rows:" % len(rows)
print(" File has %d rows:" % len(rows))
for r in rows:
print " ", r,
print(" ", r, end=' ')
# copy_to using custom null identifier
io = open('copy_to.txt', 'w')
curs.copy_to(io, 'test_copy', null='NULL')
print "3) Copy %d records into file object using null = NULL" % len(data)
print("3) Copy %d records into file object using null = NULL" % len(data))
io.close()
rows = open('copy_to.txt', 'r').readlines()
print " File has %d rows:" % len(rows)
print(" File has %d rows:" % len(rows))
for r in rows:
print " ", r,
print(" ", r, end=' ')
# copy_to using custom separator and null identifier
io = open('copy_to.txt', 'w')
curs.copy_to(io, 'test_copy', ':', 'NULL')
print "4) Copy %d records into file object using sep = : and null ) NULL" % \
len(data)
print("4) Copy %d records into file object using sep = : and null ) NULL" % \
len(data))
io.close()
rows = open('copy_to.txt', 'r').readlines()
print " File has %d rows:" % len(rows)
print(" File has %d rows:" % len(rows))
for r in rows:
print " ", r,
print(" ", r, end=' ')
curs.execute("DROP TABLE test_copy")
os.unlink('copy_to.txt')

View File

@ -86,12 +86,12 @@ persistent_fields = {'Album': ['album_id', 'creation_time', 'binary_data'],
'Order': ['order_id', 'items', 'price']
}
print adapt(Album()).generateInsert()
print adapt(Album()).generateInsert()
print adapt(Album()).generateInsert()
print adapt(Order()).generateInsert()
print adapt(Order()).generateInsert()
print adapt(Order()).generateInsert()
print(adapt(Album()).generateInsert())
print(adapt(Album()).generateInsert())
print(adapt(Album()).generateInsert())
print(adapt(Order()).generateInsert())
print(adapt(Order()).generateInsert())
print(adapt(Order()).generateInsert())
"""
- Discussion

View File

@ -25,41 +25,41 @@ import psycopg2.extras
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
curs = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT 1 AS foo, 'cip' AS bar, date(now()) as zot")
print "Cursor's row factory is", curs.row_factory
print("Cursor's row factory is", curs.row_factory)
data = curs.fetchone()
print "The type of the data row is", type(data)
print "Some data accessed both as tuple and dict:"
print " ", data['foo'], data['bar'], data['zot']
print " ", data[0], data[1], data[2]
print("The type of the data row is", type(data))
print("Some data accessed both as tuple and dict:")
print(" ", data['foo'], data['bar'], data['zot'])
print(" ", data[0], data[1], data[2])
# execute another query and demostrate we can still access the row
curs.execute("SELECT 2 AS foo")
print "The type of the data row is", type(data)
print "Some more data accessed both as tuple and dict:"
print " ", data['foo'], data['bar'], data['zot']
print " ", data[0], data[1], data[2]
print("The type of the data row is", type(data))
print("Some more data accessed both as tuple and dict:")
print(" ", data['foo'], data['bar'], data['zot'])
print(" ", data[0], data[1], data[2])
curs = conn.cursor(cursor_factory=psycopg2.extras.RealDictCursor)
curs.execute("SELECT 1 AS foo, 'cip' AS bar, date(now()) as zot")
print "Cursor's row factory is", curs.row_factory
print("Cursor's row factory is", curs.row_factory)
data = curs.fetchone()
print "The type of the data row is", type(data)
print "Some data accessed both as tuple and dict:"
print " ", data['foo'], data['bar'], data['zot']
print " ", "No access using indices: this is a specialized cursor."
print("The type of the data row is", type(data))
print("Some data accessed both as tuple and dict:")
print(" ", data['foo'], data['bar'], data['zot'])
print(" ", "No access using indices: this is a specialized cursor.")
# execute another query and demostrate we can still access the row
curs.execute("SELECT 2 AS foo")
print "The type of the data row is", type(data)
print "Some more data accessed both as tuple and dict:"
print " ", data['foo'], data['bar'], data['zot']
print " ", "No access using indices: this is a specialized cursor."
print("The type of the data row is", type(data))
print("Some more data accessed both as tuple and dict:")
print(" ", data['foo'], data['bar'], data['zot'])
print(" ", "No access using indices: this is a specialized cursor.")

View File

@ -28,7 +28,7 @@ from psycopg2.extensions import adapt
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
curs = conn.cursor()
@ -52,9 +52,9 @@ mx1 = (
from psycopg2.extensions import adapt
import psycopg2.extras
print adapt(mx1)
print(adapt(mx1))
print "Inserting mx.DateTime values..."
print("Inserting mx.DateTime values...")
curs.execute("INSERT INTO test_dt VALUES (%s, %s, %s, %s, %s)", mx1)
# build and insert some values using the datetime adapters
@ -65,11 +65,11 @@ dt1 = (
datetime.datetime(2004, 10, 19, 0, 11, 17, 500000),
datetime.timedelta(13, 15*3600+17*60+59, 900000))
print "Inserting Python datetime values..."
print("Inserting Python datetime values...")
curs.execute("INSERT INTO test_dt VALUES (%s, %s, %s, %s, %s)", dt1)
# now extract the row from database and print them
print "Extracting values inserted with mx.DateTime wrappers:"
print("Extracting values inserted with mx.DateTime wrappers:")
curs.execute("SELECT d, t, dt, z FROM test_dt WHERE k = 1")
for n, x in zip(mx1[1:], curs.fetchone()):
try:
@ -80,10 +80,10 @@ for n, x in zip(mx1[1:], curs.fetchone()):
except:
s = repr(n) + "\n -> " + str(adapt(n)) + \
"\n -> " + repr(x) + "\n -> " + str(x)
print s
print
print(s)
print()
print "Extracting values inserted with Python datetime wrappers:"
print("Extracting values inserted with Python datetime wrappers:")
curs.execute("SELECT d, t, dt, z FROM test_dt WHERE k = 2")
for n, x in zip(dt1[1:], curs.fetchone()):
try:
@ -92,8 +92,8 @@ for n, x in zip(dt1[1:], curs.fetchone()):
s = repr(n) + "\n -> " + repr(x) + "\n -> " + x.isoformat()
except:
s = repr(n) + "\n -> " + repr(x) + "\n -> " + str(x)
print s
print
print(s)
print()
curs.execute("DROP TABLE test_dt")
conn.commit()

View File

@ -26,80 +26,80 @@ import psycopg2.extensions
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Initial encoding for this connection is", conn.encoding
print("Initial encoding for this connection is", conn.encoding)
print "\n** This example is supposed to be run in a UNICODE terminal! **\n"
print("\n** This example is supposed to be run in a UNICODE terminal! **\n")
print "Available encodings:"
print("Available encodings:")
encs = psycopg2.extensions.encodings.items()
encs.sort()
for a, b in encs:
print " ", a, "<->", b
print(" ", a, "<->", b)
print "Using STRING typecaster"
print "Setting backend encoding to LATIN1 and executing queries:"
print("Using STRING typecaster")
print("Setting backend encoding to LATIN1 and executing queries:")
conn.set_client_encoding('LATIN1')
curs = conn.cursor()
curs.execute("SELECT %s::TEXT AS foo", ('àèìòù',))
x = curs.fetchone()[0]
print " ->", unicode(x, 'latin-1').encode('utf-8'), type(x)
print(" ->", unicode(x, 'latin-1').encode('utf-8'), type(x))
curs.execute("SELECT %s::TEXT AS foo", (u'àèìòù',))
x = curs.fetchone()[0]
print " ->", unicode(x, 'latin-1').encode('utf-8'), type(x)
print(" ->", unicode(x, 'latin-1').encode('utf-8'), type(x))
print "Setting backend encoding to UTF8 and executing queries:"
print("Setting backend encoding to UTF8 and executing queries:")
conn.set_client_encoding('UNICODE')
curs = conn.cursor()
curs.execute("SELECT %s::TEXT AS foo", (u'àèìòù'.encode('utf-8'),))
x = curs.fetchone()[0]
print " ->", x, type(x)
print(" ->", x, type(x))
curs.execute("SELECT %s::TEXT AS foo", (u'àèìòù',))
x = curs.fetchone()[0]
print " ->", x, type(x)
print(" ->", x, type(x))
print "Using UNICODE typecaster"
print("Using UNICODE typecaster")
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE)
print "Setting backend encoding to LATIN1 and executing queries:"
print("Setting backend encoding to LATIN1 and executing queries:")
conn.set_client_encoding('LATIN1')
curs = conn.cursor()
curs.execute("SELECT %s::TEXT AS foo", ('àèìòù',))
x = curs.fetchone()[0]
print " ->", x.encode('utf-8'), ":", type(x)
print(" ->", x.encode('utf-8'), ":", type(x))
curs.execute("SELECT %s::TEXT AS foo", (u'àèìòù',))
x = curs.fetchone()[0]
print " ->", x.encode('utf-8'), ":", type(x)
print(" ->", x.encode('utf-8'), ":", type(x))
print "Setting backend encoding to UTF8 and executing queries:"
print("Setting backend encoding to UTF8 and executing queries:")
conn.set_client_encoding('UNICODE')
curs = conn.cursor()
curs.execute("SELECT %s::TEXT AS foo", (u'àèìòù'.encode('utf-8'),))
x = curs.fetchone()[0]
print " ->", x.encode('utf-8'), ":", type(x)
print(" ->", x.encode('utf-8'), ":", type(x))
curs.execute("SELECT %s::TEXT AS foo", (u'àèìòù',))
x = curs.fetchone()[0]
print " ->", x.encode('utf-8'), ":", type(x)
print(" ->", x.encode('utf-8'), ":", type(x))
print "Executing full UNICODE queries"
print("Executing full UNICODE queries")
print "Setting backend encoding to LATIN1 and executing queries:"
print("Setting backend encoding to LATIN1 and executing queries:")
conn.set_client_encoding('LATIN1')
curs = conn.cursor()
curs.execute(u"SELECT %s::TEXT AS foo", ('àèìòù',))
x = curs.fetchone()[0]
print " ->", x.encode('utf-8'), ":", type(x)
print(" ->", x.encode('utf-8'), ":", type(x))
curs.execute(u"SELECT %s::TEXT AS foo", (u'àèìòù',))
x = curs.fetchone()[0]
print " ->", x.encode('utf-8'), ":", type(x)
print(" ->", x.encode('utf-8'), ":", type(x))
print "Setting backend encoding to UTF8 and executing queries:"
print("Setting backend encoding to UTF8 and executing queries:")
conn.set_client_encoding('UNICODE')
curs = conn.cursor()
curs.execute(u"SELECT %s::TEXT AS foo", (u'àèìòù'.encode('utf-8'),))
x = curs.fetchone()[0]
print " ->", x.encode('utf-8'), ":", type(x)
print(" ->", x.encode('utf-8'), ":", type(x))
curs.execute(u"SELECT %s::TEXT AS foo", (u'àèìòù',))
x = curs.fetchone()[0]
print " ->", x.encode('utf-8'), ":", type(x)
print(" ->", x.encode('utf-8'), ":", type(x))

View File

@ -24,9 +24,9 @@ import psycopg2
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
curs = conn.cursor()
try:
@ -68,12 +68,12 @@ conn.commit()
ncurs = conn.cursor("crs")
ncurs.execute("SELECT * FROM test_fetch")
print "First 10 rows:", flatten(ncurs.fetchmany(10))
print("First 10 rows:", flatten(ncurs.fetchmany(10)))
ncurs.scroll(-5)
print "Moved back cursor by 5 rows (to row 5.)"
print "Another 10 rows:", flatten(ncurs.fetchmany(10))
print "Another one:", list(ncurs.fetchone())
print "The remaining rows:", flatten(ncurs.fetchall())
print("Moved back cursor by 5 rows (to row 5.)")
print("Another 10 rows:", flatten(ncurs.fetchmany(10)))
print("Another one:", list(ncurs.fetchone()))
print("The remaining rows:", flatten(ncurs.fetchall()))
conn.rollback()
curs.execute("DROP TABLE test_fetch")

View File

@ -23,7 +23,7 @@ import sys, psycopg2
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
curs = conn.cursor()
@ -42,18 +42,18 @@ curs.execute("""INSERT INTO test_oid
VALUES (%(name)s, %(surname)s)""", data[0])
foid = curs.lastrowid
print "Oid for %(name)s %(surname)s" % data[0], "is", foid
print("Oid for %(name)s %(surname)s" % data[0], "is", foid)
curs.execute("""INSERT INTO test_oid
VALUES (%(name)s, %(surname)s)""", data[1])
moid = curs.lastrowid
print "Oid for %(name)s %(surname)s" % data[1], "is", moid
print("Oid for %(name)s %(surname)s" % data[1], "is", moid)
curs.execute("SELECT * FROM test_oid WHERE oid = %s", (foid,))
print "Oid", foid, "selected %s %s" % curs.fetchone()
print("Oid", foid, "selected %s %s" % curs.fetchone())
curs.execute("SELECT * FROM test_oid WHERE oid = %s", (moid,))
print "Oid", moid, "selected %s %s" % curs.fetchone()
print("Oid", moid, "selected %s %s" % curs.fetchone())
curs.execute("DROP TABLE test_oid")
conn.commit()

View File

@ -24,68 +24,68 @@ import psycopg2
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
# this will create a large object with a new random oid, we'll
# use it to make some basic tests about read/write and seek.
lobj = conn.lobject()
loid = lobj.oid
print "Created a new large object with oid", loid
print("Created a new large object with oid", loid)
print "Manually importing some binary data into the object:"
print("Manually importing some binary data into the object:")
data = open("somehackers.jpg").read()
len = lobj.write(data)
print " imported", len, "bytes of data"
print(" imported", len, "bytes of data")
conn.commit()
print "Trying to (re)open large object with oid", loid
print("Trying to (re)open large object with oid", loid)
lobj = conn.lobject(loid)
print "Manually exporting the data from the lobject:"
print("Manually exporting the data from the lobject:")
data1 = lobj.read()
len = lobj.tell()
lobj.seek(0, 0)
data2 = lobj.read()
if data1 != data2:
print "ERROR: read after seek returned different data"
print("ERROR: read after seek returned different data")
open("somehackers_lobject1.jpg", 'wb').write(data1)
print " written", len, "bytes of data to somehackers_lobject1.jpg"
print(" written", len, "bytes of data to somehackers_lobject1.jpg")
lobj.unlink()
print "Large object with oid", loid, "removed"
print("Large object with oid", loid, "removed")
conn.commit()
# now we try to use the import and export functions to do the same
lobj = conn.lobject(0, 'n', 0, "somehackers.jpg")
loid = lobj.oid
print "Imported a new large object with oid", loid
print("Imported a new large object with oid", loid)
conn.commit()
print "Trying to (re)open large object with oid", loid
print("Trying to (re)open large object with oid", loid)
lobj = conn.lobject(loid, 'n')
print "Using export() to export the data from the large object:"
print("Using export() to export the data from the large object:")
lobj.export("somehackers_lobject2.jpg")
print " exported large object to somehackers_lobject2.jpg"
print(" exported large object to somehackers_lobject2.jpg")
lobj.unlink()
print "Large object with oid", loid, "removed"
print("Large object with oid", loid, "removed")
conn.commit()
# this will create a very large object with a new random oid.
lobj = conn.lobject()
loid = lobj.oid
print "Created a new large object with oid", loid
print("Created a new large object with oid", loid)
print "Manually importing a lot of data into the object:"
print("Manually importing a lot of data into the object:")
data = "data" * 1000000
len = lobj.write(data)
print " imported", len, "bytes of data"
print(" imported", len, "bytes of data")
conn.rollback()
print "\nNow try to load the new images, to check it worked!"
print("\nNow try to load the new images, to check it worked!")

View File

@ -24,10 +24,10 @@ import sys, psycopg2
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
curs = conn.cursor()
curs.execute("SELECT %(foo)s AS foo", {'foo':'bar'})
@ -37,11 +37,11 @@ curs.execute("SELECT %(foo)s AS foo", {'foo':42})
curs.execute("SELECT %(foo)s AS foo", {'foo':u'yatt<EFBFBD>!'})
curs.execute("SELECT %(foo)s AS foo", {'foo':u'bar'})
print curs.mogrify("SELECT %(foo)s AS foo", {'foo':'bar'})
print curs.mogrify("SELECT %(foo)s AS foo", {'foo':None})
print curs.mogrify("SELECT %(foo)s AS foo", {'foo':True})
print curs.mogrify("SELECT %(foo)s AS foo", {'foo':42})
print curs.mogrify("SELECT %(foo)s AS foo", {'foo':u'yatt<EFBFBD>!'})
print curs.mogrify("SELECT %(foo)s AS foo", {'foo':u'bar'})
print(curs.mogrify("SELECT %(foo)s AS foo", {'foo':'bar'}))
print(curs.mogrify("SELECT %(foo)s AS foo", {'foo':None}))
print(curs.mogrify("SELECT %(foo)s AS foo", {'foo':True}))
print(curs.mogrify("SELECT %(foo)s AS foo", {'foo':42}))
print(curs.mogrify("SELECT %(foo)s AS foo", {'foo':u'yatt<EFBFBD>!'}))
print(curs.mogrify("SELECT %(foo)s AS foo", {'foo':u'bar'}))
conn.rollback()

View File

@ -122,5 +122,5 @@ register_adapter(int, AsIs)
# the SQL_IN class by calling psycopg's adapt() directly:
if __name__ == '__main__':
print "Note how the string will be SQL-quoted, but the number will not:"
print psycoadapt(("this is an 'sql quoted' str\\ing", 1, 2.0))
print("Note how the string will be SQL-quoted, but the number will not:")
print(psycoadapt(("this is an 'sql quoted' str\\ing", 1, 2.0)))

View File

@ -26,20 +26,20 @@ from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
curs = conn.cursor()
curs.execute("listen test")
print "Waiting for 'NOTIFY test'"
print("Waiting for 'NOTIFY test'")
while 1:
if select.select([conn],[],[],5)==([],[],[]):
print "Timeout"
print("Timeout")
else:
conn.poll()
while conn.notifies:
print "Got NOTIFY:", conn.notifies.pop()
print("Got NOTIFY:", conn.notifies.pop())

View File

@ -30,17 +30,17 @@ import psycopg2
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
curs = conn.cursor()
curs.execute("SELECT 1 AS foo")
print curs.fetchone()
print(curs.fetchone())
curs.execute("SELECT 1 AS foo")
print curs.fetchmany()
print(curs.fetchmany())
curs.execute("SELECT 1 AS foo")
print curs.fetchall()
print(curs.fetchall())
conn.rollback()

View File

@ -45,7 +45,7 @@ if len(sys.argv) > 1:
if len(sys.argv) > 2:
MODE = int(sys.argv[2])
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
curs = conn.cursor()
@ -77,7 +77,7 @@ def insert_func(conn_or_pool, rows):
if MODE == 1:
conn_or_pool.putconn(conn)
s = name + ": COMMIT STEP " + str(i)
print s
print(s)
if MODE == 1:
conn = conn_or_pool.getconn()
c = conn.cursor()
@ -85,8 +85,8 @@ def insert_func(conn_or_pool, rows):
c.execute("INSERT INTO test_threads VALUES (%s, %s, %s)",
(str(i), i, float(i)))
except psycopg2.ProgrammingError as err:
print name, ": an error occurred; skipping this insert"
print err
print(name, ": an error occurred; skipping this insert")
print(err)
conn.commit()
## a nice select function that prints the current number of rows in the

View File

@ -29,14 +29,14 @@ import psycopg2.extensions
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Encoding for this connection is", conn.encoding
print("Encoding for this connection is", conn.encoding)
curs = conn.cursor()
curs.execute("SELECT 'text'::text AS foo")
textoid = curs.description[0][1]
print "Oid for the text datatype is", textoid
print("Oid for the text datatype is", textoid)
def castA(s, curs):
if s is not None: return "(A) " + s
@ -48,18 +48,18 @@ TYPEB = psycopg2.extensions.new_type((textoid,), "TYPEB", castB)
curs = conn.cursor()
curs.execute("SELECT 'some text.'::text AS foo")
print "Some text from plain connection:", curs.fetchone()[0]
print("Some text from plain connection:", curs.fetchone()[0])
psycopg2.extensions.register_type(TYPEA, conn)
curs = conn.cursor()
curs.execute("SELECT 'some text.'::text AS foo")
print "Some text from connection with typecaster:", curs.fetchone()[0]
print("Some text from connection with typecaster:", curs.fetchone()[0])
curs = conn.cursor()
psycopg2.extensions.register_type(TYPEB, curs)
curs.execute("SELECT 'some text.'::text AS foo")
print "Some text from cursor with typecaster:", curs.fetchone()[0]
print("Some text from cursor with typecaster:", curs.fetchone()[0])
curs = conn.cursor()
curs.execute("SELECT 'some text.'::text AS foo")
print "Some text from connection with typecaster again:", curs.fetchone()[0]
print("Some text from connection with typecaster again:", curs.fetchone()[0])

View File

@ -28,7 +28,7 @@ from psycopg2.tz import ZERO, LOCAL, FixedOffsetTimezone
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
curs = conn.cursor()
@ -42,27 +42,27 @@ conn.commit()
d = datetime.datetime(1971, 10, 19, 22, 30, 0, tzinfo=LOCAL)
curs.execute("INSERT INTO test_tz VALUES (%s)", (d,))
print "Inserted timestamp with timezone:", d
print "Time zone:", d.tzinfo.tzname(d), "offset:", d.tzinfo.utcoffset(d)
print("Inserted timestamp with timezone:", d)
print("Time zone:", d.tzinfo.tzname(d), "offset:", d.tzinfo.utcoffset(d))
tz = FixedOffsetTimezone(-5*60, "EST")
d = datetime.datetime(1971, 10, 19, 22, 30, 0, tzinfo=tz)
curs.execute("INSERT INTO test_tz VALUES (%s)", (d,))
print "Inserted timestamp with timezone:", d
print "Time zone:", d.tzinfo.tzname(d), "offset:", d.tzinfo.utcoffset(d)
print("Inserted timestamp with timezone:", d)
print("Time zone:", d.tzinfo.tzname(d), "offset:", d.tzinfo.utcoffset(d))
curs.execute("SELECT * FROM test_tz")
d = curs.fetchone()[0]
curs.execute("INSERT INTO test_tz VALUES (%s)", (d,))
print "Inserted SELECTed timestamp:", d
print "Time zone:", d.tzinfo.tzname(d), "offset:", d.tzinfo.utcoffset(d)
print("Inserted SELECTed timestamp:", d)
print("Time zone:", d.tzinfo.tzname(d), "offset:", d.tzinfo.utcoffset(d))
curs.execute("SELECT * FROM test_tz")
for d in curs:
u = d[0].utcoffset() or ZERO
print "UTC time: ", d[0] - u
print "Local time:", d[0]
print "Time zone:", d[0].tzinfo.tzname(d[0]), d[0].tzinfo.utcoffset(d[0])
print("UTC time: ", d[0] - u)
print("Local time:", d[0])
print("Time zone:", d[0].tzinfo.tzname(d[0]), d[0].tzinfo.utcoffset(d[0]))
curs.execute("DROP TABLE test_tz")

View File

@ -33,9 +33,9 @@ import psycopg2.extras
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dsn:", DSN
print("Opening connection using dsn:", DSN)
conn = psycopg2.connect(DSN)
print "Initial encoding for this connection is", conn.encoding
print("Initial encoding for this connection is", conn.encoding)
curs = conn.cursor()
try:
@ -98,7 +98,7 @@ class Rect(object):
# here we select from the empty table, just to grab the description
curs.execute("SELECT b FROM test_cast WHERE 0=1")
boxoid = curs.description[0][1]
print "Oid for the box datatype is", boxoid
print("Oid for the box datatype is", boxoid)
# and build the user cast object
BOX = psycopg2.extensions.new_type((boxoid,), "BOX", Rect)
@ -113,14 +113,14 @@ for i in range(100):
whrandom.randint(0,100), whrandom.randint(0,100))
curs.execute("INSERT INTO test_cast VALUES ('%(p1)s', '%(p2)s', %(box)s)",
{'box':b, 'p1':p1, 'p2':p2})
print "Added 100 boxed to the database"
print("Added 100 boxed to the database")
# select and print all boxes with at least one point inside
curs.execute("SELECT b FROM test_cast WHERE p1 @ b OR p2 @ b")
boxes = curs.fetchall()
print "Found %d boxes with at least a point inside:" % len(boxes)
print("Found %d boxes with at least a point inside:" % len(boxes))
for box in boxes:
print " ", box[0].show()
print(" ", box[0].show())
curs.execute("DROP TABLE test_cast")
conn.commit()

View File

@ -27,4 +27,4 @@ curs = conn.cursor()
#print curs.fetchone()
curs.execute("SELECT %s", ([1,2,None],))
print curs.fetchone()
print(curs.fetchone())

View File

@ -15,7 +15,7 @@ curs = conn.cursor()
def sleep(curs):
while not curs.isready():
print "."
print(".")
time.sleep(.1)
#curs.execute("""
@ -24,12 +24,12 @@ def sleep(curs):
# FOR READ ONLY;""", async = 1)
curs.execute("SELECT now() AS foo", async=1)
sleep(curs)
print curs.fetchall()
print(curs.fetchall())
#curs.execute("""
# FETCH FORWARD 1 FROM zz;""", async = 1)
curs.execute("SELECT now() AS bar", async=1)
print curs.fetchall()
print(curs.fetchall())
curs.execute("SELECT now() AS bar")
sleep(curs)

View File

@ -17,7 +17,7 @@ def query_worker(dsn):
break
if len(sys.argv) != 2:
print 'usage: %s DSN' % sys.argv[0]
print('usage: %s DSN' % sys.argv[0])
sys.exit(1)
th = threading.Thread(target=query_worker, args=(sys.argv[1],))
th.setDaemon(True)

View File

@ -12,4 +12,4 @@ o = psycopg2.connect("dbname=test")
c = o.cursor()
c.execute("SELECT NULL::decimal(10,2)")
n = c.fetchone()[0]
print n, type(n)
print(n, type(n))

View File

@ -4,15 +4,15 @@ con = psycopg2.connect("dbname=test")
cur = con.cursor()
cur.execute("SELECT %s::regtype::oid", ('bytea', ))
print cur.fetchone()[0]
print(cur.fetchone()[0])
# 17
cur.execute("CREATE DOMAIN thing AS bytea")
cur.execute("SELECT %s::regtype::oid", ('thing', ))
print cur.fetchone()[0]
print(cur.fetchone()[0])
#62148
cur.execute("CREATE TABLE thingrel (thingcol thing)")
cur.execute("SELECT * FROM thingrel")
print cur.description
print(cur.description)
#(('thingcol', 17, None, -1, None, None, None),)

View File

@ -5,14 +5,14 @@ c = o.cursor()
def sql():
c.execute("SELECT 1.23 AS foo")
print 1, c.fetchone()
print(1, c.fetchone())
#print c.description
c.execute("SELECT 1.23::float AS foo")
print 2, c.fetchone()
print(2, c.fetchone())
#print c.description
print "BEFORE"
print("BEFORE")
sql()
import gtk
print "AFTER"
print("AFTER")
sql()

View File

@ -6,8 +6,8 @@ curs = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT '2005-2-12'::date AS foo, 'boo!' as bar")
for x in curs.fetchall():
print type(x), x[0], x[1], x['foo'], x['bar']
print(type(x), x[0], x[1], x['foo'], x['bar'])
curs.execute("SELECT '2005-2-12'::date AS foo, 'boo!' as bar")
for x in curs:
print type(x), x[0], x[1], x['foo'], x['bar']
print(type(x), x[0], x[1], x['foo'], x['bar'])

View File

@ -43,7 +43,7 @@ def leak():
row = {'foo': i}
rows.append(row)
count += 1
print "loop count:", count
print("loop count:", count)
cursor.executemany(insert, rows)
connection.commit()
except psycopg2.IntegrityError:
@ -59,7 +59,7 @@ def noleak():
while 1:
try:
count += 1
print "loop count:", count
print("loop count:", count)
cursor.executemany(insert, rows)
connection.commit()
except psycopg2.IntegrityError:
@ -72,10 +72,10 @@ try:
elif 'noleak' == sys.argv[1]:
run_function = noleak
else:
print usage
print(usage)
sys.exit()
except IndexError:
print usage
print(usage)
sys.exit()
# Run leak() or noleak(), whichever was indicated on the command line

View File

@ -18,8 +18,8 @@ class O(object):
o = O('R%', second='S%')
print o[0]
print o['second']
print(o[0])
print(o['second'])
#-------------------------------------------------------------------------------
@ -40,4 +40,4 @@ cursor.execute("""
""", (o,))
for row in cursor:
print row
print(row)

View File

@ -15,10 +15,10 @@ curs = conn.cursor()
curs.execute("SELECT reffunc2()")
portal = curs.fetchone()[0]
print portal.fetchone()
print portal.fetchmany(2)
print(portal.fetchone())
print(portal.fetchmany(2))
portal.scroll(0, 'absolute')
print portal.fetchall()
print(portal.fetchall())
#print curs.rowcount

View File

@ -3,7 +3,7 @@ class B(object):
if x: self._o = True
else: self._o = False
def __getattribute__(self, attr):
print "ga called", attr
print("ga called", attr)
return object.__getattribute__(self, attr)
def _sqlquote(self):
if self._o:

View File

@ -8,4 +8,4 @@ for i in range(20000):
datafile = os.popen('ps -p %s -o rss' % os.getpid())
line = datafile.readlines(2)[1].strip()
datafile.close()
print str(i) + '\t' + line
print(str(i) + '\t' + line)

View File

@ -33,7 +33,7 @@ def g():
line = datafile.readlines(2)[1].strip()
datafile.close()
n = 30
print str(k*n) + '\t' + line
print(str(k*n) + '\t' + line)
k += 1
while threading.activeCount()>1:

View File

@ -10,26 +10,26 @@ conn = psycopg2.connect("dbname=test")
#conn.set_client_encoding("xxx")
curs = conn.cursor()
curs.execute("SELECT '2005-2-12'::date AS foo")
print curs.fetchall()
print(curs.fetchall())
curs.execute("SELECT '10:23:60'::time AS foo")
print curs.fetchall()
print(curs.fetchall())
curs.execute("SELECT '10:23:59.895342'::time AS foo")
print curs.fetchall()
print(curs.fetchall())
curs.execute("SELECT '0:0:12.31423'::time with time zone AS foo")
print curs.fetchall()
print(curs.fetchall())
curs.execute("SELECT '0:0:12+01:30'::time with time zone AS foo")
print curs.fetchall()
print(curs.fetchall())
curs.execute("SELECT '2005-2-12 10:23:59.895342'::timestamp AS foo")
print curs.fetchall()
print(curs.fetchall())
curs.execute("SELECT '2005-2-12 10:23:59.895342'::timestamp with time zone AS foo")
print curs.fetchall()
print(curs.fetchall())
#print curs.fetchmany(2)
#print curs.fetchall()
def sleep(curs):
while not curs.isready():
print "."
print(".")
time.sleep(.1)
#curs.execute("""

View File

@ -4,5 +4,5 @@ import psycopg2.extras
conn = psycopg2.connect("dbname=test")
curs = conn.cursor()
curs.execute("SELECT true AS foo WHERE 'a' in %s", (("aa", "bb"),))
print curs.fetchall()
print curs.query
print(curs.fetchall())
print(curs.query)

View File

@ -52,7 +52,7 @@ signal.signal(signal.SIGHUP, handler)
def worker():
while 1:
print "I'm working"
print("I'm working")
sleep(1)
eventlet.spawn(worker)
@ -61,21 +61,21 @@ eventlet.spawn(worker)
# You can unplug the network cable etc. here.
# Kill -HUP will raise an exception in the callback.
print "PID", os.getpid()
print("PID", os.getpid())
conn = psycopg2.connect(DSN)
curs = conn.cursor()
try:
for i in range(1000):
curs.execute("select %s, pg_sleep(1)", (i,))
r = curs.fetchone()
print "selected", r
print("selected", r)
except BaseException, e:
print "got exception:", e.__class__.__name__, e
print("got exception:", e.__class__.__name__, e)
if conn.closed:
print "the connection is closed"
print("the connection is closed")
else:
conn.rollback()
curs.execute("select 1")
print curs.fetchone()
print(curs.fetchone())

View File

@ -5,27 +5,27 @@ import signal
import warnings
import psycopg2
print "Testing psycopg2 version %s" % psycopg2.__version__
print("Testing psycopg2 version %s" % psycopg2.__version__)
dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test')
conn = psycopg2.connect("dbname=%s" % dbname)
curs = conn.cursor()
curs.isready()
print "Now restart the test postgresql server to drop all connections, press enter when done."
print("Now restart the test postgresql server to drop all connections, press enter when done.")
raw_input()
try:
curs.isready() # No need to test return value
curs.isready()
except:
print "Test passed"
print("Test passed")
sys.exit(0)
if curs.isready():
print "Warning: looks like the connection didn't get killed. This test is probably in-effective"
print "Test inconclusive"
print("Warning: looks like the connection didn't get killed. This test is probably in-effective")
print("Test inconclusive")
sys.exit(1)
gc.collect() # used to error here
print "Test Passed"
print("Test Passed")

View File

@ -5,4 +5,4 @@ o = psycopg2.connect("dbname=test")
c = o.cursor()
c.execute("SELECT 1.23::float AS foo")
x = c.fetchone()[0]
print x, type(x)
print(x, type(x))

View File

@ -42,7 +42,7 @@ cur = conn.cursor()
gc_thread.start()
# Now do lots of "cursor.copy_from" calls:
print "copy_from"
print("copy_from")
for i in range(1000):
f = StringIO("42\tfoo\n74\tbar\n")
cur.copy_from(f, 'test', columns=('num', 'data'))
@ -51,7 +51,7 @@ for i in range(1000):
# python: Modules/gcmodule.c:277: visit_decref: Assertion `gc->gc.gc_refs != 0' failed.
# Also exercise the copy_to code path
print "copy_to"
print("copy_to")
cur.execute("truncate test")
f = StringIO("42\tfoo\n74\tbar\n")
cur.copy_from(f, 'test', columns=('num', 'data'))
@ -60,7 +60,7 @@ for i in range(1000):
cur.copy_to(f, 'test', columns=('num', 'data'))
# And copy_expert too
print "copy_expert"
print("copy_expert")
cur.execute("truncate test")
for i in range(1000):
f = StringIO("42\tfoo\n74\tbar\n")

View File

@ -6,7 +6,7 @@ db = psycopg2.connect('dbname=test')
cursor = db.cursor()
print 'Creating tables and sample data'
print('Creating tables and sample data')
cursor.execute('''
CREATE TEMPORARY TABLE foo (
@ -23,22 +23,22 @@ cursor.execute('INSERT INTO bar VALUES (1, 1)')
db.commit()
print 'Deferring constraint and breaking referential integrity'
print('Deferring constraint and breaking referential integrity')
cursor.execute('SET CONSTRAINTS bar_foo_fk DEFERRED')
cursor.execute('UPDATE bar SET foo_id = 42 WHERE id = 1')
print 'Committing (this should fail)'
print('Committing (this should fail)')
try:
db.commit()
except:
traceback.print_exc()
print 'Rolling back connection'
print('Rolling back connection')
db.rollback()
print 'Running a trivial query'
print('Running a trivial query')
try:
cursor.execute('SELECT TRUE')
except:
traceback.print_exc()
print 'db.closed:', db.closed
print('db.closed:', db.closed)

View File

@ -1,3 +1,5 @@
from __future__ import print_function
import psycopg2, psycopg2.extensions
import threading
import gc
@ -20,9 +22,9 @@ class db_user(threading.Thread):
# the conn2 desctructor will block indefinitely
# on the completion of the query
# (and it will not be holding the GIL during that time)
print >> sys.stderr, "begin conn2 del"
print("begin conn2 del", file=sys.stderr)
del cursor, conn2
print >> sys.stderr, "end conn2 del"
print("end conn2 del", file=sys.stderr)
def main():
# lock out a db row
@ -43,7 +45,7 @@ def main():
# as it will avoid conn_close()
for i in range(10):
if gc.collect():
print >> sys.stderr, "garbage collection done"
print("garbage collection done", file=sys.stderr)
break
time.sleep(1)
@ -52,9 +54,9 @@ def main():
# concurrent thread destructor of conn2 to
# continue and it will end up trying to free
# self->dsn a second time.
print >> sys.stderr, "begin conn1 del"
print("begin conn1 del", file=sys.stderr)
del cursor, conn1
print >> sys.stderr, "end conn1 del"
print("end conn1 del", file=sys.stderr)
if __name__ == '__main__':

View File

@ -1,7 +1,7 @@
import psycopg2.extensions
print dir(psycopg2._psycopg)
print psycopg2.extensions.new_type(
(600,), "POINT", lambda oids, name, fun: None)
print "ciccia ciccia"
print psycopg2._psycopg
print(dir(psycopg2._psycopg))
print(psycopg2.extensions.new_type(
(600,), "POINT", lambda oids, name, fun: None))
print("ciccia ciccia")
print(psycopg2._psycopg)

View File

@ -6,4 +6,4 @@ conn = psycopg2.connect("dbname=test")
curs = conn.cursor()
curs.execute("set timezone = 'Asia/Calcutta'")
curs.execute("SELECT now()")
print curs.fetchone()[0]
print(curs.fetchone()[0])

View File

@ -15,6 +15,7 @@ The script can be run at a new PostgreSQL release to refresh the module.
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
from __future__ import print_function
import re
import sys
@ -26,7 +27,7 @@ from BeautifulSoup import BeautifulSoup as BS
def main():
if len(sys.argv) != 2:
print >>sys.stderr, "usage: %s /path/to/errorcodes.py" % sys.argv[0]
print("usage: %s /path/to/errorcodes.py" % sys.argv[0], file=sys.stderr)
return 2
filename = sys.argv[1]
@ -39,9 +40,9 @@ def main():
f = open(filename, "w")
for line in file_start:
print >>f, line
print(line, file=f)
for line in generate_module_data(classes, errors):
print >>f, line
print(line, file=f)
def read_base_file(filename):
@ -141,7 +142,7 @@ def fetch_errors(versions):
errors = defaultdict(dict)
for version in versions:
print >> sys.stderr, version
print(version, file=sys.stderr)
tver = tuple(map(int, version.split()[0].split('.')))
if tver < (9, 1):
c1, e1 = parse_errors_sgml(errors_sgml_url % version)

View File

@ -17,6 +17,7 @@ script exits with error 1.
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
from __future__ import print_function
import gc
import sys
@ -29,8 +30,8 @@ from collections import defaultdict
def main():
opt = parse_args()
import psycopg2.tests
test = psycopg2.tests
import tests
test = tests
if opt.suite:
test = getattr(test, opt.suite)

View File

@ -34,13 +34,13 @@ run_test () {
export PSYCOPG2_TEST_REPL_DSN=
unset PSYCOPG2_TEST_GREEN
python -c \
"from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" \
"import tests; tests.unittest.main(defaultTest='tests.test_suite')" \
$VERBOSE
printf "\n\nRunning tests against PostgreSQL $VERSION (green mode)\n\n"
export PSYCOPG2_TEST_GREEN=1
python -c \
"from psycopg2 import tests; tests.unittest.main(defaultTest='tests.test_suite')" \
"import tests; tests.unittest.main(defaultTest='tests.test_suite')" \
$VERBOSE
}

View File

@ -624,8 +624,8 @@ setup(name="psycopg2",
long_description="\n".join(readme.split("\n")[2:]).lstrip(),
classifiers=[x for x in classifiers.split("\n") if x],
data_files=data_files,
package_dir={'psycopg2': 'lib', 'psycopg2.tests': 'tests'},
packages=['psycopg2', 'psycopg2.tests'],
package_dir={'psycopg2': 'lib'},
packages=['psycopg2'],
cmdclass={
'build_ext': psycopg_build_ext,
'build_py': build_py, },

View File

@ -28,36 +28,36 @@ import warnings
warnings.simplefilter('error') # noqa
import sys
from testconfig import dsn
from .testconfig import dsn
import unittest
import test_async
import test_bugX000
import test_bug_gc
import test_cancel
import test_connection
import test_copy
import test_cursor
import test_dates
import test_errcodes
import test_extras_dictcursor
import test_fast_executemany
import test_green
import test_ipaddress
import test_lobject
import test_module
import test_notify
import test_psycopg2_dbapi20
import test_quote
import test_replication
import test_sql
import test_transaction
import test_types_basic
import test_types_extras
import test_with
from . import test_async
from . import test_bugX000
from . import test_bug_gc
from . import test_cancel
from . import test_connection
from . import test_copy
from . import test_cursor
from . import test_dates
from . import test_errcodes
from . import test_extras_dictcursor
from . import test_fast_executemany
from . import test_green
from . import test_ipaddress
from . import test_lobject
from . import test_module
from . import test_notify
from . import test_psycopg2_dbapi20
from . import test_quote
from . import test_replication
from . import test_sql
from . import test_transaction
from . import test_types_basic
from . import test_types_extras
from . import test_with
if sys.version_info[:2] < (3, 6):
import test_async_keyword
from . import test_async_keyword
def test_suite():
@ -66,8 +66,8 @@ def test_suite():
try:
cnn = psycopg2.connect(dsn)
except Exception as e:
print "Failed connection to test db:", e.__class__.__name__, e
print "Please set env vars 'PSYCOPG2_TESTDB*' to valid values."
print("Failed connection to test db:", e.__class__.__name__, e)
print("Please set env vars 'PSYCOPG2_TESTDB*' to valid values.")
sys.exit(1)
else:
cnn.close()

View File

@ -90,12 +90,12 @@ class DatabaseAPI20Test(unittest.TestCase):
self.driver, connect_args and connect_kw_args. Class specification
should be as follows:
import dbapi20
from . import dbapi20
class mytest(dbapi20.DatabaseAPI20Test):
[...]
Don't 'import DatabaseAPI20Test from dbapi20', or you will
confuse the unit tester - just 'import dbapi20'.
Don't 'from .dbapi20 import DatabaseAPI20Test', or you will
confuse the unit tester - just 'from . import dbapi20'.
'''
# The self.driver module. This should be the module where the 'connect'

View File

@ -24,15 +24,14 @@
# License for more details.
import unittest
from testutils import skip_before_postgres, slow
from .testutils import skip_before_postgres, slow
import psycopg2
from psycopg2 import extensions as ext
import time
import StringIO
from testutils import ConnectingTestCase
from .testutils import ConnectingTestCase, StringIO
class PollableStub(object):
@ -241,7 +240,7 @@ class AsyncTests(ConnectingTestCase):
# copy should fail
self.assertRaises(psycopg2.ProgrammingError,
cur.copy_from,
StringIO.StringIO("1\n3\n5\n\\.\n"), "table1")
StringIO("1\n3\n5\n\\.\n"), "table1")
def test_lobject_while_async(self):
# large objects should be prohibited

View File

@ -28,11 +28,11 @@ import time
import psycopg2
from psycopg2 import extras
from testconfig import dsn
from .testconfig import dsn
import unittest
from testutils import ConnectingTestCase, skip_before_postgres, slow
from .testutils import ConnectingTestCase, skip_before_postgres, slow
from test_replication import ReplicationTestCase, skip_repl_if_green
from .test_replication import ReplicationTestCase, skip_repl_if_green
from psycopg2.extras import LogicalReplicationConnection, StopReplication

View File

@ -27,7 +27,7 @@ import psycopg2.extensions
import unittest
import gc
from testutils import ConnectingTestCase, skip_if_no_uuid
from .testutils import ConnectingTestCase, skip_if_no_uuid
class StolenReferenceTestCase(ConnectingTestCase):

View File

@ -30,9 +30,9 @@ import psycopg2
import psycopg2.extensions
from psycopg2 import extras
from testconfig import dsn
from .testconfig import dsn
import unittest
from testutils import ConnectingTestCase, skip_before_postgres, slow
from .testutils import ConnectingTestCase, skip_before_postgres, slow
class CancelTests(ConnectingTestCase):

View File

@ -34,12 +34,12 @@ import psycopg2
import psycopg2.errorcodes
from psycopg2 import extensions as ext
from testutils import (
from .testutils import (
unittest, decorate_all_tests, skip_if_no_superuser,
skip_before_postgres, skip_after_postgres, skip_before_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
from testconfig import dsn, dbname
from .testconfig import dsn, dbname
class ConnectionTests(ConnectingTestCase):

View File

@ -25,16 +25,15 @@
import sys
import string
import unittest
from testutils import (ConnectingTestCase, decorate_all_tests,
skip_before_postgres, slow)
from cStringIO import StringIO
from itertools import cycle, izip
from .testutils import (ConnectingTestCase, decorate_all_tests,
skip_before_postgres, slow, StringIO)
from itertools import cycle
from subprocess import Popen, PIPE
import psycopg2
import psycopg2.extensions
from testutils import skip_copy_if_green
from testconfig import dsn
from .testutils import skip_copy_if_green
from .testconfig import dsn
if sys.version_info[0] < 3:
@ -99,7 +98,7 @@ class CopyTests(ConnectingTestCase):
def test_copy_from_cols(self):
curs = self.conn.cursor()
f = StringIO()
for i in xrange(10):
for i in range(10):
f.write("%s\n" % (i,))
f.seek(0)
@ -111,7 +110,7 @@ class CopyTests(ConnectingTestCase):
def test_copy_from_cols_err(self):
curs = self.conn.cursor()
f = StringIO()
for i in xrange(10):
for i in range(10):
f.write("%s\n" % (i,))
f.seek(0)
@ -141,7 +140,7 @@ class CopyTests(ConnectingTestCase):
about = abin.decode('latin1').replace('\\', '\\\\')
else:
abin = bytes(range(32, 127) + range(160, 256)).decode('latin1')
abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
about = abin.replace('\\', '\\\\')
curs = self.conn.cursor()
@ -162,7 +161,7 @@ class CopyTests(ConnectingTestCase):
abin = ''.join(map(chr, range(32, 127) + range(160, 255)))
about = abin.replace('\\', '\\\\')
else:
abin = bytes(range(32, 127) + range(160, 255)).decode('latin1')
abin = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1')
about = abin.replace('\\', '\\\\').encode('latin1')
curs = self.conn.cursor()
@ -185,7 +184,7 @@ class CopyTests(ConnectingTestCase):
about = abin.replace('\\', '\\\\')
else:
abin = bytes(range(32, 127) + range(160, 256)).decode('latin1')
abin = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
about = abin.replace('\\', '\\\\')
import io
@ -225,7 +224,7 @@ class CopyTests(ConnectingTestCase):
def _copy_from(self, curs, nrecs, srec, copykw):
f = StringIO()
for i, c in izip(xrange(nrecs), cycle(string.ascii_letters)):
for i, c in zip(range(nrecs), cycle(string.ascii_letters)):
l = c * srec
f.write("%s\t%s\n" % (i, l))

View File

@ -27,9 +27,9 @@ import pickle
import psycopg2
import psycopg2.extensions
import unittest
from testutils import (ConnectingTestCase, skip_before_postgres,
from .testutils import (ConnectingTestCase, skip_before_postgres,
skip_if_no_getrefcount, slow, skip_if_no_superuser,
skip_if_windows)
skip_if_windows, unicode)
import psycopg2.extras
@ -553,7 +553,7 @@ class CursorTests(ConnectingTestCase):
# Issue #443 is in the async code too. Since the fix is duplicated,
# so is the test.
control_conn = self.conn
connect_func = lambda: self.connect(async=True)
connect_func = lambda: self.connect(async_=True)
wait_func = psycopg2.extras.wait_select
self._test_external_close(control_conn, connect_func, wait_func)

View File

@ -26,7 +26,7 @@ import math
import psycopg2
from psycopg2.tz import FixedOffsetTimezone, ZERO
import unittest
from testutils import ConnectingTestCase, skip_before_postgres
from .testutils import ConnectingTestCase, skip_before_postgres
def total_seconds(d):

View File

@ -23,7 +23,7 @@
# License for more details.
import unittest
from testutils import ConnectingTestCase, slow
from .testutils import ConnectingTestCase, slow, reload
try:
reload
@ -52,7 +52,7 @@ class ErrocodeTests(ConnectingTestCase):
except Exception as e:
errs.append(e)
for __ in xrange(MAX_CYCLES):
for __ in range(MAX_CYCLES):
reload(errorcodes)
(t1, t2) = (Thread(target=f), Thread(target=f))
(t1.start(), t2.start())

View File

@ -19,7 +19,7 @@ from datetime import timedelta
import psycopg2
import psycopg2.extras
import unittest
from testutils import ConnectingTestCase, skip_before_postgres
from .testutils import ConnectingTestCase, skip_before_postgres
class ExtrasDictCursorTests(ConnectingTestCase):
@ -390,7 +390,7 @@ class NamedTupleCursorTest(ConnectingTestCase):
recs.extend(curs.fetchmany(5))
recs.append(curs.fetchone())
recs.extend(curs.fetchall())
self.assertEqual(range(10), [t.i for t in recs])
self.assertEqual(list(range(10)), [t.i for t in recs])
def test_named_fetchone(self):
curs = self.conn.cursor('tmp')

View File

@ -16,7 +16,7 @@
from datetime import date
import testutils
from . import testutils
import unittest
import psycopg2

View File

@ -27,7 +27,7 @@ import psycopg2
import psycopg2.extensions
import psycopg2.extras
from testutils import ConnectingTestCase, slow
from .testutils import ConnectingTestCase, slow
class ConnectionStub(object):

View File

@ -19,7 +19,7 @@ from __future__ import unicode_literals
import sys
from functools import wraps
import testutils
from . import testutils
import unittest
import psycopg2

View File

@ -30,7 +30,7 @@ from functools import wraps
import psycopg2
import psycopg2.extensions
import unittest
from testutils import (decorate_all_tests, skip_if_tpc_disabled,
from .testutils import (decorate_all_tests, skip_if_tpc_disabled,
ConnectingTestCase, skip_if_green, slow)

View File

@ -27,8 +27,8 @@ import sys
from subprocess import Popen
import unittest
from testutils import (skip_before_postgres,
ConnectingTestCase, skip_copy_if_green, slow)
from .testutils import (skip_before_postgres,
ConnectingTestCase, skip_copy_if_green, slow, StringIO)
import psycopg2
@ -217,7 +217,6 @@ class ExceptionsTestCase(ConnectingTestCase):
@skip_copy_if_green
def test_diagnostics_copy(self):
from StringIO import StringIO
f = StringIO()
cur = self.conn.cursor()
try:

View File

@ -26,8 +26,8 @@ import unittest
import psycopg2
from psycopg2 import extensions
from testutils import ConnectingTestCase, slow
from testconfig import dsn
from .testutils import ConnectingTestCase, slow
from .testconfig import dsn
import sys
import time

View File

@ -22,14 +22,14 @@
# FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
# License for more details.
import dbapi20
import dbapi20_tpc
from testutils import skip_if_tpc_disabled
from . import dbapi20
from . import dbapi20_tpc
from .testutils import skip_if_tpc_disabled
import unittest
from testutils import decorate_all_tests
from .testutils import decorate_all_tests
import psycopg2
from testconfig import dsn
from .testconfig import dsn
class Psycopg2Tests(dbapi20.DatabaseAPI20Test):

View File

@ -23,9 +23,9 @@
# License for more details.
import sys
import testutils
from . import testutils
import unittest
from testutils import ConnectingTestCase
from .testutils import ConnectingTestCase, unichr
import psycopg2
import psycopg2.extensions
@ -81,7 +81,7 @@ class QuotingTestCase(ConnectingTestCase):
if sys.version_info[0] < 3:
data += "".join(map(chr, range(256)))
else:
data += bytes(range(256))
data += bytes(list(range(256)))
curs = self.conn.cursor()
curs.execute("SELECT %s::bytea;", (psycopg2.Binary(data),))
@ -126,7 +126,7 @@ class QuotingTestCase(ConnectingTestCase):
if sys.version_info[0] < 3:
data = ''.join(map(chr, range(32, 127) + range(160, 256)))
else:
data = bytes(range(32, 127) + range(160, 256)).decode('latin1')
data = bytes(list(range(32, 127)) + list(range(160, 256))).decode('latin1')
# as string
curs.execute("SELECT %s::text;", (data,))
@ -150,7 +150,7 @@ class QuotingTestCase(ConnectingTestCase):
if sys.version_info[0] < 3:
data = ''.join(map(chr, range(32, 127) + range(128, 256)))
else:
data = bytes(range(32, 127) + range(128, 256)).decode('koi8_r')
data = bytes(list(range(32, 127)) + list(range(128, 256))).decode('koi8_r')
# as string
curs.execute("SELECT %s::text;", (data,))

View File

@ -26,10 +26,10 @@ import psycopg2
from psycopg2.extras import (
PhysicalReplicationConnection, LogicalReplicationConnection, StopReplication)
import testconfig
from . import testconfig
import unittest
from testutils import ConnectingTestCase
from testutils import skip_before_postgres, skip_if_green
from .testutils import ConnectingTestCase
from .testutils import skip_before_postgres, skip_if_green
skip_repl_if_green = skip_if_green("replication not supported in green mode")

View File

@ -23,10 +23,10 @@
# License for more details.
import datetime as dt
from cStringIO import StringIO
import unittest
from testutils import (ConnectingTestCase,
skip_before_postgres, skip_before_python, skip_copy_if_green)
from .testutils import (ConnectingTestCase,
skip_before_postgres, skip_before_python, skip_copy_if_green,
unicode, StringIO)
import psycopg2
from psycopg2 import sql

View File

@ -24,7 +24,7 @@
import threading
import unittest
from testutils import ConnectingTestCase, skip_before_postgres, slow
from .testutils import ConnectingTestCase, skip_before_postgres, slow
import psycopg2
from psycopg2.extensions import (

View File

@ -26,9 +26,9 @@ import decimal
import sys
from functools import wraps
import testutils
from . import testutils
import unittest
from testutils import ConnectingTestCase, decorate_all_tests
from .testutils import ConnectingTestCase, decorate_all_tests, long
import psycopg2
@ -54,8 +54,8 @@ class TypesBasicTests(ConnectingTestCase):
def testNumber(self):
s = self.execute("SELECT %s AS foo", (1971,))
self.failUnless(s == 1971, "wrong integer quoting: " + str(s))
s = self.execute("SELECT %s AS foo", (1971L,))
self.failUnless(s == 1971L, "wrong integer quoting: " + str(s))
s = self.execute("SELECT %s AS foo", (long(1971),))
self.failUnless(s == long(1971), "wrong integer quoting: " + str(s))
def testBoolean(self):
x = self.execute("SELECT %s as foo", (False,))
@ -326,7 +326,7 @@ class TypesBasicTests(ConnectingTestCase):
self.assertEqual(1, f1)
i1 = self.execute("select -%s;", (-1,))
self.assertEqual(1, i1)
l1 = self.execute("select -%s;", (-1L,))
l1 = self.execute("select -%s;", (long(-1),))
self.assertEqual(1, l1)
def testGenericArray(self):

View File

@ -23,8 +23,9 @@ from functools import wraps
from pickle import dumps, loads
import unittest
from testutils import (skip_if_no_uuid, skip_before_postgres,
ConnectingTestCase, decorate_all_tests, py3_raises_typeerror, slow)
from .testutils import (skip_if_no_uuid, skip_before_postgres,
ConnectingTestCase, decorate_all_tests, py3_raises_typeerror, slow,
skip_from_python)
import psycopg2
import psycopg2.extras
@ -180,7 +181,7 @@ class HstoreTestCase(ConnectingTestCase):
kk = m.group(1).split(b", ")
vv = m.group(2).split(b", ")
ii = zip(kk, vv)
ii = list(zip(kk, vv))
ii.sort()
self.assertEqual(len(ii), len(o))
@ -250,6 +251,7 @@ class HstoreTestCase(ConnectingTestCase):
self.assertEqual(t[2], {'a': 'b'})
@skip_if_no_hstore
@skip_from_python(3)
def test_register_unicode(self):
from psycopg2.extras import register_hstore
@ -304,7 +306,7 @@ class HstoreTestCase(ConnectingTestCase):
ok({})
ok({'a': 'b', 'c': None})
ab = map(chr, range(32, 128))
ab = list(map(chr, range(32, 128)))
ok(dict(zip(ab, ab)))
ok({''.join(ab): ''.join(ab)})
@ -312,12 +314,13 @@ class HstoreTestCase(ConnectingTestCase):
if sys.version_info[0] < 3:
ab = map(chr, range(32, 127) + range(160, 255))
else:
ab = bytes(range(32, 127) + range(160, 255)).decode('latin1')
ab = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1')
ok({''.join(ab): ''.join(ab)})
ok(dict(zip(ab, ab)))
@skip_if_no_hstore
@skip_from_python(3)
def test_roundtrip_unicode(self):
from psycopg2.extras import register_hstore
register_hstore(self.conn, unicode=True)
@ -368,7 +371,7 @@ class HstoreTestCase(ConnectingTestCase):
ds = [{}, {'a': 'b', 'c': None}]
ab = map(chr, range(32, 128))
ab = list(map(chr, range(32, 128)))
ds.append(dict(zip(ab, ab)))
ds.append({''.join(ab): ''.join(ab)})
@ -376,7 +379,7 @@ class HstoreTestCase(ConnectingTestCase):
if sys.version_info[0] < 3:
ab = map(chr, range(32, 127) + range(160, 255))
else:
ab = bytes(range(32, 127) + range(160, 255)).decode('latin1')
ab = bytes(list(range(32, 127)) + list(range(160, 255))).decode('latin1')
ds.append({''.join(ab): ''.join(ab)})
ds.append(dict(zip(ab, ab)))
@ -511,7 +514,7 @@ class AdaptTypeTestCase(ConnectingTestCase):
'@,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,[,"\\\\",],'
'^,_,`,a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z,{,|,},'
'~,\x7f)',
map(chr, range(1, 128)))
list(map(chr, range(1, 128))))
ok('(,"\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !'
'""#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]'
@ -1638,8 +1641,8 @@ class RangeCasterTestCase(ConnectingTestCase):
bounds = ['[)', '(]', '()', '[]']
ranges = [TextRange(low, up, bounds[i % 4])
for i, (low, up) in enumerate(zip(
[None] + map(chr, range(1, 128)),
map(chr, range(1, 128)) + [None],
[None] + list(map(chr, range(1, 128))),
list(map(chr, range(1, 128))) + [None],
))]
ranges.append(TextRange())
ranges.append(TextRange(empty=True))

View File

@ -26,7 +26,7 @@ import psycopg2
import psycopg2.extensions as ext
import unittest
from testutils import ConnectingTestCase
from .testutils import ConnectingTestCase
class WithTestCase(ConnectingTestCase):

View File

@ -29,7 +29,24 @@ import select
import platform
import unittest
from functools import wraps
from testconfig import dsn, repl_dsn
from .testconfig import dsn, repl_dsn
# Python 2/3 compatibility
if sys.version_info[0] == 2:
# Python 2
from StringIO import StringIO
long = long
reload = reload
unichr = unichr
unicode = unicode
else:
# Python 3
from io import StringIO
from importlib import reload
long = int
unichr = chr
unicode = str
# Silence warnings caused by the stubbornness of the Python unittest
@ -338,7 +355,7 @@ def skip_if_green(reason):
def skip_if_green_(f):
@wraps(f)
def skip_if_green__(self):
from testconfig import green
from .testconfig import green
if green:
return self.skipTest(reason)
else: