Added tpc_recover method.

This commit is contained in:
Daniele Varrazzo 2010-10-11 13:03:37 +01:00
parent 09983db6ed
commit 56c02b0f94
6 changed files with 283 additions and 8 deletions

View File

@ -135,6 +135,7 @@ HIDDEN int conn_poll(connectionObject *self);
HIDDEN int conn_tpc_begin(connectionObject *self, XidObject *xid);
HIDDEN int conn_tpc_command(connectionObject *self,
const char *cmd, XidObject *xid);
HIDDEN PyObject *conn_tpc_recover(connectionObject *self);
/* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \

View File

@ -957,3 +957,36 @@ conn_tpc_command(connectionObject *self, const char *cmd, XidObject *xid)
return rv;
}
/* conn_tpc_recover -- return a list of pending TPC Xid */
PyObject *
conn_tpc_recover(connectionObject *self)
{
int status;
PyObject *xids = NULL;
PyObject *rv = NULL;
PyObject *tmp;
/* store the status to restore it. */
status = self->status;
if (!(xids = xid_recover((PyObject *)self))) { goto exit; }
if (status == CONN_STATUS_READY && self->status == CONN_STATUS_BEGIN) {
/* recover began a transaction: let's abort it. */
if (!(tmp = PyObject_CallMethod((PyObject *)self, "rollback", NULL))) {
goto exit;
}
Py_DECREF(tmp);
}
/* all fine */
rv = xids;
xids = NULL;
exit:
Py_XDECREF(xids);
return rv;
}

View File

@ -381,6 +381,22 @@ psyco_conn_tpc_rollback(connectionObject *self, PyObject *args)
conn_rollback, "ROLLBACK PREPARED");
}
#define psyco_conn_tpc_recover_doc \
"tpc_recover() -- returns a list of pending transaction IDs."
static PyObject *
psyco_conn_tpc_recover(connectionObject *self, PyObject *args)
{
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, tpc_recover);
EXC_IF_TPC_PREPARED(self, tpc_recover);
if (!PyArg_ParseTuple(args, "")) { return NULL; }
return conn_tpc_recover(self);
}
#ifdef PSYCOPG_EXTENSIONS
/* set_isolation_level method - switch connection isolation level */
@ -720,6 +736,8 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_VARARGS, psyco_conn_tpc_commit_doc},
{"tpc_rollback", (PyCFunction)psyco_conn_tpc_rollback,
METH_VARARGS, psyco_conn_tpc_rollback_doc},
{"tpc_recover", (PyCFunction)psyco_conn_tpc_recover,
METH_VARARGS, psyco_conn_tpc_recover_doc},
#ifdef PSYCOPG_EXTENSIONS
{"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level,
METH_VARARGS, psyco_conn_set_isolation_level_doc},

View File

