Merge branch 'use-conn-pgres'

This commit is contained in:
Daniele Varrazzo 2019-03-17 04:43:58 +00:00
commit 155c739863
8 changed files with 196 additions and 321 deletions

View File

@ -87,7 +87,7 @@ struct connectionObject {
pthread_mutex_t lock; /* the global connection lock */ pthread_mutex_t lock; /* the global connection lock */
char *dsn; /* data source name */ char *dsn; /* data source name */
char *critical; /* critical error on this connection */ char *error; /* temporarily stored error before raising */
char *encoding; /* current backend encoding */ char *encoding; /* current backend encoding */
long int closed; /* 1 means connection has been closed; long int closed; /* 1 means connection has been closed;
@ -160,7 +160,7 @@ HIDDEN int conn_get_server_version(PGconn *pgconn);
HIDDEN void conn_notice_process(connectionObject *self); HIDDEN void conn_notice_process(connectionObject *self);
HIDDEN void conn_notice_clean(connectionObject *self); HIDDEN void conn_notice_clean(connectionObject *self);
HIDDEN void conn_notifies_process(connectionObject *self); HIDDEN void conn_notifies_process(connectionObject *self);
RAISES_NEG HIDDEN int conn_setup(connectionObject *self, PGconn *pgconn); RAISES_NEG HIDDEN int conn_setup(connectionObject *self);
HIDDEN int conn_connect(connectionObject *self, long int async); HIDDEN int conn_connect(connectionObject *self, long int async);
HIDDEN void conn_close(connectionObject *self); HIDDEN void conn_close(connectionObject *self);
HIDDEN void conn_close_locked(connectionObject *self); HIDDEN void conn_close_locked(connectionObject *self);
@ -174,6 +174,8 @@ RAISES_NEG HIDDEN int conn_tpc_begin(connectionObject *self, xidObject *xid);
RAISES_NEG HIDDEN int conn_tpc_command(connectionObject *self, RAISES_NEG HIDDEN int conn_tpc_command(connectionObject *self,
const char *cmd, xidObject *xid); const char *cmd, xidObject *xid);
HIDDEN PyObject *conn_tpc_recover(connectionObject *self); HIDDEN PyObject *conn_tpc_recover(connectionObject *self);
HIDDEN void conn_set_result(connectionObject *self, PGresult *pgres);
HIDDEN void conn_set_error(connectionObject *self, const char *msg);
/* exception-raising macros */ /* exception-raising macros */
#define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \ #define EXC_IF_CONN_CLOSED(self) if ((self)->closed > 0) { \

View File

@ -649,25 +649,23 @@ conn_is_datestyle_ok(PGconn *pgconn)
/* conn_setup - setup and read basic information about the connection */ /* conn_setup - setup and read basic information about the connection */
RAISES_NEG int RAISES_NEG int
conn_setup(connectionObject *self, PGconn *pgconn) conn_setup(connectionObject *self)
{ {
PGresult *pgres = NULL;
char *error = NULL;
int rv = -1; int rv = -1;
self->equote = conn_get_standard_conforming_strings(pgconn); self->equote = conn_get_standard_conforming_strings(self->pgconn);
self->server_version = conn_get_server_version(pgconn); self->server_version = conn_get_server_version(self->pgconn);
self->protocol = conn_get_protocol_version(self->pgconn); self->protocol = conn_get_protocol_version(self->pgconn);
if (3 != self->protocol) { if (3 != self->protocol) {
PyErr_SetString(InterfaceError, "only protocol 3 supported"); PyErr_SetString(InterfaceError, "only protocol 3 supported");
goto exit; goto exit;
} }
if (0 > conn_read_encoding(self, pgconn)) { if (0 > conn_read_encoding(self, self->pgconn)) {
goto exit; goto exit;
} }
if (0 > conn_setup_cancel(self, pgconn)) { if (0 > conn_setup_cancel(self, self->pgconn)) {
goto exit; goto exit;
} }
@ -678,11 +676,10 @@ conn_setup(connectionObject *self, PGconn *pgconn)
if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) { if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) {
int res; int res;
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
res = pq_set_guc_locked(self, "datestyle", "ISO", res = pq_set_guc_locked(self, "datestyle", "ISO", &_save);
&pgres, &error, &_save);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
if (res < 0) { if (res < 0) {
pq_complete_error(self, &pgres, &error); pq_complete_error(self);
goto unlock; goto unlock;
} }
} }
@ -710,7 +707,6 @@ exit:
static int static int
_conn_sync_connect(connectionObject *self) _conn_sync_connect(connectionObject *self)
{ {
PGconn *pgconn;
int green; int green;
/* store this value to prevent inconsistencies due to a change /* store this value to prevent inconsistencies due to a change
@ -718,31 +714,31 @@ _conn_sync_connect(connectionObject *self)
green = psyco_green(); green = psyco_green();
if (!green) { if (!green) {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
self->pgconn = pgconn = PQconnectdb(self->dsn); self->pgconn = PQconnectdb(self->dsn);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
Dprintf("conn_connect: new postgresql connection at %p", pgconn); Dprintf("conn_connect: new PG connection at %p", self->pgconn);
} }
else { else {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
self->pgconn = pgconn = PQconnectStart(self->dsn); self->pgconn = PQconnectStart(self->dsn);
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
Dprintf("conn_connect: new green postgresql connection at %p", pgconn); Dprintf("conn_connect: new green PG connection at %p", self->pgconn);
} }
if (pgconn == NULL) if (!self->pgconn)
{ {
Dprintf("conn_connect: PQconnectdb(%s) FAILED", self->dsn); Dprintf("conn_connect: PQconnectdb(%s) FAILED", self->dsn);
PyErr_SetString(OperationalError, "PQconnectdb() failed"); PyErr_SetString(OperationalError, "PQconnectdb() failed");
return -1; return -1;
} }
else if (PQstatus(pgconn) == CONNECTION_BAD) else if (PQstatus(self->pgconn) == CONNECTION_BAD)
{ {
Dprintf("conn_connect: PQconnectdb(%s) returned BAD", self->dsn); Dprintf("conn_connect: PQconnectdb(%s) returned BAD", self->dsn);
PyErr_SetString(OperationalError, PQerrorMessage(pgconn)); PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
return -1; return -1;
} }
PQsetNoticeProcessor(pgconn, conn_notice_callback, (void*)self); PQsetNoticeProcessor(self->pgconn, conn_notice_callback, (void*)self);
/* if the connection is green, wait to finish connection */ /* if the connection is green, wait to finish connection */
if (green) { if (green) {
@ -759,7 +755,7 @@ _conn_sync_connect(connectionObject *self)
*/ */
self->status = CONN_STATUS_READY; self->status = CONN_STATUS_READY;
if (conn_setup(self, self->pgconn) == -1) { if (conn_setup(self) == -1) {
return -1; return -1;
} }
@ -1122,8 +1118,7 @@ conn_poll(connectionObject *self)
break; break;
} }
PQclear(curs->pgres); curs_set_result(curs, self->pgres);
curs->pgres = self->pgres;
self->pgres = NULL; self->pgres = NULL;
/* fetch the tuples (if there are any) and build the result. We /* fetch the tuples (if there are any) and build the result. We
@ -1225,8 +1220,6 @@ conn_set_session(connectionObject *self, int autocommit,
int isolevel, int readonly, int deferrable) int isolevel, int readonly, int deferrable)
{ {
int rv = -1; int rv = -1;
PGresult *pgres = NULL;
char *error = NULL;
int want_autocommit = autocommit == SRV_STATE_UNCHANGED ? int want_autocommit = autocommit == SRV_STATE_UNCHANGED ?
self->autocommit : autocommit; self->autocommit : autocommit;
@ -1256,21 +1249,21 @@ conn_set_session(connectionObject *self, int autocommit,
if (isolevel != SRV_STATE_UNCHANGED) { if (isolevel != SRV_STATE_UNCHANGED) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_isolation", srv_isolevels[isolevel], "default_transaction_isolation", srv_isolevels[isolevel],
&pgres, &error, &_save)) { &_save)) {
goto endlock; goto endlock;
} }
} }
if (readonly != SRV_STATE_UNCHANGED) { if (readonly != SRV_STATE_UNCHANGED) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_read_only", srv_state_guc[readonly], "default_transaction_read_only", srv_state_guc[readonly],
&pgres, &error, &_save)) { &_save)) {
goto endlock; goto endlock;
} }
} }
if (deferrable != SRV_STATE_UNCHANGED) { if (deferrable != SRV_STATE_UNCHANGED) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_deferrable", srv_state_guc[deferrable], "default_transaction_deferrable", srv_state_guc[deferrable],
&pgres, &error, &_save)) { &_save)) {
goto endlock; goto endlock;
} }
} }
@ -1281,21 +1274,21 @@ conn_set_session(connectionObject *self, int autocommit,
if (self->isolevel != ISOLATION_LEVEL_DEFAULT) { if (self->isolevel != ISOLATION_LEVEL_DEFAULT) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_isolation", "default", "default_transaction_isolation", "default",
&pgres, &error, &_save)) { &_save)) {
goto endlock; goto endlock;
} }
} }
if (self->readonly != STATE_DEFAULT) { if (self->readonly != STATE_DEFAULT) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_read_only", "default", "default_transaction_read_only", "default",
&pgres, &error, &_save)) { &_save)) {
goto endlock; goto endlock;
} }
} }
if (self->server_version >= 90100 && self->deferrable != STATE_DEFAULT) { if (self->server_version >= 90100 && self->deferrable != STATE_DEFAULT) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_deferrable", "default", "default_transaction_deferrable", "default",
&pgres, &error, &_save)) { &_save)) {
goto endlock; goto endlock;
} }
} }
@ -1320,7 +1313,7 @@ endlock:
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (rv < 0) { if (rv < 0) {
pq_complete_error(self, &pgres, &error); pq_complete_error(self);
goto exit; goto exit;
} }
@ -1339,8 +1332,6 @@ exit:
RAISES_NEG int RAISES_NEG int
conn_set_client_encoding(connectionObject *self, const char *pgenc) conn_set_client_encoding(connectionObject *self, const char *pgenc)
{ {
PGresult *pgres = NULL;
char *error = NULL;
int res = -1; int res = -1;
char *clean_enc = NULL; char *clean_enc = NULL;
@ -1356,12 +1347,11 @@ conn_set_client_encoding(connectionObject *self, const char *pgenc)
/* abort the current transaction, to set the encoding ouside of /* abort the current transaction, to set the encoding ouside of
transactions */ transactions */
if ((res = pq_abort_locked(self, &pgres, &error, &_save))) { if ((res = pq_abort_locked(self, &_save))) {
goto endlock; goto endlock;
} }
if ((res = pq_set_guc_locked(self, "client_encoding", clean_enc, if ((res = pq_set_guc_locked(self, "client_encoding", clean_enc, &_save))) {
&pgres, &error, &_save))) {
goto endlock; goto endlock;
} }
@ -1370,7 +1360,7 @@ endlock:
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (res < 0) { if (res < 0) {
pq_complete_error(self, &pgres, &error); pq_complete_error(self);
goto exit; goto exit;
} }
@ -1396,18 +1386,15 @@ exit:
RAISES_NEG int RAISES_NEG int
conn_tpc_begin(connectionObject *self, xidObject *xid) conn_tpc_begin(connectionObject *self, xidObject *xid)
{ {
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("conn_tpc_begin: starting transaction"); Dprintf("conn_tpc_begin: starting transaction");
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock); pthread_mutex_lock(&self->lock);
if (pq_begin_locked(self, &pgres, &error, &_save) < 0) { if (pq_begin_locked(self, &_save) < 0) {
pthread_mutex_unlock(&(self->lock)); pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
pq_complete_error(self, &pgres, &error); pq_complete_error(self);
return -1; return -1;
} }
@ -1430,8 +1417,6 @@ conn_tpc_begin(connectionObject *self, xidObject *xid)
RAISES_NEG int RAISES_NEG int
conn_tpc_command(connectionObject *self, const char *cmd, xidObject *xid) conn_tpc_command(connectionObject *self, const char *cmd, xidObject *xid)
{ {
PGresult *pgres = NULL;
char *error = NULL;
PyObject *tid = NULL; PyObject *tid = NULL;
const char *ctid; const char *ctid;
int rv = -1; int rv = -1;
@ -1445,11 +1430,10 @@ conn_tpc_command(connectionObject *self, const char *cmd, xidObject *xid)
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock); pthread_mutex_lock(&self->lock);
if (0 > (rv = pq_tpc_command_locked(self, cmd, ctid, if (0 > (rv = pq_tpc_command_locked(self, cmd, ctid, &_save))) {
&pgres, &error, &_save))) {
pthread_mutex_unlock(&self->lock); pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
pq_complete_error(self, &pgres, &error); pq_complete_error(self);
goto exit; goto exit;
} }
@ -1494,3 +1478,24 @@ exit:
return rv; return rv;
} }
void
conn_set_result(connectionObject *self, PGresult *pgres)
{
PQclear(self->pgres);
self->pgres = pgres;
}
void
conn_set_error(connectionObject *self, const char *msg)
{
if (self->error) {
free(self->error);
self->error = NULL;
}
if (msg && *msg) {
self->error = strdup(msg);
}
}

View File

@ -1052,7 +1052,7 @@ psyco_conn_reset(connectionObject *self, PyObject *dummy)
if (pq_reset(self) < 0) if (pq_reset(self) < 0)
return NULL; return NULL;
res = conn_setup(self, self->pgconn); res = conn_setup(self);
if (res < 0) if (res < 0)
return NULL; return NULL;
@ -1430,7 +1430,7 @@ connection_dealloc(PyObject* obj)
PyMem_Free(self->dsn); PyMem_Free(self->dsn);
PyMem_Free(self->encoding); PyMem_Free(self->encoding);
if (self->critical) free(self->critical); if (self->error) free(self->error);
if (self->cancel) PQfreeCancel(self->cancel); if (self->cancel) PQfreeCancel(self->cancel);
PQclear(self->pgres); PQclear(self->pgres);

View File

@ -96,6 +96,7 @@ HIDDEN void curs_reset(cursorObject *self);
RAISES_NEG HIDDEN int psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue); RAISES_NEG HIDDEN int psyco_curs_withhold_set(cursorObject *self, PyObject *pyvalue);
RAISES_NEG HIDDEN int psyco_curs_scrollable_set(cursorObject *self, PyObject *pyvalue); RAISES_NEG HIDDEN int psyco_curs_scrollable_set(cursorObject *self, PyObject *pyvalue);
HIDDEN PyObject *psyco_curs_validate_sql_basic(cursorObject *self, PyObject *sql); HIDDEN PyObject *psyco_curs_validate_sql_basic(cursorObject *self, PyObject *sql);
HIDDEN void curs_set_result(cursorObject *self, PGresult *pgres);
/* exception-raising macros */ /* exception-raising macros */
#define EXC_IF_CURS_CLOSED(self) \ #define EXC_IF_CURS_CLOSED(self) \

View File

@ -160,3 +160,11 @@ exit:
Py_XDECREF(comp); Py_XDECREF(comp);
return rv; return rv;
} }
void
curs_set_result(cursorObject *self, PGresult *pgres)
{
PQclear(self->pgres);
self->pgres = pgres;
}

