diff --git a/doc/src/connection.rst b/doc/src/connection.rst index 22241fed..153fea5d 100644 --- a/doc/src/connection.rst +++ b/doc/src/connection.rst @@ -264,6 +264,26 @@ The ``connection`` class (0) or closed (1). + .. method:: cancel + + Cancel the current database operation. + + The method interrupts the processing of the current operation. If no + query is being executed, it does nothing. You can call this function + from a different thread than the one currently executing a database + operation, for instance if you want to cancel a long running query if a + button is pushed in the UI. Interrupting query execution will cause the + cancelled method to raise a + `~psycopg2.extensions.QueryCanceledError`. Note that the termination + of the query is not guaranteed to succeed: see the documentation for + |PQcancel|_. + + .. |PQcancel| replace:: `!PQcancel()` + .. _PQcancel: http://www.postgresql.org/docs/8.4/static/libpq-cancel.html#AEN34765 + + .. versionadded:: 2.2.3 + + .. method:: reset Reset the connection to the default. diff --git a/psycopg/connection.h b/psycopg/connection.h index 96536876..76a6a093 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -101,6 +101,7 @@ typedef struct { int server_version; /* server version */ PGconn *pgconn; /* the postgresql connection */ + PGcancel *cancel; /* the cancellation structure */ PyObject *async_cursor; /* a cursor executing an asynchronous query */ int async_status; /* asynchronous execution status */ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index f98e4fdc..73292b88 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -282,6 +282,12 @@ conn_get_server_version(PGconn *pgconn) return (int)PQserverVersion(pgconn); } +PGcancel * +conn_get_cancel(PGconn *pgconn) +{ + return PQgetCancel(pgconn); +} + /* Return 1 if the server datestyle allows us to work without problems, 0 if it needs to be set to something better, e.g. ISO. */ @@ -320,6 +326,12 @@ conn_setup(connectionObject *self, PGconn *pgconn) return -1; } + self->cancel = conn_get_cancel(self->pgconn); + if (self->cancel == NULL) { + PyErr_SetString(OperationalError, "can't get cancellation key"); + return -1; + } + Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&self->lock); Py_BLOCK_THREADS; @@ -645,6 +657,11 @@ _conn_poll_setup_async(connectionObject *self) if (self->encoding == NULL) { break; } + self->cancel = conn_get_cancel(self->pgconn); + if (self->cancel == NULL) { + PyErr_SetString(OperationalError, "can't get cancellation key"); + break; + } /* asynchronous connections always use isolation level 0, the user is * expected to manage the transactions himself, by sending @@ -782,8 +799,10 @@ conn_close(connectionObject *self) if (self->pgconn) { PQfinish(self->pgconn); + PQfreeCancel(self->cancel); Dprintf("conn_close: PQfinish called"); self->pgconn = NULL; + self->cancel = NULL; } pthread_mutex_unlock(&self->lock); diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index 19945ce4..e27b3120 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -697,6 +697,37 @@ psyco_conn_isexecuting(connectionObject *self) return Py_False; } + +/* extension: cancel - cancel the current operation */ + +#define psyco_conn_cancel_doc \ +"cancel() -- cancel the current operation" + +static PyObject * +psyco_conn_cancel(connectionObject *self) +{ + char errbuf[256]; + + EXC_IF_CONN_CLOSED(self); + + /* do not allow cancellation while the connection is being built */ + Dprintf("psyco_conn_cancel: cancelling with key %p", self->cancel); + if (self->status != CONN_STATUS_READY && + self->status != CONN_STATUS_BEGIN) { + PyErr_SetString(OperationalError, + "asynchronous connection attempt underway"); + return NULL; + } + + if (PQcancel(self->cancel, errbuf, sizeof(errbuf)) == 0) { + Dprintf("psyco_conn_cancel: cancelling failed: %s", errbuf); + PyErr_SetString(OperationalError, errbuf); + return NULL; + } + Py_INCREF(Py_None); + return Py_None; +} + #endif /* PSYCOPG_EXTENSIONS */ @@ -747,6 +778,8 @@ static struct PyMethodDef connectionObject_methods[] = { METH_NOARGS, psyco_conn_fileno_doc}, {"isexecuting", (PyCFunction)psyco_conn_isexecuting, METH_NOARGS, psyco_conn_isexecuting_doc}, + {"cancel", (PyCFunction)psyco_conn_cancel, + METH_NOARGS, psyco_conn_cancel_doc}, #endif {NULL} }; @@ -827,6 +860,7 @@ connection_setup(connectionObject *self, const char *dsn, long int async) self->async_cursor = NULL; self->async_status = ASYNC_DONE; self->pgconn = NULL; + self->cancel = NULL; self->mark = 0; self->string_types = PyDict_New(); self->binary_types = PyDict_New(); diff --git a/tests/__init__.py b/tests/__init__.py index 23193413..4ee01ff1 100755 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -46,6 +46,7 @@ import test_copy import test_notify import test_async import test_green +import test_cancel def test_suite(): suite = unittest.TestSuite() @@ -64,6 +65,7 @@ def test_suite(): suite.addTest(test_notify.test_suite()) suite.addTest(test_async.test_suite()) suite.addTest(test_green.test_suite()) + suite.addTest(test_cancel.test_suite()) return suite if __name__ == '__main__': diff --git a/tests/test_cancel.py b/tests/test_cancel.py new file mode 100644 index 00000000..746b211c --- /dev/null +++ b/tests/test_cancel.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python + +import time +import threading +import unittest + +import tests +import psycopg2 +import psycopg2.extensions +from psycopg2 import extras + + +class CancelTests(unittest.TestCase): + + def setUp(self): + self.conn = psycopg2.connect(tests.dsn) + cur = self.conn.cursor() + cur.execute(''' + CREATE TEMPORARY TABLE table1 ( + id int PRIMARY KEY + )''') + self.conn.commit() + + def tearDown(self): + self.conn.close() + + def test_empty_cancel(self): + self.conn.cancel() + + def test_cancel(self): + errors = [] + + def neverending(conn): + cur = conn.cursor() + try: + self.assertRaises(psycopg2.extensions.QueryCanceledError, + cur.execute, "select pg_sleep(10000)") + # make sure the connection still works + conn.rollback() + cur.execute("select 1") + self.assertEqual(cur.fetchall(), [(1, )]) + except Exception, e: + errors.append(e) + raise + + def canceller(conn): + cur = conn.cursor() + try: + conn.cancel() + except Exception, e: + errors.append(e) + raise + + thread1 = threading.Thread(target=neverending, args=(self.conn, )) + # wait a bit to make sure that the other thread is already in + # pg_sleep -- ugly and racy, but the chances are ridiculously low + thread2 = threading.Timer(0.3, canceller, args=(self.conn, )) + thread1.start() + thread2.start() + thread1.join() + thread2.join() + + self.assertEqual(errors, []) + + def test_async_cancel(self): + async_conn = psycopg2.connect(tests.dsn, async=True) + self.assertRaises(psycopg2.OperationalError, async_conn.cancel) + extras.wait_select(async_conn) + cur = async_conn.cursor() + cur.execute("select pg_sleep(10000)") + self.assertTrue(async_conn.isexecuting()) + async_conn.cancel() + self.assertRaises(psycopg2.extensions.QueryCanceledError, + extras.wait_select, async_conn) + cur.execute("select 1") + extras.wait_select(async_conn) + self.assertEqual(cur.fetchall(), [(1, )]) + + def test_async_connection_cancel(self): + async_conn = psycopg2.connect(tests.dsn, async=True) + async_conn.close() + self.assertTrue(async_conn.closed) + + +def test_suite(): + return unittest.TestLoader().loadTestsFromName(__name__) + +if __name__ == "__main__": + unittest.main()