COPY problem tests and partial fix

Federico Di Gregorio 2008-05-27 17:40:19 +02:00
parent 6073193314
commit 5a428642f8
8 changed files with 60830 additions and 28 deletions

ChangeLog

@@ -1,3 +1,9 @@
+2008-05-27  Federico Di Gregorio  <fog@initd.org>
+
+	* psycopg/pqpath.c: better error checks in _pq_copy_in_v3 to
+	  avoid calling blocking libpq functions when the connection to
+	  the server has been broken
+
 2008-05-19  Federico Di Gregorio  <fog@initd.org>
 
 	* psycopg/cursor_type.c: fixed memory leak in .executemany(); on
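
A note on the hazard the entry describes: PQgetResult() is one of the libpq
calls that can block forever once the server connection is gone, while
PQstatus() is a purely local check. A minimal sketch of the guard pattern in
plain libpq (standalone illustration, not psycopg code; drain_results is an
invented name):

    /* sketch: refuse to drain results from a connection known to be broken */
    #include <libpq-fe.h>
    #include <stdio.h>

    static int drain_results(PGconn *conn)
    {
        PGresult *res;

        /* PQstatus() is a purely local check: CONNECTION_BAD means the
           server is gone and PQgetResult() below could block forever */
        if (PQstatus(conn) == CONNECTION_BAD) {
            fprintf(stderr, "lost connection: %s", PQerrorMessage(conn));
            return -1;
        }

        while ((res = PQgetResult(conn)) != NULL) {
            if (PQresultStatus(res) == PGRES_FATAL_ERROR)
                fprintf(stderr, "error: %s", PQresultErrorMessage(res));
            PQclear(res);
        }
        return 0;
    }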

psycopg/connection.h

@@ -52,7 +52,8 @@ typedef struct {
     char *critical;             /* critical error on this connection */
     char *encoding;             /* current backend encoding */
 
-    long int closed;            /* 2 means connection has been closed */
+    long int closed;            /* 1 means connection has been closed;
+                                   2 that something horrible happened */
     long int isolation_level;   /* isolation level for this connection */
     long int mark;              /* number of commits/rollbacks done so far */
     int status;                 /* status of the connection */

psycopg/connection_int.c

@@ -215,11 +215,12 @@ conn_close(connectionObject *self)
     Py_BEGIN_ALLOW_THREADS;
     pthread_mutex_lock(&self->lock);
 
     if (self->closed == 0)
         self->closed = 1;
 
     /* execute a forced rollback on the connection (but don't check the
        result, we're going to close the pq connection anyway) */
-    if (self->pgconn) {
+    if (self->pgconn && self->closed == 1) {
         PGresult *pgres = NULL;
         char *error = NULL;
@@ -228,6 +229,8 @@ conn_close(connectionObject *self)
         if (error)
             free (error);
     }
+    }
+    if (self->pgconn) {
         PQfinish(self->pgconn);
         Dprintf("conn_close: PQfinish called");
         self->pgconn = NULL;
@@ -235,7 +238,6 @@ conn_close(connectionObject *self)
     pthread_mutex_unlock(&self->lock);
     Py_END_ALLOW_THREADS;
 }
 
-
 /* conn_commit - commit on a connection */
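
The forced rollback guarded above amounts, in plain libpq terms, to sending
ROLLBACK and discarding the outcome before PQfinish(). Psycopg routes this
through its own abort path with locking and error bookkeeping; the following
is only a sketch of the idea (close_connection is an invented name):

    /* sketch: best-effort rollback, then unconditional teardown */
    #include <libpq-fe.h>

    static void close_connection(PGconn *conn)
    {
        if (conn == NULL)
            return;
        if (PQstatus(conn) == CONNECTION_OK) {
            PGresult *res = PQexec(conn, "ROLLBACK");
            PQclear(res);        /* PQclear(NULL) is a documented no-op */
        }
        PQfinish(conn);          /* frees the PGconn; never touch it again */
    }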

psycopg/pqpath.c

@@ -153,11 +153,17 @@ pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres)
     const char *err2 = NULL;
     const char *code = NULL;
 
-    if ((conn == NULL && curs == NULL) || (curs != NULL && conn == NULL)) {
+    if (conn == NULL) {
         PyErr_SetString(Error, "psycopg went psycotic and raised a null error");
         return;
     }
+
+    /* if the connection has somehow been broken, we mark the connection
+       object as closed but requiring cleanup */
+    Dprintf("%d %d", PQtransactionStatus(conn->pgconn), PQstatus(conn->pgconn));
+    if (conn->pgconn != NULL && PQstatus(conn->pgconn) == CONNECTION_BAD)
+        conn->closed = 2;
 
     if (pgres == NULL && curs != NULL)
         pgres = curs->pgres;
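
Both probes in the new Dprintf line are purely local: PQstatus() and
PQtransactionStatus() report state cached in the PGconn and send nothing to
the server, which is what makes them safe on a connection that may already be
dead. A sketch of how such a check can be packaged (connection_is_usable is
an invented name and policy):

    /* sketch: the two probes are local and never touch the network */
    #include <libpq-fe.h>

    static int connection_is_usable(PGconn *pgconn)
    {
        ConnStatusType s = PQstatus(pgconn);   /* CONNECTION_OK / CONNECTION_BAD */
        PGTransactionStatusType t = PQtransactionStatus(pgconn);
        return s == CONNECTION_OK && t != PQTRANS_UNKNOWN;
    }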
@@ -808,23 +814,30 @@ _pq_copy_in_v3(cursorObject *curs)
        exception */
     PyObject *o;
     Py_ssize_t length = 0;
-    int error = 0;
+    int res, error = 0;
 
     while (1) {
         o = PyObject_CallMethod(curs->copyfile, "read",
-            CONV_CODE_PY_SSIZE_T, curs->copysize
-            );
+            CONV_CODE_PY_SSIZE_T, curs->copysize);
         if (!o || !PyString_Check(o) || (length = PyString_Size(o)) == -1) {
             error = 1;
         }
         if (length == 0 || length > INT_MAX || error == 1) break;
 
         Py_BEGIN_ALLOW_THREADS;
-        if (PQputCopyData(curs->conn->pgconn,
-                          PyString_AS_STRING(o),
-                          /* Py_ssize_t->int cast was validated above: */
-                          (int) length
-                          ) == -1) {
+        res = PQputCopyData(curs->conn->pgconn, PyString_AS_STRING(o),
+                            /* Py_ssize_t->int cast was validated above */
+                            (int) length);
+        Dprintf("_pq_copy_in_v3: sent %d bytes of data; res = %d",
+                (int) length, res);
+
+        if (res == 0) {
+            /* FIXME: in theory this should not happen but adding a check
+               here would be a nice idea */
+        }
+        else if (res == -1) {
+            Dprintf("_pq_copy_in_v3: PQerrorMessage = %s",
+                    PQerrorMessage(curs->conn->pgconn));
             error = 2;
         }
         Py_END_ALLOW_THREADS;
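
For reference, PQputCopyData() returns 1 when the data is queued, 0 when it
cannot be queued right now (possible only in nonblocking mode, with full
buffers), and -1 on error; psycopg sends in blocking mode, which is why the
res == 0 branch above is empty. A nonblocking caller would wait and retry,
roughly as below (a sketch; send_copy_chunk and wait_writable are invented
names, the latter standing in for a select()/poll() loop):

    /* sketch: nonblocking-mode retry loop for PQputCopyData() */
    #include <libpq-fe.h>
    #include <stdio.h>

    extern void wait_writable(int fd);   /* hypothetical, supplied elsewhere */

    static int send_copy_chunk(PGconn *conn, const char *buf, int len)
    {
        int rc;
        while ((rc = PQputCopyData(conn, buf, len)) == 0) {
            /* 0: buffers full, only possible in nonblocking mode */
            wait_writable(PQsocket(conn));
            if (PQflush(conn) == -1)     /* push buffered output out */
                return -1;
        }
        if (rc == -1)
            fprintf(stderr, "COPY data failed: %s", PQerrorMessage(conn));
        return rc;
    }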
@@ -838,21 +851,36 @@ _pq_copy_in_v3(cursorObject *curs)
     Dprintf("_pq_copy_in_v3: error = %d", error);
 
-    if (error == 0 || error == 2)
         /* 0 means that the copy went well, 2 that there was an error on the
-           backend: in both cases we'll get the error message from the
-           PQresult */
-        PQputCopyEnd(curs->conn->pgconn, NULL);
+           backend: in both cases we'll get the error message from the PQresult */
+    if (error == 0)
+        res = PQputCopyEnd(curs->conn->pgconn, NULL);
+    else if (error == 2)
+        res = PQputCopyEnd(curs->conn->pgconn, "error in PQputCopyData() call");
     else
-        PQputCopyEnd(curs->conn->pgconn, "error during .read() call");
+        res = PQputCopyEnd(curs->conn->pgconn, "error in .read() call");
 
-    /* and finally we grab the operation result from the backend */
     IFCLEARPGRES(curs->pgres);
+
+    Dprintf("_pq_copy_in_v3: copy ended; res = %d", res);
+
+    /* if the result is -1 we should not even try to get a result from the
+       backend because that will lock the current thread forever */
+    if (res == -1) {
+        pq_raise(curs->conn, curs, NULL);
+        /* FIXME: pq_raise checks the connection but for some reason even
+           if the error message says "server closed the connection unexpectedly"
+           the status returned by PQstatus is CONNECTION_OK! */
+        curs->conn->closed = 2;
+    }
+    else {
+        /* and finally we grab the operation result from the backend */
     while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
         if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
             pq_raise(curs->conn, curs, NULL);
         IFCLEARPGRES(curs->pgres);
     }
+    }
 
     return error == 0 ? 1 : -1;
 }
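
Two libpq details drive the new logic above. First, PQputCopyEnd()'s second
argument: NULL terminates the COPY normally, while a non-NULL string makes
the backend fail the whole COPY with that message (protocol 3.0 only).
Second, a return of -1 means the termination message was never queued, so the
backend will never end the COPY and PQgetResult() could block indefinitely;
that is the reason for the res == -1 short-circuit. The same sequence in
isolation (a sketch assuming conn is already in COPY IN state; end_copy is an
invented name):

    /* sketch: ending COPY ... FROM STDIN without risking a hang */
    #include <libpq-fe.h>
    #include <stdio.h>

    static int end_copy(PGconn *conn, int ok)
    {
        PGresult *res;
        int rc = ok ? PQputCopyEnd(conn, NULL)                 /* finish normally */
                    : PQputCopyEnd(conn, "aborted by client"); /* force an error */

        if (rc == -1) {
            /* the end-of-copy message was never queued: the backend will not
               finish the COPY, so PQgetResult() here could block forever */
            fprintf(stderr, "copy end failed: %s", PQerrorMessage(conn));
            return -1;
        }
        while ((res = PQgetResult(conn)) != NULL) {            /* drain until NULL */
            if (PQresultStatus(res) != PGRES_COMMAND_OK)
                fprintf(stderr, "COPY failed: %s", PQresultErrorMessage(res));
            PQclear(res);
        }
        return 0;
    }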

sandbox/leak.test.py (new file, 83 lines)

@@ -0,0 +1,83 @@
"""
script: test_leak.py
This script attempts to repeatedly insert the same list of rows into
the database table, causing a duplicate key error to occur. It will
then roll back the transaction and try again.
Database table schema:
-- CREATE TABLE t (foo TEXT PRIMARY KEY);
There are two ways to run the script, which will launch one of the
two functions:
# leak() will cause increasingly more RAM to be used by the script.
$ python <script_nam> leak
# noleak() does not have the RAM usage problem. The only difference
# between it and leak() is that 'rows' is created once, before the loop.
$ python <script_name> noleak
Use Control-C to quit the script.
"""
import sys

import psycopg2

DB_NAME = 'test'

connection = psycopg2.connect(database=DB_NAME)
cursor = connection.cursor()

# Comment out the following if table 't' already exists
create_table = """CREATE TABLE t (foo TEXT PRIMARY KEY)"""
cursor.execute(create_table)

insert = """INSERT INTO t VALUES (%(foo)s)"""
def leak():
    """rows created in each loop run"""
    count = 0
    while 1:
        try:
            rows = []
            for i in range(1, 100):
                row = {'foo': i}
                rows.append(row)
            count += 1
            print "loop count:", count
            cursor.executemany(insert, rows)
            connection.commit()
        except psycopg2.IntegrityError:
            connection.rollback()

def noleak():
    """rows created once, before the loop"""
    rows = []
    for i in range(1, 100):
        row = {'foo': i}
        rows.append(row)
    count = 0
    while 1:
        try:
            count += 1
            print "loop count:", count
            cursor.executemany(insert, rows)
            connection.commit()
        except psycopg2.IntegrityError:
            connection.rollback()

usage = "%s requires one argument: 'leak' or 'noleak'" % sys.argv[0]

try:
    if 'leak' == sys.argv[1]:
        run_function = leak
    elif 'noleak' == sys.argv[1]:
        run_function = noleak
    else:
        print usage
        sys.exit()
except IndexError:
    print usage
    sys.exit()

# Run leak() or noleak(), whichever was indicated on the command line
run_function()

sandbox/test_copy2.csv (new file, 60639 lines)

File diff suppressed because it is too large.

sandbox/test_copy2.py (new file, 43 lines)

@@ -0,0 +1,43 @@
import psycopg2

dbconn = psycopg2.connect(database="test", host="localhost", port="5432")

query = """
CREATE TEMP TABLE data (
    field01 char,
    field02 varchar,
    field03 varchar,
    field04 varchar,
    field05 varchar,
    field06 varchar,
    field07 varchar,
    field08 varchar,
    field09 numeric,
    field10 integer,
    field11 numeric,
    field12 numeric,
    field13 numeric,
    field14 numeric,
    field15 numeric,
    field16 numeric,
    field17 char,
    field18 char,
    field19 char,
    field20 varchar,
    field21 varchar,
    field22 integer,
    field23 char,
    field24 char
);
"""

cursor = dbconn.cursor()
cursor.execute(query)

f = open('test_copy2.csv')
cursor.copy_from(f, 'data', sep='|')
f.close()

dbconn.commit()
cursor.close()
dbconn.close()

setup.cfg

@@ -1,5 +1,5 @@
 [build_ext]
-define=PSYCOPG_EXTENSIONS,PSYCOPG_NEW_BOOLEAN,HAVE_PQFREEMEM,HAVE_PQPROTOCOL3
+define=PSYCOPG_EXTENSIONS,PSYCOPG_NEW_BOOLEAN,HAVE_PQFREEMEM,HAVE_PQPROTOCOL3,PSYCOPG_DEBUG
 # PSYCOPG_EXTENSIONS enables extensions to PEP-249 (you really want this)
 # PSYCOPG_DISPLAY_SIZE enables display size calculation (a little slower)
 # HAVE_PQFREEMEM should be defined on PostgreSQL >= 7.4
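
Adding PSYCOPG_DEBUG compiles in the Dprintf() tracing used throughout this
patch (the sent-bytes and res values above come from it), with output going
to stderr. The macro follows the usual compile-time gating pattern, roughly
(a sketch, not the verbatim psycopg source):

    /* sketch: compile-time gated debug tracing */
    #ifdef PSYCOPG_DEBUG
    #define Dprintf(fmt, ...) fprintf(stderr, fmt "\n", ##__VA_ARGS__)
    #else
    #define Dprintf(fmt, ...) do {} while (0)
    #endif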