Cleaned conn_notice_callback() to run without the GIL

This commit is contained in:
Federico Di Gregorio 2009-04-04 19:17:40 +02:00
parent f1e532151f
commit 16c2a8fc81
5 changed files with 86 additions and 8 deletions

View File

@ -1,3 +1,10 @@
2009-04-04 Federico Di Gregorio <fog@initd.org>
* connection_int.c(conn_notice_callback): removed all Python
calls because conn_notice_callback() can be called without a lock
on the GIL. Moved processing and cleanup of notices into their
own functions that are called while holding the GIL.
2009-04-01 Federico Di Gregorio <fog@initd.org> 2009-04-01 Federico Di Gregorio <fog@initd.org>
* Applied patch from Menno Smits to fix failures in test_dates * Applied patch from Menno Smits to fix failures in test_dates

View File

@ -43,6 +43,11 @@ extern "C" {
extern HIDDEN PyTypeObject connectionType; extern HIDDEN PyTypeObject connectionType;
struct connectionObject_notice {
struct connectionObject_notice *next;
const char *message;
};
typedef struct { typedef struct {
PyObject_HEAD PyObject_HEAD
@ -66,6 +71,7 @@ typedef struct {
/* notice processing */ /* notice processing */
PyObject *notice_list; PyObject *notice_list;
PyObject *notice_filter; PyObject *notice_filter;
struct connectionObject_notice *notice_pending;
/* notifies */ /* notifies */
PyObject *notifies; PyObject *notifies;
@ -75,10 +81,11 @@ typedef struct {
PyObject *binary_types; /* a set of typecasters for binary types */ PyObject *binary_types; /* a set of typecasters for binary types */
int equote; /* use E''-style quotes for escaped strings */ int equote; /* use E''-style quotes for escaped strings */
} connectionObject; } connectionObject;
/* C-callable functions in connection_int.c and connection_ext.c */ /* C-callable functions in connection_int.c and connection_ext.c */
HIDDEN void conn_notice_process(connectionObject *self);
HIDDEN void conn_notice_clean(connectionObject *self);
HIDDEN int conn_connect(connectionObject *self); HIDDEN int conn_connect(connectionObject *self);
HIDDEN void conn_close(connectionObject *self); HIDDEN void conn_close(connectionObject *self);
HIDDEN int conn_commit(connectionObject *self); HIDDEN int conn_commit(connectionObject *self);

View File

@ -39,14 +39,39 @@ conn_notice_callback(void *args, const char *message)
Dprintf("conn_notice_callback: %s", message); Dprintf("conn_notice_callback: %s", message);
/* unfortunately the old protocl return COPY FROM errors only as notices, /* unfortunately the old protocol return COPY FROM errors only as notices,
so we need to filter them looking for such errors (but we do it so we need to filter them looking for such errors (but we do it
only if the protocol if <3, else we don't need that */ only if the protocol if <3, else we don't need that)
NOTE: if we get here and the connection is unlocked then there is a
problem but this should happen because the notice callback is only
called from libpq and when we're inside libpq the connection is usually
locked.
*/
if (self->protocol < 3 && strncmp(message, "ERROR", 5) == 0) if (self->protocol < 3 && strncmp(message, "ERROR", 5) == 0)
pq_set_critical(self, message); pq_set_critical(self, message);
else { else {
PyObject *msg = PyString_FromString(message); struct connectionObject_notice *notice =
(struct connectionObject_notice *)
PyMem_Malloc(sizeof(struct connectionObject_notice));
notice->message = strdup(message);
notice->next = self->notice_pending;
self->notice_pending = notice;
}
}
void
conn_notice_process(connectionObject *self)
{
pthread_mutex_lock(&self->lock);
struct connectionObject_notice *notice = self->notice_pending;
while (notice != NULL) {
PyObject *msg = PyString_FromString(notice->message);
Dprintf("conn_notice_process: %s", notice->message);
PyList_Append(self->notice_list, msg); PyList_Append(self->notice_list, msg);
Py_DECREF(msg); Py_DECREF(msg);
@ -54,7 +79,32 @@ conn_notice_callback(void *args, const char *message)
/* Remove the oldest item if the queue is getting too long. */ /* Remove the oldest item if the queue is getting too long. */
if (PyList_GET_SIZE(self->notice_list) > CONN_NOTICES_LIMIT) if (PyList_GET_SIZE(self->notice_list) > CONN_NOTICES_LIMIT)
PySequence_DelItem(self->notice_list, 0); PySequence_DelItem(self->notice_list, 0);
notice = notice->next;
} }
pthread_mutex_unlock(&self->lock);
conn_notice_clean(self);
}
void
conn_notice_clean(connectionObject *self)
{
pthread_mutex_lock(&self->lock);
struct connectionObject_notice *tmp, *notice = self->notice_pending;
while (notice != NULL) {
tmp = notice;
notice = notice->next;
free(tmp->message);
PyMem_Free(tmp);
}
self->notice_pending = NULL;
pthread_mutex_unlock(&self->lock);
} }
/* conn_connect - execute a connection to the database */ /* conn_connect - execute a connection to the database */

View File

@ -430,6 +430,7 @@ connection_setup(connectionObject *self, const char *dsn)
self->mark = 0; self->mark = 0;
self->string_types = PyDict_New(); self->string_types = PyDict_New();
self->binary_types = PyDict_New(); self->binary_types = PyDict_New();
self->notice_pending = NULL;
pthread_mutex_init(&(self->lock), NULL); pthread_mutex_init(&(self->lock), NULL);
@ -462,13 +463,16 @@ connection_dealloc(PyObject* obj)
if (self->closed == 0) conn_close(self); if (self->closed == 0) conn_close(self);
conn_notice_clean(self);
if (self->dsn) free(self->dsn); if (self->dsn) free(self->dsn);
if (self->encoding) free(self->encoding); if (self->encoding) free(self->encoding);
if (self->critical) free(self->critical); if (self->critical) free(self->critical);
Py_CLEAR(self->notice_list);
Py_CLEAR(self->notifies);
Py_CLEAR(self->async_cursor); Py_CLEAR(self->async_cursor);
Py_CLEAR(self->notice_list);
Py_CLEAR(self->notice_filter);
Py_CLEAR(self->notifies);
Py_CLEAR(self->string_types); Py_CLEAR(self->string_types);
Py_CLEAR(self->binary_types); Py_CLEAR(self->binary_types);

View File

@ -20,7 +20,7 @@
*/ */
/* IMPORTANT NOTE: no function in this file do its own connection locking /* IMPORTANT NOTE: no function in this file do its own connection locking
except for pg_execute and pq_fetch (that are somehow high-level. This means except for pg_execute and pq_fetch (that are somehow high-level). This means
that all the othe functions should be called while holding a lock to the that all the othe functions should be called while holding a lock to the
connection. connection.
*/ */
@ -428,6 +428,8 @@ pq_commit(connectionObject *conn)
pthread_mutex_unlock(&conn->lock); pthread_mutex_unlock(&conn->lock);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
conn_notice_process(conn);
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(conn, &pgres, &error); pq_complete_error(conn, &pgres, &error);
@ -488,6 +490,8 @@ pq_abort(connectionObject *conn)
pthread_mutex_unlock(&conn->lock); pthread_mutex_unlock(&conn->lock);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
conn_notice_process(conn);
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(conn, &pgres, &error); pq_complete_error(conn, &pgres, &error);
@ -544,6 +548,8 @@ pq_is_busy(connectionObject *conn)
pthread_mutex_unlock(&(conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
conn_notice_process(conn);
return res; return res;
} }
@ -623,6 +629,8 @@ pq_execute(cursorObject *curs, const char *query, int async)
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(curs->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
conn_notice_process(curs->conn);
/* if the execute was sync, we call pq_fetch() immediately, /* if the execute was sync, we call pq_fetch() immediately,
to respect the old DBAPI-2.0 compatible behaviour */ to respect the old DBAPI-2.0 compatible behaviour */
if (async == 0) { if (async == 0) {
@ -1150,6 +1158,8 @@ pq_fetch(cursorObject *curs)
Dprintf("pq_fetch: fetching done; check for critical errors"); Dprintf("pq_fetch: fetching done; check for critical errors");
conn_notice_process(curs->conn);
/* error checking, close the connection if necessary (some critical errors /* error checking, close the connection if necessary (some critical errors
are not really critical, like a COPY FROM error: if that's the case we are not really critical, like a COPY FROM error: if that's the case we
raise the exception but we avoid to close the connection) */ raise the exception but we avoid to close the connection) */