Merge branch 'master' into https

This commit is contained in:
Daniele Varrazzo 2018-10-10 22:07:33 +01:00
commit 9148157697
42 changed files with 251 additions and 61957 deletions

7
NEWS
View File

@ -7,8 +7,15 @@ What's new in psycopg 2.8
New features:
- Added `~psycopg2.extensions.encrypt_password()` function (:ticket:`#576`).
- Added `connection.host` property (:ticket:`#726`).
- `~psycopg2.sql.Identifier` can represent qualified names in SQL composition
(:ticket:`#732`).
- `!str()` on `~psycopg2.extras.Range` produces a human-readable representation
(:ticket:`#773`).
- `~psycopg2.extras.DictCursor` and `~psycopg2.extras.RealDictCursor` rows
maintain columns order (:ticket:`#177`).
- Added `!severity_nonlocalized` attribute on the
`~psycopg2.extensions.Diagnostics` object (:ticket:`#783`).
Other changes:

View File

@ -599,6 +599,24 @@ The ``connection`` class
.. versionadded:: 2.5
.. index::
pair: Backend; Host
.. attribute:: host
The server host name of the active connection.
This can be a host name, an IP address, or a directory path if the
connection is via Unix socket. (The path case can be distinguished
because it will always be an absolute path, beginning with ``/``.)
.. seealso:: libpq docs for `PQhost()`__ for details.
.. __: http://www.postgresql.org/docs/current/static/libpq-status.html#LIBPQ-PQHOST
.. versionadded:: 2.8.0
.. index::
pair: Backend; PID

View File

@ -186,6 +186,7 @@ introspection etc.
message_primary
schema_name
severity
severity_nonlocalized
source_file
source_function
source_line
@ -198,6 +199,9 @@ introspection etc.
not all the fields are available for all the errors and for all the
server versions.
.. versionadded:: 2.8
The `!severity_nonlocalized` attribute.
.. _sql-adaptation-objects:

View File

@ -77,16 +77,26 @@ to cursor methods such as `~cursor.execute()`, `~cursor.executemany()`,
.. autoclass:: Identifier
.. autoattribute:: string
.. versionchanged:: 2.8
added support for multiple strings.
.. autoattribute:: strings
.. versionadded:: 2.8
previous verions only had a `!string` attribute. The attribute
still exists but is deprecate and will only work if the
`!Identifier` wraps a single string.
.. autoclass:: Literal
.. autoattribute:: wrapped
.. autoclass:: Placeholder
.. autoattribute:: name
.. autoclass:: Composed
.. autoattribute:: seq

View File

@ -221,7 +221,28 @@ argument of the `~cursor.execute()` method::
>>> cur.execute(SQL, data) # Note: no % operator
Values containing backslashes and LIKE
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Unlike in Python, the backslash (`\\`) is not used as an escape
character *except* in patterns used with `LIKE` and `ILIKE` where they
are needed to escape the `%` and `_` characters.
This can lead to confusing situations::
>>> path = r'C:\Users\Bobby.Tables'
>>> cur.execute('INSERT INTO mytable(path) VALUES (%s)', (path,))
>>> cur.execute('SELECT * FROM mytable WHERE path LIKE %s', (path,))
>>> cur.fetchall()
[]
The solution is to specify an `ESCAPE` character of `''` (empty string)
in your `LIKE` query::
>>> cur.execute("SELECT * FROM mytable WHERE path LIKE %s ESCAPE ''", (path,))
.. index::
single: Adaptation
pair: Objects; Adaptation

View File

@ -62,6 +62,19 @@ class Range(object):
return "%s(%r, %r, %r)" % (self.__class__.__name__,
self._lower, self._upper, self._bounds)
def __str__(self):
if self._bounds is None:
return 'empty'
items = [
self._bounds[0],
str(self._lower),
', ',
str(self._upper),
self._bounds[1]
]
return ''.join(items)
@property
def lower(self):
"""The lower bound of the range. `!None` if empty or unbound."""

View File

