Merge branch 'python2' into python3

Conflicts:
	NEWS-2.3
	setup.py
This commit is contained in:
Daniele Varrazzo 2011-01-02 03:28:00 +01:00
commit 929d62053a
14 changed files with 522 additions and 49 deletions

View File

@ -27,12 +27,14 @@ ENV_LIB = $(ENV_DIR)/lib
SOURCE_C := $(wildcard psycopg/*.c psycopg/*.h)
SOURCE_PY := $(wildcard lib/*.py)
SOURCE_TESTS := $(wildcard tests/*.py)
SOURCE_DOC := $(wildcard doc/src/*.rst)
SOURCE := $(SOURCE_C) $(SOURCE_PY) $(SOURCE_DOC)
SOURCE := $(SOURCE_C) $(SOURCE_PY) $(SOURCE_TESTS) $(SOURCE_DOC)
PACKAGE := $(BUILD_DIR)/psycopg2
PLATLIB := $(PACKAGE)/_psycopg.so
PURELIB := $(patsubst lib/%,$(PACKAGE)/%,$(SOURCE_PY))
PURELIB := $(patsubst lib/%,$(PACKAGE)/%,$(SOURCE_PY)) \
$(patsubst tests/%,$(PACKAGE)/tests/%,$(SOURCE_TESTS))
BUILD_OPT := --build-lib=$(BUILD_DIR)
BUILD_EXT_OPT := --build-lib=$(BUILD_DIR)
@ -104,8 +106,12 @@ $(PLATLIB): $(SOURCE_C)
$(PYTHON) setup.py build_ext $(BUILD_EXT_OPT)
$(PACKAGE)/%.py: lib/%.py
$(PYTHON) setup.py build $(BUILD_OPT)
$(PYTHON) setup.py build_py $(BUILD_OPT)
touch $@
$(PACKAGE)/tests/%.py: tests/%.py
$(PYTHON) setup.py build_py $(BUILD_OPT)
touch $@
$(SDIST): docs MANIFEST $(SOURCE)
$(PYTHON) setup.py sdist $(SDIST_OPT)

View File

@ -1,15 +1,18 @@
What's new in psycopg 2.3.3
---------------------------
* Changes:
* New features and changes:
- Added `register_composite()` function to cast PostgreSQL composite types
into Python tuples/namedtuples.
- The build script refuses to guess values if pg_config is not found.
- Improved PostgreSQL-Python encodings mapping. Added a few
missing encodings: EUC_CN, EUC_JIS_2004, ISO885910, ISO885916,
LATIN10, SHIFT_JIS_2004.
- Dropped repeated dictionary lookups with unicode query/parameters.
- Empty lists correctly roundtrip Python -> PostgreSQL -> Python.
* Bug fixes
* Bug fixes:
- Fixed adaptation of None in composite types (ticket #26). Bug report by
Karsten Hilbert.

View File

@ -122,20 +122,6 @@ The ``cursor`` class
values can be retrieved using |fetch*|_ methods.
.. method:: mogrify(operation [, parameters])
Return a query string after arguments binding. The string returned is
exactly the one that would be sent to the database running the
`~cursor.execute()` method or similar.
>>> cur.mogrify("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
"INSERT INTO test (num, data) VALUES (42, E'bar')"
.. extension::
The `mogrify()` method is a Psycopg extension to the |DBAPI|.
.. method:: executemany(operation, seq_of_parameters)
Prepare a database operation (query or command) and then execute it
@ -167,6 +153,34 @@ The ``cursor`` class
does nothing but it is safe to call it.
.. method:: mogrify(operation [, parameters])
Return a query string after arguments binding. The string returned is
exactly the one that would be sent to the database running the
`~cursor.execute()` method or similar.
>>> cur.mogrify("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
"INSERT INTO test (num, data) VALUES (42, E'bar')"
.. extension::
The `mogrify()` method is a Psycopg extension to the |DBAPI|.
.. method:: cast(oid, s)
Convert a value from the PostgreSQL string representation to a Python
object.
Use the most specific of the typecasters registered by
`~psycopg2.extensions.register_type()`.
.. versionadded:: 2.3.3
.. extension::
The `cast()` method is a Psycopg extension to the |DBAPI|.
.. |fetch*| replace:: `!fetch*()`

View File

@ -157,6 +157,55 @@ can be enabled using the `register_hstore()` function.
.. index::
pair: Composite types; Data types
pair: tuple; Adaptation
pair: namedtuple; Adaptation
Composite types casting
^^^^^^^^^^^^^^^^^^^^^^^
.. versionadded:: 2.3.3
Using `register_composite()` it is possible to cast a PostgreSQL composite
type (e.g. created with |CREATE TYPE|_ command) into a Python named tuple, or
into a regular tuple if `!collections.namedtuple()` is not found.
.. |CREATE TYPE| replace:: :sql:`CREATE TYPE`
.. _CREATE TYPE: http://www.postgresql.org/docs/9.0/static/sql-createtype.html
.. doctest::
>>> cur.execute("CREATE TYPE card AS (value int, suit text);")
>>> psycopg2.extras.register_composite('card', cur)
<psycopg2.extras.CompositeCaster object at 0x...>
>>> cur.execute("select (8, 'hearts')::card")
>>> cur.fetchone()[0]
card(value=8, suit='hearts')
Nested composite types are handled as expected, but the type of the composite
components must be registered as well.
.. doctest::
>>> cur.execute("CREATE TYPE card_back AS (face card, back text);")
>>> psycopg2.extras.register_composite('card_back', cur)
<psycopg2.extras.CompositeCaster object at 0x...>
>>> cur.execute("select ((8, 'hearts'), 'blue')::card_back")
>>> cur.fetchone()[0]
card_back(face=card(value=8, suit='hearts'), back='blue')
Adaptation from Python tuples to composite types is automatic instead and
requires no adapter registration.
.. autofunction:: register_composite
.. autoclass:: CompositeCaster
.. index::
pair: UUID; Data types

View File

@ -280,7 +280,7 @@ the SQL string that would be sent to the database.
single: IN operator
- Python tuples are converted in a syntax suitable for the SQL :sql:`IN`
operator::
operator and to represent a composite type::
>>> cur.mogrify("SELECT %s IN %s;", (10, (10, 20, 30)))
'SELECT 10 IN (10, 20, 30);'
@ -290,6 +290,10 @@ the SQL string that would be sent to the database.
SQL doesn't allow an empty list in the IN operator, so your code should
guard against empty tuples.
If you want PostgreSQL composite types to be converted into a Python
tuple/namedtuple you can use the `~psycopg2.extras.register_composite()`
function.
.. versionadded:: 2.0.6
the tuple :sql:`IN` adaptation.
@ -298,6 +302,10 @@ the SQL string that would be sent to the database.
was necessary to import the `~psycopg2.extensions` module to have it
registered.
.. versionchanged:: 2.3
named tuples are adapted like regular tuples and can thus be used to
represent composite types.
- Python dictionaries are converted into the |hstore|_ data type. See
`~psycopg2.extras.register_hstore()` for further details.
@ -307,6 +315,7 @@ the SQL string that would be sent to the database.
.. versionadded:: 2.3
the :sql:`hstore` adaptation.
.. index::
single: Unicode

View File

@ -735,4 +735,146 @@ def register_hstore(conn_or_curs, globally=False, unicode=False):
_ext.register_adapter(dict, HstoreAdapter)
class CompositeCaster(object):
"""Helps conversion of a PostgreSQL composite type into a Python object.
The class is usually created by the `register_composite()` function.
.. attribute:: name
The name of the PostgreSQL type.
.. attribute:: oid
The oid of the PostgreSQL type.
.. attribute:: type
The type of the Python objects returned. If `!collections.namedtuple()`
is available, it is a named tuple with attributes equal to the type
components. Otherwise it is just the `tuple` object.
.. attribute:: attnames
List of component names of the type to be casted.
.. attribute:: atttypes
List of component type oids of the type to be casted.
"""
def __init__(self, name, oid, attrs):
self.name = name
self.oid = oid
self.attnames = [ a[0] for a in attrs ]
self.atttypes = [ a[1] for a in attrs ]
self.type = self._create_type(name, self.attnames)
self.typecaster = _ext.new_type((oid,), name, self.parse)
def parse(self, s, curs):
if s is None:
return None
tokens = self.tokenize(s)
if len(tokens) != len(self.atttypes):
raise psycopg2.DataError(
"expecting %d components for the type %s, %d found instead",
(len(self.atttypes), self.name, len(self.tokens)))
attrs = [ curs.cast(oid, token)
for oid, token in zip(self.atttypes, tokens) ]
return self.type(*attrs)
_re_tokenize = regex.compile(r"""
\(? ([,\)]) # an empty token, representing NULL
| \(? " ((?: [^"] | "")*) " [,)] # or a quoted string
| \(? ([^",\)]+) [,\)] # or an unquoted string
""", regex.VERBOSE)
_re_undouble = regex.compile(r'(["\\])\1')
@classmethod
def tokenize(self, s):
rv = []
for m in self._re_tokenize.finditer(s):
if m is None:
raise psycopg2.InterfaceError("can't parse type: %r", s)
if m.group(1):
rv.append(None)
elif m.group(2):
rv.append(self._re_undouble.sub(r"\1", m.group(2)))
else:
rv.append(m.group(3))
return rv
def _create_type(self, name, attnames):
try:
from collections import namedtuple
except ImportError:
return tuple
else:
return namedtuple(name, attnames)
@classmethod
def _from_db(self, name, conn_or_curs):
"""Return a `CompositeCaster` instance for the type *name*.
Raise `ProgrammingError` if the type is not found.
"""
if hasattr(conn_or_curs, 'execute'):
conn = conn_or_curs.connection
curs = conn_or_curs
else:
conn = conn_or_curs
curs = conn_or_curs.cursor()
# Store the transaction status of the connection to revert it after use
conn_status = conn.status
# get the type oid and attributes
curs.execute("""\
SELECT t.oid, attname, atttypid
FROM pg_type t
JOIN pg_namespace ns ON typnamespace = ns.oid
JOIN pg_attribute a ON attrelid = typrelid
WHERE typname = %s and nspname = 'public';
""", (name, ))
recs = curs.fetchall()
# revert the status of the connection as before the command
if (conn_status != _ext.STATUS_IN_TRANSACTION
and conn.isolation_level != _ext.ISOLATION_LEVEL_AUTOCOMMIT):
conn.rollback()
if not recs:
raise psycopg2.ProgrammingError(
"PostgreSQL type '%s' not found" % name)
type_oid = recs[0][0]
type_attrs = [ (r[1], r[2]) for r in recs ]
return CompositeCaster(name, type_oid, type_attrs)
def register_composite(name, conn_or_curs, globally=False):
"""Register a typecaster to convert a composite type into a tuple.
:param name: the name of a PostgreSQL composite type, e.g. created using
the |CREATE TYPE|_ command
:param conn_or_curs: a connection or cursor used to find the type oid and
components; the typecaster is registered in a scope limited to this
object, unless *globally* is set to `True`
:param globally: if `False` (default) register the typecaster only on
*conn_or_curs*, otherwise register it globally
:return: the registered `CompositeCaster` instance responsible for the
conversion
"""
caster = CompositeCaster._from_db(name, conn_or_curs)
_ext.register_type(caster.typecaster, not globally and conn_or_curs or None)
return caster
__all__ = filter(lambda k: not k.startswith('_'), locals().keys())

View File

@ -79,6 +79,7 @@ typedef struct {
} cursorObject;
/* C-callable functions in cursor_int.c and cursor_ext.c */
HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid);
HIDDEN void curs_reset(cursorObject *self);
/* exception-raising macros */

