Merge remote branch 'piro/python2' into python2

This commit is contained in:
Federico Di Gregorio 2010-12-01 19:47:44 +01:00
commit f981e81813
47 changed files with 1203 additions and 563 deletions

View File

@ -1,3 +1,50 @@
2010-12-01 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* lib/extras.py: DictRow items can be updated. Patch by Alex Aster.
2010-11-28 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* Cancel patch from Jan integrated.
* psycopg/connection_type.c: can't cancel a prepared connection
2010-11-22 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* psycopg/connection_int.c: dropped notices hack to get COPY errors from
V2 protocol.
2010-11-18 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* psycopg/connection.h: Added enum with possible isolation level states.
Also, general isolation levels cleanup and tests added.
* psycopg/utils.c: compiler warning dropped.
* typecast.h: functions exported to drop warnings.
* datetime module initialized as it is supposed to be.
* mx.DateTime module initialized as it is supposed to be.
2010-11-17 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* psycopg/connection_type.c: don't clobber exception if
isolation_level_switch fails.
2010-11-16 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* psycopg/connection_int.c: abort connection to protocol 2 server.
* psycopg/pqpath.c
* psycopg/connection_int.c: dropped support for protocol 2 at compile
time and protocol 2-specific code.
* psycopg/connection_int.c: don't run a query at every connection to detect
client encoding: use PQparameterStatus() instead.
* psycopg/connection_int.c: don't run a query at every connection to set
the datestyle to ISO if PQparameterStatus() reports it already is.
2010-11-11 Daniele Varrazzo <daniele.varrazzo@gmail.com>
* lib/extras.py: build the namedtuple only once per execution, not once

INSTALL
View File

@ -6,9 +6,10 @@ Compiling and installing psycopg
option. psycopg2 will work correctly even with a non-thread-safe libpq but
libpq will leak memory.
While psycopg 1.x used autoconf for its build process psycopg 2 switched to
the more pythonic setup.py. Before building psycopg look at setup.cfg file
and change any settings to follow your system (or taste); then:
psycopg2 uses distutils for its build process, so most of the process is
executed by the setup.py script. Before building psycopg look at the
setup.cfg file and change any settings to follow your system (or taste);
then:
python setup.py build
@ -34,6 +35,45 @@ libpq-fe.h: No such file or directory
include_dirs variable (and note that a working pg_config is better.)
Running the test suite
======================
The included Makefile allows running all the tests included in the
distribution. Just use:
make
make runtests
The tests are run against a database called psycopg2_test on unix socket
and standard port. You can configure a different database to run the tests
by setting the environment variables:
- PSYCOPG2_TESTDB
- PSYCOPG2_TESTDB_HOST
- PSYCOPG2_TESTDB_PORT
- PSYCOPG2_TESTDB_USER
The database should be created before running the tests.
The standard Python unittest module is used to run the tests, but if
unittest2 is found it will be used instead, providing more information
about skipped tests.
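The unittest2 fallback described above is typically done with a guarded import; a minimal sketch (the module names are real, the surrounding code is illustrative):

```python
# Prefer unittest2 when available: it backports detailed reporting of
# skipped tests to older Python versions; otherwise use the stdlib module.
try:
    import unittest2 as unittest
except ImportError:
    import unittest

# Either way, the test modules see the same API under one name.
suite = unittest.TestSuite()
```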
Building the documentation
==========================
In order to build the documentation included in the distribution, use
make env
make docs
The first command will install all the dependencies (Sphinx, Docutils) in
an 'env' directory in the project tree. The second command will build the
documentation both in HTML format (in the 'doc/html' directory) and in
plain text (doc/psycopg2.txt).
Using setuptools and EasyInstall
================================

View File

@ -71,7 +71,7 @@ docs-txt: doc/psycopg2.txt
sdist: $(SDIST)
runtests: package
PSYCOPG2_TESTDB=$(TESTDB) PYTHONPATH=$(BUILD_DIR):. $(PYTHON) tests/__init__.py --verbose
PSYCOPG2_TESTDB=$(TESTDB) PYTHONPATH=$(BUILD_DIR):.:$(PYTHONPATH) $(PYTHON) tests/__init__.py --verbose
# The environment is currently required to build the documentation.

View File

@ -3,18 +3,24 @@ What's new in psycopg 2.3.0
psycopg 2.3 aims to expose some new features introduced in PostgreSQL 9.0.
* New features related to features introduced in PostgreSQL 9.0:
* Main new features:
- `dict` to `hstore` adapter and `hstore` to `dict` typecaster, using both
9.0 and pre-9.0 syntax.
- Two-phase commit protocol support as per DBAPI specification.
- Support for payload in notifications received from the backend.
- namedtuple returning cursor.
- `namedtuple`-returning cursor.
- Query execution cancel.
* Other features and changes:
- Dropped support for protocol 2: Psycopg 2.3 can only connect to PostgreSQL
servers with version at least 7.4.
- Don't issue a query at every connection to detect the client encoding
and to set the datestyle to ISO if it is already compatible with what is
expected.
- `mogrify()` now supports unicode queries.
- subclasses of a type that can be adapted are adapted as the superclass.
- Subclasses of a type that can be adapted are adapted as the superclass.
- `errorcodes` knows a couple of new codes introduced in PostgreSQL 9.0.
- Dropped deprecated Psycopg "own quoting".
- Never issue a ROLLBACK on close/GC. This behaviour was introduced as a bug

README
View File

@ -2,21 +2,18 @@ psycopg2 - Python-PostgreSQL Database Adapter
********************************************
psycopg2 is a PostgreSQL database adapter for the Python programming
language. This is version 2, a complete rewrite of the original code to
provide new-style classes for connection and cursor objects and other
sweet candies. Like the original, psycopg2 was written with the aim of
being very small and fast, and stable as a rock.
language. psycopg2 was written with the aim of being very small and fast,
and stable as a rock.
psycopg2 is different from the other database adapters because it was
designed for heavily multi-threaded applications that create and destroy
lots of cursors and make a conspicuous number of concurrent INSERTs or
UPDATEs. psycopg2 also provides full asynchronous operations for the really
brave programmer.
UPDATEs. psycopg2 also provides full asynchronous operations and support
for coroutine libraries.
There are confirmed reports of psycopg 1.x compiling and running on Linux
and FreeBSD on i386, Solaris, MacOS X and win32 architectures. psycopg2
does not introduce build-wise incompatible changes so it should be able to
compile on all architectures just as its predecessor did.
psycopg2 can compile and run on Linux, FreeBSD, Solaris, MacOS X and
Windows architectures. It supports Python versions from 2.4 onwards and
PostgreSQL versions from 7.4 onwards.
psycopg2 is free software ("free as in freedom" but I like beer too.)
It is licensed under the GNU Lesser General Public License, version 3 or

View File

@ -264,6 +264,26 @@ The ``connection`` class
(0) or closed (1).
.. method:: cancel
Cancel the current database operation.
The method interrupts the processing of the current operation. If no
query is being executed, it does nothing. You can call this function
from a different thread than the one currently executing a database
operation, for instance if you want to cancel a long running query if a
button is pushed in the UI. Interrupting query execution will cause the
cancelled method to raise a
`~psycopg2.extensions.QueryCanceledError`. Note that the termination
of the query is not guaranteed to succeed: see the documentation for
|PQcancel|_.
.. |PQcancel| replace:: `!PQcancel()`
.. _PQcancel: http://www.postgresql.org/docs/8.4/static/libpq-cancel.html#AEN34765
.. versionadded:: 2.3
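A hypothetical usage pattern for cancelling from another thread (the `cancel_after` helper is illustrative, not part of psycopg2; only `connection.cancel()` comes from the API documented above):

```python
import threading

def cancel_after(conn, seconds):
    """Call conn.cancel() from a worker thread after `seconds`.

    Bounds the runtime of a blocking execute(): if the cancel succeeds,
    the executing thread sees a QueryCanceledError.
    """
    timer = threading.Timer(seconds, conn.cancel)
    timer.daemon = True
    timer.start()
    return timer
```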
.. method:: reset
Reset the connection to the default.
@ -428,7 +448,9 @@ The ``connection`` class
.. attribute:: protocol_version
A read-only integer representing frontend/backend protocol being used.
It can be 2 or 3.
Currently Psycopg supports only protocol 3, which allows connections
to PostgreSQL servers from version 7.4 onwards. Psycopg versions before
2.3 supported both protocols 2 and 3.
.. seealso:: libpq docs for `PQprotocolVersion()`__ for details.

View File

@ -165,7 +165,7 @@ deal with Python objects adaptation:
.. class:: ISQLQuote(wrapped_object)
Represents the SQL adaptation protocol. Objects conforming this protocol
should implement a `!getquoted()` method.
should implement a `getquoted()` and optionally a `prepare()` method.
Adapters may subclass `!ISQLQuote`, but it is not necessary: it is
enough to expose a `!getquoted()` method to be conforming.
@ -180,7 +180,21 @@ deal with Python objects adaptation:
string representing the wrapped object. The `!ISQLQuote`
implementation does nothing.
.. class:: AsIs
.. method:: prepare(conn)
Prepare the adapter for a connection. The method is optional: if
implemented, it will be invoked before `!getquoted()` with the
connection to adapt for as argument.
A conforming object can implement this method if the SQL
representation depends on any server parameter, such as the server
version or the ``standard_conforming_strings`` setting. Container
objects may store the connection and use it to recursively prepare
contained objects: see the implementation for
``psycopg2.extensions.SQL_IN`` for a simple example.
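For illustration, a minimal conforming adapter (all class names here are hypothetical; only the `getquoted()`/`prepare()` protocol is from the documentation above):

```python
class Point(object):
    """A toy value type to be adapted to SQL."""
    def __init__(self, x, y):
        self.x, self.y = x, y

class PointAdapter(object):
    """Adapt a Point to a PostgreSQL point literal."""
    def __init__(self, obj):
        self.obj = obj
        self.conn = None

    def prepare(self, conn):
        # optional: remember the connection, in case quoting should
        # depend on server parameters
        self.conn = conn

    def getquoted(self):
        return "'(%g,%g)'" % (self.obj.x, self.obj.y)
```

Such an adapter would then be registered with `psycopg2.extensions.register_adapter(Point, PointAdapter)`.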
.. class:: AsIs(object)
Adapter conforming to the `ISQLQuote` protocol, useful for objects
whose string representation is already valid as SQL representation.
@ -192,7 +206,7 @@ deal with Python objects adaptation:
>>> AsIs(42).getquoted()
'42'
.. class:: QuotedString
.. class:: QuotedString(str)
Adapter conforming to the `ISQLQuote` protocol for string-like
objects.
@ -206,7 +220,7 @@ deal with Python objects adaptation:
>>> QuotedString(r"O'Reilly").getquoted()
"'O''Reilly'"
.. class:: Binary
.. class:: Binary(str)
Adapter conforming to the `ISQLQuote` protocol for binary objects.

View File

@ -63,7 +63,7 @@ I can't pass an integer or a float parameter to my query: it says *a number is r
>>> cur.execute("INSERT INTO numbers VALUES (%s)", (42,)) # correct
I try to execute a query but it fails with the error *not all arguments converted during string formatting* (or *object does not support indexing*). Why?
Psycopg always requires positional arguments to be passed as a tuple, even
Psycopg always requires positional arguments to be passed as a sequence, even
when the query takes a single parameter. And remember that to make a
single item tuple in Python you need a comma! See :ref:`query-parameters`.
::
@ -71,6 +71,7 @@ I try to execute a query but it fails with the error *not all arguments converte
>>> cur.execute("INSERT INTO foo VALUES (%s)", "bar") # WRONG
>>> cur.execute("INSERT INTO foo VALUES (%s)", ("bar")) # WRONG
>>> cur.execute("INSERT INTO foo VALUES (%s)", ("bar",)) # correct
>>> cur.execute("INSERT INTO foo VALUES (%s)", ["bar"]) # correct
My database is Unicode, but I receive all the strings as UTF-8 `str`. Can I receive `unicode` objects instead?
The following magic formula will do the trick::

View File

@ -118,12 +118,13 @@ query:
>>> cur.execute("INSERT INTO numbers VALUES (%s)", (42,)) # correct
- For positional variables binding, *the second argument must always be a
tuple*, even if it contains a single variable. And remember that Python
sequence*, even if it contains a single variable. And remember that Python
requires a comma to create a single element tuple::
>>> cur.execute("INSERT INTO foo VALUES (%s)", "bar") # WRONG
>>> cur.execute("INSERT INTO foo VALUES (%s)", ("bar")) # WRONG
>>> cur.execute("INSERT INTO foo VALUES (%s)", ("bar",)) # correct
>>> cur.execute("INSERT INTO foo VALUES (%s)", ["bar"]) # correct
- Only variable values should be bound via this method: it shouldn't be used
to set table or field names. For these elements, ordinary string formatting

View File

@ -144,6 +144,11 @@ class DictRow(list):
x = self._index[x]
return list.__getitem__(self, x)
def __setitem__(self, x, v):
if type(x) != int:
x = self._index[x]
list.__setitem__(self, x, v)
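The new `__setitem__` can be exercised standalone; below is a simplified stand-in for `DictRow` (the `_index` mapping mirrors the real class, everything else is trimmed):

```python
class MiniDictRow(list):
    """A list whose items are also reachable by column name."""
    def __init__(self, index, values):
        list.__init__(self, values)
        self._index = index          # e.g. {'id': 0, 'name': 1}

    def __getitem__(self, x):
        if type(x) != int:
            x = self._index[x]       # translate a column name to a position
        return list.__getitem__(self, x)

    def __setitem__(self, x, v):     # the behaviour added by this commit
        if type(x) != int:
            x = self._index[x]
        list.__setitem__(self, x, v)
```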
def items(self):
res = []
for n, v in self._index.items():
@ -655,6 +660,7 @@ class HstoreAdapter(object):
parse_unicode = classmethod(parse_unicode)
@classmethod
def get_oids(self, conn_or_curs):
"""Return the oid of the hstore and hstore[] types.
@ -670,13 +676,16 @@ class HstoreAdapter(object):
# Store the transaction status of the connection to revert it after use
conn_status = conn.status
# column typarray not available before PG 8.3
typarray = conn.server_version >= 80300 and "typarray" or "NULL"
# get the oid for the hstore
curs.execute("""\
SELECT t.oid, typarray
SELECT t.oid, %s
FROM pg_type t JOIN pg_namespace ns
ON typnamespace = ns.oid
WHERE typname = 'hstore' and nspname = 'public';
""")
""" % typarray)
oids = curs.fetchone()
# revert the status of the connection as before the command
@ -686,8 +695,6 @@ WHERE typname = 'hstore' and nspname = 'public';
return oids
get_oids = classmethod(get_oids)
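The `typarray` line above uses the Python 2.4-compatible `X and A or B` idiom in place of a conditional expression; it behaves like `A if X else B` as long as `A` is truthy (the helper function is illustrative):

```python
def typarray_column(server_version):
    # column typarray is only available from PostgreSQL 8.3 (80300):
    # on older servers select NULL in its place
    return server_version >= 80300 and "typarray" or "NULL"
```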
def register_hstore(conn_or_curs, globally=False, unicode=False):
"""Register adapter and typecaster for `dict`\-\ |hstore| conversions.

View File

@ -39,18 +39,23 @@
#include "psycopg/adapter_datetime.h"
#include "psycopg/microprotocols_proto.h"
/* the pointer to the datetime module API is initialized by the module init
code, we just need to grab it */
extern HIDDEN PyObject* pyDateTimeModuleP;
extern HIDDEN PyObject *pyDateTypeP;
extern HIDDEN PyObject *pyTimeTypeP;
extern HIDDEN PyObject *pyDateTimeTypeP;
extern HIDDEN PyObject *pyDeltaTypeP;
extern HIDDEN PyObject *pyPsycopgTzModule;
extern HIDDEN PyObject *pyPsycopgTzLOCAL;
int
psyco_adapter_datetime_init(void)
{
Dprintf("psyco_adapter_datetime_init: datetime init");
PyDateTime_IMPORT;
if (!PyDateTimeAPI) {
PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
return -1;
}
return 0;
}
/* datetime_str, datetime_getquoted - return result of quoting */
static PyObject *
@ -298,7 +303,8 @@ psyco_Date(PyObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "iii", &year, &month, &day))
return NULL;
obj = PyObject_CallFunction(pyDateTypeP, "iii", year, month, day);
obj = PyObject_CallFunction((PyObject*)PyDateTimeAPI->DateType,
"iii", year, month, day);
if (obj) {
res = PyObject_CallFunction((PyObject *)&pydatetimeType,
@ -327,10 +333,10 @@ psyco_Time(PyObject *self, PyObject *args)
second = floor(second);
if (tzinfo == NULL)
obj = PyObject_CallFunction(pyTimeTypeP, "iiii",
obj = PyObject_CallFunction((PyObject*)PyDateTimeAPI->TimeType, "iiii",
hours, minutes, (int)second, (int)round(micro));
else
obj = PyObject_CallFunction(pyTimeTypeP, "iiiiO",
obj = PyObject_CallFunction((PyObject*)PyDateTimeAPI->TimeType, "iiiiO",
hours, minutes, (int)second, (int)round(micro), tzinfo);
if (obj) {
@ -361,11 +367,13 @@ psyco_Timestamp(PyObject *self, PyObject *args)
second = floor(second);
if (tzinfo == NULL)
obj = PyObject_CallFunction(pyDateTimeTypeP, "iiiiiii",
obj = PyObject_CallFunction((PyObject*)PyDateTimeAPI->DateTimeType,
"iiiiiii",
year, month, day, hour, minute, (int)second,
(int)round(micro));
else
obj = PyObject_CallFunction(pyDateTimeTypeP, "iiiiiiiO",
obj = PyObject_CallFunction((PyObject*)PyDateTimeAPI->DateTimeType,
"iiiiiiiO",
year, month, day, hour, minute, (int)second,
(int)round(micro), tzinfo);
@ -462,7 +470,7 @@ psyco_DateFromPy(PyObject *self, PyObject *args)
{
PyObject *obj;
if (!PyArg_ParseTuple(args, "O!", pyDateTypeP, &obj))
if (!PyArg_ParseTuple(args, "O!", PyDateTimeAPI->DateType, &obj))
return NULL;
return PyObject_CallFunction((PyObject *)&pydatetimeType, "Oi", obj,
@ -474,7 +482,7 @@ psyco_TimeFromPy(PyObject *self, PyObject *args)
{
PyObject *obj;
if (!PyArg_ParseTuple(args, "O!", pyTimeTypeP, &obj))
if (!PyArg_ParseTuple(args, "O!", PyDateTimeAPI->TimeType, &obj))
return NULL;
return PyObject_CallFunction((PyObject *)&pydatetimeType, "Oi", obj,
@ -486,7 +494,7 @@ psyco_TimestampFromPy(PyObject *self, PyObject *args)
{
PyObject *obj;
if (!PyArg_ParseTuple(args, "O!", pyDateTimeTypeP, &obj))
if (!PyArg_ParseTuple(args, "O!", PyDateTimeAPI->DateTimeType, &obj))
return NULL;
return PyObject_CallFunction((PyObject *)&pydatetimeType, "Oi", obj,
@ -498,7 +506,7 @@ psyco_IntervalFromPy(PyObject *self, PyObject *args)
{
PyObject *obj;
if (!PyArg_ParseTuple(args, "O!", pyDeltaTypeP, &obj))
if (!PyArg_ParseTuple(args, "O!", PyDateTimeAPI->DeltaType, &obj))
return NULL;
return PyObject_CallFunction((PyObject *)&pydatetimeType, "Oi", obj,

View File

@ -53,6 +53,8 @@ typedef struct {
/* functions exported to psycopgmodule.c */
#ifdef PSYCOPG_DEFAULT_PYDATETIME
HIDDEN int psyco_adapter_datetime_init(void);
HIDDEN PyObject *psyco_Date(PyObject *module, PyObject *args);
#define psyco_Date_doc \
"Date(year, month, day) -> new date\n\n" \

View File

@ -37,9 +37,17 @@
#include "psycopg/adapter_mxdatetime.h"
#include "psycopg/microprotocols_proto.h"
/* the pointer to the mxDateTime API is initialized by the module init code,
we just need to grab it */
extern HIDDEN mxDateTimeModule_APIObject *mxDateTimeP;
int
psyco_adapter_mxdatetime_init(void)
{
Dprintf("psyco_adapter_mxdatetime_init: mx.DateTime init");
if(mxDateTime_ImportModuleAndAPI()) {
PyErr_SetString(PyExc_ImportError, "mx.DateTime initialization failed");
return -1;
}
return 0;
}
/* mxdatetime_str, mxdatetime_getquoted - return result of quoting */
@ -300,7 +308,7 @@ psyco_Date(PyObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "iii", &year, &month, &day))
return NULL;
mx = mxDateTimeP->DateTime_FromDateAndTime(year, month, day, 0, 0, 0.0);
mx = mxDateTime.DateTime_FromDateAndTime(year, month, day, 0, 0, 0.0);
if (mx == NULL) return NULL;
res = PyObject_CallFunction((PyObject *)&mxdatetimeType, "Oi", mx,
@ -319,7 +327,7 @@ psyco_Time(PyObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "iid", &hours, &minutes, &seconds))
return NULL;
mx = mxDateTimeP->DateTimeDelta_FromTime(hours, minutes, seconds);
mx = mxDateTime.DateTimeDelta_FromTime(hours, minutes, seconds);
if (mx == NULL) return NULL;
res = PyObject_CallFunction((PyObject *)&mxdatetimeType, "Oi", mx,
@ -340,7 +348,7 @@ psyco_Timestamp(PyObject *self, PyObject *args)
&hour, &minute, &second))
return NULL;
mx = mxDateTimeP->DateTime_FromDateAndTime(year, month, day,
mx = mxDateTime.DateTime_FromDateAndTime(year, month, day,
hour, minute, second);
if (mx == NULL) return NULL;
@ -359,7 +367,7 @@ psyco_DateFromTicks(PyObject *self, PyObject *args)
if (!PyArg_ParseTuple(args,"d", &ticks))
return NULL;
if (!(mx = mxDateTimeP->DateTime_FromTicks(ticks)))
if (!(mx = mxDateTime.DateTime_FromTicks(ticks)))
return NULL;
res = PyObject_CallFunction((PyObject *)&mxdatetimeType, "Oi", mx,
@ -377,10 +385,10 @@ psyco_TimeFromTicks(PyObject *self, PyObject *args)
if (!PyArg_ParseTuple(args,"d", &ticks))
return NULL;
if (!(dt = mxDateTimeP->DateTime_FromTicks(ticks)))
if (!(dt = mxDateTime.DateTime_FromTicks(ticks)))
return NULL;
if (!(mx = mxDateTimeP->DateTimeDelta_FromDaysAndSeconds(
if (!(mx = mxDateTime.DateTimeDelta_FromDaysAndSeconds(
0, ((mxDateTimeObject*)dt)->abstime)))
{
Py_DECREF(dt);
@ -403,7 +411,7 @@ psyco_TimestampFromTicks(PyObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "d", &ticks))
return NULL;
if (!(mx = mxDateTimeP->DateTime_FromTicks(ticks)))
if (!(mx = mxDateTime.DateTime_FromTicks(ticks)))
return NULL;
res = PyObject_CallFunction((PyObject *)&mxdatetimeType, "Oi", mx,
@ -419,7 +427,7 @@ psyco_DateFromMx(PyObject *self, PyObject *args)
{
PyObject *mx;
if (!PyArg_ParseTuple(args, "O!", mxDateTimeP->DateTime_Type, &mx))
if (!PyArg_ParseTuple(args, "O!", mxDateTime.DateTime_Type, &mx))
return NULL;
return PyObject_CallFunction((PyObject *)&mxdatetimeType, "Oi", mx,
@ -431,7 +439,7 @@ psyco_TimeFromMx(PyObject *self, PyObject *args)
{
PyObject *mx;
if (!PyArg_ParseTuple(args, "O!", mxDateTimeP->DateTimeDelta_Type, &mx))
if (!PyArg_ParseTuple(args, "O!", mxDateTime.DateTimeDelta_Type, &mx))
return NULL;
return PyObject_CallFunction((PyObject *)&mxdatetimeType, "Oi", mx,
@ -443,7 +451,7 @@ psyco_TimestampFromMx(PyObject *self, PyObject *args)
{
PyObject *mx;
if (!PyArg_ParseTuple(args, "O!", mxDateTimeP->DateTime_Type, &mx))
if (!PyArg_ParseTuple(args, "O!", mxDateTime.DateTime_Type, &mx))
return NULL;
return PyObject_CallFunction((PyObject *)&mxdatetimeType, "Oi", mx,
@ -455,7 +463,7 @@ psyco_IntervalFromMx(PyObject *self, PyObject *args)
{
PyObject *mx;
if (!PyArg_ParseTuple(args, "O!", mxDateTimeP->DateTime_Type, &mx))
if (!PyArg_ParseTuple(args, "O!", mxDateTime.DateTime_Type, &mx))
return NULL;
return PyObject_CallFunction((PyObject *)&mxdatetimeType, "Oi", mx,

View File

@ -78,6 +78,8 @@ HIDDEN PyObject *psyco_TimestampFromTicks(PyObject *module, PyObject *args);
#endif /* PSYCOPG_DEFAULT_MXDATETIME */
HIDDEN int psyco_adapter_mxdatetime_init(void);
HIDDEN PyObject *psyco_DateFromMx(PyObject *module, PyObject *args);
#define psyco_DateFromMx_doc \
"DateFromMx(mx) -> new date"

View File

@ -45,7 +45,6 @@ extern "C" {
/* async connection building statuses */
#define CONN_STATUS_CONNECTING 20
#define CONN_STATUS_DATESTYLE 21
#define CONN_STATUS_CLIENT_ENCODING 22
/* async query execution status */
#define ASYNC_DONE 0
@ -65,9 +64,15 @@ extern "C" {
later change it, she must know what she's doing... these are the queries we
need to issue */
#define psyco_datestyle "SET DATESTYLE TO 'ISO'"
#define psyco_client_encoding "SHOW client_encoding"
#define psyco_transaction_isolation "SHOW default_transaction_isolation"
/* possible values for isolation_level */
typedef enum {
ISOLATION_LEVEL_AUTOCOMMIT = 0,
ISOLATION_LEVEL_READ_COMMITTED = 1,
ISOLATION_LEVEL_SERIALIZABLE = 2,
} conn_isolation_level_t;
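A Python-side sketch of the same constants and the bounds check performed by `set_isolation_level()` (the constant names mirror the C enum; the helper is illustrative):

```python
ISOLATION_LEVEL_AUTOCOMMIT = 0
ISOLATION_LEVEL_READ_COMMITTED = 1
ISOLATION_LEVEL_SERIALIZABLE = 2

def check_isolation_level(level):
    # mirror the range check done in psyco_conn_set_isolation_level()
    if level < 0 or level > 2:
        raise ValueError("isolation level must be between 0 and 2")
    return level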
extern HIDDEN PyTypeObject connectionType;
struct connectionObject_notice {
@ -96,6 +101,7 @@ typedef struct {
int server_version; /* server version */
PGconn *pgconn; /* the postgresql connection */
PGcancel *cancel; /* the cancellation structure */
PyObject *async_cursor; /* a cursor executing an asynchronous query */
int async_status; /* asynchronous execution status */
@ -118,7 +124,6 @@ typedef struct {
/* C-callable functions in connection_int.c and connection_ext.c */
HIDDEN int conn_get_standard_conforming_strings(PGconn *pgconn);
HIDDEN char *conn_get_encoding(PGresult *pgres);
HIDDEN int conn_get_isolation_level(PGresult *pgres);
HIDDEN int conn_get_protocol_version(PGconn *pgconn);
HIDDEN void conn_notice_process(connectionObject *self);

View File

@ -46,25 +46,16 @@ conn_notice_callback(void *args, const char *message)
Dprintf("conn_notice_callback: %s", message);
/* unfortunately the old protocol return COPY FROM errors only as notices,
so we need to filter them looking for such errors (but we do it
only if the protocol is <3, else we don't need that)
NOTE: if we get here and the connection is unlocked then there is a
/* NOTE: if we get here and the connection is unlocked then there is a
problem, but this shouldn't happen because the notice callback is only
called from libpq and when we're inside libpq the connection is usually
locked.
*/
if (self->protocol < 3 && strncmp(message, "ERROR", 5) == 0)
pq_set_critical(self, message);
else {
notice = (struct connectionObject_notice *)
malloc(sizeof(struct connectionObject_notice));
notice->message = strdup(message);
notice->next = self->notice_pending;
self->notice_pending = notice;
}
notice = (struct connectionObject_notice *)
malloc(sizeof(struct connectionObject_notice));
notice->message = strdup(message);
notice->next = self->notice_pending;
self->notice_pending = notice;
}
void
@ -222,22 +213,36 @@ conn_get_standard_conforming_strings(PGconn *pgconn)
return equote;
}
char *
conn_get_encoding(PGresult *pgres)
/* Return a string containing the client_encoding setting.
*
* Return a new string allocated by malloc(): use free() to free it.
* Return NULL in case of failure.
*/
static char *
conn_get_encoding(PGconn *pgconn)
{
char *tmp, *encoding;
size_t i;
const char *tmp, *i;
char *encoding, *j;
tmp = PQparameterStatus(pgconn, "client_encoding");
Dprintf("conn_connect: client encoding: %s", tmp ? tmp : "(none)");
if (!tmp) {
PyErr_SetString(OperationalError,
"server didn't return client encoding");
return NULL;
}
tmp = PQgetvalue(pgres, 0, 0);
encoding = malloc(strlen(tmp)+1);
if (encoding == NULL) {
PyErr_NoMemory();
IFCLEARPGRES(pgres);
return NULL;
}
for (i=0 ; i < strlen(tmp) ; i++)
encoding[i] = toupper(tmp[i]);
encoding[i] = '\0';
/* return in uppercase */
i = tmp;
j = encoding;
while (*i) { *j++ = toupper(*i++); }
*j = '\0';
return encoding;
}
@ -251,11 +256,11 @@ conn_get_isolation_level(PGresult *pgres)
char *isolation_level = PQgetvalue(pgres, 0, 0);
if ((strncmp(lvl1a, isolation_level, strlen(isolation_level)) == 0)
|| (strncmp(lvl1b, isolation_level, strlen(isolation_level)) == 0))
rv = 1;
if ((strcmp(lvl1b, isolation_level) == 0) /* most likely */
|| (strcmp(lvl1a, isolation_level) == 0))
rv = ISOLATION_LEVEL_READ_COMMITTED;
else /* if it's not one of the lower ones, it's SERIALIZABLE */
rv = 2;
rv = ISOLATION_LEVEL_SERIALIZABLE;
CLEARPGRES(pgres);
@ -266,13 +271,7 @@ int
conn_get_protocol_version(PGconn *pgconn)
{
int ret;
#ifdef HAVE_PQPROTOCOL3
ret = PQprotocolVersion(pgconn);
#else
ret = 2;
#endif
Dprintf("conn_connect: using protocol %d", ret);
return ret;
}
@ -283,6 +282,28 @@ conn_get_server_version(PGconn *pgconn)
return (int)PQserverVersion(pgconn);
}
PGcancel *
conn_get_cancel(PGconn *pgconn)
{
return PQgetCancel(pgconn);
}
/* Return 1 if the server datestyle allows us to work without problems,
0 if it needs to be set to something better, e.g. ISO. */
static int
conn_is_datestyle_ok(PGconn *pgconn)
{
const char *ds;
ds = PQparameterStatus(pgconn, "DateStyle");
Dprintf("conn_connect: DateStyle %s", ds);
/* Return true if ds starts with "ISO"
* e.g. "ISO, DMY" is fine, "German" not. */
return (ds != NULL && ds[0] == 'I' && ds[1] == 'S' && ds[2] == 'O');
}
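The same check expressed in Python, for reference (a hypothetical helper mirroring `conn_is_datestyle_ok`):

```python
def datestyle_is_ok(datestyle):
    # accept any DateStyle whose output format is ISO,
    # e.g. "ISO, DMY" is fine, "German" is not
    return datestyle is not None and datestyle.startswith("ISO")
```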
/* conn_setup - setup and read basic information about the connection */
@ -295,6 +316,21 @@ conn_setup(connectionObject *self, PGconn *pgconn)
self->equote = conn_get_standard_conforming_strings(pgconn);
self->server_version = conn_get_server_version(pgconn);
self->protocol = conn_get_protocol_version(self->pgconn);
if (3 != self->protocol) {
PyErr_SetString(InterfaceError, "only protocol 3 supported");
return -1;
}
/* conn_get_encoding returns a malloc'd string */
self->encoding = conn_get_encoding(pgconn);
if (self->encoding == NULL) {
return -1;
}
self->cancel = conn_get_cancel(self->pgconn);
if (self->cancel == NULL) {
PyErr_SetString(OperationalError, "can't get cancellation key");
return -1;
}
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
@ -306,50 +342,26 @@ conn_setup(connectionObject *self, PGconn *pgconn)
return -1;
}
if (!green) {
Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, psyco_datestyle);
Py_BLOCK_THREADS;
} else {
pgres = psyco_exec_green(self, psyco_datestyle);
}
if (!conn_is_datestyle_ok(self->pgconn)) {
if (!green) {
Py_UNBLOCK_THREADS;
Dprintf("conn_connect: exec query \"%s\";", psyco_datestyle);
pgres = PQexec(pgconn, psyco_datestyle);
Py_BLOCK_THREADS;
} else {
pgres = psyco_exec_green(self, psyco_datestyle);
}
if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
PyErr_SetString(OperationalError, "can't set datestyle to ISO");
IFCLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS;
return -1;
if (pgres == NULL || PQresultStatus(pgres) != PGRES_COMMAND_OK ) {
PyErr_SetString(OperationalError, "can't set datestyle to ISO");
IFCLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS;
return -1;
}
CLEARPGRES(pgres);
}
CLEARPGRES(pgres);
if (!green) {
Py_UNBLOCK_THREADS;
pgres = PQexec(pgconn, psyco_client_encoding);
Py_BLOCK_THREADS;
} else {
pgres = psyco_exec_green(self, psyco_client_encoding);
}
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
PyErr_SetString(OperationalError, "can't fetch client_encoding");
IFCLEARPGRES(pgres);
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS;
return -1;
}
/* conn_get_encoding returns a malloc'd string */
self->encoding = conn_get_encoding(pgres);
if (self->encoding == NULL) {
Py_UNBLOCK_THREADS;
pthread_mutex_unlock(&self->lock);
Py_BLOCK_THREADS;
return -1;
}
CLEARPGRES(pgres);
if (!green) {
Py_UNBLOCK_THREADS;
@ -636,22 +648,45 @@ _conn_poll_setup_async(connectionObject *self)
self->equote = conn_get_standard_conforming_strings(self->pgconn);
self->protocol = conn_get_protocol_version(self->pgconn);
self->server_version = conn_get_server_version(self->pgconn);
if (3 != self->protocol) {
PyErr_SetString(InterfaceError, "only protocol 3 supported");
break;
}
/* conn_get_encoding returns a malloc'd string */
self->encoding = conn_get_encoding(self->pgconn);
if (self->encoding == NULL) {
break;
}
self->cancel = conn_get_cancel(self->pgconn);
if (self->cancel == NULL) {
PyErr_SetString(OperationalError, "can't get cancellation key");
break;
}
/* asynchronous connections always use isolation level 0, the user is
* expected to manage the transactions himself, by sending
* (asynchronously) BEGIN and COMMIT statements.
*/
self->isolation_level = 0;
self->isolation_level = ISOLATION_LEVEL_AUTOCOMMIT;
Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
self->status = CONN_STATUS_DATESTYLE;
if (0 == pq_send_query(self, psyco_datestyle)) {
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
break;
/* If the datestyle is ISO or anything else good,
* we can skip the CONN_STATUS_DATESTYLE step. */
if (!conn_is_datestyle_ok(self->pgconn)) {
Dprintf("conn_poll: status -> CONN_STATUS_DATESTYLE");
self->status = CONN_STATUS_DATESTYLE;
if (0 == pq_send_query(self, psyco_datestyle)) {
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
break;
}
Dprintf("conn_poll: async_status -> ASYNC_WRITE");
self->async_status = ASYNC_WRITE;
res = PSYCO_POLL_WRITE;
}
else {
Dprintf("conn_poll: status -> CONN_STATUS_READY");
self->status = CONN_STATUS_READY;
res = PSYCO_POLL_OK;
}
Dprintf("conn_poll: async_status -> ASYNC_WRITE");
self->async_status = ASYNC_WRITE;
res = PSYCO_POLL_WRITE;
break;
case CONN_STATUS_DATESTYLE:
@ -665,40 +700,12 @@ _conn_poll_setup_async(connectionObject *self)
}
CLEARPGRES(pgres);
Dprintf("conn_poll: status -> CONN_STATUS_CLIENT_ENCODING");
self->status = CONN_STATUS_CLIENT_ENCODING;
if (0 == pq_send_query(self, psyco_client_encoding)) {
PyErr_SetString(OperationalError, PQerrorMessage(self->pgconn));
break;
}
Dprintf("conn_poll: async_status -> ASYNC_WRITE");
self->async_status = ASYNC_WRITE;
res = PSYCO_POLL_WRITE;
}
break;
case CONN_STATUS_CLIENT_ENCODING:
res = _conn_poll_query(self);
if (res == PSYCO_POLL_OK) {
res = PSYCO_POLL_ERROR;
pgres = pq_get_last_result(self);
if (pgres == NULL || PQresultStatus(pgres) != PGRES_TUPLES_OK) {
PyErr_SetString(OperationalError, "can't fetch client_encoding");
break;
}
/* conn_get_encoding returns a malloc'd string */
self->encoding = conn_get_encoding(pgres);
CLEARPGRES(pgres);
if (self->encoding == NULL) { break; }
Dprintf("conn_poll: status -> CONN_STATUS_READY");
self->status = CONN_STATUS_READY;
res = PSYCO_POLL_OK;
}
break;
}
return res;
}
@ -730,7 +737,6 @@ conn_poll(connectionObject *self)
break;
case CONN_STATUS_DATESTYLE:
case CONN_STATUS_CLIENT_ENCODING:
res = _conn_poll_setup_async(self);
break;
@ -793,8 +799,10 @@ conn_close(connectionObject *self)
if (self->pgconn) {
PQfinish(self->pgconn);
PQfreeCancel(self->cancel);
Dprintf("conn_close: PQfinish called");
self->pgconn = NULL;
self->cancel = NULL;
}
pthread_mutex_unlock(&self->lock);
@ -832,20 +840,21 @@ conn_switch_isolation_level(connectionObject *self, int level)
char *error = NULL;
int res = 0;
/* if the current isolation level is equal to the requested one don't switch */
if (self->isolation_level == level) return 0;
Py_BEGIN_ALLOW_THREADS;
pthread_mutex_lock(&self->lock);
/* if the current isolation level is > 0 we need to abort the current
transaction before changing; that's all folks! */
if (self->isolation_level != level && self->isolation_level > 0) {
res = pq_abort_locked(self, &pgres, &error, &_save);
}
self->isolation_level = level;
/* if the current isolation level is equal to the requested one don't switch */
if (self->isolation_level != level) {
Dprintf("conn_switch_isolation_level: switched to level %d", level);
/* if the current isolation level is > 0 we need to abort the current
transaction before changing; that's all, folks! */
if (self->isolation_level != ISOLATION_LEVEL_AUTOCOMMIT) {
res = pq_abort_locked(self, &pgres, &error, &_save);
}
self->isolation_level = level;
Dprintf("conn_switch_isolation_level: switched to level %d", level);
}
pthread_mutex_unlock(&self->lock);
Py_END_ALLOW_THREADS;


@ -209,7 +209,7 @@ psyco_conn_tpc_begin(connectionObject *self, PyObject *args)
}
/* two phase commit and autocommit make no point */
if (self->isolation_level == 0) {
if (self->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT) {
PyErr_SetString(ProgrammingError,
"tpc_begin can't be called in autocommit mode");
goto exit;
@ -410,17 +410,14 @@ psyco_conn_set_isolation_level(connectionObject *self, PyObject *args)
if (level < 0 || level > 2) {
PyErr_SetString(PyExc_ValueError,
"isolation level out of bounds (0,3)");
"isolation level must be between 0 and 2");
return NULL;
}
if (conn_switch_isolation_level(self, level) < 0) {
PyErr_SetString(OperationalError,
PQerrorMessage(self->pgconn));
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}
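The bounds check in psyco_conn_set_isolation_level can be sketched at the Python level. A minimal, hypothetical validation mirroring the C-side check (the constant values match psycopg2.extensions, but no live connection is assumed and `validate_isolation_level` is an illustrative name, not part of the API):

```python
ISOLATION_LEVEL_AUTOCOMMIT = 0      # values as in psycopg2.extensions
ISOLATION_LEVEL_READ_COMMITTED = 1
ISOLATION_LEVEL_SERIALIZABLE = 2

def validate_isolation_level(level):
    # mirror the C-side check: only levels 0, 1 and 2 are accepted
    if level < 0 or level > 2:
        raise ValueError("isolation level must be between 0 and 2")
    return level
```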
@ -700,6 +697,38 @@ psyco_conn_isexecuting(connectionObject *self)
return Py_False;
}
/* extension: cancel - cancel the current operation */
#define psyco_conn_cancel_doc \
"cancel() -- cancel the current operation"
static PyObject *
psyco_conn_cancel(connectionObject *self)
{
char errbuf[256];
EXC_IF_CONN_CLOSED(self);
EXC_IF_TPC_PREPARED(self, cancel);
/* do not allow cancellation while the connection is being built */
Dprintf("psyco_conn_cancel: cancelling with key %p", self->cancel);
if (self->status != CONN_STATUS_READY &&
self->status != CONN_STATUS_BEGIN) {
PyErr_SetString(OperationalError,
"asynchronous connection attempt underway");
return NULL;
}
if (PQcancel(self->cancel, errbuf, sizeof(errbuf)) == 0) {
Dprintf("psyco_conn_cancel: cancelling failed: %s", errbuf);
PyErr_SetString(OperationalError, errbuf);
return NULL;
}
Py_INCREF(Py_None);
return Py_None;
}
#endif /* PSYCOPG_EXTENSIONS */
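The new cancel() method is meant to be called from a second thread while the first is blocked in a query. A pure-Python sketch of that pattern, with a threading.Event standing in for the PQcancel request (no server connection assumed):

```python
import threading
import time

results = []

def neverending(stop):
    # stands in for a thread blocked in "select pg_sleep(10000)";
    # with the real API a second thread would call conn.cancel() instead
    while not stop.is_set():
        time.sleep(0.01)
    results.append("cancelled")

stop = threading.Event()
worker = threading.Thread(target=neverending, args=(stop,))
canceller = threading.Timer(0.1, stop.set)  # plays the role of conn.cancel()
worker.start()
canceller.start()
worker.join()
canceller.join()
```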
@ -750,6 +779,8 @@ static struct PyMethodDef connectionObject_methods[] = {
METH_NOARGS, psyco_conn_fileno_doc},
{"isexecuting", (PyCFunction)psyco_conn_isexecuting,
METH_NOARGS, psyco_conn_isexecuting_doc},
{"cancel", (PyCFunction)psyco_conn_cancel,
METH_NOARGS, psyco_conn_cancel_doc},
#endif
{NULL}
};
@ -780,7 +811,7 @@ static struct PyMemberDef connectionObject_members[] = {
"A set of typecasters to convert binary values."},
{"protocol_version", T_INT,
offsetof(connectionObject, protocol), RO,
"Protocol version (2 or 3) used for this connection."},
"Protocol version used for this connection. Currently always 3."},
{"server_version", T_INT,
offsetof(connectionObject, server_version), RO,
"Server version."},
@ -830,6 +861,7 @@ connection_setup(connectionObject *self, const char *dsn, long int async)
self->async_cursor = NULL;
self->async_status = ASYNC_DONE;
self->pgconn = NULL;
self->cancel = NULL;
self->mark = 0;
self->string_types = PyDict_New();
self->binary_types = PyDict_New();


@ -476,7 +476,7 @@ psyco_curs_execute(cursorObject *self, PyObject *args, PyObject *kwargs)
NULL, NULL);
return NULL;
}
if (self->conn->isolation_level == 0) {
if (self->conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT) {
psyco_set_error(ProgrammingError, (PyObject*)self,
"can't use a named cursor outside of transactions", NULL, NULL);
return NULL;


@ -129,7 +129,7 @@ lobject_close_locked(lobjectObject *self, char **error)
{
int retvalue;
if (self->conn->isolation_level == 0 ||
if (self->conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT ||
self->conn->mark != self->mark ||
self->fd == -1)
return 0;


@ -55,7 +55,7 @@ psyco_lobj_close(lobjectObject *self, PyObject *args)
closing the current transaction is equivalent to close all the
opened large objects */
if (!lobject_is_closed(self)
&& self->conn->isolation_level > 0
&& self->conn->isolation_level != ISOLATION_LEVEL_AUTOCOMMIT
&& self->conn->mark == self->mark)
{
Dprintf("psyco_lobj_close: closing lobject at %p", self);
@ -300,7 +300,7 @@ lobject_setup(lobjectObject *self, connectionObject *conn,
{
Dprintf("lobject_setup: init lobject object at %p", self);
if (conn->isolation_level == 0) {
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT) {
psyco_set_error(ProgrammingError, (PyObject*)self,
"can't use a lobject outside of transactions", NULL, NULL);
return -1;


@ -173,11 +173,9 @@ pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres)
if (pgres) {
err = PQresultErrorMessage(pgres);
#ifdef HAVE_PQPROTOCOL3
if (err != NULL && conn->protocol == 3) {
if (err != NULL) {
code = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
}
#endif
}
if (err == NULL)
err = PQerrorMessage(conn->pgconn);
@ -196,25 +194,6 @@ pq_raise(connectionObject *conn, cursorObject *curs, PGresult *pgres)
exc = exception_from_sqlstate(code);
}
/* if exc is still NULL psycopg was not built with HAVE_PQPROTOCOL3 or the
connection is using protocol 2: in both cases we default to comparing
error messages */
if (exc == NULL) {
if (!strncmp(err, "ERROR: Cannot insert a duplicate key", 37)
|| !strncmp(err, "ERROR: ExecAppend: Fail to add null", 36)
|| strstr(err, "referential integrity violation"))
exc = IntegrityError;
else if (strstr(err, "could not serialize") ||
strstr(err, "deadlock detected"))
#ifdef PSYCOPG_EXTENSIONS
exc = TransactionRollbackError;
#else
exc = OperationalError;
#endif
else
exc = ProgrammingError;
}
/* try to remove the initial "ERROR: " part from the postgresql error */
err2 = strip_severity(err);
@ -430,7 +409,8 @@ pq_begin_locked(connectionObject *conn, PGresult **pgres, char **error,
Dprintf("pq_begin_locked: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
if (conn->isolation_level == 0 || conn->status != CONN_STATUS_READY) {
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT
|| conn->status != CONN_STATUS_READY) {
Dprintf("pq_begin_locked: transaction in progress");
return 0;
}
@ -459,7 +439,8 @@ pq_commit(connectionObject *conn)
Dprintf("pq_commit: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) {
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT
|| conn->status != CONN_STATUS_BEGIN) {
Dprintf("pq_commit: no transaction to commit");
return 0;
}
@ -494,7 +475,8 @@ pq_abort_locked(connectionObject *conn, PGresult **pgres, char **error,
Dprintf("pq_abort_locked: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) {
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT
|| conn->status != CONN_STATUS_BEGIN) {
Dprintf("pq_abort_locked: no transaction to abort");
return 0;
}
@ -522,7 +504,8 @@ pq_abort(connectionObject *conn)
Dprintf("pq_abort: pgconn = %p, isolevel = %ld, status = %d",
conn->pgconn, conn->isolation_level, conn->status);
if (conn->isolation_level == 0 || conn->status != CONN_STATUS_BEGIN) {
if (conn->isolation_level == ISOLATION_LEVEL_AUTOCOMMIT
|| conn->status != CONN_STATUS_BEGIN) {
Dprintf("pq_abort: no transaction to abort");
return 0;
}
@ -563,7 +546,8 @@ pq_reset_locked(connectionObject *conn, PGresult **pgres, char **error,
conn->mark += 1;
if (conn->isolation_level > 0 && conn->status == CONN_STATUS_BEGIN) {
if (conn->isolation_level != ISOLATION_LEVEL_AUTOCOMMIT
&& conn->status == CONN_STATUS_BEGIN) {
retvalue = pq_execute_command_locked(conn, "ABORT", pgres, error, tstate);
if (retvalue != 0) return retvalue;
}
@ -1063,7 +1047,6 @@ _pq_fetch_tuples(cursorObject *curs)
Py_END_ALLOW_THREADS;
}
#ifdef HAVE_PQPROTOCOL3
static int
_pq_copy_in_v3(cursorObject *curs)
{
@ -1155,57 +1138,7 @@ exit:
Py_XDECREF(size);
return (error == 0 ? 1 : -1);
}
#endif
static int
_pq_copy_in(cursorObject *curs)
{
/* COPY FROM implementation when protocol 3 is not available: this
function can't fail but the backend will send an ERROR notice that will
be caught by our notice collector */
PyObject *o, *func = NULL;
int ret = -1;
if (!(func = PyObject_GetAttrString(curs->copyfile, "readline"))) {
Dprintf("_pq_copy_in: can't get o.readline");
goto exit;
}
while (1) {
int rv;
o = PyObject_CallFunction(func, NULL);
if (o == NULL) goto exit;
if (o == Py_None || PyString_GET_SIZE(o) == 0) break;
Py_BEGIN_ALLOW_THREADS;
rv = PQputline(curs->conn->pgconn, PyString_AS_STRING(o));
Py_END_ALLOW_THREADS;
Py_DECREF(o);
if (0 != rv) goto exit;
}
Py_XDECREF(o);
Py_BEGIN_ALLOW_THREADS;
PQputline(curs->conn->pgconn, "\\.\n");
PQendcopy(curs->conn->pgconn);
Py_END_ALLOW_THREADS;
/* if for some reason we're using a protocol 3 libpq to connect to a
protocol 2 backend we still need to cycle on the result set */
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL);
IFCLEARPGRES(curs->pgres);
}
ret = 1;
exit:
Py_XDECREF(func);
return ret;
}
#ifdef HAVE_PQPROTOCOL3
static int
_pq_copy_out_v3(cursorObject *curs)
{
@ -1258,66 +1191,6 @@ exit:
Py_XDECREF(func);
return ret;
}
#endif
static int
_pq_copy_out(cursorObject *curs)
{
PyObject *tmp = NULL, *func;
char buffer[4096];
int status = -1, ll = 0;
Py_ssize_t len;
if (!(func = PyObject_GetAttrString(curs->copyfile, "write"))) {
Dprintf("_pq_copy_out: can't get o.write");
goto exit;
}
while (1) {
Py_BEGIN_ALLOW_THREADS;
status = PQgetline(curs->conn->pgconn, buffer, 4096);
Py_END_ALLOW_THREADS;
if (status == 0) {
if (!ll && buffer[0] == '\\' && buffer[1] == '.') break;
len = (Py_ssize_t) strlen(buffer);
buffer[len++] = '\n';
ll = 0;
}
else if (status == 1) {
len = 4096-1;
ll = 1;
}
else {
goto exit;
}
tmp = PyObject_CallFunction(func, "s#", buffer, len);
if (tmp == NULL) {
goto exit;
} else {
Py_DECREF(tmp);
}
}
status = 1;
if (PQendcopy(curs->conn->pgconn) != 0)
status = -1;
/* if for some reason we're using a protocol 3 libpq to connect to a
protocol 2 backend we still need to cycle on the result set */
IFCLEARPGRES(curs->pgres);
while ((curs->pgres = PQgetResult(curs->conn->pgconn)) != NULL) {
if (PQresultStatus(curs->pgres) == PGRES_FATAL_ERROR)
pq_raise(curs->conn, curs, NULL);
IFCLEARPGRES(curs->pgres);
}
exit:
Py_XDECREF(func);
return status;
}
int
pq_fetch(cursorObject *curs)
@ -1368,12 +1241,7 @@ pq_fetch(cursorObject *curs)
case PGRES_COPY_OUT:
Dprintf("pq_fetch: data from a COPY TO (no tuples)");
#ifdef HAVE_PQPROTOCOL3
if (curs->conn->protocol == 3)
ex = _pq_copy_out_v3(curs);
else
#endif
ex = _pq_copy_out(curs);
ex = _pq_copy_out_v3(curs);
curs->rowcount = -1;
/* error caught by out glorious notice handler */
if (PyErr_Occurred()) ex = -1;
@ -1382,12 +1250,7 @@ pq_fetch(cursorObject *curs)
case PGRES_COPY_IN:
Dprintf("pq_fetch: data from a COPY FROM (no tuples)");
#ifdef HAVE_PQPROTOCOL3
if (curs->conn->protocol == 3)
ex = _pq_copy_in_v3(curs);
else
#endif
ex = _pq_copy_in(curs);
ex = _pq_copy_in_v3(curs);
curs->rowcount = -1;
/* error caught by out glorious notice handler */
if (PyErr_Occurred()) ex = -1;


@ -48,21 +48,18 @@
#include "psycopg/adapter_asis.h"
#include "psycopg/adapter_list.h"
#include "psycopg/typecast_binary.h"
#include "psycopg/typecast_datetime.h"
#ifdef HAVE_MXDATETIME
#include <mxDateTime.h>
#include "psycopg/adapter_mxdatetime.h"
HIDDEN mxDateTimeModule_APIObject *mxDateTimeP = NULL;
#include "psycopg/typecast_mxdatetime.h"
#endif
/* some module-level variables, like the datetime module */
#include <datetime.h>
#include "psycopg/adapter_datetime.h"
HIDDEN PyObject *pyDateTimeModuleP = NULL;
HIDDEN PyObject *pyDateTypeP = NULL;
HIDDEN PyObject *pyTimeTypeP = NULL;
HIDDEN PyObject *pyDateTimeTypeP = NULL;
HIDDEN PyObject *pyDeltaTypeP = NULL;
/* pointers to the psycopg.tz classes */
HIDDEN PyObject *pyPsycopgTzModule = NULL;
@ -312,20 +309,20 @@ psyco_adapters_init(PyObject *mod)
/* the module has already been initialized, so we can obtain the callable
objects directly from its dictionary :) */
call = PyMapping_GetItemString(mod, "DateFromPy");
microprotocols_add((PyTypeObject*)pyDateTypeP, NULL, call);
microprotocols_add(PyDateTimeAPI->DateType, NULL, call);
call = PyMapping_GetItemString(mod, "TimeFromPy");
microprotocols_add((PyTypeObject*)pyTimeTypeP, NULL, call);
microprotocols_add(PyDateTimeAPI->TimeType, NULL, call);
call = PyMapping_GetItemString(mod, "TimestampFromPy");
microprotocols_add((PyTypeObject*)pyDateTimeTypeP, NULL, call);
microprotocols_add(PyDateTimeAPI->DateTimeType, NULL, call);
call = PyMapping_GetItemString(mod, "IntervalFromPy");
microprotocols_add((PyTypeObject*)pyDeltaTypeP, NULL, call);
microprotocols_add(PyDateTimeAPI->DeltaType, NULL, call);
#ifdef HAVE_MXDATETIME
/* as above, we use the callable objects from the psycopg module */
call = PyMapping_GetItemString(mod, "TimestampFromMx");
microprotocols_add(mxDateTimeP->DateTime_Type, NULL, call);
microprotocols_add(mxDateTime.DateTime_Type, NULL, call);
call = PyMapping_GetItemString(mod, "TimeFromMx");
microprotocols_add(mxDateTimeP->DateTimeDelta_Type, NULL, call);
microprotocols_add(mxDateTime.DateTimeDelta_Type, NULL, call);
#endif
}
@ -766,7 +763,8 @@ init_psycopg(void)
PyErr_SetString(PyExc_ImportError, "can't import mx.DateTime module");
return;
}
mxDateTimeP = &mxDateTime;
if (psyco_adapter_mxdatetime_init()) { return; }
if (psyco_typecast_mxdatetime_init()) { return; }
#endif
/* import python builtin datetime module, if available */
@ -776,16 +774,15 @@ init_psycopg(void)
PyErr_SetString(PyExc_ImportError, "can't import datetime module");
return;
}
/* Initialize the PyDateTimeAPI everywhere is used */
PyDateTime_IMPORT;
if (psyco_adapter_datetime_init()) { return; }
if (psyco_typecast_datetime_init()) { return; }
pydatetimeType.ob_type = &PyType_Type;
if (PyType_Ready(&pydatetimeType) == -1) return;
/* now we define the datetime types, this is crazy because python should
be doing that, not us! */
pyDateTypeP = PyObject_GetAttrString(pyDateTimeModuleP, "date");
pyTimeTypeP = PyObject_GetAttrString(pyDateTimeModuleP, "time");
pyDateTimeTypeP = PyObject_GetAttrString(pyDateTimeModuleP, "datetime");
pyDeltaTypeP = PyObject_GetAttrString(pyDateTimeModuleP, "timedelta");
/* import psycopg2.tz anyway (TODO: replace with C-level module?) */
pyPsycopgTzModule = PyImport_ImportModule("psycopg2.tz");
if (pyPsycopgTzModule == NULL) {


@ -34,16 +34,16 @@
#include "psycopg/typecast.h"
#include "psycopg/cursor.h"
/* usefull function used by some typecasters */
/* useful function used by some typecasters */
static const char *
const char *
skip_until_space(const char *s)
{
while (*s && *s != ' ') s++;
return s;
}
static const char *
const char *
skip_until_space2(const char *s, Py_ssize_t *len)
{
while (*len > 0 && *s && *s != ' ') {


@ -90,4 +90,9 @@ HIDDEN PyObject *typecast_from_python(
HIDDEN PyObject *typecast_cast(
PyObject *self, const char *str, Py_ssize_t len, PyObject *curs);
/** utility functions **/
HIDDEN const char *skip_until_space(const char *s);
HIDDEN const char *skip_until_space2(const char *s, Py_ssize_t *len);
#endif /* !defined(PSYCOPG_TYPECAST_H) */


@ -26,14 +26,19 @@
#include <math.h>
#include "datetime.h"
int
psyco_typecast_datetime_init(void)
{
Dprintf("psyco_typecast_datetime_init: datetime init");
/* the pointer to the datetime module API is initialized by the module init
code, we just need to grab it */
extern HIDDEN PyObject* pyDateTimeModuleP;
extern HIDDEN PyObject *pyDateTypeP;
extern HIDDEN PyObject *pyTimeTypeP;
extern HIDDEN PyObject *pyDateTimeTypeP;
extern HIDDEN PyObject *pyDeltaTypeP;
PyDateTime_IMPORT;
if (!PyDateTimeAPI) {
PyErr_SetString(PyExc_ImportError, "datetime initialization failed");
return -1;
}
return 0;
}
/** DATE - cast a date into a date python object **/
@ -47,10 +52,12 @@ typecast_PYDATE_cast(const char *str, Py_ssize_t len, PyObject *curs)
if (!strcmp(str, "infinity") || !strcmp(str, "-infinity")) {
if (str[0] == '-') {
obj = PyObject_GetAttrString(pyDateTypeP, "min");
obj = PyObject_GetAttrString(
(PyObject*)PyDateTimeAPI->DateType, "min");
}
else {
obj = PyObject_GetAttrString(pyDateTypeP, "max");
obj = PyObject_GetAttrString(
(PyObject*)PyDateTimeAPI->DateType, "max");
}
}
@ -66,7 +73,8 @@ typecast_PYDATE_cast(const char *str, Py_ssize_t len, PyObject *curs)
}
else {
if (y > 9999) y = 9999;
obj = PyObject_CallFunction(pyDateTypeP, "iii", y, m, d);
obj = PyObject_CallFunction(
(PyObject*)PyDateTimeAPI->DateType, "iii", y, m, d);
}
}
return obj;
@ -89,10 +97,12 @@ typecast_PYDATETIME_cast(const char *str, Py_ssize_t len, PyObject *curs)
/* check for infinity */
if (!strcmp(str, "infinity") || !strcmp(str, "-infinity")) {
if (str[0] == '-') {
obj = PyObject_GetAttrString(pyDateTimeTypeP, "min");
obj = PyObject_GetAttrString(
(PyObject*)PyDateTimeAPI->DateTimeType, "min");
}
else {
obj = PyObject_GetAttrString(pyDateTimeTypeP, "max");
obj = PyObject_GetAttrString(
(PyObject*)PyDateTimeAPI->DateTimeType, "max");
}
}
@ -144,8 +154,9 @@ typecast_PYDATETIME_cast(const char *str, Py_ssize_t len, PyObject *curs)
tzinfo = Py_None;
}
if (tzinfo != NULL) {
obj = PyObject_CallFunction(pyDateTimeTypeP, "iiiiiiiO",
y, m, d, hh, mm, ss, us, tzinfo);
obj = PyObject_CallFunction(
(PyObject*)PyDateTimeAPI->DateTimeType, "iiiiiiiO",
y, m, d, hh, mm, ss, us, tzinfo);
Dprintf("typecast_PYDATETIME_cast: tzinfo: %p, refcnt = "
FORMAT_CODE_PY_SSIZE_T,
tzinfo, tzinfo->ob_refcnt
@ -197,7 +208,7 @@ typecast_PYTIME_cast(const char *str, Py_ssize_t len, PyObject *curs)
tzinfo = Py_None;
}
if (tzinfo != NULL) {
obj = PyObject_CallFunction(pyTimeTypeP, "iiiiO",
obj = PyObject_CallFunction((PyObject*)PyDateTimeAPI->TimeType, "iiiiO",
hh, mm, ss, us, tzinfo);
Py_DECREF(tzinfo);
}
@ -308,7 +319,7 @@ typecast_PYINTERVAL_cast(const char *str, Py_ssize_t len, PyObject *curs)
micro = (seconds - floor(seconds)) * 1000000.0;
sec = (int)floor(seconds);
return PyObject_CallFunction(pyDeltaTypeP, "iii",
return PyObject_CallFunction((PyObject*)PyDateTimeAPI->DeltaType, "iii",
days, sec, (int)round(micro));
}
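The days/seconds/microseconds split performed by typecast_PYINTERVAL_cast above can be sketched in Python with the standard datetime module (a simplified model of the conversion only, ignoring the interval-string parsing that precedes it in the C code):

```python
import math
from datetime import timedelta

def interval_to_timedelta(days, seconds):
    # same split as the C typecaster: whole seconds, then the fractional
    # part rounded to microseconds, all handed to timedelta
    micro = (seconds - math.floor(seconds)) * 1000000.0
    sec = int(math.floor(seconds))
    return timedelta(days, sec, round(micro))

# e.g. an interval of 2 days, 1 hour, 1 minute, 1.5 seconds
td = interval_to_timedelta(2, 3661.5)
```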


@ -0,0 +1,44 @@
/* typecast_datetime.h - definitions for datetime objects typecasters
*
* Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#ifndef PSYCOPG_TYPECAST_DATETIME_H
#define PSYCOPG_TYPECAST_DATETIME_H 1
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include "psycopg/config.h"
#ifdef __cplusplus
extern "C" {
#endif
HIDDEN int psyco_typecast_datetime_init(void);
#ifdef __cplusplus
}
#endif
#endif /* !defined(PSYCOPG_TYPECAST_DATETIME_H) */


@ -25,9 +25,18 @@
#include "mxDateTime.h"
/* the pointer to the mxDateTime API is initialized by the module init code,
we just need to grab it */
extern HIDDEN mxDateTimeModule_APIObject *mxDateTimeP;
int
psyco_typecast_mxdatetime_init(void)
{
Dprintf("psyco_typecast_mxdatetime_init: mx.DateTime init");
if(mxDateTime_ImportModuleAndAPI()) {
PyErr_SetString(PyExc_ImportError, "mx.DateTime initialization failed");
return -1;
}
return 0;
}
/** DATE - cast a date into mx.DateTime python object **/
@ -45,10 +54,10 @@ typecast_MXDATE_cast(const char *str, Py_ssize_t len, PyObject *curs)
/* check for infinity */
if (!strcmp(str, "infinity") || !strcmp(str, "-infinity")) {
if (str[0] == '-') {
return mxDateTimeP->DateTime_FromDateAndTime(-999998,1,1, 0,0,0);
return mxDateTime.DateTime_FromDateAndTime(-999998,1,1, 0,0,0);
}
else {
return mxDateTimeP->DateTime_FromDateAndTime(999999,12,31, 0,0,0);
return mxDateTime.DateTime_FromDateAndTime(999999,12,31, 0,0,0);
}
}
@ -75,7 +84,7 @@ typecast_MXDATE_cast(const char *str, Py_ssize_t len, PyObject *curs)
Dprintf("typecast_MXDATE_cast: fractional seconds: %lf",
(double)ss + (double)us/(double)1000000.0);
return mxDateTimeP->DateTime_FromDateAndTime(y, m, d, hh, mm,
return mxDateTime.DateTime_FromDateAndTime(y, m, d, hh, mm,
(double)ss + (double)us/(double)1000000.0);
}
@ -102,7 +111,7 @@ typecast_MXTIME_cast(const char *str, Py_ssize_t len, PyObject *curs)
Dprintf("typecast_MXTIME_cast: fractional seconds: %lf",
(double)ss + (double)us/(double)1000000.0);
return mxDateTimeP->DateTimeDelta_FromTime(hh, mm,
return mxDateTime.DateTimeDelta_FromTime(hh, mm,
(double)ss + (double)us/(double)1000000.0);
}
@ -225,7 +234,7 @@ typecast_MXINTERVAL_cast(const char *str, Py_ssize_t len, PyObject *curs)
Dprintf("typecast_MXINTERVAL_cast: days = %ld, seconds = %f",
days, seconds);
return mxDateTimeP->DateTimeDelta_FromDaysAndSeconds(days, seconds);
return mxDateTime.DateTimeDelta_FromDaysAndSeconds(days, seconds);
}
/* psycopg defaults to using mx types */


@ -0,0 +1,44 @@
/* typecast_mxdatetime.h - definitions for mx.DateTime typecasters
*
* Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
*
* This file is part of psycopg.
*
* psycopg2 is free software: you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published
* by the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* In addition, as a special exception, the copyright holders give
* permission to link this program with the OpenSSL library (or with
* modified versions of OpenSSL that use the same license as OpenSSL),
* and distribute linked combinations including the two.
*
* You must obey the GNU Lesser General Public License in all respects for
* all of the code used other than OpenSSL.
*
* psycopg2 is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public
* License for more details.
*/
#ifndef PSYCOPG_TYPECAST_MXDATETIME_H
#define PSYCOPG_TYPECAST_MXDATETIME_H 1
#define PY_SSIZE_T_CLEAN
#include <Python.h>
#include "psycopg/config.h"
#ifdef __cplusplus
extern "C" {
#endif
HIDDEN int psyco_typecast_mxdatetime_init(void);
#ifdef __cplusplus
}
#endif
#endif /* !defined(PSYCOPG_TYPECAST_MXDATETIME_H) */


@ -26,6 +26,7 @@
#include <Python.h>
#include <string.h>
#define PSYCOPG_MODULE
#include "psycopg/config.h"
#include "psycopg/psycopg.h"
#include "psycopg/connection.h"


@ -1,10 +1,9 @@
[build_ext]
define=PSYCOPG_EXTENSIONS,PSYCOPG_NEW_BOOLEAN,HAVE_PQFREEMEM,HAVE_PQPROTOCOL3
define=PSYCOPG_EXTENSIONS,PSYCOPG_NEW_BOOLEAN,HAVE_PQFREEMEM
# PSYCOPG_EXTENSIONS enables extensions to PEP-249 (you really want this)
# PSYCOPG_DISPLAY_SIZE enables display size calculation (a little slower)
# HAVE_PQFREEMEM should be defined on PostgreSQL >= 7.4
# HAVE_PQPROTOCOL3 should be defined on PostgreSQL >= 7.4
# PSYCOPG_DEBUG can be added to enable verbose debug information
# PSYCOPG_NEW_BOOLEAN to format booleans as true/false vs 't'/'f'


@ -14,17 +14,15 @@
"""Python-PostgreSQL Database Adapter
psycopg is a PostgreSQL database adapter for the Python programming
language. This is version 2, a complete rewrite of the original code to
provide new-style classes for connection and cursor objects and other sweet
candies. Like the original, psycopg 2 was written with the aim of being
very small and fast, and stable as a rock.
psycopg2 is a PostgreSQL database adapter for the Python programming
language. psycopg2 was written with the aim of being very small and fast,
and stable as a rock.
psycopg is different from the other database adapter because it was
psycopg2 is different from the other database adapters because it was
designed for heavily multi-threaded applications that create and destroy
lots of cursors and make a conspicuous number of concurrent INSERTs or
UPDATEs. psycopg 2 also provide full asycronous operations for the really
brave programmer.
UPDATEs. psycopg2 also provides full asynchronous operations and support
for coroutine libraries.
"""
classifiers = """\
@ -393,11 +391,10 @@ else:
sys.exit(1)
# generate a nice version string to avoid confusion when users report bugs
version_flags.append('pq3') # no more a choice
for have in parser.get('build_ext', 'define').split(','):
if have == 'PSYCOPG_EXTENSIONS':
version_flags.append('ext')
elif have == 'HAVE_PQPROTOCOL3':
version_flags.append('pq3')
if version_flags:
PSYCOPG_VERSION_EX = PSYCOPG_VERSION + " (%s)" % ' '.join(version_flags)
else:
@ -440,8 +437,8 @@ setup(name="psycopg2",
long_description="\n".join(__doc__.split("\n")[2:]),
classifiers=filter(None, classifiers.split("\n")),
data_files=data_files,
package_dir={'psycopg2':'lib'},
packages=['psycopg2'],
package_dir={'psycopg2':'lib', 'psycopg2.tests': 'tests'},
packages=['psycopg2', 'psycopg2.tests'],
cmdclass={ 'build_ext': psycopg_build_ext },
ext_modules=ext)


@ -1,7 +1,7 @@
#!/usr/bin/env python
import os
import unittest
from testutils import unittest
dbname = os.environ.get('PSYCOPG2_TESTDB', 'psycopg2_test')
dbhost = os.environ.get('PSYCOPG2_TESTDB_HOST', None)
@ -46,6 +46,7 @@ import test_copy
import test_notify
import test_async
import test_green
import test_cancel
def test_suite():
suite = unittest.TestSuite()
@ -59,17 +60,12 @@ def test_suite():
suite.addTest(test_transaction.test_suite())
suite.addTest(types_basic.test_suite())
suite.addTest(types_extras.test_suite())
if not green:
suite.addTest(test_lobject.test_suite())
suite.addTest(test_copy.test_suite())
else:
import warnings
warnings.warn("copy/lobjects not implemented in green mode: skipping tests")
suite.addTest(test_lobject.test_suite())
suite.addTest(test_copy.test_suite())
suite.addTest(test_notify.test_suite())
suite.addTest(test_async.test_suite())
suite.addTest(test_green.test_suite())
suite.addTest(test_cancel.test_suite())
return suite
if __name__ == '__main__':


@ -16,7 +16,7 @@
import psycopg2
import psycopg2.extras
import unittest
from testutils import unittest
import tests
@ -85,6 +85,7 @@ class ExtrasDictCursorTests(unittest.TestCase):
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
self.failUnless(row[0] == 'bar')
return row
def _testWithNamedCursor(self, getter):
curs = self.conn.cursor('aname', cursor_factory=psycopg2.extras.DictCursor)
@ -105,14 +106,19 @@ class ExtrasDictCursorTests(unittest.TestCase):
row = getter(curs)
self.failUnless(row['foo'] == 'bar')
def testUpdateRow(self):
row = self._testWithPlainCursor(lambda curs: curs.fetchone())
row['foo'] = 'qux'
self.failUnless(row['foo'] == 'qux')
self.failUnless(row[0] == 'qux')
def if_has_namedtuple(f):
def if_has_namedtuple_(self):
try:
from collections import namedtuple
except ImportError:
import warnings
warnings.warn("collections.namedtuple not available")
return self.skipTest("collections.namedtuple not available")
else:
return f(self)
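The if_has_namedtuple guard above is an instance of a generic "skip unless importable" test decorator. A stand-alone sketch of the pattern (names hypothetical; a real test suite would call skipTest instead of returning a string):

```python
import functools

def if_importable(modname):
    # run the wrapped test only when `modname` can be imported,
    # otherwise report it as skipped
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            try:
                __import__(modname)
            except ImportError:
                return "skipped"
            return f(*args, **kwargs)
        return wrapper
    return decorator

@if_importable("collections")
def test_present():
    return "ran"

@if_importable("no_such_module_xyz")
def test_missing():
    return "ran"
```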
@ -133,10 +139,15 @@ class NamedTupleCursorTest(unittest.TestCase):
connection_factory=NamedTupleConnection)
curs = self.conn.cursor()
curs.execute("CREATE TEMPORARY TABLE nttest (i int, s text)")
curs.execute(
"INSERT INTO nttest VALUES (1, 'foo'), (2, 'bar'), (3, 'baz')")
curs.execute("INSERT INTO nttest VALUES (1, 'foo')")
curs.execute("INSERT INTO nttest VALUES (2, 'bar')")
curs.execute("INSERT INTO nttest VALUES (3, 'baz')")
self.conn.commit()
def tearDown(self):
if self.conn is not None:
self.conn.close()
@if_has_namedtuple
def test_fetchone(self):
curs = self.conn.cursor()
@ -194,6 +205,8 @@ class NamedTupleCursorTest(unittest.TestCase):
# an import error somewhere
from psycopg2.extras import NamedTupleConnection
try:
if self.conn is not None:
self.conn.close()
self.conn = psycopg2.connect(tests.dsn,
connection_factory=NamedTupleConnection)
curs = self.conn.cursor()


@ -1,5 +1,5 @@
#!/usr/bin/env python
import unittest
from testutils import unittest, skip_if_no_pg_sleep
import psycopg2
from psycopg2 import extensions
@ -94,13 +94,10 @@ class AsyncTests(unittest.TestCase):
self.assertFalse(self.conn.isexecuting())
self.assertEquals(cur.fetchone()[0], "a")
@skip_if_no_pg_sleep('conn')
def test_async_callproc(self):
cur = self.conn.cursor()
try:
cur.callproc("pg_sleep", (0.1, ))
except psycopg2.ProgrammingError:
# PG <8.1 did not have pg_sleep
return
cur.callproc("pg_sleep", (0.1, ))
self.assertTrue(self.conn.isexecuting())
self.wait(cur)
@ -213,7 +210,11 @@ class AsyncTests(unittest.TestCase):
cur.execute("begin")
self.wait(cur)
cur.execute("insert into table1 values (1), (2), (3)")
cur.execute("""
insert into table1 values (1);
insert into table1 values (2);
insert into table1 values (3);
""")
self.wait(cur)
cur.execute("select id from table1 order by id")
@ -247,7 +248,11 @@ class AsyncTests(unittest.TestCase):
def test_async_scroll(self):
cur = self.conn.cursor()
cur.execute("insert into table1 values (1), (2), (3)")
cur.execute("""
insert into table1 values (1);
insert into table1 values (2);
insert into table1 values (3);
""")
self.wait(cur)
cur.execute("select id from table1 order by id")
@ -279,7 +284,11 @@ class AsyncTests(unittest.TestCase):
def test_scroll(self):
cur = self.sync_conn.cursor()
cur.execute("create table table1 (id int)")
cur.execute("insert into table1 values (1), (2), (3)")
cur.execute("""
insert into table1 values (1);
insert into table1 values (2);
insert into table1 values (3);
""")
cur.execute("select id from table1 order by id")
cur.scroll(2)
cur.scroll(-1)
@ -317,7 +326,12 @@ class AsyncTests(unittest.TestCase):
if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1:
return
self.fail("sending a large query didn't trigger block on write.")
# This is more a testing glitch than an error: it happens
# under high load on Linux, probably because the kernel has more
# buffers ready. A warning may be useful during development,
# but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def test_sync_poll(self):
cur = self.sync_conn.cursor()

tests/test_cancel.py (new file, 91 lines)

@ -0,0 +1,91 @@
#!/usr/bin/env python
import time
import threading
from testutils import unittest, skip_if_no_pg_sleep
import tests
import psycopg2
import psycopg2.extensions
from psycopg2 import extras
class CancelTests(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
cur = self.conn.cursor()
cur.execute('''
CREATE TEMPORARY TABLE table1 (
id int PRIMARY KEY
)''')
self.conn.commit()
def tearDown(self):
self.conn.close()
def test_empty_cancel(self):
self.conn.cancel()
@skip_if_no_pg_sleep('conn')
def test_cancel(self):
errors = []
def neverending(conn):
cur = conn.cursor()
try:
self.assertRaises(psycopg2.extensions.QueryCanceledError,
cur.execute, "select pg_sleep(10000)")
# make sure the connection still works
conn.rollback()
cur.execute("select 1")
self.assertEqual(cur.fetchall(), [(1, )])
except Exception, e:
errors.append(e)
raise
def canceller(conn):
cur = conn.cursor()
try:
conn.cancel()
except Exception, e:
errors.append(e)
raise
thread1 = threading.Thread(target=neverending, args=(self.conn, ))
# wait a bit to make sure that the other thread is already in
# pg_sleep -- ugly and racy, but the chances are ridiculously low
thread2 = threading.Timer(0.3, canceller, args=(self.conn, ))
thread1.start()
thread2.start()
thread1.join()
thread2.join()
self.assertEqual(errors, [])
@skip_if_no_pg_sleep('conn')
def test_async_cancel(self):
async_conn = psycopg2.connect(tests.dsn, async=True)
self.assertRaises(psycopg2.OperationalError, async_conn.cancel)
extras.wait_select(async_conn)
cur = async_conn.cursor()
cur.execute("select pg_sleep(10000)")
self.assertTrue(async_conn.isexecuting())
async_conn.cancel()
self.assertRaises(psycopg2.extensions.QueryCanceledError,
extras.wait_select, async_conn)
cur.execute("select 1")
extras.wait_select(async_conn)
self.assertEqual(cur.fetchall(), [(1, )])
def test_async_connection_cancel(self):
async_conn = psycopg2.connect(tests.dsn, async=True)
async_conn.close()
self.assertTrue(async_conn.closed)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)
if __name__ == "__main__":
unittest.main()
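The thread choreography in test_cancel above, where one thread blocks in a long query while a threading.Timer fires conn.cancel() after a short delay, can be sketched generically without a database. This is a hypothetical stand-in: an Event takes the place of pg_sleep and of the cancel request.

```python
import threading

# Generic sketch (no database required) of the pattern test_cancel uses:
# one thread blocks in a long-running operation while a threading.Timer
# fires a cancel from another thread. Event.wait stands in for pg_sleep.
class CancellableWork(object):
    def __init__(self):
        self._cancelled = threading.Event()

    def run(self):
        # stands in for cur.execute("select pg_sleep(10000)")
        return self._cancelled.wait(5)

    def cancel(self):
        # stands in for conn.cancel() issued from the canceller thread
        self._cancelled.set()

work = CancellableWork()
results = []
t1 = threading.Thread(target=lambda: results.append(work.run()))
t2 = threading.Timer(0.1, work.cancel)  # same shape as the test's canceller
t1.start(); t2.start()
t1.join(); t2.join()
# results == [True]: the worker observed the cancellation before the timeout
```

As in the test, the Timer delay is a deliberate race: it only needs the worker to be blocked by the time the cancel fires, which a small sleep makes overwhelmingly likely.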


@ -1,6 +1,8 @@
#!/usr/bin/env python
import unittest
import time
import threading
from testutils import unittest, decorate_all_tests, skip_if_no_pg_sleep
from operator import attrgetter
import psycopg2
@ -9,17 +11,21 @@ import tests
class ConnectionTests(unittest.TestCase):
def connect(self):
return psycopg2.connect(tests.dsn)
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
if not self.conn.closed:
self.conn.close()
def test_closed_attribute(self):
conn = self.connect()
conn = self.conn
self.assertEqual(conn.closed, False)
conn.close()
self.assertEqual(conn.closed, True)
def test_cursor_closed_attribute(self):
conn = self.connect()
conn = self.conn
curs = conn.cursor()
self.assertEqual(curs.closed, False)
curs.close()
@ -31,7 +37,7 @@ class ConnectionTests(unittest.TestCase):
self.assertEqual(curs.closed, True)
def test_reset(self):
conn = self.connect()
conn = self.conn
# switch isolation level, then reset
level = conn.isolation_level
conn.set_isolation_level(0)
@ -41,15 +47,14 @@ class ConnectionTests(unittest.TestCase):
self.assertEqual(conn.isolation_level, level)
def test_notices(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
cur.execute("create temp table chatty (id serial primary key);")
self.assertEqual("CREATE TABLE", cur.statusmessage)
self.assert_(conn.notices)
conn.close()
def test_notices_consistent_order(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
cur.execute("create temp table table1 (id serial); create temp table table2 (id serial);")
cur.execute("create temp table table3 (id serial); create temp table table4 (id serial);")
@ -58,10 +63,9 @@ class ConnectionTests(unittest.TestCase):
self.assert_('table2' in conn.notices[1])
self.assert_('table3' in conn.notices[2])
self.assert_('table4' in conn.notices[3])
conn.close()
def test_notices_limited(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
for i in range(0, 100, 10):
sql = " ".join(["create temp table table%d (id serial);" % j for j in range(i, i+10)])
@ -72,15 +76,66 @@ class ConnectionTests(unittest.TestCase):
self.assert_('table51' in conn.notices[1], conn.notices[1])
self.assert_('table98' in conn.notices[-2], conn.notices[-2])
self.assert_('table99' in conn.notices[-1], conn.notices[-1])
conn.close()
def test_server_version(self):
conn = self.connect()
self.assert_(conn.server_version)
self.assert_(self.conn.server_version)
def test_protocol_version(self):
self.assert_(self.conn.protocol_version in (2,3),
self.conn.protocol_version)
def test_tpc_unsupported(self):
cnn = self.conn
if cnn.server_version >= 80100:
return self.skipTest("tpc is supported")
self.assertRaises(psycopg2.NotSupportedError,
cnn.xid, 42, "foo", "bar")
@skip_if_no_pg_sleep('conn')
def test_concurrent_execution(self):
def slave():
cnn = psycopg2.connect(tests.dsn)
cur = cnn.cursor()
cur.execute("select pg_sleep(2)")
cur.close()
cnn.close()
t1 = threading.Thread(target=slave)
t2 = threading.Thread(target=slave)
t0 = time.time()
t1.start()
t2.start()
t1.join()
t2.join()
self.assert_(time.time() - t0 < 3,
"something broken in concurrency")
class IsolationLevelsTestCase(unittest.TestCase):
def setUp(self):
self._conns = []
conn = self.connect()
self.assert_(conn.protocol_version in (2,3), conn.protocol_version)
cur = conn.cursor()
try:
cur.execute("drop table isolevel;")
except psycopg2.ProgrammingError:
conn.rollback()
cur.execute("create table isolevel (id integer);")
conn.commit()
conn.close()
def tearDown(self):
# close the connections used in the test
for conn in self._conns:
if not conn.closed:
conn.close()
def connect(self):
conn = psycopg2.connect(tests.dsn)
self._conns.append(conn)
return conn
def test_isolation_level(self):
conn = self.connect()
@ -92,6 +147,135 @@ class ConnectionTests(unittest.TestCase):
conn = self.connect()
self.assert_(conn.encoding in psycopg2.extensions.encodings)
def test_set_isolation_level(self):
conn = self.connect()
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(conn.isolation_level,
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
self.assertRaises(ValueError, conn.set_isolation_level, -1)
self.assertRaises(ValueError, conn.set_isolation_level, 3)
def test_set_isolation_level_abort(self):
conn = self.connect()
cur = conn.cursor()
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(0, cur.fetchone()[0])
cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_INTRANS,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(0, cur.fetchone()[0])
cur.execute("insert into isolevel values (10);")
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
conn.set_isolation_level(
psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
self.assertEqual(psycopg2.extensions.TRANSACTION_STATUS_IDLE,
conn.get_transaction_status())
cur.execute("select count(*) from isolevel;")
self.assertEqual(1, cur.fetchone()[0])
def test_isolation_level_autocommit(self):
cnn1 = self.connect()
cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_AUTOCOMMIT)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(0, cur1.fetchone()[0])
cnn1.commit()
cur2 = cnn2.cursor()
cur2.execute("insert into isolevel values (10);")
cur1.execute("select count(*) from isolevel;")
self.assertEqual(1, cur1.fetchone()[0])
def test_isolation_level_read_committed(self):
cnn1 = self.connect()
cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_READ_COMMITTED)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(0, cur1.fetchone()[0])
cnn1.commit()
cur2 = cnn2.cursor()
cur2.execute("insert into isolevel values (10);")
cur1.execute("insert into isolevel values (20);")
cur2.execute("select count(*) from isolevel;")
self.assertEqual(1, cur2.fetchone()[0])
cnn1.commit()
cur2.execute("select count(*) from isolevel;")
self.assertEqual(2, cur2.fetchone()[0])
cur1.execute("select count(*) from isolevel;")
self.assertEqual(1, cur1.fetchone()[0])
cnn2.commit()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(2, cur1.fetchone()[0])
def test_isolation_level_serializable(self):
cnn1 = self.connect()
cnn2 = self.connect()
cnn2.set_isolation_level(psycopg2.extensions.ISOLATION_LEVEL_SERIALIZABLE)
cur1 = cnn1.cursor()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(0, cur1.fetchone()[0])
cnn1.commit()
cur2 = cnn2.cursor()
cur2.execute("insert into isolevel values (10);")
cur1.execute("insert into isolevel values (20);")
cur2.execute("select count(*) from isolevel;")
self.assertEqual(1, cur2.fetchone()[0])
cnn1.commit()
cur2.execute("select count(*) from isolevel;")
self.assertEqual(1, cur2.fetchone()[0])
cur1.execute("select count(*) from isolevel;")
self.assertEqual(1, cur1.fetchone()[0])
cnn2.commit()
cur1.execute("select count(*) from isolevel;")
self.assertEqual(2, cur1.fetchone()[0])
cur2.execute("select count(*) from isolevel;")
self.assertEqual(2, cur2.fetchone()[0])
def skip_if_tpc_disabled(f):
"""Skip a test if the server has tpc support disabled."""
@ -101,39 +285,52 @@ def skip_if_tpc_disabled(f):
try:
cur.execute("SHOW max_prepared_transactions;")
except psycopg2.ProgrammingError:
# Server version too old: let's die a different death
mtp = 1
return self.skipTest(
"server too old: two phase transactions not supported.")
else:
mtp = int(cur.fetchone()[0])
cnn.close()
if not mtp:
import warnings
warnings.warn(
return self.skipTest(
"server not configured for two phase transactions. "
"set max_prepared_transactions to > 0 to run the test")
return
return f(self)
skip_if_tpc_disabled_.__name__ = f.__name__
return skip_if_tpc_disabled_
class ConnectionTwoPhaseTests(unittest.TestCase):
def setUp(self):
self._conns = []
self.make_test_table()
self.clear_test_xacts()
def tearDown(self):
self.clear_test_xacts()
# close the connections used in the test
for conn in self._conns:
if not conn.closed:
conn.close()
def clear_test_xacts(self):
"""Rollback all the prepared transaction in the testing db."""
cnn = self.connect()
cnn.set_isolation_level(0)
cur = cnn.cursor()
cur.execute(
"select gid from pg_prepared_xacts where database = %s",
(tests.dbname,))
try:
cur.execute(
"select gid from pg_prepared_xacts where database = %s",
(tests.dbname,))
except psycopg2.ProgrammingError:
cnn.rollback()
cnn.close()
return
gids = [ r[0] for r in cur ]
for gid in gids:
cur.execute("rollback prepared %s;", (gid,))
@ -142,7 +339,10 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
def make_test_table(self):
cnn = self.connect()
cur = cnn.cursor()
cur.execute("DROP TABLE IF EXISTS test_tpc;")
try:
cur.execute("DROP TABLE test_tpc;")
except psycopg2.ProgrammingError:
cnn.rollback()
cur.execute("CREATE TABLE test_tpc (data text);")
cnn.commit()
cnn.close()
@ -169,9 +369,10 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
return rv
def connect(self):
return psycopg2.connect(tests.dsn)
conn = psycopg2.connect(tests.dsn)
self._conns.append(conn)
return conn
@skip_if_tpc_disabled
def test_tpc_commit(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
@ -213,7 +414,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records())
@skip_if_tpc_disabled
def test_tpc_commit_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
@ -240,7 +440,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual(0, self.count_xacts())
self.assertEqual(1, self.count_test_records())
@skip_if_tpc_disabled
def test_tpc_rollback(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
@ -282,7 +481,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual(0, self.count_xacts())
self.assertEqual(0, self.count_test_records())
@skip_if_tpc_disabled
def test_tpc_rollback_recovered(self):
cnn = self.connect()
xid = cnn.xid(1, "gtrid", "bqual")
@ -321,7 +519,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
xns = cnn.tpc_recover()
self.assertEqual(psycopg2.extensions.STATUS_BEGIN, cnn.status)
@skip_if_tpc_disabled
def test_recovered_xids(self):
# insert a few test xns
cnn = self.connect()
@ -352,7 +549,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual(xid.owner, owner)
self.assertEqual(xid.database, database)
@skip_if_tpc_disabled
def test_xid_encoding(self):
cnn = self.connect()
xid = cnn.xid(42, "gtrid", "bqual")
@ -365,7 +561,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
(tests.dbname,))
self.assertEqual('42_Z3RyaWQ=_YnF1YWw=', cur.fetchone()[0])
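The expected value '42_Z3RyaWQ=_YnF1YWw=' checked above is consistent with an XA-style encoding: the format id, then base64 of gtrid and bqual, joined by underscores. A minimal sketch of that encoding (the helper name is hypothetical; the real logic lives in psycopg2's C code):

```python
import base64

def encode_xid(format_id, gtrid, bqual):
    # hypothetical re-implementation of the format the test asserts:
    # <format_id>_<base64(gtrid)>_<base64(bqual)>
    return "%d_%s_%s" % (
        format_id,
        base64.b64encode(gtrid.encode()).decode(),
        base64.b64encode(bqual.encode()).decode(),
    )

# encode_xid(42, "gtrid", "bqual") == "42_Z3RyaWQ=_YnF1YWw="
```

Base64 keeps arbitrary gtrid/bqual bytes safe inside the single string PostgreSQL stores for a prepared transaction.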
@skip_if_tpc_disabled
def test_xid_roundtrip(self):
for fid, gtrid, bqual in [
(0, "", ""),
@ -389,7 +584,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
cnn.tpc_rollback(xid)
@skip_if_tpc_disabled
def test_unparsed_roundtrip(self):
for tid in [
'',
@ -442,7 +636,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
x2 = Xid.from_string('99_xxx_yyy')
self.assertEqual(str(x2), '99_xxx_yyy')
@skip_if_tpc_disabled
def test_xid_unicode(self):
cnn = self.connect()
x1 = cnn.xid(10, u'uni', u'code')
@ -455,7 +648,6 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual('uni', xid.gtrid)
self.assertEqual('code', xid.bqual)
@skip_if_tpc_disabled
def test_xid_unicode_unparsed(self):
# We don't expect people shooting snowmen as transaction ids,
# so if something explodes in an encode error I don't mind.
@ -472,6 +664,14 @@ class ConnectionTwoPhaseTests(unittest.TestCase):
self.assertEqual('transaction-id', xid.gtrid)
self.assertEqual(None, xid.bqual)
def test_cancel_fails_prepared(self):
cnn = self.connect()
cnn.tpc_begin('cancel')
cnn.tpc_prepare()
self.assertRaises(psycopg2.ProgrammingError, cnn.cancel)
decorate_all_tests(ConnectionTwoPhaseTests, skip_if_tpc_disabled)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)


@ -1,7 +1,7 @@
#!/usr/bin/env python
import os
import string
import unittest
from testutils import unittest, decorate_all_tests
from cStringIO import StringIO
from itertools import cycle, izip
@ -9,6 +9,15 @@ import psycopg2
import psycopg2.extensions
import tests
def skip_if_green(f):
def skip_if_green_(self):
if tests.green:
return self.skipTest("copy in async mode currently not supported")
else:
return f(self)
return skip_if_green_
class MinimalRead(object):
"""A file wrapper exposing the minimal interface to copy from."""
@ -29,6 +38,7 @@ class MinimalWrite(object):
def write(self, data):
return self.f.write(data)
class CopyTests(unittest.TestCase):
def setUp(self):
@ -124,6 +134,9 @@ class CopyTests(unittest.TestCase):
self.assertEqual(ntests, len(string.letters))
decorate_all_tests(CopyTests, skip_if_green)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)


@ -7,11 +7,14 @@ import tests
class CursorTests(unittest.TestCase):
def connect(self):
return psycopg2.connect(tests.dsn)
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def test_executemany_propagate_exceptions(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
cur.execute("create temp table test_exc (data int);")
def buggygen():
@ -19,10 +22,9 @@ class CursorTests(unittest.TestCase):
self.assertRaises(ZeroDivisionError,
cur.executemany, "insert into test_exc values (%s)", buggygen())
cur.close()
conn.close()
def test_mogrify_unicode(self):
conn = self.connect()
conn = self.conn
cur = conn.cursor()
# test consistency between execute and mogrify.
@ -60,7 +62,7 @@ class CursorTests(unittest.TestCase):
except:
return
conn = self.connect()
conn = self.conn
cur = conn.cursor()
self.assertEqual('SELECT 10.3;',
cur.mogrify("SELECT %s;", (Decimal("10.3"),)))


@ -21,14 +21,13 @@ class ConnectionStub(object):
return rv
class GreenTests(unittest.TestCase):
def connect(self):
return psycopg2.connect(tests.dsn)
def setUp(self):
self._cb = psycopg2.extensions.get_wait_callback()
psycopg2.extensions.set_wait_callback(psycopg2.extras.wait_select)
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
psycopg2.extensions.set_wait_callback(self._cb)
def set_stub_wait_callback(self, conn):
@ -39,7 +38,7 @@ class GreenTests(unittest.TestCase):
def test_flush_on_write(self):
# a very large query requires a flush loop to be sent to the backend
conn = self.connect()
conn = self.conn
stub = self.set_stub_wait_callback(conn)
curs = conn.cursor()
for mb in 1, 5, 10, 20, 50:
@ -50,10 +49,15 @@ class GreenTests(unittest.TestCase):
if stub.polls.count(psycopg2.extensions.POLL_WRITE) > 1:
return
self.fail("sending a large query didn't trigger block on write.")
# This is more a testing glitch than an error: it happens
# on high load on linux: probably because the kernel has more
# buffers ready. A warning may be useful during development,
# but an error is bad during regression testing.
import warnings
warnings.warn("sending a large query didn't trigger block on write.")
def test_error_in_callback(self):
conn = self.connect()
conn = self.conn
curs = conn.cursor()
curs.execute("select 1") # have a BEGIN
curs.fetchone()
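The "flush loop" that test_flush_on_write looks for can be sketched against a stub: a send needing several write rounds shows up as repeated POLL_WRITE states before POLL_OK. The constants and stub below are hypothetical stand-ins mirroring psycopg2.extensions.POLL_* and the test's ConnectionStub.

```python
POLL_OK, POLL_READ, POLL_WRITE = 0, 1, 2  # mirror psycopg2.extensions values

class StubConn(object):
    # pretends a large query needs several write rounds before completing
    def __init__(self, writes_needed):
        self._writes_left = writes_needed

    def poll(self):
        if self._writes_left > 0:
            self._writes_left -= 1
            return POLL_WRITE
        return POLL_OK

def wait(conn):
    # minimal wait loop: record each state until the operation completes
    polls = []
    while True:
        state = conn.poll()
        polls.append(state)
        if state == POLL_OK:
            return polls

polls = wait(StubConn(3))
# a large query triggers more than one POLL_WRITE before POLL_OK
```

This is the condition the test asserts: stub.polls.count(POLL_WRITE) > 1. Whether the kernel accepts the whole query in one write is load-dependent, which is why the test warns rather than fails.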


@ -2,16 +2,33 @@
import os
import shutil
import tempfile
import unittest
import warnings
from testutils import unittest, decorate_all_tests
import psycopg2
import psycopg2.extensions
import tests
def skip_if_no_lo(f):
def skip_if_no_lo_(self):
if self.conn.server_version < 80100:
return self.skipTest("large objects only supported from PG 8.1")
else:
return f(self)
class LargeObjectTests(unittest.TestCase):
return skip_if_no_lo_
def skip_if_green(f):
def skip_if_green_(self):
if tests.green:
return self.skipTest("libpq doesn't support LO in async mode")
else:
return f(self)
return skip_if_green_
class LargeObjectMixin(object):
# doesn't derive from TestCase to avoid repeating tests twice.
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
self.lo_oid = None
@ -30,6 +47,8 @@ class LargeObjectTests(unittest.TestCase):
lo.unlink()
self.conn.close()
class LargeObjectTests(LargeObjectMixin, unittest.TestCase):
def test_create(self):
lo = self.conn.lobject()
self.assertNotEqual(lo, None)
@ -261,30 +280,25 @@ class LargeObjectTests(unittest.TestCase):
self.assertTrue(os.path.exists(filename))
self.assertEqual(open(filename, "rb").read(), "some data")
decorate_all_tests(LargeObjectTests, skip_if_no_lo)
decorate_all_tests(LargeObjectTests, skip_if_green)
class LargeObjectTruncateTests(LargeObjectTests):
skip = None
def skip_if_no_truncate(f):
def skip_if_no_truncate_(self):
if self.conn.server_version < 80300:
return self.skipTest(
"the server doesn't support large object truncate")
def setUp(self):
LargeObjectTests.setUp(self)
if not hasattr(psycopg2.extensions.lobject, 'truncate'):
return self.skipTest(
"psycopg2 has been built against a libpq "
"without large object truncate support.")
if self.skip is None:
self.skip = False
if self.conn.server_version < 80300:
warnings.warn("Large object truncate tests skipped, "
"the server does not support them")
self.skip = True
if not hasattr(psycopg2.extensions.lobject, 'truncate'):
warnings.warn("Large object truncate tests skipped, "
"psycopg2 has been built against an old library")
self.skip = True
return f(self)
class LargeObjectTruncateTests(LargeObjectMixin, unittest.TestCase):
def test_truncate(self):
if self.skip:
return
lo = self.conn.lobject()
lo.write("some data")
lo.close()
@ -308,23 +322,22 @@ class LargeObjectTruncateTests(LargeObjectTests):
self.assertEqual(lo.read(), "")
def test_truncate_after_close(self):
if self.skip:
return
lo = self.conn.lobject()
lo.close()
self.assertRaises(psycopg2.InterfaceError, lo.truncate)
def test_truncate_after_commit(self):
if self.skip:
return
lo = self.conn.lobject()
self.lo_oid = lo.oid
self.conn.commit()
self.assertRaises(psycopg2.ProgrammingError, lo.truncate)
decorate_all_tests(LargeObjectTruncateTests, skip_if_no_lo)
decorate_all_tests(LargeObjectTruncateTests, skip_if_green)
decorate_all_tests(LargeObjectTruncateTests, skip_if_no_truncate)
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)


@ -1,6 +1,5 @@
#!/usr/bin/env python
import unittest
import warnings
from testutils import unittest
import psycopg2
from psycopg2 import extensions
@ -110,6 +109,7 @@ conn.close()
self.autocommit(self.conn)
self.listen('foo')
self.notify('foo').communicate()
time.sleep(0.1)
self.conn.poll()
notify = self.conn.notifies[0]
self.assert_(isinstance(notify, psycopg2.extensions.Notify))
@ -118,6 +118,7 @@ conn.close()
self.autocommit(self.conn)
self.listen('foo')
pid = int(self.notify('foo').communicate()[0])
time.sleep(0.1)
self.conn.poll()
self.assertEqual(1, len(self.conn.notifies))
notify = self.conn.notifies[0]
@ -127,12 +128,12 @@ conn.close()
def test_notify_payload(self):
if self.conn.server_version < 90000:
warnings.warn("server version %s doesn't support notify payload: skipping test"
return self.skipTest("server version %s doesn't support notify payload"
% self.conn.server_version)
return
self.autocommit(self.conn)
self.listen('foo')
pid = int(self.notify('foo', payload="Hello, world!").communicate()[0])
time.sleep(0.1)
self.conn.poll()
self.assertEqual(1, len(self.conn.notifies))
notify = self.conn.notifies[0]


@ -2,7 +2,7 @@
import dbapi20
import dbapi20_tpc
from test_connection import skip_if_tpc_disabled
import unittest
from testutils import unittest, decorate_all_tests
import psycopg2
import tests
@ -23,19 +23,14 @@ class Psycopg2Tests(dbapi20.DatabaseAPI20Test):
pass
class Psycopg2TPCTests(dbapi20_tpc.TwoPhaseCommitTests):
class Psycopg2TPCTests(dbapi20_tpc.TwoPhaseCommitTests, unittest.TestCase):
driver = psycopg2
def connect(self):
return psycopg2.connect(dsn=tests.dsn)
@skip_if_tpc_disabled
def test_tpc_commit_with_prepare(self):
super(Psycopg2TPCTests, self).test_tpc_commit_with_prepare()
decorate_all_tests(Psycopg2TPCTests, skip_if_tpc_disabled)
@skip_if_tpc_disabled
def test_tpc_rollback_with_prepare(self):
super(Psycopg2TPCTests, self).test_tpc_rollback_with_prepare()
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)


@ -1,6 +1,5 @@
#!/usr/bin/env python
import unittest
import warnings
from testutils import unittest
import psycopg2
import psycopg2.extensions
@ -61,9 +60,9 @@ class QuotingTestCase(unittest.TestCase):
curs.execute("SHOW server_encoding")
server_encoding = curs.fetchone()[0]
if server_encoding != "UTF8":
warnings.warn("Unicode test skipped since server encoding is %s"
% server_encoding)
return
return self.skipTest(
"Unicode test skipped since server encoding is %s"
% server_encoding)
data = u"""some data with \t chars
to escape into, 'quotes', \u20ac euro sign and \\ a backslash too.


@ -1,6 +1,6 @@
#!/usr/bin/env python
import threading
import unittest
from testutils import unittest, skip_if_no_pg_sleep
import psycopg2
from psycopg2.extensions import (
@ -211,6 +211,10 @@ class QueryCancellationTests(unittest.TestCase):
self.conn = psycopg2.connect(tests.dsn)
self.conn.set_isolation_level(ISOLATION_LEVEL_SERIALIZABLE)
def tearDown(self):
self.conn.close()
@skip_if_no_pg_sleep('conn')
def test_statement_timeout(self):
curs = self.conn.cursor()
# Set a low statement timeout, then sleep for a longer period.

tests/testutils.py (new file, 71 lines)

@ -0,0 +1,71 @@
# Utility module for psycopg2 testing.
#
# Copyright (C) 2010 Daniele Varrazzo <daniele.varrazzo@gmail.com>
# Use unittest2 if available. Otherwise mock a skip facility with warnings.
try:
import unittest2
unittest = unittest2
except ImportError:
import unittest
unittest2 = None
if hasattr(unittest, 'skipIf'):
skip = unittest.skip
skipIf = unittest.skipIf
else:
import warnings
def skipIf(cond, msg):
def skipIf_(f):
def skipIf__(self):
if cond:
warnings.warn(msg)
return
else:
return f(self)
return skipIf__
return skipIf_
def skip(msg):
return skipIf(True, msg)
def skipTest(self, msg):
warnings.warn(msg)
return
unittest.TestCase.skipTest = skipTest
def decorate_all_tests(cls, decorator):
"""Apply *decorator* to all the tests defined in the TestCase *cls*."""
for n in dir(cls):
if n.startswith('test'):
setattr(cls, n, decorator(getattr(cls, n)))
def skip_if_no_pg_sleep(name):
"""Decorator to skip a test if pg_sleep is not supported by the server.
Pass it the name of an attribute containing a connection or of a method
returning a connection.
"""
def skip_if_no_pg_sleep_(f):
def skip_if_no_pg_sleep__(self):
cnn = getattr(self, name)
if callable(cnn):
cnn = cnn()
if cnn.server_version < 80100:
return self.skipTest(
"server version %s doesn't support pg_sleep"
% cnn.server_version)
return f(self)
skip_if_no_pg_sleep__.__name__ = f.__name__
return skip_if_no_pg_sleep__
return skip_if_no_pg_sleep_
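For illustration, the skip helper above can be exercised without a server: a fake test case exposing server_version and skipTest shows the decorator skipping on pre-8.1 servers and running the test otherwise. All names below are hypothetical stand-ins; the decorator body repeats the logic from testutils.py.

```python
def skip_if_no_pg_sleep(name):
    # same logic as the testutils decorator: look up the named attribute,
    # call it if it is a method, skip when the server predates pg_sleep
    def skip_if_no_pg_sleep_(f):
        def skip_if_no_pg_sleep__(self):
            cnn = getattr(self, name)
            if callable(cnn):
                cnn = cnn()
            if cnn.server_version < 80100:
                return self.skipTest(
                    "server version %s doesn't support pg_sleep"
                    % cnn.server_version)
            return f(self)
        skip_if_no_pg_sleep__.__name__ = f.__name__
        return skip_if_no_pg_sleep__
    return skip_if_no_pg_sleep_

class FakeConn(object):
    def __init__(self, server_version):
        self.server_version = server_version

class FakeCase(object):
    def __init__(self, server_version):
        self.conn = FakeConn(server_version)
        self.skipped = self.ran = False
    def skipTest(self, msg):
        self.skipped = True
    @skip_if_no_pg_sleep('conn')
    def test_sleep(self):
        self.ran = True

old = FakeCase(80000); old.test_sleep()  # skipped: pg_sleep is 8.1+
new = FakeCase(80100); new.test_sleep()  # runs normally
```

Restoring `__name__` on the wrapper matters here: unittest discovers tests by name, and decorate_all_tests rebinds the wrapped function under the original attribute name.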


@ -27,7 +27,7 @@ try:
except:
pass
import sys
import unittest
from testutils import unittest
import psycopg2
import tests
@ -39,6 +39,9 @@ class TypesBasicTests(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def execute(self, *args):
curs = self.conn.cursor()
curs.execute(*args)
@ -78,18 +81,27 @@ class TypesBasicTests(unittest.TestCase):
s = self.execute("SELECT %s AS foo", (decimal.Decimal("-infinity"),))
self.failUnless(str(s) == "NaN", "wrong decimal quoting: " + str(s))
self.failUnless(type(s) == decimal.Decimal, "wrong decimal conversion: " + repr(s))
else:
return self.skipTest("decimal not available")
def testFloat(self):
def testFloatNan(self):
try:
float("nan")
except ValueError:
import warnings
warnings.warn("nan not available on this platform")
return
return self.skipTest("nan not available on this platform")
s = self.execute("SELECT %s AS foo", (float("nan"),))
self.failUnless(str(s) == "nan", "wrong float quoting: " + str(s))
self.failUnless(type(s) == float, "wrong float conversion: " + repr(s))
def testFloatInf(self):
try:
self.execute("select 'inf'::float")
except psycopg2.DataError:
return self.skipTest("inf::float not available on the server")
except ValueError:
return self.skipTest("inf not available on this platform")
s = self.execute("SELECT %s AS foo", (float("inf"),))
self.failUnless(str(s) == "inf", "wrong float quoting: " + str(s))
self.failUnless(type(s) == float, "wrong float conversion: " + repr(s))


@ -20,31 +20,58 @@ except:
pass
import re
import sys
import unittest
import warnings
from testutils import unittest
import psycopg2
import psycopg2.extras
import tests
def skip_if_no_uuid(f):
def skip_if_no_uuid_(self):
try:
import uuid
except ImportError:
return self.skipTest("uuid not available in this Python version")
try:
cur = self.conn.cursor()
cur.execute("select typname from pg_type where typname = 'uuid'")
has = cur.fetchone()
finally:
self.conn.rollback()
if has:
return f(self)
else:
return self.skipTest("uuid type not available on the server")
return skip_if_no_uuid_
def filter_scs(conn, s):
if conn.get_parameter_status("standard_conforming_strings") == 'off':
return s
else:
return s.replace("E'", "'")
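The filter_scs helper above adjusts the expected quoting to the server's standard_conforming_strings setting: with it off, the E'' escape syntax is expected verbatim; with it on, plain '' strings are standard, so the E prefix is dropped. The string logic can be checked standalone with a hypothetical stub connection exposing only get_parameter_status():

```python
def filter_scs(conn, s):
    # same logic as the test helper: keep E'' syntax only when
    # standard_conforming_strings is off
    if conn.get_parameter_status("standard_conforming_strings") == 'off':
        return s
    return s.replace("E'", "'")

class FakeConn(object):
    # hypothetical stand-in exposing only get_parameter_status()
    def __init__(self, scs):
        self._scs = scs
    def get_parameter_status(self, name):
        return self._scs

# FakeConn('off') leaves "E'192.168.1.0/24'::inet" unchanged;
# FakeConn('on') rewrites it to "'192.168.1.0/24'::inet"
```

This keeps one set of expected strings in the tests working against servers configured either way.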
class TypesExtrasTests(unittest.TestCase):
"""Test that all type conversions are working."""
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def execute(self, *args):
curs = self.conn.cursor()
curs.execute(*args)
return curs.fetchone()[0]
@skip_if_no_uuid
def testUUID(self):
try:
import uuid
psycopg2.extras.register_uuid()
except:
return
import uuid
psycopg2.extras.register_uuid()
u = uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350')
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
@ -52,12 +79,10 @@ class TypesExtrasTests(unittest.TestCase):
s = self.execute("SELECT NULL::uuid AS foo")
self.failUnless(s is None)
@skip_if_no_uuid
def testUUIDARRAY(self):
try:
import uuid
psycopg2.extras.register_uuid()
except:
return
import uuid
psycopg2.extras.register_uuid()
u = [uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e350'), uuid.UUID('9c6d5a77-7256-457e-9461-347b4358e352')]
s = self.execute("SELECT %s AS foo", (u,))
self.failUnless(u == s)
@ -86,13 +111,17 @@ class TypesExtrasTests(unittest.TestCase):
i = Inet("192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
self.assertEqual("E'192.168.1.0/24'::inet", a.getquoted())
self.assertEqual(
filter_scs(self.conn, "E'192.168.1.0/24'::inet"),
a.getquoted())
# adapts ok with unicode too
i = Inet(u"192.168.1.0/24")
a = psycopg2.extensions.adapt(i)
a.prepare(self.conn)
self.assertEqual("E'192.168.1.0/24'::inet", a.getquoted())
self.assertEqual(
filter_scs(self.conn, "E'192.168.1.0/24'::inet"),
a.getquoted())
def test_adapt_fail(self):
class Foo(object): pass
@ -109,8 +138,7 @@ def skip_if_no_hstore(f):
from psycopg2.extras import HstoreAdapter
oids = HstoreAdapter.get_oids(self.conn)
if oids is None:
warnings.warn("hstore not available in test database: skipping test")
return
return self.skipTest("hstore not available in test database")
return f(self)
return skip_if_no_hstore_
@ -119,14 +147,19 @@ class HstoreTestCase(unittest.TestCase):
def setUp(self):
self.conn = psycopg2.connect(tests.dsn)
def tearDown(self):
self.conn.close()
def test_adapt_8(self):
if self.conn.server_version >= 90000:
warnings.warn("skipping dict adaptation with PG pre-9 syntax")
return
return self.skipTest("skipping dict adaptation with PG pre-9 syntax")
from psycopg2.extras import HstoreAdapter
o = {'a': '1', 'b': "'", 'c': None, 'd': u'\xe0'}
o = {'a': '1', 'b': "'", 'c': None}
if self.conn.encoding == 'UTF8':
o['d'] = u'\xe0'
a = HstoreAdapter(o)
a.prepare(self.conn)
q = a.getquoted()
@ -136,20 +169,24 @@ class HstoreTestCase(unittest.TestCase):
ii = q[1:-1].split("||")
ii.sort()
self.assertEqual(ii[0], "(E'a' => E'1')")
self.assertEqual(ii[1], "(E'b' => E'''')")
self.assertEqual(ii[2], "(E'c' => NULL)")
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], "(E'd' => E'%s')" % encc)
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], filter_scs(self.conn, "(E'a' => E'1')"))
self.assertEqual(ii[1], filter_scs(self.conn, "(E'b' => E'''')"))
self.assertEqual(ii[2], filter_scs(self.conn, "(E'c' => NULL)"))
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], filter_scs(self.conn, "(E'd' => E'%s')" % encc))
def test_adapt_9(self):
if self.conn.server_version < 90000:
warnings.warn("skipping dict adaptation with PG 9 syntax")
return
return self.skipTest("skipping dict adaptation with PG 9 syntax")
from psycopg2.extras import HstoreAdapter
o = {'a': '1', 'b': "'", 'c': None, 'd': u'\xe0'}
o = {'a': '1', 'b': "'", 'c': None}
if self.conn.encoding == 'UTF8':
o['d'] = u'\xe0'
a = HstoreAdapter(o)
a.prepare(self.conn)
q = a.getquoted()
@ -162,11 +199,13 @@ class HstoreTestCase(unittest.TestCase):
ii = zip(kk, vv)
ii.sort()
self.assertEqual(len(ii), len(o))
self.assertEqual(ii[0], ("E'a'", "E'1'"))
self.assertEqual(ii[1], ("E'b'", "E''''"))
self.assertEqual(ii[2], ("E'c'", "NULL"))
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], ("E'd'", "E'%s'" % encc))
if 'd' in o:
encc = u'\xe0'.encode(psycopg2.extensions.encodings[self.conn.encoding])
self.assertEqual(ii[3], ("E'd'", "E'%s'" % encc))
def test_parse(self):
from psycopg2.extras import HstoreAdapter
@ -244,11 +283,13 @@ class HstoreTestCase(unittest.TestCase):
try:
register_hstore(self.conn, globally=True)
conn2 = psycopg2.connect(self.conn.dsn)
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assert_(isinstance(r[0], dict))
conn2.close()
try:
cur2 = self.conn.cursor()
cur2.execute("select 'a => b'::hstore")
r = cur2.fetchone()
self.assert_(isinstance(r[0], dict))
finally:
conn2.close()
finally:
psycopg2.extensions.string_types.pop(oids[0])