@ -290,7 +290,7 @@ class SQL(Composable):
class Identifier(Composable):
"""
A `Composable` representing an SQL identifer.
A `Composable` representing an SQL identifer or a dot-separated sequence.
Identifiers usually represent names of database objects, such as tables or
fields. PostgreSQL identifiers follow `different rules`__ than SQL string
@ -307,20 +307,50 @@ class Identifier(Composable):
>>> print(sql.SQL(', ').join([t1, t2, t3]).as_string(conn))
"foo", "ba'r", "ba""z"
"""
def __init__(self, string):
if not isinstance(string, string_types):
raise TypeError("SQL identifiers must be strings")
Multiple strings can be passed to the object to represent a qualified name,
i.e. a dot-separated sequence of identifiers.
super(Identifier, self).__init__(string)
Example::
>>> query = sql.SQL("select {} from {}").format(
... sql.Identifier("table", "field"),
... sql.Identifier("schema", "table"))
>>> print(query.as_string(conn))
select "table"."field" from "schema"."table"
"""
def __init__(self, *strings):
if not strings:
raise TypeError("Identifier cannot be empty")
for s in strings:
if not isinstance(s, string_types):
raise TypeError("SQL identifier parts must be strings")
super(Identifier, self).__init__(strings)
@property
def strings(self):
"""A tuple with the strings wrapped by the `Identifier`."""
return self._wrapped
@property
def string(self):
"""The string wrapped by the `Identifier`."""
return self._wrapped
"""The string wrapped by the `Identifier`.
"""
if len(self._wrapped) == 1:
return self._wrapped[0]
else:
raise AttributeError(
"the Identifier wraps more than one than one string")
def __repr__(self):
return "%s(%s)" % (
self.__class__.__name__,
', '.join(map(repr, self._wrapped)))
def as_string(self, context):
return ext.quote_ident(self._wrapped, context)
return '.'.join(ext.quote_ident(s, context) for s in self._wrapped)
class Literal(Composable):

View File

@ -992,6 +992,25 @@ psyco_conn_get_backend_pid(connectionObject *self)
return PyInt_FromLong((long)PQbackendPID(self->pgconn));
}
/* get the current host */
#define psyco_conn_host_get_doc \
"host -- Get the host name."
static PyObject *
psyco_conn_host_get(connectionObject *self)
{
const char *val = NULL;
EXC_IF_CONN_CLOSED(self);
val = PQhost(self->pgconn);
if (!val) {
Py_RETURN_NONE;
}
return conn_text_from_chars(self, val);
}
/* reset the currect connection */
#define psyco_conn_reset_doc \
@ -1243,6 +1262,9 @@ static struct PyGetSetDef connectionObject_getsets[] = {
(getter)psyco_conn_deferrable_get,
(setter)psyco_conn_deferrable_set,
psyco_conn_deferrable_doc },
{ "host",
(getter)psyco_conn_host_get, NULL,
psyco_conn_host_get_doc },
{NULL}
};
#undef EXCEPTION_GETTER

View File

@ -29,8 +29,11 @@
#include "psycopg/diagnostics.h"
#include "psycopg/error.h"
/* These are new in PostgreSQL 9.3. Defining them here so that psycopg2 can
* use them with a 9.3+ server even if compiled against pre-9.3 headers. */
/* These constants are defined in src/include/postgres_ext.h but some may not
* be available with the libpq we currently support at compile time. */
/* Available from PG 9.3 */
#ifndef PG_DIAG_SCHEMA_NAME
#define PG_DIAG_SCHEMA_NAME 's'
#endif
@ -47,6 +50,11 @@
#define PG_DIAG_CONSTRAINT_NAME 'n'
#endif
/* Available from PG 9.6 */
#ifndef PG_DIAG_SEVERITY_NONLOCALIZED
#define PG_DIAG_SEVERITY_NONLOCALIZED 'V'
#endif
/* Retrieve an error string from the exception's cursor.
*
@ -70,6 +78,8 @@ psyco_diagnostics_get_field(diagnosticsObject *self, void *closure)
static struct PyGetSetDef diagnosticsObject_getsets[] = {
{ "severity", (getter)psyco_diagnostics_get_field, NULL,
NULL, (void*) PG_DIAG_SEVERITY },
{ "severity_nonlocalized", (getter)psyco_diagnostics_get_field, NULL,
NULL, (void*) PG_DIAG_SEVERITY_NONLOCALIZED },
{ "sqlstate", (getter)psyco_diagnostics_get_field, NULL,
NULL, (void*) PG_DIAG_SQLSTATE },
{ "message_primary", (getter)psyco_diagnostics_get_field, NULL,

View File

@ -1,30 +0,0 @@
import psycopg2
conn = psycopg2.connect("port=5433 dbname=test")
curs = conn.cursor()
#curs.execute("SELECT ARRAY[1,2,3] AS foo")
#print curs.fetchone()[0]
#curs.execute("SELECT ARRAY['1','2','3'] AS foo")
#print curs.fetchone()[0]
#curs.execute("""SELECT ARRAY[',','"','\\\\'] AS foo""")
#d = curs.fetchone()[0]
#print d, '->', d[0], d[1], d[2]
#curs.execute("SELECT ARRAY[ARRAY[1,2],ARRAY[3,4]] AS foo")
#print curs.fetchone()[0]
#curs.execute("SELECT ARRAY[ARRAY[now(), now()], ARRAY[now(), now()]] AS foo")
#print curs.description
#print curs.fetchone()[0]
#curs.execute("SELECT 1 AS foo, ARRAY[1,2] AS bar")
#print curs.fetchone()
#curs.execute("SELECT * FROM test()")
#print curs.fetchone()
curs.execute("SELECT %s", ([1,2,None],))
print(curs.fetchone())

View File

@ -1,35 +0,0 @@
import datetime
import time
import psycopg2
#d = datetime.timedelta(12, 100, 9876)
#print d.days, d.seconds, d.microseconds
#print psycopg.adapt(d).getquoted()
conn = psycopg2.connect("dbname=test_unicode")
conn.set_client_encoding("xxx")
curs = conn.cursor()
#curs.execute("SELECT 1.0 AS foo")
#print curs.fetchmany(2)
#print curs.fetchall()
def sleep(curs):
while not curs.isready():
print(".")
time.sleep(.1)
#curs.execute("""
# DECLARE zz INSENSITIVE SCROLL CURSOR WITH HOLD FOR
# SELECT now();
# FOR READ ONLY;""", async = 1)
curs.execute("SELECT now() AS foo", async=1)
sleep(curs)
print(curs.fetchall())
#curs.execute("""
# FETCH FORWARD 1 FROM zz;""", async = 1)
curs.execute("SELECT now() AS bar", async=1)
print(curs.fetchall())
curs.execute("SELECT now() AS bar")
sleep(curs)

View File

@ -1,25 +0,0 @@
#!/usr/bin/env python
#import psycopg as db
import psycopg2 as db
import threading
import time
import sys
def query_worker(dsn):
conn = db.connect(dsn)
cursor = conn.cursor()
while True:
cursor.execute("select * from pg_class")
while True:
row = cursor.fetchone()
if row is None:
break
if len(sys.argv) != 2:
print('usage: %s DSN' % sys.argv[0])
sys.exit(1)
th = threading.Thread(target=query_worker, args=(sys.argv[1],))
th.setDaemon(True)
th.start()
time.sleep(1)

View File

