From f64cbeda4606d8af3ca063736f313f81506c56bb Mon Sep 17 00:00:00 2001 From: James Henstridge Date: Wed, 16 Jan 2008 05:14:24 +0000 Subject: [PATCH] * tests/test_transaction.py (DeadlockSerializationTestCase): port over some tests for serialisation and deadlock errors, demonstrating that TransactionRollbackError is generated. (QueryCancelationTests): add a test to show that QueryCanceledError is raised on statement timeouts. * psycopg2da/adapter.py (_handle_psycopg_exception): rather than checking exception messages, check for TransactionRollbackError. * psycopg/pqpath.c (exception_from_sqlstate): return TransactionRollbackError for 40xxx errors, and QueryCanceledError for 57014 errors. (pq_raise): If we are using an old server, use TransactionRollbackError if the error message contains "could not serialize" or "deadlock detected". * psycopg/psycopgmodule.c (_psyco_connect_fill_exc): remove function, since we no longer need to store pointers to the exceptions in the connection. This also fixes a reference leak. (psyco_connect): remove _psyco_connect_fill_exc() function call. * psycopg/connection.h (connectionObject): remove exception members from struct. * psycopg/connection_type.c (connectionObject_getsets): modify the exception attributes on the connection object from members to getsets. This reduces the size of the struct. * lib/extensions.py: import the two new extensions. * psycopg/psycopgmodule.c (exctable): add new QueryCanceledError and TransactionRollbackError exceptions. --- ChangeLog | 35 +++++++++ lib/extensions.py | 2 + psycopg/connection.h | 12 --- psycopg/connection_type.c | 55 +++++++------- psycopg/pqpath.c | 16 +++- psycopg/psycopg.h | 11 +++ psycopg/psycopgmodule.c | 47 +++++------- psycopg2da/adapter.py | 9 +-- tests/test_transaction.py | 153 +++++++++++++++++++++++++++++++++++++- 9 files changed, 263 insertions(+), 77 deletions(-) diff --git a/ChangeLog b/ChangeLog index bdbed8ab..c6e63f93 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,38 @@ +2008-01-16 James Henstridge + + * tests/test_transaction.py (DeadlockSerializationTestCase): port + over some tests for serialisation and deadlock errors, + demonstrating that TransactionRollbackError is generated. + (QueryCancelationTests): add a test to show that + QueryCanceledError is raised on statement timeouts. + + * psycopg2da/adapter.py (_handle_psycopg_exception): rather than + checking exception messages, check for TransactionRollbackError. + + * psycopg/pqpath.c (exception_from_sqlstate): return + TransactionRollbackError for 40xxx errors, and QueryCanceledError + for 57014 errors. + (pq_raise): If we are using an old server, use + TransactionRollbackError if the error message contains "could not + serialize" or "deadlock detected". + + * psycopg/psycopgmodule.c (_psyco_connect_fill_exc): remove + function, since we no longer need to store pointers to the + exceptions in the connection. This also fixes a reference leak. + (psyco_connect): remove _psyco_connect_fill_exc() function call. + + * psycopg/connection.h (connectionObject): remove exception + members from struct. + + * psycopg/connection_type.c (connectionObject_getsets): modify the + exception attributes on the connection object from members to + getsets. This reduces the size of the struct. + + * lib/extensions.py: import the two new extensions. + + * psycopg/psycopgmodule.c (exctable): add new QueryCanceledError + and TransactionRollbackError exceptions. + 2008-01-16 James Henstridge * tests/__init__.py (test_suite): add date tests to test suite. diff --git a/lib/extensions.py b/lib/extensions.py index fe613039..a2bc869e 100644 --- a/lib/extensions.py +++ b/lib/extensions.py @@ -42,6 +42,8 @@ from _psycopg import adapt, adapters, encodings, connection, cursor from _psycopg import string_types, binary_types, new_type, register_type from _psycopg import ISQLQuote +from _psycopg import QueryCanceledError, TransactionRollbackError + """Isolation level values.""" ISOLATION_LEVEL_AUTOCOMMIT = 0 ISOLATION_LEVEL_READ_COMMITTED = 1 diff --git a/psycopg/connection.h b/psycopg/connection.h index bc62381d..068c7943 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -67,18 +67,6 @@ typedef struct { /* notifies */ PyObject *notifies; - /* errors (DBAPI-2.0 extension) */ - PyObject *exc_Error; - PyObject *exc_Warning; - PyObject *exc_InterfaceError; - PyObject *exc_DatabaseError; - PyObject *exc_InternalError; - PyObject *exc_OperationalError; - PyObject *exc_ProgrammingError; - PyObject *exc_IntegrityError; - PyObject *exc_DataError; - PyObject *exc_NotSupportedError; - /* per-connection typecasters */ PyObject *string_types; /* a set of typecasters for string types */ PyObject *binary_types; /* a set of typecasters for binary types */ diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 2d186618..92a06863 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -231,6 +231,14 @@ psyco_conn_get_transaction_status(connectionObject *self, PyObject *args) #endif +static PyObject * +psyco_conn_get_exception(PyObject *self, void *closure) +{ + PyObject *exception = *(PyObject **)closure; + + Py_INCREF(exception); + return exception; +} /** the connection object **/ @@ -260,32 +268,6 @@ static struct PyMethodDef connectionObject_methods[] = { /* object member list */ static struct PyMemberDef connectionObject_members[] = { - /* DBAPI-2.0 extensions (exception objects) */ - {"Error", T_OBJECT, - offsetof(connectionObject, exc_Error), RO, Error_doc}, - {"Warning", - T_OBJECT, offsetof(connectionObject, exc_Warning), RO, Warning_doc}, - {"InterfaceError", T_OBJECT, - offsetof(connectionObject, exc_InterfaceError), RO, - InterfaceError_doc}, - {"DatabaseError", T_OBJECT, - offsetof(connectionObject, exc_DatabaseError), RO, DatabaseError_doc}, - {"InternalError", T_OBJECT, - offsetof(connectionObject, exc_InternalError), RO, InternalError_doc}, - {"OperationalError", T_OBJECT, - offsetof(connectionObject, exc_OperationalError), RO, - OperationalError_doc}, - {"ProgrammingError", T_OBJECT, - offsetof(connectionObject, exc_ProgrammingError), RO, - ProgrammingError_doc}, - {"IntegrityError", T_OBJECT, - offsetof(connectionObject, exc_IntegrityError), RO, - IntegrityError_doc}, - {"DataError", T_OBJECT, - offsetof(connectionObject, exc_DataError), RO, DataError_doc}, - {"NotSupportedError", T_OBJECT, - offsetof(connectionObject, exc_NotSupportedError), RO, - NotSupportedError_doc}, #ifdef PSYCOPG_EXTENSIONS {"closed", T_LONG, offsetof(connectionObject, closed), RO, "True if the connection is closed."}, @@ -309,6 +291,25 @@ static struct PyMemberDef connectionObject_members[] = { {NULL} }; +#define EXCEPTION_GETTER(exc) \ + { #exc, psyco_conn_get_exception, NULL, exc ## _doc, &exc } + +static struct PyGetSetDef connectionObject_getsets[] = { + /* DBAPI-2.0 extensions (exception objects) */ + EXCEPTION_GETTER(Error), + EXCEPTION_GETTER(Warning), + EXCEPTION_GETTER(InterfaceError), + EXCEPTION_GETTER(DatabaseError), + EXCEPTION_GETTER(InternalError), + EXCEPTION_GETTER(OperationalError), + EXCEPTION_GETTER(ProgrammingError), + EXCEPTION_GETTER(IntegrityError), + EXCEPTION_GETTER(DataError), + EXCEPTION_GETTER(NotSupportedError), + {NULL} +}; +#undef EXCEPTION_GETTER + /* initialization and finalization methods */ static int @@ -465,7 +466,7 @@ PyTypeObject connectionType = { connectionObject_methods, /*tp_methods*/ connectionObject_members, /*tp_members*/ - 0, /*tp_getset*/ + connectionObject_getsets, /*tp_getset*/ 0, /*tp_base*/ 0, /*tp_dict*/ diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index a7f7a7c9..02f7be59 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -107,9 +107,13 @@ exception_from_sqlstate(const char *sqlstate) case '4': switch (sqlstate[1]) { case '0': /* Class 40 - Transaction Rollback */ +#ifdef PSYCOPG_EXTENSIONS + return TransactionRollbackError; +#else return OperationalError; +#endif case '2': /* Class 42 - Syntax Error or Access Rule Violation */ - case '4': /* Class 44 — WITH CHECK OPTION Violation */ + case '4': /* Class 44 - WITH CHECK OPTION Violation */ return ProgrammingError; } break; @@ -119,7 +123,12 @@ exception_from_sqlstate(const char *sqlstate) Class 55 - Object Not In Prerequisite State Class 57 - Operator Intervention Class 58 - System Error (errors external to PostgreSQL itself) */ - return OperationalError; +#ifdef PSYCOPG_EXTENSIONS + if (!strcmp(sqlstate, "57014")) + return QueryCanceledError; + else +#endif + return OperationalError; case 'F': /* Class F0 - Configuration File Error */ return InternalError; case 'P': /* Class P0 - PL/pgSQL Error */ @@ -188,6 +197,9 @@ pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres, || !strncmp(err, "ERROR: ExecAppend: Fail to add null", 36) || strstr(err, "referential integrity violation")) exc = IntegrityError; + else if (strstr(err, "could not serialize") || + strstr(err, "deadlock detected")) + exc = TransactionRollbackError; else exc = ProgrammingError; } diff --git a/psycopg/psycopg.h b/psycopg/psycopg.h index f97fee14..e5eb70b9 100644 --- a/psycopg/psycopg.h +++ b/psycopg/psycopg.h @@ -85,6 +85,9 @@ extern psyco_errors_set_RETURN psyco_errors_set psyco_errors_set_PROTO; extern PyObject *Error, *Warning, *InterfaceError, *DatabaseError, *InternalError, *OperationalError, *ProgrammingError, *IntegrityError, *DataError, *NotSupportedError; +#ifdef PSYCOPG_EXTENSIONS +extern PyObject *QueryCanceledError, *TransactionRollbackError; +#endif /* python versions and compatibility stuff */ #ifndef PyMODINIT_FUNC @@ -167,6 +170,14 @@ extern void psyco_set_error(PyObject *exc, PyObject *curs, const char *msg, #define NotSupportedError_doc \ "A not supported datbase API was called." +#ifdef PSYCOPG_EXTENSIONS +#define QueryCanceledError_doc \ +"Error related to SQL query cancelation." + +#define TransactionRollbackError_doc \ +"Error causing transaction rollback (deadlocks, serialisation failures, etc)." +#endif + #ifdef __cplusplus } #endif diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c index 26cc500b..a212ba4b 100644 --- a/psycopg/psycopgmodule.c +++ b/psycopg/psycopgmodule.c @@ -100,32 +100,6 @@ _psyco_connect_fill_dsn(char *dsn, char *kw, char *v, size_t i) return i; } -static void -_psyco_connect_fill_exc(connectionObject *conn) -{ - /* fill the connection object with the exceptions */ - conn->exc_Error = Error; - Py_INCREF(Error); - conn->exc_Warning = Warning; - Py_INCREF(Warning); - conn->exc_InterfaceError = InterfaceError; - Py_INCREF(InterfaceError); - conn->exc_DatabaseError = DatabaseError; - Py_INCREF(DatabaseError); - conn->exc_InternalError = InternalError; - Py_INCREF(InternalError); - conn->exc_ProgrammingError = ProgrammingError; - Py_INCREF(ProgrammingError); - conn->exc_IntegrityError = IntegrityError; - Py_INCREF(IntegrityError); - conn->exc_DataError = DataError; - Py_INCREF(DataError); - conn->exc_NotSupportedError = NotSupportedError; - Py_INCREF(NotSupportedError); - conn->exc_OperationalError = OperationalError; - Py_INCREF(OperationalError); -} - static PyObject * psyco_connect(PyObject *self, PyObject *args, PyObject *keywds) { @@ -215,7 +189,6 @@ psyco_connect(PyObject *self, PyObject *args, PyObject *keywds) /* allocate connection, fill with errors and return it */ if (factory == NULL) factory = (PyObject *)&connectionType; conn = PyObject_CallFunction(factory, "s", dsn); - if (conn) _psyco_connect_fill_exc((connectionObject*)conn); } goto cleanup; @@ -433,6 +406,9 @@ static void psyco_encodings_fill(PyObject *dict) PyObject *Error, *Warning, *InterfaceError, *DatabaseError, *InternalError, *OperationalError, *ProgrammingError, *IntegrityError, *DataError, *NotSupportedError; +#ifdef PSYCOPG_EXTENSIONS +PyObject *QueryCanceledError, *TransactionRollbackError; +#endif /* mapping between exception names and their PyObject */ static struct { @@ -455,6 +431,13 @@ static struct { { "psycopg2.DataError", &DataError, &DatabaseError, DataError_doc }, { "psycopg2.NotSupportedError", &NotSupportedError, &DatabaseError, NotSupportedError_doc }, +#ifdef PSYCOPG_EXTENSIONS + { "psycopg2.extensions.QueryCanceledError", &QueryCanceledError, + &OperationalError, OperationalError_doc }, + { "psycopg2.extensions.TransactionRollbackError", + &TransactionRollbackError, &OperationalError, + TransactionRollbackError_doc }, +#endif {NULL} /* Sentinel */ }; @@ -507,6 +490,11 @@ psyco_errors_fill(PyObject *dict) PyDict_SetItemString(dict, "IntegrityError", IntegrityError); PyDict_SetItemString(dict, "DataError", DataError); PyDict_SetItemString(dict, "NotSupportedError", NotSupportedError); +#ifdef PSYCOPG_EXTENSIONS + PyDict_SetItemString(dict, "QueryCanceledError", QueryCanceledError); + PyDict_SetItemString(dict, "TransactionRollbackError", + TransactionRollbackError); +#endif } void @@ -522,6 +510,11 @@ psyco_errors_set(PyObject *type) PyObject_SetAttrString(type, "IntegrityError", IntegrityError); PyObject_SetAttrString(type, "DataError", DataError); PyObject_SetAttrString(type, "NotSupportedError", NotSupportedError); +#ifdef PSYCOPG_EXTENSIONS + PyObject_SetAttrString(type, "QueryCanceledError", QueryCanceledError); + PyObject_SetAttrString(type, "TransactionRollbackError", + TransactionRollbackError); +#endif } /* psyco_error_new diff --git a/psycopg2da/adapter.py b/psycopg2da/adapter.py index dc796008..6c912060 100644 --- a/psycopg2da/adapter.py +++ b/psycopg2da/adapter.py @@ -372,14 +372,7 @@ def _handle_psycopg_exception(error): If we have a serialization exception or a deadlock, we should retry the transaction by raising a Retry exception. Otherwise, we reraise. """ - if not error.args: - raise - msg = error.args[0] - # These messages are from PostgreSQL 8.0. They may change between - # PostgreSQL releases - if so, the different messages should be added - # rather than the existing ones changed so this logic works with - # different versions. - if 'could not serialize' in msg or 'deadlock detected' in msg: + if isinstance(error, psycopg2.extensions.TransactionRollbackError): raise Retry(sys.exc_info()) raise diff --git a/tests/test_transaction.py b/tests/test_transaction.py index 81fe54bf..bd96ce49 100755 --- a/tests/test_transaction.py +++ b/tests/test_transaction.py @@ -1,6 +1,9 @@ #!/usr/bin/env python -import psycopg2 +import threading import unittest + +import psycopg2 +import psycopg2 import tests from psycopg2.extensions import ( @@ -69,6 +72,154 @@ class TransactionTestCase(unittest.TestCase): self.assertEqual(curs.fetchone()[0], 1) +class DeadlockSerializationTestCase(unittest.TestCase): + """Test deadlock and serialization failure errors.""" + + def connect(self): + conn = psycopg2.connect("dbname=%s" % tests.dbname) + conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) + return conn + + def setUp(self): + self.conn = self.connect() + curs = self.conn.cursor() + # Drop table if it already exists + try: + curs.execute("DROP TABLE table1") + self.conn.commit() + except psycopg2.DatabaseError: + self.conn.rollback() + try: + curs.execute("DROP TABLE table2") + self.conn.commit() + except psycopg2.DatabaseError: + self.conn.rollback() + # Create sample data + curs.execute(""" + CREATE TABLE table1 ( + id int PRIMARY KEY, + name text) + """) + curs.execute("INSERT INTO table1 VALUES (1, 'hello')") + curs.execute("CREATE TABLE table2 (id int PRIMARY KEY)") + self.conn.commit() + + def tearDown(self): + curs = self.conn.cursor() + curs.execute("DROP TABLE table1") + curs.execute("DROP TABLE table2") + self.conn.commit() + self.conn.close() + + def test_deadlock(self): + self.thread1_error = self.thread2_error = None + step1 = threading.Event() + step2 = threading.Event() + + def task1(): + try: + conn = self.connect() + curs = conn.cursor() + curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") + step1.set() + step2.wait() + curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") + except psycopg2.DatabaseError, exc: + self.thread1_error = exc + step1.set() + conn.close() + def task2(): + try: + conn = self.connect() + curs = conn.cursor() + step1.wait() + curs.execute("LOCK table2 IN ACCESS EXCLUSIVE MODE") + step2.set() + curs.execute("LOCK table1 IN ACCESS EXCLUSIVE MODE") + except psycopg2.DatabaseError, exc: + self.thread2_error = exc + step2.set() + conn.close() + + # Run the threads in parallel. The "step1" and "step2" events + # ensure that the two transactions overlap. + thread1 = threading.Thread(target=task1) + thread2 = threading.Thread(target=task2) + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + # Exactly one of the threads should have failed with + # TransactionRollbackError: + self.assertFalse(self.thread1_error and self.thread2_error) + error = self.thread1_error or self.thread2_error + self.assertTrue(isinstance( + error, psycopg2.extensions.TransactionRollbackError)) + + def test_serialisation_failure(self): + self.thread1_error = self.thread2_error = None + step1 = threading.Event() + step2 = threading.Event() + + def task1(): + try: + conn = self.connect() + curs = conn.cursor() + curs.execute("SELECT name FROM table1 WHERE id = 1") + curs.fetchall() + step1.set() + step2.wait() + curs.execute("UPDATE table1 SET name='task1' WHERE id = 1") + conn.commit() + except psycopg2.DatabaseError, exc: + self.thread1_error = exc + step1.set() + conn.close() + def task2(): + try: + conn = self.connect() + curs = conn.cursor() + step1.wait() + curs.execute("UPDATE table1 SET name='task2' WHERE id = 1") + conn.commit() + except psycopg2.DatabaseError, exc: + self.thread2_error = exc + step2.set() + conn.close() + + # Run the threads in parallel. The "step1" and "step2" events + # ensure that the two transactions overlap. + thread1 = threading.Thread(target=task1) + thread2 = threading.Thread(target=task2) + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + # Exactly one of the threads should have failed with + # TransactionRollbackError: + self.assertFalse(self.thread1_error and self.thread2_error) + error = self.thread1_error or self.thread2_error + self.assertTrue(isinstance( + error, psycopg2.extensions.TransactionRollbackError)) + + +class QueryCancelationTests(unittest.TestCase): + """Tests for query cancelation.""" + + def setUp(self): + self.conn = psycopg2.connect("dbname=%s" % tests.dbname) + self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE) + + def test_statement_timeout(self): + curs = self.conn.cursor() + # Set a low statement timeout, then sleep for a longer period. + curs.execute('SET statement_timeout TO 10') + self.assertRaises(psycopg2.extensions.QueryCanceledError, + curs.execute, 'SELECT pg_sleep(50)') + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)