Remove fileno() and poll() from cursors

Instead, the code should be using the fileno() and poll() methods of
the cursor's connection. Handle the case when poll() is called on an
already built connection as a request to poll the asynchronous query
(if there is one) and get NOTIFY events.
Update the tests to reflect that change, add a test for NOTIFY.
This commit is contained in:
Jan Urbański 2010-04-18 15:51:58 +02:00 committed by Daniele Varrazzo
parent b7327a349d
commit 067161d5f3
5 changed files with 74 additions and 71 deletions

View File

@ -132,6 +132,7 @@ HIDDEN int conn_switch_isolation_level(connectionObject *self, int level);
HIDDEN int conn_set_client_encoding(connectionObject *self, const char *enc);
HIDDEN PyObject *conn_poll_send(connectionObject *self);
HIDDEN PyObject *conn_poll_fetch(connectionObject *self);
HIDDEN PyObject *conn_poll_ready(connectionObject *self);
/* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \

View File

@ -564,6 +564,44 @@ conn_poll_fetch(connectionObject *self)
return PyInt_FromLong(ret);
}
/* conn_poll_ready - handle connection polling when it is already open */
PyObject *
conn_poll_ready(connectionObject *self)
{
int is_busy;
/* if there is an asynchronous query underway, poll it */
if (self->async_cursor != NULL) {
if (self->async_status == ASYNC_WRITE) {
return curs_poll_send((cursorObject *) self->async_cursor);
}
else {
/* this gets called both for ASYNC_READ and ASYNC_DONE, because
even if the async query is complete, we still might want to
check for NOTIFYs */
return curs_poll_fetch((cursorObject *) self->async_cursor);
}
}
/* otherwise just check for NOTIFYs */
is_busy = pq_is_busy(self);
if (is_busy == -1) {
/* there was an error, raise the exception */
return NULL;
}
else if (is_busy == 1) {
/* the connection is busy, tell the user to wait more */
Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_READ);
return PyInt_FromLong(PSYCO_POLL_READ);
}
else {
/* connection is idle */
Dprintf("conn_poll_ready: returning %d", PSYCO_POLL_OK);
return PyInt_FromLong(PSYCO_POLL_OK);
}
}
/* conn_close - do anything needed to shut down the connection */
void

View File

@ -449,8 +449,11 @@ psyco_conn_poll(connectionObject *self)
break;
case CONN_STATUS_READY:
/* we have completed the connection setup */
return PyInt_FromLong(PSYCO_POLL_OK);
case CONN_STATUS_BEGIN:
/* The connection is ready, but we might be in an asynchronous query,
or we just might want to check for NOTIFYs. For synchronous
connections the status might be BEGIN, not READY. */
return conn_poll_ready(self);
default:
/* everything else is an error */

View File

@ -1441,59 +1441,6 @@ psyco_curs_copy_expert(cursorObject *self, PyObject *args, PyObject *kwargs)
return res;
}
/* extension: fileno - return the file descripor of the connection */
#define psyco_curs_fileno_doc \
"fileno() -> int -- Return file descriptor associated to database connection."
static PyObject *
psyco_curs_fileno(cursorObject *self)
{
long int socket;
EXC_IF_CURS_CLOSED(self);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock));
socket = (long int)PQsocket(self->conn->pgconn);
pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS;
return PyInt_FromLong(socket);
}
/* extension: poll - return true if data from async execute is ready */
#define psyco_curs_poll_doc \
"poll() -- return POLL_OK if the query has been fully processed, " \
"POLL_READ if the query has been sent and the application should be " \
"waiting for the result to arrive or POLL_WRITE is the query is still " \
"being sent."
static PyObject *
psyco_curs_poll(cursorObject *self)
{
EXC_IF_CURS_CLOSED(self);
if (self->conn->async_cursor != NULL &&
self->conn->async_cursor != (PyObject *) self) {
PyErr_SetString(ProgrammingError, "poll with wrong cursor");
return NULL;
}
Dprintf("curs_poll: polling with status %d", self->conn->async_status);
if (self->conn->async_status == ASYNC_WRITE) {
return curs_poll_send(self);
}
else {
/* this gets called both for ASYNC_READ and ASYNC_DONE, because even
if the async query is complete, we still might want to check for
NOTIFYs */
return curs_poll_fetch(self);
}
}
/* extension: closed - return true if cursor is closed*/
#define psyco_curs_closed_doc \
@ -1572,10 +1519,6 @@ static struct PyMethodDef cursorObject_methods[] = {
#ifdef PSYCOPG_EXTENSIONS
{"mogrify", (PyCFunction)psyco_curs_mogrify,
METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc},
{"poll", (PyCFunction)psyco_curs_poll,
METH_VARARGS, psyco_curs_poll_doc},
{"fileno", (PyCFunction)psyco_curs_fileno,
METH_NOARGS, psyco_curs_fileno_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc},
{"copy_to", (PyCFunction)psyco_curs_copy_to,

View File

@ -4,6 +4,7 @@ import unittest
import psycopg2
from psycopg2 import extensions
import time
import select
import StringIO
@ -48,7 +49,10 @@ class AsyncTests(unittest.TestCase):
self.sync_conn.close()
self.conn.close()
def wait(self, pollable):
def wait(self, cur_or_conn):
pollable = cur_or_conn
if not hasattr(pollable, 'poll'):
pollable = cur_or_conn.connection
while True:
state = pollable.poll()
if state == psycopg2.extensions.POLL_OK:
@ -301,7 +305,7 @@ class AsyncTests(unittest.TestCase):
curs = self.conn.cursor()
for mb in 1, 5, 10, 20, 50:
size = mb * 1024 * 1024
stub = PollableStub(curs)
stub = PollableStub(self.conn)
curs.execute("select %s;", ('x' * size,))
self.wait(stub)
self.assertEqual(size, len(curs.fetchone()[0]))
@ -312,19 +316,33 @@ class AsyncTests(unittest.TestCase):
def test_sync_poll(self):
cur = self.sync_conn.cursor()
# polling a sync cursor works
cur.poll()
cur.execute("select 1")
# polling with a sync query works
cur.connection.poll()
self.assertEquals(cur.fetchone()[0], 1)
def test_async_poll_wrong_cursor(self):
cur1 = self.conn.cursor()
cur2 = self.conn.cursor()
cur1.execute("select 1")
def test_notify(self):
cur = self.conn.cursor()
sync_cur = self.sync_conn.cursor()
# polling a cursor that's not currently executing is an error
self.assertRaises(psycopg2.ProgrammingError, cur2.poll)
sync_cur.execute("listen test_notify")
self.sync_conn.commit()
cur.execute("notify test_notify")
self.wait(cur)
self.wait(cur1)
self.assertEquals(cur1.fetchone()[0], 1)
self.assertEquals(self.sync_conn.notifies, [])
pid = self.conn.get_backend_pid()
for _ in range(5):
self.wait(self.sync_conn)
if not self.sync_conn.notifies:
time.sleep(0.5)
continue
self.assertEquals(len(self.sync_conn.notifies), 1)
self.assertEquals(self.sync_conn.notifies.pop(),
(pid, "test_notify"))
return
self.fail("No NOTIFY in 2.5 seconds")
def test_async_fetch_wrong_cursor(self):
cur1 = self.conn.cursor()