async fixes and better connection/cursor management.

This commit is contained in:
Federico Di Gregorio 2004-12-10 10:34:57 +00:00
parent 61b4ff6e6f
commit 1b74bb897f
8 changed files with 111 additions and 74 deletions

View File

@ -1,3 +1,27 @@
2004-12-10 Federico Di Gregorio <fog@debian.org>
* psycopg/cursor_type.c: now *all* write or async accesses to the
connection object are arbitrated using the connection lock.
* psycopg/cursor_type.c (psyco_curs_isready): now we reset the
current async cursor if it is ready, to allow other cursors to
.execute() without raising the "transaction in progress" error.
* psycopg/pqpath.c (pq_is_busy): gained status of high-level
function with its own blocking and locking.
* psycopg/cursor.h (EXC_IF_CURS_CLOSED): also checks the
connection (a closed connection implies a closed cursor.)
* psycopg/cursor_type.c: cursor's connection is correctly
INCREFfed and DECREFfed.
* psycopg/connection_type.c: removed the cursors list from the
connection object. It is not necessary anymore for the connection
to know about the cursors and the reference counting will keep the
connection alive (but possibly closed) until all cursors are
garbage collected.
2004-11-20 Federico Di Gregorio <fog@initd.org> 2004-11-20 Federico Di Gregorio <fog@initd.org>
* psycopg/cursor_type.c (_mogrify): ported %% fix from 1.1.15. * psycopg/cursor_type.c (_mogrify): ported %% fix from 1.1.15.
@ -19,7 +43,7 @@
tuples are filled using PyTuple_SET_ITEM while extended types tuples are filled using PyTuple_SET_ITEM while extended types
(created via row_factory) are filled using PySequence_SetItem. (created via row_factory) are filled using PySequence_SetItem.
* psycopg/cursor_type.c: change cursor attribute name from * psycopg/cursor_type.c: changed cursor attribute name from
tuple_factory to row_factory. tuple_factory to row_factory.
2004-10-14 Federico Di Gregorio <fog@debian.org> 2004-10-14 Federico Di Gregorio <fog@debian.org>

11
NEWS
View File

@ -1,3 +1,14 @@
What's new in psycopg 1.99.11
-----------------------------
* 'cursor' argument in .cursor() connection method renamed to
'cursor_factory'.
* changed 'tuple_factory' cursor attribute name to 'row_factory'.
* the .cursor attribute is gone and connections and cursors are propely
gc-managed.
What's new in psycopg 1.99.10 What's new in psycopg 1.99.10
----------------------------- -----------------------------

View File

