Added cursor.cast() method

The method exposes the typecasters lookup algorithm. Useful to create
recursive typecasters.
This commit is contained in:
Daniele Varrazzo 2011-01-01 22:55:10 +01:00
parent f9be48d89e
commit 159cda3688
6 changed files with 122 additions and 27 deletions

View File

@ -122,20 +122,6 @@ The ``cursor`` class
values can be retrieved using |fetch*|_ methods.
.. method:: mogrify(operation [, parameters])
Return a query string after arguments binding. The string returned is
exactly the one that would be sent to the database running the
`~cursor.execute()` method or similar.
>>> cur.mogrify("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
"INSERT INTO test (num, data) VALUES (42, E'bar')"
.. extension::
The `mogrify()` method is a Psycopg extension to the |DBAPI|.
.. method:: executemany(operation, seq_of_parameters)
Prepare a database operation (query or command) and then execute it
@ -167,6 +153,34 @@ The ``cursor`` class
does nothing but it is safe to call it.
.. method:: mogrify(operation [, parameters])
Return a query string after arguments binding. The string returned is
exactly the one that would be sent to the database running the
`~cursor.execute()` method or similar.
>>> cur.mogrify("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
"INSERT INTO test (num, data) VALUES (42, E'bar')"
.. extension::
The `mogrify()` method is a Psycopg extension to the |DBAPI|.
.. method:: cast(oid, s)
Convert a value from the PostgreSQL string representation to a Python
object.
Use the most specific of the typecasters registered by
`~psycopg2.extensions.register_type()`.
.. versionadded:: 2.3.3
.. extension::
The `cast()` method is a Psycopg extension to the |DBAPI|.
.. |fetch*| replace:: `!fetch*()`

View File

@ -84,6 +84,7 @@ typedef struct {
} cursorObject;
/* C-callable functions in cursor_int.c and cursor_ext.c */
HIDDEN PyObject *curs_get_cast(cursorObject *self, PyObject *oid);
HIDDEN void curs_reset(cursorObject *self);
/* exception-raising macros */

View File

@ -32,6 +32,41 @@
#include "psycopg/psycopg.h"
#include "psycopg/cursor.h"
#include "psycopg/pqpath.h"
#include "psycopg/typecast.h"
/* curs_get_cast - return the type caster for an oid.
*
* Return the most specific type caster, from cursor to connection to global.
* If no type caster is found, return the default one.
*
* Return a borrowed reference.
*/
PyObject *
curs_get_cast(cursorObject *self, PyObject *oid)
{
PyObject *cast;
/* cursor lookup */
if (self->string_types != NULL && self->string_types != Py_None) {
cast = PyDict_GetItem(self->string_types, oid);
Dprintf("curs_get_cast: per-cursor dict: %p", cast);
if (cast) { return cast; }
}
/* connection lookup */
cast = PyDict_GetItem(self->conn->string_types, oid);
Dprintf("curs_get_cast: per-connection dict: %p", cast);
if (cast) { return cast; }
/* global lookup */
cast = PyDict_GetItem(psyco_types, oid);
Dprintf("curs_get_cast: global dict: %p", cast);
if (cast) { return cast; }
/* fallback */
return psyco_default_cast;
}
/* curs_reset - reset the cursor to a clean state */

View File

@ -629,6 +629,29 @@ psyco_curs_mogrify(cursorObject *self, PyObject *args, PyObject *kwargs)
#endif
/* cast method - convert an oid/string into a Python object */
#define psyco_curs_cast_doc \
"cast(oid, s) -> value\n\n" \
"Convert the string s to a Python object according to its oid.\n\n" \
"Look for a typecaster first in the cursor, then in its connection," \
"then in the global register. If no suitable typecaster is found," \
"leave the value as a string."
static PyObject *
psyco_curs_cast(cursorObject *self, PyObject *args)
{
PyObject *oid;
PyObject *s;
PyObject *cast;
if (!PyArg_ParseTuple(args, "OO", &oid, &s))
return NULL;
cast = curs_get_cast(self, oid);
return PyObject_CallFunctionObjArgs(cast, s, (PyObject *)self, NULL);
}
/* fetchone method - fetch one row of results */
#define psyco_curs_fetchone_doc \
@ -1524,6 +1547,8 @@ static struct PyMethodDef cursorObject_methods[] = {
METH_VARARGS|METH_KEYWORDS, psyco_curs_scroll_doc},
/* psycopg extensions */
#ifdef PSYCOPG_EXTENSIONS
{"cast", (PyCFunction)psyco_curs_cast,
METH_VARARGS, psyco_curs_cast_doc},
{"mogrify", (PyCFunction)psyco_curs_mogrify,
METH_VARARGS|METH_KEYWORDS, psyco_curs_mogrify_doc},
{"copy_from", (PyCFunction)psyco_curs_copy_from,

View File

@ -958,19 +958,7 @@ _pq_fetch_tuples(cursorObject *curs)
type = PyInt_FromLong(ftype);
Dprintf("_pq_fetch_tuples: looking for cast %d:", ftype);
if (curs->string_types != NULL && curs->string_types != Py_None) {
cast = PyDict_GetItem(curs->string_types, type);
Dprintf("_pq_fetch_tuples: per-cursor dict: %p", cast);
}
if (cast == NULL) {
cast = PyDict_GetItem(curs->conn->string_types, type);
Dprintf("_pq_fetch_tuples: per-connection dict: %p", cast);
}
if (cast == NULL) {
cast = PyDict_GetItem(psyco_types, type);
Dprintf("_pq_fetch_tuples: global dict: %p", cast);
}
if (cast == NULL) cast = psyco_default_cast;
cast = curs_get_cast(curs, type);
/* else if we got binary tuples and if we got a field that
is binary use the default cast

View File

@ -67,6 +67,38 @@ class CursorTests(unittest.TestCase):
self.assertEqual('SELECT 10.3;',
cur.mogrify("SELECT %s;", (Decimal("10.3"),)))
def test_cast(self):
curs = self.conn.cursor()
self.assertEqual(42, curs.cast(20, '42'))
self.assertAlmostEqual(3.14, curs.cast(700, '3.14'))
try:
from decimal import Decimal
except ImportError:
self.assertAlmostEqual(123.45, curs.cast(1700, '123.45'))
else:
self.assertEqual(Decimal('123.45'), curs.cast(1700, '123.45'))
from datetime import date
self.assertEqual(date(2011,1,2), curs.cast(1082, '2011-01-02'))
self.assertEqual("who am i?", curs.cast(705, 'who am i?')) # unknown
def test_cast_specificity(self):
curs = self.conn.cursor()
self.assertEqual("foo", curs.cast(705, 'foo'))
D = psycopg2.extensions.new_type((705,), "DOUBLING", lambda v, c: v * 2)
psycopg2.extensions.register_type(D, self.conn)
self.assertEqual("foofoo", curs.cast(705, 'foo'))
T = psycopg2.extensions.new_type((705,), "TREBLING", lambda v, c: v * 3)
psycopg2.extensions.register_type(T, curs)
self.assertEqual("foofoofoo", curs.cast(705, 'foo'))
curs2 = self.conn.cursor()
self.assertEqual("foofoo", curs2.cast(705, 'foo'))
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)