Merge branch 'master' into feature/replication-message-object

Conflicts:
	lib/extensions.py
This commit is contained in:
Oleksandr Shulgin 2015-06-30 10:30:32 +02:00
commit e3c3a2c19e
23 changed files with 304 additions and 49 deletions

15
NEWS
View File

@ -1,9 +1,24 @@
Current release
---------------
What's new in psycopg 2.7
-------------------------
New features:
- Added `~psycopg2.__libpq_version__` and
`~psycopg2.extensions.libpq_version()` to inspect the version of the
``libpq`` library the module was compiled/loaded with
(:tickets:`#35, #323`).
- The attributes `~connection.notices` and `~connection.notifies` can be
customized replacing them with any object exposing an `!append()` method
(:ticket:`#326`).
What's new in psycopg 2.6.1
^^^^^^^^^^^^^^^^^^^^^^^^^^^
- Lists consisting of only `None` are escaped correctly (:ticket:`#285`).
- Fixed deadlock in multithread programs using OpenSSL (:ticket:`#290`).
- Correctly unlock the connection after error in flush (:ticket:`#294`).
- Fixed ``MinTimeLoggingCursor.callproc()`` (:ticket:`#309`).

View File

@ -291,7 +291,7 @@ something to read::
else:
conn.poll()
while conn.notifies:
notify = conn.notifies.pop()
notify = conn.notifies.pop(0)
print "Got NOTIFY:", notify.pid, notify.channel, notify.payload
Running the script and executing a command such as :sql:`NOTIFY test, 'hello'`
@ -312,6 +312,10 @@ received from a previous version server will have the
Added `~psycopg2.extensions.Notify` object and handling notification
payload.
.. versionchanged:: 2.7
The `~connection.notifies` attribute is writable: it is possible to
replace it with any object exposing an `!append()` method. An useful
example would be to use a `~collections.deque` object.
.. index::

View File

@ -419,8 +419,8 @@ The ``connection`` class
By default, any query execution, including a simple :sql:`SELECT`
will start a transaction: for long-running programs, if no further
action is taken, the session will remain "idle in transaction", a
condition non desiderable for several reasons (locks are held by
action is taken, the session will remain "idle in transaction", an
undesirable condition for several reasons (locks are held by
the session, tables bloat...). For long lived scripts, either
ensure to terminate a transaction as soon as possible or use an
autocommit connection.
@ -483,8 +483,16 @@ The ``connection`` class
['NOTICE: CREATE TABLE / PRIMARY KEY will create implicit index "foo_pkey" for table "foo"\n',
'NOTICE: CREATE TABLE will create implicit sequence "foo_id_seq" for serial column "foo.id"\n']
.. versionchanged:: 2.7
The `!notices` attribute is writable: the user may replace it
with any Python object exposing an `!append()` method. If
appending raises an exception the notice is silently
dropped.
To avoid a leak in case excessive notices are generated, only the last
50 messages are kept.
50 messages are kept. This check is only in place if the `!notices`
attribute is a list: if any other object is used it will be up to the
user to guard from leakage.
You can configure what messages to receive using `PostgreSQL logging
configuration parameters`__ such as ``log_statement``,
@ -506,6 +514,12 @@ The ``connection`` class
the payload was not accessible. To keep backward compatibility,
`!Notify` objects can still be accessed as 2 items tuples.
.. versionchanged:: 2.7
The `!notifies` attribute is writable: the user may replace it
with any Python object exposing an `!append()` method. If
appending raises an exception the notification is silently
dropped.
.. attribute:: cursor_factory

View File

@ -197,6 +197,18 @@ functionalities defined by the |DBAPI|_.
.. versionadded:: 2.2.0
.. function:: libpq_version()
Return the version number of the ``libpq`` dynamic library loaded as an
integer, in the same format of `~connection.server_version`.
Raise `~psycopg2.NotSupportedError` if the ``psycopg2`` module was
compiled with a ``libpq`` version lesser than 9.1 (which can be detected
by the `~psycopg2.__libpq_version__` constant).
.. seealso:: libpq docs for `PQlibVersion()`__.
.. __: http://www.postgresql.org/docs/current/static/libpq-misc.html#LIBPQ-PQLIBVERSION
.. _sql-adaptation-objects:

View File

@ -109,6 +109,13 @@ The module interface respects the standard defined in the |DBAPI|_.
by the interface. For `psycopg2` is ``pyformat``. See also
:ref:`query-parameters`.
.. data:: __libpq_version__
Integer constant reporting the version of the ``libpq`` library this
``psycopg2`` module was compiled with (in the same format of
`~connection.server_version`). If this value is greater or equal than
``90100`` then you may query the version of the actually loaded library
using the `~psycopg2.extensions.libpq_version()` function.
.. index::

View File

@ -679,7 +679,7 @@ older versions).
By default even a simple :sql:`SELECT` will start a transaction: in
long-running programs, if no further action is taken, the session will
remain "idle in transaction", a condition non desiderable for several
remain "idle in transaction", an undesirable condition for several
reasons (locks are held by the session, tables bloat...). For long lived
scripts, either make sure to terminate a transaction as soon as possible or
use an autocommit connection.

