connection.isolation_level is now writable

This commit is contained in:
Daniele Varrazzo 2017-02-16 02:07:05 +00:00
parent fb1a47c064
commit 20c9c17457
5 changed files with 256 additions and 108 deletions

1
NEWS
View File

@ -39,6 +39,7 @@ New features:
control the session characteristics as it may create problems with external control the session characteristics as it may create problems with external
connection pools such as pgbouncer; use :sql:`BEGIN` options instead connection pools such as pgbouncer; use :sql:`BEGIN` options instead
(:ticket:`#503`). (:ticket:`#503`).
- `~connection.isolation_level` is now writable.
Bug fixes: Bug fixes:

View File

@ -454,6 +454,28 @@ The ``connection`` class
.. versionadded:: 2.4.2 .. versionadded:: 2.4.2
.. attribute:: isolation_level
Return or set the `transaction isolation level`_ for the current
session. The value is one of the :ref:`isolation-level-constants`
defined in the `psycopg2.extensions` module. On set it is also
possible to use one of the literal values ``READ UNCOMMITTED``, ``READ
COMMITTED``, ``REPEATABLE READ``, ``SERIALIZABLE``, ``DEFAULT``.
.. versionchanged:: 2.7
the property is writable.
.. versionchanged:: 2.7
the default value for `!isolation_level` is
`~psycopg2.extensions.ISOLATION_LEVEL_DEFAULT`; previously the
property would have queried the server and returned the real value
applied. To know this value you can run a query such as :sql:`show
transaction_isolation`. Usually the default value is `READ
COMMITTED`, but this may be changed in the server configuration.
.. method:: set_isolation_level(level) .. method:: set_isolation_level(level)
.. note:: .. note::
@ -486,22 +508,6 @@ The ``connection`` class
an exception. an exception.
.. attribute:: isolation_level
Read the `transaction isolation level`_ for the current session. The
value is one of the :ref:`isolation-level-constants` defined in the
`psycopg2.extensions` module.
.. versionchanged:: 2.7
the default value for `!isolation_level` is
`~psycopg2.extensions.ISOLATION_LEVEL_DEFAULT`; previously the
property would have queried the server and returned the real value
applied. To know this value you can run a query such as :sql:`show
transaction_isolation`. Usually the default value is `READ
COMMITTED`, but this may be changed in the server configuration.
.. index:: .. index::
pair: Client; Encoding pair: Client; Encoding

View File

@ -72,7 +72,7 @@ ISOLATION_LEVEL_READ_UNCOMMITTED = 4
ISOLATION_LEVEL_READ_COMMITTED = 1 ISOLATION_LEVEL_READ_COMMITTED = 1
ISOLATION_LEVEL_REPEATABLE_READ = 2 ISOLATION_LEVEL_REPEATABLE_READ = 2
ISOLATION_LEVEL_SERIALIZABLE = 3 ISOLATION_LEVEL_SERIALIZABLE = 3
ISOLATION_LEVEL_DEFAULT = 5 ISOLATION_LEVEL_DEFAULT = None
"""psycopg connection status values.""" """psycopg connection status values."""

View File

