Added tpc_prepare and CONN_STATUS_PREPARED.

This commit is contained in:
Daniele Varrazzo 2010-10-08 22:03:06 +01:00
parent 4588fa50f2
commit 4f66de494b
9 changed files with 181 additions and 10 deletions

View File

@ -80,8 +80,9 @@ ISOLATION_LEVEL_READ_UNCOMMITTED = ISOLATION_LEVEL_READ_COMMITTED
STATUS_SETUP = 0
STATUS_READY = 1
STATUS_BEGIN = 2
STATUS_SYNC = 3
STATUS_ASYNC = 4
STATUS_SYNC = 3 # currently unused
STATUS_ASYNC = 4 # currently unused
STATUS_PREPARED = 5
# This is a usefull mnemonic to check if the connection is in a transaction
STATUS_IN_TRANSACTION = STATUS_BEGIN

View File

@ -38,9 +38,10 @@ extern "C" {
#endif
/* connection status */
#define CONN_STATUS_SETUP 0
#define CONN_STATUS_READY 1
#define CONN_STATUS_BEGIN 2
#define CONN_STATUS_SETUP 0
#define CONN_STATUS_READY 1
#define CONN_STATUS_BEGIN 2
#define CONN_STATUS_PREPARED 5
/* async connection building statuses */
#define CONN_STATUS_CONNECTING 20
#define CONN_STATUS_DATESTYLE 21
@ -132,6 +133,8 @@ HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
HIDDEN int conn_poll(connectionObject *self);
HIDDEN int conn_tpc_begin(connectionObject *self, XidObject *xid);
HIDDEN int conn_tpc_command(connectionObject *self,
const char *cmd, XidObject *xid);
/* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \
@ -148,6 +151,12 @@ HIDDEN int conn_tpc_begin(connectionObject *self, XidObject *xid);
"during a two-phase transaction", #cmd); \
return NULL; }
#define EXC_IF_TPC_PREPARED(self, cmd) \
if ((self)->status == CONN_STATUS_PREPARED) { \
PyErr_Format(ProgrammingError, "%s cannot be used " \
"with a prepared two-phase transaction", #cmd); \
return NULL; }
#ifdef __cplusplus
}
#endif

View File

@ -928,3 +928,32 @@ conn_tpc_begin(connectionObject *self, XidObject *xid)
return 0;
}
/* conn_tpc_command -- run one of the TPC-related PostgreSQL commands.
*
* The function doesn't change the connection state as it can be used
* for many commands and for recovered transactions. */
int
conn_tpc_command(connectionObject *self, const char *cmd, XidObject *xid)
{
PGresult *pgres = NULL;
char *error = NULL;
int rv;
Dprintf("conn_tpc_command: %s", cmd);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
rv = pq_tpc_command_locked(self, cmd, xid, &pgres, &error, &_save);
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
if (rv < 0) {
pq_complete_error(self, &pgres, &error);
}
return rv;
}

View File

@ -70,7 +70,8 @@ psyco_conn_cursor(connectionObject *self, PyObject *args, PyObject *keywds)
EXC_IF_CONN_CLOSED(self);
if (self->status != CONN_STATUS_READY &&
self->status != CONN_STATUS_BEGIN) {
self->status != CONN_STATUS_BEGIN &&
self->status != CONN_STATUS_PREPARED) {
PyErr_SetString(OperationalError,
"asynchronous connection attempt underway");
return NULL;
@ -231,6 +232,39 @@ exit:
}
#define psyco_conn_tpc_prepare_doc \
"tpc_prepare() -- perform the first phase of a two-phase transaction."
static PyObject *
psyco_conn_tpc_prepare(connectionObject *self, PyObject *args)
{
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, tpc_prepare);
EXC_IF_TPC_PREPARED(self, tpc_prepare);
if (!PyArg_ParseTuple(args, "")) {
return NULL;
}
if (NULL == self->tpc_xid) {
PyErr_SetString(ProgrammingError,
"prepare must be called inside a two-phase transaction");
return NULL;
}
if (0 > conn_tpc_command(self, "PREPARE TRANSACTION", self->tpc_xid)) {
return NULL;
}
/* transaction prepared: set the state so that no operation
* can be performed until commit. */
self->status = CONN_STATUS_PREPARED;
Py_INCREF(Py_None);
return Py_None;
}
#ifdef PSYCOPG_EXTENSIONS
/* set_isolation_level method - switch connection isolation level */
@ -245,6 +279,7 @@ psyco_conn_set_isolation_level(connectionObject *self, PyObject *args)
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, set_isolation_level);
EXC_IF_TPC_PREPARED(self, set_isolation_level);
if (!PyArg_ParseTuple(args, "i", &level)) return NULL;
@ -279,6 +314,7 @@ psyco_conn_set_client_encoding(connectionObject *self, PyObject *args)
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, set_client_encoding);
EXC_IF_TPC_PREPARED(self, set_client_encoding);
if (!PyArg_ParseTuple(args, "s", &enc)) return NULL;
@ -379,6 +415,7 @@ psyco_conn_lobject(connectionObject *self, PyObject *args, PyObject *keywds)
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, lobject);
EXC_IF_GREEN(lobject);
EXC_IF_TPC_PREPARED(self, lobject);
Dprintf("psyco_conn_lobject: new lobject for connection at %p", self);
Dprintf("psyco_conn_lobject: parameters: oid = %d, mode = %s",
@ -561,6 +598,8 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_conn_xid_doc},
{"tpc_begin", (PyCFunction)psyco_conn_tpc_begin,
METH_VARARGS, psyco_conn_tpc_begin_doc},
{"tpc_prepare", (PyCFunction)psyco_conn_tpc_prepare,
METH_VARARGS, psyco_conn_tpc_prepare_doc},
#ifdef PSYCOPG_EXTENSIONS
{"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level,
METH_VARARGS, psyco_conn_set_isolation_level_doc},