View File

@ -57,7 +57,7 @@ from psycopg2._psycopg import IntegrityError, InterfaceError, InternalError
from psycopg2._psycopg import NotSupportedError, OperationalError
from psycopg2._psycopg import _connect, apilevel, threadsafety, paramstyle
from psycopg2._psycopg import __version__
from psycopg2._psycopg import __version__, __libpq_version__
from psycopg2 import tz

View File

@ -56,7 +56,7 @@ try:
except ImportError:
pass
from psycopg2._psycopg import adapt, adapters, encodings, connection, cursor, replicationMessage, lobject, Xid
from psycopg2._psycopg import adapt, adapters, encodings, connection, cursor, replicationMessage, lobject, Xid, libpq_version
from psycopg2._psycopg import string_types, binary_types, new_type, new_array_type, register_type
from psycopg2._psycopg import ISQLQuote, Notify, Diagnostics, Column

View File

@ -39,7 +39,7 @@ static unsigned char *
binary_escape(unsigned char *from, size_t from_length,
size_t *to_length, PGconn *conn)
{
#if PG_VERSION_HEX >= 0x080104
#if PG_VERSION_NUM >= 80104
if (conn)
return PQescapeByteaConn(conn, from, from_length, to_length);
else

View File

@ -39,6 +39,14 @@ list_quote(listObject *self)
/* adapt the list by calling adapt() recursively and then wrapping
everything into "ARRAY[]" */
PyObject *tmp = NULL, *str = NULL, *joined = NULL, *res = NULL;
/* list consisting of only NULL don't work with the ARRAY[] construct
* so we use the {NULL,...} syntax. Note however that list of lists where
* some element is a list of only null still fails: for that we should use
* the '{...}' syntax uniformly but we cannot do it in the current
* infrastructure. TODO in psycopg3 */
int all_nulls = 1;
Py_ssize_t i, len;
len = PyList_GET_SIZE(self->wrapped);
@ -60,6 +68,7 @@ list_quote(listObject *self)
quoted = microprotocol_getquoted(wrapped,
(connectionObject*)self->connection);
if (quoted == NULL) goto error;
all_nulls = 0;
}
/* here we don't loose a refcnt: SET_ITEM does not change the
@ -74,7 +83,12 @@ list_quote(listObject *self)
joined = PyObject_CallMethod(str, "join", "(O)", tmp);
if (joined == NULL) goto error;
/* PG doesn't like ARRAY[NULL..] */
if (!all_nulls) {
res = Bytes_FromFormat("ARRAY[%s]", Bytes_AsString(joined));
} else {
res = Bytes_FromFormat("'{%s}'", Bytes_AsString(joined));
}
error:
Py_XDECREF(tmp);

View File