@ -455,8 +455,14 @@ _psyco_conn_parse_isolevel(PyObject *pyval)
Py_INCREF(pyval); /* for ensure_bytes */ Py_INCREF(pyval); /* for ensure_bytes */
/* None is default. This is only used when setting the property, because
* set_session() has None used as "don't change" */
if (pyval == Py_None) {
rv = ISOLATION_LEVEL_DEFAULT;
}
/* parse from one of the level constants */ /* parse from one of the level constants */
if (PyInt_Check(pyval)) { else if (PyInt_Check(pyval)) {
level = PyInt_AsLong(pyval); level = PyInt_AsLong(pyval);
if (level == -1 && PyErr_Occurred()) { goto exit; } if (level == -1 && PyErr_Occurred()) { goto exit; }
if (level < 1 || level > 4) { if (level < 1 || level > 4) {
@ -469,7 +475,6 @@ _psyco_conn_parse_isolevel(PyObject *pyval)
} }
/* parse from the string -- this includes "default" */ /* parse from the string -- this includes "default" */
else { else {
if (!(pyval = psycopg_ensure_bytes(pyval))) { if (!(pyval = psycopg_ensure_bytes(pyval))) {
goto exit; goto exit;
@ -617,11 +622,11 @@ psyco_conn_autocommit_get(connectionObject *self)
} }
BORROWED static PyObject * BORROWED static PyObject *
_psyco_conn_autocommit_set_checks(connectionObject *self) _psyco_set_session_check_setter_wrapper(connectionObject *self)
{ {
/* wrapper to use the EXC_IF macros. /* wrapper to use the EXC_IF macros.
* return NULL in case of error, else whatever */ * return NULL in case of error, else whatever */
_set_session_checks(self, autocommit); _set_session_checks(self, set_session);
return Py_None; /* borrowed */ return Py_None; /* borrowed */
} }
@ -630,7 +635,7 @@ psyco_conn_autocommit_set(connectionObject *self, PyObject *pyvalue)
{ {
int value; int value;
if (!_psyco_conn_autocommit_set_checks(self)) { return -1; } if (!_psyco_set_session_check_setter_wrapper(self)) { return -1; }
if (-1 == (value = PyObject_IsTrue(pyvalue))) { return -1; } if (-1 == (value = PyObject_IsTrue(pyvalue))) { return -1; }
if (0 > conn_set_session(self, value, if (0 > conn_set_session(self, value,
self->isolevel, self->readonly, self->deferrable)) { self->isolevel, self->readonly, self->deferrable)) {
@ -643,6 +648,9 @@ psyco_conn_autocommit_set(connectionObject *self, PyObject *pyvalue)
/* isolation_level - return the current isolation level */ /* isolation_level - return the current isolation level */
#define psyco_conn_isolation_level_doc \
"Set or return the connection transaction isolation level."
static PyObject * static PyObject *
psyco_conn_isolation_level_get(connectionObject *self) psyco_conn_isolation_level_get(connectionObject *self)
{ {
@ -653,7 +661,29 @@ psyco_conn_isolation_level_get(connectionObject *self)
rv = conn_get_isolation_level(self); rv = conn_get_isolation_level(self);
if (-1 == rv) { return NULL; } if (-1 == rv) { return NULL; }
return PyInt_FromLong((long)rv); if (ISOLATION_LEVEL_DEFAULT == rv) {
Py_RETURN_NONE;
} else {
return PyInt_FromLong((long)rv);
}
}
/* isolation_level - set a new isolation level */
static int
psyco_conn_isolation_level_set(connectionObject *self, PyObject *pyvalue)
{
int value;
if (!_psyco_set_session_check_setter_wrapper(self)) { return -1; }
if (0 > (value = _psyco_conn_parse_isolevel(pyvalue))) { return -1; }
if (0 > conn_set_session(self, self->autocommit,
value, self->readonly, self->deferrable)) {
return -1;
}
return 0;
} }
@ -666,15 +696,25 @@ static PyObject *
psyco_conn_set_isolation_level(connectionObject *self, PyObject *args) psyco_conn_set_isolation_level(connectionObject *self, PyObject *args)
{ {
int level = 1; int level = 1;
PyObject *pyval = NULL;
_set_session_checks(self, set_isolation_level); _set_session_checks(self, set_isolation_level);
if (!PyArg_ParseTuple(args, "i", &level)) return NULL; if (!PyArg_ParseTuple(args, "O", &pyval)) return NULL;
if (level < 0 || level > 5) { if (pyval == Py_None) {
PyErr_SetString(PyExc_ValueError, level = ISOLATION_LEVEL_DEFAULT;
"isolation level must be between 0 and 4"); }
return NULL;
/* parse from one of the level constants */
else if (PyInt_Check(pyval)) {
level = PyInt_AsLong(pyval);
if (level < 0 || level > 4) {
PyErr_SetString(PyExc_ValueError,
"isolation level must be between 0 and 4");
return NULL;
}
} }
if (level == 0) { if (level == 0) {
@ -1103,8 +1143,8 @@ static struct PyGetSetDef connectionObject_getsets[] = {
psyco_conn_autocommit_doc }, psyco_conn_autocommit_doc },
{ "isolation_level", { "isolation_level",
(getter)psyco_conn_isolation_level_get, (getter)psyco_conn_isolation_level_get,
(setter)NULL, (setter)psyco_conn_isolation_level_set,
"The current isolation level." }, psyco_conn_isolation_level_doc },
{NULL} {NULL}
}; };
#undef EXCEPTION_GETTER #undef EXCEPTION_GETTER

View File

@ -222,7 +222,7 @@ class ConnectionTests(ConnectingTestCase):
self.conn.set_client_encoding("EUC_JP") self.conn.set_client_encoding("EUC_JP")
# conn.encoding is 'EUCJP' now. # conn.encoding is 'EUCJP' now.
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, cur) ext.register_type(ext.UNICODE, cur)
cur.execute("select 'foo'::text;") cur.execute("select 'foo'::text;")
self.assertEqual(cur.fetchone()[0], u'foo') self.assertEqual(cur.fetchone()[0], u'foo')
@ -308,14 +308,14 @@ class ConnectionTests(ConnectingTestCase):
# issue #210 # issue #210
conn = self.connect() conn = self.connect()
cur = conn.cursor(cursor_factory=None) cur = conn.cursor(cursor_factory=None)
self.assertEqual(type(cur), psycopg2.extensions.cursor) self.assertEqual(type(cur), ext.cursor)
conn = self.connect(cursor_factory=psycopg2.extras.DictCursor) conn = self.connect(cursor_factory=psycopg2.extras.DictCursor)
cur = conn.cursor(cursor_factory=None) cur = conn.cursor(cursor_factory=None)
self.assertEqual(type(cur), psycopg2.extras.DictCursor) self.assertEqual(type(cur), psycopg2.extras.DictCursor)
def test_failed_init_status(self): def test_failed_init_status(self):
class SubConnection(psycopg2.extensions.connection): class SubConnection(ext.connection):
def __init__(self, dsn): def __init__(self, dsn):
try: try:
super(SubConnection, self).__init__(dsn) super(SubConnection, self).__init__(dsn)
@ -488,23 +488,23 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn = self.connect() conn = self.connect()
self.assertEqual( self.assertEqual(
conn.isolation_level, conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) ext.ISOLATION_LEVEL_DEFAULT)
def test_encoding(self): def test_encoding(self):
conn = self.connect() conn = self.connect()
self.assert_(conn.encoding in psycopg2.extensions.encodings) self.assert_(conn.encoding in ext.encodings)
def test_set_isolation_level(self): def test_set_isolation_level(self):
conn = self.connect() conn = self.connect()
curs = conn.cursor() curs = conn.cursor()
levels = [ levels = [
(None, psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT), (None, ext.ISOLATION_LEVEL_AUTOCOMMIT),
('read uncommitted', ('read uncommitted',
psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED), ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
('read committed', psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED), ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
('repeatable read', psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ), ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
('serializable', psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE), ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
] ]
for name, level in levels: for name, level in levels:
conn.set_isolation_level(level) conn.set_isolation_level(level)
@ -512,8 +512,8 @@ class IsolationLevelsTestCase(ConnectingTestCase):
# the only values available on prehistoric PG versions # the only values available on prehistoric PG versions
if conn.server_version < 80000: if conn.server_version < 80000:
if level in ( if level in (
psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ): ext.ISOLATION_LEVEL_REPEATABLE_READ):
name, level = levels[levels.index((name, level)) + 1] name, level = levels[levels.index((name, level)) + 1]
self.assertEqual(conn.isolation_level, level) self.assertEqual(conn.isolation_level, level)
@ -529,7 +529,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn.commit() conn.commit()
self.assertRaises(ValueError, conn.set_isolation_level, -1) self.assertRaises(ValueError, conn.set_isolation_level, -1)
self.assertRaises(ValueError, conn.set_isolation_level, 6) self.assertRaises(ValueError, conn.set_isolation_level, 5)
def test_set_isolation_level_default(self): def test_set_isolation_level_default(self):
conn = self.connect() conn = self.connect()
@ -539,14 +539,14 @@ class IsolationLevelsTestCase(ConnectingTestCase):
curs.execute("set default_transaction_isolation to 'read committed'") curs.execute("set default_transaction_isolation to 'read committed'")
conn.autocommit = False conn.autocommit = False
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) conn.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(conn.isolation_level, self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
curs.execute("show transaction_isolation") curs.execute("show transaction_isolation")
self.assertEqual(curs.fetchone()[0], "serializable") self.assertEqual(curs.fetchone()[0], "serializable")
conn.rollback() conn.rollback()
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) conn.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT)
curs.execute("show transaction_isolation") curs.execute("show transaction_isolation")
self.assertEqual(curs.fetchone()[0], "read committed") self.assertEqual(curs.fetchone()[0], "read committed")
@ -554,25 +554,25 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn = self.connect() conn = self.connect()
cur = conn.cursor() cur = conn.cursor()
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE, self.assertEqual(ext.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status()) conn.get_transaction_status())
cur.execute("insert into isolevel values (10);") cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS, self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status()) conn.get_transaction_status())
# changed in psycopg 2.7 # changed in psycopg 2.7
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,
conn.set_isolation_level, conn.set_isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS, self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status()) conn.get_transaction_status())
self.assertEqual(conn.isolation_level, self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) ext.ISOLATION_LEVEL_DEFAULT)
def test_isolation_level_autocommit(self): def test_isolation_level_autocommit(self):
cnn1 = self.connect() cnn1 = self.connect()
cnn2 = self.connect() cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cnn2.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
cur1 = cnn1.cursor() cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;") cur1.execute("select count(*) from isolevel;")
@ -588,7 +588,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_read_committed(self): def test_isolation_level_read_committed(self):
cnn1 = self.connect() cnn1 = self.connect()
cnn2 = self.connect() cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor() cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;") cur1.execute("select count(*) from isolevel;")
@ -614,7 +614,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_serializable(self): def test_isolation_level_serializable(self):
cnn1 = self.connect() cnn1 = self.connect()
cnn2 = self.connect() cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) cnn2.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
cur1 = cnn1.cursor() cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;") cur1.execute("select count(*) from isolevel;")
@ -650,6 +650,107 @@ class IsolationLevelsTestCase(ConnectingTestCase):
self.assertRaises(psycopg2.InterfaceError, self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 1) cnn.set_isolation_level, 1)
def test_setattr_isolation_level_int(self):
cur = self.conn.cursor()
self.conn.isolation_level = ext.ISOLATION_LEVEL_SERIALIZABLE
self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.isolation_level = ext.ISOLATION_LEVEL_REPEATABLE_READ
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_REPEATABLE_READ)
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_COMMITTED
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_UNCOMMITTED
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.assertEqual(ext.ISOLATION_LEVEL_DEFAULT, None)
self.conn.isolation_level = ext.ISOLATION_LEVEL_DEFAULT
self.assertEqual(self.conn.isolation_level, None)
cur.execute("SHOW transaction_isolation;")
isol = cur.fetchone()[0]
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], isol)
def test_setattr_isolation_level_str(self):
cur = self.conn.cursor()
self.conn.isolation_level = "serializable"
self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.isolation_level = "repeatable read"
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_REPEATABLE_READ)
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.isolation_level = "read committed"
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.isolation_level = "read uncommitted"
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.isolation_level = "default"
self.assertEqual(self.conn.isolation_level, None)
cur.execute("SHOW transaction_isolation;")
isol = cur.fetchone()[0]
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], isol)
def test_setattr_isolation_level_invalid(self):
self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 0)
self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', -1)
self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 5)
self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 'bah')
class ConnectionTwoPhaseTests(ConnectingTestCase): class ConnectionTwoPhaseTests(ConnectingTestCase):
def setUp(self): def setUp(self):
@ -716,10 +817,10 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
def test_tpc_commit(self): def test_tpc_commit(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit');") cur.execute("insert into test_tpc values ('test_tpc_commit');")
@ -727,22 +828,22 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_prepare() cnn.tpc_prepare()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED) self.assertEqual(cnn.status, ext.STATUS_PREPARED)
self.assertEqual(1, self.count_xacts()) self.assertEqual(1, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_commit() cnn.tpc_commit()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records()) self.assertEqual(1, self.count_test_records())
def test_tpc_commit_one_phase(self): def test_tpc_commit_one_phase(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_1p');") cur.execute("insert into test_tpc values ('test_tpc_commit_1p');")
@ -750,17 +851,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_commit() cnn.tpc_commit()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records()) self.assertEqual(1, self.count_test_records())
def test_tpc_commit_recovered(self): def test_tpc_commit_recovered(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_rec');") cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
@ -776,17 +877,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
cnn.tpc_commit(xid) cnn.tpc_commit(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records()) self.assertEqual(1, self.count_test_records())
def test_tpc_rollback(self): def test_tpc_rollback(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_rollback');") cur.execute("insert into test_tpc values ('test_tpc_rollback');")
@ -794,22 +895,22 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_prepare() cnn.tpc_prepare()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED) self.assertEqual(cnn.status, ext.STATUS_PREPARED)
self.assertEqual(1, self.count_xacts()) self.assertEqual(1, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_rollback() cnn.tpc_rollback()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_one_phase(self): def test_tpc_rollback_one_phase(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');") cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
@ -817,17 +918,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_rollback() cnn.tpc_rollback()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_recovered(self): def test_tpc_rollback_recovered(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_rec');") cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
@ -843,21 +944,21 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
cnn.tpc_rollback(xid) cnn.tpc_rollback(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
def test_status_after_recover(self): def test_status_after_recover(self):
cnn = self.connect() cnn = self.connect()
self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) self.assertEqual(ext.STATUS_READY, cnn.status)
cnn.tpc_recover() cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) self.assertEqual(ext.STATUS_READY, cnn.status)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("select 1") cur.execute("select 1")
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) self.assertEqual(ext.STATUS_BEGIN, cnn.status)
cnn.tpc_recover() cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) self.assertEqual(ext.STATUS_BEGIN, cnn.status)
def test_recovered_xids(self): def test_recovered_xids(self):
# insert a few test xns # insert a few test xns
@ -1030,25 +1131,25 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.close() self.conn.close()
self.assertRaises(psycopg2.InterfaceError, self.assertRaises(psycopg2.InterfaceError,
self.conn.set_session, self.conn.set_session,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_not_in_transaction(self): def test_not_in_transaction(self):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select 1") cur.execute("select 1")
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_session, self.conn.set_session,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_set_isolation_level(self): def test_set_isolation_level(self):
cur = self.conn.cursor() cur = self.conn.cursor()
self.conn.set_session( self.conn.set_session(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable') self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback() self.conn.rollback()
self.conn.set_session( self.conn.set_session(
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ) ext.ISOLATION_LEVEL_REPEATABLE_READ)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000: if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'repeatable read') self.assertEqual(cur.fetchone()[0], 'repeatable read')
@ -1057,13 +1158,13 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.rollback() self.conn.rollback()
self.conn.set_session( self.conn.set_session(
isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed') self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback() self.conn.rollback()
self.conn.set_session( self.conn.set_session(
isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED) isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000: if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'read uncommitted') self.assertEqual(cur.fetchone()[0], 'read uncommitted')
@ -1183,44 +1284,44 @@ class AutocommitTests(ConnectingTestCase):
def test_default_no_autocommit(self): def test_default_no_autocommit(self):
self.assert_(not self.conn.autocommit) self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS) ext.TRANSACTION_STATUS_INTRANS)
self.conn.rollback() self.conn.rollback()
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
def test_set_autocommit(self): def test_set_autocommit(self):
self.conn.autocommit = True self.conn.autocommit = True
self.assert_(self.conn.autocommit) self.assert_(self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
self.conn.autocommit = False self.conn.autocommit = False
self.assert_(not self.conn.autocommit) self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS) ext.TRANSACTION_STATUS_INTRANS)
def test_set_intrans_error(self): def test_set_intrans_error(self):
cur = self.conn.cursor() cur = self.conn.cursor()
@ -1231,34 +1332,34 @@ class AutocommitTests(ConnectingTestCase):
def test_set_session_autocommit(self): def test_set_session_autocommit(self):
self.conn.set_session(autocommit=True) self.conn.set_session(autocommit=True)
self.assert_(self.conn.autocommit) self.assert_(self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
self.conn.set_session(autocommit=False) self.conn.set_session(autocommit=False)
self.assert_(not self.conn.autocommit) self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS) ext.TRANSACTION_STATUS_INTRANS)
self.conn.rollback() self.conn.rollback()
self.conn.set_session('serializable', readonly=True, autocommit=True) self.conn.set_session('serializable', readonly=True, autocommit=True)
self.assert_(self.conn.autocommit) self.assert_(self.conn.autocommit)
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable') self.assertEqual(cur.fetchone()[0], 'serializable')
cur.execute("SHOW transaction_read_only;") cur.execute("SHOW transaction_read_only;")