COPY FROM works.

This commit is contained in:
Federico Di Gregorio 2005-03-01 16:41:02 +00:00
parent 3141770f53
commit e5f558a6be
10 changed files with 343 additions and 57 deletions

View File

@ -1,5 +1,17 @@
2005-03-02 Federico Di Gregorio <fog@debian.org>
* COPY FROM implemented using both old and new (v3) protocol.
* psycopg/config.h (Dprintf): declaration for asprintf is gone.
* psycopg/pqpath.c (_pq_copy_in_v3): implemented.
2005-03-01 Federico Di Gregorio <fog@debian.org>
* setup.py: now we generate a slighly more verbose version string
that embeds some of the compile options, to facilitate users' bug
reports.
* psycopg/cursor_type.c (psyco_curs_copy_from): we now use
PyOS_snprintf instead of asprintf. On some platforms this can be
bad (win32).. if that's your case, get a better platform. :/
@ -57,7 +69,8 @@
2005-01-13 Federico Di Gregorio <fog@debian.org>
* ZPsycopgDA/db.py (DB.query): ported ZPsycopgDA connection fix
* ZPsycopgDA/db.py (DB.query
): ported ZPsycopgDA connection fix
from psycopg 1.1.
* lib/*.py: added pydoc-friendly messages.

View File

@ -18,7 +18,8 @@ DSN = 'dbname=test'
## don't modify anything below tis line (except for experimenting)
import sys, psycopg
import sys
import psycopg
if len(sys.argv) > 1:
DSN = sys.argv[1]

178
examples/copy_from.py Normal file
View File

@ -0,0 +1,178 @@
# copy_from.py -- example about copy_from
#
# Copyright (C) 2002 Tom Jenkins <tjenkins@devis.com>
# Copyright (C) 2005 Federico Di Gregorio <fog@initd.org>
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2, or (at your option) any later
# version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTIBILITY
# or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
# for more details.
#
## put in DSN your DSN string
DSN = 'dbname=test'
## don't modify anything below tis line (except for experimenting)
import sys
import os
import StringIO
import psycopg
if len(sys.argv) > 1:
DSN = sys.argv[1]
print "Opening connection using dns:", DSN
conn = psycopg.connect(DSN)
print "Encoding for this connection is", conn.encoding
curs = conn.cursor()
try:
curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)")
except:
conn.rollback()
curs.execute("DROP TABLE test_copy")
curs.execute("CREATE TABLE test_copy (fld1 text, fld2 text, fld3 int4)")
conn.commit()
# copy_from with default arguments, from open file
io = open('copy_from.txt', 'wr')
data = ['Tom\tJenkins\t37\n',
'Madonna\t\N\t45\n',
'Federico\tDi Gregorio\t\N\n']
io.writelines(data)
io.close()
io = open('copy_from.txt', 'r')
curs.copy_from(io, 'test_copy')
print "1) Copy %d records from file object " % len(data) + \
"using defaults (sep: \\t and null = \\N)"
io.close()
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print " Select returned %d rows" % len(rows)
for r in rows:
print " %s %s\t%s" % (r[0], r[1], r[2])
curs.execute("delete from test_copy")
conn.commit()
# copy_from using custom separator, from open file
io = open('copy_from.txt', 'wr')
data = ['Tom:Jenkins:37\n',
'Madonna:\N:45\n',
'Federico:Di Gregorio:\N\n']
io.writelines(data)
io.close()
io = open('copy_from.txt', 'r')
curs.copy_from(io, 'test_copy', ':')
print "2) Copy %d records from file object using sep = :" % len(data)
io.close()
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print " Select returned %d rows" % len(rows)
for r in rows:
print " %s %s\t%s" % (r[0], r[1], r[2])
curs.execute("delete from test_copy")
conn.commit()
# copy_from using custom null identifier, from open file
io = open('copy_from.txt', 'wr')
data = ['Tom\tJenkins\t37\n',
'Madonna\tNULL\t45\n',
'Federico\tDi Gregorio\tNULL\n']
io.writelines(data)
io.close()
io = open('copy_from.txt', 'r')
curs.copy_from(io, 'test_copy', null='NULL')
print "3) Copy %d records from file object using null = NULL" % len(data)
io.close()
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print " Select using cursor returned %d rows" % len(rows)
for r in rows:
print " %s %s\t%s" % (r[0], r[1], r[2])
curs.execute("delete from test_copy")
conn.commit()
# copy_from using custom separator and null identifier
io = open('copy_from.txt', 'wr')
data = ['Tom:Jenkins:37\n', 'Madonna:NULL:45\n', 'Federico:Di Gregorio:NULL\n']
io.writelines(data)
io.close()
io = open('copy_from.txt', 'r')
curs.copy_from(io, 'test_copy', ':', 'NULL')
print "4) Copy %d records from file object " % len(data) + \
"using sep = : and null = NULL"
io.close()
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print " Select using cursor returned %d rows" % len(rows)
for r in rows:
print " %s %s\t%s" % (r[0], r[1], r[2])
curs.execute("delete from test_copy")
conn.commit()
# anything can be used as a file if it has .read() and .readline() methods
data = StringIO.StringIO()
data.write('\n'.join(['Tom\tJenkins\t37',
'Madonna\t\N\t45',
'Federico\tDi Gregorio\t\N']))
data.seek(0)
curs.copy_from(data, 'test_copy')
print "5) Copy 3 records from StringIO object using defaults"
curs.execute("SELECT * FROM test_copy")
rows = curs.fetchall()
print " Select using cursor returned %d rows" % len(rows)
for r in rows:
print " %s %s\t%s" % (r[0], r[1], r[2])
curs.execute("delete from test_copy")
conn.commit()
# simple error test
print "6) About to raise an error"
data = StringIO.StringIO()
data.write('\n'.join(['Tom\tJenkins\t37',
'Madonna\t\N\t45',
'Federico\tDi Gregorio\taaa']))
data.seek(0)
try:
curs.copy_from(data, 'test_copy')
except StandardError, err:
conn.rollback()
print " Catched error (as expected):\n", err
conn.rollback()
curs.execute("DROP TABLE test_copy")
os.unlink('copy_from.txt')
conn.commit()

View File

@ -57,6 +57,7 @@ class DictCursor(_cursor):
self._build_index()
return res
class DictRow(list):
"""A row object that allow by-colun-name access to data."""

View File

@ -22,11 +22,6 @@
#ifndef PSYCOPG_CONFIG_H
#define PSYCOPG_CONFIG_H 1
/* replacement for asprintf() */
#ifndef HAVE_ASPRINTF
extern int asprintf(char **buffer, char *fmt, ...);
#endif
/* debug printf-like function */
#if defined( __GNUC__) && !defined(__APPLE__)
#ifdef PSYCOPG_DEBUG