@ -71,7 +71,7 @@ extern HIDDEN PyTypeObject connectionType;
struct connectionObject_notice {
struct connectionObject_notice *next;
const char *message;
char *message;
};
/* the typedef is forward-declared in psycopg.h */
@ -106,8 +106,8 @@ struct connectionObject {
/* notice processing */
PyObject *notice_list;
PyObject *notice_filter;
struct connectionObject_notice *notice_pending;
struct connectionObject_notice *last_notice;
/* notifies */
PyObject *notifies;

View File

@ -87,13 +87,20 @@ conn_notice_callback(void *args, const char *message)
/* Discard the notice in case of failed allocation. */
return;
}
notice->next = NULL;
notice->message = strdup(message);
if (NULL == notice->message) {
free(notice);
return;
}
notice->next = self->notice_pending;
self->notice_pending = notice;
if (NULL == self->last_notice) {
self->notice_pending = self->last_notice = notice;
}
else {
self->last_notice->next = notice;
self->last_notice = notice;
}
}
/* Expose the notices received as Python objects.
@ -104,44 +111,60 @@ void
conn_notice_process(connectionObject *self)
{
struct connectionObject_notice *notice;
Py_ssize_t nnotices;
PyObject *msg = NULL;
PyObject *tmp = NULL;
static PyObject *append;
if (NULL == self->notice_pending) {
return;
}
notice = self->notice_pending;
nnotices = PyList_GET_SIZE(self->notice_list);
if (!append) {
if (!(append = Text_FromUTF8("append"))) {
goto error;
}
}
notice = self->notice_pending;
while (notice != NULL) {
PyObject *msg;
msg = conn_text_from_chars(self, notice->message);
Dprintf("conn_notice_process: %s", notice->message);
/* Respect the order in which notices were produced,
because in notice_list they are reversed (see ticket #9) */
if (msg) {
PyList_Insert(self->notice_list, nnotices, msg);
Py_DECREF(msg);
}
else {
/* We don't really have a way to report errors, so gulp it.
* The function should only fail for out of memory, so we are
* likely going to die anyway. */
PyErr_Clear();
if (!(msg = conn_text_from_chars(self, notice->message))) { goto error; }
if (!(tmp = PyObject_CallMethodObjArgs(
self->notice_list, append, msg, NULL))) {
goto error;
}
Py_DECREF(tmp); tmp = NULL;
Py_DECREF(msg); msg = NULL;
notice = notice->next;
}
/* Remove the oldest item if the queue is getting too long. */
if (PyList_Check(self->notice_list)) {
Py_ssize_t nnotices;
nnotices = PyList_GET_SIZE(self->notice_list);
if (nnotices > CONN_NOTICES_LIMIT) {
PySequence_DelSlice(self->notice_list,
0, nnotices - CONN_NOTICES_LIMIT);
if (-1 == PySequence_DelSlice(self->notice_list,
0, nnotices - CONN_NOTICES_LIMIT)) {
PyErr_Clear();
}
}
}
conn_notice_clean(self);
return;
error:
Py_XDECREF(tmp);
Py_XDECREF(msg);
conn_notice_clean(self);
/* TODO: the caller doesn't expects errors from us */
PyErr_Clear();
}
void
@ -154,11 +177,11 @@ conn_notice_clean(connectionObject *self)
while (notice != NULL) {
tmp = notice;
notice = notice->next;
free((void*)tmp->message);
free(tmp->message);
free(tmp);
}
self->notice_pending = NULL;
self->last_notice = self->notice_pending = NULL;
}
@ -173,6 +196,15 @@ conn_notifies_process(connectionObject *self)
PGnotify *pgn = NULL;
PyObject *notify = NULL;
PyObject *pid = NULL, *channel = NULL, *payload = NULL;
PyObject *tmp = NULL;
static PyObject *append;
if (!append) {
if (!(append = Text_FromUTF8("append"))) {
goto error;
}
}
while ((pgn = PQnotifies(self->pgconn)) != NULL) {
@ -192,7 +224,11 @@ conn_notifies_process(connectionObject *self)
Py_DECREF(channel); channel = NULL;
Py_DECREF(payload); payload = NULL;
PyList_Append(self->notifies, (PyObject *)notify);
if (!(tmp = PyObject_CallMethodObjArgs(
self->notifies, append, notify, NULL))) {
goto error;
}
Py_DECREF(tmp); tmp = NULL;
Py_DECREF(notify); notify = NULL;
PQfreemem(pgn); pgn = NULL;
@ -201,6 +237,7 @@ conn_notifies_process(connectionObject *self)
error:
if (pgn) { PQfreemem(pgn); }
Py_XDECREF(tmp);
Py_XDECREF(notify);
Py_XDECREF(pid);
Py_XDECREF(channel);

View File

@ -1001,8 +1001,8 @@ static struct PyMemberDef connectionObject_members[] = {
"True if the connection is closed."},
{"encoding", T_STRING, offsetof(connectionObject, encoding), READONLY,
"The current client encoding."},
{"notices", T_OBJECT, offsetof(connectionObject, notice_list), READONLY},
{"notifies", T_OBJECT, offsetof(connectionObject, notifies), READONLY},
{"notices", T_OBJECT, offsetof(connectionObject, notice_list), 0},
{"notifies", T_OBJECT, offsetof(connectionObject, notifies), 0},
{"dsn", T_STRING, offsetof(connectionObject, dsn), READONLY,
"The current connection string."},
{"async", T_LONG, offsetof(connectionObject, async), READONLY,
@ -1105,7 +1105,6 @@ connection_clear(connectionObject *self)
Py_CLEAR(self->tpc_xid);
Py_CLEAR(self->async_cursor);
Py_CLEAR(self->notice_list);
Py_CLEAR(self->notice_filter);
Py_CLEAR(self->notifies);
Py_CLEAR(self->string_types);
Py_CLEAR(self->binary_types);
@ -1181,7 +1180,6 @@ connection_traverse(connectionObject *self, visitproc visit, void *arg)
Py_VISIT((PyObject *)(self->tpc_xid));
Py_VISIT(self->async_cursor);
Py_VISIT(self->notice_list);
Py_VISIT(self->notice_filter);
Py_VISIT(self->notifies);
Py_VISIT(self->string_types);
Py_VISIT(self->binary_types);

View File

@ -474,7 +474,7 @@ lobject_export(lobjectObject *self, const char *filename)
return retvalue;
}
#if PG_VERSION_HEX >= 0x080300
#if PG_VERSION_NUM >= 80300
RAISES_NEG int
lobject_truncate(lobjectObject *self, size_t len)
@ -511,4 +511,4 @@ lobject_truncate(lobjectObject *self, size_t len)
}
#endif /* PG_VERSION_HEX >= 0x080300 */
#endif /* PG_VERSION_NUM >= 80300 */

