2007-12-23 James Henstridge <james@jamesh.id.au>

* psycopg/pqpath.c (pq_execute_command_locked): add an error
	argument to hold an error when no PGresult is returned by PQexec,
	rather than using pq_set_critical().
	(pq_complete_error): new function that converts the error returned
	by pq_execute_command_locked() to a Python exception.
	(pq_begin_locked): add error argument.
	(pq_commit): use pq_complete_error().
	(pq_abort): use pq_complete_error().
	(pq_abort_locked): always call pq_set_critical() on error, and
	clear the error message from pq_execute_command_locked().
	(pq_execute): use pq_complete_error() to handle the error from
	pq_begin_locked().

	* psycopg/pqpath.c (pq_begin): remove unused function.

	* psycopg/connection_type.c (psyco_conn_commit): if conn_commit()
	raises an error, just return NULL, since it is now setting an
	exception itself.
	(psyco_conn_rollback): same here.

	* psycopg/connection_int.c (conn_commit): don't drop GIL and lock
	connection before calling pq_commit().
	(conn_rollback): same here.
	(conn_close): use pq_abort_locked().
	(conn_switch_isolation_level): same here.
	(conn_set_client_encoding): same here.

	* psycopg/pqpath.h: add prototype for pq_abort_locked().

	* psycopg/pqpath.c (pq_commit): convert function to run with GIL
	held, and handle errors appropriately.
	(pq_abort): same here.
	(pq_abort_locked): new function to abort a locked connection.

2007-12-22  James Henstridge  <james@jamesh.id.au>

	* psycopg/pqpath.c (pq_raise): add a "pgres" argument so we can
	generate nice errors not related to a particular cursor.
	(pq_execute): use pq_begin_locked() rather than pq_begin().  Use
	pq_raise() to handle any errors from it.

	* psycopg/pqpath.c (pq_execute_command_locked): helper function
	used to execute a command-style query on a locked connection.
	(pq_begin_locked): a variant of pq_begin() that uses
	pq_execute_command_locked().
	(pq_begin): rewrite to use pq_begin_locked().
This commit is contained in:
James Henstridge 2008-01-10 06:14:20 +00:00
parent 5fe08ae83e
commit d190d5918a
5 changed files with 218 additions and 115 deletions

View File

@ -1,3 +1,52 @@
2007-12-23 James Henstridge <james@jamesh.id.au>
* psycopg/pqpath.c (pq_execute_command_locked): add an error
argument to hold an error when no PGresult is returned by PQexec,
rather than using pq_set_critical().
(pq_complete_error): new function that converts the error returned
by pq_execute_command_locked() to a Python exception.
(pq_begin_locked): add error argument.
(pq_commit): use pq_complete_error().
(pq_abort): use pq_complete_error().
(pq_abort_locked): always call pq_set_critical() on error, and
clear the error message from pq_execute_command_locked().
(pq_execute): use pq_complete_error() to handle the error from
pq_begin_locked().
* psycopg/pqpath.c (pq_begin): remove unused function.
* psycopg/connection_type.c (psyco_conn_commit): if conn_commit()
raises an error, just return NULL, since it is now setting an
exception itself.
(psyco_conn_rollback): same here.
* psycopg/connection_int.c (conn_commit): don't drop GIL and lock
connection before calling pq_commit().
(conn_rollback): same here.
(conn_close): use pq_abort_locked().
(conn_switch_isolation_level): same here.
(conn_set_client_encoding): same here.
* psycopg/pqpath.h: add prototype for pq_abort_locked().
* psycopg/pqpath.c (pq_commit): convert function to run with GIL
held, and handle errors appropriately.
(pq_abort): same here.
(pq_abort_locked): new function to abort a locked connection.
2007-12-22 James Henstridge <james@jamesh.id.au>
* psycopg/pqpath.c (pq_raise): add a "pgres" argument so we can
generate nice errors not related to a particular cursor.
(pq_execute): use pq_begin_locked() rather than pq_begin(). Use
pq_raise() to handle any errors from it.
* psycopg/pqpath.c (pq_execute_command_locked): helper function
used to execute a command-style query on a locked connection.
(pq_begin_locked): a variant of pq_begin() that uses
pq_execute_command_locked().
(pq_begin): rewrite to use pq_begin_locked().
2007-12-22 James Henstridge <james@jamesh.id.au>
* psycopg/config.h: only print debug messages if