View File

@ -134,7 +134,7 @@ psyco_conn_rollback(connectionObject *self, PyObject *args)
#ifdef PSYCOPG_EXTENSIONS
/* set_isolation_level method - switch connection isolation level */
#define psyco_conn_set_isolation_level_doc \
@ -186,7 +186,7 @@ psyco_conn_set_client_encoding(connectionObject *self, PyObject *args)
return NULL;
}
}
#endif
/** the connection object **/

View File

@ -901,6 +901,7 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
Py_INCREF(Py_None);
return Py_None;
}
@ -910,7 +911,64 @@ psyco_curs_scroll(cursorObject *self, PyObject *args, PyObject *kwargs)
/* extension: copy_from - implements COPY FROM */
#define psyco_curs_copy_from_doc \
"copy_from(file, table, sep='\\t', null='NULL') -> copy file to table."
"copy_from(file, table, sep='\\t', null='\\N') -> copy file to table."
static int
_psyco_curs_has_read_check(PyObject* o, void* var)
{
if (PyObject_HasAttrString(o, "readline")
&& PyObject_HasAttrString(o, "read")) {
Py_INCREF(o);
*((PyObject**)var) = o;
return 1;
}
else {
PyErr_SetString(PyExc_TypeError,
"argument 1 must have both .read() and .readline() methods");
return 0;
}
}
static PyObject *
psyco_curs_copy_from(cursorObject *self, PyObject *args, PyObject *kwargs)
{
char query[256];
char *table_name;
char *sep = "\t", *null = NULL;
long int bufsize = DEFAULT_COPYSIZE;
PyObject *file, *res = NULL;
static char *kwlist[] = {"file", "table", "sep", "null", "size", NULL};
if (!PyArg_ParseTupleAndKeywords(args, kwargs, "O&s|ssi", kwlist,
_psyco_curs_has_read_check, &file,
&table_name, &sep, &null, &bufsize)) {
return NULL;
}
EXC_IF_CURS_CLOSED(self);
if (null) {
PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'"
" WITH NULL AS '%s'", table_name, sep, null);
}
else {
PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'",
table_name, sep);
}
Dprintf("psyco_curs_copy_from: query = %s", query);
self->copysize = bufsize;
self->copyfile = file;
Py_INCREF(file);
if (pq_execute(self, query, 0) == 1) {
res = Py_None;
Py_INCREF(Py_None);
}
return res;
}
static int
_psyco_curs_has_write_check(PyObject* o, void* var)
@ -927,41 +985,6 @@ _psyco_curs_has_write_check(PyObject* o, void* var)
}
}
static PyObject *
psyco_curs_copy_from(cursorObject *self, PyObject *args)
{
char query[256];
char *table_name;
char *sep = "\t", *null ="NULL";
long int bufsize = DEFAULT_COPYSIZE;
PyObject *file, *res = NULL;
if (!PyArg_ParseTuple(args, "O&s|ssi",
_psyco_curs_has_write_check, &file,
&table_name, &sep, &null, &bufsize)) {
return NULL;
}
EXC_IF_CURS_CLOSED(self);
PyOS_snprintf(query, 256, "COPY %s FROM stdin USING DELIMITERS '%s'"
" WITH NULL AS '%s'", table_name, sep, null);
Dprintf("psyco_curs_copy_from: query = %s", query);
self->copysize = bufsize;
self->copyfile = file;
Py_INCREF(file);
if (pq_execute(self, query, 0) == 1) {
res = Py_None;
Py_INCREF(Py_None);
}
free(query);
return res;
}
/* extension: fileno - return the file descripor of the connection */
#define psyco_curs_fileno_doc \
@ -1084,7 +1107,7 @@ static struct PyMethodDef cursorObject_methods[] = {
{"isready", (PyCFunction)psyco_curs_isready,
METH_VARARGS, psyco_curs_isready_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,
METH_VARARGS, psyco_curs_copy_from_doc},
METH_VARARGS|METH_KEYWORDS, psyco_curs_copy_from_doc},
#endif
{NULL}
};

