Use the connection's PGresult to pass results through calls

This commit is contained in:
Daniele Varrazzo 2019-03-17 02:11:41 +00:00
parent 5957a7ee45
commit c15e4c1a85
4 changed files with 113 additions and 132 deletions

View File

@ -651,7 +651,6 @@ conn_is_datestyle_ok(PGconn *pgconn)
RAISES_NEG int RAISES_NEG int
conn_setup(connectionObject *self, PGconn *pgconn) conn_setup(connectionObject *self, PGconn *pgconn)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int rv = -1; int rv = -1;
@ -678,11 +677,10 @@ conn_setup(connectionObject *self, PGconn *pgconn)
if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) { if (!dsn_has_replication(self->dsn) && !conn_is_datestyle_ok(self->pgconn)) {
int res; int res;
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
res = pq_set_guc_locked(self, "datestyle", "ISO", res = pq_set_guc_locked(self, "datestyle", "ISO", &error, &_save);
&pgres, &error, &_save);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
if (res < 0) { if (res < 0) {
pq_complete_error(self, &pgres, &error); pq_complete_error(self, &error);
goto unlock; goto unlock;
} }
} }
@ -1225,7 +1223,6 @@ conn_set_session(connectionObject *self, int autocommit,
int isolevel, int readonly, int deferrable) int isolevel, int readonly, int deferrable)
{ {
int rv = -1; int rv = -1;
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int want_autocommit = autocommit == SRV_STATE_UNCHANGED ? int want_autocommit = autocommit == SRV_STATE_UNCHANGED ?
self->autocommit : autocommit; self->autocommit : autocommit;
@ -1256,21 +1253,21 @@ conn_set_session(connectionObject *self, int autocommit,
if (isolevel != SRV_STATE_UNCHANGED) { if (isolevel != SRV_STATE_UNCHANGED) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_isolation", srv_isolevels[isolevel], "default_transaction_isolation", srv_isolevels[isolevel],
&pgres, &error, &_save)) { &error, &_save)) {
goto endlock; goto endlock;
} }
} }
if (readonly != SRV_STATE_UNCHANGED) { if (readonly != SRV_STATE_UNCHANGED) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_read_only", srv_state_guc[readonly], "default_transaction_read_only", srv_state_guc[readonly],
&pgres, &error, &_save)) { &error, &_save)) {
goto endlock; goto endlock;
} }
} }
if (deferrable != SRV_STATE_UNCHANGED) { if (deferrable != SRV_STATE_UNCHANGED) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_deferrable", srv_state_guc[deferrable], "default_transaction_deferrable", srv_state_guc[deferrable],
&pgres, &error, &_save)) { &error, &_save)) {
goto endlock; goto endlock;
} }
} }
@ -1281,21 +1278,21 @@ conn_set_session(connectionObject *self, int autocommit,
if (self->isolevel != ISOLATION_LEVEL_DEFAULT) { if (self->isolevel != ISOLATION_LEVEL_DEFAULT) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_isolation", "default", "default_transaction_isolation", "default",
&pgres, &error, &_save)) { &error, &_save)) {
goto endlock; goto endlock;
} }
} }
if (self->readonly != STATE_DEFAULT) { if (self->readonly != STATE_DEFAULT) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_read_only", "default", "default_transaction_read_only", "default",
&pgres, &error, &_save)) { &error, &_save)) {
goto endlock; goto endlock;
} }
} }
if (self->server_version >= 90100 && self->deferrable != STATE_DEFAULT) { if (self->server_version >= 90100 && self->deferrable != STATE_DEFAULT) {
if (0 > pq_set_guc_locked(self, if (0 > pq_set_guc_locked(self,
"default_transaction_deferrable", "default", "default_transaction_deferrable", "default",
&pgres, &error, &_save)) { &error, &_save)) {
goto endlock; goto endlock;
} }
} }
@ -1320,7 +1317,7 @@ endlock:
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (rv < 0) { if (rv < 0) {
pq_complete_error(self, &pgres, &error); pq_complete_error(self, &error);
goto exit; goto exit;
} }
@ -1339,7 +1336,6 @@ exit:
RAISES_NEG int RAISES_NEG int
conn_set_client_encoding(connectionObject *self, const char *pgenc) conn_set_client_encoding(connectionObject *self, const char *pgenc)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int res = -1; int res = -1;
char *clean_enc = NULL; char *clean_enc = NULL;
@ -1356,12 +1352,12 @@ conn_set_client_encoding(connectionObject *self, const char *pgenc)
/* abort the current transaction, to set the encoding ouside of /* abort the current transaction, to set the encoding ouside of
transactions */ transactions */
if ((res = pq_abort_locked(self, &pgres, &error, &_save))) { if ((res = pq_abort_locked(self, &error, &_save))) {
goto endlock; goto endlock;
} }
if ((res = pq_set_guc_locked(self, "client_encoding", clean_enc, if ((res = pq_set_guc_locked(self, "client_encoding", clean_enc,
&pgres, &error, &_save))) { &error, &_save))) {
goto endlock; goto endlock;
} }
@ -1370,7 +1366,7 @@ endlock:
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (res < 0) { if (res < 0) {
pq_complete_error(self, &pgres, &error); pq_complete_error(self, &error);
goto exit; goto exit;
} }
@ -1396,7 +1392,6 @@ exit:
RAISES_NEG int RAISES_NEG int
conn_tpc_begin(connectionObject *self, xidObject *xid) conn_tpc_begin(connectionObject *self, xidObject *xid)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Dprintf("conn_tpc_begin: starting transaction"); Dprintf("conn_tpc_begin: starting transaction");
@ -1404,10 +1399,10 @@ conn_tpc_begin(connectionObject *self, xidObject *xid)
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock); pthread_mutex_lock(&self->lock);
if (pq_begin_locked(self, &pgres, &error, &_save) < 0) { if (pq_begin_locked(self, &error, &_save) < 0) {
pthread_mutex_unlock(&(self->lock)); pthread_mutex_unlock(&(self->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
pq_complete_error(self, &pgres, &error); pq_complete_error(self, &error);
return -1; return -1;
} }
@ -1430,7 +1425,6 @@ conn_tpc_begin(connectionObject *self, xidObject *xid)
RAISES_NEG int RAISES_NEG int
conn_tpc_command(connectionObject *self, const char *cmd, xidObject *xid) conn_tpc_command(connectionObject *self, const char *cmd, xidObject *xid)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
PyObject *tid = NULL; PyObject *tid = NULL;
const char *ctid; const char *ctid;
@ -1446,10 +1440,10 @@ conn_tpc_command(connectionObject *self, const char *cmd, xidObject *xid)
pthread_mutex_lock(&self->lock); pthread_mutex_lock(&self->lock);
if (0 > (rv = pq_tpc_command_locked(self, cmd, ctid, if (0 > (rv = pq_tpc_command_locked(self, cmd, ctid,
&pgres, &error, &_save))) { &error, &_save))) {
pthread_mutex_unlock(&self->lock); pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
pq_complete_error(self, &pgres, &error); pq_complete_error(self, &error);
goto exit; goto exit;
} }

View File

@ -150,7 +150,6 @@ lobject_open(lobjectObject *self, connectionObject *conn,
Oid oid, const char *smode, Oid new_oid, const char *new_file) Oid oid, const char *smode, Oid new_oid, const char *new_file)
{ {
int retvalue = -1; int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int pgmode = 0; int pgmode = 0;
int mode; int mode;
@ -162,7 +161,7 @@ lobject_open(lobjectObject *self, connectionObject *conn,
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); retvalue = pq_begin_locked(self->conn, &error, &_save);
if (retvalue < 0) if (retvalue < 0)
goto end; goto end;
@ -228,7 +227,7 @@ lobject_open(lobjectObject *self, connectionObject *conn,
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
/* if retvalue > 0, an exception is already set */ /* if retvalue > 0, an exception is already set */
return retvalue; return retvalue;
@ -272,7 +271,6 @@ lobject_close_locked(lobjectObject *self, char **error)
RAISES_NEG int RAISES_NEG int
lobject_close(lobjectObject *self) lobject_close(lobjectObject *self)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int retvalue; int retvalue;
@ -285,7 +283,7 @@ lobject_close(lobjectObject *self)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
return retvalue; return retvalue;
} }
@ -294,14 +292,13 @@ lobject_close(lobjectObject *self)
RAISES_NEG int RAISES_NEG int
lobject_unlink(lobjectObject *self) lobject_unlink(lobjectObject *self)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int retvalue = -1; int retvalue = -1;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); retvalue = pq_begin_locked(self->conn, &error, &_save);
if (retvalue < 0) if (retvalue < 0)
goto end; goto end;
@ -319,7 +316,7 @@ lobject_unlink(lobjectObject *self)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
return retvalue; return retvalue;
} }
@ -329,7 +326,6 @@ RAISES_NEG Py_ssize_t
lobject_write(lobjectObject *self, const char *buf, size_t len) lobject_write(lobjectObject *self, const char *buf, size_t len)
{ {
Py_ssize_t written; Py_ssize_t written;
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Dprintf("lobject_writing: fd = %d, len = " FORMAT_CODE_SIZE_T, Dprintf("lobject_writing: fd = %d, len = " FORMAT_CODE_SIZE_T,
@ -346,7 +342,7 @@ lobject_write(lobjectObject *self, const char *buf, size_t len)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (written < 0) if (written < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
return written; return written;
} }
@ -356,7 +352,6 @@ RAISES_NEG Py_ssize_t
lobject_read(lobjectObject *self, char *buf, size_t len) lobject_read(lobjectObject *self, char *buf, size_t len)
{ {
Py_ssize_t n_read; Py_ssize_t n_read;
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
@ -370,7 +365,7 @@ lobject_read(lobjectObject *self, char *buf, size_t len)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (n_read < 0) if (n_read < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
return n_read; return n_read;
} }
@ -379,7 +374,6 @@ lobject_read(lobjectObject *self, char *buf, size_t len)
RAISES_NEG Py_ssize_t RAISES_NEG Py_ssize_t
lobject_seek(lobjectObject *self, Py_ssize_t pos, int whence) lobject_seek(lobjectObject *self, Py_ssize_t pos, int whence)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Py_ssize_t where; Py_ssize_t where;
@ -406,7 +400,7 @@ lobject_seek(lobjectObject *self, Py_ssize_t pos, int whence)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (where < 0) if (where < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
return where; return where;
} }
@ -415,7 +409,6 @@ lobject_seek(lobjectObject *self, Py_ssize_t pos, int whence)
RAISES_NEG Py_ssize_t RAISES_NEG Py_ssize_t
lobject_tell(lobjectObject *self) lobject_tell(lobjectObject *self)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Py_ssize_t where; Py_ssize_t where;
@ -441,7 +434,7 @@ lobject_tell(lobjectObject *self)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (where < 0) if (where < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
return where; return where;
} }
@ -450,14 +443,13 @@ lobject_tell(lobjectObject *self)
RAISES_NEG int RAISES_NEG int
lobject_export(lobjectObject *self, const char *filename) lobject_export(lobjectObject *self, const char *filename)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
int retvalue; int retvalue;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
retvalue = pq_begin_locked(self->conn, &pgres, &error, &_save); retvalue = pq_begin_locked(self->conn, &error, &_save);
if (retvalue < 0) if (retvalue < 0)
goto end; goto end;
@ -470,7 +462,7 @@ lobject_export(lobjectObject *self, const char *filename)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
return retvalue; return retvalue;
} }
@ -478,7 +470,6 @@ RAISES_NEG int
lobject_truncate(lobjectObject *self, size_t len) lobject_truncate(lobjectObject *self, size_t len)
{ {
int retvalue; int retvalue;
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Dprintf("lobject_truncate: fd = %d, len = " FORMAT_CODE_SIZE_T, Dprintf("lobject_truncate: fd = %d, len = " FORMAT_CODE_SIZE_T,
@ -504,7 +495,7 @@ lobject_truncate(lobjectObject *self, size_t len)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(self->conn, &pgres, &error); pq_complete_error(self->conn, &error);
return retvalue; return retvalue;
} }

View File

@ -305,7 +305,7 @@ pq_set_non_blocking(connectionObject *conn, int arg)
This function should only be called on a locked connection without This function should only be called on a locked connection without
holding the global interpreter lock. holding the global interpreter lock.
On error, -1 is returned, and the pgres argument will hold the On error, -1 is returned, and the conn->pgres will hold the
relevant result structure. relevant result structure.
The tstate parameter should be the pointer of the _save variable created by The tstate parameter should be the pointer of the _save variable created by
@ -314,23 +314,22 @@ pq_set_non_blocking(connectionObject *conn, int arg)
*/ */
int int
pq_execute_command_locked(connectionObject *conn, const char *query, pq_execute_command_locked(connectionObject *conn, const char *query,
PGresult **pgres, char **error, char **error, PyThreadState **tstate)
PyThreadState **tstate)
{ {
int pgstatus, retvalue = -1; int pgstatus, retvalue = -1;
Dprintf("pq_execute_command_locked: pgconn = %p, query = %s", Dprintf("pq_execute_command_locked: pgconn = %p, query = %s",
conn->pgconn, query); conn->pgconn, query);
*error = NULL; *error = NULL;
CLEARPGRES(conn->pgres);
if (!psyco_green()) { if (!psyco_green()) {
*pgres = PQexec(conn->pgconn, query); conn->pgres = PQexec(conn->pgconn, query);
} else { } else {
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
*pgres = psyco_exec_green(conn, query); conn->pgres = psyco_exec_green(conn, query);
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
} }
if (*pgres == NULL) { if (conn->pgres == NULL) {
Dprintf("pq_execute_command_locked: PQexec returned NULL"); Dprintf("pq_execute_command_locked: PQexec returned NULL");
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
if (!PyErr_Occurred()) { if (!PyErr_Occurred()) {
@ -342,7 +341,7 @@ pq_execute_command_locked(connectionObject *conn, const char *query,
goto cleanup; goto cleanup;
} }
pgstatus = PQresultStatus(*pgres); pgstatus = PQresultStatus(conn->pgres);
if (pgstatus != PGRES_COMMAND_OK ) { if (pgstatus != PGRES_COMMAND_OK ) {
Dprintf("pq_execute_command_locked: result was not COMMAND_OK (%d)", Dprintf("pq_execute_command_locked: result was not COMMAND_OK (%d)",
pgstatus); pgstatus);
@ -350,7 +349,7 @@ pq_execute_command_locked(connectionObject *conn, const char *query,
} }
retvalue = 0; retvalue = 0;
CLEARPGRES(*pgres); CLEARPGRES(conn->pgres);
cleanup: cleanup:
return retvalue; return retvalue;
@ -365,13 +364,13 @@ cleanup:
lock. lock.
*/ */
RAISES void RAISES void
pq_complete_error(connectionObject *conn, PGresult **pgres, char **error) pq_complete_error(connectionObject *conn, char **error)
{ {
Dprintf("pq_complete_error: pgconn = %p, pgres = %p, error = %s", Dprintf("pq_complete_error: pgconn = %p, pgres = %p, error = %s",
conn->pgconn, *pgres, *error ? *error : "(null)"); conn->pgconn, conn->pgres, *error ? *error : "(null)");
if (*pgres != NULL) { if (conn->pgres) {
pq_raise(conn, NULL, pgres); pq_raise(conn, NULL, &conn->pgres);
/* now *pgres is null */ /* now conn->pgres is null */
} }
else { else {
if (*error != NULL) { if (*error != NULL) {
@ -405,11 +404,11 @@ pq_complete_error(connectionObject *conn, PGresult **pgres, char **error)
This function should only be called on a locked connection without This function should only be called on a locked connection without
holding the global interpreter lock. holding the global interpreter lock.
On error, -1 is returned, and the pgres argument will hold the On error, -1 is returned, and the conn->pgres argument will hold the
relevant result structure. relevant result structure.
*/ */
int int
pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error, pq_begin_locked(connectionObject *conn, char **error,
PyThreadState **tstate) PyThreadState **tstate)
{ {
const size_t bufsize = 256; const size_t bufsize = 256;
@ -441,7 +440,7 @@ pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error,
srv_deferrable[conn->deferrable]); srv_deferrable[conn->deferrable]);
} }
result = pq_execute_command_locked(conn, buf, pgres, error, tstate); result = pq_execute_command_locked(conn, buf, error, tstate);
if (result == 0) if (result == 0)
conn->status = CONN_STATUS_BEGIN; conn->status = CONN_STATUS_BEGIN;
@ -458,7 +457,6 @@ int
pq_commit(connectionObject *conn) pq_commit(connectionObject *conn)
{ {
int retvalue = -1; int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
@ -473,7 +471,7 @@ pq_commit(connectionObject *conn)
} }
else { else {
conn->mark += 1; conn->mark += 1;
retvalue = pq_execute_command_locked(conn, "COMMIT", &pgres, &error, &_save); retvalue = pq_execute_command_locked(conn, "COMMIT", &error, &_save);
} }
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
@ -488,13 +486,13 @@ pq_commit(connectionObject *conn)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(conn, &pgres, &error); pq_complete_error(conn, &error);
return retvalue; return retvalue;
} }
RAISES_NEG int RAISES_NEG int
pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error, pq_abort_locked(connectionObject *conn, char **error,
PyThreadState **tstate) PyThreadState **tstate)
{ {
int retvalue = -1; int retvalue = -1;
@ -508,7 +506,7 @@ pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error,
} }
conn->mark += 1; conn->mark += 1;
retvalue = pq_execute_command_locked(conn, "ROLLBACK", pgres, error, tstate); retvalue = pq_execute_command_locked(conn, "ROLLBACK", error, tstate);
if (retvalue == 0) if (retvalue == 0)
conn->status = CONN_STATUS_READY; conn->status = CONN_STATUS_READY;
@ -524,7 +522,6 @@ RAISES_NEG int
pq_abort(connectionObject *conn) pq_abort(connectionObject *conn)
{ {
int retvalue = -1; int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Dprintf("pq_abort: pgconn = %p, autocommit = %d, status = %d", Dprintf("pq_abort: pgconn = %p, autocommit = %d, status = %d",
@ -533,7 +530,7 @@ pq_abort(connectionObject *conn)
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&conn->lock); pthread_mutex_lock(&conn->lock);
retvalue = pq_abort_locked(conn, &pgres, &error, &_save); retvalue = pq_abort_locked(conn, &error, &_save);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
conn_notice_process(conn); conn_notice_process(conn);
@ -543,7 +540,7 @@ pq_abort(connectionObject *conn)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) if (retvalue < 0)
pq_complete_error(conn, &pgres, &error); pq_complete_error(conn, &error);
return retvalue; return retvalue;
} }
@ -558,8 +555,7 @@ pq_abort(connectionObject *conn)
*/ */
RAISES_NEG int RAISES_NEG int
pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error, pq_reset_locked(connectionObject *conn, char **error, PyThreadState **tstate)
PyThreadState **tstate)
{ {
int retvalue = -1; int retvalue = -1;
@ -569,20 +565,20 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error,
conn->mark += 1; conn->mark += 1;
if (!conn->autocommit && conn->status == CONN_STATUS_BEGIN) { if (!conn->autocommit && conn->status == CONN_STATUS_BEGIN) {
retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error, tstate); retvalue = pq_execute_command_locked(conn, "ABORT", error, tstate);
if (retvalue != 0) return retvalue; if (retvalue != 0) return retvalue;
} }
if (conn->server_version >= 80300) { if (conn->server_version >= 80300) {
retvalue = pq_execute_command_locked(conn, "DISCARD ALL", pgres, error, tstate); retvalue = pq_execute_command_locked(conn, "DISCARD ALL", error, tstate);
if (retvalue != 0) return retvalue; if (retvalue != 0) return retvalue;
} }
else { else {
retvalue = pq_execute_command_locked(conn, "RESET ALL", pgres, error, tstate); retvalue = pq_execute_command_locked(conn, "RESET ALL", error, tstate);
if (retvalue != 0) return retvalue; if (retvalue != 0) return retvalue;
retvalue = pq_execute_command_locked(conn, retvalue = pq_execute_command_locked(conn,
"SET SESSION AUTHORIZATION DEFAULT", pgres, error, tstate); "SET SESSION AUTHORIZATION DEFAULT", error, tstate);
if (retvalue != 0) return retvalue; if (retvalue != 0) return retvalue;
} }
@ -596,7 +592,6 @@ int
pq_reset(connectionObject *conn) pq_reset(connectionObject *conn)
{ {
int retvalue = -1; int retvalue = -1;
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
Dprintf("pq_reset: pgconn = %p, autocommit = %d, status = %d", Dprintf("pq_reset: pgconn = %p, autocommit = %d, status = %d",
@ -605,7 +600,7 @@ pq_reset(connectionObject *conn)
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&conn->lock); pthread_mutex_lock(&conn->lock);
retvalue = pq_reset_locked(conn, &pgres, &error, &_save); retvalue = pq_reset_locked(conn, &error, &_save);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
conn_notice_process(conn); conn_notice_process(conn);
@ -615,7 +610,7 @@ pq_reset(connectionObject *conn)
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
if (retvalue < 0) { if (retvalue < 0) {
pq_complete_error(conn, &pgres, &error); pq_complete_error(conn, &error);
} }
else { else {
Py_CLEAR(conn->tpc_xid); Py_CLEAR(conn->tpc_xid);
@ -635,7 +630,7 @@ pq_reset(connectionObject *conn)
char * char *
pq_get_guc_locked( pq_get_guc_locked(
connectionObject *conn, const char *param, connectionObject *conn, const char *param,
PGresult **pgres, char **error, PyThreadState **tstate) char **error, PyThreadState **tstate)
{ {
char query[256]; char query[256];
int size; int size;
@ -652,15 +647,17 @@ pq_get_guc_locked(
Dprintf("pq_get_guc_locked: pgconn = %p, query = %s", conn->pgconn, query); Dprintf("pq_get_guc_locked: pgconn = %p, query = %s", conn->pgconn, query);
*error = NULL; *error = NULL;
CLEARPGRES(conn->pgres);
if (!psyco_green()) { if (!psyco_green()) {
*pgres = PQexec(conn->pgconn, query); conn->pgres = PQexec(conn->pgconn, query);
} else { } else {
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
*pgres = psyco_exec_green(conn, query); conn->pgres = psyco_exec_green(conn, query);
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
} }
if (*pgres == NULL) { if (!conn->pgres) {
Dprintf("pq_get_guc_locked: PQexec returned NULL"); Dprintf("pq_get_guc_locked: PQexec returned NULL");
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
if (!PyErr_Occurred()) { if (!PyErr_Occurred()) {
@ -671,14 +668,14 @@ pq_get_guc_locked(
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
goto cleanup; goto cleanup;
} }
if (PQresultStatus(*pgres) != PGRES_TUPLES_OK) { if (PQresultStatus(conn->pgres) != PGRES_TUPLES_OK) {
Dprintf("pq_get_guc_locked: result was not TUPLES_OK (%d)", Dprintf("pq_get_guc_locked: result was not TUPLES_OK (%s)",
PQresultStatus(*pgres)); PQresStatus(PQresultStatus(conn->pgres)));
goto cleanup; goto cleanup;
} }
rv = strdup(PQgetvalue(*pgres, 0, 0)); rv = strdup(PQgetvalue(conn->pgres, 0, 0));
CLEARPGRES(*pgres); CLEARPGRES(conn->pgres);
cleanup: cleanup:
return rv; return rv;
@ -693,7 +690,7 @@ cleanup:
int int
pq_set_guc_locked( pq_set_guc_locked(
connectionObject *conn, const char *param, const char *value, connectionObject *conn, const char *param, const char *value,
PGresult **pgres, char **error, PyThreadState **tstate) char **error, PyThreadState **tstate)
{ {
char query[256]; char query[256];
int size; int size;
@ -714,7 +711,7 @@ pq_set_guc_locked(
goto exit; goto exit;
} }
rv = pq_execute_command_locked(conn, query, pgres, error, tstate); rv = pq_execute_command_locked(conn, query, error, tstate);
exit: exit:
return rv; return rv;
@ -727,7 +724,7 @@ exit:
int int
pq_tpc_command_locked(connectionObject *conn, const char *cmd, const char *tid, pq_tpc_command_locked(connectionObject *conn, const char *cmd, const char *tid,
PGresult **pgres, char **error, PyThreadState **tstate) char **error, PyThreadState **tstate)
{ {
int rv = -1; int rv = -1;
char *etid = NULL, *buf = NULL; char *etid = NULL, *buf = NULL;
@ -754,7 +751,7 @@ pq_tpc_command_locked(connectionObject *conn, const char *cmd, const char *tid,
/* run the command and let it handle the error cases */ /* run the command and let it handle the error cases */
*tstate = PyEval_SaveThread(); *tstate = PyEval_SaveThread();
rv = pq_execute_command_locked(conn, buf, pgres, error, tstate); rv = pq_execute_command_locked(conn, buf, error, tstate);
PyEval_RestoreThread(*tstate); PyEval_RestoreThread(*tstate);
exit: exit:
@ -769,7 +766,7 @@ exit:
/* pq_get_result_async - read an available result without blocking. /* pq_get_result_async - read an available result without blocking.
* *
* Return 0 if the result is ready, 1 if it will block, -1 on error. * Return 0 if the result is ready, 1 if it will block, -1 on error.
* The last result will be returned in pgres. * The last result will be returned in conn->pgres.
* *
* The function should be called with the lock and holding the GIL. * The function should be called with the lock and holding the GIL.
*/ */
@ -891,42 +888,43 @@ pq_flush(connectionObject *conn)
RAISES_NEG int RAISES_NEG int
_pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_begin) _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_begin)
{ {
PGresult *pgres = NULL;
char *error = NULL; char *error = NULL;
connectionObject *conn = curs->conn;
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock)); pthread_mutex_lock(&(conn->lock));
if (!no_begin && pq_begin_locked(curs->conn, &pgres, &error, &_save) < 0) { if (!no_begin && pq_begin_locked(conn, &error, &_save) < 0) {
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
pq_complete_error(curs->conn, &pgres, &error); pq_complete_error(conn, &error);
return -1; return -1;
} }
Dprintf("pq_execute: executing SYNC query: pgconn = %p", curs->conn->pgconn); CLEARPGRES(conn->pgres);
Dprintf("pq_execute: executing SYNC query: pgconn = %p", conn->pgconn);
Dprintf(" %-.200s", query); Dprintf(" %-.200s", query);
if (!psyco_green()) { if (!psyco_green()) {
pgres = PQexec(curs->conn->pgconn, query); conn->pgres = PQexec(conn->pgconn, query);
} }
else { else {
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
pgres = psyco_exec_green(curs->conn, query); conn->pgres = psyco_exec_green(conn, query);
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
} }
/* don't let pgres = NULL go to pq_fetch() */ /* don't let pgres = NULL go to pq_fetch() */
if (pgres == NULL) { if (!conn->pgres) {
if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
curs->conn->closed = 2; conn->closed = 2;
} }
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
if (!PyErr_Occurred()) { if (!PyErr_Occurred()) {
PyErr_SetString(OperationalError, PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn)); PQerrorMessage(conn->pgconn));
} }
return -1; return -1;
} }
@ -934,18 +932,18 @@ _pq_execute_sync(cursorObject *curs, const char *query, int no_result, int no_be
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
/* assign the result back to the cursor now that we have the GIL */ /* assign the result back to the cursor now that we have the GIL */
curs->pgres = pgres; curs->pgres = conn->pgres;
pgres = NULL; conn->pgres = NULL;
/* Process notifies here instead of when fetching the tuple as we are /* Process notifies here instead of when fetching the tuple as we are
* into the same critical section that received the data. Without this * into the same critical section that received the data. Without this
* care, reading notifies may disrupt other thread communications. * care, reading notifies may disrupt other thread communications.
* (as in ticket #55). */ * (as in ticket #55). */
conn_notifies_process(curs->conn); conn_notifies_process(conn);
conn_notice_process(curs->conn); conn_notice_process(conn);
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
/* if the execute was sync, we call pq_fetch() immediately, /* if the execute was sync, we call pq_fetch() immediately,
@ -960,29 +958,30 @@ RAISES_NEG int
_pq_execute_async(cursorObject *curs, const char *query, int no_result) _pq_execute_async(cursorObject *curs, const char *query, int no_result)
{ {
int async_status = ASYNC_WRITE; int async_status = ASYNC_WRITE;
connectionObject *conn = curs->conn;
int ret; int ret;
CLEARPGRES(curs->pgres); CLEARPGRES(curs->pgres);
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(curs->conn->lock)); pthread_mutex_lock(&(conn->lock));
Dprintf("pq_execute: executing ASYNC query: pgconn = %p", curs->conn->pgconn); Dprintf("pq_execute: executing ASYNC query: pgconn = %p", conn->pgconn);
Dprintf(" %-.200s", query); Dprintf(" %-.200s", query);
if (PQsendQuery(curs->conn->pgconn, query) == 0) { if (PQsendQuery(conn->pgconn, query) == 0) {
if (CONNECTION_BAD == PQstatus(curs->conn->pgconn)) { if (CONNECTION_BAD == PQstatus(conn->pgconn)) {
curs->conn->closed = 2; conn->closed = 2;
} }
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
PyErr_SetString(OperationalError, PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn)); PQerrorMessage(conn->pgconn));
return -1; return -1;
} }
Dprintf("pq_execute: async query sent to backend"); Dprintf("pq_execute: async query sent to backend");
ret = PQflush(curs->conn->pgconn); ret = PQflush(conn->pgconn);
if (ret == 0) { if (ret == 0) {
/* the query got fully sent to the server */ /* the query got fully sent to the server */
Dprintf("pq_execute: query got flushed immediately"); Dprintf("pq_execute: query got flushed immediately");
@ -995,18 +994,18 @@ _pq_execute_async(cursorObject *curs, const char *query, int no_result)
} }
else { else {
/* there was an error */ /* there was an error */
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
PyErr_SetString(OperationalError, PyErr_SetString(OperationalError,
PQerrorMessage(curs->conn->pgconn)); PQerrorMessage(conn->pgconn));
return -1; return -1;
} }
pthread_mutex_unlock(&(curs->conn->lock)); pthread_mutex_unlock(&(conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
curs->conn->async_status = async_status; conn->async_status = async_status;
if (!(curs->conn->async_cursor if (!(conn->async_cursor
= PyWeakref_NewRef((PyObject *)curs, NULL))) { = PyWeakref_NewRef((PyObject *)curs, NULL))) {
return -1; return -1;
} }

View File

@ -39,25 +39,23 @@ RAISES_NEG HIDDEN int pq_fetch(cursorObject *curs, int no_result);
RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query, RAISES_NEG HIDDEN int pq_execute(cursorObject *curs, const char *query,
int async, int no_result, int no_begin); int async, int no_result, int no_begin);
HIDDEN int pq_send_query(connectionObject *conn, const char *query); HIDDEN int pq_send_query(connectionObject *conn, const char *query);
HIDDEN int pq_begin_locked(connectionObject *conn, PGresult **pgres, HIDDEN int pq_begin_locked(connectionObject *conn,
char **error, PyThreadState **tstate); char **error, PyThreadState **tstate);
HIDDEN int pq_commit(connectionObject *conn); HIDDEN int pq_commit(connectionObject *conn);
RAISES_NEG HIDDEN int pq_abort_locked(connectionObject *conn, PGresult **pgres, RAISES_NEG HIDDEN int pq_abort_locked(connectionObject *conn,
char **error, PyThreadState **tstate); char **error, PyThreadState **tstate);
RAISES_NEG HIDDEN int pq_abort(connectionObject *conn); RAISES_NEG HIDDEN int pq_abort(connectionObject *conn);
HIDDEN int pq_reset_locked(connectionObject *conn, PGresult **pgres, HIDDEN int pq_reset_locked(connectionObject *conn,
char **error, PyThreadState **tstate); char **error, PyThreadState **tstate);
RAISES_NEG HIDDEN int pq_reset(connectionObject *conn); RAISES_NEG HIDDEN int pq_reset(connectionObject *conn);
HIDDEN char *pq_get_guc_locked(connectionObject *conn, const char *param, HIDDEN char *pq_get_guc_locked(connectionObject *conn, const char *param,
PGresult **pgres,
char **error, PyThreadState **tstate); char **error, PyThreadState **tstate);
HIDDEN int pq_set_guc_locked(connectionObject *conn, const char *param, HIDDEN int pq_set_guc_locked(connectionObject *conn, const char *param,
const char *value, PGresult **pgres, const char *value,
char **error, PyThreadState **tstate); char **error, PyThreadState **tstate);
HIDDEN int pq_tpc_command_locked(connectionObject *conn, HIDDEN int pq_tpc_command_locked(connectionObject *conn,
const char *cmd, const char *tid, const char *cmd, const char *tid,
PGresult **pgres, char **error, char **error, PyThreadState **tstate);
PyThreadState **tstate);
RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn); RAISES_NEG HIDDEN int pq_get_result_async(connectionObject *conn);
HIDDEN int pq_flush(connectionObject *conn); HIDDEN int pq_flush(connectionObject *conn);
HIDDEN void pq_clear_async(connectionObject *conn); HIDDEN void pq_clear_async(connectionObject *conn);
@ -67,10 +65,9 @@ HIDDEN void pq_set_critical(connectionObject *conn, const char *msg);
HIDDEN int pq_execute_command_locked(connectionObject *conn, HIDDEN int pq_execute_command_locked(connectionObject *conn,
const char *query, const char *query,
PGresult **pgres, char **error, char **error,
PyThreadState **tstate); PyThreadState **tstate);
RAISES HIDDEN void pq_complete_error(connectionObject *conn, PGresult **pgres, RAISES HIDDEN void pq_complete_error(connectionObject *conn, char **error);
char **error);
/* replication protocol support */ /* replication protocol support */
HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer, HIDDEN int pq_copy_both(replicationCursorObject *repl, PyObject *consumer,