Ensure running COPY in blocking mode.

This commit is contained in:
Daniele Varrazzo 2010-04-07 23:39:07 +01:00
parent b114e25c31
commit aeec583ff1

View File

@ -944,18 +944,25 @@ _pq_copy_in_v3(cursorObject *curs)
/* COPY FROM implementation when protocol 3 is available: this function /* COPY FROM implementation when protocol 3 is available: this function
uses the new PQputCopyData() and can detect errors and set the correct uses the new PQputCopyData() and can detect errors and set the correct
exception */ exception */
PyObject *o, *func, *size; PyObject *o, *func = NULL, *size = NULL;
Py_ssize_t length = 0; Py_ssize_t length = 0;
int res, error = 0; int res, error = 0;
if (!(func = PyObject_GetAttrString(curs->copyfile, "read"))) { if (!(func = PyObject_GetAttrString(curs->copyfile, "read"))) {
Dprintf("_pq_copy_in_v3: can't get o.read"); Dprintf("_pq_copy_in_v3: can't get o.read");
return -1; error = 1;
goto exit;
} }
if (!(size = PyInt_FromSsize_t(curs->copysize))) { if (!(size = PyInt_FromSsize_t(curs->copysize))) {
Dprintf("_pq_copy_in_v3: can't get int from copysize"); Dprintf("_pq_copy_in_v3: can't get int from copysize");
Py_DECREF(func); error = 1;
return -1; goto exit;
}
/* Put the connection in blocking mode */
if (0 != pq_set_non_blocking(curs->conn, 0, 1)) {
error = 1;
goto exit;
} }
while (1) { while (1) {
@ -989,8 +996,6 @@ _pq_copy_in_v3(cursorObject *curs)
} }
Py_XDECREF(o); Py_XDECREF(o);
Py_DECREF(func);
Py_DECREF(size);
Dprintf("_pq_copy_in_v3: error = %d", error); Dprintf("_pq_copy_in_v3: error = %d", error);
@ -1025,35 +1030,46 @@ _pq_copy_in_v3(cursorObject *curs)
} }
} }
return error == 0 ? 1 : -1; /* clear: */
/* Ignoring error value: if this failed, we have worse problems. */
pq_set_non_blocking(curs->conn, 1, 0);
exit:
Py_XDECREF(func);
Py_XDECREF(size);
return (error == 0 ? 1 : -1);
} }
#endif #endif
static int static int
_pq_copy_in(cursorObject *curs) _pq_copy_in(cursorObject *curs)
{ {
/* COPY FROM implementation when protocol 3 is not available: this /* COPY FROM implementation when protocol 3 is not available: this
function can't fail but the backend will send an ERROR notice that will function can't fail but the backend will send an ERROR notice that will
be catched by our notice collector */ be catched by our notice collector */
PyObject *o, *func; PyObject *o, *func = NULL;
int ret = -1;
if (!(func = PyObject_GetAttrString(curs->copyfile, "readline"))) { if (!(func = PyObject_GetAttrString(curs->copyfile, "readline"))) {
Dprintf("_pq_copy_in: can't get o.readline"); Dprintf("_pq_copy_in: can't get o.readline");
return -1; goto exit;
}
/* Put the connection in blocking mode */
if (0 != pq_set_non_blocking(curs->conn, 0, 1)) {
goto exit;
} }
while (1) { while (1) {
int rv;
o = PyObject_CallFunction(func, NULL); o = PyObject_CallFunction(func, NULL);
if (o == NULL) return -1; if (o == NULL) goto clear;
if (o == Py_None || PyString_GET_SIZE(o) == 0) break; if (o == Py_None || PyString_GET_SIZE(o) == 0) break;
if (PQputline(curs->conn->pgconn, PyString_AS_STRING(o)) != 0) { rv = PQputline(curs->conn->pgconn, PyString_AS_STRING(o));
Py_DECREF(o);
Py_DECREF(func);
return -1;
}
Py_DECREF(o); Py_DECREF(o);
if (0 != rv) goto clear;
} }
Py_XDECREF(o); Py_XDECREF(o);
Py_DECREF(func);
PQputline(curs->conn->pgconn, "\\.\n"); PQputline(curs->conn->pgconn, "\\.\n");
PQendcopy(curs->conn->pgconn); PQendcopy(curs->conn->pgconn);
@ -1066,7 +1082,15 @@ _pq_copy_in(cursorObject *curs)
IFCLEARPGRES(curs->pgres); IFCLEARPGRES(curs->pgres);
} }
return 1; ret = 1;
clear:
/* Ignoring error value: if this failed, we have worse problems. */
pq_set_non_blocking(curs->conn, 1, 0);
exit:
Py_XDECREF(func);
return ret;
} }
#ifdef HAVE_PQPROTOCOL3 #ifdef HAVE_PQPROTOCOL3
@ -1074,13 +1098,19 @@ static int
_pq_copy_out_v3(cursorObject *curs) _pq_copy_out_v3(cursorObject *curs)
{ {
PyObject *tmp = NULL, *func; PyObject *tmp = NULL, *func;
int ret = -1;
char *buffer; char *buffer;
Py_ssize_t len; Py_ssize_t len;
if (!(func = PyObject_GetAttrString(curs->copyfile, "write"))) { if (!(func = PyObject_GetAttrString(curs->copyfile, "write"))) {
Dprintf("_pq_copy_out_v3: can't get o.write"); Dprintf("_pq_copy_out_v3: can't get o.write");
return -1; goto exit;
}
/* Put the connection in blocking mode */
if (0 != pq_set_non_blocking(curs->conn, 0, 1)) {
goto exit;
} }
while (1) { while (1) {
@ -1092,8 +1122,7 @@ _pq_copy_out_v3(cursorObject *curs)
tmp = PyObject_CallFunction(func, "s#", buffer, len); tmp = PyObject_CallFunction(func, "s#", buffer, len);
PQfreemem(buffer); PQfreemem(buffer);
if (tmp == NULL) { if (tmp == NULL) {
Py_DECREF(func); goto clear;
return -1;
} else { } else {
Py_DECREF(tmp); Py_DECREF(tmp);
} }
@ -1103,11 +1132,10 @@ _pq_copy_out_v3(cursorObject *curs)
postgresql authors :/) */ postgresql authors :/) */
else if (len <= 0) break; else if (len <= 0) break;
} }
Py_DECREF(func);
if (len == -2) { if (len == -2) {
pq_raise(curs->conn, curs, NULL); pq_raise(curs->conn, curs, NULL);
return -1; goto clear;
} }
/* and finally we grab the operation result from the backend */ /* and finally we grab the operation result from the backend */
@ -1117,7 +1145,15 @@ _pq_copy_out_v3(cursorObject *curs)
pq_raise(curs->conn, curs, NULL); pq_raise(curs->conn, curs, NULL);
IFCLEARPGRES(curs->pgres); IFCLEARPGRES(curs->pgres);
} }
return 1; ret = 1;
clear:
/* Ignoring error value: if this failed, we have worse problems. */
pq_set_non_blocking(curs->conn, 1, 0);
exit:
Py_XDECREF(func);
return ret;
} }
#endif #endif
@ -1127,12 +1163,17 @@ _pq_copy_out(cursorObject *curs)
PyObject *tmp = NULL, *func; PyObject *tmp = NULL, *func;
char buffer[4096]; char buffer[4096];
int status, ll=0; int status = -1, ll = 0;
Py_ssize_t len; Py_ssize_t len;
if (!(func = PyObject_GetAttrString(curs->copyfile, "write"))) { if (!(func = PyObject_GetAttrString(curs->copyfile, "write"))) {
Dprintf("_pq_copy_out: can't get o.write"); Dprintf("_pq_copy_out: can't get o.write");
return -1; goto exit;
}
/* Put the connection in blocking mode */
if (0 != pq_set_non_blocking(curs->conn, 0, 1)) {
goto exit;
} }
while (1) { while (1) {
@ -1151,21 +1192,17 @@ _pq_copy_out(cursorObject *curs)
ll = 1; ll = 1;
} }
else { else {
Py_DECREF(func); goto clear;
return -1;
} }
tmp = PyObject_CallFunction(func, "s#", buffer, len); tmp = PyObject_CallFunction(func, "s#", buffer, len);
if (tmp == NULL) { if (tmp == NULL) {
Py_DECREF(func); goto clear;
return -1;
} else { } else {
Py_DECREF(tmp); Py_DECREF(tmp);
} }
} }
Py_DECREF(func);
status = 1; status = 1;
if (PQendcopy(curs->conn->pgconn) != 0) if (PQendcopy(curs->conn->pgconn) != 0)
status = -1; status = -1;
@ -1179,6 +1216,12 @@ _pq_copy_out(cursorObject *curs)
IFCLEARPGRES(curs->pgres); IFCLEARPGRES(curs->pgres);
} }
clear:
/* Ignoring error value: if this failed, we have worse problems. */
pq_set_non_blocking(curs->conn, 1, 0);
exit:
Py_XDECREF(func);
return status; return status;
} }