View File

@ -73,8 +73,8 @@ pq_raise(connectionObject *conn, cursorObject *curs, PyObject *exc, char *msg)
if (curs && curs->pgres) {
if (conn->protocol == 3) {
#ifdef HAVE_PQPROTOCOL3
char *pgstate = PQresultErrorField(curs->pgres,
PG_DIAG_SQLSTATE);
char *pgstate =
PQresultErrorField(curs->pgres, PG_DIAG_SQLSTATE);
if (!strncmp(pgstate, "23", 2))
exc = IntegrityError;
else
@ -130,6 +130,8 @@ pq_set_critical(connectionObject *conn, const char *msg)
PyObject *
pq_resolve_critical(connectionObject *conn, int close)
{
Dprintf("pq_resolve_critical: resolving %s", conn->critical);
if (conn->critical) {
char *msg = &(conn->critical[6]);
Dprintf("pq_resolve_critical: error = %s", msg);
@ -562,8 +564,51 @@ _pq_copy_in_v3(cursorObject *curs)
/* COPY FROM implementation when protocol 3 is available: this function
uses the new PQputCopyData() and can detect errors and set the correct
exception */
PyObject *o;
int length = 0, error = 0;
return -1;
Dprintf("_pq_copy_in_v3: called with object at %p", curs->copyfile);
while (1) {
o = PyObject_CallMethod(curs->copyfile, "read", "i", curs->copysize);
if (!o || !PyString_Check(o) || (length = PyString_Size(o)) == -1) {
error = 1;
}
if (length == 0 || error == 1) break;
Py_BEGIN_ALLOW_THREADS;
if (PQputCopyData(curs->conn->pgconn,
PyString_AS_STRING(o), length) == -1) {
error = 2;
}
Py_END_ALLOW_THREADS;
if (error == 2) break;
Py_DECREF(o);
}
Py_XDECREF(o);
Dprintf("_pq_copy_in_v3: error = %d", error);
if (error == 0 || error == 2)
/* 0 means that the copy went well, 2 that there was an error on the
backend: in both cases we'll get the error message from the
PQresult */
PQputCopyEnd(curs->conn->pgconn, NULL);
else
PQputCopyEnd(curs->conn->pgconn, "error during .read() call");
/* and finally we grab the operation result from the backend */
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
return 1;
}
#endif
static int
@ -587,6 +632,15 @@ _pq_copy_in(cursorObject *curs)
PQputline(curs->conn->pgconn, "\\.\n");
PQendcopy(curs->conn->pgconn);
/* if for some reason we're using a protocol 3 libpq to connect to a
protocol 2 backend we still need to cycle on the result set */
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL, NULL);
IFCLEARPGRES(curs->pgres);
}
return 1;
}

