diff --git a/ChangeLog b/ChangeLog index 5f4357a1..db34be40 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,27 @@ +2004-12-10 Federico Di Gregorio + + * psycopg/cursor_type.c: now *all* write or async accesses to the + connection object are arbitrated using the connection lock. + + * psycopg/cursor_type.c (psyco_curs_isready): now we reset the + current async cursor if it is ready, to allow other cursors to + .execute() without raising the "transaction in progress" error. + + * psycopg/pqpath.c (pq_is_busy): gained status of high-level + function with its own blocking and locking. + + * psycopg/cursor.h (EXC_IF_CURS_CLOSED): also checks the + connection (a closed connection implies a closed cursor.) + + * psycopg/cursor_type.c: cursor's connection is correctly + INCREFfed and DECREFfed. + + * psycopg/connection_type.c: removed the cursors list from the + connection object. It is not necessary anymore for the connection + to know about the cursors and the reference counting will keep the + connection alive (but possibly closed) until all cursors are + garbage collected. + 2004-11-20 Federico Di Gregorio * psycopg/cursor_type.c (_mogrify): ported %% fix from 1.1.15. @@ -19,7 +43,7 @@ tuples are filled using PyTuple_SET_ITEM while extended types (created via row_factory) are filled using PySequence_SetItem. - * psycopg/cursor_type.c: change cursor attribute name from + * psycopg/cursor_type.c: changed cursor attribute name from tuple_factory to row_factory. 2004-10-14 Federico Di Gregorio diff --git a/NEWS b/NEWS index e2415fce..ad560510 100644 --- a/NEWS +++ b/NEWS @@ -1,3 +1,14 @@ +What's new in psycopg 1.99.11 +----------------------------- + +* 'cursor' argument in .cursor() connection method renamed to + 'cursor_factory'. + +* changed 'tuple_factory' cursor attribute name to 'row_factory'. + +* the .cursor attribute is gone and connections and cursors are propely + gc-managed. + What's new in psycopg 1.99.10 ----------------------------- diff --git a/psycopg/connection.h b/psycopg/connection.h index 56193bdd..c973b92a 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -40,8 +40,6 @@ extern PyTypeObject connectionType; typedef struct { PyObject HEAD; - PyObject *cursors; /* all cursors derived from this connection */ - pthread_mutex_t lock; /* the global connection lock */ char *dsn; /* data source name */ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 8e72ff28..270ed6cd 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -131,9 +131,6 @@ conn_connect(connectionObject *self) void conn_close(connectionObject *self) { - int len, i; - PyObject *t = NULL; - /* sets this connection as closed even for other threads; also note that we need to check the value of pgconn, because we get called even when the connection fails! */ @@ -143,34 +140,17 @@ conn_close(connectionObject *self) self->closed = 1; /* execute a forced rollback on the connection (but don't check the - result, we're going to close the pq connection anyway */ - if (self->pgconn) pq_abort(self); - - /* orphans all the children cursors but do NOT destroy them (note that we - need to lock the connection before orphaning a cursor: we don't want to - remove a connection from a cursor executing a DB operation */ - pthread_mutex_unlock(&self->lock); - Py_END_ALLOW_THREADS; - - pthread_mutex_lock(&self->lock); - len = PyList_Size(self->cursors); - Dprintf("conn_close: ophaning %d cursors", len); - for (i = len-1; i >= 0; i--) { - t = PySequence_GetItem(self->cursors, i); - Dprintf("conn close: cursor at %p: refcnt = %d", t, t->ob_refcnt); - PySequence_DelItem(self->cursors, i); - ((cursorObject *)t)->conn = NULL; /* orphaned */ - Dprintf("conn_close: -> new refcnt = %d", t->ob_refcnt); - } - pthread_mutex_unlock(&self->lock); - - /* now that all cursors have been orphaned (they can't operate on the - database anymore) we can shut down the connection */ + result, we're going to close the pq connection anyway */ if (self->pgconn) { + pq_abort(self); PQfinish(self->pgconn); Dprintf("conn_close: PQfinish called"); self->pgconn = NULL; } + + pthread_mutex_unlock(&self->lock); + Py_END_ALLOW_THREADS; + } /* conn_commit - commit on a connection */ diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 1d544f6e..d1bb978c 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -65,11 +65,6 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds) /* TODO: added error checking on obj (cursor) here */ - /* add the cursor to this connection's list (and decref it, so that it has - the right number of references to go away even if still in the list) */ - PyList_Append(self->cursors, obj); - Py_DECREF(obj); - Dprintf("psyco_conn_cursor: new cursor at %p: refcnt = %d", obj, obj->ob_refcnt); return obj; @@ -244,7 +239,6 @@ static struct PyMemberDef connectionObject_members[] = { {"isolation_level", T_LONG, offsetof(connectionObject, isolation_level), RO}, {"encoding", T_STRING, offsetof(connectionObject, encoding), RO}, - {"cursors", T_OBJECT, offsetof(connectionObject, cursors), RO}, {"notices", T_OBJECT, offsetof(connectionObject, notice_list), RO}, {"notifies", T_OBJECT, offsetof(connectionObject, notifies), RO}, {"dsn", T_STRING, offsetof(connectionObject, dsn), RO}, @@ -261,7 +255,6 @@ connection_setup(connectionObject *self, char *dsn) self, ((PyObject *)self)->ob_refcnt); self->dsn = strdup(dsn); - self->cursors = PyList_New(0); self->notice_list = PyList_New(0); self->closed = 0; self->isolation_level = 1; @@ -273,7 +266,6 @@ connection_setup(connectionObject *self, char *dsn) pthread_mutex_init(&(self->lock), NULL); if (conn_connect(self) != 0) { - Py_XDECREF(self->cursors); pthread_mutex_destroy(&(self->lock)); Dprintf("connection_init: FAILED"); return -1; @@ -291,7 +283,6 @@ connection_dealloc(PyObject* obj) if (self->closed == 0) conn_close(self); - Py_XDECREF(self->cursors); if (self->dsn) free(self->dsn); if (self->encoding) free(self->encoding); if (self->critical) free(self->critical); diff --git a/psycopg/cursor.h b/psycopg/cursor.h index 2c2983e9..a1d6ac38 100644 --- a/psycopg/cursor.h +++ b/psycopg/cursor.h @@ -72,11 +72,12 @@ typedef struct { extern void curs_reset(cursorObject *self); /* exception-raising macros */ -#define EXC_IF_CURS_CLOSED(self) if ((self)->closed) { \ - PyErr_SetString(InterfaceError, "cursor already closed"); \ +#define EXC_IF_CURS_CLOSED(self) \ +if ((self)->closed || ((self)->conn && (self)->conn->closed)) { \ + PyErr_SetString(InterfaceError, "cursor already closed"); \ return NULL; } -#define EXC_IF_NO_TUPLES(self) if ((self)->notuples) { \ +#define EXC_IF_NO_TUPLES(self) if ((self)->notuples) { \ PyErr_SetString(ProgrammingError, "no results to fetch"); \ return NULL; } diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 5482f662..1bb1894b 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -249,12 +249,15 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs) return NULL; } + pthread_mutex_lock(&(self->conn->lock)); if (self->conn->async_cursor != NULL && self->conn->async_cursor != (PyObject*)self) { + pthread_mutex_unlock(&(self->conn->lock)); PyErr_SetString(ProgrammingError, "asynchronous query already in execution"); return NULL; } + pthread_mutex_unlock(&(self->conn->lock)); if (PyUnicode_Check(operation)) { PyObject *enc = PyDict_GetItemString(psycoEncodings, @@ -498,12 +501,15 @@ _psyco_curs_prefetch(cursorObject *self) /* check if the fetching cursor is the one that did the asynchronous query and raise an exception if not */ + pthread_mutex_lock(&(self->conn->lock)); if (self->conn->async_cursor != NULL && self->conn->async_cursor != (PyObject*)self) { + pthread_mutex_unlock(&(self->conn->lock)); PyErr_SetString(ProgrammingError, "asynchronous fetch by wrong cursor"); return -2; } + pthread_mutex_unlock(&(self->conn->lock)); if (self->pgres == NULL) { Dprintf("_psyco_curs_prefetch: trying to fetch data"); @@ -622,10 +628,9 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args) /* if the query was async aggresively free pgres, to allow successive requests to reallocate it */ if (self->row >= self->rowcount - && self->conn->async_cursor == (PyObject*)self) IFCLEARPGRES(self->pgres); - + return res; } @@ -687,7 +692,8 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords) /* if the query was async aggresively free pgres, to allow successive requests to reallocate it */ - if (self->row >= self->rowcount && self->conn->async_cursor) + if (self->row >= self->rowcount + && self->conn->async_cursor == (PyObject*)self) IFCLEARPGRES(self->pgres); return list; @@ -743,7 +749,8 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args) /* if the query was async aggresively free pgres, to allow successive requests to reallocate it */ - if (self->row >= self->rowcount && self->conn->async_cursor) + if (self->row >= self->rowcount + && self->conn->async_cursor == (PyObject*)self) IFCLEARPGRES(self->pgres); return list; @@ -929,13 +936,21 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args) static PyObject * psyco_curs_fileno(cursorObject *self, PyObject *args) { + long int socket; + if (!PyArg_ParseTuple(args, "")) return NULL; EXC_IF_CURS_CLOSED(self); /* note how we call PQflush() to make sure the user will use select() in the safe way! */ + pthread_mutex_lock(&(self->conn->lock)); + Py_BEGIN_ALLOW_THREADS; PQflush(self->conn->pgconn); - return PyInt_FromLong((long int)PQsocket(self->conn->pgconn)); + socket = (long int)PQsocket(self->conn->pgconn); + Py_END_ALLOW_THREADS; + pthread_mutex_unlock(&(self->conn->lock)); + + return PyInt_FromLong(socket); } /* extension: isready - return true if data from async execute is ready */ @@ -949,11 +964,20 @@ psyco_curs_isready(cursorObject *self, PyObject *args) if (!PyArg_ParseTuple(args, "")) return NULL; EXC_IF_CURS_CLOSED(self); + /* pq_is_busy does its own locking, we don't need anything special but if + the cursor is ready we need to fetch the result and free the connection + for the next query. */ + if (pq_is_busy(self->conn)) { Py_INCREF(Py_False); return Py_False; } else { + IFCLEARPGRES(self->pgres); + pthread_mutex_lock(&(self->conn->lock)); + self->pgres = PQgetResult(self->conn->pgconn); + self->conn->async_cursor = NULL; + pthread_mutex_unlock(&(self->conn->lock)); Py_INCREF(Py_True); return Py_True; } @@ -1061,6 +1085,8 @@ cursor_setup(cursorObject *self, connectionObject *conn) self, ((PyObject *)self)->ob_refcnt); self->conn = conn; + Py_INCREF((PyObject*)self->conn); + self->closed = 0; self->pgres = NULL; @@ -1092,25 +1118,10 @@ cursor_dealloc(PyObject* obj) { cursorObject *self = (cursorObject *)obj; - /* if necessary remove cursor from connection */ - if (self->conn != NULL) { - PyObject *t; - int len, i; - - if ((len = PyList_Size(self->conn->cursors)) > 0) { - for (i = 0; i < len; i++) { - t = PyList_GET_ITEM(self->conn->cursors, i); - if (self == (cursorObject *)t) { - Dprintf("cursor_dealloc: found myself in cursor list"); - PySequence_DelItem(self->conn->cursors, i); - break; - } - } - } - } if (self->query) free(self->query); - + + Py_DECREF((PyObject*)self->conn); Py_XDECREF(self->casts); Py_XDECREF(self->description); Py_XDECREF(self->pgstatus); diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 0f5beb1a..e7bed669 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -19,6 +19,12 @@ * Foundation, 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ +/* IMPORTANT NOTE: no function in this file do its own connection locking + except for pg_execute and pq_fetch (that are somehow high-level. This means + that all the othe functions should be called while holding a lock to the + connection. +*/ + #include #include @@ -33,7 +39,9 @@ #include "psycopg/pgtypes.h" #include "psycopg/pgversion.h" -/* pq_raise - raise a python exception of the right kind */ +/* pq_raise - raise a python exception of the right kind + + This function should be called while holding the GIL. */ void pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg) @@ -106,7 +114,8 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg) critical condition like out of memory or lost connection. it save the error message and mark the connection as 'wanting cleanup'. - both functions do not call any Py_*_ALLOW_THREADS macros. */ + both functions do not call any Py_*_ALLOW_THREADS macros. + pq_resolve_critical should be called while holding the GIL. */ void pq_set_critical(connectionObject *conn, const char *msg) @@ -140,6 +149,7 @@ pq_resolve_critical(connectionObject *conn, int close) note that this function does block because it needs to wait for the full result sets of the previous query to clear them. + this function does not call any Py_*_ALLOW_THREADS macros */ void @@ -149,6 +159,7 @@ pq_clear_async(connectionObject *conn) do { pgres = PQgetResult(conn->pgconn); + Dprintf("pq_clear_async: clearing PGresult at %p", pgres); IFCLEARPGRES(pgres); } while (pgres != NULL); } @@ -291,7 +302,10 @@ pq_abort(connectionObject *conn) a status of 1 means that a call to pq_fetch will block, while a status of 0 means that there is data available to be collected. -1 means an error, the - exception will be set accordingly. */ + exception will be set accordingly. + + this fucntion locks the connection object + this function call Py_*_ALLOW_THREADS macros */ int pq_is_busy(connectionObject *conn) @@ -299,9 +313,15 @@ pq_is_busy(connectionObject *conn) PGnotify *pgn; Dprintf("pq_is_busy: consuming input"); + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&(conn->lock)); + if (PQconsumeInput(conn->pgconn) == 0) { Dprintf("pq_is_busy: PQconsumeInput() failed"); PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); + pthread_mutex_unlock(&(conn->lock)); + Py_BLOCK_THREADS; return -1; } @@ -315,11 +335,12 @@ pq_is_busy(connectionObject *conn) notify = PyTuple_New(2); PyTuple_SET_ITEM(notify, 0, PyInt_FromLong((long)pgn->be_pid)); PyTuple_SET_ITEM(notify, 1, PyString_FromString(pgn->relname)); - pthread_mutex_lock(&(conn->lock)); PyList_Append(conn->notifies, notify); - pthread_mutex_unlock(&(conn->lock)); free(pgn); } + + pthread_mutex_unlock(&(conn->lock)); + Py_END_ALLOW_THREADS; return PQisBusy(conn->pgconn); } @@ -656,14 +677,14 @@ pq_fetch(cursorObject *curs) Dprintf("pq_fetch: no data: entering polling loop"); - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(curs->conn->lock)); - while (pq_is_busy(curs->conn) > 0) { fd_set rfds; struct timeval tv; int sval, sock; + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&(curs->conn->lock)); + sock = PQsocket(curs->conn->pgconn); FD_ZERO(&rfds); FD_SET(sock, &rfds); @@ -677,14 +698,14 @@ pq_fetch(cursorObject *curs) Dprintf("pq_fetch: entering PDflush() loop"); while (PQflush(curs->conn->pgconn) != 0); sval = select(sock+1, &rfds, NULL, NULL, &tv); + + pthread_mutex_unlock(&(curs->conn->lock)); + Py_END_ALLOW_THREADS; } Dprintf("pq_fetch: data is probably ready"); IFCLEARPGRES(curs->pgres); curs->pgres = PQgetResult(curs->conn->pgconn); - - pthread_mutex_unlock(&(curs->conn->lock)); - Py_END_ALLOW_THREADS; } /* check for PGRES_FATAL_ERROR result */