mirror of
https://github.com/psycopg/psycopg2.git
synced 2025-02-28 14:30:32 +03:00
'pq_execute_command_locked()' calls the wait callback if set.
The function is called without holding the GIL. Because it is necessary to execute the Python callback if set, we need to re-acquire the GIL and tnen release it again. In order to correctly bookkeep the thread state, the pointer of the _save variable is passed to the function.
This commit is contained in:
parent
7a06c0455b
commit
0ec73a18b4
|
@ -760,21 +760,22 @@ conn_close(connectionObject *self)
|
||||||
Py_BEGIN_ALLOW_THREADS;
|
Py_BEGIN_ALLOW_THREADS;
|
||||||
pthread_mutex_lock(&self->lock);
|
pthread_mutex_lock(&self->lock);
|
||||||
|
|
||||||
if (self->closed == 0)
|
|
||||||
self->closed = 1;
|
|
||||||
|
|
||||||
/* execute a forced rollback on the connection (but don't check the
|
/* execute a forced rollback on the connection (but don't check the
|
||||||
result, we're going to close the pq connection anyway */
|
result, we're going to close the pq connection anyway */
|
||||||
if (self->pgconn && self->closed == 1) {
|
if (self->pgconn && self->closed == 1) {
|
||||||
PGresult *pgres = NULL;
|
PGresult *pgres = NULL;
|
||||||
char *error = NULL;
|
char *error = NULL;
|
||||||
|
|
||||||
if (pq_abort_locked(self, &pgres, &error) < 0) {
|
if (pq_abort_locked(self, &pgres, &error, &_save) < 0) {
|
||||||
IFCLEARPGRES(pgres);
|
IFCLEARPGRES(pgres);
|
||||||
if (error)
|
if (error)
|
||||||
free (error);
|
free (error);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (self->closed == 0)
|
||||||
|
self->closed = 1;
|
||||||
|
|
||||||
if (self->pgconn) {
|
if (self->pgconn) {
|
||||||
PQfinish(self->pgconn);
|
PQfinish(self->pgconn);
|
||||||
Dprintf("conn_close: PQfinish called");
|
Dprintf("conn_close: PQfinish called");
|
||||||
|
@ -825,7 +826,7 @@ conn_switch_isolation_level(connectionObject *self, int level)
|
||||||
/* if the current isolation level is > 0 we need to abort the current
|
/* if the current isolation level is > 0 we need to abort the current
|
||||||
transaction before changing; that all folks! */
|
transaction before changing; that all folks! */
|
||||||
if (self->isolation_level != level && self->isolation_level > 0) {
|
if (self->isolation_level != level && self->isolation_level > 0) {
|
||||||
res = pq_abort_locked(self, &pgres, &error);
|
res = pq_abort_locked(self, &pgres, &error, &_save);
|
||||||
}
|
}
|
||||||
self->isolation_level = level;
|
self->isolation_level = level;
|
||||||
|
|
||||||
|
@ -862,10 +863,10 @@ conn_set_client_encoding(connectionObject *self, const char *enc)
|
||||||
|
|
||||||
/* abort the current transaction, to set the encoding ouside of
|
/* abort the current transaction, to set the encoding ouside of
|
||||||
transactions */
|
transactions */
|
||||||
res = pq_abort_locked(self, &pgres, &error);
|
res = pq_abort_locked(self, &pgres, &error, &_save);
|
||||||
|
|
||||||
if (res == 0) {
|
if (res == 0) {
|
||||||
res = pq_execute_command_locked(self, query, &pgres, &error);
|
res = pq_execute_command_locked(self, query, &pgres, &error, &_save);
|
||||||
if (res == 0) {
|
if (res == 0) {
|
||||||
/* no error, we can proceeed and store the new encoding */
|
/* no error, we can proceeed and store the new encoding */
|
||||||
if (self->encoding) free(self->encoding);
|
if (self->encoding) free(self->encoding);
|
||||||
|
|
|
@ -58,7 +58,7 @@ lobject_open(lobjectObject *self, connectionObject *conn,
|
||||||
Py_BEGIN_ALLOW_THREADS;
|
Py_BEGIN_ALLOW_THREADS;
|
||||||
pthread_mutex_lock(&(self->conn->lock));
|
pthread_mutex_lock(&(self->conn->lock));
|
||||||
|
|
||||||
retvalue = pq_begin_locked(self->conn, &pgres, &error);
|
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save);
|
||||||
if (retvalue < 0)
|
if (retvalue < 0)
|
||||||
goto end;
|
goto end;
|
||||||
|
|
||||||
|
@ -174,7 +174,7 @@ lobject_unlink(lobjectObject *self)
|
||||||
Py_BEGIN_ALLOW_THREADS;
|
Py_BEGIN_ALLOW_THREADS;
|
||||||
pthread_mutex_lock(&(self->conn->lock));
|
pthread_mutex_lock(&(self->conn->lock));
|
||||||
|
|
||||||
retvalue = pq_begin_locked(self->conn, &pgres, &error);
|
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save);
|
||||||
if (retvalue < 0)
|
if (retvalue < 0)
|
||||||
goto end;
|
goto end;
|
||||||
|
|
||||||
|
@ -316,7 +316,7 @@ lobject_export(lobjectObject *self, const char *filename)
|
||||||
Py_BEGIN_ALLOW_THREADS;
|
Py_BEGIN_ALLOW_THREADS;
|
||||||
pthread_mutex_lock(&(self->conn->lock));
|
pthread_mutex_lock(&(self->conn->lock));
|
||||||
|
|
||||||
retvalue = pq_begin_locked(self->conn, &pgres, &error);
|
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save);
|
||||||
if (retvalue < 0)
|
if (retvalue < 0)
|
||||||
goto end;
|
goto end;
|
||||||
|
|
||||||
|
|
|
@ -334,17 +334,29 @@ pq_set_non_blocking(connectionObject *conn, int arg, int pyerr)
|
||||||
|
|
||||||
On error, -1 is returned, and the pgres argument will hold the
|
On error, -1 is returned, and the pgres argument will hold the
|
||||||
relevant result structure.
|
relevant result structure.
|
||||||
|
|
||||||
|
The tstate parameter should be the pointer of the _save variable created by
|
||||||
|
Py_BEGIN_ALLOW_THREADS: this enables the function to acquire and release
|
||||||
|
again the GIL if needed, i.e. if a Python wait callback must be invoked.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
pq_execute_command_locked(connectionObject *conn, const char *query,
|
pq_execute_command_locked(connectionObject *conn, const char *query,
|
||||||
PGresult **pgres, char **error)
|
PGresult **pgres, char **error,
|
||||||
|
PyThreadState **tstate)
|
||||||
{
|
{
|
||||||
int pgstatus, retvalue = -1;
|
int pgstatus, retvalue = -1;
|
||||||
|
|
||||||
Dprintf("pq_execute_command_locked: pgconn = %p, query = %s",
|
Dprintf("pq_execute_command_locked: pgconn = %p, query = %s",
|
||||||
conn->pgconn, query);
|
conn->pgconn, query);
|
||||||
*error = NULL;
|
*error = NULL;
|
||||||
*pgres = PQexec(conn->pgconn, query);
|
|
||||||
|
if (!psyco_green()) {
|
||||||
|
*pgres = PQexec(conn->pgconn, query);
|
||||||
|
} else {
|
||||||
|
PyEval_RestoreThread(*tstate);
|
||||||
|
*pgres = psyco_exec_green(conn, query);
|
||||||
|
*tstate = PyEval_SaveThread();
|
||||||
|
}
|
||||||
if (*pgres == NULL) {
|
if (*pgres == NULL) {
|
||||||
const char *msg;
|
const char *msg;
|
||||||
|
|
||||||
|
@ -406,7 +418,8 @@ pq_complete_error(connectionObject *conn, PGresult **pgres, char **error)
|
||||||
relevant result structure.
|
relevant result structure.
|
||||||
*/
|
*/
|
||||||
int
|
int
|
||||||
pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error)
|
pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error,
|
||||||
|
PyThreadState **tstate)
|
||||||
{
|
{
|
||||||
const char *query[] = {
|
const char *query[] = {
|
||||||
NULL,
|
NULL,
|
||||||
|
@ -423,7 +436,7 @@ pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error)
|
||||||
}
|
}
|
||||||
|
|
||||||
result = pq_execute_command_locked(conn, query[conn->isolation_level],
|
result = pq_execute_command_locked(conn, query[conn->isolation_level],
|
||||||
pgres, error);
|
pgres, error, tstate);
|
||||||
if (result == 0)
|
if (result == 0)
|
||||||
conn->status = CONN_STATUS_BEGIN;
|
conn->status = CONN_STATUS_BEGIN;
|
||||||
|
|
||||||
|
@ -455,7 +468,7 @@ pq_commit(connectionObject *conn)
|
||||||
pthread_mutex_lock(&conn->lock);
|
pthread_mutex_lock(&conn->lock);
|
||||||
conn->mark += 1;
|
conn->mark += 1;
|
||||||
|
|
||||||
retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error);
|
retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error, &_save);
|
||||||
|
|
||||||
pthread_mutex_unlock(&conn->lock);
|
pthread_mutex_unlock(&conn->lock);
|
||||||
Py_END_ALLOW_THREADS;
|
Py_END_ALLOW_THREADS;
|
||||||
|
@ -473,7 +486,8 @@ pq_commit(connectionObject *conn)
|
||||||
}
|
}
|
||||||
|
|
||||||
int
|
int
|
||||||
pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error)
|
pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error,
|
||||||
|
PyThreadState **tstate)
|
||||||
{
|
{
|
||||||
int retvalue = -1;
|
int retvalue = -1;
|
||||||
|
|
||||||
|
@ -486,7 +500,7 @@ pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error)
|
||||||
}
|
}
|
||||||
|
|
||||||
conn->mark += 1;
|
conn->mark += 1;
|
||||||
retvalue = pq_execute_command_locked(conn, "ROLLBACK", pgres, error);
|
retvalue = pq_execute_command_locked(conn, "ROLLBACK", pgres, error, tstate);
|
||||||
if (retvalue == 0)
|
if (retvalue == 0)
|
||||||
conn->status = CONN_STATUS_READY;
|
conn->status = CONN_STATUS_READY;
|
||||||
|
|
||||||
|
@ -516,7 +530,7 @@ pq_abort(connectionObject *conn)
|
||||||
Py_BEGIN_ALLOW_THREADS;
|
Py_BEGIN_ALLOW_THREADS;
|
||||||
pthread_mutex_lock(&conn->lock);
|
pthread_mutex_lock(&conn->lock);
|
||||||
|
|
||||||
retvalue = pq_abort_locked(conn, &pgres, &error);
|
retvalue = pq_abort_locked(conn, &pgres, &error, &_save);
|
||||||
|
|
||||||
pthread_mutex_unlock(&conn->lock);
|
pthread_mutex_unlock(&conn->lock);
|
||||||
Py_END_ALLOW_THREADS;
|
Py_END_ALLOW_THREADS;
|
||||||
|
@ -539,7 +553,8 @@ pq_abort(connectionObject *conn)
|
||||||
*/
|
*/
|
||||||
|
|
||||||
int
|
int
|
||||||
pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error)
|
pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error,
|
||||||
|
PyThreadState **tstate)
|
||||||
{
|
{
|
||||||
int retvalue = -1;
|
int retvalue = -1;
|
||||||
|
|
||||||
|
@ -549,15 +564,15 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error)
|
||||||
conn->mark += 1;
|
conn->mark += 1;
|
||||||
|
|
||||||
if (conn->isolation_level > 0 && conn->status == CONN_STATUS_BEGIN) {
|
if (conn->isolation_level > 0 && conn->status == CONN_STATUS_BEGIN) {
|
||||||
retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error);
|
retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error, tstate);
|
||||||
if (retvalue != 0) return retvalue;
|
if (retvalue != 0) return retvalue;
|
||||||
}
|
}
|
||||||
|
|
||||||
retvalue = pq_execute_command_locked(conn, "RESET ALL", pgres, error);
|
retvalue = pq_execute_command_locked(conn, "RESET ALL", pgres, error, tstate);
|
||||||
if (retvalue != 0) return retvalue;
|
if (retvalue != 0) return retvalue;
|
||||||
|
|
||||||
retvalue = pq_execute_command_locked(conn,
|
retvalue = pq_execute_command_locked(conn,
|
||||||
"SET SESSION AUTHORIZATION DEFAULT", pgres, error);
|
"SET SESSION AUTHORIZATION DEFAULT", pgres, error, tstate);
|
||||||
if (retvalue != 0) return retvalue;
|
if (retvalue != 0) return retvalue;
|
||||||
|
|
||||||
conn->status = CONN_STATUS_READY;
|
conn->status = CONN_STATUS_READY;
|
||||||
|
@ -578,7 +593,7 @@ pq_reset(connectionObject *conn)
|
||||||
Py_BEGIN_ALLOW_THREADS;
|
Py_BEGIN_ALLOW_THREADS;
|
||||||
pthread_mutex_lock(&conn->lock);
|
pthread_mutex_lock(&conn->lock);
|
||||||
|
|
||||||
retvalue = pq_reset_locked(conn, &pgres, &error);
|
retvalue = pq_reset_locked(conn, &pgres, &error, &_save);
|
||||||
|
|
||||||
pthread_mutex_unlock(&conn->lock);
|
pthread_mutex_unlock(&conn->lock);
|
||||||
Py_END_ALLOW_THREADS;
|
Py_END_ALLOW_THREADS;
|
||||||
|
@ -684,7 +699,7 @@ pq_execute(cursorObject *curs, const char *query, int async)
|
||||||
Py_BEGIN_ALLOW_THREADS;
|
Py_BEGIN_ALLOW_THREADS;
|
||||||
pthread_mutex_lock(&(curs->conn->lock));
|
pthread_mutex_lock(&(curs->conn->lock));
|
||||||
|
|
||||||
if (pq_begin_locked(curs->conn, &pgres, &error) < 0) {
|
if (pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) {
|
||||||
pthread_mutex_unlock(&(curs->conn->lock));
|
pthread_mutex_unlock(&(curs->conn->lock));
|
||||||
Py_BLOCK_THREADS;
|
Py_BLOCK_THREADS;
|
||||||
pq_complete_error(curs->conn, &pgres, &error);
|
pq_complete_error(curs->conn, &pgres, &error);
|
||||||
|
|
|
@ -38,10 +38,10 @@
|
||||||
HIDDEN int pq_fetch(cursorObject *curs);
|
HIDDEN int pq_fetch(cursorObject *curs);
|
||||||
HIDDEN int pq_execute(cursorObject *curs, const char *query, int async);
|
HIDDEN int pq_execute(cursorObject *curs, const char *query, int async);
|
||||||
HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres,
|
HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres,
|
||||||
char **error);
|
char **error, PyThreadState **tstate);
|
||||||
HIDDEN int pq_commit(connectionObject *conn);
|
HIDDEN int pq_commit(connectionObject *conn);
|
||||||
HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres,
|
HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres,
|
||||||
char **error);
|
char **error, PyThreadState **tstate);
|
||||||
HIDDEN int pq_abort(connectionObject *conn);
|
HIDDEN int pq_abort(connectionObject *conn);
|
||||||
HIDDEN int pq_reset(connectionObject *conn);
|
HIDDEN int pq_reset(connectionObject *conn);
|
||||||
HIDDEN int pq_is_busy(connectionObject *conn);
|
HIDDEN int pq_is_busy(connectionObject *conn);
|
||||||
|
@ -53,7 +53,8 @@ HIDDEN void pq_set_critical(connectionObject *conn, const char *msg);
|
||||||
|
|
||||||
HIDDEN int pq_execute_command_locked(connectionObject *conn,
|
HIDDEN int pq_execute_command_locked(connectionObject *conn,
|
||||||
const char *query,
|
const char *query,
|
||||||
PGresult **pgres, char **error);
|
PGresult **pgres, char **error,
|
||||||
|
PyThreadState **tstate);
|
||||||
HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
|
HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres,
|
||||||
char **error);
|
char **error);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue
Block a user