Merge remote branch 'piro/fix22' into python2

This commit is contained in:
Federico Di Gregorio 2010-04-14 08:57:30 +02:00
commit a90935930b
11 changed files with 284 additions and 140 deletions

View File

@ -1,3 +1,9 @@
2010-04-13 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* lib/extensions.py: DECIMAL typecaster imported from _psycopg.
* lib/extensions.py: PY* and MX* time typecaster imported from _psycopg.
2010-04-05 Federico Di Gregorio <fog@initd.org>
* Fixed problem with asynchronous NOTIFYs.

View File

@ -1,8 +0,0 @@
Asynchronous support in psycopg
*******************************
The support for asynchronous queries in psycopg is only half-backed and
won't work unless a good deal of time is put into fixing bugs. Currently
I (the author) have no need for asycnhronous queries but patches are
welcome. Also welcome are short tests that show how the async path fails
(miserably) so that I can at least fix obvious problems.

View File

@ -246,88 +246,83 @@ Running the script and executing the command :sql:`NOTIFY test` in a separate
.. index::
double: Asynchronous; Query
double: Asynchronous; Connection
.. _asynchronous-queries:
.. _async-support:
Asynchronous queries
Asynchronous support
--------------------
.. warning::
.. versionadded:: 2.2.0
Psycopg support for asynchronous queries is still experimental and the
informations reported here may be out of date.
Psycopg can issue asynchronous queries to a Postgresql database. An asynchronous
communication style is estabilished passing the parameter *async*\=1 to the
`~psycopg2.connect()` function: the returned connection will work in
asynchronous mode.
Discussion, testing and suggestions are welcome.
In asynchronous mode, a Psycopg connection will rely on the caller to poll for
the socket file descriptor ready to accept data or a query result ready to be
read from the server. The caller can use the method `~cursor.fileno()` to get
the connection file descriptor and `~cursor.poll()` to make communication
proceed. An application can use a loop like the one below to transfer data
between the client and the server::
Program code can initiate an asynchronous query by passing an ``async=1`` flag
to the `~cursor.execute()` or `~cursor.callproc()` cursor methods. A
very simple example, from the connection to the query::
def wait(conn_or_cur):
while 1:
state = conn_or_cur.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn_or_cur.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn_or_cur.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)
conn = psycopg2.connect(database='test')
After `!poll()` has returned `~psycopg2.extensions.POLL_OK`, the results are
available in the cursor for regular reading::
curs.execute("SELECT * FROM foo;")
wait(curs)
for record in curs:
# use it...
The same loop should also be used to accomplish a connection with the server:
the connection is usable only after `connection.poll()` has returned `!POLL_OK`.
The `!connection` has a `~connection.fileno()` method too, so it is possible to
use the same interface for the wait loop::
conn = psycopg2.connect(database='test', async=1)
wait(conn)
# Now you can have a cursor.
curs = conn.cursor()
curs.execute("SELECT * from test WHERE fielda > %s", (1971,), async=1)
From then on any query on other cursors derived from the same connection is
doomed to fail (and raise an exception) until the original cursor (the one
executing the query) complete the asynchronous operation. This can happen in
a number of different ways:
Notice that there are a few other requirements to be met in order to have a
completely non-blocking connection attempt: see the libpq documentation for
|PQconnectStart|_.
1) one of the `!fetch*()` methods is called, effectively blocking until
data has been sent from the backend to the client, terminating the query.
.. |PQconnectStart| replace:: `!PQconnectStart()`
.. _PQconnectStart: http://www.postgresql.org/docs/8.4/static/libpq-connect.html#AEN33199
2) `connection.cancel()` is called. This method tries to abort the
current query and will block until the query is aborted or fully executed.
The return value is ``True`` if the query was successfully aborted or
``False`` if it was executed. Query result are discarded in both cases.
When an asynchronous query is being executed, `connection.executing()` returns
`True`. Two cursors can't execute concurrent queries on the same asynchronous
connection.
3) `~cursor.execute()` is called again on the same cursor
(`!execute()` on a different cursor will simply raise an exception).
This waits for the complete execution of the current query, discard any
data and execute the new one.
There are several limitations in using asynchronous connections: the connection
is always in :ref:`autocommit <autocommit>` mode and it is not possible to
change it using `~connection.set_isolation_level()`. So transaction are not
started at each query and is not possible to use methods `~connection.commit()`
and `~connection.rollback()`: you can manually control transactions using
`~cursor.execute()` to send commands :sql:`BEGIN`, :sql:`COMMIT` and
:sql:`ROLLBACK`.
Note that calling `!execute()` two times in a row will not abort the
former query and will temporarily go to synchronous mode until the first of
the two queries is executed.
With asynchronous connections it is also not possible to use
`~connection.set_client_encoding()`, `~cursor.executemany()`, :ref:`large
objects <large-objects>`, :ref:`named cursors <server-side-cursors>`.
Cursors now have some extra methods that make them useful during
asynchronous queries:
:ref:`COPY commands <copy>` are not supported either in asynchronous mode, but
this will be probably implemented in a future release.
`~cursor.fileno()`
Returns the file descriptor associated with the current connection and
make possible to use a cursor in a context where a file object would be
expected (like in a `select()` call).
`~cursor.isready()`
Returns ``False`` if the backend is still processing the query or ``True``
if data is ready to be fetched (by one of the `!fetch*()` methods).
.. index::
single: Example; Asynchronous query
A code snippet that shows how to use the cursor object in a `!select()`
call::
import psycopg2
import select
conn = psycopg2.connect(database='test')
curs = conn.cursor()
curs.execute("SELECT * from test WHERE fielda > %s", (1971,), async=1)
# wait for input with a maximum timeout of 5 seconds
query_ended = False
while not query_ended:
rread, rwrite, rspec = select([curs, another_file], [], [], 5)
if curs.isready():
query_ended = True
# manage input from other sources like other_file, etc.
print "Query Results:"
for row in curs:
print row
.. testcode::

