Finished COPY TO/COPY FROM implementation.

This commit is contained in:
Federico Di Gregorio 2005-03-02 14:07:03 +00:00
parent e5f558a6be
commit 3cf4b7ca6f
4 changed files with 187 additions and 19 deletions

View File

@ -1,5 +1,12 @@
2005-03-02 Federico Di Gregorio <fog@debian.org>
* COPY TO implemented using both old and new (v3) protocol.
* psycopg/pqpath.c (_pq_copy_out_v3): implemented and working.
* psycopg/cursor_type.c (psyco_curs_copy_to): added cursor object
interface for copy_to.
* COPY FROM implemented using both old and new (v3) protocol.
* psycopg/config.h (Dprintf): declaration for asprintf is gone.

105
examples/copy_to.py Normal file
View File

@ -0,0 +1,105 @@
# copy_to.py -- example about copy_to
#
# Copyright (C) 2002 Tom Jenkins <tjenkins@devis.com>
# Copyright (C) 2005 Federico Di Gregorio <fog@initd.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
## put in DSN your DSN string
DSN = 'dbname=test'
## don't modify anything below tis line (except for experimenting)
import sys
import os
import StringIO
import psycopg
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dns:", DSN
conn = psycopg.connect(DSN)
print "Encoding for this connection is", conn.encoding
curs = conn.cursor()
try:
curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)")
except:
conn.rollback()
curs.execute("DROP TABLE test_copy")
curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)")
conn.commit()
# demostrate copy_to functionality
data = [('Tom', 'Jenkins', '37'),
('Madonna', None, '45'),
('Federico', 'Di Gregorio', None)]
query = "INSERT INTO test_copy VALUES (%s, %s, %s)"
for row in data:
curs.execute(query, row)
conn.commit()
# copy_to using defaults
io = open('copy_to.txt', 'w')
curs.copy_to(io, 'test_copy')
print "1) Copy %d records into file object using defaults: " % len (data) + \
"sep = \\t and null = \\N"
io.close()
rows = open('copy_to.txt', 'r').readlines()
print " File has %d rows:" % len(rows)
for r in rows:
print " ", r,
# copy_to using custom separator
io = open('copy_to.txt', 'w')
curs.copy_to(io, 'test_copy', ':')
print "2) Copy %d records into file object using sep = :" % len(data)
io.close()
rows = open('copy_to.txt', 'r').readlines()
print " File has %d rows:" % len(rows)
for r in rows:
print " ", r,
# copy_to using custom null identifier
io = open('copy_to.txt', 'w')
curs.copy_to(io, 'test_copy', null='NULL')
print "3) Copy %d records into file object using null = NULL" % len(data)
io.close()
rows = open('copy_to.txt', 'r').readlines()
print " File has %d rows:" % len(rows)
for r in rows:
print " ", r,
# copy_to using custom separator and null identifier
io = open('copy_to.txt', 'w')
curs.copy_to(io, 'test_copy', ':', 'NULL')
print "4) Copy %d records into file object using sep = : and null ) NULL" % \
len(data)
io.close()
rows = open('copy_to.txt', 'r').readlines()
print " File has %d rows:" % len(rows)
for r in rows:
print " ", r,
curs.execute("DROP TABLE test_copy")
os.unlink('copy_to.txt')
conn.commit()

View File

@ -949,27 +949,31 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs)
EXC_IF_CURS_CLOSED(self);
if (null) {
PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'"
PyOS_snprintf(query, 255, "COPY %s FROM stdin USING DELIMITERS '%s'"
" WITH NULL AS '%s'", table_name, sep, null);
}
else {
PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'",
PyOS_snprintf(query, 255, "COPY %s FROM stdin USING DELIMITERS '%s'",
table_name, sep);
}
Dprintf("psyco_curs_copy_from: query = %s", query);
self->copysize = bufsize;
self->copyfile = file;
Py_INCREF(file);
if (pq_execute(self, query, 0) == 1) {
res = Py_None;
Py_INCREF(Py_None);
}
self->copyfile =NULL;
return res;
}
#define psyco_curs_copy_to_doc \
"copy_to(file, table, sep='\\t', null='\\N') -> copy file to table."
static int
_psyco_curs_has_write_check(PyObject* o, void* var)
{
@ -985,6 +989,45 @@ _psyco_curs_has_write_check(PyObject* o, void* var)
}
}
static PyObject *
psyco_curs_copy_to(cursorObject *self, PyObject *args, PyObject *kwargs)
{
char query[256];
char *table_name;
char *sep = "\t", *null = NULL;
PyObject *file, *res = NULL;
static char *kwlist[] = {"file", "table", "sep", "null", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O&s|ss", kwlist,
_psyco_curs_has_write_check, &file,
&table_name, &sep, &null)) {
return NULL;
}
EXC_IF_CURS_CLOSED(self);
if (null) {
PyOS_snprintf(query, 255, "COPY %s TO stdout USING DELIMITERS '%s'"
" WITH NULL AS '%s'", table_name, sep, null);
}
else {
PyOS_snprintf(query, 255, "COPY %s TO stdout USING DELIMITERS '%s'",
table_name, sep);
}
self->copysize = 0;
self->copyfile = file;
if (pq_execute(self, query, 0) == 1) {
res = Py_None;
Py_INCREF(Py_None);
}
self->copyfile = NULL;
return res;
}
/* extension: fileno - return the file descripor of the connection */
#define psyco_curs_fileno_doc \
@ -1108,6 +1151,8 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS, psyco_curs_isready_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc},
{"copy_to", (PyCFunction)psyco_curs_copy_to,
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc},
#endif
{NULL}
};