@ -40,8 +40,6 @@ extern PyTypeObject connectionType;
typedef struct { typedef struct {
PyObject HEAD; PyObject HEAD;
PyObject *cursors; /* all cursors derived from this connection */
pthread_mutex_t lock; /* the global connection lock */ pthread_mutex_t lock; /* the global connection lock */
char *dsn; /* data source name */ char *dsn; /* data source name */

View File

@ -131,9 +131,6 @@ conn_connect(connectionObject *self)
void void
conn_close(connectionObject *self) conn_close(connectionObject *self)
{ {
int len, i;
PyObject *t = NULL;
/* sets this connection as closed even for other threads; also note that /* sets this connection as closed even for other threads; also note that
we need to check the value of pgconn, because we get called even when we need to check the value of pgconn, because we get called even when
the connection fails! */ the connection fails! */
@ -143,34 +140,17 @@ conn_close(connectionObject *self)
self->closed = 1; self->closed = 1;
/* execute a forced rollback on the connection (but don't check the /* execute a forced rollback on the connection (but don't check the
result, we're going to close the pq connection anyway */ result, we're going to close the pq connection anyway */
if (self->pgconn) pq_abort(self);
/* orphans all the children cursors but do NOT destroy them (note that we
need to lock the connection before orphaning a cursor: we don't want to
remove a connection from a cursor executing a DB operation */
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
len = PyList_Size(self->cursors);
Dprintf("conn_close: ophaning %d cursors", len);
for (i = len-1; i >= 0; i--) {
t = PySequence_GetItem(self->cursors, i);
Dprintf("conn close: cursor at %p: refcnt = %d", t, t->ob_refcnt);
PySequence_DelItem(self->cursors, i);
((cursorObject *)t)->conn = NULL; /* orphaned */
Dprintf("conn_close: -> new refcnt = %d", t->ob_refcnt);
}
pthread_mutex_unlock(&self->lock);
/* now that all cursors have been orphaned (they can't operate on the
database anymore) we can shut down the connection */
if (self->pgconn) { if (self->pgconn) {
pq_abort(self);
PQfinish(self->pgconn); PQfinish(self->pgconn);
Dprintf("conn_close: PQfinish called"); Dprintf("conn_close: PQfinish called");
self->pgconn = NULL; self->pgconn = NULL;
} }
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
} }
/* conn_commit - commit on a connection */ /* conn_commit - commit on a connection */

View File

@ -65,11 +65,6 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds)
/* TODO: added error checking on obj (cursor) here */ /* TODO: added error checking on obj (cursor) here */
/* add the cursor to this connection's list (and decref it, so that it has
the right number of references to go away even if still in the list) */
PyList_Append(self->cursors, obj);
Py_DECREF(obj);
Dprintf("psyco_conn_cursor: new cursor at %p: refcnt = %d", Dprintf("psyco_conn_cursor: new cursor at %p: refcnt = %d",
obj, obj->ob_refcnt); obj, obj->ob_refcnt);
return obj; return obj;
@ -244,7 +239,6 @@ static struct PyMemberDef connectionObject_members[] = {
{"isolation_level", T_LONG, {"isolation_level", T_LONG,
offsetof(connectionObject, isolation_level), RO}, offsetof(connectionObject, isolation_level), RO},
{"encoding", T_STRING, offsetof(connectionObject, encoding), RO}, {"encoding", T_STRING, offsetof(connectionObject, encoding), RO},
{"cursors", T_OBJECT, offsetof(connectionObject, cursors), RO},
{"notices", T_OBJECT, offsetof(connectionObject, notice_list), RO}, {"notices", T_OBJECT, offsetof(connectionObject, notice_list), RO},
{"notifies", T_OBJECT, offsetof(connectionObject, notifies), RO}, {"notifies", T_OBJECT, offsetof(connectionObject, notifies), RO},
{"dsn", T_STRING, offsetof(connectionObject, dsn), RO}, {"dsn", T_STRING, offsetof(connectionObject, dsn), RO},
@ -261,7 +255,6 @@ connection_setup(connectionObject *self, char *dsn)
self, ((PyObject *)self)->ob_refcnt); self, ((PyObject *)self)->ob_refcnt);
self->dsn = strdup(dsn); self->dsn = strdup(dsn);
self->cursors = PyList_New(0);
self->notice_list = PyList_New(0); self->notice_list = PyList_New(0);
self->closed = 0; self->closed = 0;
self->isolation_level = 1; self->isolation_level = 1;
@ -273,7 +266,6 @@ connection_setup(connectionObject *self, char *dsn)
pthread_mutex_init(&(self->lock), NULL); pthread_mutex_init(&(self->lock), NULL);
if (conn_connect(self) != 0) { if (conn_connect(self) != 0) {
Py_XDECREF(self->cursors);
pthread_mutex_destroy(&(self->lock)); pthread_mutex_destroy(&(self->lock));
Dprintf("connection_init: FAILED"); Dprintf("connection_init: FAILED");
return -1; return -1;
@ -291,7 +283,6 @@ connection_dealloc(PyObject* obj)
if (self->closed == 0) conn_close(self); if (self->closed == 0) conn_close(self);
Py_XDECREF(self->cursors);
if (self->dsn) free(self->dsn); if (self->dsn) free(self->dsn);
if (self->encoding) free(self->encoding); if (self->encoding) free(self->encoding);
if (self->critical) free(self->critical); if (self->critical) free(self->critical);

View File

@ -72,11 +72,12 @@ typedef struct {
extern void curs_reset(cursorObject *self); extern void curs_reset(cursorObject *self);
/* exception-raising macros */ /* exception-raising macros */
#define EXC_IF_CURS_CLOSED(self) if ((self)->closed) { \ #define EXC_IF_CURS_CLOSED(self) \
PyErr_SetString(InterfaceError, "cursor already closed"); \ if ((self)->closed || ((self)->conn && (self)->conn->closed)) { \
PyErr_SetString(InterfaceError, "cursor already closed"); \
return NULL; } return NULL; }
#define EXC_IF_NO_TUPLES(self) if ((self)->notuples) { \ #define EXC_IF_NO_TUPLES(self) if ((self)->notuples) { \
PyErr_SetString(ProgrammingError, "no results to fetch"); \ PyErr_SetString(ProgrammingError, "no results to fetch"); \
return NULL; } return NULL; }

