diff --git a/doc/src/advanced.rst b/doc/src/advanced.rst index 2801a2df..76570147 100644 --- a/doc/src/advanced.rst +++ b/doc/src/advanced.rst @@ -246,88 +246,83 @@ Running the script and executing the command :sql:`NOTIFY test` in a separate .. index:: - double: Asynchronous; Query + double: Asynchronous; Connection -.. _asynchronous-queries: +.. _async-support: -Asynchronous queries +Asynchronous support -------------------- -.. warning:: +.. versionadded:: 2.2.0 - Psycopg support for asynchronous queries is still experimental and the - informations reported here may be out of date. +Psycopg can issue asynchronous queries to a Postgresql database. An asynchronous +communication style is estabilished passing the parameter *async*\=1 to the +`~psycopg2.connect()` function: the returned connection will work in +asynchronous mode. - Discussion, testing and suggestions are welcome. +In asynchronous mode, a Psycopg connection will rely on the caller to poll for +the socket file descriptor ready to accept data or a query result ready to be +read from the server. The caller can use the method `~cursor.fileno()` to get +the connection file descriptor and `~cursor.poll()` to make communication +proceed. An application can use a loop like the one below to transfer data +between the client and the server:: -Program code can initiate an asynchronous query by passing an ``async=1`` flag -to the `~cursor.execute()` or `~cursor.callproc()` cursor methods. A -very simple example, from the connection to the query:: + def wait(conn_or_cur): + while 1: + state = conn_or_cur.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_WRITE: + select.select([], [conn_or_cur.fileno()], []) + elif state == psycopg2.extensions.POLL_READ: + select.select([conn_or_cur.fileno()], [], []) + else: + raise psycopg2.OperationalError("poll() returned %s" % state) - conn = psycopg2.connect(database='test') +After `!poll()` has returned `~psycopg2.extensions.POLL_OK`, the results are +available in the cursor for regular reading:: + + curs.execute("SELECT * FROM foo;") + wait(curs) + for record in curs: + # use it... + +The same loop should also be used to accomplish a connection with the server: +the connection is usable only after `connection.poll()` has returned `!POLL_OK`. +The `!connection` has a `~connection.fileno()` method too, so it is possible to +use the same interface for the wait loop:: + + conn = psycopg2.connect(database='test', async=1) + wait(conn) + # Now you can have a cursor. curs = conn.cursor() - curs.execute("SELECT * from test WHERE fielda > %s", (1971,), async=1) -From then on any query on other cursors derived from the same connection is -doomed to fail (and raise an exception) until the original cursor (the one -executing the query) complete the asynchronous operation. This can happen in -a number of different ways: +Notice that there are a few other requirements to be met in order to have a +completely non-blocking connection attempt: see the libpq documentation for +|PQconnectStart|_. -1) one of the `!fetch*()` methods is called, effectively blocking until - data has been sent from the backend to the client, terminating the query. +.. |PQconnectStart| replace:: `!PQconnectStart()` +.. _PQconnectStart: http://www.postgresql.org/docs/8.4/static/libpq-connect.html#AEN33199 -2) `connection.cancel()` is called. This method tries to abort the - current query and will block until the query is aborted or fully executed. - The return value is ``True`` if the query was successfully aborted or - ``False`` if it was executed. Query result are discarded in both cases. +When an asynchronous query is being executed, `connection.executing()` returns +`True`. Two cursors can't execute concurrent queries on the same asynchronous +connection. -3) `~cursor.execute()` is called again on the same cursor - (`!execute()` on a different cursor will simply raise an exception). - This waits for the complete execution of the current query, discard any - data and execute the new one. +There are several limitations in using asynchronous connections: the connection +is always in :ref:`autocommit ` mode and it is not possible to +change it using `~connection.set_isolation_level()`. So transaction are not +started at each query and is not possible to use methods `~connection.commit()` +and `~connection.rollback()`: you can manually control transactions using +`~cursor.execute()` to send commands :sql:`BEGIN`, :sql:`COMMIT` and +:sql:`ROLLBACK`. -Note that calling `!execute()` two times in a row will not abort the -former query and will temporarily go to synchronous mode until the first of -the two queries is executed. +With asynchronous connections it is also not possible to use +`~connection.set_client_encoding()`, `~cursor.executemany()`, :ref:`large +objects `, :ref:`named cursors `. -Cursors now have some extra methods that make them useful during -asynchronous queries: +:ref:`COPY commands ` are not supported either in asynchronous mode, but +this will be probably implemented in a future release. -`~cursor.fileno()` - Returns the file descriptor associated with the current connection and - make possible to use a cursor in a context where a file object would be - expected (like in a `select()` call). - -`~cursor.isready()` - Returns ``False`` if the backend is still processing the query or ``True`` - if data is ready to be fetched (by one of the `!fetch*()` methods). - -.. index:: - single: Example; Asynchronous query - -A code snippet that shows how to use the cursor object in a `!select()` -call:: - - import psycopg2 - import select - - conn = psycopg2.connect(database='test') - curs = conn.cursor() - curs.execute("SELECT * from test WHERE fielda > %s", (1971,), async=1) - - # wait for input with a maximum timeout of 5 seconds - query_ended = False - while not query_ended: - rread, rwrite, rspec = select([curs, another_file], [], [], 5) - - if curs.isready(): - query_ended = True - - # manage input from other sources like other_file, etc. - - print "Query Results:" - for row in curs: - print row .. testcode:: diff --git a/doc/src/connection.rst b/doc/src/connection.rst index 1e65b299..bf091bfa 100644 --- a/doc/src/connection.rst +++ b/doc/src/connection.rst @@ -314,3 +314,37 @@ The ``connection`` class .. _lo_import: http://www.postgresql.org/docs/8.4/static/lo-interfaces.html#AEN36307 .. versionadded:: 2.0.8 + + + + .. rubric:: Methods related to asynchronous support. + + .. seealso:: :ref:`Asynchronous support `. + + + .. method:: issync() + + Return `True` if the connection is synchronous, `False` if asynchronous. + + + .. method:: poll() + + Used during an asynchronous connection attempt, make communication + proceed if it wouldn't block. + + Return one of the constants defined in :ref:`poll-constants`. If it + returns `~psycopg2.extensions.POLL_OK` the connection has been + estabilished. Otherwise wait until the file descriptor is ready as + explained in :ref:`async-support`. + + + .. method:: fileno() + + Return the file descriptor associated with the connection to read the + status during asynchronous communication. + + + .. method:: executing() + + Return `True` if the connection is executing an asynchronous operation. + diff --git a/doc/src/cursor.rst b/doc/src/cursor.rst index b31f2e79..333739dc 100644 --- a/doc/src/cursor.rst +++ b/doc/src/cursor.rst @@ -109,7 +109,7 @@ The ``cursor`` class .. rubric:: Commands execution methods - .. method:: execute(operation [, parameters] [, async]) + .. method:: execute(operation [, parameters]) Prepare and execute a database operation (query or command). @@ -121,16 +121,6 @@ The ``cursor`` class The method returns `None`. If a query was executed, the returned values can be retrieved using |fetch*|_ methods. - If `async` is ``True``, query execution will be asynchronous: - the function returns immediately while the query is executed by the - backend. Use the `~cursor.isready()` method to see if the data is - ready for return via |fetch*|_ methods. See - :ref:`asynchronous-queries`. - - .. extension:: - - The `async` parameter is a Psycopg extension to the |DBAPI|. - .. method:: mogrify(operation [, parameters]) @@ -159,7 +149,7 @@ The ``cursor`` class the `~cursor.execute()` method. - .. method:: callproc(procname [, parameters] [, async]) + .. method:: callproc(procname [, parameters]) Call a stored database procedure with the given name. The sequence of parameters must contain one entry for each argument that the procedure @@ -170,16 +160,6 @@ The ``cursor`` class The procedure may also provide a result set as output. This must then be made available through the standard |fetch*|_ methods. - If `async` is ``True``, procedure execution will be asynchronous: - the function returns immediately while the procedure is executed by - the backend. Use the `~cursor.isready()` method to see if the - data is ready for return via |fetch*|_ methods. See - :ref:`asynchronous-queries`. - - .. extension:: - - The `async` parameter is a Psycopg extension to the |DBAPI|. - .. method:: setinputsizes(sizes) @@ -400,29 +380,6 @@ The ``cursor`` class |DBAPI|. - .. method:: isready() - - Return ``False`` if the backend is still processing an asynchronous - query or ``True`` if data is ready to be fetched by one of the - |fetch*|_ methods. See :ref:`asynchronous-queries`. - - .. extension:: - - The `isready()` method is a Psycopg extension to the |DBAPI|. - - - .. method:: fileno() - - Return the file descriptor associated with the current connection and - make possible to use a cursor in a context where a file object would - be expected (like in a `select()` call). See - :ref:`asynchronous-queries`. - - .. extension:: - - The `fileno()` method is a Psycopg extension to the |DBAPI|. - - .. attribute:: tzinfo_factory The time zone factory used to handle data types such as @@ -434,6 +391,33 @@ The ``cursor`` class + .. rubric:: Methods related to asynchronous support. + + .. extension:: + + :ref:`Asynchronous support ` is a Psycopg extension to + the |DBAPI|. + + + .. method:: poll() + + Used during asynchronous queries, make asynchronous communication + proceed if it wouldn't block. + + Return `~psycopg2.extensions.POLL_OK` if the query has been fully + processed, `~psycopg2.extensions.POLL_READ` if the query has been sent + and the application should be waiting for the result to arrive or + `~psycopg2.extensions.POLL_WRITE` is the query is still being sent. + + + .. method:: fileno() + + Return the file descriptor associated with the current connection to + make possible to use a cursor in a context where a file object would + be expected (like in a `select()` call). + + + .. rubric:: COPY-related methods .. extension:: diff --git a/doc/src/extensions.rst b/doc/src/extensions.rst index 436bb7aa..a672781c 100644 --- a/doc/src/extensions.rst +++ b/doc/src/extensions.rst @@ -21,6 +21,8 @@ functionalities defined by the |DBAPI|_. `!connect()` function using the `connection_factory` parameter. See also :ref:`subclassing-connection`. + Subclasses should have constructor signature :samp:`({dsn}, {async}=0)`. + For a complete description of the class, see `connection`. .. class:: cursor @@ -80,13 +82,13 @@ functionalities defined by the |DBAPI|_. .. method:: truncate(len=0) - .. versionadded:: 2.0.15 + .. versionadded:: 2.2.0 Truncate the lobject to the given size. - The method will only be available if psycopg has been built against libpq - from PostgreSQL 8.3 or later and can only be used with PostgreSQL servers - running these versions. It uses the |lo_truncate|_ libpq function. + The method will only be available if Psycopg has been built against libpq + from PostgreSQL 8.3 or later and can only be used with PostgreSQL servers + running these versions. It uses the |lo_truncate|_ libpq function. .. |lo_truncate| replace:: `!lo_truncate()` .. _lo_truncate: http://www.postgresql.org/docs/8.4/static/lo-interfaces.html#AEN36420 @@ -443,6 +445,40 @@ can be read from the `~connection.status` attribute. +.. index:: + pair: Poll status; Constants + +.. _poll-constants: + +Poll constants +-------------- + +.. versionadded:: 2.2.0 + +These values can be returned by `connection.poll()` and `cursor.poll()` during +asynchronous communication. See :ref:`async-support`. + +.. data:: POLL_OK + + The data is available (or the file descriptor is ready for writing): there + is no need to block anymore. + +.. data:: POLL_READ + + Upon receiving this value, the callback should wait for the connection + file descriptor to be ready *for reading*. For example:: + + select.select([conn.fileno()], [], []) + +.. data:: POLL_WRITE + + Upon receiving this value, the callback should wait for the connection + file descriptor to be ready *for writing*. For example:: + + select.select([], [conn.fileno()], []) + + + Additional database types ------------------------- diff --git a/doc/src/module.rst b/doc/src/module.rst index df067037..db7586e4 100644 --- a/doc/src/module.rst +++ b/doc/src/module.rst @@ -16,7 +16,7 @@ The module interface respects the standard defined in the |DBAPI|_. single: Port; Connection single: DSN (Database Source Name) -.. function:: connect(dsn or params[, connection_factory]) +.. function:: connect(dsn or params [, connection_factory] [, async=0]) Create a new database session and return a new `connection` object. @@ -40,15 +40,18 @@ The module interface respects the standard defined in the |DBAPI|_. .. __: http://www.postgresql.org/docs/8.4/static/libpq-ssl.html#LIBPQ-SSL-SSLMODE-STATEMENTS - Using the `connection_factory` parameter a different class or + Using the *connection_factory* parameter a different class or connections factory can be specified. It should be a callable object - taking a `dsn` argument. See :ref:`subclassing-connection` for + taking a *dsn* argument. See :ref:`subclassing-connection` for details. + Using *async*\=1 an asynchronous connection will be created: see + :ref:`async-support` to know about advantages and limitations. + .. extension:: - The `connection_factory` parameter is a Psycopg extension to the - |DBAPI|. + The parameters *connection_factory* and *async* are Psycopg extensions + to the |DBAPI|. .. data:: apilevel