View File

@ -217,7 +217,7 @@ conn_close(connectionObject *self)
/* execute a forced rollback on the connection (but don't check the
result, we're going to close the pq connection anyway */
if (self->pgconn) {
pq_abort(self);
pq_abort_locked(self);
PQfinish(self->pgconn);
Dprintf("conn_close: PQfinish called");
self->pgconn = NULL;
@ -235,17 +235,8 @@ conn_commit(connectionObject *self)
{
int res;
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
res = pq_commit(self);
self->mark++;
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
if (res == -1)
pq_resolve_critical(self, 0);
return res;
}
@ -256,17 +247,8 @@ conn_rollback(connectionObject *self)
{
int res;
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
res = pq_abort(self);
self->mark++;
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;
if (res == -1)
pq_resolve_critical(self, 0);
return res;
}
@ -286,7 +268,7 @@ conn_switch_isolation_level(connectionObject *self, int level)
/* if the current isolation level is > 0 we need to abort the current
transaction before changing; that all folks! */
if (self->isolation_level != level && self->isolation_level > 0) {
res = pq_abort(self);
res = pq_abort_locked(self);
}
self->isolation_level = level;
self->mark++;
@ -322,7 +304,7 @@ conn_set_client_encoding(connectionObject *self, char *enc)
/* abort the current transaction, to set the encoding ouside of
transactions */
res = pq_abort(self);
res = pq_abort_locked(self);
if (res == 0) {
pgres = PQexec(self->pgconn, query);

View File

@ -117,11 +117,8 @@ psyco_conn_commit(connectionObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "")) return NULL;
if (conn_commit(self) < 0) {
PyErr_SetString(OperationalError,
PQerrorMessage(self->pgconn));
if (conn_commit(self) < 0)
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
@ -140,11 +137,8 @@ psyco_conn_rollback(connectionObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "")) return NULL;
if (conn_rollback(self) < 0) {
PyErr_SetString(OperationalError,
PQerrorMessage(self->pgconn));
if (conn_rollback(self) < 0)
return NULL;
}
Py_INCREF(Py_None);
return Py_None;

View File

@ -61,8 +61,8 @@ strip_severity(const char *msg)
This function should be called while holding the GIL. */
static void
pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
const char *msg)
pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres,
PyObject *exc, const char *msg)
{
PyObject *pgc = (PyObject*)curs;
@ -76,11 +76,14 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
return;
}
if (curs && curs->pgres) {
err = PQresultErrorMessage(curs->pgres);
if (pgres == NULL && curs != NULL)
pgres = curs->pgres;
if (pgres) {
err = PQresultErrorMessage(pgres);
#ifdef HAVE_PQPROTOCOL3
if (err != NULL && conn->protocol == 3) {
code = PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
code = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
}
#endif
}
@ -98,11 +101,11 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc,
/* if exc is NULL, analyze the message and try to deduce the right
exception kind (only if we have a pgres, obviously) */
if (exc == NULL) {
if (curs && curs->pgres) {
if (pgres) {
if (conn->protocol == 3) {
#ifdef HAVE_PQPROTOCOL3
char *pgstate =
PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
if (pgstate != NULL && !strncmp(pgstate, "23", 2))
exc = IntegrityError;
else
@ -227,63 +230,122 @@ pq_clear_async(connectionObject *conn)
} while (pgres != NULL);
}
/* pq_begin - send a BEGIN WORK, if necessary
/* pg_execute_command_locked - execute a no-result query on a locked connection.
this function does not call any Py_*_ALLOW_THREADS macros */
This function should only be called on a locked connection without
holding the global interpreter lock.
int
pq_begin(connectionObject *conn)
On error, -1 is returned, and the pgres argument will hold the
relevant result structure.
*/
static int
pq_execute_command_locked(connectionObject *conn, const char *query,
PGresult **pgres, char **error)
{
int pgstatus, retvalue = -1;
Dprintf("pq_execute_command_locked: pgconn = %p, query = %s",
conn->pgconn, query);
*error = NULL;
*pgres = PQexec(conn->pgconn, query);
if (*pgres == NULL) {
const char *msg;
Dprintf("pq_execute_command_locked: PQexec returned NULL");
msg = PQerrorMessage(conn->pgconn);
if (msg)
*error = strdup(msg);
goto cleanup;
}
pgstatus = PQresultStatus(*pgres);
if (pgstatus != PGRES_COMMAND_OK ) {
Dprintf("pq_execute_command_locked: result was not COMMAND_OK (%d)",
pgstatus);
goto cleanup;
}
retvalue = 0;
IFCLEARPGRES(*pgres);
cleanup:
return retvalue;
}
/* pq_complete_error: handle an error from pq_execute_command_locked()
If pq_execute_command_locked() returns -1, this function should be
called to convert the result to a Python exception.
This function should be called while holding the global interpreter
lock.
*/
static void
pq_complete_error(connectionObject *conn, PGresult **pgres, char **error)
{
Dprintf("pq_complete_error: pgconn = %p, pgres = %p, error = %s",
conn->pgconn, *pgres, *error ? *error : "(null)");
if (*pgres != NULL)
pq_raise(conn, NULL, *pgres, OperationalError, NULL);
else if (*error != NULL) {
PyErr_SetString(OperationalError, *error);
free(*error);
} else {
PyErr_SetString(OperationalError, "unknown error");
}
IFCLEARPGRES(*pgres);
if (*error) {
free(*error);
*error = NULL;
}
}
/* pq_begin_locked - begin a transaction, if necessary
This function should only be called on a locked connection without
holding the global interpreter lock.
On error, -1 is returned, and the pgres argument will hold the
relevant result structure.
*/
static int
pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error)
{
const char *query[] = {
NULL,
"BEGIN; SET TRANSACTION ISOLATION LEVEL READ COMMITTED",
"BEGIN; SET TRANSACTION ISOLATION LEVEL SERIALIZABLE"};
int result;
int pgstatus, retvalue = -1;
PGresult *pgres = NULL;
Dprintf("pq_begin: pgconn = %p, isolevel = %ld, status = %d",
Dprintf("pq_begin_locked: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) {
Dprintf("pq_begin: transaction in progress");
Dprintf("pq_begin_locked: transaction in progress");
return 0;
}
pq_clear_async(conn);
pgres = PQexec(conn->pgconn, query[conn->isolation_level]);
if (pgres == NULL) {
Dprintf("pq_begin: PQexec() failed");
pq_set_critical(conn, NULL);
goto cleanup;
}
result = pq_execute_command_locked(conn, query[conn->isolation_level],
pgres, error);
if (result == 0)
conn->status = CONN_STATUS_BEGIN;
pgstatus = PQresultStatus(pgres);
if (pgstatus != PGRES_COMMAND_OK ) {
Dprintf("pq_begin: result is NOT OK");
pq_set_critical(conn, NULL);
goto cleanup;
}
Dprintf("pq_begin: issued '%s' command", query[conn->isolation_level]);
retvalue = 0;
conn->status = CONN_STATUS_BEGIN;
cleanup:
IFCLEARPGRES(pgres);
return retvalue;
return result;
}
/* pq_commit - send an END, if necessary
this function does not call any Py_*_ALLOW_THREADS macros */
This function should be called while holding the global interpreter
lock. */
int
pq_commit(connectionObject *conn)
{
const char *query = "END";
int pgstatus, retvalue = -1;
int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("pq_commit: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
@ -293,43 +355,63 @@ pq_commit(connectionObject *conn)
return 0;
}
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&conn->lock);
pq_clear_async(conn);
pgres = PQexec(conn->pgconn, query);
if (pgres == NULL) {
Dprintf("pq_commit: PQexec() failed");
pq_set_critical(conn, NULL);
goto cleanup;
}
retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error);
pgstatus = PQresultStatus(pgres);
if (pgstatus != PGRES_COMMAND_OK ) {
Dprintf("pq_commit: result is NOT OK");
/* if the result is not OK the transaction has been rolled back
so we set the status to CONN_STATUS_READY anyway */
conn->status = CONN_STATUS_READY;
pq_set_critical(conn, NULL);
goto cleanup;
}
Dprintf("pq_commit: issued '%s' command", query);
pthread_mutex_unlock(&conn->lock);
Py_END_ALLOW_THREADS;
retvalue = 0;
if (retvalue < 0)
pq_complete_error(conn, &pgres, &error);
/* Even if an error occurred, the connection will be rolled back,
so we unconditionally set the connection status here. */
conn->status = CONN_STATUS_READY;
cleanup:
return retvalue;
}
int
pq_abort_locked(connectionObject *conn)
{
int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("pq_abort_locked: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) {
Dprintf("pq_abort_locked: no transaction to abort");
return 0;
}
pq_clear_async(conn);
retvalue = pq_execute_command_locked(conn, "ROLLBACK", &pgres, &error);
if (retvalue < 0)
pq_set_critical(conn, NULL);
IFCLEARPGRES(pgres);
if (error)
free(error);
return retvalue;
}
/* pq_abort - send an ABORT, if necessary
this function does not call any Py_*_ALLOW_THREADS macros */
This function should be called while holding the global interpreter
lock. */
int
pq_abort(connectionObject *conn)
{
const char *query = "ABORT";
int pgstatus, retvalue = -1;
int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("pq_abort: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
@ -339,27 +421,20 @@ pq_abort(connectionObject *conn)
return 0;
}
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&conn->lock);
pq_clear_async(conn);
pgres = PQexec(conn->pgconn, query);
if (pgres == NULL) {
Dprintf("pq_abort: PQexec() failed");
pq_set_critical(conn, NULL);
goto cleanup;
}
retvalue = pq_execute_command_locked(conn, "ROLLBACK", &pgres, &error);
pgstatus = PQresultStatus(pgres);
if (pgstatus != PGRES_COMMAND_OK ) {
Dprintf("pq_abort: result is NOT OK");
pq_set_critical(conn, NULL);
goto cleanup;
}
Dprintf("pq_abort: issued '%s' command", query);
pthread_mutex_unlock(&conn->lock);
Py_END_ALLOW_THREADS;
retvalue = 0;
conn->status = CONN_STATUS_READY;
if (retvalue < 0)
pq_complete_error(conn, &pgres, &error);
else
conn->status = CONN_STATUS_READY;
cleanup:
IFCLEARPGRES(pgres);
return retvalue;
}
@ -424,6 +499,9 @@ pq_is_busy(connectionObject *conn)
int
pq_execute(cursorObject *curs, const char *query, int async)
{
PGresult *pgres = NULL;
char *error = NULL;
/* if the status of the connection is critical raise an exception and
definitely close the connection */
if (curs->conn->critical) {
@ -442,10 +520,10 @@ pq_execute(cursorObject *curs, const char *query, int async)
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock));
if (pq_begin(curs->conn) < 0) {
if (pq_begin_locked(curs->conn, &pgres, &error) < 0) {
pthread_mutex_unlock(&(curs->conn->lock));
Py_BLOCK_THREADS;
pq_resolve_critical(curs->conn, 0);
pq_complete_error(curs->conn, &pgres, &error);
return -1;
}
@ -719,7 +797,7 @@ _pq_copy_in_v3(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL, NULL);
pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@ -753,7 +831,7 @@ _pq_copy_in(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL, NULL);
pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@ -790,7 +868,7 @@ _pq_copy_out_v3(cursorObject *curs)
}
if (len == -2) {
pq_raise(curs->conn, NULL, NULL, NULL);
pq_raise(curs->conn, NULL, NULL, NULL, NULL);
return -1;
}
@ -798,7 +876,7 @@ _pq_copy_out_v3(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL, NULL);
pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
return 1;
@ -849,7 +927,7 @@ _pq_copy_out(cursorObject *curs)
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL, NULL);
pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
@ -984,7 +1062,7 @@ pq_fetch(cursorObject *curs)
default:
Dprintf("pq_fetch: uh-oh, something FAILED");
pq_raise(curs->conn, curs, NULL, NULL);
pq_raise(curs->conn, curs, NULL, NULL, NULL);
IFCLEARPGRES(curs->pgres);
ex = -1;
break;

View File

@ -32,8 +32,8 @@
/* exported functions */
extern int pq_fetch(cursorObject *curs);
extern int pq_execute(cursorObject *curs, const char *query, int async);
extern int pq_begin(connectionObject *conn);
extern int pq_commit(connectionObject *conn);
extern int pq_abort_locked(connectionObject *conn);
extern int pq_abort(connectionObject *conn);
extern int pq_is_busy(connectionObject *conn);
extern void pq_set_critical(connectionObject *conn, const char *msg);