diff --git a/lib/sql.py b/lib/sql.py index 287a1f7e..b465cbee 100644 --- a/lib/sql.py +++ b/lib/sql.py @@ -145,6 +145,20 @@ class SQL(Composable): """ A `Composable` representing a snippet of SQL string to be included verbatim. + `!SQL` supports the ``%`` operator to incorporate variable parts of a query + into a template: the operator takes a sequence or mapping of `Composable` + (according to the style of the placeholders in the *string*) and returning + a `Composed` object. + + Example:: + + >>> query = sql.SQL("select %s from %s") % [ + ... sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]), + ... sql.Identifier('table')] + >>> query.as_string(conn) + select "foo", "bar" from "table"' + + .. automethod:: join """ def __init__(self, string): @@ -155,6 +169,9 @@ class SQL(Composable): def __repr__(self): return "sql.SQL(%r)" % (self._wrapped,) + def __mod__(self, args): + return _compose(self._wrapped, args) + def as_string(self, conn_or_curs): return self._wrapped @@ -162,7 +179,7 @@ class SQL(Composable): """ Join a sequence of `Composable` or a `Composed` and return a `!Composed`. - Use the object value to separate the *seq* elements. + Use the object *string* to separate the *seq* elements. Example:: @@ -215,7 +232,13 @@ class Literal(Composable): """ Represent an SQL value to be included in a query. - The object follows the normal :ref:`adaptation rules ` + Usually you will want to include placeholders in the query and pass values + as `~cursor.execute()` arguments. If however you really really need to + include a literal value in the query you can use this object. + + The string returned by `!as_string()` follows the normal :ref:`adaptation + rules ` for Python objects. + """ def __init__(self, wrapped): self._wrapped = wrapped @@ -254,13 +277,13 @@ class Placeholder(Composable): Examples:: - >>> sql.compose("insert into table (%s) values (%s)", [ + >>> (sql.SQL("insert into table (%s) values (%s)") % [ ... sql.SQL(', ').join(map(sql.Identifier, names)), ... sql.SQL(', ').join(sql.Placeholder() * 3) ... ]).as_string(conn) 'insert into table ("foo", "bar", "baz") values (%s, %s, %s)' - >>> sql.compose("insert into table (%s) values (%s)", [ + >>> (sql.SQL("insert into table (%s) values (%s)") % [ ... sql.SQL(', ').join(map(sql.Identifier, names)), ... sql.SQL(', ').join(map(sql.Placeholder, names)) ... ]).as_string(conn) @@ -297,7 +320,7 @@ re_compose = re.compile(""" """, re.VERBOSE) -def compose(sql, args=None): +def _compose(sql, args=None): """ Merge an SQL string with some variable parts. @@ -313,17 +336,6 @@ def compose(sql, args=None): The value returned is a `Composed` instance obtained replacing the arguments to the query placeholders. - - Example:: - - >>> query = sql.compose( - ... "select %s from %s", [ - ... sql.SQL(', ').join( - ... [sql.Identifier('foo'), sql.Identifier('bar')]), - ... sql.Identifier('table')]) - >>> query.as_string(conn) - select "foo", "bar" from "table"' - """ if args is None: args = () diff --git a/tests/test_sql.py b/tests/test_sql.py index 7fe201ae..5482ccb5 100755 --- a/tests/test_sql.py +++ b/tests/test_sql.py @@ -30,61 +30,60 @@ from psycopg2 import sql class ComposeTests(ConnectingTestCase): def test_pos(self): - s = sql.compose("select %s from %s", - (sql.Identifier('field'), sql.Identifier('table'))) + s = sql.SQL("select %s from %s") \ + % (sql.Identifier('field'), sql.Identifier('table')) s1 = s.as_string(self.conn) self.assert_(isinstance(s1, str)) self.assertEqual(s1, 'select "field" from "table"') def test_dict(self): - s = sql.compose("select %(f)s from %(t)s", - {'f': sql.Identifier('field'), 't': sql.Identifier('table')}) + s = sql.SQL("select %(f)s from %(t)s") \ + % {'f': sql.Identifier('field'), 't': sql.Identifier('table')} s1 = s.as_string(self.conn) self.assert_(isinstance(s1, str)) self.assertEqual(s1, 'select "field" from "table"') def test_unicode(self): - s = sql.compose(u"select %s from %s", - (sql.Identifier(u'field'), sql.Identifier('table'))) + s = sql.SQL(u"select %s from %s") \ + % (sql.Identifier(u'field'), sql.Identifier('table')) s1 = s.as_string(self.conn) self.assert_(isinstance(s1, unicode)) self.assertEqual(s1, u'select "field" from "table"') def test_compose_literal(self): - s = sql.compose("select %s;", [sql.Literal(dt.date(2016, 12, 31))]) + s = sql.SQL("select %s;") % [sql.Literal(dt.date(2016, 12, 31))] s1 = s.as_string(self.conn) self.assertEqual(s1, "select '2016-12-31'::date;") def test_compose_empty(self): - s = sql.compose("select foo;") + s = sql.SQL("select foo;") % () s1 = s.as_string(self.conn) self.assertEqual(s1, "select foo;") def test_percent_escape(self): - s = sql.compose("42 %% %s", [sql.Literal(7)]) + s = sql.SQL("42 %% %s") % [sql.Literal(7)] s1 = s.as_string(self.conn) self.assertEqual(s1, "42 % 7") - s = sql.compose("42 %% 7") + s = sql.SQL("42 %% 7") % [] s1 = s.as_string(self.conn) self.assertEqual(s1, "42 % 7") def test_compose_badnargs(self): - self.assertRaises(ValueError, sql.compose, "select foo;", [10]) - self.assertRaises(ValueError, sql.compose, "select %s;") - self.assertRaises(ValueError, sql.compose, "select %s;", []) - self.assertRaises(ValueError, sql.compose, "select %s;", [10, 20]) + self.assertRaises(ValueError, sql.SQL("select foo;").__mod__, [10]) + self.assertRaises(ValueError, sql.SQL("select %s;").__mod__, []) + self.assertRaises(ValueError, sql.SQL("select %s;").__mod__, [10, 20]) def test_compose_bad_args_type(self): - self.assertRaises(TypeError, sql.compose, "select %s;", {'a': 10}) - self.assertRaises(TypeError, sql.compose, "select %(x)s;", [10]) + self.assertRaises(TypeError, sql.SQL("select %s;").__mod__, {'a': 10}) + self.assertRaises(TypeError, sql.SQL("select %(x)s;").__mod__, [10]) def test_must_be_adaptable(self): class Foo(object): pass self.assertRaises(TypeError, - sql.compose, "select %s;", [Foo()]) + sql.SQL("select %s;").__mod__, [Foo()]) def test_execute(self): cur = self.conn.cursor() @@ -94,11 +93,11 @@ class ComposeTests(ConnectingTestCase): foo text, bar text, "ba'z" text) """) cur.execute( - sql.compose("insert into %s (id, %s) values (%%s, %s)", [ + sql.SQL("insert into %s (id, %s) values (%%s, %s)") % [ sql.Identifier('test_compose'), sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])), (sql.PH() * 3).join(', '), - ]), + ], (10, 'a', 'b', 'c')) cur.execute("select * from test_compose") @@ -112,11 +111,11 @@ class ComposeTests(ConnectingTestCase): foo text, bar text, "ba'z" text) """) cur.executemany( - sql.compose("insert into %s (id, %s) values (%%s, %s)", [ + sql.SQL("insert into %s (id, %s) values (%%s, %s)") % [ sql.Identifier('test_compose'), sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])), (sql.PH() * 3).join(', '), - ]), + ], [(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')]) cur.execute("select * from test_compose")