Dropped sql.compose

Use a SQL % operator instead.
This commit is contained in:
Daniele Varrazzo 2017-01-01 08:37:41 +01:00
parent 4a55b8018a
commit cf40bff2e2
2 changed files with 48 additions and 37 deletions

View File

@ -145,6 +145,20 @@ class SQL(Composable):
""" """
A `Composable` representing a snippet of SQL string to be included verbatim. A `Composable` representing a snippet of SQL string to be included verbatim.
`!SQL` supports the ``%`` operator to incorporate variable parts of a query
into a template: the operator takes a sequence or mapping of `Composable`
(according to the style of the placeholders in the *string*) and returning
a `Composed` object.
Example::
>>> query = sql.SQL("select %s from %s") % [
... sql.SQL(', ').join([sql.Identifier('foo'), sql.Identifier('bar')]),
... sql.Identifier('table')]
>>> query.as_string(conn)
select "foo", "bar" from "table"'
.. automethod:: join .. automethod:: join
""" """
def __init__(self, string): def __init__(self, string):
@ -155,6 +169,9 @@ class SQL(Composable):
def __repr__(self): def __repr__(self):
return "sql.SQL(%r)" % (self._wrapped,) return "sql.SQL(%r)" % (self._wrapped,)
def __mod__(self, args):
return _compose(self._wrapped, args)
def as_string(self, conn_or_curs): def as_string(self, conn_or_curs):
return self._wrapped return self._wrapped
@ -162,7 +179,7 @@ class SQL(Composable):
""" """
Join a sequence of `Composable` or a `Composed` and return a `!Composed`. Join a sequence of `Composable` or a `Composed` and return a `!Composed`.
Use the object value to separate the *seq* elements. Use the object *string* to separate the *seq* elements.
Example:: Example::
@ -215,7 +232,13 @@ class Literal(Composable):
""" """
Represent an SQL value to be included in a query. Represent an SQL value to be included in a query.
The object follows the normal :ref:`adaptation rules <python-types-adaptation>` Usually you will want to include placeholders in the query and pass values
as `~cursor.execute()` arguments. If however you really really need to
include a literal value in the query you can use this object.
The string returned by `!as_string()` follows the normal :ref:`adaptation
rules <python-types-adaptation>` for Python objects.
""" """
def __init__(self, wrapped): def __init__(self, wrapped):
self._wrapped = wrapped self._wrapped = wrapped
@ -254,13 +277,13 @@ class Placeholder(Composable):
Examples:: Examples::
>>> sql.compose("insert into table (%s) values (%s)", [ >>> (sql.SQL("insert into table (%s) values (%s)") % [
... sql.SQL(', ').join(map(sql.Identifier, names)), ... sql.SQL(', ').join(map(sql.Identifier, names)),
... sql.SQL(', ').join(sql.Placeholder() * 3) ... sql.SQL(', ').join(sql.Placeholder() * 3)
... ]).as_string(conn) ... ]).as_string(conn)
'insert into table ("foo", "bar", "baz") values (%s, %s, %s)' 'insert into table ("foo", "bar", "baz") values (%s, %s, %s)'
>>> sql.compose("insert into table (%s) values (%s)", [ >>> (sql.SQL("insert into table (%s) values (%s)") % [
... sql.SQL(', ').join(map(sql.Identifier, names)), ... sql.SQL(', ').join(map(sql.Identifier, names)),
... sql.SQL(', ').join(map(sql.Placeholder, names)) ... sql.SQL(', ').join(map(sql.Placeholder, names))
... ]).as_string(conn) ... ]).as_string(conn)
@ -297,7 +320,7 @@ re_compose = re.compile("""
""", re.VERBOSE) """, re.VERBOSE)
def compose(sql, args=None): def _compose(sql, args=None):
""" """
Merge an SQL string with some variable parts. Merge an SQL string with some variable parts.
@ -313,17 +336,6 @@ def compose(sql, args=None):
The value returned is a `Composed` instance obtained replacing the The value returned is a `Composed` instance obtained replacing the
arguments to the query placeholders. arguments to the query placeholders.
Example::
>>> query = sql.compose(
... "select %s from %s", [
... sql.SQL(', ').join(
... [sql.Identifier('foo'), sql.Identifier('bar')]),
... sql.Identifier('table')])
>>> query.as_string(conn)
select "foo", "bar" from "table"'
""" """
if args is None: if args is None:
args = () args = ()

View File

@ -30,61 +30,60 @@ from psycopg2 import sql
class ComposeTests(ConnectingTestCase): class ComposeTests(ConnectingTestCase):
def test_pos(self): def test_pos(self):
s = sql.compose("select %s from %s", s = sql.SQL("select %s from %s") \
(sql.Identifier('field'), sql.Identifier('table'))) % (sql.Identifier('field'), sql.Identifier('table'))
s1 = s.as_string(self.conn) s1 = s.as_string(self.conn)
self.assert_(isinstance(s1, str)) self.assert_(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"') self.assertEqual(s1, 'select "field" from "table"')
def test_dict(self): def test_dict(self):
s = sql.compose("select %(f)s from %(t)s", s = sql.SQL("select %(f)s from %(t)s") \
{'f': sql.Identifier('field'), 't': sql.Identifier('table')}) % {'f': sql.Identifier('field'), 't': sql.Identifier('table')}
s1 = s.as_string(self.conn) s1 = s.as_string(self.conn)
self.assert_(isinstance(s1, str)) self.assert_(isinstance(s1, str))
self.assertEqual(s1, 'select "field" from "table"') self.assertEqual(s1, 'select "field" from "table"')
def test_unicode(self): def test_unicode(self):
s = sql.compose(u"select %s from %s", s = sql.SQL(u"select %s from %s") \
(sql.Identifier(u'field'), sql.Identifier('table'))) % (sql.Identifier(u'field'), sql.Identifier('table'))
s1 = s.as_string(self.conn) s1 = s.as_string(self.conn)
self.assert_(isinstance(s1, unicode)) self.assert_(isinstance(s1, unicode))
self.assertEqual(s1, u'select "field" from "table"') self.assertEqual(s1, u'select "field" from "table"')
def test_compose_literal(self): def test_compose_literal(self):
s = sql.compose("select %s;", [sql.Literal(dt.date(2016, 12, 31))]) s = sql.SQL("select %s;") % [sql.Literal(dt.date(2016, 12, 31))]
s1 = s.as_string(self.conn) s1 = s.as_string(self.conn)
self.assertEqual(s1, "select '2016-12-31'::date;") self.assertEqual(s1, "select '2016-12-31'::date;")
def test_compose_empty(self): def test_compose_empty(self):
s = sql.compose("select foo;") s = sql.SQL("select foo;") % ()
s1 = s.as_string(self.conn) s1 = s.as_string(self.conn)
self.assertEqual(s1, "select foo;") self.assertEqual(s1, "select foo;")
def test_percent_escape(self): def test_percent_escape(self):
s = sql.compose("42 %% %s", [sql.Literal(7)]) s = sql.SQL("42 %% %s") % [sql.Literal(7)]
s1 = s.as_string(self.conn) s1 = s.as_string(self.conn)
self.assertEqual(s1, "42 % 7") self.assertEqual(s1, "42 % 7")
s = sql.compose("42 %% 7") s = sql.SQL("42 %% 7") % []
s1 = s.as_string(self.conn) s1 = s.as_string(self.conn)
self.assertEqual(s1, "42 % 7") self.assertEqual(s1, "42 % 7")
def test_compose_badnargs(self): def test_compose_badnargs(self):
self.assertRaises(ValueError, sql.compose, "select foo;", [10]) self.assertRaises(ValueError, sql.SQL("select foo;").__mod__, [10])
self.assertRaises(ValueError, sql.compose, "select %s;") self.assertRaises(ValueError, sql.SQL("select %s;").__mod__, [])
self.assertRaises(ValueError, sql.compose, "select %s;", []) self.assertRaises(ValueError, sql.SQL("select %s;").__mod__, [10, 20])
self.assertRaises(ValueError, sql.compose, "select %s;", [10, 20])
def test_compose_bad_args_type(self): def test_compose_bad_args_type(self):
self.assertRaises(TypeError, sql.compose, "select %s;", {'a': 10}) self.assertRaises(TypeError, sql.SQL("select %s;").__mod__, {'a': 10})
self.assertRaises(TypeError, sql.compose, "select %(x)s;", [10]) self.assertRaises(TypeError, sql.SQL("select %(x)s;").__mod__, [10])
def test_must_be_adaptable(self): def test_must_be_adaptable(self):
class Foo(object): class Foo(object):
pass pass
self.assertRaises(TypeError, self.assertRaises(TypeError,
sql.compose, "select %s;", [Foo()]) sql.SQL("select %s;").__mod__, [Foo()])
def test_execute(self): def test_execute(self):
cur = self.conn.cursor() cur = self.conn.cursor()
@ -94,11 +93,11 @@ class ComposeTests(ConnectingTestCase):
foo text, bar text, "ba'z" text) foo text, bar text, "ba'z" text)
""") """)
cur.execute( cur.execute(
sql.compose("insert into %s (id, %s) values (%%s, %s)", [ sql.SQL("insert into %s (id, %s) values (%%s, %s)") % [
sql.Identifier('test_compose'), sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])), sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.PH() * 3).join(', '), (sql.PH() * 3).join(', '),
]), ],
(10, 'a', 'b', 'c')) (10, 'a', 'b', 'c'))
cur.execute("select * from test_compose") cur.execute("select * from test_compose")
@ -112,11 +111,11 @@ class ComposeTests(ConnectingTestCase):
foo text, bar text, "ba'z" text) foo text, bar text, "ba'z" text)
""") """)
cur.executemany( cur.executemany(
sql.compose("insert into %s (id, %s) values (%%s, %s)", [ sql.SQL("insert into %s (id, %s) values (%%s, %s)") % [
sql.Identifier('test_compose'), sql.Identifier('test_compose'),
sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])), sql.SQL(', ').join(map(sql.Identifier, ['foo', 'bar', "ba'z"])),
(sql.PH() * 3).join(', '), (sql.PH() * 3).join(', '),
]), ],
[(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')]) [(10, 'a', 'b', 'c'), (20, 'd', 'e', 'f')])
cur.execute("select * from test_compose") cur.execute("select * from test_compose")