diff --git a/lib/extensions.py b/lib/extensions.py index 51461af3..3fd9803f 100644 --- a/lib/extensions.py +++ b/lib/extensions.py @@ -80,8 +80,9 @@ ISOLATION_LEVEL_READ_UNCOMMITTED = ISOLATION_LEVEL_READ_COMMITTED STATUS_SETUP = 0 STATUS_READY = 1 STATUS_BEGIN = 2 -STATUS_SYNC = 3 -STATUS_ASYNC = 4 +STATUS_SYNC = 3 # currently unused +STATUS_ASYNC = 4 # currently unused +STATUS_PREPARED = 5 # This is a usefull mnemonic to check if the connection is in a transaction STATUS_IN_TRANSACTION = STATUS_BEGIN diff --git a/psycopg/connection.h b/psycopg/connection.h index 5b7c5d26..7d2f66c0 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -38,9 +38,10 @@ extern "C" { #endif /* connection status */ -#define CONN_STATUS_SETUP 0 -#define CONN_STATUS_READY 1 -#define CONN_STATUS_BEGIN 2 +#define CONN_STATUS_SETUP 0 +#define CONN_STATUS_READY 1 +#define CONN_STATUS_BEGIN 2 +#define CONN_STATUS_PREPARED 5 /* async connection building statuses */ #define CONN_STATUS_CONNECTING 20 #define CONN_STATUS_DATESTYLE 21 @@ -132,6 +133,8 @@ HIDDEN int conn_switch_isolation_level(connectionObject *self, int level); HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc); HIDDEN int conn_poll(connectionObject *self); HIDDEN int conn_tpc_begin(connectionObject *self, XidObject *xid); +HIDDEN int conn_tpc_command(connectionObject *self, + const char *cmd, XidObject *xid); /* exception-raising macros */ #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ @@ -148,6 +151,12 @@ HIDDEN int conn_tpc_begin(connectionObject *self, XidObject *xid); "during a two-phase transaction", #cmd); \ return NULL; } +#define EXC_IF_TPC_PREPARED(self, cmd) \ + if ((self)->status == CONN_STATUS_PREPARED) { \ + PyErr_Format(ProgrammingError, "%s cannot be used " \ + "with a prepared two-phase transaction", #cmd); \ + return NULL; } + #ifdef __cplusplus } #endif diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 84233ba2..6024a2b3 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -928,3 +928,32 @@ conn_tpc_begin(connectionObject *self, XidObject *xid) return 0; } + +/* conn_tpc_command -- run one of the TPC-related PostgreSQL commands. + * + * The function doesn't change the connection state as it can be used + * for many commands and for recovered transactions. */ + +int +conn_tpc_command(connectionObject *self, const char *cmd, XidObject *xid) +{ + PGresult *pgres = NULL; + char *error = NULL; + int rv; + + Dprintf("conn_tpc_command: %s", cmd); + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&self->lock); + + rv = pq_tpc_command_locked(self, cmd, xid, &pgres, &error, &_save); + + pthread_mutex_unlock(&self->lock); + Py_END_ALLOW_THREADS; + + if (rv < 0) { + pq_complete_error(self, &pgres, &error); + } + return rv; +} + diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 9cc32a2d..12115141 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -70,7 +70,8 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds) EXC_IF_CONN_CLOSED(self); if (self->status != CONN_STATUS_READY && - self->status != CONN_STATUS_BEGIN) { + self->status != CONN_STATUS_BEGIN && + self->status != CONN_STATUS_PREPARED) { PyErr_SetString(OperationalError, "asynchronous connection attempt underway"); return NULL; @@ -231,6 +232,39 @@ exit: } +#define psyco_conn_tpc_prepare_doc \ +"tpc_prepare() -- perform the first phase of a two-phase transaction." + +static PyObject * +psyco_conn_tpc_prepare(connectionObject *self, PyObject *args) +{ + EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, tpc_prepare); + EXC_IF_TPC_PREPARED(self, tpc_prepare); + + if (!PyArg_ParseTuple(args, "")) { + return NULL; + } + + if (NULL == self->tpc_xid) { + PyErr_SetString(ProgrammingError, + "prepare must be called inside a two-phase transaction"); + return NULL; + } + + if (0 > conn_tpc_command(self, "PREPARE TRANSACTION", self->tpc_xid)) { + return NULL; + } + + /* transaction prepared: set the state so that no operation + * can be performed until commit. */ + self->status = CONN_STATUS_PREPARED; + + Py_INCREF(Py_None); + return Py_None; +} + + #ifdef PSYCOPG_EXTENSIONS /* set_isolation_level method - switch connection isolation level */ @@ -245,6 +279,7 @@ psyco_conn_set_isolation_level(connectionObject *self, PyObject *args) EXC_IF_CONN_CLOSED(self); EXC_IF_CONN_ASYNC(self, set_isolation_level); + EXC_IF_TPC_PREPARED(self, set_isolation_level); if (!PyArg_ParseTuple(args, "i", &level)) return NULL; @@ -279,6 +314,7 @@ psyco_conn_set_client_encoding(connectionObject *self, PyObject *args) EXC_IF_CONN_CLOSED(self); EXC_IF_CONN_ASYNC(self, set_client_encoding); + EXC_IF_TPC_PREPARED(self, set_client_encoding); if (!PyArg_ParseTuple(args, "s", &enc)) return NULL; @@ -379,6 +415,7 @@ psyco_conn_lobject(connectionObject *self, PyObject *args, PyObject *keywds) EXC_IF_CONN_CLOSED(self); EXC_IF_CONN_ASYNC(self, lobject); EXC_IF_GREEN(lobject); + EXC_IF_TPC_PREPARED(self, lobject); Dprintf("psyco_conn_lobject: new lobject for connection at %p", self); Dprintf("psyco_conn_lobject: parameters: oid = %d, mode = %s", @@ -561,6 +598,8 @@ static struct PyMethodDef connectionObject_methods[] = { METH_VARARGS|METH_KEYWORDS, psyco_conn_xid_doc}, {"tpc_begin", (PyCFunction)psyco_conn_tpc_begin, METH_VARARGS, psyco_conn_tpc_begin_doc}, + {"tpc_prepare", (PyCFunction)psyco_conn_tpc_prepare, + METH_VARARGS, psyco_conn_tpc_prepare_doc}, #ifdef PSYCOPG_EXTENSIONS {"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level, METH_VARARGS, psyco_conn_set_isolation_level_doc}, diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 8bc7c40d..949433b2 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -492,6 +492,7 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_CURS_CLOSED(self); EXC_IF_ASYNC_IN_PROGRESS(self, execute); + EXC_IF_TPC_PREPARED(self->conn, execute); if (_psyco_curs_execute(self, operation, vars, self->conn->async)) { Py_INCREF(Py_None); @@ -511,12 +512,12 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs) PyObject *operation = NULL, *vars = NULL; PyObject *v, *iter = NULL; int rowcount = 0; - + static char *kwlist[] = {"query", "vars_list", NULL}; /* reset rowcount to -1 to avoid setting it when an exception is raised */ self->rowcount = -1; - + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO", kwlist, &operation, &vars)) { return NULL; @@ -524,6 +525,7 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_ASYNC(self, executemany); + EXC_IF_TPC_PREPARED(self->conn, executemany); if (self->name != NULL) { psyco_set_error(ProgrammingError, (PyObject*)self, @@ -746,6 +748,7 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args) char buffer[128]; EXC_IF_NO_MARK(self); + EXC_IF_TPC_PREPARED(self->conn, fetchone); PyOS_snprintf(buffer, 127, "FETCH FORWARD 1 FROM %s", self->name); if (pq_execute(self, buffer, 0) == -1) return NULL; if (_psyco_curs_prefetch(self) < 0) return NULL; @@ -807,6 +810,7 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords) char buffer[128]; EXC_IF_NO_MARK(self); + EXC_IF_TPC_PREPARED(self->conn, fetchone); PyOS_snprintf(buffer, 127, "FETCH FORWARD %d FROM %s", (int)size, self->name); if (pq_execute(self, buffer, 0) == -1) return NULL; @@ -880,6 +884,7 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args) char buffer[128]; EXC_IF_NO_MARK(self); + EXC_IF_TPC_PREPARED(self->conn, fetchall); PyOS_snprintf(buffer, 127, "FETCH FORWARD ALL FROM %s", self->name); if (pq_execute(self, buffer, 0) == -1) return NULL; if (_psyco_curs_prefetch(self) < 0) return NULL; @@ -941,6 +946,7 @@ psyco_curs_callproc(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_CURS_CLOSED(self); EXC_IF_ASYNC_IN_PROGRESS(self, callproc); + EXC_IF_TPC_PREPARED(self->conn, callproc); if (self->name != NULL) { psyco_set_error(ProgrammingError, (PyObject*)self, @@ -1086,6 +1092,7 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs) char buffer[128]; EXC_IF_NO_MARK(self); + EXC_IF_TPC_PREPARED(self->conn, scroll); if (strcmp(mode, "absolute") == 0) { PyOS_snprintf(buffer, 127, "MOVE ABSOLUTE %d FROM %s", @@ -1209,6 +1216,8 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_ASYNC(self, copy_from); EXC_IF_GREEN(copy_from); + EXC_IF_TPC_PREPARED(self->conn, copy_from); + quoted_delimiter = psycopg_escape_string((PyObject*)self->conn, sep, 0, NULL, NULL); if (quoted_delimiter == NULL) { @@ -1315,6 +1324,7 @@ psyco_curs_copy_to(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_ASYNC(self, copy_to); EXC_IF_GREEN(copy_to); + EXC_IF_TPC_PREPARED(self->conn, copy_to); quoted_delimiter = psycopg_escape_string((PyObject*)self->conn, sep, 0, NULL, NULL); if (quoted_delimiter == NULL) { @@ -1401,6 +1411,7 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_CURS_CLOSED(self); EXC_IF_CURS_ASYNC(self, copy_expert); EXC_IF_GREEN(copy_expert); + EXC_IF_TPC_PREPARED(self->conn, copy_expert); sql = _psyco_curs_validate_sql_basic(self, sql); diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 69c78133..37d52791 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -575,6 +575,7 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error, "SET SESSION AUTHORIZATION DEFAULT", pgres, error, tstate); if (retvalue != 0) return retvalue; + /* should set the tpc xid to null: postponed until we get the GIL again */ conn->status = CONN_STATUS_READY; return retvalue; @@ -600,12 +601,58 @@ pq_reset(connectionObject *conn) conn_notice_process(conn); - if (retvalue < 0) + if (retvalue < 0) { pq_complete_error(conn, &pgres, &error); - + } + else { + Py_CLEAR(conn->tpc_xid); + } return retvalue; } + +/* Call one of the PostgreSQL tpc-related commands. + * + * This function should only be called on a locked connection without + * holding the global interpreter lock. */ + +int +pq_tpc_command_locked(connectionObject *conn, const char *cmd, XidObject *xid, + PGresult **pgres, char **error, + PyThreadState **tstate) +{ + int rv = -1; + char *tid = NULL, *etid = NULL, *buf = NULL; + Py_ssize_t buflen; + + Dprintf("_pq_tpc_command: pgconn = %p, command = %s", + conn->pgconn, cmd); + + /* convert the xid into the postgres transaction_id and quote it. */ + if (!(tid = xid_get_tid(xid))) { goto exit; } + if (!(etid = psycopg_escape_string((PyObject *)conn, tid, 0, NULL, NULL))) + { goto exit; } + + /* prepare the command to the server */ + buflen = 3 + strlen(cmd) + strlen(etid); /* add space, semicolon, zero */ + if (!(buf = PyMem_Malloc(buflen))) { + PyErr_NoMemory(); + goto exit; + } + if (0 > PyOS_snprintf(buf, buflen, "%s %s;", cmd, etid)) { goto exit; } + + /* run the command and let it handle the error cases */ + rv = pq_execute_command_locked(conn, buf, pgres, error, tstate); + +exit: + PyMem_Free(buf); + PyMem_Free(etid); + PyMem_Free(tid); + + return rv; +} + + /* pq_is_busy - consume input and return connection status a status of 1 means that a call to pq_fetch will block, while a status of 0 diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 7f11383d..5f56efa4 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -46,6 +46,10 @@ HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error, PyThreadState **tstate); HIDDEN int pq_abort(connectionObject *conn); HIDDEN int pq_reset(connectionObject *conn); +HIDDEN int pq_tpc_command_locked(connectionObject *conn, + const char *cmd, XidObject *xid, + PGresult **pgres, char **error, + PyThreadState **tstate); HIDDEN int pq_is_busy(connectionObject *conn); HIDDEN int pq_is_busy_locked(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn); diff --git a/psycopg/xid.h b/psycopg/xid.h index f2c6d4d7..eec4c7ec 100644 --- a/psycopg/xid.h +++ b/psycopg/xid.h @@ -51,5 +51,6 @@ typedef struct { } XidObject; HIDDEN XidObject *xid_ensure(PyObject *oxid); +HIDDEN char *xid_get_tid(XidObject *self); #endif /* PSYCOPG_XID_H */ diff --git a/psycopg/xid_type.c b/psycopg/xid_type.c index 3e352081..1aa7b068 100644 --- a/psycopg/xid_type.c +++ b/psycopg/xid_type.c @@ -319,3 +319,33 @@ XidObject *xid_ensure(PyObject *oxid) } } + +/* Return the PostgreSQL transaction_id for this XA xid. + * + * PostgreSQL wants just a string, while the DBAPI supports the XA standard + * and thus a triple. We use the same conversion algorithm implemented by JDBC + * in order to allow some form of interoperation. + * + * Return a buffer allocated with PyMem_Malloc. Use PyMem_Free to free it. + */ +char * +xid_get_tid(XidObject *self) +{ + /* TODO: for the moment just use the string mashed up by James. + * later will implement the JDBC algorithm. */ + char *buf; + Py_ssize_t bufsize = 0; + + if (self->pg_xact_id) { + bufsize = 1 + strlen(self->pg_xact_id); + } + + buf = (char *)PyMem_Malloc(bufsize); + if (buf) { + strncpy(buf, self->pg_xact_id, bufsize); + } + + return buf; +} + +