diff --git a/NEWS b/NEWS index 2a6df275..7dc3e1b8 100644 --- a/NEWS +++ b/NEWS @@ -33,6 +33,8 @@ New features: - Added `~psycopg2.extensions.Diagnostics.severity_nonlocalized` attribute on the `~psycopg2.extensions.Diagnostics` object (:ticket:`#783`). - More efficient `~psycopg2.extras.NamedTupleCursor` (:ticket:`#838`). +- Async communication improved to fix blocking on results returned at + different times (:ticket:`#856`). Other changes: diff --git a/psycopg/adapter_datetime.c b/psycopg/adapter_datetime.c index 5620fc78..a340c9b4 100644 --- a/psycopg/adapter_datetime.c +++ b/psycopg/adapter_datetime.c @@ -38,8 +38,6 @@ RAISES_NEG int psyco_adapter_datetime_init(void) { - Dprintf("psyco_adapter_datetime_init: datetime init"); - PyDateTime_IMPORT; if (!PyDateTimeAPI) { diff --git a/psycopg/adapter_mxdatetime.c b/psycopg/adapter_mxdatetime.c index cca8790e..4c2e274c 100644 --- a/psycopg/adapter_mxdatetime.c +++ b/psycopg/adapter_mxdatetime.c @@ -38,8 +38,6 @@ int psyco_adapter_mxdatetime_init(void) { - Dprintf("psyco_adapter_mxdatetime_init: mx.DateTime init"); - if (mxDateTime_ImportModuleAndAPI()) { Dprintf("psyco_adapter_mxdatetime_init: mx.DateTime initialization failed"); PyErr_Clear(); diff --git a/psycopg/connection.h b/psycopg/connection.h index 60ec6205..b7344d7e 100644 --- a/psycopg/connection.h +++ b/psycopg/connection.h @@ -108,6 +108,7 @@ struct connectionObject { * for a green connection. If NULL, the connection is idle. */ PyObject *async_cursor; int async_status; /* asynchronous execution status */ + PGresult *pgres; /* temporary result across async calls */ /* notice processing */ PyObject *notice_list; diff --git a/psycopg/connection_int.c b/psycopg/connection_int.c index aea85b9c..c05f48ee 100644 --- a/psycopg/connection_int.c +++ b/psycopg/connection_int.c @@ -863,11 +863,16 @@ _conn_poll_connecting(connectionObject *self) /* Advance to the next state after an attempt of flushing output */ static int -_conn_poll_advance_write(connectionObject *self, int flush) +_conn_poll_advance_write(connectionObject *self) { int res; + int flush; Dprintf("conn_poll: poll writing"); + + flush = PQflush(self->pgconn); + Dprintf("conn_poll: PQflush() = %i", flush); + switch (flush) { case 0: /* success */ /* we've finished pushing the query to the server. Let's start @@ -891,18 +896,24 @@ _conn_poll_advance_write(connectionObject *self, int flush) return res; } -/* Advance to the next state after a call to a pq_is_busy* function */ + +/* Advance to the next state after reading results */ + static int -_conn_poll_advance_read(connectionObject *self, int busy) +_conn_poll_advance_read(connectionObject *self) { int res; + int busy; Dprintf("conn_poll: poll reading"); + + busy = pq_get_result_async(self); + switch (busy) { case 0: /* result is ready */ - res = PSYCO_POLL_OK; Dprintf("conn_poll: async_status -> ASYNC_DONE"); self->async_status = ASYNC_DONE; + res = PSYCO_POLL_OK; break; case 1: /* result not ready: fd would block */ res = PSYCO_POLL_READ; @@ -911,13 +922,15 @@ _conn_poll_advance_read(connectionObject *self, int busy) res = PSYCO_POLL_ERROR; break; default: - Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy); + Dprintf("conn_poll: unexpected result from pq_get_result_async: %d", + busy); res = PSYCO_POLL_ERROR; break; } return res; } + /* Poll the connection for the send query/retrieve result phase Advance the async_status (usually going WRITE -> READ -> DONE) but don't @@ -931,27 +944,18 @@ _conn_poll_query(connectionObject *self) switch (self->async_status) { case ASYNC_WRITE: Dprintf("conn_poll: async_status = ASYNC_WRITE"); - res = _conn_poll_advance_write(self, PQflush(self->pgconn)); + res = _conn_poll_advance_write(self); break; case ASYNC_READ: Dprintf("conn_poll: async_status = ASYNC_READ"); - if (self->async) { - res = _conn_poll_advance_read(self, pq_is_busy(self)); - } - else { - /* we are a green connection being polled as result of a query. - this means that our caller has the lock and we are being called - from the callback. If we tried to acquire the lock now it would - be a deadlock. */ - res = _conn_poll_advance_read(self, pq_is_busy_locked(self)); - } + res = _conn_poll_advance_read(self); break; case ASYNC_DONE: Dprintf("conn_poll: async_status = ASYNC_DONE"); /* We haven't asked anything: just check for notifications. */ - res = _conn_poll_advance_read(self, pq_is_busy(self)); + res = _conn_poll_advance_read(self); break; default: @@ -974,7 +978,6 @@ static int _conn_poll_setup_async(connectionObject *self) { int res = PSYCO_POLL_ERROR; - PGresult *pgres; switch (self->status) { case CONN_STATUS_CONNECTING: @@ -1025,12 +1028,12 @@ _conn_poll_setup_async(connectionObject *self) res = _conn_poll_query(self); if (res == PSYCO_POLL_OK) { res = PSYCO_POLL_ERROR; - pgres = pq_get_last_result(self); - if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) { + if (self->pgres == NULL + || PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) { PyErr_SetString(OperationalError, "can't set datestyle to ISO"); break; } - CLEARPGRES(pgres); + CLEARPGRES(self->pgres); Dprintf("conn_poll: status -> CONN_STATUS_READY"); self->status = CONN_STATUS_READY; @@ -1042,6 +1045,35 @@ _conn_poll_setup_async(connectionObject *self) } +static cursorObject * +_conn_get_async_cursor(connectionObject *self) { + PyObject *py_curs; + + if (!(self->async_cursor)) { + PyErr_SetString(PyExc_SystemError, + "unexpectedly, there's no async cursor here"); + goto error; + } + + if (!(py_curs = PyWeakref_GetObject(self->async_cursor))) { + PyErr_SetString(PyExc_SystemError, + "got null dereferencing cursor weakref"); + goto error; + } + if (Py_None == py_curs) { + PyErr_SetString(InterfaceError, + "the asynchronous cursor has disappeared"); + goto error; + } + + Py_INCREF(py_curs); + return (cursorObject *)py_curs; + +error: + pq_clear_async(self); + return NULL; +} + /* conn_poll - Main polling switch * * The function is called in all the states and connection types and invokes @@ -1056,12 +1088,13 @@ conn_poll(connectionObject *self) switch (self->status) { case CONN_STATUS_SETUP: - Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING"); + Dprintf("conn_poll: status -> CONN_STATUS_SETUP"); self->status = CONN_STATUS_CONNECTING; res = PSYCO_POLL_WRITE; break; case CONN_STATUS_CONNECTING: + Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING"); res = _conn_poll_connecting(self); if (res == PSYCO_POLL_OK && self->async) { res = _conn_poll_setup_async(self); @@ -1069,39 +1102,29 @@ conn_poll(connectionObject *self) break; case CONN_STATUS_DATESTYLE: + Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE"); res = _conn_poll_setup_async(self); break; case CONN_STATUS_READY: case CONN_STATUS_BEGIN: case CONN_STATUS_PREPARED: + Dprintf("conn_poll: status -> CONN_STATUS_*"); res = _conn_poll_query(self); - if (res == PSYCO_POLL_OK && self->async && self->async_cursor) { + if (res == PSYCO_POLL_OK && self->async) { + cursorObject *curs; + /* An async query has just finished: parse the tuple in the * target cursor. */ - cursorObject *curs; - PyObject *py_curs; - if (!(py_curs = PyWeakref_GetObject(self->async_cursor))) { - /* It shouldn't happen but consider it to avoid dereferencing - * a null pointer below. */ - pq_clear_async(self); - PyErr_SetString(PyExc_SystemError, - "got null dereferencing cursor weakref"); - res = PSYCO_POLL_ERROR; - break; - } - if (Py_None == py_curs) { - pq_clear_async(self); - PyErr_SetString(InterfaceError, - "the asynchronous cursor has disappeared"); + if (!(curs = _conn_get_async_cursor(self))) { res = PSYCO_POLL_ERROR; break; } - curs = (cursorObject *)py_curs; - CLEARPGRES(curs->pgres); - curs->pgres = pq_get_last_result(self); + PQclear(curs->pgres); + curs->pgres = self->pgres; + self->pgres = NULL; /* fetch the tuples (if there are any) and build the result. We * don't care if pq_fetch return 0 or 1, but if there was an error, @@ -1111,6 +1134,7 @@ conn_poll(connectionObject *self) } /* We have finished with our async_cursor */ + Py_DECREF(curs); Py_CLEAR(self->async_cursor); } break; @@ -1120,6 +1144,7 @@ conn_poll(connectionObject *self) res = PSYCO_POLL_ERROR; } + Dprintf("conn_poll: returning %d", res); return res; } diff --git a/psycopg/connection_type.c b/psycopg/connection_type.c index d4a969b4..96e56474 100644 --- a/psycopg/connection_type.c +++ b/psycopg/connection_type.c @@ -1432,6 +1432,7 @@ connection_dealloc(PyObject* obj) PyMem_Free(self->encoding); if (self->critical) free(self->critical); if (self->cancel) PQfreeCancel(self->cancel); + PQclear(self->pgres); connection_clear(self); diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 875be663..0d10e61f 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -52,8 +52,6 @@ psyco_curs_close(cursorObject *self, PyObject *dummy) PyObject *rv = NULL; char *lname = NULL; - EXC_IF_ASYNC_IN_PROGRESS(self, close); - if (self->closed) { rv = Py_None; Py_INCREF(rv); @@ -64,6 +62,8 @@ psyco_curs_close(cursorObject *self, PyObject *dummy) char buffer[256]; PGTransactionStatusType status; + EXC_IF_ASYNC_IN_PROGRESS(self, close_named); + if (self->conn) { status = PQtransactionStatus(self->conn->pgconn); } diff --git a/psycopg/green.c b/psycopg/green.c index fec59d95..d2d73838 100644 --- a/psycopg/green.c +++ b/psycopg/green.c @@ -177,10 +177,12 @@ psyco_exec_green(connectionObject *conn, const char *command) goto end; } - /* Now we can read the data without fear of blocking. */ - result = pq_get_last_result(conn); + /* the result is now in the connection: take its ownership */ + result = conn->pgres; + conn->pgres = NULL; end: + CLEARPGRES(conn->pgres); conn->async_status = ASYNC_DONE; Py_CLEAR(conn->async_cursor); return result; diff --git a/psycopg/microprotocols.c b/psycopg/microprotocols.c index 4f4fe806..ba73f905 100644 --- a/psycopg/microprotocols.c +++ b/psycopg/microprotocols.c @@ -68,8 +68,6 @@ microprotocols_add(PyTypeObject *type, PyObject *proto, PyObject *cast) if (proto == NULL) proto = (PyObject*)&isqlquoteType; - Dprintf("microprotocols_add: cast %p for (%s, ?)", cast, type->tp_name); - if (!(key = PyTuple_Pack(2, (PyObject*)type, proto))) { goto exit; } if (0 != PyDict_SetItem(psyco_adapters, key, cast)) { goto exit; } diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 5e56838e..0789ab83 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -764,63 +764,22 @@ exit: } -/* pq_is_busy - consume input and return connection status - - a status of 1 means that a call to pq_fetch will block, while a status of 0 - means that there is data available to be collected. -1 means an error, the - exception will be set accordingly. - - this function locks the connection object - this function call Py_*_ALLOW_THREADS macros */ - -int -pq_is_busy(connectionObject *conn) -{ - int res; - Dprintf("pq_is_busy: consuming input"); - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(conn->lock)); - - if (PQconsumeInput(conn->pgconn) == 0) { - Dprintf("pq_is_busy: PQconsumeInput() failed"); - pthread_mutex_unlock(&(conn->lock)); - Py_BLOCK_THREADS; - - /* if the libpq says pgconn is lost, close the py conn */ - if (CONNECTION_BAD == PQstatus(conn->pgconn)) { - conn->closed = 2; - } - - PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); - return -1; - } - - res = PQisBusy(conn->pgconn); - - Py_BLOCK_THREADS; - conn_notifies_process(conn); - conn_notice_process(conn); - Py_UNBLOCK_THREADS; - - pthread_mutex_unlock(&(conn->lock)); - Py_END_ALLOW_THREADS; - - return res; -} - -/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock +/* pq_get_result_async - read an available result without blocking. + * + * Return 0 if the result is ready, 1 if it will block, -1 on error. + * The last result will be returned in pgres. * * The function should be called with the lock and holding the GIL. */ -int -pq_is_busy_locked(connectionObject *conn) +RAISES_NEG int +pq_get_result_async(connectionObject *conn) { - Dprintf("pq_is_busy_locked: consuming input"); + int rv = -1; + Dprintf("pq_get_result_async: calling PQconsumeInput()"); if (PQconsumeInput(conn->pgconn) == 0) { - Dprintf("pq_is_busy_locked: PQconsumeInput() failed"); + Dprintf("pq_get_result_async: PQconsumeInput() failed"); /* if the libpq says pgconn is lost, close the py conn */ if (CONNECTION_BAD == PQstatus(conn->pgconn)) { @@ -828,13 +787,69 @@ pq_is_busy_locked(connectionObject *conn) } PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn)); - return -1; + goto exit; } - /* notices and notifies will be processed at the end of the loop we are in - * (async reading) by pq_fetch. */ + conn_notifies_process(conn); + conn_notice_process(conn); - return PQisBusy(conn->pgconn); + for (;;) { + int busy; + PGresult *res; + ExecStatusType status; + + Dprintf("pq_get_result_async: calling PQisBusy()"); + busy = PQisBusy(conn->pgconn); + + if (busy) { + /* try later */ + Dprintf("pq_get_result_async: PQisBusy() = 1"); + rv = 1; + goto exit; + } + + if (!(res = PQgetResult(conn->pgconn))) { + Dprintf("pq_get_result_async: got no result"); + /* the result is ready: it was the previously read one */ + rv = 0; + goto exit; + } + + status = PQresultStatus(res); + Dprintf("pq_get_result_async: got result %s", PQresStatus(status)); + + /* Store the result outside because we want to return the last non-null + * one and we may have to do it across poll calls. However if there is + * an error in the stream of results we want to handle the *first* + * error. So don't clobber it with the following ones. */ + if (conn->pgres && PQresultStatus(conn->pgres) == PGRES_FATAL_ERROR) { + Dprintf("previous pgres is error: discarding"); + PQclear(res); + } + else { + PQclear(conn->pgres); + conn->pgres = res; + } + + switch (status) { + case PGRES_COPY_OUT: + case PGRES_COPY_IN: + case PGRES_COPY_BOTH: + /* After entering copy mode, libpq will make a phony + * PGresult for us every time we query for it, so we need to + * break out of this endless loop. */ + rv = 0; + goto exit; + + default: + /* keep on reading to check if there are other results or + * we have finished. */ + continue; + } + } + +exit: + return rv; } /* pq_flush - flush output and return connection status @@ -940,10 +955,8 @@ _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_be } RAISES_NEG int -_pq_execute_async(cursorObject *curs, const char *query, int no_result, int no_begin) +_pq_execute_async(cursorObject *curs, const char *query, int no_result) { - PGresult *pgres = NULL; - char *error = NULL; int async_status = ASYNC_WRITE; int ret; @@ -952,15 +965,6 @@ _pq_execute_async(cursorObject *curs, const char *query, int no_result, int no_b Py_BEGIN_ALLOW_THREADS; pthread_mutex_lock(&(curs->conn->lock)); - /* TODO: is this needed here? */ - if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { - pthread_mutex_unlock(&(curs->conn->lock)); - Py_BLOCK_THREADS; - pq_complete_error(curs->conn, &pgres, &error); - return -1; - } - - Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn); Dprintf(" %-.200s", query); @@ -1028,7 +1032,7 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int if (!async) { return _pq_execute_sync(curs, query, no_result, no_begin); } else { - return _pq_execute_async(curs, query, no_result, no_begin); + return _pq_execute_async(curs, query, no_result); } } @@ -1047,6 +1051,7 @@ pq_send_query(connectionObject *conn, const char *query) Dprintf("pq_send_query: sending ASYNC query:"); Dprintf(" %-.200s", query); + CLEARPGRES(conn->pgres); if (0 == (rv = PQsendQuery(conn->pgconn, query))) { Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn)); } @@ -1054,45 +1059,6 @@ pq_send_query(connectionObject *conn, const char *query) return rv; } -/* Return the last result available on the connection. - * - * The function will block only if a command is active and the - * necessary response data has not yet been read by PQconsumeInput. - * - * The result should be disposed of using PQclear() - */ -PGresult * -pq_get_last_result(connectionObject *conn) -{ - PGresult *result = NULL, *res; - ExecStatusType status; - - /* Read until PQgetResult gives a NULL */ - while (NULL != (res = PQgetResult(conn->pgconn))) { - if (result) { - /* TODO too bad: we are discarding results from all the queries - * except the last. We could have populated `nextset()` with it - * but it would be an incompatible change (apps currently issue - * groups of queries expecting to receive the last result: they - * would start receiving the first instead). */ - PQclear(result); - } - result = res; - status = PQresultStatus(result); - Dprintf("pq_get_last_result: got result %s", PQresStatus(status)); - - /* After entering copy mode, libpq will make a phony - * PGresult for us every time we query for it, so we need to - * break out of this endless loop. */ - if (status == PGRES_COPY_BOTH - || status == PGRES_COPY_OUT - || status == PGRES_COPY_IN) { - break; - } - } - - return result; -} /* pq_fetch - fetch data after a query diff --git a/psycopg/pqpath.h b/psycopg/pqpath.h index 81120a00..0ad74d97 100644 --- a/psycopg/pqpath.h +++ b/psycopg/pqpath.h @@ -35,7 +35,6 @@ #define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0) /* exported functions */ -HIDDEN PGresult *pq_get_last_result(connectionObject *conn); RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs, int no_result); RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin); @@ -59,8 +58,7 @@ HIDDEN int pq_tpc_command_locked(connectionObject *conn, const char *cmd, const char *tid, PGresult **pgres, char **error, PyThreadState **tstate); -HIDDEN int pq_is_busy(connectionObject *conn); -HIDDEN int pq_is_busy_locked(connectionObject *conn); +RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn); HIDDEN void pq_clear_async(connectionObject *conn); RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg); diff --git a/psycopg/psycopgmodule.c b/psycopg/psycopgmodule.c index d4a3fde9..003904c8 100644 --- a/psycopg/psycopgmodule.c +++ b/psycopg/psycopgmodule.c @@ -303,7 +303,7 @@ adapters_init(PyObject *module) if (0 > microprotocols_init(module)) { goto exit; } - Dprintf("psycopgmodule: configuring adapters"); + Dprintf("psycopgmodule: initializing adapters"); if (0 > microprotocols_add(&PyFloat_Type, NULL, (PyObject*)&pfloatType)) { goto exit; @@ -935,7 +935,7 @@ datetime_init(void) { PyObject *dt = NULL; - Dprintf("psycopgmodule: datetime module"); + Dprintf("psycopgmodule: initializing datetime module"); /* import python builtin datetime module, if available */ if (!(dt = PyImport_ImportModule("datetime"))) { diff --git a/psycopg/replication_cursor_type.c b/psycopg/replication_cursor_type.c index 3f3481d3..f3e829a4 100644 --- a/psycopg/replication_cursor_type.c +++ b/psycopg/replication_cursor_type.c @@ -198,8 +198,6 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self, RAISES_NEG int psyco_repl_curs_datetime_init(void) { - Dprintf("psyco_repl_curs_datetime_init: datetime init"); - PyDateTime_IMPORT; if (!PyDateTimeAPI) { diff --git a/psycopg/replication_message_type.c b/psycopg/replication_message_type.c index d112c9da..248e9ec3 100644 --- a/psycopg/replication_message_type.c +++ b/psycopg/replication_message_type.c @@ -33,8 +33,6 @@ RAISES_NEG int psyco_replmsg_datetime_init(void) { - Dprintf("psyco_replmsg_datetime_init: datetime init"); - PyDateTime_IMPORT; if (!PyDateTimeAPI) { diff --git a/psycopg/typecast.c b/psycopg/typecast.c index f75e046a..9710663f 100644 --- a/psycopg/typecast.c +++ b/psycopg/typecast.c @@ -271,8 +271,6 @@ typecast_init(PyObject *module) /* insert the cast types into the 'types' dictionary and register them in the module dictionary */ for (i = 0; typecast_builtins[i].name != NULL; i++) { - Dprintf("typecast_init: initializing %s", typecast_builtins[i].name); - t = (typecastObject *)typecast_from_c(&(typecast_builtins[i]), dict); if (t == NULL) { goto exit; } if (typecast_add((PyObject *)t, NULL, 0) < 0) { goto exit; } @@ -295,7 +293,6 @@ typecast_init(PyObject *module) #ifdef HAVE_MXDATETIME if (0 == psyco_typecast_mxdatetime_init()) { for (i = 0; typecast_mxdatetime[i].name != NULL; i++) { - Dprintf("typecast_init: initializing %s", typecast_mxdatetime[i].name); t = (typecastObject *)typecast_from_c(&(typecast_mxdatetime[i]), dict); if (t == NULL) { goto exit; } PyDict_SetItem(dict, t->name, (PyObject *)t); @@ -307,7 +304,6 @@ typecast_init(PyObject *module) if (0 > psyco_typecast_datetime_init()) { goto exit; } for (i = 0; typecast_pydatetime[i].name != NULL; i++) { - Dprintf("typecast_init: initializing %s", typecast_pydatetime[i].name); t = (typecastObject *)typecast_from_c(&(typecast_pydatetime[i]), dict); if (t == NULL) { goto exit; } PyDict_SetItem(dict, t->name, (PyObject *)t); @@ -331,23 +327,15 @@ typecast_add(PyObject *obj, PyObject *dict, int binary) typecastObject *type = (typecastObject *)obj; - Dprintf("typecast_add: object at %p, values refcnt = " - FORMAT_CODE_PY_SSIZE_T, - obj, Py_REFCNT(type->values) - ); - if (dict == NULL) dict = (binary ? psyco_binary_types : psyco_types); len = PyTuple_Size(type->values); for (i = 0; i < len; i++) { val = PyTuple_GetItem(type->values, i); - Dprintf("typecast_add: adding val: %ld", PyInt_AsLong(val)); PyDict_SetItem(dict, val, obj); } - Dprintf("typecast_add: base caster: %p", type->bcast); - return 0; } @@ -531,9 +519,6 @@ typecast_new(PyObject *name, PyObject *values, PyObject *cast, PyObject *base) obj = PyObject_GC_New(typecastObject, &typecastType); if (obj == NULL) return NULL; - Dprintf("typecast_new: new type at = %p, refcnt = " FORMAT_CODE_PY_SSIZE_T, - obj, Py_REFCNT(obj)); - Py_INCREF(values); obj->values = values; @@ -560,8 +545,6 @@ typecast_new(PyObject *name, PyObject *values, PyObject *cast, PyObject *base) PyObject_GC_Track(obj); - Dprintf("typecast_new: typecast object created at %p", obj); - return (PyObject *)obj; } diff --git a/psycopg/typecast_datetime.c b/psycopg/typecast_datetime.c index 7e5789b2..a5b882e9 100644 --- a/psycopg/typecast_datetime.c +++ b/psycopg/typecast_datetime.c @@ -29,8 +29,6 @@ RAISES_NEG static int psyco_typecast_datetime_init(void) { - Dprintf("psyco_typecast_datetime_init: datetime init"); - PyDateTime_IMPORT; if (!PyDateTimeAPI) { diff --git a/psycopg/typecast_mxdatetime.c b/psycopg/typecast_mxdatetime.c index 46147cc3..1abeb40f 100644 --- a/psycopg/typecast_mxdatetime.c +++ b/psycopg/typecast_mxdatetime.c @@ -31,8 +31,6 @@ static int psyco_typecast_mxdatetime_init(void) { - Dprintf("psyco_typecast_mxdatetime_init: mx.DateTime init"); - if (mxDateTime_ImportModuleAndAPI()) { Dprintf("psyco_typecast_mxdatetime_init: mx.DateTime initialization failed"); PyErr_Clear(); diff --git a/tests/test_async.py b/tests/test_async.py index 057deac1..21ef7fe5 100755 --- a/tests/test_async.py +++ b/tests/test_async.py @@ -29,6 +29,7 @@ import unittest import warnings import psycopg2 +import psycopg2.errors from psycopg2 import extensions as ext from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow @@ -405,6 +406,15 @@ class AsyncTests(ConnectingTestCase): cur.execute("delete from table1") self.wait(cur) + def test_stop_on_first_error(self): + cur = self.conn.cursor() + cur.execute("select 1; select x; select 1/0; select 2") + self.assertRaises(psycopg2.errors.UndefinedColumn, self.wait, cur) + + cur.execute("select 1") + self.wait(cur) + self.assertEqual(cur.fetchone(), (1,)) + def test_error_two_cursors(self): cur = self.conn.cursor() cur2 = self.conn.cursor() @@ -454,6 +464,37 @@ class AsyncTests(ConnectingTestCase): cur.execute("copy (select 1) to stdout") self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn) + @slow + @skip_before_postgres(9, 0) + def test_non_block_after_notification(self): + from select import select + + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = 0 + while True: + state = self.conn.poll() + if state == psycopg2.extensions.POLL_OK: + break + elif state == psycopg2.extensions.POLL_READ: + select([self.conn], [], [], 0.1) + elif state == psycopg2.extensions.POLL_WRITE: + select([], [self.conn], [], 0.1) + else: + raise Exception("Unexpected result from poll: %r", state) + polls += 1 + + self.assert_(polls >= 8, polls) + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__) diff --git a/tests/test_green.py b/tests/test_green.py index 76ee1c83..46f18dcb 100755 --- a/tests/test_green.py +++ b/tests/test_green.py @@ -25,6 +25,7 @@ import select import unittest import warnings + import psycopg2 import psycopg2.extensions import psycopg2.extras @@ -58,10 +59,10 @@ class GreenTestCase(ConnectingTestCase): ConnectingTestCase.tearDown(self) psycopg2.extensions.set_wait_callback(self._cb) - def set_stub_wait_callback(self, conn): + def set_stub_wait_callback(self, conn, cb=None): stub = ConnectionStub(conn) psycopg2.extensions.set_wait_callback( - lambda conn: psycopg2.extras.wait_select(stub)) + lambda conn: (cb or psycopg2.extras.wait_select)(stub)) return stub @slow @@ -119,6 +120,36 @@ class GreenTestCase(ConnectingTestCase): self.assertRaises(psycopg2.ProgrammingError, cur.execute, "copy (select 1) to stdout") + @slow + @skip_before_postgres(9, 0) + def test_non_block_after_notification(self): + def wait(conn): + while 1: + state = conn.poll() + if state == POLL_OK: + break + elif state == POLL_READ: + select.select([conn.fileno()], [], [], 0.1) + elif state == POLL_WRITE: + select.select([], [conn.fileno()], [], 0.1) + else: + raise conn.OperationalError("bad state from poll: %s" % state) + + stub = self.set_stub_wait_callback(self.conn, wait) + cur = self.conn.cursor() + cur.execute(""" + select 1; + do $$ + begin + raise notice 'hello'; + end + $$ language plpgsql; + select pg_sleep(1); + """) + + polls = stub.polls.count(POLL_READ) + self.assert_(polls > 8, polls) + class CallbackErrorTestCase(ConnectingTestCase): def setUp(self): diff --git a/tests/testutils.py b/tests/testutils.py index 843bfdfb..405da18a 100644 --- a/tests/testutils.py +++ b/tests/testutils.py @@ -175,9 +175,9 @@ class ConnectingTestCase(unittest.TestCase): if state == psycopg2.extensions.POLL_OK: break elif state == psycopg2.extensions.POLL_READ: - select.select([pollable], [], [], 10) + select.select([pollable], [], [], 1) elif state == psycopg2.extensions.POLL_WRITE: - select.select([], [pollable], [], 10) + select.select([], [pollable], [], 1) else: raise Exception("Unexpected result from poll: %r", state)