View File

@ -451,7 +451,7 @@ _pq_fetch_tuples(cursorObject *curs)
/* calculate the display size for each column (cpu intensive, can be
switched off at configuration time) */
#ifdef PSYCOPG_DISPLAY_SIZE
dsize = (int *)calloc(pgnfields, sizeof(int));
dsize = (int *)PyMem_Malloc(pgnfields * sizeof(int));
if (dsize != NULL) {
if (curs->rowcount == 0) {
for (i=0; i < pgnfields; i++)
@ -554,7 +554,7 @@ _pq_fetch_tuples(cursorObject *curs)
PyTuple_SET_ITEM(dtitem, 6, Py_None);
}
if (dsize) free(dsize);
if (dsize) PyMem_Free(dsize);
}
#ifdef HAVE_PQPROTOCOL3
@ -566,8 +566,6 @@ _pq_copy_in_v3(cursorObject *curs)
exception */
PyObject *o;
int length = 0, error = 0;
Dprintf("_pq_copy_in_v3: called with object at %p", curs->copyfile);
while (1) {
o = PyObject_CallMethod(curs->copyfile, "read", "i", curs->copysize);
@ -589,8 +587,6 @@ _pq_copy_in_v3(cursorObject *curs)
}
Py_XDECREF(o);
Dprintf("_pq_copy_in_v3: error = %d", error);
if (error == 0 || error == 2)
/* 0 means that the copy went well, 2 that there was an error on the
@ -657,7 +653,7 @@ _pq_copy_out_v3(cursorObject *curs)
Py_END_ALLOW_THREADS;
if (len > 0 && buffer) {
PyObject_CallMethod(curs->copyfile, "write", "s", buffer);
PyObject_CallMethod(curs->copyfile, "write", "s#", buffer, len);
PQfreemem(buffer);
}
/* we break on len == 0 but note that that should *not* happen,
@ -671,6 +667,13 @@ _pq_copy_out_v3(cursorObject *curs)
return -1;
}
/* and finally we grab the operation result from the backend */
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
return 1;
}
#endif
@ -680,11 +683,11 @@ _pq_copy_out(cursorObject *curs)
{
char buffer[4096];
int status, len;
PyObject *o;
while (1) {
Py_BEGIN_ALLOW_THREADS;
status = PQgetline(curs->conn->pgconn, buffer, 4096);
Py_END_ALLOW_THREADS;
if (status == 0) {
if (buffer[0] == '\\' && buffer[1] == '.') break;
@ -695,21 +698,27 @@ _pq_copy_out(cursorObject *curs)
len = 4096-1;
}
else {
Py_BLOCK_THREADS;
return -1;
}
Py_END_ALLOW_THREADS;
o = PyString_FromStringAndSize(buffer, len);
PyObject_CallMethod(curs->copyfile, "write", "O", o);
Py_DECREF(o);
PyObject_CallMethod(curs->copyfile, "write", "s#", buffer, len);
}
if (PQendcopy(curs->conn->pgconn) != 0) return -1;
status = 1;
if (PQendcopy(curs->conn->pgconn) != 0)
status = -1;
return 1;
}
/* if for some reason we're using a protocol 3 libpq to connect to a
protocol 2 backend we still need to cycle on the result set */
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
return status;
}
int
pq_fetch(cursorObject *curs)
@ -840,6 +849,8 @@ pq_fetch(cursorObject *curs)
break;
}
Dprintf("pq_fetch: fetching done; check for critical errors");
/* error checking, close the connection if necessary (some critical errors
are not really critical, like a COPY FROM error: if that's the case we
raise the exception but we avoid to close the connection) */