View File

@ -314,3 +314,37 @@ The ``connection`` class
.. _lo_import: http://www.postgresql.org/docs/8.4/static/lo-interfaces.html#AEN36307
.. versionadded:: 2.0.8
.. rubric:: Methods related to asynchronous support.
.. seealso:: :ref:`Asynchronous support <async-support>`.
.. method:: issync()
Return `True` if the connection is synchronous, `False` if asynchronous.
.. method:: poll()
Used during an asynchronous connection attempt, make communication
proceed if it wouldn't block.
Return one of the constants defined in :ref:`poll-constants`. If it
returns `~psycopg2.extensions.POLL_OK` the connection has been
estabilished. Otherwise wait until the file descriptor is ready as
explained in :ref:`async-support`.
.. method:: fileno()
Return the file descriptor associated with the connection to read the
status during asynchronous communication.
.. method:: executing()
Return `True` if the connection is executing an asynchronous operation.

View File

@ -109,7 +109,7 @@ The ``cursor`` class
.. rubric:: Commands execution methods
.. method:: execute(operation [, parameters] [, async])
.. method:: execute(operation [, parameters])
Prepare and execute a database operation (query or command).
@ -121,16 +121,6 @@ The ``cursor`` class
The method returns `None`. If a query was executed, the returned
values can be retrieved using |fetch*|_ methods.
If `async` is ``True``, query execution will be asynchronous:
the function returns immediately while the query is executed by the
backend. Use the `~cursor.isready()` method to see if the data is
ready for return via |fetch*|_ methods. See
:ref:`asynchronous-queries`.
.. extension::
The `async` parameter is a Psycopg extension to the |DBAPI|.
.. method:: mogrify(operation [, parameters])
@ -159,7 +149,7 @@ The ``cursor`` class
the `~cursor.execute()` method.
.. method:: callproc(procname [, parameters] [, async])
.. method:: callproc(procname [, parameters])
Call a stored database procedure with the given name. The sequence of
parameters must contain one entry for each argument that the procedure
@ -170,16 +160,6 @@ The ``cursor`` class
The procedure may also provide a result set as output. This must then
be made available through the standard |fetch*|_ methods.
If `async` is ``True``, procedure execution will be asynchronous:
the function returns immediately while the procedure is executed by
the backend. Use the `~cursor.isready()` method to see if the
data is ready for return via |fetch*|_ methods. See
:ref:`asynchronous-queries`.
.. extension::
The `async` parameter is a Psycopg extension to the |DBAPI|.
.. method:: setinputsizes(sizes)
@ -400,29 +380,6 @@ The ``cursor`` class
|DBAPI|.
.. method:: isready()
Return ``False`` if the backend is still processing an asynchronous
query or ``True`` if data is ready to be fetched by one of the
|fetch*|_ methods. See :ref:`asynchronous-queries`.
.. extension::
The `isready()` method is a Psycopg extension to the |DBAPI|.
.. method:: fileno()
Return the file descriptor associated with the current connection and
make possible to use a cursor in a context where a file object would
be expected (like in a `select()` call). See
:ref:`asynchronous-queries`.
.. extension::
The `fileno()` method is a Psycopg extension to the |DBAPI|.
.. attribute:: tzinfo_factory
The time zone factory used to handle data types such as
@ -434,6 +391,33 @@ The ``cursor`` class
.. rubric:: Methods related to asynchronous support.
.. extension::
:ref:`Asynchronous support <async-support>` is a Psycopg extension to
the |DBAPI|.
.. method:: poll()
Used during asynchronous queries, make asynchronous communication
proceed if it wouldn't block.
Return `~psycopg2.extensions.POLL_OK` if the query has been fully
processed, `~psycopg2.extensions.POLL_READ` if the query has been sent
and the application should be waiting for the result to arrive or
`~psycopg2.extensions.POLL_WRITE` is the query is still being sent.
.. method:: fileno()
Return the file descriptor associated with the current connection to
make possible to use a cursor in a context where a file object would
be expected (like in a `select()` call).
.. rubric:: COPY-related methods
.. extension::

