mirror of
https://github.com/psycopg/psycopg2.git
synced 2024-11-22 17:06:33 +03:00
Merge branch 'fix-856'
This commit is contained in:
commit
917335eacb
2
NEWS
2
NEWS
|
@ -33,6 +33,8 @@ New features:
|
|||
- Added `~psycopg2.extensions.Diagnostics.severity_nonlocalized` attribute on
|
||||
the `~psycopg2.extensions.Diagnostics` object (:ticket:`#783`).
|
||||
- More efficient `~psycopg2.extras.NamedTupleCursor` (:ticket:`#838`).
|
||||
- Async communication improved to fix blocking on results returned at
|
||||
different times (:ticket:`#856`).
|
||||
|
||||
Other changes:
|
||||
|
||||
|
|
|
@ -38,8 +38,6 @@
|
|||
RAISES_NEG int
|
||||
psyco_adapter_datetime_init(void)
|
||||
{
|
||||
Dprintf("psyco_adapter_datetime_init: datetime init");
|
||||
|
||||
PyDateTime_IMPORT;
|
||||
|
||||
if (!PyDateTimeAPI) {
|
||||
|
|
|
@ -38,8 +38,6 @@
|
|||
int
|
||||
psyco_adapter_mxdatetime_init(void)
|
||||
{
|
||||
Dprintf("psyco_adapter_mxdatetime_init: mx.DateTime init");
|
||||
|
||||
if (mxDateTime_ImportModuleAndAPI()) {
|
||||
Dprintf("psyco_adapter_mxdatetime_init: mx.DateTime initialization failed");
|
||||
PyErr_Clear();
|
||||
|
|
|
@ -108,6 +108,7 @@ struct connectionObject {
|
|||
* for a green connection. If NULL, the connection is idle. */
|
||||
PyObject *async_cursor;
|
||||
int async_status; /* asynchronous execution status */
|
||||
PGresult *pgres; /* temporary result across async calls */
|
||||
|
||||
/* notice processing */
|
||||
PyObject *notice_list;
|
||||
|
|
|
@ -863,11 +863,16 @@ _conn_poll_connecting(connectionObject *self)
|
|||
/* Advance to the next state after an attempt of flushing output */
|
||||
|
||||
static int
|
||||
_conn_poll_advance_write(connectionObject *self, int flush)
|
||||
_conn_poll_advance_write(connectionObject *self)
|
||||
{
|
||||
int res;
|
||||
int flush;
|
||||
|
||||
Dprintf("conn_poll: poll writing");
|
||||
|
||||
flush = PQflush(self->pgconn);
|
||||
Dprintf("conn_poll: PQflush() = %i", flush);
|
||||
|
||||
switch (flush) {
|
||||
case 0: /* success */
|
||||
/* we've finished pushing the query to the server. Let's start
|
||||
|
@ -891,18 +896,24 @@ _conn_poll_advance_write(connectionObject *self, int flush)
|
|||
return res;
|
||||
}
|
||||
|
||||
/* Advance to the next state after a call to a pq_is_busy* function */
|
||||
|
||||
/* Advance to the next state after reading results */
|
||||
|
||||
static int
|
||||
_conn_poll_advance_read(connectionObject *self, int busy)
|
||||
_conn_poll_advance_read(connectionObject *self)
|
||||
{
|
||||
int res;
|
||||
int busy;
|
||||
|
||||
Dprintf("conn_poll: poll reading");
|
||||
|
||||
busy = pq_get_result_async(self);
|
||||
|
||||
switch (busy) {
|
||||
case 0: /* result is ready */
|
||||
res = PSYCO_POLL_OK;
|
||||
Dprintf("conn_poll: async_status -> ASYNC_DONE");
|
||||
self->async_status = ASYNC_DONE;
|
||||
res = PSYCO_POLL_OK;
|
||||
break;
|
||||
case 1: /* result not ready: fd would block */
|
||||
res = PSYCO_POLL_READ;
|
||||
|
@ -911,13 +922,15 @@ _conn_poll_advance_read(connectionObject *self, int busy)
|
|||
res = PSYCO_POLL_ERROR;
|
||||
break;
|
||||
default:
|
||||
Dprintf("conn_poll: unexpected result from pq_is_busy: %d", busy);
|
||||
Dprintf("conn_poll: unexpected result from pq_get_result_async: %d",
|
||||
busy);
|
||||
res = PSYCO_POLL_ERROR;
|
||||
break;
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
||||
/* Poll the connection for the send query/retrieve result phase
|
||||
|
||||
Advance the async_status (usually going WRITE -> READ -> DONE) but don't
|
||||
|
@ -931,27 +944,18 @@ _conn_poll_query(connectionObject *self)
|
|||
switch (self->async_status) {
|
||||
case ASYNC_WRITE:
|
||||
Dprintf("conn_poll: async_status = ASYNC_WRITE");
|
||||
res = _conn_poll_advance_write(self, PQflush(self->pgconn));
|
||||
res = _conn_poll_advance_write(self);
|
||||
break;
|
||||
|
||||
case ASYNC_READ:
|
||||
Dprintf("conn_poll: async_status = ASYNC_READ");
|
||||
if (self->async) {
|
||||
res = _conn_poll_advance_read(self, pq_is_busy(self));
|
||||
}
|
||||
else {
|
||||
/* we are a green connection being polled as result of a query.
|
||||
this means that our caller has the lock and we are being called
|
||||
from the callback. If we tried to acquire the lock now it would
|
||||
be a deadlock. */
|
||||
res = _conn_poll_advance_read(self, pq_is_busy_locked(self));
|
||||
}
|
||||
res = _conn_poll_advance_read(self);
|
||||
break;
|
||||
|
||||
case ASYNC_DONE:
|
||||
Dprintf("conn_poll: async_status = ASYNC_DONE");
|
||||
/* We haven't asked anything: just check for notifications. */
|
||||
res = _conn_poll_advance_read(self, pq_is_busy(self));
|
||||
res = _conn_poll_advance_read(self);
|
||||
break;
|
||||
|
||||
default:
|
||||
|
@ -974,7 +978,6 @@ static int
|
|||
_conn_poll_setup_async(connectionObject *self)
|
||||
{
|
||||
int res = PSYCO_POLL_ERROR;
|
||||
PGresult *pgres;
|
||||
|
||||
switch (self->status) {
|
||||
case CONN_STATUS_CONNECTING:
|
||||
|
@ -1025,12 +1028,12 @@ _conn_poll_setup_async(connectionObject *self)
|
|||
res = _conn_poll_query(self);
|
||||
if (res == PSYCO_POLL_OK) {
|
||||
res = PSYCO_POLL_ERROR;
|
||||
pgres = pq_get_last_result(self);
|
||||
if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
|
||||
if (self->pgres == NULL
|
||||
|| PQresultStatus(self->pgres) != PGRES_COMMAND_OK ) {
|
||||
PyErr_SetString(OperationalError, "can't set datestyle to ISO");
|
||||
break;
|
||||
}
|
||||
CLEARPGRES(pgres);
|
||||
CLEARPGRES(self->pgres);
|
||||
|
||||
Dprintf("conn_poll: status -> CONN_STATUS_READY");
|
||||
self->status = CONN_STATUS_READY;
|
||||
|
@ -1042,6 +1045,35 @@ _conn_poll_setup_async(connectionObject *self)
|
|||
}
|
||||
|
||||
|
||||
static cursorObject *
|
||||
_conn_get_async_cursor(connectionObject *self) {
|
||||
PyObject *py_curs;
|
||||
|
||||
if (!(self->async_cursor)) {
|
||||
PyErr_SetString(PyExc_SystemError,
|
||||
"unexpectedly, there's no async cursor here");
|
||||
goto error;
|
||||
}
|
||||
|
||||
if (!(py_curs = PyWeakref_GetObject(self->async_cursor))) {
|
||||
PyErr_SetString(PyExc_SystemError,
|
||||
"got null dereferencing cursor weakref");
|
||||
goto error;
|
||||
}
|
||||
if (Py_None == py_curs) {
|
||||
PyErr_SetString(InterfaceError,
|
||||
"the asynchronous cursor has disappeared");
|
||||
goto error;
|
||||
}
|
||||
|
||||
Py_INCREF(py_curs);
|
||||
return (cursorObject *)py_curs;
|
||||
|
||||
error:
|
||||
pq_clear_async(self);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
/* conn_poll - Main polling switch
|
||||
*
|
||||
* The function is called in all the states and connection types and invokes
|
||||
|
@ -1056,12 +1088,13 @@ conn_poll(connectionObject *self)
|
|||
|
||||
switch (self->status) {
|
||||
case CONN_STATUS_SETUP:
|
||||
Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING");
|
||||
Dprintf("conn_poll: status -> CONN_STATUS_SETUP");
|
||||
self->status = CONN_STATUS_CONNECTING;
|
||||
res = PSYCO_POLL_WRITE;
|
||||
break;
|
||||
|
||||
case CONN_STATUS_CONNECTING:
|
||||
Dprintf("conn_poll: status -> CONN_STATUS_CONNECTING");
|
||||
res = _conn_poll_connecting(self);
|
||||
if (res == PSYCO_POLL_OK && self->async) {
|
||||
res = _conn_poll_setup_async(self);
|
||||
|
@ -1069,39 +1102,29 @@ conn_poll(connectionObject *self)
|
|||
break;
|
||||
|
||||
case CONN_STATUS_DATESTYLE:
|
||||
Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
|
||||
res = _conn_poll_setup_async(self);
|
||||
break;
|
||||
|
||||
case CONN_STATUS_READY:
|
||||
case CONN_STATUS_BEGIN:
|
||||
case CONN_STATUS_PREPARED:
|
||||
Dprintf("conn_poll: status -> CONN_STATUS_*");
|
||||
res = _conn_poll_query(self);
|
||||
|
||||
if (res == PSYCO_POLL_OK && self->async && self->async_cursor) {
|
||||
if (res == PSYCO_POLL_OK && self->async) {
|
||||
cursorObject *curs;
|
||||
|
||||
/* An async query has just finished: parse the tuple in the
|
||||
* target cursor. */
|
||||
cursorObject *curs;
|
||||
PyObject *py_curs;
|
||||
if (!(py_curs = PyWeakref_GetObject(self->async_cursor))) {
|
||||
/* It shouldn't happen but consider it to avoid dereferencing
|
||||
* a null pointer below. */
|
||||
pq_clear_async(self);
|
||||
PyErr_SetString(PyExc_SystemError,
|
||||
"got null dereferencing cursor weakref");
|
||||
res = PSYCO_POLL_ERROR;
|
||||
break;
|
||||
}
|
||||
if (Py_None == py_curs) {
|
||||
pq_clear_async(self);
|
||||
PyErr_SetString(InterfaceError,
|
||||
"the asynchronous cursor has disappeared");
|
||||
if (!(curs = _conn_get_async_cursor(self))) {
|
||||
res = PSYCO_POLL_ERROR;
|
||||
break;
|
||||
}
|
||||
|
||||
curs = (cursorObject *)py_curs;
|
||||
CLEARPGRES(curs->pgres);
|
||||
curs->pgres = pq_get_last_result(self);
|
||||
PQclear(curs->pgres);
|
||||
curs->pgres = self->pgres;
|
||||
self->pgres = NULL;
|
||||
|
||||
/* fetch the tuples (if there are any) and build the result. We
|
||||
* don't care if pq_fetch return 0 or 1, but if there was an error,
|
||||
|
@ -1111,6 +1134,7 @@ conn_poll(connectionObject *self)
|
|||
}
|
||||
|
||||
/* We have finished with our async_cursor */
|
||||
Py_DECREF(curs);
|
||||
Py_CLEAR(self->async_cursor);
|
||||
}
|
||||
break;
|
||||
|
@ -1120,6 +1144,7 @@ conn_poll(connectionObject *self)
|
|||
res = PSYCO_POLL_ERROR;
|
||||
}
|
||||
|
||||
Dprintf("conn_poll: returning %d", res);
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -1432,6 +1432,7 @@ connection_dealloc(PyObject* obj)
|
|||
PyMem_Free(self->encoding);
|
||||
if (self->critical) free(self->critical);
|
||||
if (self->cancel) PQfreeCancel(self->cancel);
|
||||
PQclear(self->pgres);
|
||||
|
||||
connection_clear(self);
|
||||
|
||||
|
|
|
@ -52,8 +52,6 @@ psyco_curs_close(cursorObject *self, PyObject *dummy)
|
|||
PyObject *rv = NULL;
|
||||
char *lname = NULL;
|
||||
|
||||
EXC_IF_ASYNC_IN_PROGRESS(self, close);
|
||||
|
||||
if (self->closed) {
|
||||
rv = Py_None;
|
||||
Py_INCREF(rv);
|
||||
|
@ -64,6 +62,8 @@ psyco_curs_close(cursorObject *self, PyObject *dummy)
|
|||
char buffer[256];
|
||||
PGTransactionStatusType status;
|
||||
|
||||
EXC_IF_ASYNC_IN_PROGRESS(self, close_named);
|
||||
|
||||
if (self->conn) {
|
||||
status = PQtransactionStatus(self->conn->pgconn);
|
||||
}
|
||||
|
|
|
@ -177,10 +177,12 @@ psyco_exec_green(connectionObject *conn, const char *command)
|
|||
goto end;
|
||||
}
|
||||
|
||||
/* Now we can read the data without fear of blocking. */
|
||||
result = pq_get_last_result(conn);
|
||||
/* the result is now in the connection: take its ownership */
|
||||
result = conn->pgres;
|
||||
conn->pgres = NULL;
|
||||
|
||||
end:
|
||||
CLEARPGRES(conn->pgres);
|
||||
conn->async_status = ASYNC_DONE;
|
||||
Py_CLEAR(conn->async_cursor);
|
||||
return result;
|
||||
|
|
|
@ -68,8 +68,6 @@ microprotocols_add(PyTypeObject *type, PyObject *proto, PyObject *cast)
|
|||
|
||||
if (proto == NULL) proto = (PyObject*)&isqlquoteType;
|
||||
|
||||
Dprintf("microprotocols_add: cast %p for (%s, ?)", cast, type->tp_name);
|
||||
|
||||
if (!(key = PyTuple_Pack(2, (PyObject*)type, proto))) { goto exit; }
|
||||
if (0 != PyDict_SetItem(psyco_adapters, key, cast)) { goto exit; }
|
||||
|
||||
|
|
178
psycopg/pqpath.c
178
psycopg/pqpath.c
|
@ -764,63 +764,22 @@ exit:
|
|||
}
|
||||
|
||||
|
||||
/* pq_is_busy - consume input and return connection status
|
||||
|
||||
a status of 1 means that a call to pq_fetch will block, while a status of 0
|
||||
means that there is data available to be collected. -1 means an error, the
|
||||
exception will be set accordingly.
|
||||
|
||||
this function locks the connection object
|
||||
this function call Py_*_ALLOW_THREADS macros */
|
||||
|
||||
int
|
||||
pq_is_busy(connectionObject *conn)
|
||||
{
|
||||
int res;
|
||||
Dprintf("pq_is_busy: consuming input");
|
||||
|
||||
Py_BEGIN_ALLOW_THREADS;
|
||||
pthread_mutex_lock(&(conn->lock));
|
||||
|
||||
if (PQconsumeInput(conn->pgconn) == 0) {
|
||||
Dprintf("pq_is_busy: PQconsumeInput() failed");
|
||||
pthread_mutex_unlock(&(conn->lock));
|
||||
Py_BLOCK_THREADS;
|
||||
|
||||
/* if the libpq says pgconn is lost, close the py conn */
|
||||
if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
|
||||
conn->closed = 2;
|
||||
}
|
||||
|
||||
PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
|
||||
return -1;
|
||||
}
|
||||
|
||||
res = PQisBusy(conn->pgconn);
|
||||
|
||||
Py_BLOCK_THREADS;
|
||||
conn_notifies_process(conn);
|
||||
conn_notice_process(conn);
|
||||
Py_UNBLOCK_THREADS;
|
||||
|
||||
pthread_mutex_unlock(&(conn->lock));
|
||||
Py_END_ALLOW_THREADS;
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/* pq_is_busy_locked - equivalent to pq_is_busy but we already have the lock
|
||||
/* pq_get_result_async - read an available result without blocking.
|
||||
*
|
||||
* Return 0 if the result is ready, 1 if it will block, -1 on error.
|
||||
* The last result will be returned in pgres.
|
||||
*
|
||||
* The function should be called with the lock and holding the GIL.
|
||||
*/
|
||||
|
||||
int
|
||||
pq_is_busy_locked(connectionObject *conn)
|
||||
RAISES_NEG int
|
||||
pq_get_result_async(connectionObject *conn)
|
||||
{
|
||||
Dprintf("pq_is_busy_locked: consuming input");
|
||||
int rv = -1;
|
||||
|
||||
Dprintf("pq_get_result_async: calling PQconsumeInput()");
|
||||
if (PQconsumeInput(conn->pgconn) == 0) {
|
||||
Dprintf("pq_is_busy_locked: PQconsumeInput() failed");
|
||||
Dprintf("pq_get_result_async: PQconsumeInput() failed");
|
||||
|
||||
/* if the libpq says pgconn is lost, close the py conn */
|
||||
if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
|
||||
|
@ -828,13 +787,69 @@ pq_is_busy_locked(connectionObject *conn)
|
|||
}
|
||||
|
||||
PyErr_SetString(OperationalError, PQerrorMessage(conn->pgconn));
|
||||
return -1;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
/* notices and notifies will be processed at the end of the loop we are in
|
||||
* (async reading) by pq_fetch. */
|
||||
conn_notifies_process(conn);
|
||||
conn_notice_process(conn);
|
||||
|
||||
return PQisBusy(conn->pgconn);
|
||||
for (;;) {
|
||||
int busy;
|
||||
PGresult *res;
|
||||
ExecStatusType status;
|
||||
|
||||
Dprintf("pq_get_result_async: calling PQisBusy()");
|
||||
busy = PQisBusy(conn->pgconn);
|
||||
|
||||
if (busy) {
|
||||
/* try later */
|
||||
Dprintf("pq_get_result_async: PQisBusy() = 1");
|
||||
rv = 1;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
if (!(res = PQgetResult(conn->pgconn))) {
|
||||
Dprintf("pq_get_result_async: got no result");
|
||||
/* the result is ready: it was the previously read one */
|
||||
rv = 0;
|
||||
goto exit;
|
||||
}
|
||||
|
||||
status = PQresultStatus(res);
|
||||
Dprintf("pq_get_result_async: got result %s", PQresStatus(status));
|
||||
|
||||
/* Store the result outside because we want to return the last non-null
|
||||
* one and we may have to do it across poll calls. However if there is
|
||||
* an error in the stream of results we want to handle the *first*
|
||||
* error. So don't clobber it with the following ones. */
|
||||
if (conn->pgres && PQresultStatus(conn->pgres) == PGRES_FATAL_ERROR) {
|
||||
Dprintf("previous pgres is error: discarding");
|
||||
PQclear(res);
|
||||
}
|
||||
else {
|
||||
PQclear(conn->pgres);
|
||||
conn->pgres = res;
|
||||
}
|
||||
|
||||
switch (status) {
|
||||
case PGRES_COPY_OUT:
|
||||
case PGRES_COPY_IN:
|
||||
case PGRES_COPY_BOTH:
|
||||
/* After entering copy mode, libpq will make a phony
|
||||
* PGresult for us every time we query for it, so we need to
|
||||
* break out of this endless loop. */
|
||||
rv = 0;
|
||||
goto exit;
|
||||
|
||||
default:
|
||||
/* keep on reading to check if there are other results or
|
||||
* we have finished. */
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
exit:
|
||||
return rv;
|
||||
}
|
||||
|
||||
/* pq_flush - flush output and return connection status
|
||||
|
@ -940,10 +955,8 @@ _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_be
|
|||
}
|
||||
|
||||
RAISES_NEG int
|
||||
_pq_execute_async(cursorObject *curs, const char *query, int no_result, int no_begin)
|
||||
_pq_execute_async(cursorObject *curs, const char *query, int no_result)
|
||||
{
|
||||
PGresult *pgres = NULL;
|
||||
char *error = NULL;
|
||||
int async_status = ASYNC_WRITE;
|
||||
int ret;
|
||||
|
||||
|
@ -952,15 +965,6 @@ _pq_execute_async(cursorObject *curs, const char *query, int no_result, int no_b
|
|||
Py_BEGIN_ALLOW_THREADS;
|
||||
pthread_mutex_lock(&(curs->conn->lock));
|
||||
|
||||
/* TODO: is this needed here? */
|
||||
if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) {
|
||||
pthread_mutex_unlock(&(curs->conn->lock));
|
||||
Py_BLOCK_THREADS;
|
||||
pq_complete_error(curs->conn, &pgres, &error);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn);
|
||||
Dprintf(" %-.200s", query);
|
||||
|
||||
|
@ -1028,7 +1032,7 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int
|
|||
if (!async) {
|
||||
return _pq_execute_sync(curs, query, no_result, no_begin);
|
||||
} else {
|
||||
return _pq_execute_async(curs, query, no_result, no_begin);
|
||||
return _pq_execute_async(curs, query, no_result);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1047,6 +1051,7 @@ pq_send_query(connectionObject *conn, const char *query)
|
|||
Dprintf("pq_send_query: sending ASYNC query:");
|
||||
Dprintf(" %-.200s", query);
|
||||
|
||||
CLEARPGRES(conn->pgres);
|
||||
if (0 == (rv = PQsendQuery(conn->pgconn, query))) {
|
||||
Dprintf("pq_send_query: error: %s", PQerrorMessage(conn->pgconn));
|
||||
}
|
||||
|
@ -1054,45 +1059,6 @@ pq_send_query(connectionObject *conn, const char *query)
|
|||
return rv;
|
||||
}
|
||||
|
||||
/* Return the last result available on the connection.
|
||||
*
|
||||
* The function will block only if a command is active and the
|
||||
* necessary response data has not yet been read by PQconsumeInput.
|
||||
*
|
||||
* The result should be disposed of using PQclear()
|
||||
*/
|
||||
PGresult *
|
||||
pq_get_last_result(connectionObject *conn)
|
||||
{
|
||||
PGresult *result = NULL, *res;
|
||||
ExecStatusType status;
|
||||
|
||||
/* Read until PQgetResult gives a NULL */
|
||||
while (NULL != (res = PQgetResult(conn->pgconn))) {
|
||||
if (result) {
|
||||
/* TODO too bad: we are discarding results from all the queries
|
||||
* except the last. We could have populated `nextset()` with it
|
||||
* but it would be an incompatible change (apps currently issue
|
||||
* groups of queries expecting to receive the last result: they
|
||||
* would start receiving the first instead). */
|
||||
PQclear(result);
|
||||
}
|
||||
result = res;
|
||||
status = PQresultStatus(result);
|
||||
Dprintf("pq_get_last_result: got result %s", PQresStatus(status));
|
||||
|
||||
/* After entering copy mode, libpq will make a phony
|
||||
* PGresult for us every time we query for it, so we need to
|
||||
* break out of this endless loop. */
|
||||
if (status == PGRES_COPY_BOTH
|
||||
|| status == PGRES_COPY_OUT
|
||||
|| status == PGRES_COPY_IN) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/* pq_fetch - fetch data after a query
|
||||
|
||||
|
|
|
@ -35,7 +35,6 @@
|
|||
#define CLEARPGRES(pgres) do { PQclear(pgres); pgres = NULL; } while (0)
|
||||
|
||||
/* exported functions */
|
||||
HIDDEN PGresult *pq_get_last_result(connectionObject *conn);
|
||||
RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs, int no_result);
|
||||
RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query,
|
||||
int async, int no_result, int no_begin);
|
||||
|
@ -59,8 +58,7 @@ HIDDEN int pq_tpc_command_locked(connectionObject *conn,
|
|||
const char *cmd, const char *tid,
|
||||
PGresult **pgres, char **error,
|
||||
PyThreadState **tstate);
|
||||
HIDDEN int pq_is_busy(connectionObject *conn);
|
||||
HIDDEN int pq_is_busy_locked(connectionObject *conn);
|
||||
RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn);
|
||||
HIDDEN int pq_flush(connectionObject *conn);
|
||||
HIDDEN void pq_clear_async(connectionObject *conn);
|
||||
RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg);
|
||||
|
|
|
@ -303,7 +303,7 @@ adapters_init(PyObject *module)
|
|||
|
||||
if (0 > microprotocols_init(module)) { goto exit; }
|
||||
|
||||
Dprintf("psycopgmodule: configuring adapters");
|
||||
Dprintf("psycopgmodule: initializing adapters");
|
||||
|
||||
if (0 > microprotocols_add(&PyFloat_Type, NULL, (PyObject*)&pfloatType)) {
|
||||
goto exit;
|
||||
|
@ -935,7 +935,7 @@ datetime_init(void)
|
|||
{
|
||||
PyObject *dt = NULL;
|
||||
|
||||
Dprintf("psycopgmodule: datetime module");
|
||||
Dprintf("psycopgmodule: initializing datetime module");
|
||||
|
||||
/* import python builtin datetime module, if available */
|
||||
if (!(dt = PyImport_ImportModule("datetime"))) {
|
||||
|
|
|
@ -198,8 +198,6 @@ psyco_repl_curs_send_feedback(replicationCursorObject *self,
|
|||
RAISES_NEG int
|
||||
psyco_repl_curs_datetime_init(void)
|
||||
{
|
||||
Dprintf("psyco_repl_curs_datetime_init: datetime init");
|
||||
|
||||
PyDateTime_IMPORT;
|
||||
|
||||
if (!PyDateTimeAPI) {
|
||||
|
|
|
@ -33,8 +33,6 @@
|
|||
RAISES_NEG int
|
||||
psyco_replmsg_datetime_init(void)
|
||||
{
|
||||
Dprintf("psyco_replmsg_datetime_init: datetime init");
|
||||
|
||||
PyDateTime_IMPORT;
|
||||
|
||||
if (!PyDateTimeAPI) {
|
||||
|
|
|
@ -271,8 +271,6 @@ typecast_init(PyObject *module)
|
|||
/* insert the cast types into the 'types' dictionary and register them in
|
||||
the module dictionary */
|
||||
for (i = 0; typecast_builtins[i].name != NULL; i++) {
|
||||
Dprintf("typecast_init: initializing %s", typecast_builtins[i].name);
|
||||
|
||||
t = (typecastObject *)typecast_from_c(&(typecast_builtins[i]), dict);
|
||||
if (t == NULL) { goto exit; }
|
||||
if (typecast_add((PyObject *)t, NULL, 0) < 0) { goto exit; }
|
||||
|
@ -295,7 +293,6 @@ typecast_init(PyObject *module)
|
|||
#ifdef HAVE_MXDATETIME
|
||||
if (0 == psyco_typecast_mxdatetime_init()) {
|
||||
for (i = 0; typecast_mxdatetime[i].name != NULL; i++) {
|
||||
Dprintf("typecast_init: initializing %s", typecast_mxdatetime[i].name);
|
||||
t = (typecastObject *)typecast_from_c(&(typecast_mxdatetime[i]), dict);
|
||||
if (t == NULL) { goto exit; }
|
||||
PyDict_SetItem(dict, t->name, (PyObject *)t);
|
||||
|
@ -307,7 +304,6 @@ typecast_init(PyObject *module)
|
|||
|
||||
if (0 > psyco_typecast_datetime_init()) { goto exit; }
|
||||
for (i = 0; typecast_pydatetime[i].name != NULL; i++) {
|
||||
Dprintf("typecast_init: initializing %s", typecast_pydatetime[i].name);
|
||||
t = (typecastObject *)typecast_from_c(&(typecast_pydatetime[i]), dict);
|
||||
if (t == NULL) { goto exit; }
|
||||
PyDict_SetItem(dict, t->name, (PyObject *)t);
|
||||
|
@ -331,23 +327,15 @@ typecast_add(PyObject *obj, PyObject *dict, int binary)
|
|||
|
||||
typecastObject *type = (typecastObject *)obj;
|
||||
|
||||
Dprintf("typecast_add: object at %p, values refcnt = "
|
||||
FORMAT_CODE_PY_SSIZE_T,
|
||||
obj, Py_REFCNT(type->values)
|
||||
);
|
||||
|
||||
if (dict == NULL)
|
||||
dict = (binary ? psyco_binary_types : psyco_types);
|
||||
|
||||
len = PyTuple_Size(type->values);
|
||||
for (i = 0; i < len; i++) {
|
||||
val = PyTuple_GetItem(type->values, i);
|
||||
Dprintf("typecast_add: adding val: %ld", PyInt_AsLong(val));
|
||||
PyDict_SetItem(dict, val, obj);
|
||||
}
|
||||
|
||||
Dprintf("typecast_add: base caster: %p", type->bcast);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -531,9 +519,6 @@ typecast_new(PyObject *name, PyObject *values, PyObject *cast, PyObject *base)
|
|||
obj = PyObject_GC_New(typecastObject, &typecastType);
|
||||
if (obj == NULL) return NULL;
|
||||
|
||||
Dprintf("typecast_new: new type at = %p, refcnt = " FORMAT_CODE_PY_SSIZE_T,
|
||||
obj, Py_REFCNT(obj));
|
||||
|
||||
Py_INCREF(values);
|
||||
obj->values = values;
|
||||
|
||||
|
@ -560,8 +545,6 @@ typecast_new(PyObject *name, PyObject *values, PyObject *cast, PyObject *base)
|
|||
|
||||
PyObject_GC_Track(obj);
|
||||
|
||||
Dprintf("typecast_new: typecast object created at %p", obj);
|
||||
|
||||
return (PyObject *)obj;
|
||||
}
|
||||
|
||||
|
|
|
@ -29,8 +29,6 @@
|
|||
RAISES_NEG static int
|
||||
psyco_typecast_datetime_init(void)
|
||||
{
|
||||
Dprintf("psyco_typecast_datetime_init: datetime init");
|
||||
|
||||
PyDateTime_IMPORT;
|
||||
|
||||
if (!PyDateTimeAPI) {
|
||||
|
|
|
@ -31,8 +31,6 @@
|
|||
static int
|
||||
psyco_typecast_mxdatetime_init(void)
|
||||
{
|
||||
Dprintf("psyco_typecast_mxdatetime_init: mx.DateTime init");
|
||||
|
||||
if (mxDateTime_ImportModuleAndAPI()) {
|
||||
Dprintf("psyco_typecast_mxdatetime_init: mx.DateTime initialization failed");
|
||||
PyErr_Clear();
|
||||
|
|
|
@ -29,6 +29,7 @@ import unittest
|
|||
import warnings
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.errors
|
||||
from psycopg2 import extensions as ext
|
||||
|
||||
from .testutils import ConnectingTestCase, StringIO, skip_before_postgres, slow
|
||||
|
@ -405,6 +406,15 @@ class AsyncTests(ConnectingTestCase):
|
|||
cur.execute("delete from table1")
|
||||
self.wait(cur)
|
||||
|
||||
def test_stop_on_first_error(self):
|
||||
cur = self.conn.cursor()
|
||||
cur.execute("select 1; select x; select 1/0; select 2")
|
||||
self.assertRaises(psycopg2.errors.UndefinedColumn, self.wait, cur)
|
||||
|
||||
cur.execute("select 1")
|
||||
self.wait(cur)
|
||||
self.assertEqual(cur.fetchone(), (1,))
|
||||
|
||||
def test_error_two_cursors(self):
|
||||
cur = self.conn.cursor()
|
||||
cur2 = self.conn.cursor()
|
||||
|
@ -454,6 +464,37 @@ class AsyncTests(ConnectingTestCase):
|
|||
cur.execute("copy (select 1) to stdout")
|
||||
self.assertRaises(psycopg2.ProgrammingError, self.wait, self.conn)
|
||||
|
||||
@slow
|
||||
@skip_before_postgres(9, 0)
|
||||
def test_non_block_after_notification(self):
|
||||
from select import select
|
||||
|
||||
cur = self.conn.cursor()
|
||||
cur.execute("""
|
||||
select 1;
|
||||
do $$
|
||||
begin
|
||||
raise notice 'hello';
|
||||
end
|
||||
$$ language plpgsql;
|
||||
select pg_sleep(1);
|
||||
""")
|
||||
|
||||
polls = 0
|
||||
while True:
|
||||
state = self.conn.poll()
|
||||
if state == psycopg2.extensions.POLL_OK:
|
||||
break
|
||||
elif state == psycopg2.extensions.POLL_READ:
|
||||
select([self.conn], [], [], 0.1)
|
||||
elif state == psycopg2.extensions.POLL_WRITE:
|
||||
select([], [self.conn], [], 0.1)
|
||||
else:
|
||||
raise Exception("Unexpected result from poll: %r", state)
|
||||
polls += 1
|
||||
|
||||
self.assert_(polls >= 8, polls)
|
||||
|
||||
|
||||
def test_suite():
|
||||
return unittest.TestLoader().loadTestsFromName(__name__)
|
||||
|
|
|
@ -25,6 +25,7 @@
|
|||
import select
|
||||
import unittest
|
||||
import warnings
|
||||
|
||||
import psycopg2
|
||||
import psycopg2.extensions
|
||||
import psycopg2.extras
|
||||
|
@ -58,10 +59,10 @@ class GreenTestCase(ConnectingTestCase):
|
|||
ConnectingTestCase.tearDown(self)
|
||||
psycopg2.extensions.set_wait_callback(self._cb)
|
||||
|
||||
def set_stub_wait_callback(self, conn):
|
||||
def set_stub_wait_callback(self, conn, cb=None):
|
||||
stub = ConnectionStub(conn)
|
||||
psycopg2.extensions.set_wait_callback(
|
||||
lambda conn: psycopg2.extras.wait_select(stub))
|
||||
lambda conn: (cb or psycopg2.extras.wait_select)(stub))
|
||||
return stub
|
||||
|
||||
@slow
|
||||
|
@ -119,6 +120,36 @@ class GreenTestCase(ConnectingTestCase):
|
|||
self.assertRaises(psycopg2.ProgrammingError,
|
||||
cur.execute, "copy (select 1) to stdout")
|
||||
|
||||
@slow
|
||||
@skip_before_postgres(9, 0)
|
||||
def test_non_block_after_notification(self):
|
||||
def wait(conn):
|
||||
while 1:
|
||||
state = conn.poll()
|
||||
if state == POLL_OK:
|
||||
break
|
||||
elif state == POLL_READ:
|
||||
select.select([conn.fileno()], [], [], 0.1)
|
||||
elif state == POLL_WRITE:
|
||||
select.select([], [conn.fileno()], [], 0.1)
|
||||
else:
|
||||
raise conn.OperationalError("bad state from poll: %s" % state)
|
||||
|
||||
stub = self.set_stub_wait_callback(self.conn, wait)
|
||||
cur = self.conn.cursor()
|
||||
cur.execute("""
|
||||
select 1;
|
||||
do $$
|
||||
begin
|
||||
raise notice 'hello';
|
||||
end
|
||||
$$ language plpgsql;
|
||||
select pg_sleep(1);
|
||||
""")
|
||||
|
||||
polls = stub.polls.count(POLL_READ)
|
||||
self.assert_(polls > 8, polls)
|
||||
|
||||
|
||||
class CallbackErrorTestCase(ConnectingTestCase):
|
||||
def setUp(self):
|
||||
|
|
|
@ -175,9 +175,9 @@ class ConnectingTestCase(unittest.TestCase):
|
|||
if state == psycopg2.extensions.POLL_OK:
|
||||
break
|
||||
elif state == psycopg2.extensions.POLL_READ:
|
||||
select.select([pollable], [], [], 10)
|
||||
select.select([pollable], [], [], 1)
|
||||
elif state == psycopg2.extensions.POLL_WRITE:
|
||||
select.select([], [pollable], [], 10)
|
||||
select.select([], [pollable], [], 1)
|
||||
else:
|
||||
raise Exception("Unexpected result from poll: %r", state)
|
||||
|
||||
|
|
Loading…
Reference in New Issue
Block a user