Support query cancellation.

Add a cancel() method do the connection object that will interrupt
the current query using the libpq PQcancel() function.
This commit is contained in:
Jan Urbański 2010-07-24 23:01:27 +02:00 committed by Daniele Varrazzo
parent 9f78141532
commit 751bfa1ea6
6 changed files with 165 additions and 0 deletions

View File

@ -264,6 +264,26 @@ The ``connection`` class
(0) or closed (1).
.. method:: cancel
Cancel the current database operation.
The method interrupts the processing of the current operation. If no
query is being executed, it does nothing. You can call this function
from a different thread than the one currently executing a database
operation, for instance if you want to cancel a long running query if a
button is pushed in the UI. Interrupting query execution will cause the
cancelled method to raise a
`~psycopg2.extensions.QueryCanceledError`. Note that the termination
of the query is not guaranteed to succeed: see the documentation for
|PQcancel|_.
.. |PQcancel| replace:: `!PQcancel()`
.. _PQcancel: http://www.postgresql.org/docs/8.4/static/libpq-cancel.html#AEN34765
.. versionadded:: 2.2.3
.. method:: reset
Reset the connection to the default.

View File

@ -101,6 +101,7 @@ typedef struct {
int server_version; /* server version */
PGconn *pgconn; /* the postgresql connection */
PGcancel *cancel; /* the cancellation structure */
PyObject *async_cursor; /* a cursor executing an asynchronous query */
int async_status; /* asynchronous execution status */

View File

@ -282,6 +282,12 @@ conn_get_server_version(PGconn *pgconn)
return (int)PQserverVersion(pgconn);
}
PGcancel *
conn_get_cancel(PGconn *pgconn)
{
return PQgetCancel(pgconn);
}
/* Return 1 if the server datestyle allows us to work without problems,
0 if it needs to be set to something better, e.g. ISO. */
@ -320,6 +326,12 @@ conn_setup(connectionObject *self, PGconn *pgconn)
return -1;
}
self->cancel = conn_get_cancel(self->pgconn);
if (self->cancel == NULL) {
PyErr_SetString(OperationalError, "can't get cancellation key");
return -1;
}
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS;
@ -645,6 +657,11 @@ _conn_poll_setup_async(connectionObject *self)
if (self->encoding == NULL) {
break;
}
self->cancel = conn_get_cancel(self->pgconn);
if (self->cancel == NULL) {
PyErr_SetString(OperationalError, "can't get cancellation key");
break;
}
/* asynchronous connections always use isolation level 0, the user is
* expected to manage the transactions himself, by sending
@ -782,8 +799,10 @@ conn_close(connectionObject *self)
if (self->pgconn) {
PQfinish(self->pgconn);
PQfreeCancel(self->cancel);
Dprintf("conn_close: PQfinish called");
self->pgconn = NULL;
self->cancel = NULL;
}
pthread_mutex_unlock(&self->lock);

View File

@ -697,6 +697,37 @@ psyco_conn_isexecuting(connectionObject *self)
return Py_False;
}
/* extension: cancel - cancel the current operation */
#define psyco_conn_cancel_doc \
"cancel() -- cancel the current operation"
static PyObject *
psyco_conn_cancel(connectionObject *self)
{
char errbuf[256];
EXC_IF_CONN_CLOSED(self);
/* do not allow cancellation while the connection is being built */
Dprintf("psyco_conn_cancel: cancelling with key %p", self->cancel);
if (self->status != CONN_STATUS_READY &&
self->status != CONN_STATUS_BEGIN) {
PyErr_SetString(OperationalError,
"asynchronous connection attempt underway");
return NULL;
}
if (PQcancel(self->cancel, errbuf, sizeof(errbuf)) == 0) {
Dprintf("psyco_conn_cancel: cancelling failed: %s", errbuf);
PyErr_SetString(OperationalError, errbuf);
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}
#endif /* PSYCOPG_EXTENSIONS */
@ -747,6 +778,8 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_NOARGS, psyco_conn_fileno_doc},
{"isexecuting", (PyCFunction)psyco_conn_isexecuting,
METH_NOARGS, psyco_conn_isexecuting_doc},
{"cancel", (PyCFunction)psyco_conn_cancel,
METH_NOARGS, psyco_conn_cancel_doc},
#endif
{NULL}
};
@ -827,6 +860,7 @@ connection_setup(connectionObject *self, const char *dsn, long int async)
self->async_cursor = NULL;
self->async_status = ASYNC_DONE;
self->pgconn = NULL;
self->cancel = NULL;
self->mark = 0;
self->string_types = PyDict_New();
self->binary_types = PyDict_New();

View File

@ -46,6 +46,7 @@ import test_copy
import test_notify
import test_async
import test_green
import test_cancel
def test_suite():
suite = unittest.TestSuite()
@ -64,6 +65,7 @@ def test_suite():
suite.addTest(test_notify.test_suite())
suite.addTest(test_async.test_suite())
suite.addTest(test_green.test_suite())
suite.addTest(test_cancel.test_suite())
return suite
if __name__ == '__main__':

89
tests/test_cancel.py Normal file
View File

@ -0,0 +1,89 @@
#!/usr/bin/env python
import time
import threading
import unittest
import tests
import psycopg2
import psycopg2.extensions
from psycopg2 import extras
class CancelTests(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
cur = self.conn.cursor()
cur.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.conn.commit()
def tearDown(self):
self.conn.close()
def test_empty_cancel(self):
self.conn.cancel()
def test_cancel(self):
errors = []
def neverending(conn):
cur = conn.cursor()
try:
self.assertRaises(psycopg2.extensions.QueryCanceledError,
cur.execute, "select pg_sleep(10000)")
# make sure the connection still works
conn.rollback()
cur.execute("select 1")
self.assertEqual(cur.fetchall(), [(1, )])
except Exception, e:
errors.append(e)
raise
def canceller(conn):
cur = conn.cursor()
try:
conn.cancel()
except Exception, e:
errors.append(e)
raise
thread1 = threading.Thread(target=neverending, args=(self.conn, ))
# wait a bit to make sure that the other thread is already in
# pg_sleep -- ugly and racy, but the chances are ridiculously low
thread2 = threading.Timer(0.3, canceller, args=(self.conn, ))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
self.assertEqual(errors, [])
def test_async_cancel(self):
async_conn = psycopg2.connect(tests.dsn, async=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
cur.execute("select pg_sleep(10000)")
self.assertTrue(async_conn.isexecuting())
async_conn.cancel()
self.assertRaises(psycopg2.extensions.QueryCanceledError,
extras.wait_select, async_conn)
cur.execute("select 1")
extras.wait_select(async_conn)
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
async_conn = psycopg2.connect(tests.dsn, async=True)
async_conn.close()
self.assertTrue(async_conn.closed)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()