From 3cf4b7ca6fc59e0dbfdb371c751da42c479a6cf4 Mon Sep 17 00:00:00 2001 From: Federico Di Gregorio Date: Wed, 2 Mar 2005 14:07:03 +0000 Subject: [PATCH] Finished COPY TO/COPY FROM implementation. --- ChangeLog | 7 +++ examples/copy_to.py | 105 ++++++++++++++++++++++++++++++++++++++++++ psycopg/cursor_type.c | 51 ++++++++++++++++++-- psycopg/pqpath.c | 43 ++++++++++------- 4 files changed, 187 insertions(+), 19 deletions(-) create mode 100644 examples/copy_to.py diff --git a/ChangeLog b/ChangeLog index 2c49c7ce..6b342656 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,5 +1,12 @@ 2005-03-02 Federico Di Gregorio + * COPY TO implemented using both old and new (v3) protocol. + + * psycopg/pqpath.c (_pq_copy_out_v3): implemented and working. + + * psycopg/cursor_type.c (psyco_curs_copy_to): added cursor object + interface for copy_to. + * COPY FROM implemented using both old and new (v3) protocol. * psycopg/config.h (Dprintf): declaration for asprintf is gone. diff --git a/examples/copy_to.py b/examples/copy_to.py new file mode 100644 index 00000000..f4433ed7 --- /dev/null +++ b/examples/copy_to.py @@ -0,0 +1,105 @@ +# copy_to.py -- example about copy_to +# +# Copyright (C) 2002 Tom Jenkins +# Copyright (C) 2005 Federico Di Gregorio +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by the +# Free Software Foundation; either version 2, or (at your option) any later +# version. +# +# This program is distributed in the hope that it will be useful, but +# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY +# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License +# for more details. +# + +## put in DSN your DSN string + +DSN = 'dbname=test' + +## don't modify anything below tis line (except for experimenting) + +import sys +import os +import StringIO +import psycopg + +if len(sys.argv) > 1: + DSN = sys.argv[1] + +print "Opening connection using dns:", DSN +conn = psycopg.connect(DSN) +print "Encoding for this connection is", conn.encoding + +curs = conn.cursor() +try: + curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)") +except: + conn.rollback() + curs.execute("DROP TABLE test_copy") + curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)") +conn.commit() + +# demostrate copy_to functionality +data = [('Tom', 'Jenkins', '37'), + ('Madonna', None, '45'), + ('Federico', 'Di Gregorio', None)] +query = "INSERT INTO test_copy VALUES (%s, %s, %s)" +for row in data: + curs.execute(query, row) +conn.commit() + +# copy_to using defaults +io = open('copy_to.txt', 'w') +curs.copy_to(io, 'test_copy') +print "1) Copy %d records into file object using defaults: " % len (data) + \ + "sep = \\t and null = \\N" +io.close() + +rows = open('copy_to.txt', 'r').readlines() +print " File has %d rows:" % len(rows) + +for r in rows: + print " ", r, + +# copy_to using custom separator +io = open('copy_to.txt', 'w') +curs.copy_to(io, 'test_copy', ':') +print "2) Copy %d records into file object using sep = :" % len(data) +io.close() + +rows = open('copy_to.txt', 'r').readlines() +print " File has %d rows:" % len(rows) + +for r in rows: + print " ", r, + +# copy_to using custom null identifier +io = open('copy_to.txt', 'w') +curs.copy_to(io, 'test_copy', null='NULL') +print "3) Copy %d records into file object using null = NULL" % len(data) +io.close() + +rows = open('copy_to.txt', 'r').readlines() +print " File has %d rows:" % len(rows) + +for r in rows: + print " ", r, + +# copy_to using custom separator and null identifier +io = open('copy_to.txt', 'w') +curs.copy_to(io, 'test_copy', ':', 'NULL') +print "4) Copy %d records into file object using sep = : and null ) NULL" % \ + len(data) +io.close() + +rows = open('copy_to.txt', 'r').readlines() +print " File has %d rows:" % len(rows) + +for r in rows: + print " ", r, + +curs.execute("DROP TABLE test_copy") +os.unlink('copy_to.txt') +conn.commit() diff --git a/psycopg/cursor_type.c b/psycopg/cursor_type.c index 95729b04..3dab43a4 100644 --- a/psycopg/cursor_type.c +++ b/psycopg/cursor_type.c @@ -949,27 +949,31 @@ psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs) EXC_IF_CURS_CLOSED(self); if (null) { - PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'" + PyOS_snprintf(query, 255, "COPY %s FROM stdin USING DELIMITERS '%s'" " WITH NULL AS '%s'", table_name, sep, null); } else { - PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'", + PyOS_snprintf(query, 255, "COPY %s FROM stdin USING DELIMITERS '%s'", table_name, sep); } Dprintf("psyco_curs_copy_from: query = %s", query); self->copysize = bufsize; self->copyfile = file; - Py_INCREF(file); if (pq_execute(self, query, 0) == 1) { res = Py_None; Py_INCREF(Py_None); } + + self->copyfile =NULL; return res; } +#define psyco_curs_copy_to_doc \ +"copy_to(file, table, sep='\\t', null='\\N') -> copy file to table." + static int _psyco_curs_has_write_check(PyObject* o, void* var) { @@ -985,6 +989,45 @@ _psyco_curs_has_write_check(PyObject* o, void* var) } } +static PyObject * +psyco_curs_copy_to(cursorObject *self, PyObject *args, PyObject *kwargs) +{ + char query[256]; + char *table_name; + char *sep = "\t", *null = NULL; + PyObject *file, *res = NULL; + + static char *kwlist[] = {"file", "table", "sep", "null", NULL}; + + if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O&s|ss", kwlist, + _psyco_curs_has_write_check, &file, + &table_name, &sep, &null)) { + return NULL; + } + + EXC_IF_CURS_CLOSED(self); + + if (null) { + PyOS_snprintf(query, 255, "COPY %s TO stdout USING DELIMITERS '%s'" + " WITH NULL AS '%s'", table_name, sep, null); + } + else { + PyOS_snprintf(query, 255, "COPY %s TO stdout USING DELIMITERS '%s'", + table_name, sep); + } + + self->copysize = 0; + self->copyfile = file; + + if (pq_execute(self, query, 0) == 1) { + res = Py_None; + Py_INCREF(Py_None); + } + + self->copyfile = NULL; + + return res; +} /* extension: fileno - return the file descripor of the connection */ #define psyco_curs_fileno_doc \ @@ -1108,6 +1151,8 @@ static struct PyMethodDef cursorObject_methods[] = { METH_VARARGS, psyco_curs_isready_doc}, {"copy_from", (PyCFunction)psyco_curs_copy_from, METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc}, + {"copy_to", (PyCFunction)psyco_curs_copy_to, + METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_to_doc}, #endif {NULL} }; diff --git a/psycopg/pqpath.c b/psycopg/pqpath.c index e9b0afb5..c51ed5fa 100644 --- a/psycopg/pqpath.c +++ b/psycopg/pqpath.c @@ -451,7 +451,7 @@ _pq_fetch_tuples(cursorObject *curs) /* calculate the display size for each column (cpu intensive, can be switched off at configuration time) */ #ifdef PSYCOPG_DISPLAY_SIZE - dsize = (int *)calloc(pgnfields, sizeof(int)); + dsize = (int *)PyMem_Malloc(pgnfields * sizeof(int)); if (dsize != NULL) { if (curs->rowcount == 0) { for (i=0; i < pgnfields; i++) @@ -554,7 +554,7 @@ _pq_fetch_tuples(cursorObject *curs) PyTuple_SET_ITEM(dtitem, 6, Py_None); } - if (dsize) free(dsize); + if (dsize) PyMem_Free(dsize); } #ifdef HAVE_PQPROTOCOL3 @@ -566,8 +566,6 @@ _pq_copy_in_v3(cursorObject *curs) exception */ PyObject *o; int length = 0, error = 0; - - Dprintf("_pq_copy_in_v3: called with object at %p", curs->copyfile); while (1) { o = PyObject_CallMethod(curs->copyfile, "read", "i", curs->copysize); @@ -589,8 +587,6 @@ _pq_copy_in_v3(cursorObject *curs) } Py_XDECREF(o); - - Dprintf("_pq_copy_in_v3: error = %d", error); if (error == 0 || error == 2) /* 0 means that the copy went well, 2 that there was an error on the @@ -657,7 +653,7 @@ _pq_copy_out_v3(cursorObject *curs) Py_END_ALLOW_THREADS; if (len > 0 && buffer) { - PyObject_CallMethod(curs->copyfile, "write", "s", buffer); + PyObject_CallMethod(curs->copyfile, "write", "s#", buffer, len); PQfreemem(buffer); } /* we break on len == 0 but note that that should *not* happen, @@ -671,6 +667,13 @@ _pq_copy_out_v3(cursorObject *curs) return -1; } + /* and finally we grab the operation result from the backend */ + IFCLEARPGRES(curs->pgres); + while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) { + if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) + pq_raise(curs->conn, curs, NULL, NULL); + IFCLEARPGRES(curs->pgres); + } return 1; } #endif @@ -680,11 +683,11 @@ _pq_copy_out(cursorObject *curs) { char buffer[4096]; int status, len; - PyObject *o; while (1) { Py_BEGIN_ALLOW_THREADS; status = PQgetline(curs->conn->pgconn, buffer, 4096); + Py_END_ALLOW_THREADS; if (status == 0) { if (buffer[0] == '\\' && buffer[1] == '.') break; @@ -695,21 +698,27 @@ _pq_copy_out(cursorObject *curs) len = 4096-1; } else { - Py_BLOCK_THREADS; return -1; } - Py_END_ALLOW_THREADS; - o = PyString_FromStringAndSize(buffer, len); - PyObject_CallMethod(curs->copyfile, "write", "O", o); - Py_DECREF(o); + PyObject_CallMethod(curs->copyfile, "write", "s#", buffer, len); } - if (PQendcopy(curs->conn->pgconn) != 0) return -1; + status = 1; + if (PQendcopy(curs->conn->pgconn) != 0) + status = -1; - return 1; -} + /* if for some reason we're using a protocol 3 libpq to connect to a + protocol 2 backend we still need to cycle on the result set */ + IFCLEARPGRES(curs->pgres); + while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) { + if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR) + pq_raise(curs->conn, curs, NULL, NULL); + IFCLEARPGRES(curs->pgres); + } + return status; +} int pq_fetch(cursorObject *curs) @@ -840,6 +849,8 @@ pq_fetch(cursorObject *curs) break; } + Dprintf("pq_fetch: fetching done; check for critical errors"); + /* error checking, close the connection if necessary (some critical errors are not really critical, like a COPY FROM error: if that's the case we raise the exception but we avoid to close the connection) */