Merge branch 'fast-executemany'

This commit is contained in:
Daniele Varrazzo 2017-02-02 02:40:28 +00:00
commit 626e57acda
5 changed files with 334 additions and 6 deletions

6
NEWS
View File

@ -27,6 +27,12 @@ New features:
- `~cursor.callproc()` now accepts a dictionary of parameters (:ticket:`#381`).
- Using Python C API decoding functions and codecs caching for faster
unicode encoding/decoding (:ticket:`#473`).
- `~cursor.executemany()` slowness addressed by
`~psycopg2.extras.execute_batch()` and `~psycopg2.extras.execute_values()`
(:ticket:`#491`).
Bug fixes:
- Fixed error caused by missing decoding `~psycopg2.extras.LoggingConnection`
(:ticket:`#483`).

View File

@ -196,6 +196,11 @@ The ``cursor`` class
Parameters are bounded to the query using the same rules described in
the `~cursor.execute()` method.
.. warning::
In its current implementation this method is not faster than
executing `~cursor.execute()` in a loop. For better performance
you can use the functions described in :ref:`fast-exec`.
.. method:: callproc(procname [, parameters])

View File

@ -974,6 +974,63 @@ converted into lists of strings.
future versions.
.. _fast-exec:
Fast execution helpers
----------------------
The current implementation of `~cursor.executemany()` is (using an extremely
charitable understatement) not particularly performing. These functions can
be used to speed up the repeated execution of a statement againts a set of
parameters. By reducing the number of server roundtrips the performance can be
`orders of magnitude better`__ than using `!executemany()`.
.. __: https://github.com/psycopg/psycopg2/issues/491#issuecomment-276551038
.. autofunction:: execute_batch
.. versionadded:: 2.7
.. note::
`!execute_batch()` can be also used in conjunction with PostgreSQL
prepared statements using |PREPARE|_, |EXECUTE|_, |DEALLOCATE|_.
Instead of executing::
execute_batch(cur,
"big and complex SQL with %s %s params",
params_list)
it is possible to execute something like::
cur.execute("PREPARE stmt AS big and complex SQL with $1 $2 params")
execute_batch(cur, "EXECUTE stmt (%s, %s)", params_list)
cur.execute("DEALLOCATE stmt")
which may bring further performance benefits: if the operation to perform
is complex, every single execution will be faster as the query plan is
already cached; furthermore the amount of data to send on the server will
be lesser (one |EXECUTE| per param set instead of the whole, likely
longer, statement).
.. |PREPARE| replace:: :sql:`PREPARE`
.. _PREPARE: https://www.postgresql.org/docs/current/static/sql-prepare.html
.. |EXECUTE| replace:: :sql:`EXECUTE`
.. _EXECUTE: https://www.postgresql.org/docs/current/static/sql-execute.html
.. |DEALLOCATE| replace:: :sql:`DEALLOCATE`
.. _DEALLOCATE: https://www.postgresql.org/docs/current/static/sql-deallocate.html
.. autofunction:: execute_values
.. versionadded:: 2.7
.. index::
single: Time zones; Fractional

View File

@ -1141,3 +1141,85 @@ def register_composite(name, conn_or_curs, globally=False, factory=None):
caster.array_typecaster, not globally and conn_or_curs or None)
return caster
def _paginate(seq, page_size):
"""Consume an iterable and return it in chunks.
Every chunk is at most `page_size`. Never return an empty chunk.
"""
page = []
it = iter(seq)
while 1:
try:
for i in xrange(page_size):
page.append(it.next())
yield page
page = []
except StopIteration:
if page:
yield page
return
def execute_batch(cur, sql, argslist, page_size=100):
"""Execute groups of statements in fewer server roundtrips.
Execute *sql* several times, against all parameters set (sequences or
mappings) found in *argslist*.
The function is semantically similar to `~cursor.executemany()`, but has a
different implementation: Psycopg will join the statements into fewer
multi-statement commands, reducing the number of server roundtrips,
resulting in better performances. Every command contains at most
*page_size* statements.
"""
for page in _paginate(argslist, page_size=page_size):
sqls = [cur.mogrify(sql, args) for args in page]
cur.execute(b";".join(sqls))
def execute_values(cur, sql, argslist, template=None, page_size=100):
'''Execute a statement using :sql:`VALUES` with a sequence of parameters.
*sql* must contain a single ``%s`` placeholder, which will be replaced by a
`VALUES list`__. Every statement will contain at most *page_size* sets of
arguments.
.. __: https://www.postgresql.org/docs/current/static/queries-values.html
*template* is the part merged to the arguments, so it should be compatible
with the content of *argslist* (it should contain the right number of
arguments if *argslist* is a sequence of sequences, or compatible names if
*argslist* is a sequence of mappings). If not specified, assume the
arguments are sequence and use a simple positional template (i.e.
``(%s, %s, ...)``).
While :sql:`INSERT` is an obvious candidate for this function it is
possible to use it with other statements, for example::
>>> cur.execute(
... "create table test (id int primary key, v1 int, v2 int)")
>>> execute_values(cur,
... "INSERT INTO test (id, v1, v2) VALUES %s",
... [(1, 2, 3), (4, 5, 6), (7, 8, 9)])
>>> execute_values(cur,
... """UPDATE test SET v1 = data.v1 FROM (VALUES %s) AS data (id, v1)
... WHERE test.id = data.id""",
... [(1, 20), (4, 50)])
>>> cur.execute("select * from test order by id")
>>> cur.fetchall()
[(1, 20, 3), (4, 50, 6), (7, 8, 9)])
'''
for page in _paginate(argslist, page_size=page_size):
if template is None:
template = '(%s)' % ','.join(['%s'] * len(page[0]))
values = b",".join(cur.mogrify(template, args) for args in page)
if isinstance(values, bytes):
values = values.decode(_ext.encodings[cur.connection.encoding])
cur.execute(sql % (values,))

