Merge branch 'session-attributes'

This commit is contained in:
Daniele Varrazzo 2017-02-16 14:50:21 +00:00
commit 82adf8a162
9 changed files with 577 additions and 196 deletions

5
NEWS
View File

@ -39,6 +39,9 @@ New features:
control the session characteristics as it may create problems with external control the session characteristics as it may create problems with external
connection pools such as pgbouncer; use :sql:`BEGIN` options instead connection pools such as pgbouncer; use :sql:`BEGIN` options instead
(:ticket:`#503`). (:ticket:`#503`).
- `~connection.isolation_level` is now writable and entirely separated from
`~connection.autocommit`; added `~connection.readonly`,
`connection.deferrable` writable attributes.
Bug fixes: Bug fixes:
@ -53,8 +56,6 @@ Other changes:
- `~connection.isolation_level` doesn't read from the database but will return - `~connection.isolation_level` doesn't read from the database but will return
`~psycopg2.extensions.ISOLATION_LEVEL_DEFAULT` if no value was set on the `~psycopg2.extensions.ISOLATION_LEVEL_DEFAULT` if no value was set on the
connection. connection.
- `~connection.set_isolation_level()` will throw an exception if executed
inside a transaction; previously it would have silently rolled it back.
- Empty arrays no more converted into lists if they don't have a type attached - Empty arrays no more converted into lists if they don't have a type attached
(:ticket:`#506`) (:ticket:`#506`)

View File

@ -386,12 +386,6 @@ The ``connection`` class
The function must be invoked with no transaction in progress. The function must be invoked with no transaction in progress.
.. note::
There is currently no builtin method to read the current value for
the parameters: use :sql:`SHOW default_transaction_...` to read
the values from the backend.
.. seealso:: |SET TRANSACTION|_ for further details about the behaviour .. seealso:: |SET TRANSACTION|_ for further details about the behaviour
of the transaction parameters in the server. of the transaction parameters in the server.
@ -454,12 +448,64 @@ The ``connection`` class
.. versionadded:: 2.4.2 .. versionadded:: 2.4.2
.. attribute:: isolation_level
Return or set the `transaction isolation level`_ for the current
session. The value is one of the :ref:`isolation-level-constants`
defined in the `psycopg2.extensions` module. On set it is also
possible to use one of the literal values ``READ UNCOMMITTED``, ``READ
COMMITTED``, ``REPEATABLE READ``, ``SERIALIZABLE``, ``DEFAULT``.
.. versionchanged:: 2.7
the property is writable.
.. versionchanged:: 2.7
the default value for `!isolation_level` is
`~psycopg2.extensions.ISOLATION_LEVEL_DEFAULT`; previously the
property would have queried the server and returned the real value
applied. To know this value you can run a query such as :sql:`show
transaction_isolation`. Usually the default value is `READ
COMMITTED`, but this may be changed in the server configuration.
This value is now entirely separate from the `autocommit`
property: in previous version, if `!autocommit` was set to `!True`
this property would have returned
`~psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT`; it will now
return the server isolation level.
.. attribute:: readonly
Return or set the read-only status for the current session. Available
values are `!True` (new transactions will be in read-only mode),
`!False` (new transactions will be writable), `!None` (use the default
configured for the server by :sql:`default_transaction_read_only`).
.. versionadded:: 2.7
.. attribute:: deferrable
Return or set the `deferrable status`__ for the current session.
Available values are `!True` (new transactions will be in deferrable
mode), `!False` (new transactions will be in non deferrable mode),
`!None` (use the default configured for the server by
:sql:`default_transaction_deferrable`).
.. __: `SET TRANSACTION`_
.. versionadded:: 2.7
.. method:: set_isolation_level(level) .. method:: set_isolation_level(level)
.. note:: .. note::
From version 2.4.2, `set_session()` and `autocommit` offer This is a legacy method mixing `~conn.isolation_level` and
finer control on the transaction characteristics. `~conn.autocommit`. Using the respective properties is a better
option.
Set the `transaction isolation level`_ for the current session. Set the `transaction isolation level`_ for the current session.
The level defines the different phenomena that can happen in the The level defines the different phenomena that can happen in the
@ -479,28 +525,6 @@ The ``connection`` class
See also :ref:`transactions-control`. See also :ref:`transactions-control`.
.. versionchanged:: 2.7
the function must be called outside a transaction; previously it
would have executed an implicit :sql:`ROLLBACK`; it will now raise
an exception.
.. attribute:: isolation_level
Read the `transaction isolation level`_ for the current session. The
value is one of the :ref:`isolation-level-constants` defined in the
`psycopg2.extensions` module.
.. versionchanged:: 2.7
the default value for `!isolation_level` is
`~psycopg2.extensions.ISOLATION_LEVEL_DEFAULT`; previously the
property would have queried the server and returned the real value
applied. To know this value you can run a query such as :sql:`show
transaction_isolation`. Usually the default value is `READ
COMMITTED`, but this may be changed in the server configuration.
.. index:: .. index::
pair: Client; Encoding pair: Client; Encoding

View File

@ -72,7 +72,7 @@ ISOLATION_LEVEL_READ_UNCOMMITTED = 4
ISOLATION_LEVEL_READ_COMMITTED = 1 ISOLATION_LEVEL_READ_COMMITTED = 1
ISOLATION_LEVEL_REPEATABLE_READ = 2 ISOLATION_LEVEL_REPEATABLE_READ = 2
ISOLATION_LEVEL_SERIALIZABLE = 3 ISOLATION_LEVEL_SERIALIZABLE = 3
ISOLATION_LEVEL_DEFAULT = 5 ISOLATION_LEVEL_DEFAULT = None
"""psycopg connection status values.""" """psycopg connection status values."""

View File

