Split pq_execute into sync/async parts

This commit is contained in:
Daniele Varrazzo 2019-01-22 10:51:27 +00:00
parent 4246fdf809
commit 37891500d9

View File

@ -996,12 +996,140 @@ pq_flush(connectionObject *conn)
*/ */
RAISES_NEG int RAISES_NEG int
pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin) _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_begin)
{
PGresult *pgres = NULL;
char *error = NULL;
CLEARPGRES(curs->pgres);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock));
if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) {
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
pq_complete_error(curs->conn, &pgres, &error);
return -1;
}
Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn);
Dprintf(" %-.200s", query);
if (!psyco_green()) {
curs->pgres = PQexec(curs->conn->pgconn, query);
}
else {
Py_BLOCK_THREADS;
curs->pgres = psyco_exec_green(curs->conn, query);
Py_UNBLOCK_THREADS;
}
/* don't let pgres = NULL go to pq_fetch() */
if (curs->pgres == NULL) {
if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) {
curs->conn->closed = 2;
}
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
if (!PyErr_Occurred()) {
PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn));
}
return -1;
}
/* Process notifies here instead of when fetching the tuple as we are
* into the same critical section that received the data. Without this
* care, reading notifies may disrupt other thread communications.
* (as in ticket #55). */
Py_BLOCK_THREADS;
conn_notifies_process(curs->conn);
conn_notice_process(curs->conn);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&(curs->conn->lock));
Py_END_ALLOW_THREADS;
/* if the execute was sync, we call pq_fetch() immediately,
to respect the old DBAPI-2.0 compatible behaviour */
Dprintf("pq_execute: entering synchronous DBAPI compatibility mode");
if (pq_fetch(curs, no_result) < 0) return -1;
return 1;
}
RAISES_NEG int
_pq_execute_async(cursorObject *curs, const char *query, int no_result, int no_begin)
{ {
PGresult *pgres = NULL; PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int async_status = ASYNC_WRITE; int async_status = ASYNC_WRITE;
int ret;
CLEARPGRES(curs->pgres);
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock));
/* TODO: is this needed here? */
if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) {
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
pq_complete_error(curs->conn, &pgres, &error);
return -1;
}
Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn);
Dprintf(" %-.200s", query);
if (PQsendQuery(curs->conn->pgconn, query) == 0) {
if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) {
curs->conn->closed = 2;
}
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn));
return -1;
}
Dprintf("pq_execute: async query sent to backend");
ret = PQflush(curs->conn->pgconn);
if (ret == 0) {
/* the query got fully sent to the server */
Dprintf("pq_execute: query got flushed immediately");
/* the async status will be ASYNC_READ */
async_status = ASYNC_READ;
}
else if (ret == 1) {
/* not all of the query got sent to the server */
async_status = ASYNC_WRITE;
}
else {
/* there was an error */
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn));
return -1;
}
pthread_mutex_unlock(&(curs->conn->lock));
Py_END_ALLOW_THREADS;
curs->conn->async_status = async_status;
if (!(curs->conn->async_cursor
= PyWeakref_NewRef((PyObject *)curs, NULL))) {
return -1;
}
return 0;
}
RAISES_NEG int
pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin)
{
/* if the status of the connection is critical raise an exception and /* if the status of the connection is critical raise an exception and
definitely close the connection */ definitely close the connection */
if (curs->conn->critical) { if (curs->conn->critical) {
@ -1016,115 +1144,14 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int
} }
Dprintf("pq_execute: pg connection at %p OK", curs->conn->pgconn); Dprintf("pq_execute: pg connection at %p OK", curs->conn->pgconn);
CLEARPGRES(curs->pgres); if (!async) {
return _pq_execute_sync(curs, query, no_result, no_begin);
Py_BEGIN_ALLOW_THREADS; } else {
pthread_mutex_lock(&(curs->conn->lock)); return _pq_execute_async(curs, query, no_result, no_begin);
if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) {
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
pq_complete_error(curs->conn, &pgres, &error);
return -1;
} }
if (async == 0) {
Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn);
Dprintf(" %-.200s", query);
if (!psyco_green()) {
curs->pgres = PQexec(curs->conn->pgconn, query);
}
else {
Py_BLOCK_THREADS;
curs->pgres = psyco_exec_green(curs->conn, query);
Py_UNBLOCK_THREADS;
}
/* don't let pgres = NULL go to pq_fetch() */
if (curs->pgres == NULL) {
if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) {
curs->conn->closed = 2;
}
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
if (!PyErr_Occurred()) {
PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn));
}
return -1;
}
/* Process notifies here instead of when fetching the tuple as we are
* into the same critical section that received the data. Without this
* care, reading notifies may disrupt other thread communications.
* (as in ticket #55). */
Py_BLOCK_THREADS;
conn_notifies_process(curs->conn);
conn_notice_process(curs->conn);
Py_UNBLOCK_THREADS;
}
else if (async == 1) {
int ret;
Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn);
Dprintf(" %-.200s", query);
if (PQsendQuery(curs->conn->pgconn, query) == 0) {
if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) {
curs->conn->closed = 2;
}
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn));
return -1;
}
Dprintf("pq_execute: async query sent to backend");
ret = PQflush(curs->conn->pgconn);
if (ret == 0) {
/* the query got fully sent to the server */
Dprintf("pq_execute: query got flushed immediately");
/* the async status will be ASYNC_READ */
async_status = ASYNC_READ;
}
else if (ret == 1) {
/* not all of the query got sent to the server */
async_status = ASYNC_WRITE;
}
else {
/* there was an error */
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn));
return -1;
}
}
pthread_mutex_unlock(&(curs->conn->lock));
Py_END_ALLOW_THREADS;
/* if the execute was sync, we call pq_fetch() immediately,
to respect the old DBAPI-2.0 compatible behaviour */
if (async == 0) {
Dprintf("pq_execute: entering synchronous DBAPI compatibility mode");
if (pq_fetch(curs, no_result) < 0) return -1;
}
else {
PyObject *tmp;
curs->conn->async_status = async_status;
curs->conn->async_cursor = tmp = PyWeakref_NewRef((PyObject *)curs, NULL);
if (!tmp) {
/* weakref creation failed */
return -1;
}
}
return 1-async;
} }
/* send an async query to the backend. /* send an async query to the backend.
* *
* Return 1 if command succeeded, else 0. * Return 1 if command succeeded, else 0.