@ -1,15 +0,0 @@
import psycopg2
import psycopg2.extensions
DEC2FLOAT = psycopg2.extensions.new_type(
psycopg2._psycopg.DECIMAL.values,
'DEC2FLOAT',
psycopg2.extensions.FLOAT)
psycopg2.extensions.register_type(DEC2FLOAT)
o = psycopg2.connect("dbname=test")
c = o.cursor()
c.execute("SELECT NULL::decimal(10,2)")
n = c.fetchone()[0]
print(n, type(n))

View File

@ -1,18 +0,0 @@
import psycopg2
con = psycopg2.connect("dbname=test")
cur = con.cursor()
cur.execute("SELECT %s::regtype::oid", ('bytea', ))
print(cur.fetchone()[0])
# 17
cur.execute("CREATE DOMAIN thing AS bytea")
cur.execute("SELECT %s::regtype::oid", ('thing', ))
print(cur.fetchone()[0])
#62148
cur.execute("CREATE TABLE thingrel (thingcol thing)")
cur.execute("SELECT * FROM thingrel")
print(cur.description)
#(('thingcol', 17, None, -1, None, None, None),)

View File

@ -1,18 +0,0 @@
import psycopg2
o = psycopg2.connect("dbname=test")
c = o.cursor()
def sql():
c.execute("SELECT 1.23 AS foo")
print(1, c.fetchone())
#print c.description
c.execute("SELECT 1.23::float AS foo")
print(2, c.fetchone())
#print c.description
print("BEFORE")
sql()
import gtk
print("AFTER")
sql()

View File

@ -1,13 +0,0 @@
import psycopg2
import psycopg2.extras
conn = psycopg2.connect("dbname=test")
curs = conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
curs.execute("SELECT '2005-2-12'::date AS foo, 'boo!' as bar")
for x in curs.fetchall():
print(type(x), x[0], x[1], x['foo'], x['bar'])
curs.execute("SELECT '2005-2-12'::date AS foo, 'boo!' as bar")
for x in curs:
print(type(x), x[0], x[1], x['foo'], x['bar'])

View File

@ -1,82 +0,0 @@
"""
script: test_leak.py
This script attempts to repeatedly insert the same list of rows into
the database table, causing a duplicate key error to occur. It will
then roll back the transaction and try again.
Database table schema:
-- CREATE TABLE t (foo TEXT PRIMARY KEY);
There are two ways to run the script, which will launch one of the
two functions:
# leak() will cause increasingly more RAM to be used by the script.
$ python <script_nam> leak
# noleak() does not have the RAM usage problem. The only difference
# between it and leak() is that 'rows' is created once, before the loop.
$ python <script_name> noleak
Use Control-C to quit the script.
"""
import sys
import psycopg2
DB_NAME = 'test'
connection = psycopg2.connect(database=DB_NAME)
cursor = connection.cursor()
# Uncomment the following if table 't' does not exist
create_table = """CREATE TABLE t (foo TEXT PRIMARY KEY)"""
cursor.execute(create_table)
insert = """INSERT INTO t VALUES (%(foo)s)"""
def leak():
"""rows created in each loop run"""
count = 0
while 1:
try:
rows = []
for i in range(1, 100):
row = {'foo': i}
rows.append(row)
count += 1
print("loop count:", count)
cursor.executemany(insert, rows)
connection.commit()
except psycopg2.IntegrityError:
connection.rollback()
def noleak():
"""rows created once, before the loop"""
rows = []
for i in range(1, 100):
row = {'foo': i}
rows.append(row)
count = 0
while 1:
try:
count += 1
print("loop count:", count)
cursor.executemany(insert, rows)
connection.commit()
except psycopg2.IntegrityError:
connection.rollback()
usage = "%s requires one argument: 'leak' or 'noleak'" % sys.argv[0]
try:
if 'leak' == sys.argv[1]:
run_function = leak
elif 'noleak' == sys.argv[1]:
run_function = noleak
else:
print(usage)
sys.exit()
except IndexError:
print(usage)
sys.exit()
# Run leak() or noleak(), whichever was indicated on the command line
run_function()

View File

@ -1,43 +0,0 @@
#!/usr/bin/env python
"""
Test if the arguments object can be used with both positional and keyword
arguments.
"""
class O(object):
def __init__(self, *args, **kwds):
self.args = args
self.kwds = kwds
def __getitem__(self, k):
if isinstance(k, int):
return self.args[k]
else:
return self.kwds[k]
o = O('R%', second='S%')
print(o[0])
print(o['second'])
#-------------------------------------------------------------------------------
import psycopg2 as dbapi
conn = dbapi.connect(database='test')
cursor = conn.cursor()
cursor.execute("""
SELECT * FROM location_pretty
WHERE keyname LIKE %s OR keyname LIKE %(second)s
""", (o,))
for row in cursor:
print(row)

View File

@ -1,31 +0,0 @@
import psycopg2
import psycopg2.extensions
class Portal(psycopg2.extensions.cursor):
def __init__(self, name, curs):
psycopg2.extensions.cursor.__init__(
self, curs.connection, '"'+name+'"')
CURSOR = psycopg2.extensions.new_type((1790,), "CURSOR", Portal)
psycopg2.extensions.register_type(CURSOR)
conn = psycopg2.connect("dbname=test")
curs = conn.cursor()
curs.execute("SELECT reffunc2()")
portal = curs.fetchone()[0]
print(portal.fetchone())
print(portal.fetchmany(2))
portal.scroll(0, 'absolute')
print(portal.fetchall())
#print curs.rowcount
#print curs.statusmessage
#print curs.fetchone()
#print curs.rowcount
#print curs.statusmessage
#print curs.fetchone()
#print curs.rowcount
#print curs.statusmessage

View File

@ -1,12 +0,0 @@
class B(object):
def __init__(self, x):
if x: self._o = True
else: self._o = False
def __getattribute__(self, attr):
print("ga called", attr)
return object.__getattribute__(self, attr)
def _sqlquote(self):
if self._o:
return 'It is True'
else:
return 'It is False'

