From 37891500d9220e73593d0d5e9db3130e4bc025a6 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 22 Jan 2019 10:51:27 +0000 Subject: [PATCH 1/3] Split pq_execute into sync/async parts --- psycopg/pqpath.c | 241 ++++++++++++++++++++++++++--------------------- 1 file changed, 134 insertions(+), 107 deletions(-) diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 2d4bac87..80c6fedb 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -996,12 +996,140 @@ pq_flush(connectionObject *conn) */ RAISES_NEG int -pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin) +_pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_begin) +{ + PGresult *pgres = NULL; + char *error = NULL; + + CLEARPGRES(curs->pgres); + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&(curs->conn->lock)); + + if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { + pthread_mutex_unlock(&(curs->conn->lock)); + Py_BLOCK_THREADS; + pq_complete_error(curs->conn, &pgres, &error); + return -1; + } + + Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn); + Dprintf(" %-.200s", query); + if (!psyco_green()) { + curs->pgres = PQexec(curs->conn->pgconn, query); + } + else { + Py_BLOCK_THREADS; + curs->pgres = psyco_exec_green(curs->conn, query); + Py_UNBLOCK_THREADS; + } + + /* don't let pgres = NULL go to pq_fetch() */ + if (curs->pgres == NULL) { + if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { + curs->conn->closed = 2; + } + pthread_mutex_unlock(&(curs->conn->lock)); + Py_BLOCK_THREADS; + if (!PyErr_Occurred()) { + PyErr_SetString(OperationalError, + PQerrorMessage(curs->conn->pgconn)); + } + return -1; + } + + /* Process notifies here instead of when fetching the tuple as we are + * into the same critical section that received the data. Without this + * care, reading notifies may disrupt other thread communications. + * (as in ticket #55). */ + Py_BLOCK_THREADS; + conn_notifies_process(curs->conn); + conn_notice_process(curs->conn); + Py_UNBLOCK_THREADS; + + pthread_mutex_unlock(&(curs->conn->lock)); + Py_END_ALLOW_THREADS; + + /* if the execute was sync, we call pq_fetch() immediately, + to respect the old DBAPI-2.0 compatible behaviour */ + Dprintf("pq_execute: entering synchronous DBAPI compatibility mode"); + if (pq_fetch(curs, no_result) < 0) return -1; + + return 1; +} + +RAISES_NEG int +_pq_execute_async(cursorObject *curs, const char *query, int no_result, int no_begin) { PGresult *pgres = NULL; char *error = NULL; int async_status = ASYNC_WRITE; + int ret; + CLEARPGRES(curs->pgres); + + Py_BEGIN_ALLOW_THREADS; + pthread_mutex_lock(&(curs->conn->lock)); + + /* TODO: is this needed here? */ + if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { + pthread_mutex_unlock(&(curs->conn->lock)); + Py_BLOCK_THREADS; + pq_complete_error(curs->conn, &pgres, &error); + return -1; + } + + + Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn); + Dprintf(" %-.200s", query); + + if (PQsendQuery(curs->conn->pgconn, query) == 0) { + if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { + curs->conn->closed = 2; + } + pthread_mutex_unlock(&(curs->conn->lock)); + Py_BLOCK_THREADS; + PyErr_SetString(OperationalError, + PQerrorMessage(curs->conn->pgconn)); + return -1; + } + Dprintf("pq_execute: async query sent to backend"); + + ret = PQflush(curs->conn->pgconn); + if (ret == 0) { + /* the query got fully sent to the server */ + Dprintf("pq_execute: query got flushed immediately"); + /* the async status will be ASYNC_READ */ + async_status = ASYNC_READ; + } + else if (ret == 1) { + /* not all of the query got sent to the server */ + async_status = ASYNC_WRITE; + } + else { + /* there was an error */ + pthread_mutex_unlock(&(curs->conn->lock)); + Py_BLOCK_THREADS; + PyErr_SetString(OperationalError, + PQerrorMessage(curs->conn->pgconn)); + return -1; + } + + pthread_mutex_unlock(&(curs->conn->lock)); + Py_END_ALLOW_THREADS; + + curs->conn->async_status = async_status; + if (!(curs->conn->async_cursor + = PyWeakref_NewRef((PyObject *)curs, NULL))) { + return -1; + } + + return 0; +} + +RAISES_NEG int +pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin) +{ /* if the status of the connection is critical raise an exception and definitely close the connection */ if (curs->conn->critical) { @@ -1016,115 +1144,14 @@ pq_execute(cursorObject *curs, const char *query, int async, int no_result, int } Dprintf("pq_execute: pg connection at %p OK", curs->conn->pgconn); - CLEARPGRES(curs->pgres); - - Py_BEGIN_ALLOW_THREADS; - pthread_mutex_lock(&(curs->conn->lock)); - - if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { - pthread_mutex_unlock(&(curs->conn->lock)); - Py_BLOCK_THREADS; - pq_complete_error(curs->conn, &pgres, &error); - return -1; + if (!async) { + return _pq_execute_sync(curs, query, no_result, no_begin); + } else { + return _pq_execute_async(curs, query, no_result, no_begin); } - - if (async == 0) { - Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn); - Dprintf(" %-.200s", query); - if (!psyco_green()) { - curs->pgres = PQexec(curs->conn->pgconn, query); - } - else { - Py_BLOCK_THREADS; - curs->pgres = psyco_exec_green(curs->conn, query); - Py_UNBLOCK_THREADS; - } - - /* don't let pgres = NULL go to pq_fetch() */ - if (curs->pgres == NULL) { - if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { - curs->conn->closed = 2; - } - pthread_mutex_unlock(&(curs->conn->lock)); - Py_BLOCK_THREADS; - if (!PyErr_Occurred()) { - PyErr_SetString(OperationalError, - PQerrorMessage(curs->conn->pgconn)); - } - return -1; - } - - /* Process notifies here instead of when fetching the tuple as we are - * into the same critical section that received the data. Without this - * care, reading notifies may disrupt other thread communications. - * (as in ticket #55). */ - Py_BLOCK_THREADS; - conn_notifies_process(curs->conn); - conn_notice_process(curs->conn); - Py_UNBLOCK_THREADS; - } - - else if (async == 1) { - int ret; - - Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn); - Dprintf(" %-.200s", query); - - if (PQsendQuery(curs->conn->pgconn, query) == 0) { - if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { - curs->conn->closed = 2; - } - pthread_mutex_unlock(&(curs->conn->lock)); - Py_BLOCK_THREADS; - PyErr_SetString(OperationalError, - PQerrorMessage(curs->conn->pgconn)); - return -1; - } - Dprintf("pq_execute: async query sent to backend"); - - ret = PQflush(curs->conn->pgconn); - if (ret == 0) { - /* the query got fully sent to the server */ - Dprintf("pq_execute: query got flushed immediately"); - /* the async status will be ASYNC_READ */ - async_status = ASYNC_READ; - } - else if (ret == 1) { - /* not all of the query got sent to the server */ - async_status = ASYNC_WRITE; - } - else { - /* there was an error */ - pthread_mutex_unlock(&(curs->conn->lock)); - Py_BLOCK_THREADS; - PyErr_SetString(OperationalError, - PQerrorMessage(curs->conn->pgconn)); - return -1; - } - } - - pthread_mutex_unlock(&(curs->conn->lock)); - Py_END_ALLOW_THREADS; - - /* if the execute was sync, we call pq_fetch() immediately, - to respect the old DBAPI-2.0 compatible behaviour */ - if (async == 0) { - Dprintf("pq_execute: entering synchronous DBAPI compatibility mode"); - if (pq_fetch(curs, no_result) < 0) return -1; - } - else { - PyObject *tmp; - curs->conn->async_status = async_status; - curs->conn->async_cursor = tmp = PyWeakref_NewRef((PyObject *)curs, NULL); - if (!tmp) { - /* weakref creation failed */ - return -1; - } - } - - return 1-async; } + /* send an async query to the backend. * * Return 1 if command succeeded, else 0. From 92e615a1a47a49e5da5bfe69d7c3689cce0ccf90 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 22 Jan 2019 11:02:09 +0000 Subject: [PATCH 2/3] Assign the PGresult to the cursor in the execute critical section Possible cause of the issue reported in #346 (in concurrent environments). --- psycopg/pqpath.c | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index 80c6fedb..7d085a22 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -1016,16 +1016,16 @@ _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_be Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn); Dprintf(" %-.200s", query); if (!psyco_green()) { - curs->pgres = PQexec(curs->conn->pgconn, query); + pgres = PQexec(curs->conn->pgconn, query); } else { Py_BLOCK_THREADS; - curs->pgres = psyco_exec_green(curs->conn, query); + pgres = psyco_exec_green(curs->conn, query); Py_UNBLOCK_THREADS; } /* don't let pgres = NULL go to pq_fetch() */ - if (curs->pgres == NULL) { + if (pgres == NULL) { if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { curs->conn->closed = 2; } @@ -1038,11 +1038,16 @@ _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_be return -1; } + Py_BLOCK_THREADS; + + /* assign the result back to the cursor now that we have the GIL */ + curs->pgres = pgres; + pgres = NULL; + /* Process notifies here instead of when fetching the tuple as we are * into the same critical section that received the data. Without this * care, reading notifies may disrupt other thread communications. * (as in ticket #55). */ - Py_BLOCK_THREADS; conn_notifies_process(curs->conn); conn_notice_process(curs->conn); Py_UNBLOCK_THREADS; From c34c99aa7f820ef90be0a9f917e85c51d7838485 Mon Sep 17 00:00:00 2001 From: Daniele Varrazzo Date: Tue, 22 Jan 2019 11:16:52 +0000 Subject: [PATCH 3/3] Mention cursor locks cleanup in news file --- NEWS | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/NEWS b/NEWS index 76b6b80f..6995f81c 100644 --- a/NEWS +++ b/NEWS @@ -44,6 +44,13 @@ Other changes: install``. +What's new in psycopg 2.7.7 +^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +- Cleanup of the cursor results assignment code, which might have solved + double free and inconsistencies in concurrent usage (:tickets:`#346, #384`). + + What's new in psycopg 2.7.6.1 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^