View File

@ -21,6 +21,8 @@ functionalities defined by the |DBAPI|_.
`!connect()` function using the `connection_factory` parameter.
See also :ref:`subclassing-connection`.
Subclasses should have constructor signature :samp:`({dsn}, {async}=0)`.
For a complete description of the class, see `connection`.
.. class:: cursor
@ -80,13 +82,13 @@ functionalities defined by the |DBAPI|_.
.. method:: truncate(len=0)
.. versionadded:: 2.0.15
.. versionadded:: 2.2.0
Truncate the lobject to the given size.
The method will only be available if psycopg has been built against libpq
from PostgreSQL 8.3 or later and can only be used with PostgreSQL servers
running these versions. It uses the |lo_truncate|_ libpq function.
The method will only be available if Psycopg has been built against libpq
from PostgreSQL 8.3 or later and can only be used with PostgreSQL servers
running these versions. It uses the |lo_truncate|_ libpq function.
.. |lo_truncate| replace:: `!lo_truncate()`
.. _lo_truncate: http://www.postgresql.org/docs/8.4/static/lo-interfaces.html#AEN36420
@ -197,7 +199,7 @@ deal with Python objects adaptation:
.. versionchanged:: 2.0.14
previously the adapter was not exposed by the `extensions`
module. In older version it can be imported from the implementation
module. In older versions it can be imported from the implementation
module `!psycopg2._psycopg`.
@ -443,6 +445,40 @@ can be read from the `~connection.status` attribute.
.. index::
pair: Poll status; Constants
.. _poll-constants:
Poll constants
--------------
.. versionadded:: 2.2.0
These values can be returned by `connection.poll()` and `cursor.poll()` during
asynchronous communication. See :ref:`async-support`.
.. data:: POLL_OK
The data is available (or the file descriptor is ready for writing): there
is no need to block anymore.
.. data:: POLL_READ
Upon receiving this value, the callback should wait for the connection
file descriptor to be ready *for reading*. For example::
select.select([conn.fileno()], [], [])
.. data:: POLL_WRITE
Upon receiving this value, the callback should wait for the connection
file descriptor to be ready *for writing*. For example::
select.select([], [conn.fileno()], [])
Additional database types
-------------------------
@ -453,25 +489,56 @@ Python objects. All the typecasters are automatically registered, except
`register_type()` in order to receive Unicode objects instead of strings
from the database. See :ref:`unicode-handling` for details.
.. data:: BINARYARRAY
BOOLEAN
BOOLEANARRAY
.. data:: BOOLEAN
DATE
DECIMAL
FLOAT
INTEGER
INTERVAL
LONGINTEGER
TIME
UNICODE
Typecasters for basic types. Notice that a few other ones (`~psycopg2.BINARY`,
`~psycopg2.DATETIME`, `~psycopg2.NUMBER`, `~psycopg2.ROWID`,
`~psycopg2.STRING`) are exposed by the `psycopg2` module for |DBAPI|_
compliance.
.. data:: BINARYARRAY
BOOLEANARRAY
DATEARRAY
DATETIMEARRAY
DECIMALARRAY
FLOAT
FLOATARRAY
INTEGER
INTEGERARRAY
INTERVAL
INTERVALARRAY
LONGINTEGER
LONGINTEGERARRAY
ROWIDARRAY
STRINGARRAY
TIME
TIMEARRAY
UNICODE
UNICODEARRAY
Typecasters to convert arrays of sql types into Python lists.
.. data:: PYDATE
PYDATETIME
PYINTERVAL
PYTIME
Typecasters to convert time-related data types to Python `!datetime`
objects.
.. data:: MXDATE
MXDATETIME
MXINTERVAL
MXTIME
Typecasters to convert time-related data types to `mx.DateTime`_ objects.
Only available if Psycopg was compiled with `!mx` support.
.. versionchanged:: 2.2.0
previously the `DECIMAL` typecaster and the specific time-related
typecasters (`!PY*` and `!MX*`) were not exposed by the `extensions`
module. In older versions they can be imported from the implementation
module `!psycopg2._psycopg`.