View File

@ -1,11 +0,0 @@
import psycopg2
import threading, os, time, gc
for i in range(20000):
conn = psycopg2.connect('dbname=test')
del conn
if i%200 == 0:
datafile = os.popen('ps -p %s -o rss' % os.getpid())
line = datafile.readlines(2)[1].strip()
datafile.close()
print(str(i) + '\t' + line)

View File

@ -1,42 +0,0 @@
import psycopg2
import threading, os, time, gc
super_lock = threading.Lock()
def f():
try:
conn = psycopg2.connect('dbname=testx')
#c = db.cursor()
#c.close()
#conn.close()
del conn
except:
pass
#print "ERROR"
def g():
n = 30
k = 0
i = 1
while i > 0:
while n > 0:
threading.Thread(target=f).start()
time.sleep(0.001)
threading.Thread(target=f).start()
time.sleep(0.001)
threading.Thread(target=f).start()
n -= 1
while threading.activeCount() > 1:
time.sleep(0.01)
datafile = os.popen('ps -p %s -o rss' % os.getpid())
line = datafile.readlines(2)[1].strip()
datafile.close()
n = 30
print(str(k*n) + '\t' + line)
k += 1
while threading.activeCount()>1:
pass
g()

View File

@ -1,49 +0,0 @@
import datetime
import time
import psycopg2
#d = datetime.timedelta(12, 100, 9876)
#print d.days, d.seconds, d.microseconds
#print psycopg.adapt(d).getquoted()
conn = psycopg2.connect("dbname=test")
#conn.set_client_encoding("xxx")
curs = conn.cursor()
curs.execute("SELECT '2005-2-12'::date AS foo")
print(curs.fetchall())
curs.execute("SELECT '10:23:60'::time AS foo")
print(curs.fetchall())
curs.execute("SELECT '10:23:59.895342'::time AS foo")
print(curs.fetchall())
curs.execute("SELECT '0:0:12.31423'::time with time zone AS foo")
print(curs.fetchall())
curs.execute("SELECT '0:0:12+01:30'::time with time zone AS foo")
print(curs.fetchall())
curs.execute("SELECT '2005-2-12 10:23:59.895342'::timestamp AS foo")
print(curs.fetchall())
curs.execute("SELECT '2005-2-12 10:23:59.895342'::timestamp with time zone AS foo")
print(curs.fetchall())
#print curs.fetchmany(2)
#print curs.fetchall()
def sleep(curs):
while not curs.isready():
print(".")
time.sleep(.1)
#curs.execute("""
# DECLARE zz INSENSITIVE SCROLL CURSOR WITH HOLD FOR
# SELECT now();
# FOR READ ONLY;""", async = 1)
#curs.execute("SELECT now() AS foo", async=1);
#sleep(curs)
#print curs.fetchall()
#curs.execute("""
# FETCH FORWARD 1 FROM zz;""", async = 1)
#curs.execute("SELECT now() AS bar", async=1);
#print curs.fetchall()
#curs.execute("SELECT now() AS bar");
#sleep(curs)

View File

@ -1,8 +0,0 @@
import psycopg2
import psycopg2.extras
conn = psycopg2.connect("dbname=test")
curs = conn.cursor()
curs.execute("SELECT true AS foo WHERE 'a' in %s", (("aa", "bb"),))
print(curs.fetchall())
print(curs.query)

File diff suppressed because it is too large Load Diff

View File

@ -1,42 +0,0 @@
import psycopg2
dbconn = psycopg2.connect(database="test",host="localhost",port="5432")
query = """
CREATE TEMP TABLE data (
field01 char,
field02 varchar,
field03 varchar,
field04 varchar,
field05 varchar,
field06 varchar,
field07 varchar,
field08 varchar,
field09 numeric,
field10 integer,
field11 numeric,
field12 numeric,
field13 numeric,
field14 numeric,
field15 numeric,
field16 numeric,
field17 char,
field18 char,
field19 char,
field20 varchar,
field21 varchar,
field22 integer,
field23 char,
field24 char
);
"""
cursor = dbconn.cursor()
cursor.execute(query)
f = open('test_copy2.csv')
cursor.copy_from(f, 'data', sep='|')
f.close()
dbconn.commit()
cursor.close()
dbconn.close()

View File

@ -1,81 +0,0 @@
#!/usr/bin/env python
"""Test for issue #113: test with error during green processing
"""
DSN = 'dbname=test'
import eventlet.patcher
eventlet.patcher.monkey_patch()
import os
import signal
from time import sleep
import psycopg2
from psycopg2 import extensions
from eventlet.hubs import trampoline
# register a test wait callback that fails if SIGHUP is received
panic = []
def wait_cb(conn):
"""A wait callback useful to allow eventlet to work with Psycopg."""
while 1:
if panic:
raise Exception('whatever')
state = conn.poll()
if state == extensions.POLL_OK:
break
elif state == extensions.POLL_READ:
trampoline(conn.fileno(), read=True)
elif state == extensions.POLL_WRITE:
trampoline(conn.fileno(), write=True)
else:
raise psycopg2.OperationalError(
"Bad result from poll: %r" % state)
extensions.set_wait_callback(wait_cb)
# SIGHUP handler to inject a fail in the callback
def handler(signum, frame):
panic.append(True)
signal.signal(signal.SIGHUP, handler)
# Simulate another green thread working
def worker():
while 1:
print("I'm working")
sleep(1)
eventlet.spawn(worker)
# You can unplug the network cable etc. here.
# Kill -HUP will raise an exception in the callback.
print("PID", os.getpid())
conn = psycopg2.connect(DSN)
curs = conn.cursor()
try:
for i in range(1000):
curs.execute("select %s, pg_sleep(1)", (i,))
r = curs.fetchone()
print("selected", r)
except BaseException, e:
print("got exception:", e.__class__.__name__, e)
if conn.closed:
print("the connection is closed")
else:
conn.rollback()
curs.execute("select 1")
print(curs.fetchone())