View File

@ -249,12 +249,15 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs)
return NULL; return NULL;
} }
pthread_mutex_lock(&(self->conn->lock));
if (self->conn->async_cursor != NULL if (self->conn->async_cursor != NULL
&& self->conn->async_cursor != (PyObject*)self) { && self->conn->async_cursor != (PyObject*)self) {
pthread_mutex_unlock(&(self->conn->lock));
PyErr_SetString(ProgrammingError, PyErr_SetString(ProgrammingError,
"asynchronous query already in execution"); "asynchronous query already in execution");
return NULL; return NULL;
} }
pthread_mutex_unlock(&(self->conn->lock));
if (PyUnicode_Check(operation)) { if (PyUnicode_Check(operation)) {
PyObject *enc = PyDict_GetItemString(psycoEncodings, PyObject *enc = PyDict_GetItemString(psycoEncodings,
@ -498,12 +501,15 @@ _psyco_curs_prefetch(cursorObject *self)
/* check if the fetching cursor is the one that did the asynchronous query /* check if the fetching cursor is the one that did the asynchronous query
and raise an exception if not */ and raise an exception if not */
pthread_mutex_lock(&(self->conn->lock));
if (self->conn->async_cursor != NULL if (self->conn->async_cursor != NULL
&& self->conn->async_cursor != (PyObject*)self) { && self->conn->async_cursor != (PyObject*)self) {
pthread_mutex_unlock(&(self->conn->lock));
PyErr_SetString(ProgrammingError, PyErr_SetString(ProgrammingError,
"asynchronous fetch by wrong cursor"); "asynchronous fetch by wrong cursor");
return -2; return -2;
} }
pthread_mutex_unlock(&(self->conn->lock));
if (self->pgres == NULL) { if (self->pgres == NULL) {
Dprintf("_psyco_curs_prefetch: trying to fetch data"); Dprintf("_psyco_curs_prefetch: trying to fetch data");
@ -622,10 +628,9 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args)
/* if the query was async aggresively free pgres, to allow /* if the query was async aggresively free pgres, to allow
successive requests to reallocate it */ successive requests to reallocate it */
if (self->row >= self->rowcount if (self->row >= self->rowcount
&& self->conn->async_cursor == (PyObject*)self) && self->conn->async_cursor == (PyObject*)self)
IFCLEARPGRES(self->pgres); IFCLEARPGRES(self->pgres);
return res; return res;
} }
@ -687,7 +692,8 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords)
/* if the query was async aggresively free pgres, to allow /* if the query was async aggresively free pgres, to allow
successive requests to reallocate it */ successive requests to reallocate it */
if (self->row >= self->rowcount && self->conn->async_cursor) if (self->row >= self->rowcount
&& self->conn->async_cursor == (PyObject*)self)
IFCLEARPGRES(self->pgres); IFCLEARPGRES(self->pgres);
return list; return list;
@ -743,7 +749,8 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args)
/* if the query was async aggresively free pgres, to allow /* if the query was async aggresively free pgres, to allow
successive requests to reallocate it */ successive requests to reallocate it */
if (self->row >= self->rowcount && self->conn->async_cursor) if (self->row >= self->rowcount
&& self->conn->async_cursor == (PyObject*)self)
IFCLEARPGRES(self->pgres); IFCLEARPGRES(self->pgres);
return list; return list;
@ -929,13 +936,21 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args)
static PyObject * static PyObject *
psyco_curs_fileno(cursorObject *self, PyObject *args) psyco_curs_fileno(cursorObject *self, PyObject *args)
{ {
long int socket;
if (!PyArg_ParseTuple(args, "")) return NULL; if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_CLOSED(self);
/* note how we call PQflush() to make sure the user will use /* note how we call PQflush() to make sure the user will use
select() in the safe way! */ select() in the safe way! */
pthread_mutex_lock(&(self->conn->lock));
Py_BEGIN_ALLOW_THREADS;
PQflush(self->conn->pgconn); PQflush(self->conn->pgconn);
return PyInt_FromLong((long int)PQsocket(self->conn->pgconn)); socket = (long int)PQsocket(self->conn->pgconn);
Py_END_ALLOW_THREADS;
pthread_mutex_unlock(&(self->conn->lock));
return PyInt_FromLong(socket);
} }
/* extension: isready - return true if data from async execute is ready */ /* extension: isready - return true if data from async execute is ready */
@ -949,11 +964,20 @@ psyco_curs_isready(cursorObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "")) return NULL; if (!PyArg_ParseTuple(args, "")) return NULL;
EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_CLOSED(self);
/* pq_is_busy does its own locking, we don't need anything special but if
the cursor is ready we need to fetch the result and free the connection
for the next query. */
if (pq_is_busy(self->conn)) { if (pq_is_busy(self->conn)) {
Py_INCREF(Py_False); Py_INCREF(Py_False);
return Py_False; return Py_False;
} }
else { else {
IFCLEARPGRES(self->pgres);
pthread_mutex_lock(&(self->conn->lock));
self->pgres = PQgetResult(self->conn->pgconn);
self->conn->async_cursor = NULL;
pthread_mutex_unlock(&(self->conn->lock));
Py_INCREF(Py_True); Py_INCREF(Py_True);
return Py_True; return Py_True;
} }
@ -1061,6 +1085,8 @@ cursor_setup(cursorObject *self, connectionObject *conn)
self, ((PyObject *)self)->ob_refcnt); self, ((PyObject *)self)->ob_refcnt);
self->conn = conn; self->conn = conn;
Py_INCREF((PyObject*)self->conn);
self->closed = 0; self->closed = 0;
self->pgres = NULL; self->pgres = NULL;
@ -1092,25 +1118,10 @@ cursor_dealloc(PyObject* obj)
{ {
cursorObject *self = (cursorObject *)obj; cursorObject *self = (cursorObject *)obj;
/* if necessary remove cursor from connection */
if (self->conn != NULL) {
PyObject *t;
int len, i;
if ((len = PyList_Size(self->conn->cursors)) > 0) {
for (i = 0; i < len; i++) {
t = PyList_GET_ITEM(self->conn->cursors, i);
if (self == (cursorObject *)t) {
Dprintf("cursor_dealloc: found myself in cursor list");
PySequence_DelItem(self->conn->cursors, i);
break;
}
}
}
}
if (self->query) free(self->query); if (self->query) free(self->query);
Py_DECREF((PyObject*)self->conn);
Py_XDECREF(self->casts); Py_XDECREF(self->casts);
Py_XDECREF(self->description); Py_XDECREF(self->description);
Py_XDECREF(self->pgstatus); Py_XDECREF(self->pgstatus);

View File

@ -19,6 +19,12 @@
* Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
*/ */
/* IMPORTANT NOTE: no function in this file do its own connection locking
except for pg_execute and pq_fetch (that are somehow high-level. This means
that all the othe functions should be called while holding a lock to the
connection.
*/
#include <Python.h> #include <Python.h>
#include <string.h> #include <string.h>
@ -33,7 +39,9 @@
#include "psycopg/pgtypes.h" #include "psycopg/pgtypes.h"
#include "psycopg/pgversion.h" #include "psycopg/pgversion.h"
/* pq_raise - raise a python exception of the right kind */ /* pq_raise - raise a python exception of the right kind
This function should be called while holding the GIL. */
void void
pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg) pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg)
@ -106,7 +114,8 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg)
critical condition like out of memory or lost connection. it save the error critical condition like out of memory or lost connection. it save the error
message and mark the connection as 'wanting cleanup'. message and mark the connection as 'wanting cleanup'.
both functions do not call any Py_*_ALLOW_THREADS macros. */ both functions do not call any Py_*_ALLOW_THREADS macros.
pq_resolve_critical should be called while holding the GIL. */
void void
pq_set_critical(connectionObject *conn, const char *msg) pq_set_critical(connectionObject *conn, const char *msg)
@ -140,6 +149,7 @@ pq_resolve_critical(connectionObject *conn, int close)
note that this function does block because it needs to wait for the full note that this function does block because it needs to wait for the full
result sets of the previous query to clear them. result sets of the previous query to clear them.
this function does not call any Py_*_ALLOW_THREADS macros */ this function does not call any Py_*_ALLOW_THREADS macros */
void void
@ -149,6 +159,7 @@ pq_clear_async(connectionObject *conn)
do { do {
pgres = PQgetResult(conn->pgconn); pgres = PQgetResult(conn->pgconn);
Dprintf("pq_clear_async: clearing PGresult at %p", pgres);
IFCLEARPGRES(pgres); IFCLEARPGRES(pgres);
} while (pgres != NULL); } while (pgres != NULL);
} }
@ -291,7 +302,10 @@ pq_abort(connectionObject *conn)
a status of 1 means that a call to pq_fetch will block, while a status of 0 a status of 1 means that a call to pq_fetch will block, while a status of 0
means that there is data available to be collected. -1 means an error, the means that there is data available to be collected. -1 means an error, the
exception will be set accordingly. */ exception will be set accordingly.
this fucntion locks the connection object
this function call Py_*_ALLOW_THREADS macros */
int int
pq_is_busy(connectionObject *conn) pq_is_busy(connectionObject *conn)
@ -299,9 +313,15 @@ pq_is_busy(connectionObject *conn)
PGnotify *pgn; PGnotify *pgn;
Dprintf("pq_is_busy: consuming input"); Dprintf("pq_is_busy: consuming input");
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(conn->lock));
if (PQconsumeInput(conn->pgconn) == 0) { if (PQconsumeInput(conn->pgconn) == 0) {
Dprintf("pq_is_busy: PQconsumeInput() failed"); Dprintf("pq_is_busy: PQconsumeInput() failed");
PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS;
return -1; return -1;
} }
@ -315,11 +335,12 @@ pq_is_busy(connectionObject *conn)
notify = PyTuple_New(2); notify = PyTuple_New(2);
PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid)); PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid));
PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname)); PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname));
pthread_mutex_lock(&(conn->lock));
PyList_Append(conn->notifies, notify); PyList_Append(conn->notifies, notify);
pthread_mutex_unlock(&(conn->lock));
free(pgn); free(pgn);
} }
pthread_mutex_unlock(&(conn->lock));
Py_END_ALLOW_THREADS;
return PQisBusy(conn->pgconn); return PQisBusy(conn->pgconn);
} }
@ -656,14 +677,14 @@ pq_fetch(cursorObject *curs)
Dprintf("pq_fetch: no data: entering polling loop"); Dprintf("pq_fetch: no data: entering polling loop");
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock));
while (pq_is_busy(curs->conn) > 0) { while (pq_is_busy(curs->conn) > 0) {
fd_set rfds; fd_set rfds;
struct timeval tv; struct timeval tv;
int sval, sock; int sval, sock;
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock));
sock = PQsocket(curs->conn->pgconn); sock = PQsocket(curs->conn->pgconn);
FD_ZERO(&rfds); FD_ZERO(&rfds);
FD_SET(sock, &rfds); FD_SET(sock, &rfds);
@ -677,14 +698,14 @@ pq_fetch(cursorObject *curs)
Dprintf("pq_fetch: entering PDflush() loop"); Dprintf("pq_fetch: entering PDflush() loop");
while (PQflush(curs->conn->pgconn) != 0); while (PQflush(curs->conn->pgconn) != 0);
sval = select(sock+1, &rfds, NULL, NULL, &tv); sval = select(sock+1, &rfds, NULL, NULL, &tv);
pthread_mutex_unlock(&(curs->conn->lock));
Py_END_ALLOW_THREADS;
} }
Dprintf("pq_fetch: data is probably ready"); Dprintf("pq_fetch: data is probably ready");
IFCLEARPGRES(curs->pgres); IFCLEARPGRES(curs->pgres);
curs->pgres = PQgetResult(curs->conn->pgconn); curs->pgres = PQgetResult(curs->conn->pgconn);
pthread_mutex_unlock(&(curs->conn->lock));
Py_END_ALLOW_THREADS;
} }
/* check for PGRES_FATAL_ERROR result */ /* check for PGRES_FATAL_ERROR result */