View File

@ -70,6 +70,20 @@ My database is Unicode, but I receive all the strings as UTF-8 `str`. Can I rece
See :ref:`unicode-handling` for the gory details.
Psycopg converts :sql:`decimal`\/\ :sql:`numeric` database types into Python `!Decimal` objects. Can I have `!float` instead?
You can register the `~psycopg2.extensions.FLOAT` typecaster to be used in
place of `~psycopg2.extensions.DECIMAL`::
DEC2FLOAT = psycopg2.extensions.new_type(
psycopg2.extensions.DECIMAL.values,
'DEC2FLOAT',
psycopg2.extensions.FLOAT)
psycopg2.extensions.register_type(DEC2FLOAT)
See :ref:`type-casting-from-sql-to-python` to read the relevant
documentation. If you find `!psycopg2.extensions.DECIMAL` not avalable, use
`!psycopg2._psycopg.DECIMAL` instead.
I can't compile `!psycopg2`: the compiler says *error: Python.h: No such file or directory*. What am I missing?
You need to install a Python development package: it is usually called
``python-dev``.

View File

@ -16,7 +16,7 @@ The module interface respects the standard defined in the |DBAPI|_.
single: Port; Connection
single: DSN (Database Source Name)
.. function:: connect(dsn or params[, connection_factory])
.. function:: connect(dsn or params [, connection_factory] [, async=0])
Create a new database session and return a new `connection` object.
@ -40,15 +40,18 @@ The module interface respects the standard defined in the |DBAPI|_.
.. __: http://www.postgresql.org/docs/8.4/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS
Using the `connection_factory` parameter a different class or
Using the *connection_factory* parameter a different class or
connections factory can be specified. It should be a callable object
taking a `dsn` argument. See :ref:`subclassing-connection` for
taking a *dsn* argument. See :ref:`subclassing-connection` for
details.
Using *async*\=1 an asynchronous connection will be created: see
:ref:`async-support` to know about advantages and limitations.
.. extension::
The `connection_factory` parameter is a Psycopg extension to the
|DBAPI|.
The parameters *connection_factory* and *async* are Psycopg extensions
to the |DBAPI|.
.. data:: apilevel