View File

@ -1,31 +0,0 @@
import gc
import sys
import os
import signal
import warnings
import psycopg2
print("Testing psycopg2 version %s" % psycopg2.__version__)
dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test')
conn = psycopg2.connect("dbname=%s" % dbname)
curs = conn.cursor()
curs.isready()
print("Now restart the test postgresql server to drop all connections, press enter when done.")
raw_input()
try:
curs.isready() # No need to test return value
curs.isready()
except:
print("Test passed")
sys.exit(0)
if curs.isready():
print("Warning: looks like the connection didn't get killed. This test is probably in-effective")
print("Test inconclusive")
sys.exit(1)
gc.collect() # used to error here
print("Test Passed")

View File

@ -1,12 +0,0 @@
def test():
import sys, os, thread, psycopg2
def test2():
while True:
for filename in map(lambda m: getattr(m, "__file__", None), sys.modules.values()):
os.stat("/dev/null")
connection = psycopg2.connect(database="test")
cursor = connection.cursor()
thread.start_new_thread(test2, ())
while True:
cursor.execute("COMMIT")
test()

View File

@ -1,8 +0,0 @@
import gtk
import psycopg2
o = psycopg2.connect("dbname=test")
c = o.cursor()
c.execute("SELECT 1.23::float AS foo")
x = c.fetchone()[0]
print(x, type(x))

View File

@ -1,73 +0,0 @@
"""
A script to reproduce the race condition described in ticket #58
from https://bugzilla.redhat.com/show_bug.cgi?id=711095
Results in the error:
python: Modules/gcmodule.c:277: visit_decref: Assertion `gc->gc.gc_refs != 0'
failed.
on unpatched library.
"""
import threading
import gc
import time
import psycopg2
from StringIO import StringIO
done = 0
class GCThread(threading.Thread):
# A thread that sits in an infinite loop, forcing the garbage collector
# to run
def run(self):
global done
while not done:
gc.collect()
time.sleep(0.1) # give the other thread a chance to run
gc_thread = GCThread()
# This assumes a pre-existing db named "test", with:
# "CREATE TABLE test (id serial PRIMARY KEY, num integer, data varchar);"
conn = psycopg2.connect("dbname=test user=postgres")
cur = conn.cursor()
# Start the other thread, running the GC regularly
gc_thread.start()
# Now do lots of "cursor.copy_from" calls:
print("copy_from")
for i in range(1000):
f = StringIO("42\tfoo\n74\tbar\n")
cur.copy_from(f, 'test', columns=('num', 'data'))
# Assuming the other thread gets a chance to run during this call, expect a
# build of python (with assertions enabled) to bail out here with:
# python: Modules/gcmodule.c:277: visit_decref: Assertion `gc->gc.gc_refs != 0' failed.
# Also exercise the copy_to code path
print("copy_to")
cur.execute("truncate test")
f = StringIO("42\tfoo\n74\tbar\n")
cur.copy_from(f, 'test', columns=('num', 'data'))
for i in range(1000):
f = StringIO()
cur.copy_to(f, 'test', columns=('num', 'data'))
# And copy_expert too
print("copy_expert")
cur.execute("truncate test")
for i in range(1000):
f = StringIO("42\tfoo\n74\tbar\n")
cur.copy_expert("copy test to stdout", f)
# Terminate the GC thread's loop:
done = 1
cur.close()
conn.close()

View File

@ -1,44 +0,0 @@
import psycopg2
import traceback
# Change the table here to something the user can create tables in ...
db = psycopg2.connect('dbname=test')
cursor = db.cursor()
print('Creating tables and sample data')
cursor.execute('''
CREATE TEMPORARY TABLE foo (
id int PRIMARY KEY
)''')
cursor.execute('''
CREATE TEMPORARY TABLE bar (
id int PRIMARY KEY,
foo_id int,
CONSTRAINT bar_foo_fk FOREIGN KEY (foo_id) REFERENCES foo(id) DEFERRABLE
)''')
cursor.execute('INSERT INTO foo VALUES (1)')
cursor.execute('INSERT INTO bar VALUES (1, 1)')
db.commit()
print('Deferring constraint and breaking referential integrity')
cursor.execute('SET CONSTRAINTS bar_foo_fk DEFERRED')
cursor.execute('UPDATE bar SET foo_id = 42 WHERE id = 1')
print('Committing (this should fail)')
try:
db.commit()
except:
traceback.print_exc()
print('Rolling back connection')
db.rollback()
print('Running a trivial query')
try:
cursor.execute('SELECT TRUE')
except:
traceback.print_exc()
print('db.closed:', db.closed)

View File