View File

@ -28,6 +28,41 @@
#include "psycopg/cursor.h"
#include "psycopg/pqpath.h"
#include "psycopg/typecast.h"
/* curs_get_cast - return the type caster for an oid.
*
* Return the most specific type caster, from cursor to connection to global.
* If no type caster is found, return the default one.
*
* Return a borrowed reference.
*/
PyObject *
curs_get_cast(cursorObject *self, PyObject *oid)
{
PyObject *cast;
/* cursor lookup */
if (self->string_types != NULL && self->string_types != Py_None) {
cast = PyDict_GetItem(self->string_types, oid);
Dprintf("curs_get_cast: per-cursor dict: %p", cast);
if (cast) { return cast; }
}
/* connection lookup */
cast = PyDict_GetItem(self->conn->string_types, oid);
Dprintf("curs_get_cast: per-connection dict: %p", cast);
if (cast) { return cast; }
/* global lookup */
cast = PyDict_GetItem(psyco_types, oid);
Dprintf("curs_get_cast: global dict: %p", cast);
if (cast) { return cast; }
/* fallback */
return psyco_default_cast;
}
#include <string.h>

View File

@ -616,6 +616,29 @@ psyco_curs_mogrify(cursorObject *self, PyObject *args, PyObject *kwargs)
#endif
/* cast method - convert an oid/string into a Python object */
#define psyco_curs_cast_doc \
"cast(oid, s) -> value\n\n" \
"Convert the string s to a Python object according to its oid.\n\n" \
"Look for a typecaster first in the cursor, then in its connection," \
"then in the global register. If no suitable typecaster is found," \
"leave the value as a string."
static PyObject *
psyco_curs_cast(cursorObject *self, PyObject *args)
{
PyObject *oid;
PyObject *s;
PyObject *cast;
if (!PyArg_ParseTuple(args, "OO", &oid, &s))
return NULL;
cast = curs_get_cast(self, oid);
return PyObject_CallFunctionObjArgs(cast, s, (PyObject *)self, NULL);
}
/* fetchone method - fetch one row of results */
#define psyco_curs_fetchone_doc \
@ -1508,6 +1531,8 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_curs_scroll_doc},
/* psycopg extensions */
#ifdef PSYCOPG_EXTENSIONS
{"cast", (PyCFunction)psyco_curs_cast,
METH_VARARGS, psyco_curs_cast_doc},
{"mogrify", (PyCFunction)psyco_curs_mogrify,
METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,

View File

@ -954,19 +954,7 @@ _pq_fetch_tuples(cursorObject *curs)
type = PyInt_FromLong(ftype);
Dprintf("_pq_fetch_tuples: looking for cast %d:", ftype);
if (curs->string_types != NULL && curs->string_types != Py_None) {
cast = PyDict_GetItem(curs->string_types, type);
Dprintf("_pq_fetch_tuples: per-cursor dict: %p", cast);
}
if (cast == NULL) {
cast = PyDict_GetItem(curs->conn->string_types, type);
Dprintf("_pq_fetch_tuples: per-connection dict: %p", cast);
}
if (cast == NULL) {
cast = PyDict_GetItem(psyco_types, type);
Dprintf("_pq_fetch_tuples: global dict: %p", cast);
}
if (cast == NULL) cast = psyco_default_cast;
cast = curs_get_cast(curs, type);
/* else if we got binary tuples and if we got a field that
is binary use the default cast

View File

@ -73,14 +73,14 @@ version_flags = ['dt', 'dec']
PLATFORM_IS_WINDOWS = sys.platform.lower().startswith('win')
def get_pg_config(kind, pg_config="pg_config"):
def get_pg_config(kind, pg_config):
try:
p = subprocess.Popen([pg_config, "--" + kind],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
except OSError:
raise Warning("Unable to find 'pg_config' file")
raise Warning("Unable to find 'pg_config' file in '%s'" % pg_config)
p.stdin.close()
r = p.stdout.readline().strip()
if not r:
@ -114,8 +114,6 @@ class psycopg_build_ext(build_ext):
boolean_options = build_ext.boolean_options[:]
boolean_options.extend(('use-pydatetime', 'have-ssl', 'static-libpq'))
DEFAULT_PG_CONFIG = "pg_config"
def initialize_options(self):
build_ext.initialize_options(self)
self.use_pg_dll = 1
@ -123,8 +121,7 @@ class psycopg_build_ext(build_ext):
self.mx_include_dir = None
self.use_pydatetime = 1
self.have_ssl = have_ssl
self.pg_config = self.autodetect_pg_config_path()
self.pg_config = None
def get_compiler(self):
"""Return the name of the C compiler used to compile extensions.
@ -222,6 +219,20 @@ class psycopg_build_ext(build_ext):
def finalize_options(self):
"""Complete the build system configuation."""
build_ext.finalize_options(self)
if self.pg_config is None:
self.pg_config = self.autodetect_pg_config_path()
if self.pg_config is None:
sys.stderr.write("""\
Error: pg_config executable not found.
Please add the directory containing pg_config to the PATH
or specify the full executable path with the option:
python setup.py build_ext --pg-config /path/to/pg_config build ...
or with the pg_config option in 'setup.cfg'.
""")
sys.exit(1)
self.include_dirs.append(".")
if static_libpq:
@ -258,23 +269,25 @@ class psycopg_build_ext(build_ext):
define_macros.append(("PG_VERSION_HEX", "0x%02X%02X%02X" %
(int(pgmajor), int(pgminor), int(pgpatch))))
except Warning:
w = sys.exc_info() # work around py 2/3 different syntax
if self.pg_config == self.DEFAULT_PG_CONFIG:
sys.stderr.write("Warning: %s" % str(w))
else:
sys.stderr.write("Error: %s" % str(w))
sys.exit(1)
w = sys.exc_info()[1] # work around py 2/3 different syntax
sys.stderr.write("Error: %s\n" % w)
sys.exit(1)
if hasattr(self, "finalize_" + sys.platform):
getattr(self, "finalize_" + sys.platform)()
def autodetect_pg_config_path(self):
res = None
if PLATFORM_IS_WINDOWS:
res = self.autodetect_pg_config_path_windows()
return self.autodetect_pg_config_path_windows()
else:
return self.autodetect_pg_config_path_posix()
return res or self.DEFAULT_PG_CONFIG
def autodetect_pg_config_path_posix(self):
exename = 'pg_config'
for dir in os.environ['PATH'].split(os.pathsep):
fn = os.path.join(dir, exename)
if os.path.isfile(fn):
return fn
def autodetect_pg_config_path_windows(self):
# Find the first PostgreSQL installation listed in the registry and

View File

@ -15,6 +15,7 @@ except Exception, e:
else:
cnn.close()
import bug_gc
import bugX000
import extras_dictcursor
import test_dates
@ -34,6 +35,7 @@ import test_cancel
def test_suite():
suite = unittest.TestSuite()
suite.addTest(bug_gc.test_suite())
suite.addTest(bugX000.test_suite())
suite.addTest(extras_dictcursor.test_suite())
suite.addTest(test_dates.test_suite())

View File

@ -68,6 +68,38 @@ class CursorTests(unittest.TestCase):
self.assertEqual(b('SELECT 10.3;'),
cur.mogrify("SELECT %s;", (Decimal("10.3"),)))
def test_cast(self):
curs = self.conn.cursor()
self.assertEqual(42, curs.cast(20, '42'))
self.assertAlmostEqual(3.14, curs.cast(700, '3.14'))
try:
from decimal import Decimal
except ImportError:
self.assertAlmostEqual(123.45, curs.cast(1700, '123.45'))
else:
self.assertEqual(Decimal('123.45'), curs.cast(1700, '123.45'))
from datetime import date
self.assertEqual(date(2011,1,2), curs.cast(1082, '2011-01-02'))
self.assertEqual("who am i?", curs.cast(705, 'who am i?')) # unknown
def test_cast_specificity(self):
curs = self.conn.cursor()
self.assertEqual("foo", curs.cast(705, 'foo'))
D = psycopg2.extensions.new_type((705,), "DOUBLING", lambda v, c: v * 2)
psycopg2.extensions.register_type(D, self.conn)
self.assertEqual("foofoo", curs.cast(705, 'foo'))
T = psycopg2.extensions.new_type((705,), "TREBLING", lambda v, c: v * 3)
psycopg2.extensions.register_type(T, curs)
self.assertEqual("foofoofoo", curs.cast(705, 'foo'))
curs2 = self.conn.cursor()
self.assertEqual("foofoo", curs2.cast(705, 'foo'))
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)

View File

@ -20,6 +20,8 @@ except:
pass
import re
import sys
from datetime import date
from testutils import unittest
import psycopg2
@ -390,6 +392,158 @@ class AdaptTypeTestCase(unittest.TestCase):
finally:
ext.register_adapter(type(None), orig_adapter)
def test_tokenization(self):
from psycopg2.extras import CompositeCaster
def ok(s, v):
self.assertEqual(CompositeCaster.tokenize(s), v)
ok("(,)", [None, None])
ok('(hello,,10.234,2010-11-11)', ['hello', None, '10.234', '2010-11-11'])
ok('(10,"""")', ['10', '"'])
ok('(10,",")', ['10', ','])
ok(r'(10,"\\")', ['10', '\\'])
ok(r'''(10,"\\',""")''', ['10', '''\\',"'''])
ok('(10,"(20,""(30,40)"")")', ['10', '(20,"(30,40)")'])
ok('(10,"(20,""(30,""""(40,50)"""")"")")', ['10', '(20,"(30,""(40,50)"")")'])
ok('(,"(,""(a\nb\tc)"")")', [None, '(,"(a\nb\tc)")'])
ok('(\x01,\x02,\x03,\x04,\x05,\x06,\x07,\x08,"\t","\n","\x0b",'
'"\x0c","\r",\x0e,\x0f,\x10,\x11,\x12,\x13,\x14,\x15,\x16,'
'\x17,\x18,\x19,\x1a,\x1b,\x1c,\x1d,\x1e,\x1f," ",!,"""",#,'
'$,%,&,\',"(",")",*,+,",",-,.,/,0,1,2,3,4,5,6,7,8,9,:,;,<,=,>,?,'
'@,A,B,C,D,E,F,G,H,I,J,K,L,M,N,O,P,Q,R,S,T,U,V,W,X,Y,Z,[,"\\\\",],'
'^,_,`,a,b,c,d,e,f,g,h,i,j,k,l,m,n,o,p,q,r,s,t,u,v,w,x,y,z,{,|,},'
'~,\x7f)',
map(chr, range(1, 128)))
ok('(,"\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f'
'\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f !'
'""#$%&\'()*+,-./0123456789:;<=>?@ABCDEFGHIJKLMNOPQRSTUVWXYZ[\\\\]'
'^_`abcdefghijklmnopqrstuvwxyz{|}~\x7f")',
[None, ''.join(map(chr, range(1, 128)))])
def test_cast_composite(self):
oid = self._create_type("type_isd",
[('anint', 'integer'), ('astring', 'text'), ('adate', 'date')])
t = psycopg2.extras.register_composite("type_isd", self.conn)
self.assertEqual(t.name, 'type_isd')
self.assertEqual(t.oid, oid)
self.assert_(issubclass(t.type, tuple))
self.assertEqual(t.attnames, ['anint', 'astring', 'adate'])
self.assertEqual(t.atttypes, [23,25,1082])
curs = self.conn.cursor()
r = (10, 'hello', date(2011,1,2))
curs.execute("select %s::type_isd;", (r,))
v = curs.fetchone()[0]
self.assert_(isinstance(v, t.type))
self.assertEqual(v[0], 10)
self.assertEqual(v[1], "hello")
self.assertEqual(v[2], date(2011,1,2))
try:
from collections import namedtuple
except ImportError:
pass
else:
self.assert_(t.type is not tuple)
self.assertEqual(v.anint, 10)
self.assertEqual(v.astring, "hello")
self.assertEqual(v.adate, date(2011,1,2))
def test_cast_nested(self):
self._create_type("type_is",
[("anint", "integer"), ("astring", "text")])
self._create_type("type_r_dt",
[("adate", "date"), ("apair", "type_is")])
self._create_type("type_r_ft",
[("afloat", "float8"), ("anotherpair", "type_r_dt")])
psycopg2.extras.register_composite("type_is", self.conn)
psycopg2.extras.register_composite("type_r_dt", self.conn)
psycopg2.extras.register_composite("type_r_ft", self.conn)
curs = self.conn.cursor()
r = (0.25, (date(2011,1,2), (42, "hello")))
curs.execute("select %s::type_r_ft;", (r,))
v = curs.fetchone()[0]
self.assertEqual(r, v)
try:
from collections import namedtuple
except ImportError:
pass
else:
self.assertEqual(v.anotherpair.apair.astring, "hello")
def test_register_on_cursor(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
curs1 = self.conn.cursor()
curs2 = self.conn.cursor()
psycopg2.extras.register_composite("type_ii", curs1)
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
def test_register_on_connection(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = psycopg2.connect(self.conn.dsn)
conn2 = psycopg2.connect(self.conn.dsn)
try:
psycopg2.extras.register_composite("type_ii", conn1)
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], "(1,2)")
finally:
conn1.close()
conn2.close()
def test_register_globally(self):
self._create_type("type_ii", [("a", "integer"), ("b", "integer")])
conn1 = psycopg2.connect(self.conn.dsn)
conn2 = psycopg2.connect(self.conn.dsn)
try:
t = psycopg2.extras.register_composite("type_ii", conn1, globally=True)
try:
curs1 = conn1.cursor()
curs2 = conn2.cursor()
curs1.execute("select (1,2)::type_ii")
self.assertEqual(curs1.fetchone()[0], (1,2))
curs2.execute("select (1,2)::type_ii")
self.assertEqual(curs2.fetchone()[0], (1,2))
finally:
del psycopg2.extensions.string_types[t.oid]
finally:
conn1.close()
conn2.close()
def _create_type(self, name, fields):
curs = self.conn.cursor()
try:
curs.execute("drop type %s cascade;" % name)
except psycopg2.ProgrammingError:
self.conn.rollback()
curs.execute("create type %s as (%s);" % (name,
", ".join(["%s %s" % p for p in fields])))
curs.execute("""\
SELECT t.oid
FROM pg_type t JOIN pg_namespace ns ON typnamespace = ns.oid
WHERE typname = %s and nspname = 'public';
""", (name,))
oid = curs.fetchone()[0]
self.conn.commit()
return oid
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)