Added tpc_begin() and current xid on connection.

This commit is contained in:
Daniele Varrazzo 2010-10-08 10:15:50 +01:00
parent 60ee39fa3d
commit 98c5b1d374
5 changed files with 118 additions and 0 deletions

View File

@ -31,6 +31,7 @@
#include <libpq-fe.h> #include <libpq-fe.h>
#include "psycopg/config.h" #include "psycopg/config.h"
#include "psycopg/xid.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
@ -87,6 +88,7 @@ typedef struct {
long int isolation_level; /* isolation level for this connection */ long int isolation_level; /* isolation level for this connection */
long int mark; /* number of commits/rollbacks done so far */ long int mark; /* number of commits/rollbacks done so far */
int status; /* status of the connection */ int status; /* status of the connection */
XidObject *tpc_xid; /* Transaction ID in two-phase commit */
long int async; /* 1 means the connection is async */ long int async; /* 1 means the connection is async */
int protocol; /* protocol version */ int protocol; /* protocol version */
@ -110,6 +112,7 @@ typedef struct {
PyObject *binary_types; /* a set of typecasters for binary types */ PyObject *binary_types; /* a set of typecasters for binary types */
int equote; /* use E''-style quotes for escaped strings */ int equote; /* use E''-style quotes for escaped strings */
} connectionObject; } connectionObject;
/* C-callable functions in connection_int.c and connection_ext.c */ /* C-callable functions in connection_int.c and connection_ext.c */
@ -128,6 +131,7 @@ HIDDEN int conn_rollback(connectionObject *self);
HIDDEN int conn_switch_isolation_level(connectionObject *self, int level); HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc); HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
HIDDEN int conn_poll(connectionObject *self); HIDDEN int conn_poll(connectionObject *self);
HIDDEN int conn_tpc_begin(connectionObject *self, XidObject *xid);
/* exception-raising macros */ /* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \

View File

@ -890,3 +890,41 @@ conn_set_client_encoding(connectionObject *self, const char *enc)
return res; return res;
} }
/* conn_tpc_begin -- begin a two-phase commit.
*
* The state of a connection in the middle of a TPC is exactly the same
* of a normal transaction, in CONN_STATUS_BEGIN, but with the tpc_xid
* member set to the xid used. This allows to reuse all the code paths used
* in regular transactions, as PostgreSQL won't even know we are in a TPC
* until PREPARE. */
int
conn_tpc_begin(connectionObject *self, XidObject *xid)
{
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("conn_tpc_begin: starting transaction");
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
if (pq_begin_locked(self, &pgres, &error, &_save) < 0) {
pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS;
pq_complete_error(self, &pgres, &error);
return -1;
}
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
/* The transaction started ok, let's store this xid. */
Py_INCREF(xid);
self->tpc_xid = xid;
return 0;
}

View File

@ -181,6 +181,54 @@ psyco_conn_xid(connectionObject *self, PyObject *args, PyObject *kwargs)
} }
#define psyco_conn_tpc_begin_doc \
"tpc_begin(xid) -- begin a TPC transaction with given transaction ID xid."
static PyObject *
psyco_conn_tpc_begin(connectionObject *self, PyObject *args)
{
PyObject *rv = NULL;
XidObject *xid = NULL;
PyObject *oxid;
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, tpc_begin);
if (!PyArg_ParseTuple(args, "O", &oxid)) {
goto exit;
}
if (NULL == (xid = xid_ensure(oxid))) {
goto exit;
}
/* check we are not in a transaction */
if (self->status != CONN_STATUS_READY) {
PyErr_SetString(ProgrammingError,
"tpc_begin must be called outside a transaction");
goto exit;
}
/* two phase commit and autocommit make no point */
if (self->isolation_level == 0) {
PyErr_SetString(ProgrammingError,
"tpc_begin can't be called in autocommit mode");
goto exit;
}
if (conn_tpc_begin(self, xid) < 0) {
goto exit;
}
Py_INCREF(Py_None);
rv = Py_None;
exit:
Py_XDECREF(xid);
return rv;
}
#ifdef PSYCOPG_EXTENSIONS #ifdef PSYCOPG_EXTENSIONS
/* set_isolation_level method - switch connection isolation level */ /* set_isolation_level method - switch connection isolation level */
@ -509,6 +557,8 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_VARARGS, psyco_conn_rollback_doc}, METH_VARARGS, psyco_conn_rollback_doc},
{"xid", (PyCFunction)psyco_conn_xid, {"xid", (PyCFunction)psyco_conn_xid,
METH_VARARGS|METH_KEYWORDS, psyco_conn_xid_doc}, METH_VARARGS|METH_KEYWORDS, psyco_conn_xid_doc},
{"tpc_begin", (PyCFunction)psyco_conn_tpc_begin,
METH_VARARGS, psyco_conn_tpc_begin_doc},
#ifdef PSYCOPG_EXTENSIONS #ifdef PSYCOPG_EXTENSIONS
{"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level, {"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level,
METH_VARARGS, psyco_conn_set_isolation_level_doc}, METH_VARARGS, psyco_conn_set_isolation_level_doc},

View File

@ -50,4 +50,6 @@ typedef struct {
PyObject *database; PyObject *database;
} XidObject; } XidObject;
HIDDEN XidObject *xid_ensure(PyObject *oxid);
#endif /* PSYCOPG_XID_H */ #endif /* PSYCOPG_XID_H */

View File

@ -295,3 +295,27 @@ PyTypeObject XidType = {
0, /*tp_subclasses*/ 0, /*tp_subclasses*/
0 /*tp_weaklist*/ 0 /*tp_weaklist*/
}; };
/* Convert a Python object into a proper xid.
*
* Return a new reference to the object or set an exception.
*
* The idea is that people can either create a xid from connection.xid
* or use a regular string they have found in PostgreSQL's pg_prepared_xacts
* in order to recover a transaction not generated by psycopg.
*/
XidObject *xid_ensure(PyObject *oxid)
{
/* TODO: string roundtrip. */
if (PyObject_TypeCheck(oxid, &XidType)) {
Py_INCREF(oxid);
return (XidObject *)oxid;
}
else {
PyErr_SetString(PyExc_TypeError,
"not a valid transaction id");
return NULL;
}
}