View File

@ -1,5 +1,6 @@
[build_ext]
define=PSYCOPG_DEBUG,PSYCOPG_EXTENSIONS,PSYCOPG_DISPLAY_SIZE,HAVE_ASPRINTF,HAVE_PQFREEMEM,HAVE_PQPROTOCOL3
define=PSYCOPG_EXTENSIONS,PSYCOPG_DISPLAY_SIZE,HAVE_PQFREEMEM,HAVE_PQPROTOCOL3
# PSYCOPG_DEBUG can be added to enable verbose debug information
# PSYCOPG_OWN_QUOTING can be added above but it is deprecated
# include_dirs is the preferred method for locating postgresql headers,

View File

@ -48,6 +48,7 @@ from distutils.sysconfig import get_python_inc
import distutils.ccompiler
PSYCOPG_VERSION = '1.99.11/devel'
version_flags = []
have_pydatetime = False
have_mxdatetime = False
@ -80,10 +81,7 @@ if sys.version_info[0] >= 2 and sys.version_info[1] >= 4:
ext = [] ; data_files = []
library_dirs = [] ; libraries = [] ; include_dirs = []
if sys.platform != 'win32':
define_macros.append(('PSYCOPG_VERSION', '"'+PSYCOPG_VERSION+'"'))
else:
define_macros.append(('PSYCOPG_VERSION', '\\"'+PSYCOPG_VERSION+'\\"'))
if sys.platform == 'win32':
include_dirs = ['.',
POSTGRESQLDIR + "\\src\\interfaces\\libpq",
POSTGRESQLDIR + "\\src\\include" ]
@ -124,12 +122,14 @@ if os.path.exists(mxincludedir):
define_macros.append(('HAVE_MXDATETIME','1'))
sources.append('adapter_mxdatetime.c')
have_mxdatetime = True
version_flags.append('mx')
# check for python datetime package
if os.path.exists(os.path.join(get_python_inc(plat_specific=1),"datetime.h")):
define_macros.append(('HAVE_PYDATETIME','1'))
sources.append('adapter_datetime.c')
have_pydatetime = True
version_flags.append('dt')
# now decide which package will be the default for date/time typecasts
if have_pydatetime and use_pydatetime \
@ -143,6 +143,26 @@ else:
sys.stderr.write("error: python datetime module not found\n")
sys.exit(1)
# generate a nice version string to avoid confusion when users report bugs
from ConfigParser import ConfigParser
parser = ConfigParser()
parser.read('setup.cfg')
for have in parser.get('build_ext', 'define').split(','):
if have == 'PSYCOPG_EXTENSIONS':
version_flags.append('ext')
elif have == 'HAVE_PQPROTOCOL3':
version_flags.append('pq3')
if version_flags:
PSYCOPG_VERSION_EX = PSYCOPG_VERSION + " (%s)" % ' '.join(version_flags)
else:
PSYCOPG_VERSION_EX = PSYCOPG_VERSION
if sys.platform != 'win32':
define_macros.append(('PSYCOPG_VERSION', '"'+PSYCOPG_VERSION_EX+'"'))
else:
define_macros.append(('PSYCOPG_VERSION', '\\"'+PSYCOPG_VERSION_EX+'\\"'))
# build the extension
sources = map(lambda x: os.path.join('psycopg', x), sources)