Keep the connection in blocking mode unless it is not async.

Dropped set/unset nonblocking mode for copy and lobject operations:
lobjects don't work in nonblocking mode so they will hardly be supported
in green/async branches.  Support for copy is still feasible, but it
will be done in other code paths (called by poll).
This commit is contained in:
Daniele Varrazzo 2010-05-09 20:37:48 +01:00
parent e29424a230
commit 73db6bee01
3 changed files with 12 additions and 49 deletions

View File

@ -265,11 +265,13 @@ conn_setup(connectionObject *self, PGconn *pgconn)
pthread_mutex_lock(&self->lock); pthread_mutex_lock(&self->lock);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
if (pq_set_non_blocking(self, 1, 1) != 0) { int green = psyco_green();
if (green && (pq_set_non_blocking(self, 1, 1) != 0)) {
return -1; return -1;
} }
if (!psyco_green()) { if (!green) {
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, psyco_datestyle); pgres = PQexec(pgconn, psyco_datestyle);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
@ -287,7 +289,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
} }
CLEARPGRES(pgres); CLEARPGRES(pgres);
if (!psyco_green()) { if (!green) {
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, psyco_client_encoding); pgres = PQexec(pgconn, psyco_client_encoding);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;
@ -314,7 +316,7 @@ conn_setup(connectionObject *self, PGconn *pgconn)
} }
CLEARPGRES(pgres); CLEARPGRES(pgres);
if (!psyco_green()) { if (!green) {
Py_UNBLOCK_THREADS; Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, psyco_transaction_isolation); pgres = PQexec(pgconn, psyco_transaction_isolation);
Py_BLOCK_THREADS; Py_BLOCK_THREADS;

View File

@ -211,11 +211,9 @@ lobject_write(lobjectObject *self, const char *buf, size_t len)
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&(self->conn->lock)); pthread_mutex_lock(&(self->conn->lock));
PQsetnonblocking(self->conn->pgconn, 0);
written = lo_write(self->conn->pgconn, self->fd, buf, len); written = lo_write(self->conn->pgconn, self->fd, buf, len);
if (written < 0) if (written < 0)
collect_error(self->conn, &error); collect_error(self->conn, &error);
PQsetnonblocking(self->conn->pgconn, 1);
pthread_mutex_unlock(&(self->conn->lock)); pthread_mutex_unlock(&(self->conn->lock));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;

View File

