diff --git a/psycopg/connection.h b/psycopg/connection.h index 7d2f66c0..85e2d26b 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -135,6 +135,7 @@ HIDDEN int conn_poll(connectionObject *self); HIDDEN int conn_tpc_begin(connectionObject *self, XidObject *xid); HIDDEN int conn_tpc_command(connectionObject *self, const char *cmd, XidObject *xid); +HIDDEN PyObject *conn_tpc_recover(connectionObject *self); /* exception-raising macros */ #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index 6024a2b3..d92d53f7 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -957,3 +957,36 @@ conn_tpc_command(connectionObject *self, const char *cmd, XidObject *xid) return rv; } +/* conn_tpc_recover -- return a list of pending TPC Xid */ + +PyObject * +conn_tpc_recover(connectionObject *self) +{ + int status; + PyObject *xids = NULL; + PyObject *rv = NULL; + PyObject *tmp; + + /* store the status to restore it. */ + status = self->status; + + if (!(xids = xid_recover((PyObject *)self))) { goto exit; } + + if (status == CONN_STATUS_READY && self->status == CONN_STATUS_BEGIN) { + /* recover began a transaction: let's abort it. */ + if (!(tmp = PyObject_CallMethod((PyObject *)self, "rollback", NULL))) { + goto exit; + } + Py_DECREF(tmp); + } + + /* all fine */ + rv = xids; + xids = NULL; + +exit: + Py_XDECREF(xids); + + return rv; + +} diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 26e84455..48f27807 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -381,6 +381,22 @@ psyco_conn_tpc_rollback(connectionObject *self, PyObject *args) conn_rollback, "ROLLBACK PREPARED"); } +#define psyco_conn_tpc_recover_doc \ +"tpc_recover() -- returns a list of pending transaction IDs." + +static PyObject * +psyco_conn_tpc_recover(connectionObject *self, PyObject *args) +{ + EXC_IF_CONN_CLOSED(self); + EXC_IF_CONN_ASYNC(self, tpc_recover); + EXC_IF_TPC_PREPARED(self, tpc_recover); + + if (!PyArg_ParseTuple(args, "")) { return NULL; } + + return conn_tpc_recover(self); +} + + #ifdef PSYCOPG_EXTENSIONS /* set_isolation_level method - switch connection isolation level */ @@ -720,6 +736,8 @@ static struct PyMethodDef connectionObject_methods[] = { METH_VARARGS, psyco_conn_tpc_commit_doc}, {"tpc_rollback", (PyCFunction)psyco_conn_tpc_rollback, METH_VARARGS, psyco_conn_tpc_rollback_doc}, + {"tpc_recover", (PyCFunction)psyco_conn_tpc_recover, + METH_VARARGS, psyco_conn_tpc_recover_doc}, #ifdef PSYCOPG_EXTENSIONS {"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level, METH_VARARGS, psyco_conn_set_isolation_level_doc}, diff --git a/psycopg/xid.h b/psycopg/xid.h index eec4c7ec..79446349 100644 --- a/psycopg/xid.h +++ b/psycopg/xid.h @@ -31,6 +31,9 @@ #include "psycopg/config.h" +/* value for the format_id when the xid doesn't follow the XA standard. */ +#define XID_UNPARSED (-2) + extern HIDDEN PyTypeObject XidType; typedef struct { @@ -51,6 +54,8 @@ typedef struct { } XidObject; HIDDEN XidObject *xid_ensure(PyObject *oxid); +HIDDEN XidObject *xid_from_string(PyObject *s); HIDDEN char *xid_get_tid(XidObject *self); +HIDDEN PyObject *xid_recover(PyObject *conn); #endif /* PSYCOPG_XID_H */ diff --git a/psycopg/xid_type.c b/psycopg/xid_type.c index 1aa7b068..ad1e0915 100644 --- a/psycopg/xid_type.c +++ b/psycopg/xid_type.c @@ -331,21 +331,170 @@ XidObject *xid_ensure(PyObject *oxid) char * xid_get_tid(XidObject *self) { - /* TODO: for the moment just use the string mashed up by James. - * later will implement the JDBC algorithm. */ - char *buf; + char *buf = NULL; + long format_id; Py_ssize_t bufsize = 0; - if (self->pg_xact_id) { - bufsize = 1 + strlen(self->pg_xact_id); - } + format_id = PyInt_AsLong(self->format_id); + if (-1 == format_id && PyErr_Occurred()) { goto exit; } - buf = (char *)PyMem_Malloc(bufsize); - if (buf) { + if (XID_UNPARSED == format_id) { + bufsize = 1 + PyString_Size(self->gtrid); + if (!(buf = (char *)PyMem_Malloc(bufsize))) { + PyErr_NoMemory(); + goto exit; + } + strncpy(buf, PyString_AsString(self->gtrid), bufsize); + } + else { + /* TODO: for the moment just use the string mashed up by James. + * later will implement the JDBC algorithm. */ + bufsize = 1 + strlen(self->pg_xact_id); + if (!(buf = (char *)PyMem_Malloc(bufsize))) { + PyErr_NoMemory(); + goto exit; + } strncpy(buf, self->pg_xact_id, bufsize); } +exit: return buf; } +/* Build a Xid from a string representation. + * + * If the xid is in the format generated by Psycopg, unpack the tuple into + * the struct members. Otherwise generate an "unparsed" xid. + */ +XidObject * +xid_from_string(PyObject *str) { + /* TODO: currently always generates an unparsed xid. */ + XidObject *xid = NULL; + XidObject *rv = NULL; + PyObject *format_id = NULL; + PyObject *tmp; + /* fake args to work around the checks performed by the xid init */ + if (!(xid = (XidObject *)PyObject_CallFunction((PyObject *)&XidType, + "iss", 0, "tmp", "tmp"))) { + goto exit; + } + + /* set xid.gtrid */ + tmp = xid->gtrid; + Py_INCREF(str); + xid->gtrid = str; + Py_DECREF(tmp); + + /* set xid.format_id */ + if (!(format_id = PyInt_FromLong(XID_UNPARSED))) { goto exit; } + tmp = xid->format_id; + xid->format_id = format_id; + format_id = NULL; + Py_DECREF(tmp); + + /* set xid.bqual */ + tmp = xid->bqual; + Py_INCREF(Py_None); + xid->bqual = Py_None; + Py_DECREF(tmp); + + /* return the finished object */ + rv = xid; + xid = NULL; + +exit: + Py_XDECREF(format_id); + Py_XDECREF(xid); + + return rv; +} + +/* conn_tpc_recover -- return a list of pending TPC Xid */ + +PyObject * +xid_recover(PyObject *conn) +{ + PyObject *rv = NULL; + PyObject *curs = NULL; + PyObject *xids = NULL; + XidObject *xid = NULL; + PyObject *recs = NULL; + PyObject *rec = NULL; + PyObject *item = NULL; + PyObject *tmp; + Py_ssize_t len, i; + + /* curs = conn.cursor() */ + if (!(curs = PyObject_CallMethod(conn, "cursor", NULL))) { goto exit; } + + /* curs.execute(...) */ + if (!(tmp = PyObject_CallMethod(curs, "execute", "s", + "SELECT gid, prepared, owner, database FROM pg_prepared_xacts;"))) + { + goto exit; + } + Py_DECREF(tmp); + + /* recs = curs.fetchall() */ + if (!(recs = PyObject_CallMethod(curs, "fetchall", NULL))) { goto exit; } + + /* curs.close() */ + if (!(tmp = PyObject_CallMethod(curs, "close", NULL))) { goto exit; } + Py_DECREF(tmp); + + /* Build the list with return values. */ + if (0 > (len = PySequence_Size(recs))) { goto exit; } + if (!(xids = PyList_New(len))) { goto exit; } + + /* populate the xids list */ + for (i = 0; i < len; ++i) { + if (!(rec = PySequence_GetItem(recs, i))) { goto exit; } + + /* Get the xid with the XA triple set */ + if (!(item = PySequence_GetItem(rec, 0))) { goto exit; } + if (!(xid = xid_from_string(item))) { goto exit; } + Py_DECREF(item); item = NULL; + + /* set xid.prepared */ + if (!(item = PySequence_GetItem(rec, 1))) { goto exit; } + tmp = xid->prepared; + xid->prepared = item; + Py_DECREF(tmp); + item = NULL; + + /* set xid.owner */ + if (!(item = PySequence_GetItem(rec, 2))) { goto exit; } + tmp = xid->owner; + xid->owner = item; + Py_DECREF(tmp); + item = NULL; + + /* set xid.database */ + if (!(item = PySequence_GetItem(rec, 3))) { goto exit; } + tmp = xid->database; + xid->database = item; + Py_DECREF(tmp); + item = NULL; + + /* xid finished: add it to the returned list */ + PyList_SET_ITEM(xids, i, (PyObject *)xid); + xid = NULL; /* ref stolen */ + + Py_DECREF(rec); rec = NULL; + } + + /* set the return value. */ + rv = xids; + xids = NULL; + +exit: + Py_XDECREF(xids); + Py_XDECREF(xid); + Py_XDECREF(curs); + Py_XDECREF(recs); + Py_XDECREF(rec); + Py_XDECREF(item); + + return rv; +} diff --git a/tests/test_connection.py b/tests/test_connection.py index 4eff8678..6c6dddca 100644 --- a/tests/test_connection.py +++ b/tests/test_connection.py @@ -1,6 +1,8 @@ #!/usr/bin/env python import unittest +from operator import attrgetter + import psycopg2 import psycopg2.extensions import tests @@ -64,6 +66,73 @@ class ConnectionTests(unittest.TestCase): conn = self.connect() self.assert_(conn.encoding in psycopg2.extensions.encodings) + +class ConnectionTwoPhaseTests(unittest.TestCase): + def setUp(self): + self.clear_test_xacts() + + def tearDown(self): + self.clear_test_xacts() + + def clear_test_xacts(self): + """Rollback all the prepared transaction in the testing db.""" + cnn = self.connect() + cnn.set_isolation_level(0) + cur = cnn.cursor() + cur.execute( + "select gid from pg_prepared_xacts where database = %s", + (tests.dbname,)) + gids = [ r[0] for r in cur ] + for gid in gids: + cur.execute("rollback prepared %s;", (gid,)) + cnn.close() + + def connect(self): + return psycopg2.connect(tests.dsn) + + def test_status_after_recover(self): + cnn = self.connect() + self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) + xns = cnn.tpc_recover() + self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) + + cur = cnn.cursor() + cur.execute("select 1") + self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) + xns = cnn.tpc_recover() + self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) + + def test_recovered_xids(self): + # insert a few test xns + cnn = self.connect() + cnn.set_isolation_level(0) + cur = cnn.cursor() + cur.execute("begin; prepare transaction '1-foo';") + cur.execute("begin; prepare transaction '2-bar';") + + # read the values to return + cur.execute(""" + select gid, prepared, owner, database + from pg_prepared_xacts + where database = %s;""", + (tests.dbname,)) + okvals = cur.fetchall() + okvals.sort() + + cnn = self.connect() + xids = cnn.tpc_recover() + xids = [ xid for xid in xids if xid.database == tests.dbname ] + xids.sort(key=attrgetter('gtrid')) + + # check the values returned + self.assertEqual(len(okvals), len(xids)) + for (xid, (gid, prepared, owner, database)) in zip (xids, okvals): + self.assertEqual(xid.gtrid, gid) + self.assertEqual(xid.prepared, prepared) + self.assertEqual(xid.owner, owner) + self.assertEqual(xid.database, database) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)