Merge branch 'transaction-control' into devel

This commit is contained in:
Daniele Varrazzo 2011-06-03 01:48:24 +01:00
commit 0eb5e0430e
16 changed files with 764 additions and 124 deletions

8
NEWS
View File

@ -1,8 +1,12 @@
What's new in psycopg 2.4.2
---------------------------
- Allow using the isolation level "repeatable read" which is distinct
from "serializable" in PostgreSQL 9.1.
- Added 'set_transaction()' method and 'autocommit' property to the
connection. Added support for read-only sessions and, for PostgreSQL
9.1, for the "repeatable read" isolation level and the "deferrable"
transaction property.
- Psycopg doesn't execute queries at connection time to find the
default isolation level.
- Don't build mx.DateTime support if the module can't be imported
(ticket #53).
- Fixed escape for negative numbers prefixed by minus operator

View File

@ -239,9 +239,8 @@ be sent from Python code simply executing a :sql:`NOTIFY` command in an
`~cursor.execute()` call.
Because of the way sessions interact with notifications (see |NOTIFY|_
documentation), you should keep the connection in :ref:`autocommit
<autocommit>` mode if you wish to receive or send notifications in a timely
manner.
documentation), you should keep the connection in `~connection.autocommit`
mode if you wish to receive or send notifications in a timely manner.
.. |LISTEN| replace:: :sql:`LISTEN`
.. _LISTEN: http://www.postgresql.org/docs/9.0/static/sql-listen.html
@ -373,12 +372,14 @@ When an asynchronous query is being executed, `connection.isexecuting()` returns
connection.
There are several limitations in using asynchronous connections: the
connection is always in :ref:`autocommit <autocommit>` mode and it is not
possible to change it using `~connection.set_isolation_level()`. So a
connection is always in `~connection.autocommit` mode and it is not
possible to change it. So a
transaction is not implicitly started at the first query and is not possible
to use methods `~connection.commit()` and `~connection.rollback()`: you can
manually control transactions using `~cursor.execute()` to send database
commands such as :sql:`BEGIN`, :sql:`COMMIT` and :sql:`ROLLBACK`.
commands such as :sql:`BEGIN`, :sql:`COMMIT` and :sql:`ROLLBACK`. Similarly
`set_transaction()` can't be used but it is still possible to invoke the
:sql:`SET` command with the proper :sql:`default_transaction_...` parameter.
With asynchronous connections it is also not possible to use
`~connection.set_client_encoding()`, `~cursor.executemany()`, :ref:`large

View File

@ -111,10 +111,10 @@ rst_epilog = """
.. _DBAPI: http://www.python.org/dev/peps/pep-0249/
.. _transaction isolation level:
http://www.postgresql.org/docs/9.0/static/transaction-iso.html
http://www.postgresql.org/docs/9.1/static/transaction-iso.html
.. _serializable isolation level:
http://www.postgresql.org/docs/9.0/static/transaction-iso.html#XACT-SERIALIZABLE
http://www.postgresql.org/docs/9.1/static/transaction-iso.html#XACT-SERIALIZABLE
.. _mx.DateTime: http://www.egenix.com/products/python/mxBase/mxDateTime/

View File

@ -327,11 +327,93 @@ The ``connection`` class
pair: Transaction; Autocommit
pair: Transaction; Isolation level
.. _autocommit:
.. method:: set_transaction([isolation_level,] [readonly,] [deferrable,] [autocommit])
Set one or more parameters for the next transactions or statements in
the current session. See |SET TRANSACTION|_ for further details.
.. |SET TRANSACTION| replace:: :sql:`SET TRANSACTION`
.. _SET TRANSACTION: http://www.postgresql.org/docs/9.1/static/sql-set-transaction.html
:param isolation_level: set the `isolation level`_ for the next
transactions/statements. The value can be one of the
:ref:`constants <isolation-level-constants>` defined in the
`~psycopg2.extensions` module or one of the literal values
``read uncommitted``, ``read committed``, ``repeatable read``,
``serializable``.
:param readonly: if `!True`, set the connection to read only;
read/write if `!False`.
:param deferrable: if `!True`, set the connection to deferrable;
non deferrable if `!False`. Only available from PostgreSQL 9.1.
:param autocommit: switch the connection to autocommit mode: not a
PostgreSQL session setting but an alias for setting the
`autocommit` attribute.
The parameters *isolation_level*, *readonly* and *deferrable* also
accept the string ``default`` as a value: the effect is to reset the
parameter to the server default.
.. _isolation level:
http://www.postgresql.org/docs/9.1/static/transaction-iso.html
The function must be invoked with no transaction in progress. At every
function invocation, only the specified parameters are changed.
The default for the values are defined by the server configuration:
see values for |default_transaction_isolation|__,
|default_transaction_read_only|__, |default_transaction_deferrable|__.
.. |default_transaction_isolation| replace:: :sql:`default_transaction_isolation`
.. __: http://www.postgresql.org/docs/9.1/static/runtime-config-client.html#GUC-DEFAULT-TRANSACTION-ISOLATION
.. |default_transaction_read_only| replace:: :sql:`default_transaction_read_only`
.. __: http://www.postgresql.org/docs/9.1/static/runtime-config-client.html#GUC-DEFAULT-TRANSACTION-READ-ONLY
.. |default_transaction_deferrable| replace:: :sql:`default_transaction_deferrable`
.. __: http://www.postgresql.org/docs/9.1/static/runtime-config-client.html#GUC-DEFAULT-TRANSACTION-DEFERRABLE
.. note::
There is currently no builtin method to read the current value for
the parameters: use :sql:`SHOW default_transaction_...` to read
the values from the backend.
.. versionadded:: 2.4.2
.. attribute:: autocommit
Read/write attribute: if `!True`, no transaction is handled by the
driver and every statement sent to the backend has immediate effect;
if `!False` a new transaction is started at the first command
execution: the methods `commit()` or `rollback()` must be manually
invoked to terminate the transaction.
The autocommit mode is useful to execute commands requiring to be run
outside a transaction, such as :sql:`CREATE DATABASE` or
:sql:`VACUUM`.
The default is `!False` (manual commit) as per DBAPI specification.
.. warning::
By default, any query execution, including a simple :sql:`SELECT`
will start a transaction: for long-running program, if no further
action is taken, the session will remain "idle in transaction", a
condition non desiderable for several reasons (locks are held by
the session, tables bloat...). For long lived scripts, either
ensure to terminate a transaction as soon as possible or use an
autocommit connection.
.. versionadded:: 2.4.2
.. attribute:: isolation_level
.. method:: set_isolation_level(level)
.. note::
From version 2.4.2, `set_transaction()` and `autocommit`, offer
finer control on the transaction characteristics.
Read or set the `transaction isolation level`_ for the current session.
The level defines the different phenomena that can happen in the
database between concurrent transactions.

View File

@ -22,8 +22,8 @@ Why does `!psycopg2` leave database sessions "idle in transaction"?
call one of the transaction closing methods before leaving the connection
unused for a long time (which may also be a few seconds, depending on the
concurrency level in your database). Alternatively you can use a
connection in :ref:`autocommit <autocommit>` mode to avoid a new
transaction to be started at the first command.
connection in `~connection.autocommit` mode to avoid a new transaction to
be started at the first command.
I receive the error *current transaction is aborted, commands ignored until end of transaction block* and can't do anything else!
There was a problem *in the previous* command to the database, which

View File

@ -519,6 +519,10 @@ outside any transaction: in order to be able to run these commands from
Psycopg, the session must be in autocommit mode. Read the documentation for
`connection.set_isolation_level()` to know how to change the commit mode.
.. note::
From version 2.4.2 you can use the `~connection.autocommit` property to
switch a connection in autocommit mode.
.. index::

View File

@ -61,15 +61,6 @@ extern "C" {
#define psyco_datestyle "SET DATESTYLE TO 'ISO'"
#define psyco_transaction_isolation "SHOW default_transaction_isolation"
/* possible values for isolation_level */
typedef enum {
ISOLATION_LEVEL_AUTOCOMMIT = 0,
ISOLATION_LEVEL_READ_UNCOMMITTED = 1,
ISOLATION_LEVEL_READ_COMMITTED = 2,
ISOLATION_LEVEL_REPEATABLE_READ = 3,
ISOLATION_LEVEL_SERIALIZABLE = 4,
} conn_isolation_level_t;
extern HIDDEN PyTypeObject connectionType;
struct connectionObject_notice {
@ -89,7 +80,6 @@ typedef struct {
long int closed; /* 1 means connection has been closed;
2 that something horrible happened */
long int isolation_level; /* isolation level for this connection */
long int mark; /* number of commits/rollbacks done so far */
int status; /* status of the connection */
XidObject *tpc_xid; /* Transaction ID in two-phase commit */
@ -119,12 +109,20 @@ typedef struct {
int equote; /* use E''-style quotes for escaped strings */
PyObject *weakreflist; /* list of weak references */
int autocommit;
} connectionObject;
/* map isolation level values into a numeric const */
typedef struct {
char *name;
int value;
} IsolationLevel;
/* C-callable functions in connection_int.c and connection_ext.c */
HIDDEN PyObject *conn_text_from_chars(connectionObject *pgconn, const char *str);
HIDDEN int conn_get_standard_conforming_strings(PGconn *pgconn);
HIDDEN int conn_get_isolation_level(PGresult *pgres);
HIDDEN int conn_get_isolation_level(connectionObject *self);
HIDDEN int conn_get_protocol_version(PGconn *pgconn);
HIDDEN int conn_get_server_version(PGconn *pgconn);
HIDDEN PGcancel *conn_get_cancel(PGconn *pgconn);
@ -136,6 +134,8 @@ HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self);
HIDDEN int conn_commit(connectionObject *self);
HIDDEN int conn_rollback(connectionObject *self);
HIDDEN int conn_set(connectionObject *self, const char *param, const char *value);
HIDDEN int conn_set_autocommit(connectionObject *self, int value);
HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
HIDDEN int conn_poll(connectionObject *self);
@ -154,6 +154,13 @@ HIDDEN PyObject *conn_tpc_recover(connectionObject *self);
"in asynchronous mode"); \
return NULL; }
#define EXC_IF_IN_TRANSACTION(self, cmd) \
if (self->status != CONN_STATUS_READY) { \
PyErr_Format(ProgrammingError, \
"%s cannot be used inside a transaction", #cmd); \
return NULL; \
}
#define EXC_IF_TPC_NOT_SUPPORTED(self) \
if ((self)->server_version < 80100) { \
PyErr_Format(NotSupportedError, \

View File

@ -34,6 +34,19 @@
#include <string.h>
/* Mapping from isolation level name to value exposed by Python.
* Only used for backward compatibility by the isolation_level property */
const IsolationLevel conn_isolevels[] = {
{"", 0}, /* autocommit */
{"read uncommitted", 1},
{"read committed", 2},
{"repeatable read", 3},
{"serializable", 4},
{"default", -1},
{ NULL }
};
/* Return a new "string" from a char* from the database.
*
@ -358,22 +371,60 @@ exit:
return rv;
}
int
conn_get_isolation_level(PGresult *pgres)
conn_get_isolation_level(connectionObject *self)
{
static const char lvl1a[] = "read uncommitted";
static const char lvl1b[] = "read committed";
int rv;
PGresult *pgres;
int rv = -1;
char *lname;
const IsolationLevel *level;
char *isolation_level = PQgetvalue(pgres, 0, 0);
/* this may get called by async connections too: here's your result */
if (self->autocommit) {
return 0;
}
if ((strcmp(lvl1b, isolation_level) == 0) /* most likely */
|| (strcmp(lvl1a, isolation_level) == 0))
rv = ISOLATION_LEVEL_READ_COMMITTED;
else /* if it's not one of the lower ones, it's SERIALIZABLE */
rv = ISOLATION_LEVEL_SERIALIZABLE;
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS;
CLEARPGRES(pgres);
if (!psyco_green()) {
Py_UNBLOCK_THREADS;
pgres = PQexec(self->pgconn, psyco_transaction_isolation);
Py_BLOCK_THREADS;
} else {
pgres = psyco_exec_green(self, psyco_transaction_isolation);
}
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
PyErr_SetString(OperationalError,
"can't fetch default_transaction_isolation");
goto endlock;
}
/* find the value for the requested isolation level */
lname = PQgetvalue(pgres, 0, 0);
level = conn_isolevels;
while ((++level)->name) {
if (0 == strcasecmp(level->name, lname)) {
rv = level->value;
break;
}
}
if (-1 == rv) {
char msg[256];
snprintf(msg, sizeof(msg), "unexpected isolation level: '%s'", lname);
PyErr_SetString(OperationalError, msg);
}
endlock:
IFCLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
return rv;
}
@ -477,24 +528,8 @@ conn_setup(connectionObject *self, PGconn *pgconn)
CLEARPGRES(pgres);
}
if (!green) {
Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, psyco_transaction_isolation);
Py_BLOCK_THREADS;
} else {
pgres = psyco_exec_green(self, psyco_transaction_isolation);
}
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
PyErr_SetString(OperationalError,
"can't fetch default_isolation_level");
IFCLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS;
return -1;
}
self->isolation_level = conn_get_isolation_level(pgres);
/* for reset */
self->autocommit = 0;
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
@ -779,7 +814,7 @@ _conn_poll_setup_async(connectionObject *self)
* expected to manage the transactions himself, by sending
* (asynchronously) BEGIN and COMMIT statements.
*/
self->isolation_level = ISOLATION_LEVEL_AUTOCOMMIT;
self->autocommit = 1;
/* If the datestyle is ISO or anything else good,
* we can skip the CONN_STATUS_DATESTYLE step. */
@ -952,38 +987,114 @@ conn_rollback(connectionObject *self)
return res;
}
/* conn_set - set a guc parameter */
int
conn_set(connectionObject *self, const char *param, const char *value)
{
char query[256];
PGresult *pgres = NULL;
char *error = NULL;
int res = 1;
Dprintf("conn_set: setting %s to %s", param, value);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
if (0 == strcmp(value, "default")) {
sprintf(query, "SET %s TO DEFAULT;", param);
}
else {
sprintf(query, "SET %s TO '%s';", param, value);
}
res = pq_execute_command_locked(self, query, &pgres, &error, &_save);
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
if (res < 0) {
pq_complete_error(self, &pgres, &error);
}
return res;
}
int
conn_set_autocommit(connectionObject *self, int value)
{
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
self->autocommit = value;
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
return 0;
}
/* conn_switch_isolation_level - switch isolation level on the connection */
int
conn_switch_isolation_level(connectionObject *self, int level)
{
PGresult *pgres = NULL;
char *error = NULL;
int res = 0;
int curr_level;
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
/* if the current isolation level is equal to the requested one don't switch */
if (self->isolation_level != level) {
/* if the current isolation level is > 0 we need to abort the current
transaction before changing; that all folks! */
if (self->isolation_level != ISOLATION_LEVEL_AUTOCOMMIT) {
res = pq_abort_locked(self, &pgres, &error, &_save);
/* use only supported levels on older PG versions */
if (self->server_version < 80000) {
if (level == 1 || level == 3) {
++level;
}
}
if (-1 == (curr_level = conn_get_isolation_level(self))) {
return -1;
}
if (curr_level == level) {
/* no need to change level */
return 0;
}
/* Emulate the previous semantic of set_isolation_level() using the
* functions currently available. */
/* terminate the current transaction if any */
pq_abort(self);
if (level == 0) {
if (0 != conn_set(self, "default_transaction_isolation", "default")) {
return -1;
}
if (0 != conn_set_autocommit(self, 1)) {
return -1;
}
}
else {
/* find the name of the requested level */
const IsolationLevel *isolevel = conn_isolevels;
while ((++isolevel)->name) {
if (level == isolevel->value) {
break;
}
}
if (!isolevel->name) {
PyErr_SetString(OperationalError, "bad isolation level value");
return -1;
}
if (0 != conn_set(self, "default_transaction_isolation", isolevel->name)) {
return -1;
}
if (0 != conn_set_autocommit(self, 0)) {
return -1;
}
}
self->isolation_level = level;
Dprintf("conn_switch_isolation_level: switched to level %d", level);
}
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
if (res < 0)
pq_complete_error(self, &pgres, &error);
return res;
return 0;
}
/* conn_set_client_encoding - switch client encoding on connection */

View File

@ -187,6 +187,7 @@ psyco_conn_tpc_begin(connectionObject *self, PyObject *args)
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, tpc_begin);
EXC_IF_TPC_NOT_SUPPORTED(self);
EXC_IF_IN_TRANSACTION(self, tpc_begin);
if (!PyArg_ParseTuple(args, "O", &oxid)) {
goto exit;
@ -196,15 +197,8 @@ psyco_conn_tpc_begin(connectionObject *self, PyObject *args)
goto exit;
}
/* check we are not in a transaction */
if (self->status != CONN_STATUS_READY) {
PyErr_SetString(ProgrammingError,
"tpc_begin must be called outside a transaction");
goto exit;
}
/* two phase commit and autocommit make no point */
if (self->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT) {
if (self->autocommit) {
PyErr_SetString(ProgrammingError,
"tpc_begin can't be called in autocommit mode");
goto exit;
@ -384,6 +378,200 @@ psyco_conn_tpc_recover(connectionObject *self, PyObject *args)
#ifdef PSYCOPG_EXTENSIONS
/* parse a python object into one of the possible isolation level values */
extern const IsolationLevel conn_isolevels[];
static const char *
_psyco_conn_parse_isolevel(connectionObject *self, PyObject *pyval)
{
const IsolationLevel *isolevel = NULL;
Py_INCREF(pyval); /* for ensure_bytes */
/* parse from one of the level constants */
if (PyInt_Check(pyval)) {
long level = PyInt_AsLong(pyval);
if (level == -1 && PyErr_Occurred()) { goto exit; }
if (level < 1 || level > 4) {
PyErr_SetString(PyExc_ValueError,
"isolation_level must be between 1 and 4");
goto exit;
}
isolevel = conn_isolevels + level;
}
/* parse from the string -- this includes "default" */
else {
isolevel = conn_isolevels;
while ((++isolevel)->name) {
if (!(pyval = psycopg_ensure_bytes(pyval))) {
goto exit;
}
if (0 == strcasecmp(isolevel->name, Bytes_AS_STRING(pyval))) {
break;
}
}
if (!isolevel->name) {
char msg[256];
snprintf(msg, sizeof(msg),
"bad value for isolation_level: '%s'", Bytes_AS_STRING(pyval));
PyErr_SetString(PyExc_ValueError, msg);
}
}
/* use only supported levels on older PG versions */
if (isolevel && self->server_version < 80000) {
if (isolevel->value == 1 || isolevel->value == 3) {
++isolevel;
}
}
exit:
Py_XDECREF(pyval);
return isolevel ? isolevel->name : NULL;
}
/* convert True/False/"default" into a C string */
static const char *
_psyco_conn_parse_onoff(PyObject *pyval)
{
int istrue = PyObject_IsTrue(pyval);
if (-1 == istrue) { return NULL; }
if (istrue) {
int cmp;
PyObject *pydef;
if (!(pydef = Text_FromUTF8("default"))) { return NULL; }
cmp = PyObject_RichCompareBool(pyval, pydef, Py_EQ);
Py_DECREF(pydef);
if (-1 == cmp) { return NULL; }
return cmp ? "default" : "on";
}
else {
return "off";
}
}
/* set_transaction - default transaction characteristics */
#define psyco_conn_set_transaction_doc \
"set_transaction(...) -- Set one or more parameters for the next transactions.\n\n" \
"Accepted arguments are 'isolation_level', 'readonly', 'deferrable', 'autocommit'."
static PyObject *
psyco_conn_set_transaction(connectionObject *self, PyObject *args, PyObject *kwargs)
{
PyObject *isolation_level = Py_None;
PyObject *readonly = Py_None;
PyObject *deferrable = Py_None;
PyObject *autocommit = Py_None;
static char *kwlist[] =
{"isolation_level", "readonly", "deferrable", "autocommit", NULL};
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, set_transaction);
EXC_IF_IN_TRANSACTION(self, set_transaction);
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "|OOOO", kwlist,
&isolation_level, &readonly, &deferrable, &autocommit)) {
return NULL;
}
if (Py_None != isolation_level) {
const char *value = NULL;
if (!(value = _psyco_conn_parse_isolevel(self, isolation_level))) {
return NULL;
}
if (0 != conn_set(self, "default_transaction_isolation", value)) {
return NULL;
}
}
if (Py_None != readonly) {
const char *value = NULL;
if (!(value = _psyco_conn_parse_onoff(readonly))) {
return NULL;
}
if (0 != conn_set(self, "default_transaction_read_only", value)) {
return NULL;
}
}
if (Py_None != deferrable) {
const char *value = NULL;
if (!(value = _psyco_conn_parse_onoff(deferrable))) {
return NULL;
}
if (0 != conn_set(self, "default_transaction_deferrable", value)) {
return NULL;
}
}
if (Py_None != autocommit) {
int value = PyObject_IsTrue(autocommit);
if (-1 == value) { return NULL; }
if (0 != conn_set_autocommit(self, value)) {
return NULL;
}
}
Py_INCREF(Py_None);
return Py_None;
}
#define psyco_conn_autocommit_doc \
"set or return the autocommit status."
static PyObject *
psyco_conn_autocommit_get(connectionObject *self)
{
PyObject *ret;
ret = self->autocommit ? Py_True : Py_False;
Py_INCREF(ret);
return ret;
}
static PyObject *
_psyco_conn_autocommit_set_checks(connectionObject *self)
{
/* wrapper to use the EXC_IF macros.
* return NULL in case of error, else whatever */
EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, autocommit);
EXC_IF_IN_TRANSACTION(self, autocommit);
return Py_None; /* borrowed */
}
static int
psyco_conn_autocommit_set(connectionObject *self, PyObject *pyvalue)
{
int value;
if (!_psyco_conn_autocommit_set_checks(self)) { return -1; }
if (-1 == (value = PyObject_IsTrue(pyvalue))) { return -1; }
if (0 != conn_set_autocommit(self, value)) { return -1; }
return 0;
}
/* isolation_level - return the current isolation level */
static PyObject *
psyco_conn_isolation_level_get(connectionObject *self)
{
int rv = conn_get_isolation_level(self);
if (-1 == rv) { return NULL; }
return PyInt_FromLong((long)rv);
}
/* set_isolation_level method - switch connection isolation level */
#define psyco_conn_set_isolation_level_doc \
@ -717,6 +905,8 @@ static struct PyMethodDef connectionObject_methods[] = {
{"tpc_recover", (PyCFunction)psyco_conn_tpc_recover,
METH_NOARGS, psyco_conn_tpc_recover_doc},
#ifdef PSYCOPG_EXTENSIONS
{"set_transaction", (PyCFunction)psyco_conn_set_transaction,
METH_VARARGS|METH_KEYWORDS, psyco_conn_set_transaction_doc},
{"set_isolation_level", (PyCFunction)psyco_conn_set_isolation_level,
METH_VARARGS, psyco_conn_set_isolation_level_doc},
{"set_client_encoding", (PyCFunction)psyco_conn_set_client_encoding,
@ -749,9 +939,6 @@ static struct PyMemberDef connectionObject_members[] = {
#ifdef PSYCOPG_EXTENSIONS
{"closed", T_LONG, offsetof(connectionObject, closed), READONLY,
"True if the connection is closed."},
{"isolation_level", T_LONG,
offsetof(connectionObject, isolation_level), READONLY,
"The current isolation level."},
{"encoding", T_STRING, offsetof(connectionObject, encoding), READONLY,
"The current client encoding."},
{"notices", T_OBJECT, offsetof(connectionObject, notice_list), READONLY},
@ -792,6 +979,16 @@ static struct PyGetSetDef connectionObject_getsets[] = {
EXCEPTION_GETTER(IntegrityError),
EXCEPTION_GETTER(DataError),
EXCEPTION_GETTER(NotSupportedError),
#ifdef PSYCOPG_EXTENSIONS
{ "autocommit",
(getter)psyco_conn_autocommit_get,
(setter)psyco_conn_autocommit_set,
psyco_conn_autocommit_doc },
{ "isolation_level",
(getter)psyco_conn_isolation_level_get,
(setter)NULL,
"The current isolation level." },
#endif
{NULL}
};
#undef EXCEPTION_GETTER

View File

@ -456,7 +456,7 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs)
NULL, NULL);
return NULL;
}
if (self->conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT) {
if (self->conn->autocommit) {
psyco_set_error(ProgrammingError, self,
"can't use a named cursor outside of transactions", NULL, NULL);
return NULL;

View File

@ -76,7 +76,7 @@ HIDDEN int lobject_close(lobjectObject *self);
return NULL; }
#define EXC_IF_LOBJ_LEVEL0(self) \
if (self->conn->isolation_level == 0) { \
if (self->conn->autocommit) { \
psyco_set_error(ProgrammingError, NULL, \
"can't use a lobject outside of transactions", NULL, NULL); \
return NULL; \

View File

@ -252,7 +252,7 @@ lobject_close_locked(lobjectObject *self, char **error)
break;
}
if (self->conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT ||
if (self->conn->autocommit ||
self->conn->mark != self->mark ||
self->fd == -1)
return 0;

View File

@ -51,7 +51,7 @@ psyco_lobj_close(lobjectObject *self, PyObject *args)
closing the current transaction is equivalent to close all the
opened large objects */
if (!lobject_is_closed(self)
&& self->conn->isolation_level != ISOLATION_LEVEL_AUTOCOMMIT
&& !self->conn->autocommit
&& self->conn->mark == self->mark)
{
Dprintf("psyco_lobj_close: closing lobject at %p", self);
@ -331,7 +331,7 @@ lobject_setup(lobjectObject *self, connectionObject *conn,
{
Dprintf("lobject_setup: init lobject object at %p", self);
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT) {
if (conn->autocommit) {
psyco_set_error(ProgrammingError, NULL,
"can't use a lobject outside of transactions", NULL, NULL);
return -1;

View File

@ -406,25 +406,17 @@ int
pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error,
PyThreadState **tstate)
{
const char *query[] = {
NULL,
"BEGIN; SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED",
"BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
"BEGIN; SET TRANSACTION ISOLATION LEVEL REPEATABLE READ",
"BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"};
int result;
Dprintf("pq_begin_locked: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
Dprintf("pq_begin_locked: pgconn = %p, autocommit = %d, status = %d",
conn->pgconn, conn->autocommit, conn->status);
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT
|| conn->status != CONN_STATUS_READY) {
if (conn->autocommit || conn->status != CONN_STATUS_READY) {
Dprintf("pq_begin_locked: transaction in progress");
return 0;
}
result = pq_execute_command_locked(conn, query[conn->isolation_level],
pgres, error, tstate);
result = pq_execute_command_locked(conn, "BEGIN;", pgres, error, tstate);
if (result == 0)
conn->status = CONN_STATUS_BEGIN;
@ -444,11 +436,10 @@ pq_commit(connectionObject *conn)
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("pq_commit: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
Dprintf("pq_commit: pgconn = %p, autocommit = %d, status = %d",
conn->pgconn, conn->autocommit, conn->status);
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT
|| conn->status != CONN_STATUS_BEGIN) {
if (conn->autocommit || conn->status != CONN_STATUS_BEGIN) {
Dprintf("pq_commit: no transaction to commit");
return 0;
}
@ -480,11 +471,10 @@ pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error,
{
int retvalue = -1;
Dprintf("pq_abort_locked: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
Dprintf("pq_abort_locked: pgconn = %p, autocommit = %d, status = %d",
conn->pgconn, conn->autocommit, conn->status);
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT
|| conn->status != CONN_STATUS_BEGIN) {
if (conn->autocommit || conn->status != CONN_STATUS_BEGIN) {
Dprintf("pq_abort_locked: no transaction to abort");
return 0;
}
@ -509,11 +499,10 @@ pq_abort(connectionObject *conn)
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("pq_abort: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
Dprintf("pq_abort: pgconn = %p, autocommit = %d, status = %d",
conn->pgconn, conn->autocommit, conn->status);
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT
|| conn->status != CONN_STATUS_BEGIN) {
if (conn->autocommit || conn->status != CONN_STATUS_BEGIN) {
Dprintf("pq_abort: no transaction to abort");
return 0;
}
@ -549,13 +538,12 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error,
{
int retvalue = -1;
Dprintf("pq_reset_locked: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
Dprintf("pq_reset_locked: pgconn = %p, autocommit = %d, status = %d",
conn->pgconn, conn->autocommit, conn->status);
conn->mark += 1;
if (conn->isolation_level != ISOLATION_LEVEL_AUTOCOMMIT
&& conn->status == CONN_STATUS_BEGIN) {
if (!conn->autocommit && conn->status == CONN_STATUS_BEGIN) {
retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error, tstate);
if (retvalue != 0) return retvalue;
}
@ -580,8 +568,8 @@ pq_reset(connectionObject *conn)
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("pq_reset: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
Dprintf("pq_reset: pgconn = %p, autocommit = %d, status = %d",
conn->pgconn, conn->autocommit, conn->status);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&conn->lock);

View File

@ -105,6 +105,7 @@ typedef unsigned long Py_uhash_t;
#if PY_MAJOR_VERSION > 2
#define PyInt_Type PyLong_Type
#define PyInt_Check PyLong_Check
#define PyInt_AsLong PyLong_AsLong
#define PyInt_FromLong PyLong_FromLong
#define PyInt_FromSsize_t PyLong_FromSsize_t

View File

@ -25,7 +25,8 @@
import os
import time
import threading
from testutils import unittest, decorate_all_tests, skip_before_postgres
from testutils import unittest, decorate_all_tests
from testutils import skip_before_postgres, skip_after_postgres
from operator import attrgetter
import psycopg2
@ -203,14 +204,23 @@ class IsolationLevelsTestCase(unittest.TestCase):
conn = self.connect()
curs = conn.cursor()
for name, level in (
levels = (
(None, psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT),
('read uncommitted', psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED),
('read committed', psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED),
('repeatable read', psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ),
('serializable', psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE),
):
)
for name, level in levels:
conn.set_isolation_level(level)
# the only values available on prehistoric PG versions
if conn.server_version < 80000:
if level in (
psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ):
name, level = levels[levels.index((name, level)) + 1]
self.assertEqual(conn.isolation_level, level)
curs.execute('show transaction_isolation;')
@ -707,6 +717,241 @@ from testutils import skip_if_tpc_disabled
decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled)
class TransactionControlTests(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(dsn)
def tearDown(self):
if not self.conn.closed:
self.conn.close()
def test_not_in_transaction(self):
cur = self.conn.cursor()
cur.execute("select 1")
self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_transaction,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
def test_set_isolation_level(self):
cur = self.conn.cursor()
self.conn.set_transaction(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_transaction(
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ)
cur.execute("SHOW default_transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_transaction(
isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.set_transaction(
isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED)
cur.execute("SHOW default_transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
def test_set_isolation_level_str(self):
cur = self.conn.cursor()
self.conn.set_transaction("serializable")
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_transaction("repeatable read")
cur.execute("SHOW default_transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.set_transaction("read committed")
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.set_transaction("read uncommitted")
cur.execute("SHOW default_transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
def test_bad_isolation_level(self):
self.assertRaises(ValueError, self.conn.set_transaction, 0)
self.assertRaises(ValueError, self.conn.set_transaction, 5)
self.assertRaises(ValueError, self.conn.set_transaction, 'whatever')
def test_set_read_only(self):
cur = self.conn.cursor()
self.conn.set_transaction(readonly=True)
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
cur = self.conn.cursor()
self.conn.set_transaction(readonly=None)
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
self.conn.set_transaction(readonly=False)
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
def test_set_default(self):
cur = self.conn.cursor()
cur.execute("SHOW default_transaction_isolation;")
default_isolevel = cur.fetchone()[0]
cur.execute("SHOW default_transaction_read_only;")
default_readonly = cur.fetchone()[0]
self.conn.rollback()
self.conn.set_transaction(isolation_level='serializable', readonly=True)
self.conn.set_transaction(isolation_level='default', readonly='default')
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], default_isolevel)
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], default_readonly)
@skip_before_postgres(9, 1)
def test_set_deferrable(self):
cur = self.conn.cursor()
self.conn.set_transaction(readonly=True, deferrable=True)
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW default_transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
cur.execute("SHOW default_transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
self.conn.set_transaction(deferrable=False)
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW default_transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
@skip_after_postgres(9, 1)
def test_set_deferrable_error(self):
self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_transaction, readonly=True, deferrable=True)
class AutocommitTests(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(dsn)
def tearDown(self):
if not self.conn.closed:
self.conn.close()
def test_default_no_autocommit(self):
self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor()
cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS)
self.conn.rollback()
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
def test_set_autocommit(self):
self.conn.autocommit = True
self.assert_(self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor()
cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
self.conn.autocommit = False
self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS)
def test_set_intrans_error(self):
cur = self.conn.cursor()
cur.execute('select 1;')
self.assertRaises(psycopg2.ProgrammingError,
setattr, self.conn, 'autocommit', True)
def test_set_transaction_autocommit(self):
self.conn.set_transaction(autocommit=True)
self.assert_(self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor()
cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
self.conn.set_transaction(autocommit=False)
self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS)
self.conn.rollback()
self.conn.set_transaction('serializable', readonly=True, autocommit=True)
self.assert_(self.conn.autocommit)
cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE)
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)