View File

@ -492,6 +492,7 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
EXC_IF_ASYNC_IN_PROGRESS(self, execute);
EXC_IF_TPC_PREPARED(self->conn, execute);
if (_psyco_curs_execute(self, operation, vars, self->conn->async)) {
Py_INCREF(Py_None);
@ -511,12 +512,12 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs)
PyObject *operation = NULL, *vars = NULL;
PyObject *v, *iter = NULL;
int rowcount = 0;
static char *kwlist[] = {"query", "vars_list", NULL};
/* reset rowcount to -1 to avoid setting it when an exception is raised */
self->rowcount = -1;
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "OO", kwlist,
&operation, &vars)) {
return NULL;
@ -524,6 +525,7 @@ psyco_curs_executemany(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, executemany);
EXC_IF_TPC_PREPARED(self->conn, executemany);
if (self->name != NULL) {
psyco_set_error(ProgrammingError, (PyObject*)self,
@ -746,6 +748,7 @@ psyco_curs_fetchone(cursorObject *self, PyObject *args)
char buffer[128];
EXC_IF_NO_MARK(self);
EXC_IF_TPC_PREPARED(self->conn, fetchone);
PyOS_snprintf(buffer, 127, "FETCH FORWARD 1 FROM %s", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
@ -807,6 +810,7 @@ psyco_curs_fetchmany(cursorObject *self, PyObject *args, PyObject *kwords)
char buffer[128];
EXC_IF_NO_MARK(self);
EXC_IF_TPC_PREPARED(self->conn, fetchone);
PyOS_snprintf(buffer, 127, "FETCH FORWARD %d FROM %s",
(int)size, self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
@ -880,6 +884,7 @@ psyco_curs_fetchall(cursorObject *self, PyObject *args)
char buffer[128];
EXC_IF_NO_MARK(self);
EXC_IF_TPC_PREPARED(self->conn, fetchall);
PyOS_snprintf(buffer, 127, "FETCH FORWARD ALL FROM %s", self->name);
if (pq_execute(self, buffer, 0) == -1) return NULL;
if (_psyco_curs_prefetch(self) < 0) return NULL;
@ -941,6 +946,7 @@ psyco_curs_callproc(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
EXC_IF_ASYNC_IN_PROGRESS(self, callproc);
EXC_IF_TPC_PREPARED(self->conn, callproc);
if (self->name != NULL) {
psyco_set_error(ProgrammingError, (PyObject*)self,
@ -1086,6 +1092,7 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
char buffer[128];
EXC_IF_NO_MARK(self);
EXC_IF_TPC_PREPARED(self->conn, scroll);
if (strcmp(mode, "absolute") == 0) {
PyOS_snprintf(buffer, 127, "MOVE ABSOLUTE %d FROM %s",
@ -1209,6 +1216,8 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, copy_from);
EXC_IF_GREEN(copy_from);
EXC_IF_TPC_PREPARED(self->conn, copy_from);
quoted_delimiter = psycopg_escape_string((PyObject*)self->conn, sep, 0, NULL, NULL);
if (quoted_delimiter == NULL) {
@ -1315,6 +1324,7 @@ psyco_curs_copy_to(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, copy_to);
EXC_IF_GREEN(copy_to);
EXC_IF_TPC_PREPARED(self->conn, copy_to);
quoted_delimiter = psycopg_escape_string((PyObject*)self->conn, sep, 0, NULL, NULL);
if (quoted_delimiter == NULL) {
@ -1401,6 +1411,7 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
EXC_IF_CURS_ASYNC(self, copy_expert);
EXC_IF_GREEN(copy_expert);
EXC_IF_TPC_PREPARED(self->conn, copy_expert);
sql = _psyco_curs_validate_sql_basic(self, sql);

View File

@ -575,6 +575,7 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error,
"SET SESSION AUTHORIZATION DEFAULT", pgres, error, tstate);
if (retvalue != 0) return retvalue;
/* should set the tpc xid to null: postponed until we get the GIL again */
conn->status = CONN_STATUS_READY;
return retvalue;
@ -600,12 +601,58 @@ pq_reset(connectionObject *conn)
conn_notice_process(conn);
if (retvalue < 0)
if (retvalue < 0) {
pq_complete_error(conn, &pgres, &error);
}
else {
Py_CLEAR(conn->tpc_xid);
}
return retvalue;
}
/* Call one of the PostgreSQL tpc-related commands.
*
* This function should only be called on a locked connection without
* holding the global interpreter lock. */
int
pq_tpc_command_locked(connectionObject *conn, const char *cmd, XidObject *xid,
PGresult **pgres, char **error,
PyThreadState **tstate)
{
int rv = -1;
char *tid = NULL, *etid = NULL, *buf = NULL;
Py_ssize_t buflen;
Dprintf("_pq_tpc_command: pgconn = %p, command = %s",
conn->pgconn, cmd);
/* convert the xid into the postgres transaction_id and quote it. */
if (!(tid = xid_get_tid(xid))) { goto exit; }
if (!(etid = psycopg_escape_string((PyObject *)conn, tid, 0, NULL, NULL)))
{ goto exit; }
/* prepare the command to the server */
buflen = 3 + strlen(cmd) + strlen(etid); /* add space, semicolon, zero */
if (!(buf = PyMem_Malloc(buflen))) {
PyErr_NoMemory();
goto exit;
}
if (0 > PyOS_snprintf(buf, buflen, "%s %s;", cmd, etid)) { goto exit; }
/* run the command and let it handle the error cases */
rv = pq_execute_command_locked(conn, buf, pgres, error, tstate);
exit:
PyMem_Free(buf);
PyMem_Free(etid);
PyMem_Free(tid);
return rv;
}
/* pq_is_busy - consume input and return connection status
a status of 1 means that a call to pq_fetch will block, while a status of 0

View File

@ -46,6 +46,10 @@ HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres,
char **error, PyThreadState **tstate);
HIDDEN int pq_abort(connectionObject *conn);
HIDDEN int pq_reset(connectionObject *conn);
HIDDEN int pq_tpc_command_locked(connectionObject *conn,
const char *cmd, XidObject *xid,
PGresult **pgres, char **error,
PyThreadState **tstate);
HIDDEN int pq_is_busy(connectionObject *conn);
HIDDEN int pq_is_busy_locked(connectionObject *conn);
HIDDEN int pq_flush(connectionObject *conn);

View File

@ -51,5 +51,6 @@ typedef struct {
} XidObject;
HIDDEN XidObject *xid_ensure(PyObject *oxid);
HIDDEN char *xid_get_tid(XidObject *self);
#endif /* PSYCOPG_XID_H */

View File

@ -319,3 +319,33 @@ XidObject *xid_ensure(PyObject *oxid)
}
}
/* Return the PostgreSQL transaction_id for this XA xid.
*
* PostgreSQL wants just a string, while the DBAPI supports the XA standard
* and thus a triple. We use the same conversion algorithm implemented by JDBC
* in order to allow some form of interoperation.
*
* Return a buffer allocated with PyMem_Malloc. Use PyMem_Free to free it.
*/
char *
xid_get_tid(XidObject *self)
{
/* TODO: for the moment just use the string mashed up by James.
* later will implement the JDBC algorithm. */
char *buf;
Py_ssize_t bufsize = 0;
if (self->pg_xact_id) {
bufsize = 1 + strlen(self->pg_xact_id);
}
buf = (char *)PyMem_Malloc(bufsize);
if (buf) {
strncpy(buf, self->pg_xact_id, bufsize);
}
return buf;
}