@ -1041,12 +1041,6 @@ _pq_copy_in_v3(cursorObject *curs)
goto exit; goto exit;
} }
/* Put the connection in blocking mode */
if (0 != pq_set_non_blocking(curs->conn, 0, 1)) {
error = 1;
goto exit;
}
while (1) { while (1) {
o = PyObject_CallFunctionObjArgs(func, size, NULL); o = PyObject_CallFunctionObjArgs(func, size, NULL);
if (!(o && PyString_Check(o) && (length = PyString_GET_SIZE(o)) != -1)) { if (!(o && PyString_Check(o) && (length = PyString_GET_SIZE(o)) != -1)) {
@ -1112,10 +1106,6 @@ _pq_copy_in_v3(cursorObject *curs)
} }
} }
/* clear: */
/* Ignoring error value: if this failed, we have worse problems. */
pq_set_non_blocking(curs->conn, 1, 0);
exit: exit:
Py_XDECREF(func); Py_XDECREF(func);
Py_XDECREF(size); Py_XDECREF(size);
@ -1137,21 +1127,16 @@ _pq_copy_in(cursorObject *curs)
goto exit; goto exit;
} }
/* Put the connection in blocking mode */
if (0 != pq_set_non_blocking(curs->conn, 0, 1)) {
goto exit;
}
while (1) { while (1) {
int rv; int rv;
o = PyObject_CallFunction(func, NULL); o = PyObject_CallFunction(func, NULL);
if (o == NULL) goto clear; if (o == NULL) goto exit;
if (o == Py_None || PyString_GET_SIZE(o) == 0) break; if (o == Py_None || PyString_GET_SIZE(o) == 0) break;
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
rv = PQputline(curs->conn->pgconn, PyString_AS_STRING(o)); rv = PQputline(curs->conn->pgconn, PyString_AS_STRING(o));
Py_END_ALLOW_THREADS; Py_END_ALLOW_THREADS;
Py_DECREF(o); Py_DECREF(o);
if (0 != rv) goto clear; if (0 != rv) goto exit;
} }
Py_XDECREF(o); Py_XDECREF(o);
@ -1171,10 +1156,6 @@ _pq_copy_in(cursorObject *curs)
ret = 1; ret = 1;
clear:
/* Ignoring error value: if this failed, we have worse problems. */
pq_set_non_blocking(curs->conn, 1, 0);
exit: exit:
Py_XDECREF(func); Py_XDECREF(func);
return ret; return ret;
@ -1195,11 +1176,6 @@ _pq_copy_out_v3(cursorObject *curs)
goto exit; goto exit;
} }
/* Put the connection in blocking mode */
if (0 != pq_set_non_blocking(curs->conn, 0, 1)) {
goto exit;
}
while (1) { while (1) {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
len = PQgetCopyData(curs->conn->pgconn, &buffer, 0); len = PQgetCopyData(curs->conn->pgconn, &buffer, 0);
@ -1209,7 +1185,7 @@ _pq_copy_out_v3(cursorObject *curs)
tmp = PyObject_CallFunction(func, "s#", buffer, len); tmp = PyObject_CallFunction(func, "s#", buffer, len);
PQfreemem(buffer); PQfreemem(buffer);
if (tmp == NULL) { if (tmp == NULL) {
goto clear; goto exit;
} else { } else {
Py_DECREF(tmp); Py_DECREF(tmp);
} }
@ -1222,7 +1198,7 @@ _pq_copy_out_v3(cursorObject *curs)
if (len == -2) { if (len == -2) {
pq_raise(curs->conn, curs, NULL); pq_raise(curs->conn, curs, NULL);
goto clear; goto exit;
} }
/* and finally we grab the operation result from the backend */ /* and finally we grab the operation result from the backend */
@ -1234,10 +1210,6 @@ _pq_copy_out_v3(cursorObject *curs)
} }
ret = 1; ret = 1;
clear:
/* Ignoring error value: if this failed, we have worse problems. */
pq_set_non_blocking(curs->conn, 1, 0);
exit: exit:
Py_XDECREF(func); Py_XDECREF(func);
return ret; return ret;
@ -1258,11 +1230,6 @@ _pq_copy_out(cursorObject *curs)
goto exit; goto exit;
} }
/* Put the connection in blocking mode */
if (0 != pq_set_non_blocking(curs->conn, 0, 1)) {
goto exit;
}
while (1) { while (1) {
Py_BEGIN_ALLOW_THREADS; Py_BEGIN_ALLOW_THREADS;
status = PQgetline(curs->conn->pgconn, buffer, 4096); status = PQgetline(curs->conn->pgconn, buffer, 4096);
@ -1279,12 +1246,12 @@ _pq_copy_out(cursorObject *curs)
ll = 1; ll = 1;
} }
else { else {
goto clear; goto exit;
} }
tmp = PyObject_CallFunction(func, "s#", buffer, len); tmp = PyObject_CallFunction(func, "s#", buffer, len);
if (tmp == NULL) { if (tmp == NULL) {
goto clear; goto exit;
} else { } else {
Py_DECREF(tmp); Py_DECREF(tmp);
} }
@ -1303,10 +1270,6 @@ _pq_copy_out(cursorObject *curs)
IFCLEARPGRES(curs->pgres); IFCLEARPGRES(curs->pgres);
} }
clear:
/* Ignoring error value: if this failed, we have worse problems. */
pq_set_non_blocking(curs->conn, 1, 0);
exit: exit:
Py_XDECREF(func); Py_XDECREF(func);
return status; return status;