Avoid a deadlock using concurrent green threads on the same connection

Use the async_cursor property to store an indication that something is
running (even if it is not necessarily a cursor running the query).
This commit is contained in:
Daniele Varrazzo 2011-06-05 16:22:54 +01:00
parent dcc9e84a68
commit 8f876d4b5d
7 changed files with 32 additions and 13 deletions

3
NEWS
View File

@ -13,6 +13,9 @@ What's new in psycopg 2.4.2
support was built (ticket #53).
- Fixed escape for negative numbers prefixed by minus operator
(ticket #57).
- Trying to execute concurrent operations on the same connection
through concurrent green thread results in an error instead of a
deadlock.
What's new in psycopg 2.4.1

View File

@ -432,11 +432,9 @@ SQLAlchemy_) to be used in coroutine-based programs.
.. warning::
Psycopg connections are not *green thread safe* and can't be used
concurrently by different green threads. Each connection has a lock
used to serialize requests from different cursors to the backend process.
The lock is held for the duration of the command: if the control switched
to a different thread and the latter tried to access the same connection,
the result would be a deadlock.
concurrently by different green threads. Trying to execute more than one
command at time using one cursor per thread will result in an error (or a
deadlock on versions before 2.4.2).
Therefore, programmers are advised to either avoid sharing connections
between coroutines or to use a library-friendly lock to synchronize shared

View File

@ -598,8 +598,8 @@ forking web deploy method such as FastCGI ensure to create the connections
.. __: http://www.postgresql.org/docs/9.0/static/libpq-connect.html#LIBPQ-CONNECT
Connections shouldn't be shared either by different green threads: doing so
may result in a deadlock. See :ref:`green-support` for further details.
Connections shouldn't be shared either by different green threads: see
:ref:`green-support` for further details.

View File

@ -91,7 +91,10 @@ typedef struct {
PGconn *pgconn; /* the postgresql connection */
PGcancel *cancel; /* the cancellation structure */
PyObject *async_cursor; /* weakref to a cursor executing an asynchronous query */
/* Weakref to the object executing an asynchronous query. The object
* is a cursor for async connections, but it may be something else
* for a green connection. If NULL, the connection is idle. */
PyObject *async_cursor;
int async_status; /* asynchronous execution status */
/* notice processing */

View File

@ -881,7 +881,7 @@ conn_poll(connectionObject *self)
case CONN_STATUS_PREPARED:
res = _conn_poll_query(self);
if (res == PSYCO_POLL_OK && self->async_cursor) {
if (res == PSYCO_POLL_OK && self->async && self->async_cursor) {
/* An async query has just finished: parse the tuple in the
* target cursor. */
cursorObject *curs;

View File

@ -739,7 +739,6 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args)
PyObject *res;
EXC_IF_CURS_CLOSED(self);
EXC_IF_ASYNC_IN_PROGRESS(self, fetchone);
if (_psyco_curs_prefetch(self) < 0) return NULL;
EXC_IF_NO_TUPLES(self);
@ -747,6 +746,7 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args)
char buffer[128];
EXC_IF_NO_MARK(self);
EXC_IF_ASYNC_IN_PROGRESS(self, fetchone);
EXC_IF_TPC_PREPARED(self->conn, fetchone);
PyOS_snprintf(buffer, 127, "FETCH FORWARD 1 FROM \"%s\"", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
@ -853,7 +853,6 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords)
}
EXC_IF_CURS_CLOSED(self);
EXC_IF_ASYNC_IN_PROGRESS(self, fetchmany);
if (_psyco_curs_prefetch(self) < 0) return NULL;
EXC_IF_NO_TUPLES(self);
@ -861,6 +860,7 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords)
char buffer[128];
EXC_IF_NO_MARK(self);
EXC_IF_ASYNC_IN_PROGRESS(self, fetchmany);
EXC_IF_TPC_PREPARED(self->conn, fetchone);
PyOS_snprintf(buffer, 127, "FETCH FORWARD %d FROM \"%s\"",
(int)size, self->name);
@ -924,7 +924,6 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args)
PyObject *list, *res;
EXC_IF_CURS_CLOSED(self);
EXC_IF_ASYNC_IN_PROGRESS(self, fetchall);
if (_psyco_curs_prefetch(self) < 0) return NULL;
EXC_IF_NO_TUPLES(self);
@ -932,6 +931,7 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args)
char buffer[128];
EXC_IF_NO_MARK(self);
EXC_IF_ASYNC_IN_PROGRESS(self, fetchall);
EXC_IF_TPC_PREPARED(self->conn, fetchall);
PyOS_snprintf(buffer, 127, "FETCH FORWARD ALL FROM \"%s\"", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
@ -1112,7 +1112,6 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
return NULL;
EXC_IF_CURS_CLOSED(self);
EXC_IF_ASYNC_IN_PROGRESS(self, scroll)
/* if the cursor is not named we have the full result set and we can do
our own calculations to scroll; else we just delegate the scrolling
@ -1141,6 +1140,7 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
char buffer[128];
EXC_IF_NO_MARK(self);
EXC_IF_ASYNC_IN_PROGRESS(self, scroll)
EXC_IF_TPC_PREPARED(self->conn, scroll);
if (strcmp(mode, "absolute") == 0) {

View File

@ -152,6 +152,20 @@ psyco_exec_green(connectionObject *conn, const char *command)
{
PGresult *result = NULL;
/* Check that there is a single concurrently executing query */
if (conn->async_cursor) {
PyErr_SetString(ProgrammingError,
"a single async query can be executed on the same connection");
goto end;
}
/* we don't care about which cursor is executing the query, and
* it may also be that no cursor is involved at all and this is
* an internal query. So just store anything in the async_cursor,
* respecting the code expecting it to be a weakref */
if (!(conn->async_cursor = PyWeakref_NewRef((PyObject*)conn, NULL))) {
goto end;
}
/* Send the query asynchronously */
if (0 == pq_send_query(conn, command)) {
goto end;
@ -173,6 +187,7 @@ psyco_exec_green(connectionObject *conn, const char *command)
end:
conn->async_status = ASYNC_DONE;
Py_CLEAR(conn->async_cursor);
return result;
}