@ -31,6 +31,9 @@
#include "psycopg/config.h"
/* value for the format_id when the xid doesn't follow the XA standard. */
#define XID_UNPARSED (-2)
extern HIDDEN PyTypeObject XidType;
typedef struct {
@ -51,6 +54,8 @@ typedef struct {
} XidObject;
HIDDEN XidObject *xid_ensure(PyObject *oxid);
HIDDEN XidObject *xid_from_string(PyObject *s);
HIDDEN char *xid_get_tid(XidObject *self);
HIDDEN PyObject *xid_recover(PyObject *conn);
#endif /* PSYCOPG_XID_H */

View File

@ -331,21 +331,170 @@ XidObject *xid_ensure(PyObject *oxid)
char *
xid_get_tid(XidObject *self)
{
/* TODO: for the moment just use the string mashed up by James.
* later will implement the JDBC algorithm. */
char *buf;
char *buf = NULL;
long format_id;
Py_ssize_t bufsize = 0;
if (self->pg_xact_id) {
bufsize = 1 + strlen(self->pg_xact_id);
}
format_id = PyInt_AsLong(self->format_id);
if (-1 == format_id && PyErr_Occurred()) { goto exit; }
buf = (char *)PyMem_Malloc(bufsize);
if (buf) {
if (XID_UNPARSED == format_id) {
bufsize = 1 + PyString_Size(self->gtrid);
if (!(buf = (char *)PyMem_Malloc(bufsize))) {
PyErr_NoMemory();
goto exit;
}
strncpy(buf, PyString_AsString(self->gtrid), bufsize);
}
else {
/* TODO: for the moment just use the string mashed up by James.
* later will implement the JDBC algorithm. */
bufsize = 1 + strlen(self->pg_xact_id);
if (!(buf = (char *)PyMem_Malloc(bufsize))) {
PyErr_NoMemory();
goto exit;
}
strncpy(buf, self->pg_xact_id, bufsize);
}
exit:
return buf;
}
/* Build a Xid from a string representation.
*
* If the xid is in the format generated by Psycopg, unpack the tuple into
* the struct members. Otherwise generate an "unparsed" xid.
*/
XidObject *
xid_from_string(PyObject *str) {
/* TODO: currently always generates an unparsed xid. */
XidObject *xid = NULL;
XidObject *rv = NULL;
PyObject *format_id = NULL;
PyObject *tmp;
/* fake args to work around the checks performed by the xid init */
if (!(xid = (XidObject *)PyObject_CallFunction((PyObject *)&XidType,
"iss", 0, "tmp", "tmp"))) {
goto exit;
}
/* set xid.gtrid */
tmp = xid->gtrid;
Py_INCREF(str);
xid->gtrid = str;
Py_DECREF(tmp);
/* set xid.format_id */
if (!(format_id = PyInt_FromLong(XID_UNPARSED))) { goto exit; }
tmp = xid->format_id;
xid->format_id = format_id;
format_id = NULL;
Py_DECREF(tmp);
/* set xid.bqual */
tmp = xid->bqual;
Py_INCREF(Py_None);
xid->bqual = Py_None;
Py_DECREF(tmp);
/* return the finished object */
rv = xid;
xid = NULL;
exit:
Py_XDECREF(format_id);
Py_XDECREF(xid);
return rv;
}
/* conn_tpc_recover -- return a list of pending TPC Xid */
PyObject *
xid_recover(PyObject *conn)
{
PyObject *rv = NULL;
PyObject *curs = NULL;
PyObject *xids = NULL;
XidObject *xid = NULL;
PyObject *recs = NULL;
PyObject *rec = NULL;
PyObject *item = NULL;
PyObject *tmp;
Py_ssize_t len, i;
/* curs = conn.cursor() */
if (!(curs = PyObject_CallMethod(conn, "cursor", NULL))) { goto exit; }
/* curs.execute(...) */
if (!(tmp = PyObject_CallMethod(curs, "execute", "s",
"SELECT gid, prepared, owner, database FROM pg_prepared_xacts;")))
{
goto exit;
}
Py_DECREF(tmp);
/* recs = curs.fetchall() */
if (!(recs = PyObject_CallMethod(curs, "fetchall", NULL))) { goto exit; }
/* curs.close() */
if (!(tmp = PyObject_CallMethod(curs, "close", NULL))) { goto exit; }
Py_DECREF(tmp);
/* Build the list with return values. */
if (0 > (len = PySequence_Size(recs))) { goto exit; }
if (!(xids = PyList_New(len))) { goto exit; }
/* populate the xids list */
for (i = 0; i < len; ++i) {
if (!(rec = PySequence_GetItem(recs, i))) { goto exit; }
/* Get the xid with the XA triple set */
if (!(item = PySequence_GetItem(rec, 0))) { goto exit; }
if (!(xid = xid_from_string(item))) { goto exit; }
Py_DECREF(item); item = NULL;
/* set xid.prepared */
if (!(item = PySequence_GetItem(rec, 1))) { goto exit; }
tmp = xid->prepared;
xid->prepared = item;
Py_DECREF(tmp);
item = NULL;
/* set xid.owner */
if (!(item = PySequence_GetItem(rec, 2))) { goto exit; }
tmp = xid->owner;
xid->owner = item;
Py_DECREF(tmp);
item = NULL;
/* set xid.database */
if (!(item = PySequence_GetItem(rec, 3))) { goto exit; }
tmp = xid->database;
xid->database = item;
Py_DECREF(tmp);
item = NULL;
/* xid finished: add it to the returned list */
PyList_SET_ITEM(xids, i, (PyObject *)xid);
xid = NULL; /* ref stolen */
Py_DECREF(rec); rec = NULL;
}
/* set the return value. */
rv = xids;
xids = NULL;
exit:
Py_XDECREF(xids);
Py_XDECREF(xid);
Py_XDECREF(curs);
Py_XDECREF(recs);
Py_XDECREF(rec);
Py_XDECREF(item);
return rv;
}

View File

@ -1,6 +1,8 @@
#!/usr/bin/env python
import unittest
from operator import attrgetter
import psycopg2
import psycopg2.extensions
import tests
@ -64,6 +66,73 @@ class ConnectionTests(unittest.TestCase):
conn = self.connect()
self.assert_(conn.encoding in psycopg2.extensions.encodings)
class ConnectionTwoPhaseTests(unittest.TestCase):
def setUp(self):
self.clear_test_xacts()
def tearDown(self):
self.clear_test_xacts()
def clear_test_xacts(self):
"""Rollback all the prepared transaction in the testing db."""
cnn = self.connect()
cnn.set_isolation_level(0)
cur = cnn.cursor()
cur.execute(
"select gid from pg_prepared_xacts where database = %s",
(tests.dbname,))
gids = [ r[0] for r in cur ]
for gid in gids:
cur.execute("rollback prepared %s;", (gid,))
cnn.close()
def connect(self):
return psycopg2.connect(tests.dsn)
def test_status_after_recover(self):
cnn = self.connect()
self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status)
xns = cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status)
cur = cnn.cursor()
cur.execute("select 1")
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
xns = cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
def test_recovered_xids(self):
# insert a few test xns
cnn = self.connect()
cnn.set_isolation_level(0)
cur = cnn.cursor()
cur.execute("begin; prepare transaction '1-foo';")
cur.execute("begin; prepare transaction '2-bar';")
# read the values to return
cur.execute("""
select gid, prepared, owner, database
from pg_prepared_xacts
where database = %s;""",
(tests.dbname,))
okvals = cur.fetchall()
okvals.sort()
cnn = self.connect()
xids = cnn.tpc_recover()
xids = [ xid for xid in xids if xid.database == tests.dbname ]
xids.sort(key=attrgetter('gtrid'))
# check the values returned
self.assertEqual(len(okvals), len(xids))
for (xid, (gid, prepared, owner, database)) in zip (xids, okvals):
self.assertEqual(xid.gtrid, gid)
self.assertEqual(xid.prepared, prepared)
self.assertEqual(xid.owner, owner)
self.assertEqual(xid.database, database)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)