View File

@ -33,12 +33,9 @@
#include <string.h> #include <string.h>
static void static void
collect_error(connectionObject *conn, char **error) collect_error(connectionObject *conn)
{ {
const char *msg = PQerrorMessage(conn->pgconn); conn_set_error(conn, PQerrorMessage(conn->pgconn));
if (msg)
*error = strdup(msg);
} }
@ -150,8 +147,6 @@ lobject_open(lobjectObject *self, connectionObject *conn,
Oid oid, const char *smode, Oid new_oid, const char *new_file) Oid oid, const char *smode, Oid new_oid, const char *new_file)
{ {
int retvalue = -1; int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL;
int pgmode = 0; int pgmode = 0;
int mode; int mode;
@ -162,7 +157,7 @@ lobject_open(lobjectObject *self, connectionObject *conn,
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); retvalue = pq_begin_locked(self->conn, &_save);
if (retvalue < 0) if (retvalue < 0)
goto end; goto end;
@ -185,7 +180,7 @@ lobject_open(lobjectObject *self, connectionObject *conn,
self->oid); self->oid);
if (self->oid == InvalidOid) { if (self->oid == InvalidOid) {
collect_error(self->conn, &error); collect_error(self->conn);
retvalue = -1; retvalue = -1;
goto end; goto end;
} }
@ -205,7 +200,7 @@ lobject_open(lobjectObject *self, connectionObject *conn,
pgmode, self->fd); pgmode, self->fd);
if (self->fd == -1) { if (self->fd == -1) {
collect_error(self->conn, &error); collect_error(self->conn);
retvalue = -1; retvalue = -1;
goto end; goto end;
} }
@ -228,7 +223,7 @@ lobject_open(lobjectObject *self, connectionObject *conn,
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
/* if retvalue > 0, an exception is already set */ /* if retvalue > 0, an exception is already set */
return retvalue; return retvalue;
@ -237,7 +232,7 @@ lobject_open(lobjectObject *self, connectionObject *conn,
/* lobject_close - close an existing lo */ /* lobject_close - close an existing lo */
RAISES_NEG static int RAISES_NEG static int
lobject_close_locked(lobjectObject *self, char **error) lobject_close_locked(lobjectObject *self)
{ {
int retvalue; int retvalue;
@ -251,7 +246,7 @@ lobject_close_locked(lobjectObject *self, char **error)
return 0; return 0;
break; break;
default: default:
*error = strdup("the connection is broken"); conn_set_error(self->conn, "the connection is broken");
return -1; return -1;
break; break;
} }
@ -264,7 +259,7 @@ lobject_close_locked(lobjectObject *self, char **error)
retvalue = lo_close(self->conn->pgconn, self->fd); retvalue = lo_close(self->conn->pgconn, self->fd);
self->fd = -1; self->fd = -1;
if (retvalue < 0) if (retvalue < 0)
collect_error(self->conn, error); collect_error(self->conn);
return retvalue; return retvalue;
} }
@ -272,20 +267,18 @@ lobject_close_locked(lobjectObject *self, char **error)
RAISES_NEG int RAISES_NEG int
lobject_close(lobjectObject *self) lobject_close(lobjectObject *self)
{ {
PGresult *pgres = NULL;
char *error = NULL;
int retvalue; int retvalue;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
retvalue = lobject_close_locked(self, &error); retvalue = lobject_close_locked(self);
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
return retvalue; return retvalue;
} }
@ -294,32 +287,30 @@ lobject_close(lobjectObject *self)
RAISES_NEG int RAISES_NEG int
lobject_unlink(lobjectObject *self) lobject_unlink(lobjectObject *self)
{ {
PGresult *pgres = NULL;
char *error = NULL;
int retvalue = -1; int retvalue = -1;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); retvalue = pq_begin_locked(self->conn, &_save);
if (retvalue < 0) if (retvalue < 0)
goto end; goto end;
/* first we make sure the lobject is closed and then we unlink */ /* first we make sure the lobject is closed and then we unlink */
retvalue = lobject_close_locked(self, &error); retvalue = lobject_close_locked(self);
if (retvalue < 0) if (retvalue < 0)
goto end; goto end;
retvalue = lo_unlink(self->conn->pgconn, self->oid); retvalue = lo_unlink(self->conn->pgconn, self->oid);
if (retvalue < 0) if (retvalue < 0)
collect_error(self->conn, &error); collect_error(self->conn);
end: end:
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
return retvalue; return retvalue;
} }
@ -329,8 +320,6 @@ RAISES_NEG Py_ssize_t
lobject_write(lobjectObject *self, const char *buf, size_t len) lobject_write(lobjectObject *self, const char *buf, size_t len)
{ {
Py_ssize_t written; Py_ssize_t written;
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("lobject_writing: fd = %d, len = " FORMAT_CODE_SIZE_T, Dprintf("lobject_writing: fd = %d, len = " FORMAT_CODE_SIZE_T,
self->fd, len); self->fd, len);
@ -340,13 +329,13 @@ lobject_write(lobjectObject *self, const char *buf, size_t len)
written = lo_write(self->conn->pgconn, self->fd, buf, len); written = lo_write(self->conn->pgconn, self->fd, buf, len);
if (written < 0) if (written < 0)
collect_error(self->conn, &error); collect_error(self->conn);
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (written < 0) if (written < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
return written; return written;
} }
@ -356,21 +345,19 @@ RAISES_NEG Py_ssize_t
lobject_read(lobjectObject *self, char *buf, size_t len) lobject_read(lobjectObject *self, char *buf, size_t len)
{ {
Py_ssize_t n_read; Py_ssize_t n_read;
PGresult *pgres = NULL;
char *error = NULL;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
n_read = lo_read(self->conn->pgconn, self->fd, buf, len); n_read = lo_read(self->conn->pgconn, self->fd, buf, len);
if (n_read < 0) if (n_read < 0)
collect_error(self->conn, &error); collect_error(self->conn);
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (n_read < 0) if (n_read < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
return n_read; return n_read;
} }
@ -379,8 +366,6 @@ lobject_read(lobjectObject *self, char *buf, size_t len)
RAISES_NEG Py_ssize_t RAISES_NEG Py_ssize_t
lobject_seek(lobjectObject *self, Py_ssize_t pos, int whence) lobject_seek(lobjectObject *self, Py_ssize_t pos, int whence)
{ {
PGresult *pgres = NULL;
char *error = NULL;
Py_ssize_t where; Py_ssize_t where;
Dprintf("lobject_seek: fd = %d, pos = " FORMAT_CODE_PY_SSIZE_T ", whence = %d", Dprintf("lobject_seek: fd = %d, pos = " FORMAT_CODE_PY_SSIZE_T ", whence = %d",
@ -400,13 +385,13 @@ lobject_seek(lobjectObject *self, Py_ssize_t pos, int whence)
#endif #endif
Dprintf("lobject_seek: where = " FORMAT_CODE_PY_SSIZE_T, where); Dprintf("lobject_seek: where = " FORMAT_CODE_PY_SSIZE_T, where);
if (where < 0) if (where < 0)
collect_error(self->conn, &error); collect_error(self->conn);
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (where < 0) if (where < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
return where; return where;
} }
@ -415,8 +400,6 @@ lobject_seek(lobjectObject *self, Py_ssize_t pos, int whence)
RAISES_NEG Py_ssize_t RAISES_NEG Py_ssize_t
lobject_tell(lobjectObject *self) lobject_tell(lobjectObject *self)
{ {
PGresult *pgres = NULL;
char *error = NULL;
Py_ssize_t where; Py_ssize_t where;
Dprintf("lobject_tell: fd = %d", self->fd); Dprintf("lobject_tell: fd = %d", self->fd);
@ -435,13 +418,13 @@ lobject_tell(lobjectObject *self)
#endif #endif
Dprintf("lobject_tell: where = " FORMAT_CODE_PY_SSIZE_T, where); Dprintf("lobject_tell: where = " FORMAT_CODE_PY_SSIZE_T, where);
if (where < 0) if (where < 0)
collect_error(self->conn, &error); collect_error(self->conn);
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (where < 0) if (where < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
return where; return where;
} }
@ -450,27 +433,25 @@ lobject_tell(lobjectObject *self)
RAISES_NEG int RAISES_NEG int
lobject_export(lobjectObject *self, const char *filename) lobject_export(lobjectObject *self, const char *filename)
{ {
PGresult *pgres = NULL;
char *error = NULL;
int retvalue; int retvalue;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); retvalue = pq_begin_locked(self->conn, &_save);
if (retvalue < 0) if (retvalue < 0)
goto end; goto end;
retvalue = lo_export(self->conn->pgconn, self->oid, filename); retvalue = lo_export(self->conn->pgconn, self->oid, filename);
if (retvalue < 0) if (retvalue < 0)
collect_error(self->conn, &error); collect_error(self->conn);
end: end:
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
return retvalue; return retvalue;
} }
@ -478,8 +459,6 @@ RAISES_NEG int
lobject_truncate(lobjectObject *self, size_t len) lobject_truncate(lobjectObject *self, size_t len)
{ {
int retvalue; int retvalue;
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("lobject_truncate: fd = %d, len = " FORMAT_CODE_SIZE_T, Dprintf("lobject_truncate: fd = %d, len = " FORMAT_CODE_SIZE_T,
self->fd, len); self->fd, len);
@ -498,13 +477,13 @@ lobject_truncate(lobjectObject *self, size_t len)
#endif #endif
Dprintf("lobject_truncate: result = %d", retvalue); Dprintf("lobject_truncate: result = %d", retvalue);
if (retvalue < 0) if (retvalue < 0)
collect_error(self->conn, &error); collect_error(self->conn);
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn);
return retvalue; return retvalue;
} }

View File

@ -194,68 +194,6 @@ pq_raise(connectionObject *conn, cursorObject *curs, PGresult **pgres)
Py_XDECREF(pgcode); Py_XDECREF(pgcode);
} }
/* pq_set_critical, pq_resolve_critical - manage critical errors
this function is invoked when a PQexec() call returns NULL, meaning a
critical condition like out of memory or lost connection. it save the error
message and mark the connection as 'wanting cleanup'.
both functions do not call any Py_*_ALLOW_THREADS macros.
pq_resolve_critical should be called while holding the GIL. */
void
pq_set_critical(connectionObject *conn, const char *msg)
{
if (msg == NULL)
msg = PQerrorMessage(conn->pgconn);
if (conn->critical) free(conn->critical);
Dprintf("pq_set_critical: setting %s", msg);
if (msg && msg[0] != '\0') conn->critical = strdup(msg);
else conn->critical = NULL;
}
static void
pq_clear_critical(connectionObject *conn)
{
/* sometimes we know that the notice analizer set a critical that
was not really as such (like when raising an error for a delayed
contraint violation. it would be better to analyze the notice
or avoid the set-error-on-notice stuff at all but given that we
can't, some functions at least clear the critical status after
operations they know would result in a wrong critical to be set */
Dprintf("pq_clear_critical: clearing %s", conn->critical);
if (conn->critical) {
free(conn->critical);
conn->critical = NULL;
}
}
/* return -1 if the exception is set (i.e. if conn->critical is set),
* else 0 */
RAISES_NEG static int
pq_resolve_critical(connectionObject *conn, int close)
{
Dprintf("pq_resolve_critical: resolving %s", conn->critical);
if (conn->critical) {
char *msg = &(conn->critical[6]);
Dprintf("pq_resolve_critical: error = %s", msg);
/* we can't use pq_raise because the error has already been cleared
from the connection, so we just raise an OperationalError with the
critical message */
PyErr_SetString(OperationalError, msg);
/* we don't want to destroy this connection but just close it */
if (close == 1) conn_close(conn);
/* remember to clear the critical! */
pq_clear_critical(conn);
return -1;
}
return 0;
}
/* pq_clear_async - clear the effects of a previous async query /* pq_clear_async - clear the effects of a previous async query
note that this function does block because it needs to wait for the full note that this function does block because it needs to wait for the full
@ -273,9 +211,9 @@ pq_clear_async(connectionObject *conn)
finalize asynchronous processing so the connection will be ready to finalize asynchronous processing so the connection will be ready to
accept another query */ accept another query */
while ((pgres = PQgetResult(conn->pgconn)) != NULL) { while ((pgres = PQgetResult(conn->pgconn))) {
Dprintf("pq_clear_async: clearing PGresult at %p", pgres); Dprintf("pq_clear_async: clearing PGresult at %p", pgres);
CLEARPGRES(pgres); PQclear(pgres);
} }
Py_CLEAR(conn->async_cursor); Py_CLEAR(conn->async_cursor);
} }
@ -305,7 +243,7 @@ pq_set_non_blocking(connectionObject *conn, int arg)
This function should only be called on a locked connection without This function should only be called on a locked connection without
holding the global interpreter lock. holding the global interpreter lock.
On error, -1 is returned, and the pgres argument will hold the On error, -1 is returned, and the conn->pgres will hold the
relevant result structure. relevant result structure.
The tstate parameter should be the pointer of the _save variable created by The tstate parameter should be the pointer of the _save variable created by
@ -313,36 +251,31 @@ pq_set_non_blocking(connectionObject *conn, int arg)
again the GIL if needed, i.e. if a Python wait callback must be invoked. again the GIL if needed, i.e. if a Python wait callback must be invoked.
*/ */
int int
pq_execute_command_locked(connectionObject *conn, const char *query, pq_execute_command_locked(
PGresult **pgres, char **error, connectionObject *conn, const char *query, PyThreadState **tstate)
PyThreadState **tstate)
{ {
int pgstatus, retvalue = -1; int pgstatus, retvalue = -1;
Dprintf("pq_execute_command_locked: pgconn = %p, query = %s", Dprintf("pq_execute_command_locked: pgconn = %p, query = %s",
conn->pgconn, query); conn->pgconn, query);
*error = NULL;
if (!psyco_green()) { if (!psyco_green()) {
*pgres = PQexec(conn->pgconn, query); conn_set_result(conn, PQexec(conn->pgconn, query));
} else { } else {
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
*pgres = psyco_exec_green(conn, query); conn_set_result(conn, psyco_exec_green(conn, query));
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
} }
if (*pgres == NULL) { if (conn->pgres == NULL) {
Dprintf("pq_execute_command_locked: PQexec returned NULL"); Dprintf("pq_execute_command_locked: PQexec returned NULL");
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
if (!PyErr_Occurred()) { if (!PyErr_Occurred()) {
const char *msg; conn_set_error(conn, PQerrorMessage(conn->pgconn));
msg = PQerrorMessage(conn->pgconn);
if (msg && *msg) { *error = strdup(msg); }
} }
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
goto cleanup; goto cleanup;
} }
pgstatus = PQresultStatus(*pgres); pgstatus = PQresultStatus(conn->pgres);
if (pgstatus != PGRES_COMMAND_OK ) { if (pgstatus != PGRES_COMMAND_OK ) {
Dprintf("pq_execute_command_locked: result was not COMMAND_OK (%d)", Dprintf("pq_execute_command_locked: result was not COMMAND_OK (%d)",
pgstatus); pgstatus);
@ -350,7 +283,7 @@ pq_execute_command_locked(connectionObject *conn, const char *query,
} }
retvalue = 0; retvalue = 0;
CLEARPGRES(*pgres); CLEARPGRES(conn->pgres);
cleanup: cleanup:
return retvalue; return retvalue;
@ -365,17 +298,17 @@ cleanup:
lock. lock.
*/ */
RAISES void RAISES void
pq_complete_error(connectionObject *conn, PGresult **pgres, char **error) pq_complete_error(connectionObject *conn)
{ {
Dprintf("pq_complete_error: pgconn = %p, pgres = %p, error = %s", Dprintf("pq_complete_error: pgconn = %p, error = %s",
conn->pgconn, *pgres, *error ? *error : "(null)"); conn->pgconn, conn->error);
if (*pgres != NULL) { if (conn->pgres) {
pq_raise(conn, NULL, pgres); pq_raise(conn, NULL, &conn->pgres);
/* now *pgres is null */ /* now conn->pgres is null */
} }
else { else {
if (*error != NULL) { if (conn->error) {
PyErr_SetString(OperationalError, *error); PyErr_SetString(OperationalError, conn->error);
} else if (PyErr_Occurred()) { } else if (PyErr_Occurred()) {
/* There was a Python error (e.g. in the callback). Don't clobber /* There was a Python error (e.g. in the callback). Don't clobber
* it with an unknown exception. (see #410) */ * it with an unknown exception. (see #410) */
@ -392,11 +325,7 @@ pq_complete_error(connectionObject *conn, PGresult **pgres, char **error)
conn->closed = 2; conn->closed = 2;
} }
} }
conn_set_error(conn, NULL);
if (*error) {
free(*error);
*error = NULL;
}
} }
@ -405,12 +334,11 @@ pq_complete_error(connectionObject *conn, PGresult **pgres, char **error)
This function should only be called on a locked connection without This function should only be called on a locked connection without
holding the global interpreter lock. holding the global interpreter lock.
On error, -1 is returned, and the pgres argument will hold the On error, -1 is returned, and the conn->pgres argument will hold the
relevant result structure. relevant result structure.
*/ */
int int
pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error, pq_begin_locked(connectionObject *conn, PyThreadState **tstate)
PyThreadState **tstate)
{ {
const size_t bufsize = 256; const size_t bufsize = 256;
char buf[256]; /* buf size must be same as bufsize */ char buf[256]; /* buf size must be same as bufsize */
@ -441,7 +369,7 @@ pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error,
srv_deferrable[conn->deferrable]); srv_deferrable[conn->deferrable]);
} }
result = pq_execute_command_locked(conn, buf, pgres, error, tstate); result = pq_execute_command_locked(conn, buf, tstate);
if (result == 0) if (result == 0)
conn->status = CONN_STATUS_BEGIN; conn->status = CONN_STATUS_BEGIN;
@ -458,8 +386,6 @@ int
pq_commit(connectionObject *conn) pq_commit(connectionObject *conn)
{ {
int retvalue = -1; int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&conn->lock); pthread_mutex_lock(&conn->lock);
@ -473,7 +399,7 @@ pq_commit(connectionObject *conn)
} }
else { else {
conn->mark += 1; conn->mark += 1;
retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error, &_save); retvalue = pq_execute_command_locked(conn, "COMMIT", &_save);
} }
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
@ -488,14 +414,13 @@ pq_commit(connectionObject *conn)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(conn, &pgres, &error); pq_complete_error(conn);
return retvalue; return retvalue;
} }
RAISES_NEG int RAISES_NEG int
pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error, pq_abort_locked(connectionObject *conn, PyThreadState **tstate)
PyThreadState **tstate)
{ {
int retvalue = -1; int retvalue = -1;
@ -508,7 +433,7 @@ pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error,
} }
conn->mark += 1; conn->mark += 1;
retvalue = pq_execute_command_locked(conn, "ROLLBACK", pgres, error, tstate); retvalue = pq_execute_command_locked(conn, "ROLLBACK", tstate);
if (retvalue == 0) if (retvalue == 0)
conn->status = CONN_STATUS_READY; conn->status = CONN_STATUS_READY;
@ -524,8 +449,6 @@ RAISES_NEG int
pq_abort(connectionObject *conn) pq_abort(connectionObject *conn)
{ {
int retvalue = -1; int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("pq_abort: pgconn = %p, autocommit = %d, status = %d", Dprintf("pq_abort: pgconn = %p, autocommit = %d, status = %d",
conn->pgconn, conn->autocommit, conn->status); conn->pgconn, conn->autocommit, conn->status);
@ -533,7 +456,7 @@ pq_abort(connectionObject *conn)
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&conn->lock); pthread_mutex_lock(&conn->lock);
retvalue = pq_abort_locked(conn, &pgres, &error, &_save); retvalue = pq_abort_locked(conn, &_save);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
conn_notice_process(conn); conn_notice_process(conn);
@ -543,7 +466,7 @@ pq_abort(connectionObject *conn)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(conn, &pgres, &error); pq_complete_error(conn);
return retvalue; return retvalue;
} }
@ -558,8 +481,7 @@ pq_abort(connectionObject *conn)
*/ */
RAISES_NEG int RAISES_NEG int
pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error, pq_reset_locked(connectionObject *conn, PyThreadState **tstate)
PyThreadState **tstate)
{ {
int retvalue = -1; int retvalue = -1;
@ -569,20 +491,20 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error,
conn->mark += 1; conn->mark += 1;
if (!conn->autocommit && conn->status == CONN_STATUS_BEGIN) { if (!conn->autocommit && conn->status == CONN_STATUS_BEGIN) {
retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error, tstate); retvalue = pq_execute_command_locked(conn, "ABORT", tstate);
if (retvalue != 0) return retvalue; if (retvalue != 0) return retvalue;
} }
if (conn->server_version >= 80300) { if (conn->server_version >= 80300) {
retvalue = pq_execute_command_locked(conn, "DISCARD ALL", pgres, error, tstate); retvalue = pq_execute_command_locked(conn, "DISCARD ALL", tstate);
if (retvalue != 0) return retvalue; if (retvalue != 0) return retvalue;
} }
else { else {
retvalue = pq_execute_command_locked(conn, "RESET ALL", pgres, error, tstate); retvalue = pq_execute_command_locked(conn, "RESET ALL", tstate);
if (retvalue != 0) return retvalue; if (retvalue != 0) return retvalue;
retvalue = pq_execute_command_locked(conn, retvalue = pq_execute_command_locked(conn,
"SET SESSION AUTHORIZATION DEFAULT", pgres, error, tstate); "SET SESSION AUTHORIZATION DEFAULT", tstate);
if (retvalue != 0) return retvalue; if (retvalue != 0) return retvalue;
} }
@ -596,8 +518,6 @@ int
pq_reset(connectionObject *conn) pq_reset(connectionObject *conn)
{ {
int retvalue = -1; int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL;
Dprintf("pq_reset: pgconn = %p, autocommit = %d, status = %d", Dprintf("pq_reset: pgconn = %p, autocommit = %d, status = %d",
conn->pgconn, conn->autocommit, conn->status); conn->pgconn, conn->autocommit, conn->status);
@ -605,7 +525,7 @@ pq_reset(connectionObject *conn)
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&conn->lock); pthread_mutex_lock(&conn->lock);
retvalue = pq_reset_locked(conn, &pgres, &error, &_save); retvalue = pq_reset_locked(conn, &_save);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
conn_notice_process(conn); conn_notice_process(conn);
@ -615,7 +535,7 @@ pq_reset(connectionObject *conn)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) { if (retvalue < 0) {
pq_complete_error(conn, &pgres, &error); pq_complete_error(conn);
} }
else { else {
Py_CLEAR(conn->tpc_xid); Py_CLEAR(conn->tpc_xid);
@ -633,9 +553,7 @@ pq_reset(connectionObject *conn)
*/ */
char * char *
pq_get_guc_locked( pq_get_guc_locked(connectionObject *conn, const char *param, PyThreadState **tstate)
connectionObject *conn, const char *param,
PGresult **pgres, char **error, PyThreadState **tstate)
{ {
char query[256]; char query[256];
int size; int size;
@ -645,40 +563,37 @@ pq_get_guc_locked(
size = PyOS_snprintf(query, sizeof(query), "SHOW %s", param); size = PyOS_snprintf(query, sizeof(query), "SHOW %s", param);
if (size < 0 || (size_t)size >= sizeof(query)) { if (size < 0 || (size_t)size >= sizeof(query)) {
*error = strdup("SHOW: query too large"); conn_set_error(conn, "SHOW: query too large");
goto cleanup; goto cleanup;
} }
Dprintf("pq_get_guc_locked: pgconn = %p, query = %s", conn->pgconn, query); Dprintf("pq_get_guc_locked: pgconn = %p, query = %s", conn->pgconn, query);
*error = NULL;
if (!psyco_green()) { if (!psyco_green()) {
*pgres = PQexec(conn->pgconn, query); conn_set_result(conn, PQexec(conn->pgconn, query));
} else { } else {
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
*pgres = psyco_exec_green(conn, query); conn_set_result(conn, psyco_exec_green(conn, query));
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
} }
if (*pgres == NULL) { if (!conn->pgres) {
Dprintf("pq_get_guc_locked: PQexec returned NULL"); Dprintf("pq_get_guc_locked: PQexec returned NULL");
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
if (!PyErr_Occurred()) { if (!PyErr_Occurred()) {
const char *msg; conn_set_error(conn, PQerrorMessage(conn->pgconn));
msg = PQerrorMessage(conn->pgconn);
if (msg && *msg) { *error = strdup(msg); }
} }
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
goto cleanup; goto cleanup;
} }
if (PQresultStatus(*pgres) != PGRES_TUPLES_OK) { if (PQresultStatus(conn->pgres) != PGRES_TUPLES_OK) {
Dprintf("pq_get_guc_locked: result was not TUPLES_OK (%d)", Dprintf("pq_get_guc_locked: result was not TUPLES_OK (%s)",
PQresultStatus(*pgres)); PQresStatus(PQresultStatus(conn->pgres)));
goto cleanup; goto cleanup;
} }
rv = strdup(PQgetvalue(*pgres, 0, 0)); rv = strdup(PQgetvalue(conn->pgres, 0, 0));
CLEARPGRES(*pgres); CLEARPGRES(conn->pgres);
cleanup: cleanup:
return rv; return rv;
@ -693,7 +608,7 @@ cleanup:
int int
pq_set_guc_locked( pq_set_guc_locked(
connectionObject *conn, const char *param, const char *value, connectionObject *conn, const char *param, const char *value,
PGresult **pgres, char **error, PyThreadState **tstate) PyThreadState **tstate)
{ {
char query[256]; char query[256];
int size; int size;
@ -710,11 +625,13 @@ pq_set_guc_locked(
"SET %s TO '%s'", param, value); "SET %s TO '%s'", param, value);
} }
if (size < 0 || (size_t)size >= sizeof(query)) { if (size < 0 || (size_t)size >= sizeof(query)) {
*error = strdup("SET: query too large"); conn_set_error(conn, "SET: query too large");
goto exit;
} }
rv = pq_execute_command_locked(conn, query, pgres, error, tstate); rv = pq_execute_command_locked(conn, query, tstate);
exit:
return rv; return rv;
} }
@ -724,8 +641,9 @@ pq_set_guc_locked(
* holding the global interpreter lock. */ * holding the global interpreter lock. */
int int
pq_tpc_command_locked(connectionObject *conn, const char *cmd, const char *tid, pq_tpc_command_locked(
PGresult **pgres, char **error, PyThreadState **tstate) connectionObject *conn, const char *cmd, const char *tid,
PyThreadState **tstate)
{ {
int rv = -1; int rv = -1;
char *etid = NULL, *buf = NULL; char *etid = NULL, *buf = NULL;
@ -752,7 +670,7 @@ pq_tpc_command_locked(connectionObject *conn, const char *cmd, const char *tid,
/* run the command and let it handle the error cases */ /* run the command and let it handle the error cases */
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
rv = pq_execute_command_locked(conn, buf, pgres, error, tstate); rv = pq_execute_command_locked(conn, buf, tstate);
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
exit: exit:
@ -767,7 +685,7 @@ exit:
/* pq_get_result_async - read an available result without blocking. /* pq_get_result_async - read an available result without blocking.
* *
* Return 0 if the result is ready, 1 if it will block, -1 on error. * Return 0 if the result is ready, 1 if it will block, -1 on error.
* The last result will be returned in pgres. * The last result will be returned in conn->pgres.
* *
* The function should be called with the lock and holding the GIL. * The function should be called with the lock and holding the GIL.
*/ */
@ -827,8 +745,7 @@ pq_get_result_async(connectionObject *conn)
PQclear(res); PQclear(res);
} }
else { else {
PQclear(conn->pgres); conn_set_result(conn, res);
conn->pgres = res;
} }
switch (status) { switch (status) {
@ -889,42 +806,41 @@ pq_flush(connectionObject *conn)
RAISES_NEG int RAISES_NEG int
_pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_begin) _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_begin)
{ {
PGresult *pgres = NULL; connectionObject *conn = curs->conn;
char *error = NULL;
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock)); pthread_mutex_lock(&(conn->lock));
if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { if (!no_begin && pq_begin_locked(conn, &_save) < 0) {
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
pq_complete_error(curs->conn, &pgres, &error); pq_complete_error(conn);
return -1; return -1;
} }
Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn); Dprintf("pq_execute: executing SYNC query: pgconn = %p", conn->pgconn);
Dprintf(" %-.200s", query); Dprintf(" %-.200s", query);
if (!psyco_green()) { if (!psyco_green()) {
pgres = PQexec(curs->conn->pgconn, query); conn_set_result(conn, PQexec(conn->pgconn, query));
} }
else { else {
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
pgres = psyco_exec_green(curs->conn, query); conn_set_result(conn, psyco_exec_green(conn, query));
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
} }
/* don't let pgres = NULL go to pq_fetch() */ /* don't let pgres = NULL go to pq_fetch() */
if (pgres == NULL) { if (!conn->pgres) {
if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
curs->conn->closed = 2; conn->closed = 2;
} }
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
if (!PyErr_Occurred()) { if (!PyErr_Occurred()) {
PyErr_SetString(OperationalError, PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn)); PQerrorMessage(conn->pgconn));
} }
return -1; return -1;
} }
@ -932,18 +848,18 @@ _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_be
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
/* assign the result back to the cursor now that we have the GIL */ /* assign the result back to the cursor now that we have the GIL */
curs->pgres = pgres; curs_set_result(curs, conn->pgres);
pgres = NULL; conn->pgres = NULL;
/* Process notifies here instead of when fetching the tuple as we are /* Process notifies here instead of when fetching the tuple as we are
* into the same critical section that received the data. Without this * into the same critical section that received the data. Without this
* care, reading notifies may disrupt other thread communications. * care, reading notifies may disrupt other thread communications.
* (as in ticket #55). */ * (as in ticket #55). */
conn_notifies_process(curs->conn); conn_notifies_process(conn);
conn_notice_process(curs->conn); conn_notice_process(conn);
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
/* if the execute was sync, we call pq_fetch() immediately, /* if the execute was sync, we call pq_fetch() immediately,
@ -958,29 +874,30 @@ RAISES_NEG int
_pq_execute_async(cursorObject *curs, const char *query, int no_result) _pq_execute_async(cursorObject *curs, const char *query, int no_result)
{ {
int async_status = ASYNC_WRITE; int async_status = ASYNC_WRITE;
connectionObject *conn = curs->conn;
int ret; int ret;
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock)); pthread_mutex_lock(&(conn->lock));
Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn); Dprintf("pq_execute: executing ASYNC query: pgconn = %p", conn->pgconn);
Dprintf(" %-.200s", query); Dprintf(" %-.200s", query);
if (PQsendQuery(curs->conn->pgconn, query) == 0) { if (PQsendQuery(conn->pgconn, query) == 0) {
if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
curs->conn->closed = 2; conn->closed = 2;
} }
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
PyErr_SetString(OperationalError, PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn)); PQerrorMessage(conn->pgconn));
return -1; return -1;
} }
Dprintf("pq_execute: async query sent to backend"); Dprintf("pq_execute: async query sent to backend");
ret = PQflush(curs->conn->pgconn); ret = PQflush(conn->pgconn);
if (ret == 0) { if (ret == 0) {
/* the query got fully sent to the server */ /* the query got fully sent to the server */
Dprintf("pq_execute: query got flushed immediately"); Dprintf("pq_execute: query got flushed immediately");
@ -993,18 +910,18 @@ _pq_execute_async(cursorObject *curs, const char *query, int no_result)
} }
else { else {
/* there was an error */ /* there was an error */
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
PyErr_SetString(OperationalError, PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn)); PQerrorMessage(conn->pgconn));
return -1; return -1;
} }
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
curs->conn->async_status = async_status; conn->async_status = async_status;
if (!(curs->conn->async_cursor if (!(conn->async_cursor
= PyWeakref_NewRef((PyObject *)curs, NULL))) { = PyWeakref_NewRef((PyObject *)curs, NULL))) {
return -1; return -1;
} }
@ -1015,12 +932,6 @@ _pq_execute_async(cursorObject *curs, const char *query, int no_result)
RAISES_NEG int RAISES_NEG int
pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin) pq_execute(cursorObject *curs, const char *query, int async, int no_result, int no_begin)
{ {
/* if the status of the connection is critical raise an exception and
definitely close the connection */
if (curs->conn->critical) {
return pq_resolve_critical(curs->conn, 1);
}
/* check status of connection, raise error if not OK */ /* check status of connection, raise error if not OK */
if (PQstatus(curs->conn->pgconn) != CONNECTION_OK) { if (PQstatus(curs->conn->pgconn) != CONNECTION_OK) {
Dprintf("pq_execute: connection NOT OK"); Dprintf("pq_execute: connection NOT OK");
@ -1421,7 +1332,7 @@ _pq_copy_in_v3(cursorObject *curs)
/* and finally we grab the operation result from the backend */ /* and finally we grab the operation result from the backend */
for (;;) { for (;;) {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
curs->pgres = PQgetResult(curs->conn->pgconn); curs_set_result(curs, PQgetResult(curs->conn->pgconn));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (NULL == curs->pgres) if (NULL == curs->pgres)
@ -1502,10 +1413,9 @@ _pq_copy_out_v3(cursorObject *curs)
} }
/* and finally we grab the operation result from the backend */ /* and finally we grab the operation result from the backend */
CLEARPGRES(curs->pgres);
for (;;) { for (;;) {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
curs->pgres = PQgetResult(curs->conn->pgconn); curs_set_result(curs, PQgetResult(curs->conn->pgconn));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (NULL == curs->pgres) if (NULL == curs->pgres)
@ -1584,7 +1494,7 @@ retry:
} }
if (len == -1) { if (len == -1) {
/* EOF */ /* EOF */
curs->pgres = PQgetResult(pgconn); curs_set_result(curs, PQgetResult(pgconn));
if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) { if (curs->pgres && PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) {
pq_raise(conn, curs, NULL); pq_raise(conn, curs, NULL);
@ -1805,20 +1715,6 @@ pq_fetch(cursorObject *curs, int no_result)
/* even if we fail, we remove any information about the previous query */ /* even if we fail, we remove any information about the previous query */
curs_reset(curs); curs_reset(curs);
/* check for PGRES_FATAL_ERROR result */
/* FIXME: I am not sure we need to check for critical error here.
if (curs->pgres == NULL) {
Dprintf("pq_fetch: got a NULL pgres, checking for critical");
pq_set_critical(curs->conn);
if (curs->conn->critical) {
return pq_resolve_critical(curs->conn);
}
else {
return 0;
}
}
*/
if (curs->pgres == NULL) return 0; if (curs->pgres == NULL) return 0;
pgstatus = PQresultStatus(curs->pgres); pgstatus = PQresultStatus(curs->pgres);
@ -1916,13 +1812,5 @@ pq_fetch(cursorObject *curs, int no_result)
break; break;
} }
/* error checking, close the connection if necessary (some critical errors
are not really critical, like a COPY FROM error: if that's the case we
raise the exception but we avoid to close the connection) */
Dprintf("pq_fetch: fetching done; check for critical errors");
if (curs->conn->critical) {
return pq_resolve_critical(curs->conn, ex == -1 ? 1 : 0);
}
return ex; return ex;
} }

View File

@ -39,24 +39,19 @@ RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs, int no_result);
RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query, RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query,
int async, int no_result, int no_begin); int async, int no_result, int no_begin);
HIDDEN int pq_send_query(connectionObject *conn, const char *query); HIDDEN int pq_send_query(connectionObject *conn, const char *query);
HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres, HIDDEN int pq_begin_locked(connectionObject *conn, PyThreadState **tstate);
char **error, PyThreadState **tstate);
HIDDEN int pq_commit(connectionObject *conn); HIDDEN int pq_commit(connectionObject *conn);
RAISES_NEG HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres, RAISES_NEG HIDDEN int pq_abort_locked(connectionObject *conn,
char **error, PyThreadState **tstate); PyThreadState **tstate);
RAISES_NEG HIDDEN int pq_abort(connectionObject *conn); RAISES_NEG HIDDEN int pq_abort(connectionObject *conn);
HIDDEN int pq_reset_locked(connectionObject *conn, PGresult **pgres, HIDDEN int pq_reset_locked(connectionObject *conn, PyThreadState **tstate);
char **error, PyThreadState **tstate);
RAISES_NEG HIDDEN int pq_reset(connectionObject *conn); RAISES_NEG HIDDEN int pq_reset(connectionObject *conn);
HIDDEN char *pq_get_guc_locked(connectionObject *conn, const char *param, HIDDEN char *pq_get_guc_locked(connectionObject *conn, const char *param,
PGresult **pgres, PyThreadState **tstate);
char **error, PyThreadState **tstate);
HIDDEN int pq_set_guc_locked(connectionObject *conn, const char *param, HIDDEN int pq_set_guc_locked(connectionObject *conn, const char *param,
const char *value, PGresult **pgres, const char *value, PyThreadState **tstate);
char **error, PyThreadState **tstate);
HIDDEN int pq_tpc_command_locked(connectionObject *conn, HIDDEN int pq_tpc_command_locked(connectionObject *conn,
const char *cmd, const char *tid, const char *cmd, const char *tid,
PGresult **pgres, char **error,
PyThreadState **tstate); PyThreadState **tstate);
RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn); RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn);
HIDDEN int pq_flush(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn);
@ -65,12 +60,9 @@ RAISES_NEG HIDDEN int pq_set_non_blocking(connectionObject *conn, int arg);
HIDDEN void pq_set_critical(connectionObject *conn, const char *msg); HIDDEN void pq_set_critical(connectionObject *conn, const char *msg);
HIDDEN int pq_execute_command_locked(connectionObject *conn, HIDDEN int pq_execute_command_locked(connectionObject *conn, const char *query,
const char *query,
PGresult **pgres, char **error,
PyThreadState **tstate); PyThreadState **tstate);
RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, RAISES HIDDEN void pq_complete_error(connectionObject *conn);
char **error);
/* replication protocol support */ /* replication protocol support */
HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,