View File

@ -33,7 +33,7 @@ This module holds all the extensions to the DBAPI-2.0 provided by psycopg.
# License for more details.
from _psycopg import UNICODE, INTEGER, LONGINTEGER, BOOLEAN, FLOAT
from _psycopg import TIME, DATE, INTERVAL
from _psycopg import TIME, DATE, INTERVAL, DECIMAL
from _psycopg import BINARYARRAY, BOOLEANARRAY, DATEARRAY, DATETIMEARRAY
from _psycopg import DECIMALARRAY, FLOATARRAY, INTEGERARRAY, INTERVALARRAY
from _psycopg import LONGINTEGERARRAY, ROWIDARRAY, STRINGARRAY, TIMEARRAY
@ -41,11 +41,14 @@ from _psycopg import UNICODEARRAY
from _psycopg import Binary, Boolean, Float, QuotedString, AsIs
try:
from _psycopg import MXDATE, MXDATETIME, MXINTERVAL, MXTIME
from _psycopg import DateFromMx, TimeFromMx, TimestampFromMx
from _psycopg import IntervalFromMx
except:
pass
try:
from _psycopg import PYDATE, PYDATETIME, PYINTERVAL, PYTIME
from _psycopg import DateFromPy, TimeFromPy, TimestampFromPy
from _psycopg import IntervalFromPy
except:

View File

@ -757,8 +757,9 @@ connection_init(PyObject *obj, PyObject *args, PyObject *kwds)
{
const char *dsn;
long int async = 0;
static char *kwlist[] = {"dsn", "async", NULL};
if (!PyArg_ParseTuple(args, "s|l", &dsn, &async))
if (!PyArg_ParseTupleAndKeywords(args, kwds, "s|l", kwlist, &dsn, &async))
return -1;
return connection_setup((connectionObject *)obj, dsn, async);

45
tests/test_async.py Normal file → Executable file
View File

@ -14,6 +14,21 @@ else:
import py3tests as tests
class PollableStub(object):
"""A 'pollable' wrapper allowing analysis of the `poll()` calls."""
def __init__(self, pollable):
self.pollable = pollable
self.polls = []
def fileno(self):
return self.pollable.fileno()
def poll(self):
rv = self.pollable.poll()
self.polls.append(rv)
return rv
class AsyncTests(unittest.TestCase):
def setUp(self):
@ -274,5 +289,35 @@ class AsyncTests(unittest.TestCase):
# it should be the result of the second query
self.assertEquals(cur.fetchone()[0], "b" * 10000)
def test_async_subclass(self):
class MyConn(psycopg2.extensions.connection):
def __init__(self, dsn, async=0):
psycopg2.extensions.connection.__init__(self, dsn, async=async)
conn = psycopg2.connect(tests.dsn, connection_factory=MyConn, async=True)
self.assert_(isinstance(conn, MyConn))
self.assert_(not conn.issync())
conn.close()
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
curs = self.conn.cursor()
for mb in 1, 5, 10, 20, 50:
size = mb * 1024 * 1024
print "\nplease wait: sending", mb, "MB query to the server",
stub = PollableStub(curs)
curs.execute("select %s;", ('x' * size,))
self.wait(stub)
self.assertEqual(size, len(curs.fetchone()[0]))
if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1:
return
self.fail("sending a large query didn't trigger block on write.")
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()