diff --git a/psycopg/connection.h b/psycopg/connection.h index 94d756dd..6b74ca7b 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -132,6 +132,7 @@ HIDDEN int conn_switch_isolation_level(connectionObject *self, int level); HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc); HIDDEN PyObject *conn_poll_send(connectionObject *self); HIDDEN PyObject *conn_poll_fetch(connectionObject *self); +HIDDEN PyObject *conn_poll_ready(connectionObject *self); /* exception-raising macros */ #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index a41d7541..81eae8ac 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -564,6 +564,44 @@ conn_poll_fetch(connectionObject *self) return PyInt_FromLong(ret); } +/* conn_poll_ready - handle connection polling when it is already open */ + +PyObject * +conn_poll_ready(connectionObject *self) +{ + int is_busy; + + /* if there is an asynchronous query underway, poll it */ + if (self->async_cursor != NULL) { + if (self->async_status == ASYNC_WRITE) { + return curs_poll_send((cursorObject *) self->async_cursor); + } + else { + /* this gets called both for ASYNC_READ and ASYNC_DONE, because + even if the async query is complete, we still might want to + check for NOTIFYs */ + return curs_poll_fetch((cursorObject *) self->async_cursor); + } + } + + /* otherwise just check for NOTIFYs */ + is_busy = pq_is_busy(self); + if (is_busy == -1) { + /* there was an error, raise the exception */ + return NULL; + } + else if (is_busy == 1) { + /* the connection is busy, tell the user to wait more */ + Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ); + return PyInt_FromLong(PSYCO_POLL_READ); + } + else { + /* connection is idle */ + Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK); + return PyInt_FromLong(PSYCO_POLL_OK); + } +} + /* conn_close - do anything needed to shut down the connection */ void diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index f7fe190f..432078fb 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -449,8 +449,11 @@ psyco_conn_poll(connectionObject *self) break; case CONN_STATUS_READY: - /* we have completed the connection setup */ - return PyInt_FromLong(PSYCO_POLL_OK); + case CONN_STATUS_BEGIN: + /* The connection is ready, but we might be in an asynchronous query, + or we just might want to check for NOTIFYs. For synchronous + connections the status might be BEGIN, not READY. */ + return conn_poll_ready(self); default: /* everything else is an error */ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index b916477d..2d4751e4 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -1441,59 +1441,6 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs) return res; } -/* extension: fileno - return the file descripor of the connection */ - -#define psyco_curs_fileno_doc \ -"fileno() -> int -- Return file descriptor associated to database connection." - -static PyObject * -psyco_curs_fileno(cursorObject *self) -{ - long int socket; - - EXC_IF_CURS_CLOSED(self); - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(self->conn->lock)); - socket = (long int)PQsocket(self->conn->pgconn); - pthread_mutex_unlock(&(self->conn->lock)); - Py_END_ALLOW_THREADS; - - return PyInt_FromLong(socket); -} - -/* extension: poll - return true if data from async execute is ready */ - -#define psyco_curs_poll_doc \ -"poll() -- return POLL_OK if the query has been fully processed, " \ - "POLL_READ if the query has been sent and the application should be " \ - "waiting for the result to arrive or POLL_WRITE is the query is still " \ - "being sent." - -static PyObject * -psyco_curs_poll(cursorObject *self) -{ - EXC_IF_CURS_CLOSED(self); - - if (self->conn->async_cursor != NULL && - self->conn->async_cursor != (PyObject *) self) { - PyErr_SetString(ProgrammingError, "poll with wrong cursor"); - return NULL; - } - - Dprintf("curs_poll: polling with status %d", self->conn->async_status); - - if (self->conn->async_status == ASYNC_WRITE) { - return curs_poll_send(self); - } - else { - /* this gets called both for ASYNC_READ and ASYNC_DONE, because even - if the async query is complete, we still might want to check for - NOTIFYs */ - return curs_poll_fetch(self); - } -} - /* extension: closed - return true if cursor is closed*/ #define psyco_curs_closed_doc \ @@ -1572,10 +1519,6 @@ static struct PyMethodDef cursorObject_methods[] = { #ifdef PSYCOPG_EXTENSIONS {"mogrify", (PyCFunction)psyco_curs_mogrify, METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc}, - {"poll", (PyCFunction)psyco_curs_poll, - METH_VARARGS, psyco_curs_poll_doc}, - {"fileno", (PyCFunction)psyco_curs_fileno, - METH_NOARGS, psyco_curs_fileno_doc}, {"copy_from", (PyCFunction)psyco_curs_copy_from, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc}, {"copy_to", (PyCFunction)psyco_curs_copy_to, diff --git a/tests/test_async.py b/tests/test_async.py index 91a53921..e1b50cbf 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -4,6 +4,7 @@ import unittest import psycopg2 from psycopg2 import extensions +import time import select import StringIO @@ -48,7 +49,10 @@ class AsyncTests(unittest.TestCase): self.sync_conn.close() self.conn.close() - def wait(self, pollable): + def wait(self, cur_or_conn): + pollable = cur_or_conn + if not hasattr(pollable, 'poll'): + pollable = cur_or_conn.connection while True: state = pollable.poll() if state == psycopg2.extensions.POLL_OK: @@ -301,7 +305,7 @@ class AsyncTests(unittest.TestCase): curs = self.conn.cursor() for mb in 1, 5, 10, 20, 50: size = mb * 1024 * 1024 - stub = PollableStub(curs) + stub = PollableStub(self.conn) curs.execute("select %s;", ('x' * size,)) self.wait(stub) self.assertEqual(size, len(curs.fetchone()[0])) @@ -312,19 +316,33 @@ class AsyncTests(unittest.TestCase): def test_sync_poll(self): cur = self.sync_conn.cursor() - # polling a sync cursor works - cur.poll() + cur.execute("select 1") + # polling with a sync query works + cur.connection.poll() + self.assertEquals(cur.fetchone()[0], 1) - def test_async_poll_wrong_cursor(self): - cur1 = self.conn.cursor() - cur2 = self.conn.cursor() - cur1.execute("select 1") + def test_notify(self): + cur = self.conn.cursor() + sync_cur = self.sync_conn.cursor() - # polling a cursor that's not currently executing is an error - self.assertRaises(psycopg2.ProgrammingError, cur2.poll) + sync_cur.execute("listen test_notify") + self.sync_conn.commit() + cur.execute("notify test_notify") + self.wait(cur) - self.wait(cur1) - self.assertEquals(cur1.fetchone()[0], 1) + self.assertEquals(self.sync_conn.notifies, []) + + pid = self.conn.get_backend_pid() + for _ in range(5): + self.wait(self.sync_conn) + if not self.sync_conn.notifies: + time.sleep(0.5) + continue + self.assertEquals(len(self.sync_conn.notifies), 1) + self.assertEquals(self.sync_conn.notifies.pop(), + (pid, "test_notify")) + return + self.fail("No NOTIFY in 2.5 seconds") def test_async_fetch_wrong_cursor(self): cur1 = self.conn.cursor()