@ -1,63 +0,0 @@
from __future__ import print_function
import psycopg2, psycopg2.extensions
import threading
import gc
import time
import sys
# inherit psycopg2 connection class just so that
# garbage collector enters the tp_clear code path
# in delete_garbage()
class my_connection(psycopg2.extensions.connection):
pass
class db_user(threading.Thread):
def run(self):
conn2 = psycopg2.connect(sys.argv[1], connection_factory=my_connection)
cursor = conn2.cursor()
cursor.execute("UPDATE test_psycopg2_dealloc SET a = 3", async=1)
# the conn2 desctructor will block indefinitely
# on the completion of the query
# (and it will not be holding the GIL during that time)
print("begin conn2 del", file=sys.stderr)
del cursor, conn2
print("end conn2 del", file=sys.stderr)
def main():
# lock out a db row
conn1 = psycopg2.connect(sys.argv[1], connection_factory=my_connection)
cursor = conn1.cursor()
cursor.execute("DROP TABLE IF EXISTS test_psycopg2_dealloc")
cursor.execute("CREATE TABLE test_psycopg2_dealloc (a int)")
cursor.execute("INSERT INTO test_psycopg2_dealloc VALUES (1)")
conn1.commit()
cursor.execute("UPDATE test_psycopg2_dealloc SET a = 2", async=1)
# concurrent thread trying to access the locked row
db_user().start()
# eventually, a gc.collect run will happen
# while the conn2 is inside conn_close()
# but this second dealloc won't get blocked
# as it will avoid conn_close()
for i in range(10):
if gc.collect():
print("garbage collection done", file=sys.stderr)
break
time.sleep(1)
# we now unlock the row by invoking
# the desctructor of conn1. This will permit the
# concurrent thread destructor of conn2 to
# continue and it will end up trying to free
# self->dsn a second time.
print("begin conn1 del", file=sys.stderr)
del cursor, conn1
print("end conn1 del", file=sys.stderr)
if __name__ == '__main__':
main()

View File

@ -1,7 +0,0 @@
import psycopg2.extensions
print(dir(psycopg2._psycopg))
print(psycopg2.extensions.new_type(
(600,), "POINT", lambda oids, name, fun: None))
print("ciccia ciccia")
print(psycopg2._psycopg)

View File

@ -1,9 +0,0 @@
import datetime
import time
import psycopg2
conn = psycopg2.connect("dbname=test")
curs = conn.cursor()
curs.execute("set timezone = 'Asia/Calcutta'")
curs.execute("SELECT now()")
print(curs.fetchone()[0])

View File

@ -1,486 +0,0 @@
#
# This is a valgrind suppression file that should be used when using valgrind.
#
# Here's an example of running valgrind:
#
# cd python/dist/src
# valgrind --tool=memcheck --suppressions=Misc/valgrind-python.supp \
# ./python -E -tt ./Lib/test/regrtest.py -u bsddb,network
#
# You must edit Objects/obmalloc.c and uncomment Py_USING_MEMORY_DEBUGGER
# to use the preferred suppressions with Py_ADDRESS_IN_RANGE.
#
# If you do not want to recompile Python, you can uncomment
# suppressions for PyObject_Free and PyObject_Realloc.
#
# See Misc/README.valgrind for more information.
# all tool names: Addrcheck,Memcheck,cachegrind,helgrind,massif
{
ADDRESS_IN_RANGE/Invalid read of size 4
Memcheck:Addr4
fun:Py_ADDRESS_IN_RANGE
}
{
ADDRESS_IN_RANGE/Invalid read of size 4
Memcheck:Value4
fun:Py_ADDRESS_IN_RANGE
}
{
ADDRESS_IN_RANGE/Invalid read of size 8 (x86_64)
Memcheck:Value8
fun:Py_ADDRESS_IN_RANGE
}
{
ADDRESS_IN_RANGE/Conditional jump or move depends on uninitialised value
Memcheck:Cond
fun:Py_ADDRESS_IN_RANGE
}
{
ADDRESS_IN_RANGE/Invalid read of size 4
Memcheck:Addr4
fun:PyObject_Free
}
{
ADDRESS_IN_RANGE/Invalid read of size 4
Memcheck:Value4
fun:PyObject_Free
}
{
ADDRESS_IN_RANGE/Conditional jump or move depends on uninitialised value
Memcheck:Cond
fun:PyObject_Free
}
{
ADDRESS_IN_RANGE/Invalid read of size 4
Memcheck:Addr4
fun:PyObject_Realloc
}
{
ADDRESS_IN_RANGE/Invalid read of size 4
Memcheck:Value4
fun:PyObject_Realloc
}
{
ADDRESS_IN_RANGE/Conditional jump or move depends on uninitialised value
Memcheck:Cond
fun:PyObject_Realloc
}
###
### All the suppressions below are for errors that occur within libraries
### that Python uses. The problems to not appear to be related to Python's
### use of the libraries.
###
{
GDBM problems, see test_gdbm
Memcheck:Param
write(buf)
fun:write
fun:gdbm_open
}
{
Avoid problem in libc on gentoo
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
}
{
Avoid problem in glibc on gentoo
Memcheck:Addr8
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/libdl-2.3.5.so
fun:dlopen
}
{
Avoid problem in glibc on gentoo
Memcheck:Addr8
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/libdl-2.3.5.so
fun:dlopen
}
{
Avoid problem in glibc on gentoo
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/libdl-2.3.5.so
fun:dlopen
}
{
Avoid problem in glibc on gentoo
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/libdl-2.3.5.so
fun:dlopen
}
{
Avoid problems w/readline doing a putenv and leaking on exit
Memcheck:Leak
fun:malloc
fun:xmalloc
fun:sh_set_lines_and_columns
fun:_rl_get_screen_size
fun:_rl_init_terminal_io
obj:/lib/libreadline.so.4.3
fun:rl_initialize
fun:setup_readline
fun:initreadline
fun:_PyImport_LoadDynamicModule
fun:load_module
fun:import_submodule
fun:load_next
fun:import_module_ex
fun:PyImport_ImportModuleEx
}
{
Mysterious leak that seems to deal w/pthreads
Memcheck:Leak
fun:calloc
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_allocate_tls
fun:__pthread_initialize_minimal
}
{
Mysterious leak that seems to deal w/pthreads
Memcheck:Leak
fun:memalign
obj:/lib/ld-2.3.5.so
fun:_dl_allocate_tls
fun:__pthread_initialize_minimal
}
###
### These occur from somewhere within the SSL, when running
### test_socket_sll. They are too general to leave on by default.
###
###{
### somewhere in SSL stuff
### Memcheck:Cond
### fun:memset
###}
###{
### somewhere in SSL stuff
### Memcheck:Value4
### fun:memset
###}
###
###{
### somewhere in SSL stuff
### Memcheck:Cond
### fun:MD5_Update
###}
###
###{
### somewhere in SSL stuff
### Memcheck:Value4
### fun:MD5_Update
###}
#
# All of these problems come from using test_socket_ssl
#
{
from test_socket_ssl
Memcheck:Cond
fun:BN_bin2bn
}
{
from test_socket_ssl
Memcheck:Cond
fun:BN_num_bits_word
}
{
from test_socket_ssl
Memcheck:Value4
fun:BN_num_bits_word
}
{
from test_socket_ssl
Memcheck:Cond
fun:BN_mod_exp_mont_word
}
{
from test_socket_ssl
Memcheck:Cond
fun:BN_mod_exp_mont
}
{
from test_socket_ssl
Memcheck:Param
write(buf)
fun:write
obj:/usr/lib/libcrypto.so.0.9.7
}
{
from test_socket_ssl
Memcheck:Cond
fun:RSA_verify
}
{
from test_socket_ssl
Memcheck:Value4
fun:RSA_verify
}
{
from test_socket_ssl
Memcheck:Value4
fun:DES_set_key_unchecked
}
{
from test_socket_ssl
Memcheck:Value4
fun:DES_encrypt2
}
{
from test_socket_ssl
Memcheck:Cond
obj:/usr/lib/libssl.so.0.9.7
}
{
from test_socket_ssl
Memcheck:Value4
obj:/usr/lib/libssl.so.0.9.7
}
{
from test_socket_ssl
Memcheck:Cond
fun:BUF_MEM_grow_clean
}
{
from test_socket_ssl
Memcheck:Cond
fun:memcpy
fun:ssl3_read_bytes
}
{
from test_socket_ssl
Memcheck:Cond
fun:SHA1_Update
}
{
from test_socket_ssl
Memcheck:Value4
fun:SHA1_Update
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
fun:dlopen
fun:_PyImport_GetDynLoadFunc
fun:_PyImport_LoadDynamicModule
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
fun:dlopen
fun:_PyImport_GetDynLoadFunc
fun:_PyImport_LoadDynamicModule
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Addr4
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
fun:dlopen
fun:_PyImport_GetDynLoadFunc
fun:_PyImport_LoadDynamicModule
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Addr4
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
fun:dlopen
fun:_PyImport_GetDynLoadFunc
fun:_PyImport_LoadDynamicModule
obj:/usr/bin/python2.3
obj:/usr/bin/python2.3
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:__libc_dlopen_mode
fun:__nss_lookup_function
obj:/lib/tls/i686/cmov/libc-2.3.5.so
fun:__nss_passwd_lookup
fun:getpwuid_r
fun:pqGetpwuid
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:__libc_dlopen_mode
fun:__nss_lookup_function
obj:/lib/tls/i686/cmov/libc-2.3.5.so
fun:__nss_passwd_lookup
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Addr4
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:__libc_dlopen_mode
fun:__nss_lookup_function
obj:/lib/tls/i686/cmov/libnss_compat-2.3.5.so
fun:_nss_compat_getpwuid_r
}
{
Debian unstable with libc-i686 suppressions
Memcheck:Cond
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libc-2.3.5.so
obj:/lib/ld-2.3.5.so
fun:_dl_open
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
obj:/lib/ld-2.3.5.so
obj:/lib/tls/i686/cmov/libdl-2.3.5.so
fun:dlopen
fun:_PyImport_GetDynLoadFunc
fun:_PyImport_LoadDynamicModule
obj:/usr/bin/python2.4
obj:/usr/bin/python2.4
}