@ -154,7 +154,6 @@ HIDDEN PyObject *conn_encode(connectionObject *self, PyObject *b);
HIDDEN PyObject *conn_decode(connectionObject *self, const char *str, Py_ssize_t len); HIDDEN PyObject *conn_decode(connectionObject *self, const char *str, Py_ssize_t len);
HIDDEN int conn_get_standard_conforming_strings(PGconn *pgconn); HIDDEN int conn_get_standard_conforming_strings(PGconn *pgconn);
HIDDEN PyObject *conn_pgenc_to_pyenc(const char *encoding, char **clean_encoding); HIDDEN PyObject *conn_pgenc_to_pyenc(const char *encoding, char **clean_encoding);
RAISES_NEG HIDDEN int conn_get_isolation_level(connectionObject *self);
HIDDEN int conn_get_protocol_version(PGconn *pgconn); HIDDEN int conn_get_protocol_version(PGconn *pgconn);
HIDDEN int conn_get_server_version(PGconn *pgconn); HIDDEN int conn_get_server_version(PGconn *pgconn);
HIDDEN void conn_notice_process(connectionObject *self); HIDDEN void conn_notice_process(connectionObject *self);

View File

@ -568,19 +568,6 @@ exit:
} }
RAISES_NEG int
conn_get_isolation_level(connectionObject *self)
{
/* this may get called by async connections too: here's your result */
if (self->autocommit) {
return ISOLATION_LEVEL_AUTOCOMMIT;
}
else {
return self->isolevel;
}
}
int int
conn_get_protocol_version(PGconn *pgconn) conn_get_protocol_version(PGconn *pgconn)
{ {
@ -697,6 +684,9 @@ conn_setup(connectionObject *self, PGconn *pgconn)
/* for reset */ /* for reset */
self->autocommit = 0; self->autocommit = 0;
self->isolevel = ISOLATION_LEVEL_DEFAULT;
self->readonly = STATE_DEFAULT;
self->deferrable = STATE_DEFAULT;
/* success */ /* success */
rv = 0; rv = 0;
@ -1199,6 +1189,13 @@ conn_set_session(connectionObject *self, int autocommit,
PGresult *pgres = NULL; PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
if (deferrable != self->deferrable && self->server_version < 90100) {
PyErr_SetString(ProgrammingError,
"the 'deferrable' setting is only available"
" from PostgreSQL 9.1");
goto exit;
}
/* Promote an isolation level to one of the levels supported by the server */ /* Promote an isolation level to one of the levels supported by the server */
if (self->server_version < 80000) { if (self->server_version < 80000) {
if (isolevel == ISOLATION_LEVEL_READ_UNCOMMITTED) { if (isolevel == ISOLATION_LEVEL_READ_UNCOMMITTED) {
@ -1229,7 +1226,7 @@ conn_set_session(connectionObject *self, int autocommit,
goto endlock; goto endlock;
} }
} }
if (deferrable != self->deferrable && self->server_version >= 90100) { if (deferrable != self->deferrable) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_deferrable", srv_state_guc[deferrable], "default_transaction_deferrable", srv_state_guc[deferrable],
&pgres, &error, &_save)) { &pgres, &error, &_save)) {
@ -1240,17 +1237,21 @@ conn_set_session(connectionObject *self, int autocommit,
else if (self->autocommit) { else if (self->autocommit) {
/* we are moving from autocommit to not autocommit, so revert the /* we are moving from autocommit to not autocommit, so revert the
* characteristics to defaults to let BEGIN do its work */ * characteristics to defaults to let BEGIN do its work */
if (0 > pq_set_guc_locked(self, if (self->isolevel != ISOLATION_LEVEL_DEFAULT) {
"default_transaction_isolation", "default", if (0 > pq_set_guc_locked(self,
&pgres, &error, &_save)) { "default_transaction_isolation", "default",
goto endlock; &pgres, &error, &_save)) {
goto endlock;
}
} }
if (0 > pq_set_guc_locked(self, if (self->readonly != STATE_DEFAULT) {
"default_transaction_read_only", "default", if (0 > pq_set_guc_locked(self,
&pgres, &error, &_save)) { "default_transaction_read_only", "default",
goto endlock; &pgres, &error, &_save)) {
goto endlock;
}
} }
if (self->server_version >= 90100) { if (self->deferrable != STATE_DEFAULT) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_deferrable", "default", "default_transaction_deferrable", "default",
&pgres, &error, &_save)) { &pgres, &error, &_save)) {

View File

@ -455,8 +455,14 @@ _psyco_conn_parse_isolevel(PyObject *pyval)
Py_INCREF(pyval); /* for ensure_bytes */ Py_INCREF(pyval); /* for ensure_bytes */
/* None is default. This is only used when setting the property, because
* set_session() has None used as "don't change" */
if (pyval == Py_None) {
rv = ISOLATION_LEVEL_DEFAULT;
}
/* parse from one of the level constants */ /* parse from one of the level constants */
if (PyInt_Check(pyval)) { else if (PyInt_Check(pyval)) {
level = PyInt_AsLong(pyval); level = PyInt_AsLong(pyval);
if (level == -1 && PyErr_Occurred()) { goto exit; } if (level == -1 && PyErr_Occurred()) { goto exit; }
if (level < 1 || level > 4) { if (level < 1 || level > 4) {
@ -469,7 +475,6 @@ _psyco_conn_parse_isolevel(PyObject *pyval)
} }
/* parse from the string -- this includes "default" */ /* parse from the string -- this includes "default" */
else { else {
if (!(pyval = psycopg_ensure_bytes(pyval))) { if (!(pyval = psycopg_ensure_bytes(pyval))) {
goto exit; goto exit;
@ -505,7 +510,10 @@ _psyco_conn_parse_onoff(PyObject *pyval)
Py_INCREF(pyval); /* for ensure_bytes */ Py_INCREF(pyval); /* for ensure_bytes */
if (PyUnicode_CheckExact(pyval) || Bytes_CheckExact(pyval)) { if (pyval == Py_None) {
rv = STATE_DEFAULT;
}
else if (PyUnicode_CheckExact(pyval) || Bytes_CheckExact(pyval)) {
if (!(pyval = psycopg_ensure_bytes(pyval))) { if (!(pyval = psycopg_ensure_bytes(pyval))) {
goto exit; goto exit;
} }
@ -580,13 +588,7 @@ psyco_conn_set_session(connectionObject *self, PyObject *args, PyObject *kwargs)
} }
} }
if (Py_None != deferrable) { if (Py_None != deferrable) {
if (self->server_version < 90100) { if (0 > (c_deferrable = _psyco_conn_parse_onoff(deferrable))) {
PyErr_SetString(ProgrammingError,
"the 'deferrable' setting is only available"
" from PostgreSQL 9.1");
return NULL;
}
if (0 > (c_deferrable = _psyco_conn_parse_onoff(readonly))) {
return NULL; return NULL;
} }
} }
@ -604,6 +606,8 @@ psyco_conn_set_session(connectionObject *self, PyObject *args, PyObject *kwargs)
} }
/* autocommit - return or set the current autocommit status */
#define psyco_conn_autocommit_doc \ #define psyco_conn_autocommit_doc \
"Set or return the autocommit status." "Set or return the autocommit status."
@ -617,11 +621,11 @@ psyco_conn_autocommit_get(connectionObject *self)
} }
BORROWED static PyObject * BORROWED static PyObject *
_psyco_conn_autocommit_set_checks(connectionObject *self) _psyco_set_session_check_setter_wrapper(connectionObject *self)
{ {
/* wrapper to use the EXC_IF macros. /* wrapper to use the EXC_IF macros.
* return NULL in case of error, else whatever */ * return NULL in case of error, else whatever */
_set_session_checks(self, autocommit); _set_session_checks(self, set_session);
return Py_None; /* borrowed */ return Py_None; /* borrowed */
} }
@ -630,7 +634,7 @@ psyco_conn_autocommit_set(connectionObject *self, PyObject *pyvalue)
{ {
int value; int value;
if (!_psyco_conn_autocommit_set_checks(self)) { return -1; } if (!_psyco_set_session_check_setter_wrapper(self)) { return -1; }
if (-1 == (value = PyObject_IsTrue(pyvalue))) { return -1; } if (-1 == (value = PyObject_IsTrue(pyvalue))) { return -1; }
if (0 > conn_set_session(self, value, if (0 > conn_set_session(self, value,
self->isolevel, self->readonly, self->deferrable)) { self->isolevel, self->readonly, self->deferrable)) {
@ -641,19 +645,35 @@ psyco_conn_autocommit_set(connectionObject *self, PyObject *pyvalue)
} }
/* isolation_level - return the current isolation level */ /* isolation_level - return or set the current isolation level */
#define psyco_conn_isolation_level_doc \
"Set or return the connection transaction isolation level."
static PyObject * static PyObject *
psyco_conn_isolation_level_get(connectionObject *self) psyco_conn_isolation_level_get(connectionObject *self)
{ {
int rv; if (self->isolevel == ISOLATION_LEVEL_DEFAULT) {
Py_RETURN_NONE;
} else {
return PyInt_FromLong((long)self->isolevel);
}
}
EXC_IF_CONN_CLOSED(self);
EXC_IF_TPC_PREPARED(self, set_isolation_level);
rv = conn_get_isolation_level(self); static int
if (-1 == rv) { return NULL; } psyco_conn_isolation_level_set(connectionObject *self, PyObject *pyvalue)
return PyInt_FromLong((long)rv); {
int value;
if (!_psyco_set_session_check_setter_wrapper(self)) { return -1; }
if (0 > (value = _psyco_conn_parse_isolevel(pyvalue))) { return -1; }
if (0 > conn_set_session(self, self->autocommit,
value, self->readonly, self->deferrable)) {
return -1;
}
return 0;
} }
@ -666,20 +686,36 @@ static PyObject *
psyco_conn_set_isolation_level(connectionObject *self, PyObject *args) psyco_conn_set_isolation_level(connectionObject *self, PyObject *args)
{ {
int level = 1; int level = 1;
PyObject *pyval = NULL;
_set_session_checks(self, set_isolation_level); EXC_IF_CONN_CLOSED(self);
EXC_IF_CONN_ASYNC(self, "isolation_level");
EXC_IF_TPC_PREPARED(self, "isolation_level");
if (!PyArg_ParseTuple(args, "i", &level)) return NULL; if (!PyArg_ParseTuple(args, "O", &pyval)) return NULL;
if (level < 0 || level > 5) { if (pyval == Py_None) {
PyErr_SetString(PyExc_ValueError, level = ISOLATION_LEVEL_DEFAULT;
"isolation level must be between 0 and 4"); }
/* parse from one of the level constants */
else if (PyInt_Check(pyval)) {
level = PyInt_AsLong(pyval);
if (level < 0 || level > 4) {
PyErr_SetString(PyExc_ValueError,
"isolation level must be between 0 and 4");
return NULL;
}
}
if (0 > conn_rollback(self)) {
return NULL; return NULL;
} }
if (level == 0) { if (level == 0) {
if (0 > conn_set_session(self, 1, if (0 > conn_set_session(self, 1,
ISOLATION_LEVEL_DEFAULT, self->readonly, self->deferrable)) { self->isolevel, self->readonly, self->deferrable)) {
return NULL; return NULL;
} }
} }
@ -693,6 +729,99 @@ psyco_conn_set_isolation_level(connectionObject *self, PyObject *args)
Py_RETURN_NONE; Py_RETURN_NONE;
} }
/* readonly - return or set the current read-only status */
#define psyco_conn_readonly_doc \
"Set or return the connection read-only status."
static PyObject *
psyco_conn_readonly_get(connectionObject *self)
{
PyObject *rv = NULL;
switch (self->readonly) {
case STATE_OFF:
rv = Py_False;
break;
case STATE_ON:
rv = Py_True;
break;
case STATE_DEFAULT:
rv = Py_None;
break;
default:
PyErr_Format(InternalError,
"bad internal value for readonly: %d", self->readonly);
break;
}
return rv;
}
static int
psyco_conn_readonly_set(connectionObject *self, PyObject *pyvalue)
{
int value;
if (!_psyco_set_session_check_setter_wrapper(self)) { return -1; }
if (0 > (value = _psyco_conn_parse_onoff(pyvalue))) { return -1; }
if (0 > conn_set_session(self, self->autocommit,
self->isolevel, value, self->deferrable)) {
return -1;
}
return 0;
}
/* deferrable - return or set the current deferrable status */
#define psyco_conn_deferrable_doc \
"Set or return the connection deferrable status."
static PyObject *
psyco_conn_deferrable_get(connectionObject *self)
{
PyObject *rv = NULL;
switch (self->deferrable) {
case STATE_OFF:
rv = Py_False;
break;
case STATE_ON:
rv = Py_True;
break;
case STATE_DEFAULT:
rv = Py_None;
break;
default:
PyErr_Format(InternalError,
"bad internal value for deferrable: %d", self->deferrable);
break;
}
return rv;
}
static int
psyco_conn_deferrable_set(connectionObject *self, PyObject *pyvalue)
{
int value;
if (!_psyco_set_session_check_setter_wrapper(self)) { return -1; }
if (0 > (value = _psyco_conn_parse_onoff(pyvalue))) { return -1; }
if (0 > conn_set_session(self, self->autocommit,
self->isolevel, self->readonly, value)) {
return -1;
}
return 0;
}
/* set_client_encoding method - set client encoding */ /* set_client_encoding method - set client encoding */
#define psyco_conn_set_client_encoding_doc \ #define psyco_conn_set_client_encoding_doc \
@ -1103,8 +1232,16 @@ static struct PyGetSetDef connectionObject_getsets[] = {
psyco_conn_autocommit_doc }, psyco_conn_autocommit_doc },
{ "isolation_level", { "isolation_level",
(getter)psyco_conn_isolation_level_get, (getter)psyco_conn_isolation_level_get,
(setter)NULL, (setter)psyco_conn_isolation_level_set,
"The current isolation level." }, psyco_conn_isolation_level_doc },
{ "readonly",
(getter)psyco_conn_readonly_get,
(setter)psyco_conn_readonly_set,
psyco_conn_readonly_doc },
{ "deferrable",
(getter)psyco_conn_deferrable_get,
(setter)psyco_conn_deferrable_set,
psyco_conn_deferrable_doc },
{NULL} {NULL}
}; };
#undef EXCEPTION_GETTER #undef EXCEPTION_GETTER

View File

@ -26,7 +26,7 @@
from testutils import unittest, skip_before_postgres, slow from testutils import unittest, skip_before_postgres, slow
import psycopg2 import psycopg2
from psycopg2 import extensions from psycopg2 import extensions as ext
import time import time
import StringIO import StringIO
@ -74,13 +74,14 @@ class AsyncTests(ConnectingTestCase):
self.assert_(self.conn.async_) self.assert_(self.conn.async_)
self.assert_(not self.sync_conn.async_) self.assert_(not self.sync_conn.async_)
# the async connection should be in isolevel 0 # the async connection should be autocommit
self.assertEquals(self.conn.isolation_level, 0) self.assert_(self.conn.autocommit)
self.assertEquals(self.conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
# check other properties to be found on the connection # check other properties to be found on the connection
self.assert_(self.conn.server_version) self.assert_(self.conn.server_version)
self.assert_(self.conn.protocol_version in (2, 3)) self.assert_(self.conn.protocol_version in (2, 3))
self.assert_(self.conn.encoding in psycopg2.extensions.encodings) self.assert_(self.conn.encoding in ext.encodings)
def test_async_named_cursor(self): def test_async_named_cursor(self):
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,
@ -192,7 +193,7 @@ class AsyncTests(ConnectingTestCase):
# getting transaction status works # getting transaction status works
self.assertEquals(self.conn.get_transaction_status(), self.assertEquals(self.conn.get_transaction_status(),
extensions.TRANSACTION_STATUS_ACTIVE) ext.TRANSACTION_STATUS_ACTIVE)
self.assertTrue(self.conn.isexecuting()) self.assertTrue(self.conn.isexecuting())
# setting connection encoding should fail # setting connection encoding should fail
@ -311,9 +312,9 @@ class AsyncTests(ConnectingTestCase):
self.assertEquals(cur.fetchone()[0], "b" * 10000) self.assertEquals(cur.fetchone()[0], "b" * 10000)
def test_async_subclass(self): def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection): class MyConn(ext.connection):
def __init__(self, dsn, async_=0): def __init__(self, dsn, async_=0):
psycopg2.extensions.connection.__init__(self, dsn, async_=async_) ext.connection.__init__(self, dsn, async_=async_)
conn = self.connect(connection_factory=MyConn, async_=True) conn = self.connect(connection_factory=MyConn, async_=True)
self.assert_(isinstance(conn, MyConn)) self.assert_(isinstance(conn, MyConn))
@ -330,7 +331,7 @@ class AsyncTests(ConnectingTestCase):
curs.execute("select %s;", ('x' * size,)) curs.execute("select %s;", ('x' * size,))
self.wait(stub) self.wait(stub)
self.assertEqual(size, len(curs.fetchone()[0])) self.assertEqual(size, len(curs.fetchone()[0]))
if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1: if stub.polls.count(ext.POLL_WRITE) > 1:
return return
# This is more a testing glitch than an error: it happens # This is more a testing glitch than an error: it happens

View File

@ -59,8 +59,8 @@ class AsyncTests(ConnectingTestCase):
self.assert_(self.conn.async) self.assert_(self.conn.async)
self.assert_(not self.sync_conn.async) self.assert_(not self.sync_conn.async)
# the async connection should be in isolevel 0 # the async connection should be autocommit
self.assertEquals(self.conn.isolation_level, 0) self.assert_(self.conn.autocommit)
# check other properties to be found on the connection # check other properties to be found on the connection
self.assert_(self.conn.server_version) self.assert_(self.conn.server_version)

View File

@ -89,13 +89,26 @@ class ConnectionTests(ConnectingTestCase):
def test_reset(self): def test_reset(self):
conn = self.conn conn = self.conn
# switch isolation level, then reset # switch session characteristics
level = conn.isolation_level conn.autocommit = True
conn.set_isolation_level(0) conn.isolation_level = 'serializable'
self.assertEqual(conn.isolation_level, 0) conn.readonly = True
if self.conn.server_version >= 90100:
conn.deferrable = False
self.assert_(conn.autocommit)
self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assert_(conn.readonly is True)
if self.conn.server_version >= 90100:
self.assert_(conn.deferrable is False)
conn.reset() conn.reset()
# now the isolation level should be equal to saved one # now the session characteristics should be reverted
self.assertEqual(conn.isolation_level, level) self.assert_(not conn.autocommit)
self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
self.assert_(conn.readonly is None)
if self.conn.server_version >= 90100:
self.assert_(conn.deferrable is None)
def test_notices(self): def test_notices(self):
conn = self.conn conn = self.conn
@ -222,7 +235,7 @@ class ConnectionTests(ConnectingTestCase):
self.conn.set_client_encoding("EUC_JP") self.conn.set_client_encoding("EUC_JP")
# conn.encoding is 'EUCJP' now. # conn.encoding is 'EUCJP' now.
cur = self.conn.cursor() cur = self.conn.cursor()
psycopg2.extensions.register_type(psycopg2.extensions.UNICODE, cur) ext.register_type(ext.UNICODE, cur)
cur.execute("select 'foo'::text;") cur.execute("select 'foo'::text;")
self.assertEqual(cur.fetchone()[0], u'foo') self.assertEqual(cur.fetchone()[0], u'foo')
@ -308,14 +321,14 @@ class ConnectionTests(ConnectingTestCase):
# issue #210 # issue #210
conn = self.connect() conn = self.connect()
cur = conn.cursor(cursor_factory=None) cur = conn.cursor(cursor_factory=None)
self.assertEqual(type(cur), psycopg2.extensions.cursor) self.assertEqual(type(cur), ext.cursor)
conn = self.connect(cursor_factory=psycopg2.extras.DictCursor) conn = self.connect(cursor_factory=psycopg2.extras.DictCursor)
cur = conn.cursor(cursor_factory=None) cur = conn.cursor(cursor_factory=None)
self.assertEqual(type(cur), psycopg2.extras.DictCursor) self.assertEqual(type(cur), psycopg2.extras.DictCursor)
def test_failed_init_status(self): def test_failed_init_status(self):
class SubConnection(psycopg2.extensions.connection): class SubConnection(ext.connection):
def __init__(self, dsn): def __init__(self, dsn):
try: try:
super(SubConnection, self).__init__(dsn) super(SubConnection, self).__init__(dsn)
@ -488,23 +501,22 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn = self.connect() conn = self.connect()
self.assertEqual( self.assertEqual(
conn.isolation_level, conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) ext.ISOLATION_LEVEL_DEFAULT)
def test_encoding(self): def test_encoding(self):
conn = self.connect() conn = self.connect()
self.assert_(conn.encoding in psycopg2.extensions.encodings) self.assert_(conn.encoding in ext.encodings)
def test_set_isolation_level(self): def test_set_isolation_level(self):
conn = self.connect() conn = self.connect()
curs = conn.cursor() curs = conn.cursor()
levels = [ levels = [
(None, psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT),
('read uncommitted', ('read uncommitted',
psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED), ext.ISOLATION_LEVEL_READ_UNCOMMITTED),
('read committed', psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED), ('read committed', ext.ISOLATION_LEVEL_READ_COMMITTED),
('repeatable read', psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ), ('repeatable read', ext.ISOLATION_LEVEL_REPEATABLE_READ),
('serializable', psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE), ('serializable', ext.ISOLATION_LEVEL_SERIALIZABLE),
] ]
for name, level in levels: for name, level in levels:
conn.set_isolation_level(level) conn.set_isolation_level(level)
@ -512,8 +524,8 @@ class IsolationLevelsTestCase(ConnectingTestCase):
# the only values available on prehistoric PG versions # the only values available on prehistoric PG versions
if conn.server_version < 80000: if conn.server_version < 80000:
if level in ( if level in (
psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED, ext.ISOLATION_LEVEL_READ_UNCOMMITTED,
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ): ext.ISOLATION_LEVEL_REPEATABLE_READ):
name, level = levels[levels.index((name, level)) + 1] name, level = levels[levels.index((name, level)) + 1]
self.assertEqual(conn.isolation_level, level) self.assertEqual(conn.isolation_level, level)
@ -521,15 +533,26 @@ class IsolationLevelsTestCase(ConnectingTestCase):
curs.execute('show transaction_isolation;') curs.execute('show transaction_isolation;')
got_name = curs.fetchone()[0] got_name = curs.fetchone()[0]
if name is None:
curs.execute('show transaction_isolation;')
name = curs.fetchone()[0]
self.assertEqual(name, got_name) self.assertEqual(name, got_name)
conn.commit() conn.commit()
self.assertRaises(ValueError, conn.set_isolation_level, -1) self.assertRaises(ValueError, conn.set_isolation_level, -1)
self.assertRaises(ValueError, conn.set_isolation_level, 6) self.assertRaises(ValueError, conn.set_isolation_level, 5)
def test_set_isolation_level_autocommit(self):
conn = self.connect()
curs = conn.cursor()
conn.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_DEFAULT)
self.assert_(conn.autocommit)
conn.isolation_level = 'serializable'
self.assertEqual(conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assert_(conn.autocommit)
curs.execute('show transaction_isolation;')
self.assertEqual(curs.fetchone()[0], 'serializable')
def test_set_isolation_level_default(self): def test_set_isolation_level_default(self):
conn = self.connect() conn = self.connect()
@ -539,14 +562,14 @@ class IsolationLevelsTestCase(ConnectingTestCase):
curs.execute("set default_transaction_isolation to 'read committed'") curs.execute("set default_transaction_isolation to 'read committed'")
conn.autocommit = False conn.autocommit = False
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) conn.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(conn.isolation_level, self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
curs.execute("show transaction_isolation") curs.execute("show transaction_isolation")
self.assertEqual(curs.fetchone()[0], "serializable") self.assertEqual(curs.fetchone()[0], "serializable")
conn.rollback() conn.rollback()
conn.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) conn.set_isolation_level(ext.ISOLATION_LEVEL_DEFAULT)
curs.execute("show transaction_isolation") curs.execute("show transaction_isolation")
self.assertEqual(curs.fetchone()[0], "read committed") self.assertEqual(curs.fetchone()[0], "read committed")
@ -554,25 +577,45 @@ class IsolationLevelsTestCase(ConnectingTestCase):
conn = self.connect() conn = self.connect()
cur = conn.cursor() cur = conn.cursor()
self.assertEqual(ext.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("insert into isolevel values (10);")
self.assertEqual(ext.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE, self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status()) conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(0, cur.fetchone()[0])
cur.execute("insert into isolevel values (10);") cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS, self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status()) conn.get_transaction_status())
conn.set_isolation_level(
# changed in psycopg 2.7 psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.assertRaises(psycopg2.ProgrammingError, self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.set_isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status()) conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(0, cur.fetchone()[0])
cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(1, cur.fetchone()[0])
self.assertEqual(conn.isolation_level, self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_DEFAULT) psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
def test_isolation_level_autocommit(self): def test_isolation_level_autocommit(self):
cnn1 = self.connect() cnn1 = self.connect()
cnn2 = self.connect() cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT) cnn2.set_isolation_level(ext.ISOLATION_LEVEL_AUTOCOMMIT)
cur1 = cnn1.cursor() cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;") cur1.execute("select count(*) from isolevel;")
@ -588,7 +631,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_read_committed(self): def test_isolation_level_read_committed(self):
cnn1 = self.connect() cnn1 = self.connect()
cnn2 = self.connect() cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) cnn2.set_isolation_level(ext.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor() cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;") cur1.execute("select count(*) from isolevel;")
@ -614,7 +657,7 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_serializable(self): def test_isolation_level_serializable(self):
cnn1 = self.connect() cnn1 = self.connect()
cnn2 = self.connect() cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) cnn2.set_isolation_level(ext.ISOLATION_LEVEL_SERIALIZABLE)
cur1 = cnn1.cursor() cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;") cur1.execute("select count(*) from isolevel;")
@ -643,13 +686,112 @@ class IsolationLevelsTestCase(ConnectingTestCase):
def test_isolation_level_closed(self): def test_isolation_level_closed(self):
cnn = self.connect() cnn = self.connect()
cnn.close() cnn.close()
self.assertRaises(psycopg2.InterfaceError, getattr,
cnn, 'isolation_level')
self.assertRaises(psycopg2.InterfaceError, self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 0) cnn.set_isolation_level, 0)
self.assertRaises(psycopg2.InterfaceError, self.assertRaises(psycopg2.InterfaceError,
cnn.set_isolation_level, 1) cnn.set_isolation_level, 1)
def test_setattr_isolation_level_int(self):
cur = self.conn.cursor()
self.conn.isolation_level = ext.ISOLATION_LEVEL_SERIALIZABLE
self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.isolation_level = ext.ISOLATION_LEVEL_REPEATABLE_READ
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_REPEATABLE_READ)
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_COMMITTED
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.isolation_level = ext.ISOLATION_LEVEL_READ_UNCOMMITTED
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.assertEqual(ext.ISOLATION_LEVEL_DEFAULT, None)
self.conn.isolation_level = ext.ISOLATION_LEVEL_DEFAULT
self.assertEqual(self.conn.isolation_level, None)
cur.execute("SHOW transaction_isolation;")
isol = cur.fetchone()[0]
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], isol)
def test_setattr_isolation_level_str(self):
cur = self.conn.cursor()
self.conn.isolation_level = "serializable"
self.assertEqual(self.conn.isolation_level, ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.isolation_level = "repeatable read"
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_REPEATABLE_READ)
self.assertEqual(cur.fetchone()[0], 'repeatable read')
else:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback()
self.conn.isolation_level = "read committed"
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.isolation_level = "read uncommitted"
cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
self.assertEqual(cur.fetchone()[0], 'read uncommitted')
else:
self.assertEqual(self.conn.isolation_level,
ext.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback()
self.conn.isolation_level = "default"
self.assertEqual(self.conn.isolation_level, None)
cur.execute("SHOW transaction_isolation;")
isol = cur.fetchone()[0]
cur.execute("SHOW default_transaction_isolation;")
self.assertEqual(cur.fetchone()[0], isol)
def test_setattr_isolation_level_invalid(self):
self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 0)
self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', -1)
self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 5)
self.assertRaises(ValueError, setattr, self.conn, 'isolation_level', 'bah')
class ConnectionTwoPhaseTests(ConnectingTestCase): class ConnectionTwoPhaseTests(ConnectingTestCase):
def setUp(self): def setUp(self):
@ -716,10 +858,10 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
def test_tpc_commit(self): def test_tpc_commit(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit');") cur.execute("insert into test_tpc values ('test_tpc_commit');")
@ -727,22 +869,22 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_prepare() cnn.tpc_prepare()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED) self.assertEqual(cnn.status, ext.STATUS_PREPARED)
self.assertEqual(1, self.count_xacts()) self.assertEqual(1, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_commit() cnn.tpc_commit()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records()) self.assertEqual(1, self.count_test_records())
def test_tpc_commit_one_phase(self): def test_tpc_commit_one_phase(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_1p');") cur.execute("insert into test_tpc values ('test_tpc_commit_1p');")
@ -750,17 +892,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_commit() cnn.tpc_commit()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records()) self.assertEqual(1, self.count_test_records())
def test_tpc_commit_recovered(self): def test_tpc_commit_recovered(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_rec');") cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
@ -776,17 +918,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
cnn.tpc_commit(xid) cnn.tpc_commit(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records()) self.assertEqual(1, self.count_test_records())
def test_tpc_rollback(self): def test_tpc_rollback(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_rollback');") cur.execute("insert into test_tpc values ('test_tpc_rollback');")
@ -794,22 +936,22 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_prepare() cnn.tpc_prepare()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_PREPARED) self.assertEqual(cnn.status, ext.STATUS_PREPARED)
self.assertEqual(1, self.count_xacts()) self.assertEqual(1, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_rollback() cnn.tpc_rollback()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_one_phase(self): def test_tpc_rollback_one_phase(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');") cur.execute("insert into test_tpc values ('test_tpc_rollback_1p');")
@ -817,17 +959,17 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
cnn.tpc_rollback() cnn.tpc_rollback()
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
def test_tpc_rollback_recovered(self): def test_tpc_rollback_recovered(self):
cnn = self.connect() cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
cnn.tpc_begin(xid) cnn.tpc_begin(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(cnn.status, ext.STATUS_BEGIN)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("insert into test_tpc values ('test_tpc_commit_rec');") cur.execute("insert into test_tpc values ('test_tpc_commit_rec');")
@ -843,21 +985,21 @@ class ConnectionTwoPhaseTests(ConnectingTestCase):
xid = cnn.xid(1, "gtrid", "bqual") xid = cnn.xid(1, "gtrid", "bqual")
cnn.tpc_rollback(xid) cnn.tpc_rollback(xid)
self.assertEqual(cnn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(cnn.status, ext.STATUS_READY)
self.assertEqual(0, self.count_xacts()) self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records()) self.assertEqual(0, self.count_test_records())
def test_status_after_recover(self): def test_status_after_recover(self):
cnn = self.connect() cnn = self.connect()
self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) self.assertEqual(ext.STATUS_READY, cnn.status)
cnn.tpc_recover() cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_READY, cnn.status) self.assertEqual(ext.STATUS_READY, cnn.status)
cur = cnn.cursor() cur = cnn.cursor()
cur.execute("select 1") cur.execute("select 1")
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) self.assertEqual(ext.STATUS_BEGIN, cnn.status)
cnn.tpc_recover() cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status) self.assertEqual(ext.STATUS_BEGIN, cnn.status)
def test_recovered_xids(self): def test_recovered_xids(self):
# insert a few test xns # insert a few test xns
@ -1030,25 +1172,25 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.close() self.conn.close()
self.assertRaises(psycopg2.InterfaceError, self.assertRaises(psycopg2.InterfaceError,
self.conn.set_session, self.conn.set_session,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_not_in_transaction(self): def test_not_in_transaction(self):
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute("select 1") cur.execute("select 1")
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_session, self.conn.set_session,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
def test_set_isolation_level(self): def test_set_isolation_level(self):
cur = self.conn.cursor() cur = self.conn.cursor()
self.conn.set_session( self.conn.set_session(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE) ext.ISOLATION_LEVEL_SERIALIZABLE)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable') self.assertEqual(cur.fetchone()[0], 'serializable')
self.conn.rollback() self.conn.rollback()
self.conn.set_session( self.conn.set_session(
psycopg2.extensions.ISOLATION_LEVEL_REPEATABLE_READ) ext.ISOLATION_LEVEL_REPEATABLE_READ)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000: if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'repeatable read') self.assertEqual(cur.fetchone()[0], 'repeatable read')
@ -1057,13 +1199,13 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.rollback() self.conn.rollback()
self.conn.set_session( self.conn.set_session(
isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED) isolation_level=ext.ISOLATION_LEVEL_READ_COMMITTED)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'read committed') self.assertEqual(cur.fetchone()[0], 'read committed')
self.conn.rollback() self.conn.rollback()
self.conn.set_session( self.conn.set_session(
isolation_level=psycopg2.extensions.ISOLATION_LEVEL_READ_UNCOMMITTED) isolation_level=ext.ISOLATION_LEVEL_READ_UNCOMMITTED)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
if self.conn.server_version > 80000: if self.conn.server_version > 80000:
self.assertEqual(cur.fetchone()[0], 'read uncommitted') self.assertEqual(cur.fetchone()[0], 'read uncommitted')
@ -1105,22 +1247,47 @@ class TransactionControlTests(ConnectingTestCase):
self.assertRaises(ValueError, self.conn.set_session, 'whatever') self.assertRaises(ValueError, self.conn.set_session, 'whatever')
def test_set_read_only(self): def test_set_read_only(self):
cur = self.conn.cursor() self.assert_(self.conn.readonly is None)
self.conn.set_session(readonly=True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
cur = self.conn.cursor() cur = self.conn.cursor()
self.conn.set_session(readonly=None) self.conn.set_session(readonly=True)
self.assert_(self.conn.readonly is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
cur.execute("SHOW transaction_read_only;") cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on') self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback() self.conn.rollback()
self.conn.set_session(readonly=False) self.conn.set_session(readonly=False)
self.assert_(self.conn.readonly is False)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
def test_setattr_read_only(self):
cur = self.conn.cursor()
self.conn.readonly = True
self.assert_(self.conn.readonly is True)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.assertRaises(self.conn.ProgrammingError,
setattr, self.conn, 'readonly', False)
self.assert_(self.conn.readonly is True)
self.conn.rollback()
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
cur = self.conn.cursor()
self.conn.readonly = None
self.assert_(self.conn.readonly is None)
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server
self.conn.rollback()
self.conn.readonly = False
self.assert_(self.conn.readonly is False)
cur.execute("SHOW transaction_read_only;") cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off') self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback() self.conn.rollback()
@ -1143,8 +1310,10 @@ class TransactionControlTests(ConnectingTestCase):
@skip_before_postgres(9, 1) @skip_before_postgres(9, 1)
def test_set_deferrable(self): def test_set_deferrable(self):
self.assert_(self.conn.deferrable is None)
cur = self.conn.cursor() cur = self.conn.cursor()
self.conn.set_session(readonly=True, deferrable=True) self.conn.set_session(readonly=True, deferrable=True)
self.assert_(self.conn.deferrable is True)
cur.execute("SHOW transaction_read_only;") cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on') self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW transaction_deferrable;") cur.execute("SHOW transaction_deferrable;")
@ -1155,6 +1324,7 @@ class TransactionControlTests(ConnectingTestCase):
self.conn.rollback() self.conn.rollback()
self.conn.set_session(deferrable=False) self.conn.set_session(deferrable=False)
self.assert_(self.conn.deferrable is False)
cur.execute("SHOW transaction_read_only;") cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on') self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW transaction_deferrable;") cur.execute("SHOW transaction_deferrable;")
@ -1165,6 +1335,54 @@ class TransactionControlTests(ConnectingTestCase):
def test_set_deferrable_error(self): def test_set_deferrable_error(self):
self.assertRaises(psycopg2.ProgrammingError, self.assertRaises(psycopg2.ProgrammingError,
self.conn.set_session, readonly=True, deferrable=True) self.conn.set_session, readonly=True, deferrable=True)
self.assertRaises(psycopg2.ProgrammingError,
setattr, self.conn, 'deferrable', True)
@skip_before_postgres(9, 1)
def test_setattr_deferrable(self):
cur = self.conn.cursor()
self.conn.deferrable = True
self.assert_(self.conn.deferrable is True)
cur.execute("SHOW transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'on')
self.assertRaises(self.conn.ProgrammingError,
setattr, self.conn, 'deferrable', False)
self.assert_(self.conn.deferrable is True)
self.conn.rollback()
cur.execute("SHOW transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.rollback()
cur = self.conn.cursor()
self.conn.deferrable = None
self.assert_(self.conn.deferrable is None)
cur.execute("SHOW transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'off') # assume defined by server
self.conn.rollback()
self.conn.deferrable = False
self.assert_(self.conn.deferrable is False)
cur.execute("SHOW transaction_deferrable;")
self.assertEqual(cur.fetchone()[0], 'off')
self.conn.rollback()
def test_mixing_session_attribs(self):
cur = self.conn.cursor()
self.conn.autocommit = True
self.conn.readonly = True
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
self.conn.autocommit = False
cur.execute("SHOW transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'on')
cur.execute("SHOW default_transaction_read_only;")
self.assertEqual(cur.fetchone()[0], 'off')
class AutocommitTests(ConnectingTestCase): class AutocommitTests(ConnectingTestCase):
@ -1183,44 +1401,44 @@ class AutocommitTests(ConnectingTestCase):
def test_default_no_autocommit(self): def test_default_no_autocommit(self):
self.assert_(not self.conn.autocommit) self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS) ext.TRANSACTION_STATUS_INTRANS)
self.conn.rollback() self.conn.rollback()
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
def test_set_autocommit(self): def test_set_autocommit(self):
self.conn.autocommit = True self.conn.autocommit = True
self.assert_(self.conn.autocommit) self.assert_(self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
self.conn.autocommit = False self.conn.autocommit = False
self.assert_(not self.conn.autocommit) self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS) ext.TRANSACTION_STATUS_INTRANS)
def test_set_intrans_error(self): def test_set_intrans_error(self):
cur = self.conn.cursor() cur = self.conn.cursor()
@ -1231,34 +1449,34 @@ class AutocommitTests(ConnectingTestCase):
def test_set_session_autocommit(self): def test_set_session_autocommit(self):
self.conn.set_session(autocommit=True) self.conn.set_session(autocommit=True)
self.assert_(self.conn.autocommit) self.assert_(self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur = self.conn.cursor() cur = self.conn.cursor()
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
self.conn.set_session(autocommit=False) self.conn.set_session(autocommit=False)
self.assert_(not self.conn.autocommit) self.assert_(not self.conn.autocommit)
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_BEGIN) self.assertEqual(self.conn.status, ext.STATUS_BEGIN)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_INTRANS) ext.TRANSACTION_STATUS_INTRANS)
self.conn.rollback() self.conn.rollback()
self.conn.set_session('serializable', readonly=True, autocommit=True) self.conn.set_session('serializable', readonly=True, autocommit=True)
self.assert_(self.conn.autocommit) self.assert_(self.conn.autocommit)
cur.execute('select 1;') cur.execute('select 1;')
self.assertEqual(self.conn.status, psycopg2.extensions.STATUS_READY) self.assertEqual(self.conn.status, ext.STATUS_READY)
self.assertEqual(self.conn.get_transaction_status(), self.assertEqual(self.conn.get_transaction_status(),
psycopg2.extensions.TRANSACTION_STATUS_IDLE) ext.TRANSACTION_STATUS_IDLE)
cur.execute("SHOW transaction_isolation;") cur.execute("SHOW transaction_isolation;")
self.assertEqual(cur.fetchone()[0], 'serializable') self.assertEqual(cur.fetchone()[0], 'serializable')
cur.execute("SHOW transaction_read_only;") cur.execute("SHOW transaction_read_only;")