diff --git a/lib/extensions.py b/lib/extensions.py index 20f78e69..f7431d18 100644 --- a/lib/extensions.py +++ b/lib/extensions.py @@ -60,6 +60,11 @@ from _psycopg import ISQLQuote from _psycopg import QueryCanceledError, TransactionRollbackError +try: + from _psycopg import set_wait_callback +except ImportError: + pass + """Isolation level values.""" ISOLATION_LEVEL_AUTOCOMMIT = 0 ISOLATION_LEVEL_READ_COMMITTED = 1 @@ -83,6 +88,7 @@ STATUS_IN_TRANSACTION = STATUS_BEGIN POLL_OK = 0 POLL_READ = 1 POLL_WRITE = 2 +POLL_ERROR = 3 """Backend transaction status values.""" TRANSACTION_STATUS_IDLE = 0 diff --git a/lib/extras.py b/lib/extras.py index 027c587b..ac784b8e 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -484,4 +484,28 @@ def register_tstz_w_secs(oids=None, conn_or_curs=None): return _ext.TSTZ_W_SECS +import select +from psycopg2.extensions import POLL_OK, POLL_READ, POLL_WRITE +from psycopg2 import OperationalError + +def wait_select(conn, curs=None): + """Wait until a connection or cursor has data available. + + The function is an example of a wait callback to be registered with + `~psycopg2.extensions.set_wait_callback()`. This function uses `!select()` + to wait for data available. + """ + poll = (curs or conn).poll + while 1: + state = poll() + if state == POLL_OK: + break + elif state == POLL_READ: + select.select([conn.fileno()], [], []) + elif state == POLL_WRITE: + select.select([], [conn.fileno()], []) + else: + raise OperationalError("bad state from poll: %s" % state) + + __all__ = filter(lambda k: not k.startswith('_'), locals().keys()) diff --git a/psycopg/green.c b/psycopg/green.c new file mode 100644 index 00000000..874c264d --- /dev/null +++ b/psycopg/green.c @@ -0,0 +1,94 @@ +/* green.c - cooperation with coroutine libraries. + * + * Copyright (C) 2010 Daniele Varrazzo + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#define PSYCOPG_MODULE +#include "psycopg/config.h" +#include "psycopg/python.h" +#include "psycopg/psycopg.h" +#include "psycopg/green.h" + +HIDDEN PyObject *wait_callback = NULL; + +/* Register a callback function to block waiting for data. + * + * The function is exported by the _psycopg module. + */ +PyObject * +psyco_set_wait_callback(PyObject *self, PyObject *obj) +{ + Py_XDECREF(wait_callback); + + if (obj != Py_None) { + wait_callback = obj; + Py_INCREF(obj); + } + else { + wait_callback = NULL; + } + + Py_INCREF(Py_None); + return Py_None; +} + + +/* Return nonzero if a wait callback should be called. */ +int +psyco_green() +{ +#ifdef PSYCOPG_EXTENSIONS + return (NULL != wait_callback); +#else + return 0; +#endif +} + +/* Block waiting for data available in an async connection. + * + * This function assumes `wait_callback` to be available: + * raise `InterfaceError` if it is not. Use `psyco_green()` to check if + * the function is to be called. + * + * The function returns the return value of the called function. + */ +PyObject * +psyco_wait(PyObject *conn, PyObject *curs) +{ + PyObject *rv; + PyObject *cb; + + Dprintf("psyco_wait"); + cb = wait_callback; + if (!cb) { + PyErr_SetString(OperationalError, "wait callback not available"); + return NULL; + } + + Py_INCREF(cb); + rv = PyObject_CallFunctionObjArgs(cb, conn, curs, NULL); + Py_DECREF(cb); + + return rv; +} + diff --git a/psycopg/green.h b/psycopg/green.h new file mode 100644 index 00000000..28999272 --- /dev/null +++ b/psycopg/green.h @@ -0,0 +1,58 @@ +/* green.c - cooperation with coroutine libraries. + * + * Copyright (C) 2010 Daniele Varrazzo + * + * This file is part of psycopg. + * + * psycopg2 is free software: you can redistribute it and/or modify it + * under the terms of the GNU Lesser General Public License as published + * by the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * In addition, as a special exception, the copyright holders give + * permission to link this program with the OpenSSL library (or with + * modified versions of OpenSSL that use the same license as OpenSSL), + * and distribute linked combinations including the two. + * + * You must obey the GNU Lesser General Public License in all respects for + * all of the code used other than OpenSSL. + * + * psycopg2 is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public + * License for more details. + */ + +#ifndef PSYCOPG_GREEN_H +#define PSYCOPG_GREEN_H 1 + +struct PyObject; + +#ifdef __cplusplus +extern "C" { +#endif + +#define psyco_set_wait_callback_doc \ +"set_wait_callback(f) -- Register a callback function to block waiting for data.\n" \ +"\n" \ +"The callback must should signature :samp:`fun({conn}, {cur}=None)` and\n" \ +"is called to wait for data available whenever a blocking function from the\n" \ +"libpq is called. Use `!register_wait_function(None)` to revert to the\n" \ +"original behaviour (using blocking libpq functions).\n" \ +"\n" \ +"The function is an hook to allow coroutine-based libraries (such as\n" \ +"eventlet_ or gevent_) to switch when Psycopg is blocked, allowing\n" \ +"other coroutines to run concurrently.\n" \ +"\n" \ +"See `~psycopg2.extras.wait_select()` for an example of a wait callback\n" \ +"implementation.\n" + +HIDDEN PyObject *psyco_set_wait_callback(PyObject *self, PyObject *obj); +HIDDEN int psyco_green(void); +HIDDEN PyObject *psyco_wait(PyObject *conn, PyObject *curs); + +#ifdef __cplusplus +} +#endif + +#endif /* !defined(PSYCOPG_GREEN_H) */ diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c index 528c592b..60c9c041 100644 --- a/psycopg/psycopgmodule.c +++ b/psycopg/psycopgmodule.c @@ -32,6 +32,7 @@ #include "psycopg/psycopg.h" #include "psycopg/connection.h" #include "psycopg/cursor.h" +#include "psycopg/green.h" #include "psycopg/lobject.h" #include "psycopg/typecast.h" #include "psycopg/microprotocols.h" @@ -693,6 +694,11 @@ static PyMethodDef psycopgMethods[] = { METH_VARARGS, psyco_IntervalFromMx_doc}, #endif +#ifdef PSYCOPG_EXTENSIONS + {"set_wait_callback", (PyCFunction)psyco_set_wait_callback, + METH_O, psyco_set_wait_callback_doc}, +#endif + {NULL, NULL, 0, NULL} /* Sentinel */ }; diff --git a/setup.py b/setup.py index 6e5d85db..13abfe1a 100644 --- a/setup.py +++ b/setup.py @@ -343,7 +343,7 @@ sources = [ 'adapter_qstring.c', 'adapter_pboolean.c', 'adapter_binary.c', 'adapter_asis.c', 'adapter_list.c', 'adapter_datetime.c', 'adapter_pfloat.c', 'adapter_pdecimal.c', - 'utils.c'] + 'green.c', 'utils.c'] parser = ConfigParser.ConfigParser() parser.read('setup.cfg')