View File

@ -1766,6 +1766,184 @@ class RangeCasterTestCase(ConnectingTestCase):
decorate_all_tests(RangeCasterTestCase, skip_if_no_range)
class TestFastExecute(ConnectingTestCase):
def setUp(self):
super(TestFastExecute, self).setUp()
cur = self.conn.cursor()
cur.execute("""create table testfast (
id serial primary key, date date, val int, data text)""")
def test_paginate(self):
def pag(seq):
return psycopg2.extras._paginate(seq, 100)
self.assertEqual(list(pag([])), [])
self.assertEqual(list(pag([1])), [[1]])
self.assertEqual(list(pag(range(99))), [list(range(99))])
self.assertEqual(list(pag(range(100))), [list(range(100))])
self.assertEqual(list(pag(range(101))), [list(range(100)), [100]])
self.assertEqual(
list(pag(range(200))), [list(range(100)), list(range(100, 200))])
self.assertEqual(
list(pag(range(1000))),
[list(range(i * 100, (i + 1) * 100)) for i in range(10)])
def test_execute_batch_empty(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
[])
cur.execute("select * from testfast order by id")
self.assertEqual(cur.fetchall(), [])
def test_execute_batch_one(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
iter([(1, 10)]))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(1, 10)])
def test_execute_batch_tuples(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, date, val) values (%s, %s, %s)",
((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_execute_batch_many(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
((i, i * 10) for i in range(1000)))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
def test_execute_batch_pages(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
((i, i * 10) for i in range(25)),
page_size=10)
# last command was 5 statements
self.assertEqual(sum(c == u';' for c in cur.query.decode('ascii')), 4)
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
def test_execute_batch_unicode(self):
cur = self.conn.cursor()
ext.register_type(ext.UNICODE, cur)
snowman = u"\u2603"
# unicode in statement
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman,
[(1, 'x')])
cur.execute("select id, data from testfast where id = 1")
self.assertEqual(cur.fetchone(), (1, 'x'))
# unicode in data
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, data) values (%s, %s)",
[(2, snowman)])
cur.execute("select id, data from testfast where id = 2")
self.assertEqual(cur.fetchone(), (2, snowman))
# unicode in both
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, data) values (%%s, %%s) -- %s" % snowman,
[(3, snowman)])
cur.execute("select id, data from testfast where id = 3")
self.assertEqual(cur.fetchone(), (3, snowman))
def test_execute_values_empty(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
[])
cur.execute("select * from testfast order by id")
self.assertEqual(cur.fetchall(), [])
def test_execute_values_one(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
iter([(1, 10)]))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(1, 10)])
def test_execute_values_tuples(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, date, val) values %s",
((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_execute_values_dicts(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, date, val) values %s",
(dict(id=i, date=date(2017, 1, i + 1), val=i * 10, foo="bar")
for i in range(10)),
template='(%(id)s, %(date)s, %(val)s)')
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_execute_values_many(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
((i, i * 10) for i in range(1000)))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
def test_execute_values_pages(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
((i, i * 10) for i in range(25)),
page_size=10)
# last statement was 5 tuples (one parens is for the fields list)
self.assertEqual(sum(c == '(' for c in cur.query.decode('ascii')), 6)
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
def test_execute_values_unicode(self):
cur = self.conn.cursor()
ext.register_type(ext.UNICODE, cur)
snowman = u"\u2603"
# unicode in statement
psycopg2.extras.execute_values(cur,
"insert into testfast (id, data) values %%s -- %s" % snowman,
[(1, 'x')])
cur.execute("select id, data from testfast where id = 1")
self.assertEqual(cur.fetchone(), (1, 'x'))
# unicode in data
psycopg2.extras.execute_values(cur,
"insert into testfast (id, data) values %s",
[(2, snowman)])
cur.execute("select id, data from testfast where id = 2")
self.assertEqual(cur.fetchone(), (2, snowman))
# unicode in both
psycopg2.extras.execute_values(cur,
"insert into testfast (id, data) values %%s -- %s" % snowman,
[(3, snowman)])
cur.execute("select id, data from testfast where id = 3")
self.assertEqual(cur.fetchone(), (3, snowman))
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)