View File

@ -266,7 +266,7 @@ psyco_lobj_get_closed(lobjectObject *self, void *closure)
return closed;
}
#if PG_VERSION_HEX >= 0x080300
#if PG_VERSION_NUM >= 80300
#define psyco_lobj_truncate_doc \
"truncate(len=0) -- Truncate large object to given size."
@ -327,10 +327,10 @@ static struct PyMethodDef lobjectObject_methods[] = {
METH_NOARGS, psyco_lobj_unlink_doc},
{"export",(PyCFunction)psyco_lobj_export,
METH_VARARGS, psyco_lobj_export_doc},
#if PG_VERSION_HEX >= 0x080300
#if PG_VERSION_NUM >= 80300
{"truncate",(PyCFunction)psyco_lobj_truncate,
METH_VARARGS, psyco_lobj_truncate_doc},
#endif /* PG_VERSION_HEX >= 0x080300 */
#endif /* PG_VERSION_NUM >= 80300 */
{NULL}
};

View File

@ -186,7 +186,7 @@ psyco_libcrypto_threads_init(void)
if (PyImport_ImportModule("ssl") != NULL) {
/* disable libcrypto setup in libpq, so it won't stomp on the callbacks
that have already been set up */
#if PG_VERSION_HEX >= 0x080400
#if PG_VERSION_NUM >= 80400
PQinitOpenSSL(1, 0);
#endif
}
@ -301,6 +301,19 @@ exit:
return rv;
}
#define psyco_libpq_version_doc "Query actual libpq version loaded."
static PyObject*
psyco_libpq_version(PyObject *self)
{
#if PG_VERSION_NUM >= 90100
return PyInt_FromLong(PQlibVersion());
#else
PyErr_SetString(NotSupportedError, "version discovery is not supported in libpq < 9.1");
return NULL;
#endif
}
/* psyco_encodings_fill
Fill the module's postgresql<->python encoding table */
@ -705,6 +718,8 @@ static PyMethodDef psycopgMethods[] = {
METH_VARARGS|METH_KEYWORDS, typecast_from_python_doc},
{"new_array_type", (PyCFunction)typecast_array_from_python,
METH_VARARGS|METH_KEYWORDS, typecast_array_from_python_doc},
{"libpq_version", (PyCFunction)psyco_libpq_version,
METH_NOARGS, psyco_libpq_version_doc},
{"Date", (PyCFunction)psyco_Date,
METH_VARARGS, psyco_Date_doc},
@ -904,6 +919,7 @@ INIT_MODULE(_psycopg)(void)
/* set some module's parameters */
PyModule_AddStringConstant(module, "__version__", PSYCOPG_VERSION);
PyModule_AddStringConstant(module, "__doc__", "psycopg PostgreSQL driver");
PyModule_AddIntConstant(module, "__libpq_version__", PG_VERSION_NUM);
PyModule_AddObject(module, "apilevel", Text_FromUTF8(APILEVEL));
PyModule_AddObject(module, "threadsafety", PyInt_FromLong(THREADSAFETY));
PyModule_AddObject(module, "paramstyle", Text_FromUTF8(PARAMSTYLE));