View File

@ -19,6 +19,7 @@ script exits with error 1.
# License for more details.
from __future__ import print_function
import argparse
import gc
import sys
import difflib
@ -62,19 +63,16 @@ def main():
def parse_args():
import optparse
parser = optparse.OptionParser(description=__doc__)
parser.add_option('--nruns', type='int', metavar="N", default=3,
help="number of test suite runs [default: %default]")
parser.add_option('--suite', metavar="NAME",
parser = argparse.ArgumentParser(description=__doc__)
parser.add_argument('--nruns', type=int, metavar="N", default=3,
help="number of test suite runs [default: %(default)d]")
parser.add_argument('--suite', metavar="NAME",
help="the test suite to run (e.g. 'test_cursor'). [default: all]")
parser.add_option('--objs', metavar="TYPE",
parser.add_argument('--objs', metavar="TYPE",
help="in case of leaks, print a report of object TYPE "
"(support still incomplete)")
opt, args = parser.parse_args()
return opt
return parser.parse_args()
def dump(i, opt):

View File

@ -39,7 +39,7 @@ from .testutils import (
skip_after_postgres, skip_before_libpq, skip_after_libpq,
ConnectingTestCase, skip_if_tpc_disabled, skip_if_windows, slow)
from .testconfig import dsn, dbname
from .testconfig import dbhost, dsn, dbname
class ConnectionTests(ConnectingTestCase):
@ -1682,6 +1682,19 @@ while True:
self.assert_(not err, err)
class TestConnectionProps(ConnectingTestCase):
def test_host(self):
self.assertFalse(self.conn.closed)
expected = dbhost if dbhost else "/"
self.assertIn(expected, self.conn.host)
def test_host_readonly(self):
self.assertFalse(self.conn.closed)
with self.assertRaises(AttributeError):
self.conn.host = 'override'
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -173,8 +173,8 @@ class ExceptionsTestCase(ConnectingTestCase):
'column_name', 'constraint_name', 'context', 'datatype_name',
'internal_position', 'internal_query', 'message_detail',
'message_hint', 'message_primary', 'schema_name', 'severity',
'source_file', 'source_function', 'source_line', 'sqlstate',
'statement_position', 'table_name', ]:
'severity_nonlocalized', 'source_file', 'source_function',
'source_line', 'sqlstate', 'statement_position', 'table_name', ]:
v = getattr(diag, attr)
if v is not None:
self.assert_(isinstance(v, str))
@ -276,6 +276,15 @@ class ExceptionsTestCase(ConnectingTestCase):
self.assertEqual(e.diag.constraint_name, "chk_eq1")
self.assertEqual(e.diag.datatype_name, None)
@skip_before_postgres(9, 6)
def test_9_6_diagnostics(self):
cur = self.conn.cursor()
try:
cur.execute("select 1 from nosuchtable")
except psycopg2.Error as exc:
e = exc
self.assertEqual(e.diag.severity_nonlocalized, 'ERROR')
def test_pickle(self):
import pickle
cur = self.conn.cursor()

