From 067161d5f317e43aba4de40e871127021bedf407 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jan=20Urba=C5=84ski?= Date: Sun, 18 Apr 2010 15:51:58 +0200 Subject: [PATCH] Remove fileno() and poll() from cursors Instead, the code should be using the fileno() and poll() methods of the cursor's connection. Handle the case when poll() is called on an already built connection as a request to poll the asynchronous query (if there is one) and get NOTIFY events. Update the tests to reflect that change, add a test for NOTIFY. --- psycopg/connection.h | 1 + psycopg/connection_int.c | 38 ++++++++++++++++++++++++++ psycopg/connection_type.c | 7 +++-- psycopg/cursor_type.c | 57 --------------------------------------- tests/test_async.py | 42 ++++++++++++++++++++--------- 5 files changed, 74 insertions(+), 71 deletions(-) diff --git a/psycopg/connection.h b/psycopg/connection.h index 94d756dd..6b74ca7b 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -132,6 +132,7 @@ HIDDEN int conn_switch_isolation_level(connectionObject *self, int level); HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc); HIDDEN PyObject *conn_poll_send(connectionObject *self); HIDDEN PyObject *conn_poll_fetch(connectionObject *self); +HIDDEN PyObject *conn_poll_ready(connectionObject *self); /* exception-raising macros */ #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index a41d7541..81eae8ac 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -564,6 +564,44 @@ conn_poll_fetch(connectionObject *self) return PyInt_FromLong(ret); } +/* conn_poll_ready - handle connection polling when it is already open */ + +PyObject * +conn_poll_ready(connectionObject *self) +{ + int is_busy; + + /* if there is an asynchronous query underway, poll it */ + if (self->async_cursor != NULL) { + if (self->async_status == ASYNC_WRITE) { + return curs_poll_send((cursorObject *) self->async_cursor); + } + else { + /* this gets called both for ASYNC_READ and ASYNC_DONE, because + even if the async query is complete, we still might want to + check for NOTIFYs */ + return curs_poll_fetch((cursorObject *) self->async_cursor); + } + } + + /* otherwise just check for NOTIFYs */ + is_busy = pq_is_busy(self); + if (is_busy == -1) { + /* there was an error, raise the exception */ + return NULL; + } + else if (is_busy == 1) { + /* the connection is busy, tell the user to wait more */ + Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ); + return PyInt_FromLong(PSYCO_POLL_READ); + } + else { + /* connection is idle */ + Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK); + return PyInt_FromLong(PSYCO_POLL_OK); + } +} + /* conn_close - do anything needed to shut down the connection */ void diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index f7fe190f..432078fb 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -449,8 +449,11 @@ psyco_conn_poll(connectionObject *self) break; case CONN_STATUS_READY: - /* we have completed the connection setup */ - return PyInt_FromLong(PSYCO_POLL_OK); + case CONN_STATUS_BEGIN: + /* The connection is ready, but we might be in an asynchronous query, + or we just might want to check for NOTIFYs. For synchronous + connections the status might be BEGIN, not READY. */ + return conn_poll_ready(self); default: /* everything else is an error */ diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index b916477d..2d4751e4 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -1441,59 +1441,6 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs) return res; } -/* extension: fileno - return the file descripor of the connection */ - -#define psyco_curs_fileno_doc \ -"fileno() -> int -- Return file descriptor associated to database connection." - -static PyObject * -psyco_curs_fileno(cursorObject *self) -{ - long int socket; - - EXC_IF_CURS_CLOSED(self); - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(self->conn->lock)); - socket = (long int)PQsocket(self->conn->pgconn); - pthread_mutex_unlock(&(self->conn->lock)); - Py_END_ALLOW_THREADS; - - return PyInt_FromLong(socket); -} - -/* extension: poll - return true if data from async execute is ready */ - -#define psyco_curs_poll_doc \ -"poll() -- return POLL_OK if the query has been fully processed, " \ - "POLL_READ if the query has been sent and the application should be " \ - "waiting for the result to arrive or POLL_WRITE is the query is still " \ - "being sent." - -static PyObject * -psyco_curs_poll(cursorObject *self) -{ - EXC_IF_CURS_CLOSED(self); - - if (self->conn->async_cursor != NULL && - self->conn->async_cursor != (PyObject *) self) { - PyErr_SetString(ProgrammingError, "poll with wrong cursor"); - return NULL; - } - - Dprintf("curs_poll: polling with status %d", self->conn->async_status); - - if (self->conn->async_status == ASYNC_WRITE) { - return curs_poll_send(self); - } - else { - /* this gets called both for ASYNC_READ and ASYNC_DONE, because even - if the async query is complete, we still might want to check for - NOTIFYs */ - return curs_poll_fetch(self); - } -} - /* extension: closed - return true if cursor is closed*/ #define psyco_curs_closed_doc \ @@ -1572,10 +1519,6 @@ static struct PyMethodDef cursorObject_methods[] = { #ifdef PSYCOPG_EXTENSIONS {"mogrify", (PyCFunction)psyco_curs_mogrify, METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc}, - {"poll", (PyCFunction)psyco_curs_poll, - METH_VARARGS, psyco_curs_poll_doc}, - {"fileno", (PyCFunction)psyco_curs_fileno, - METH_NOARGS, psyco_curs_fileno_doc}, {"copy_from", (PyCFunction)psyco_curs_copy_from, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc}, {"copy_to", (PyCFunction)psyco_curs_copy_to, diff --git a/tests/test_async.py b/tests/test_async.py index 91a53921..e1b50cbf 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -4,6 +4,7 @@ import unittest import psycopg2 from psycopg2 import extensions +import time import select import StringIO @@ -48,7 +49,10 @@ class AsyncTests(unittest.TestCase): self.sync_conn.close() self.conn.close() - def wait(self, pollable): + def wait(self, cur_or_conn): + pollable = cur_or_conn + if not hasattr(pollable, 'poll'): + pollable = cur_or_conn.connection while True: state = pollable.poll() if state == psycopg2.extensions.POLL_OK: @@ -301,7 +305,7 @@ class AsyncTests(unittest.TestCase): curs = self.conn.cursor() for mb in 1, 5, 10, 20, 50: size = mb * 1024 * 1024 - stub = PollableStub(curs) + stub = PollableStub(self.conn) curs.execute("select %s;", ('x' * size,)) self.wait(stub) self.assertEqual(size, len(curs.fetchone()[0])) @@ -312,19 +316,33 @@ class AsyncTests(unittest.TestCase): def test_sync_poll(self): cur = self.sync_conn.cursor() - # polling a sync cursor works - cur.poll() + cur.execute("select 1") + # polling with a sync query works + cur.connection.poll() + self.assertEquals(cur.fetchone()[0], 1) - def test_async_poll_wrong_cursor(self): - cur1 = self.conn.cursor() - cur2 = self.conn.cursor() - cur1.execute("select 1") + def test_notify(self): + cur = self.conn.cursor() + sync_cur = self.sync_conn.cursor() - # polling a cursor that's not currently executing is an error - self.assertRaises(psycopg2.ProgrammingError, cur2.poll) + sync_cur.execute("listen test_notify") + self.sync_conn.commit() + cur.execute("notify test_notify") + self.wait(cur) - self.wait(cur1) - self.assertEquals(cur1.fetchone()[0], 1) + self.assertEquals(self.sync_conn.notifies, []) + + pid = self.conn.get_backend_pid() + for _ in range(5): + self.wait(self.sync_conn) + if not self.sync_conn.notifies: + time.sleep(0.5) + continue + self.assertEquals(len(self.sync_conn.notifies), 1) + self.assertEquals(self.sync_conn.notifies.pop(), + (pid, "test_notify")) + return + self.fail("No NOTIFY in 2.5 seconds") def test_async_fetch_wrong_cursor(self): cur1 = self.conn.cursor()