View File

@ -62,7 +62,7 @@ psycopg_escape_string(connectionObject *conn, const char *from, Py_ssize_t len,
}
{
#if PG_VERSION_HEX >= 0x080104
#if PG_VERSION_NUM >= 80104
int err;
if (conn && conn->pgconn)
ql = PQescapeStringConn(conn->pgconn, to+eq+1, from, len, &err);

View File

@ -416,7 +416,7 @@ class psycopg_build_ext(build_ext):
% pgversion)
sys.exit(1)
define_macros.append(("PG_VERSION_HEX", "0x%02X%02X%02X" %
define_macros.append(("PG_VERSION_NUM", "%d%02d%02d" %
(pgmajor, pgminor, pgpatch)))
# enable lo64 if libpq >= 9.3 and Python 64 bits

View File

@ -129,6 +129,42 @@ class ConnectionTests(ConnectingTestCase):
self.assertEqual(50, len(conn.notices))
self.assert_('table99' in conn.notices[-1], conn.notices[-1])
def test_notices_deque(self):
from collections import deque
conn = self.conn
self.conn.notices = deque()
cur = conn.cursor()
if self.conn.server_version >= 90300:
cur.execute("set client_min_messages=debug1")
cur.execute("create temp table table1 (id serial); create temp table table2 (id serial);")
cur.execute("create temp table table3 (id serial); create temp table table4 (id serial);")
self.assertEqual(len(conn.notices), 4)
self.assert_('table1' in conn.notices.popleft())
self.assert_('table2' in conn.notices.popleft())
self.assert_('table3' in conn.notices.popleft())
self.assert_('table4' in conn.notices.popleft())
self.assertEqual(len(conn.notices), 0)
# not limited, but no error
for i in range(0, 100, 10):
sql = " ".join(["create temp table table2_%d (id serial);" % j for j in range(i, i+10)])
cur.execute(sql)
self.assertEqual(100, len(conn.notices))
def test_notices_noappend(self):
conn = self.conn
self.conn.notices = None # will make an error swallowes ok
cur = conn.cursor()
if self.conn.server_version >= 90300:
cur.execute("set client_min_messages=debug1")
cur.execute("create temp table table1 (id serial);")
self.assertEqual(self.conn.notices, None)
def test_server_version(self):
self.assert_(self.conn.server_version)

View File

@ -320,6 +320,15 @@ import _psycopg
self.assertEqual(0, proc.returncode)
class TestVersionDiscovery(unittest.TestCase):
def test_libpq_version(self):
self.assertTrue(type(psycopg2.__libpq_version__) is int)
try:
self.assertTrue(type(psycopg2.extensions.libpq_version()) is int)
except NotSupportedError:
self.assertTrue(psycopg2.__libpq_version__ < 90100)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -155,6 +155,27 @@ conn.close()
self.assertEqual('foo', notify.channel)
self.assertEqual('Hello, world!', notify.payload)
def test_notify_deque(self):
from collections import deque
self.autocommit(self.conn)
self.conn.notifies = deque()
self.listen('foo')
self.notify('foo').communicate()
time.sleep(0.5)
self.conn.poll()
notify = self.conn.notifies.popleft()
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
self.assertEqual(len(self.conn.notifies), 0)
def test_notify_noappend(self):
self.autocommit(self.conn)
self.conn.notifies = None
self.listen('foo')
self.notify('foo').communicate()
time.sleep(0.5)
self.conn.poll()
self.assertEqual(self.conn.notifies, None)
def test_notify_init(self):
n = psycopg2.extensions.Notify(10, 'foo')
self.assertEqual(10, n.pid)
@ -192,6 +213,7 @@ conn.close()
self.assertNotEqual(hash(Notify(10, 'foo', 'bar')),
hash(Notify(10, 'foo')))
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -192,6 +192,40 @@ class TypesBasicTests(ConnectingTestCase):
self.assertRaises(psycopg2.DataError,
psycopg2.extensions.STRINGARRAY, b(s), curs)
def testArrayOfNulls(self):
curs = self.conn.cursor()
curs.execute("""
create table na (
texta text[],
inta int[],
boola boolean[],
textaa text[][],
intaa int[][],
boolaa boolean[][]
)""")
curs.execute("insert into na (texta) values (%s)", ([None],))
curs.execute("insert into na (texta) values (%s)", (['a', None],))
curs.execute("insert into na (texta) values (%s)", ([None, None],))
curs.execute("insert into na (inta) values (%s)", ([None],))
curs.execute("insert into na (inta) values (%s)", ([42, None],))
curs.execute("insert into na (inta) values (%s)", ([None, None],))
curs.execute("insert into na (boola) values (%s)", ([None],))
curs.execute("insert into na (boola) values (%s)", ([True, None],))
curs.execute("insert into na (boola) values (%s)", ([None, None],))
# TODO: array of array of nulls are not supported yet
# curs.execute("insert into na (textaa) values (%s)", ([[None]],))
curs.execute("insert into na (textaa) values (%s)", ([['a', None]],))
# curs.execute("insert into na (textaa) values (%s)", ([[None, None]],))
# curs.execute("insert into na (intaa) values (%s)", ([[None]],))
curs.execute("insert into na (intaa) values (%s)", ([[42, None]],))
# curs.execute("insert into na (intaa) values (%s)", ([[None, None]],))
# curs.execute("insert into na (boolaa) values (%s)", ([[None]],))
curs.execute("insert into na (boolaa) values (%s)", ([[True, None]],))
# curs.execute("insert into na (boolaa) values (%s)", ([[None, None]],))
@testutils.skip_from_python(3)
def testTypeRoundtripBuffer(self):
o1 = buffer("".join(map(chr, range(256))))

View File

@ -236,6 +236,43 @@ def skip_after_postgres(*ver):
return skip_after_postgres__
return skip_after_postgres_
def libpq_version():
import psycopg2
v = psycopg2.__libpq_version__
if v >= 90100:
v = psycopg2.extensions.libpq_version()
return v
def skip_before_libpq(*ver):
"""Skip a test if libpq we're linked to is older than a certain version."""
ver = ver + (0,) * (3 - len(ver))
def skip_before_libpq_(f):
@wraps(f)
def skip_before_libpq__(self):
v = libpq_version()
if v < int("%d%02d%02d" % ver):
return self.skipTest("skipped because libpq %d" % v)
else:
return f(self)
return skip_before_libpq__
return skip_before_libpq_
def skip_after_libpq(*ver):
"""Skip a test if libpq we're linked to is newer than a certain version."""
ver = ver + (0,) * (3 - len(ver))
def skip_after_libpq_(f):
@wraps(f)
def skip_after_libpq__(self):
v = libpq_version()
if v >= int("%d%02d%02d" % ver):
return self.skipTest("skipped because libpq %s" % v)
else:
return f(self)
return skip_after_libpq__
return skip_after_libpq_
def skip_before_python(*ver):
"""Skip a test on Python before a certain version."""
def skip_before_python_(f):