View File

@ -24,9 +24,8 @@
import datetime as dt
import unittest
from .testutils import (ConnectingTestCase,
skip_before_postgres, skip_before_python, skip_copy_if_green,
StringIO)
from .testutils import (
ConnectingTestCase, skip_before_postgres, skip_copy_if_green, StringIO)
import psycopg2
from psycopg2 import sql
@ -181,26 +180,43 @@ class IdentifierTests(ConnectingTestCase):
def test_init(self):
self.assert_(isinstance(sql.Identifier('foo'), sql.Identifier))
self.assert_(isinstance(sql.Identifier(u'foo'), sql.Identifier))
self.assert_(isinstance(sql.Identifier('foo', 'bar', 'baz'), sql.Identifier))
self.assertRaises(TypeError, sql.Identifier)
self.assertRaises(TypeError, sql.Identifier, 10)
self.assertRaises(TypeError, sql.Identifier, dt.date(2016, 12, 31))
def test_string(self):
def test_strings(self):
self.assertEqual(sql.Identifier('foo').strings, ('foo',))
self.assertEqual(sql.Identifier('foo', 'bar').strings, ('foo', 'bar'))
# Legacy method
self.assertEqual(sql.Identifier('foo').string, 'foo')
self.assertRaises(AttributeError,
getattr, sql.Identifier('foo', 'bar'), 'string')
def test_repr(self):
obj = sql.Identifier("fo'o")
self.assertEqual(repr(obj), 'Identifier("fo\'o")')
self.assertEqual(repr(obj), str(obj))
obj = sql.Identifier("fo'o", 'ba"r')
self.assertEqual(repr(obj), 'Identifier("fo\'o", \'ba"r\')')
self.assertEqual(repr(obj), str(obj))
def test_eq(self):
self.assert_(sql.Identifier('foo') == sql.Identifier('foo'))
self.assert_(sql.Identifier('foo', 'bar') == sql.Identifier('foo', 'bar'))
self.assert_(sql.Identifier('foo') != sql.Identifier('bar'))
self.assert_(sql.Identifier('foo') != 'foo')
self.assert_(sql.Identifier('foo') != sql.SQL('foo'))
def test_as_str(self):
self.assertEqual(sql.Identifier('foo').as_string(self.conn), '"foo"')
self.assertEqual(sql.Identifier("fo'o").as_string(self.conn), '"fo\'o"')
self.assertEqual(
sql.Identifier('foo').as_string(self.conn), '"foo"')
self.assertEqual(
sql.Identifier('foo', 'bar').as_string(self.conn), '"foo"."bar"')
self.assertEqual(
sql.Identifier("fo'o", 'ba"r').as_string(self.conn), '"fo\'o"."ba""r"')
def test_join(self):
self.assert_(not hasattr(sql.Identifier('foo'), 'join'))

View File

@ -167,6 +167,10 @@ class TypesBasicTests(ConnectingTestCase):
curs.execute("select col from array_test where id = 2")
self.assertEqual(curs.fetchone()[0], [])
# issue #788 (test commented out until issue fixed)
#curs.execute("select null = any(%s)", ([[]], ))
#self.assertFalse(curs.fetchone()[0])
def testEmptyArrayNoCast(self):
s = self.execute("SELECT '{}' AS foo")
self.assertEqual(s, '{}')

View File

@ -1386,6 +1386,52 @@ class RangeTestCase(unittest.TestCase):
r = Range(0, 4)
self.assertEqual(loads(dumps(r)), r)
def test_str(self):
'''
Range types should have a short and readable ``str`` implementation.
Using ``repr`` for all string conversions can be very unreadable for
longer types like ``DateTimeTZRange``.
'''
from psycopg2.extras import Range
# Using the "u" prefix to make sure we have the proper return types in
# Python2
expected = [
u'(0, 4)',
u'[0, 4]',
u'(0, 4]',
u'[0, 4)',
u'empty',
]
results = []
converter = unicode if sys.version_info < (3, 0) else str
for bounds in ('()', '[]', '(]', '[)'):
r = Range(0, 4, bounds=bounds)
results.append(converter(r))
r = Range(empty=True)
results.append(converter(r))
self.assertEqual(results, expected)
def test_str_datetime(self):
'''
Date-Time ranges should return a human-readable string as well on
string conversion.
'''
from psycopg2.extras import DateTimeTZRange
from datetime import datetime
from psycopg2.tz import FixedOffsetTimezone
converter = unicode if sys.version_info < (3, 0) else str
tz = FixedOffsetTimezone(-5*60, "EST")
r = DateTimeTZRange(datetime(2010, 1, 1, tzinfo=tz),
datetime(2011, 1, 1, tzinfo=tz))
expected = u'[2010-01-01 00:00:00-05:00, 2011-01-01 00:00:00-05:00)'
result = converter(r)
self.assertEqual(result, expected)
def skip_if_no_range(f):
@wraps(f)