Added execute_batch and execute_values functions

This commit is contained in:
Daniele Varrazzo 2017-02-01 01:59:47 +00:00
parent 8ac839ce95
commit a95fd3df1a
5 changed files with 248 additions and 6 deletions

6
NEWS
View File

@ -27,6 +27,12 @@ New features:
- `~cursor.callproc()` now accepts a dictionary of parameters (:ticket:`#381`).
- Using Python C API decoding functions and codecs caching for faster
unicode encoding/decoding (:ticket:`#473`).
- `~cursor.executemany()` slowness addressed by
`~psycopg2.extras.execute_batch()` and `~psycopg2.extras.execute_values()`
(:ticket:`#491`).
Bug fixes:
- Fixed error caused by missing decoding `~psycopg2.extras.LoggingConnection`
(:ticket:`#483`).

View File

@ -172,33 +172,38 @@ The ``cursor`` class
.. method:: execute(operation [, parameters])
Prepare and execute a database operation (query or command).
Parameters may be provided as sequence or mapping and will be bound to
variables in the operation. Variables are specified either with
positional (``%s``) or named (:samp:`%({name})s`) placeholders. See
:ref:`query-parameters`.
The method returns `!None`. If a query was executed, the returned
values can be retrieved using |fetch*|_ methods.
.. method:: executemany(operation, seq_of_parameters)
Prepare a database operation (query or command) and then execute it
against all parameter tuples or mappings found in the sequence
`seq_of_parameters`.
The function is mostly useful for commands that update the database:
any result set returned by the query is discarded.
Parameters are bounded to the query using the same rules described in
the `~cursor.execute()` method.
.. warning::
In its current implementation this method is not faster than
executing `~cursor.execute()` in a loop. For better performance
you can use the functions described in :ref:`fast-exec`.
.. method:: callproc(procname [, parameters])
Call a stored database procedure with the given name. The sequence of
parameters must contain one entry for each argument that the procedure
expects. Overloaded procedures are supported. Named parameters can be

View File

@ -974,6 +974,31 @@ converted into lists of strings.
future versions.
.. _fast-exec:
Fast execution helpers
----------------------
The current implementation of `~cursor.executemany()` is (using an extremely
charitable understatement) not particularly performing. These functions can
be used to speed up the repeated execution of a statement againts a set of
parameters. By reducing the number of server roundtrips the performance can be
`orders of magnitude better`__ than using `!executemany()`.
.. __: https://github.com/psycopg/psycopg2/issues/491#issuecomment-276551038
.. autofunction:: execute_batch
.. versionadded:: 2.7
.. autofunction:: execute_values
.. versionadded:: 2.7
.. index::
single: Time zones; Fractional

View File

@ -1141,3 +1141,83 @@ def register_composite(name, conn_or_curs, globally=False, factory=None):
caster.array_typecaster, not globally and conn_or_curs or None)
return caster
def _paginate(seq, page_size):
"""Consume an iterable and return it in chunks.
Every chunk is at most `page_size`. Never return an empty chunk.
"""
page = []
it = iter(seq)
while 1:
try:
for i in xrange(page_size):
page.append(it.next())
yield page
page = []
except StopIteration:
if page:
yield page
return
def execute_batch(cur, sql, argslist, page_size=100):
"""Execute groups of statements in fewer server roundtrips.
Execute *sql* several times, against all parameters set (sequences or
mappings) found in *argslist*.
The function is semantically similar to `~cursor.executemany()`, but has a
different implementation: Psycopg will join the statements into fewer
multi-statement commands, reducing the number of server roundtrips,
resulting in better performances. Every command contains at most
*page_size* statements.
"""
for page in _paginate(argslist, page_size=page_size):
sqls = [cur.mogrify(sql, args) for args in page]
cur.execute(";".join(sqls))
def execute_values(cur, sql, argslist, template=None, page_size=100):
'''Execute a statement using :sql:`VALUES` with a sequence of parameters.
*sql* must contain a single ``%s`` placeholder, which will be replaced by a
`VALUES list`__. Every statement will contain at most *page_size* sets of
arguments.
.. __: https://www.postgresql.org/docs/current/static/queries-values.html
*template* is the part merged to the arguments, so it should be compatible
with the content of *argslist* (it should contain the right number of
arguments if *argslist* is a sequence of sequences, or compatible names if
*argslist* is a sequence of mappings). If not specified, assume the
arguments are sequence and use a simple positional template (i.e.
``(%s, %s, ...)``).
While :sql:`INSERT` is an obvious candidate for this function it is
possible to use it with other statements, for example::
>>> cur.execute(
... "create table test (id int primary key, v1 int, v2 int)")
>>> execute_values(cur,
... "INSERT INTO test (id, v1, v2) VALUES %s",
... [(1, 2, 3), (4, 5, 6), (7, 8, 9)])
>>> execute_values(cur,
... """UPDATE test SET v1 = data.v1 FROM (VALUES %s) AS data (id, v1)
... WHERE test.id = data.id""",
... [(1, 20), (4, 50)])
>>> cur.execute("select * from test order by id")
>>> cur.fetchall()
[(1, 20, 3), (4, 50, 6), (7, 8, 9)])
'''
for page in _paginate(argslist, page_size=page_size):
if template is None:
template = '(%s)' % ','.join(['%s'] * len(page[0]))
values = ",".join(cur.mogrify(template, args) for args in page)
cur.execute(sql % (values,))

View File

@ -1766,6 +1766,132 @@ class RangeCasterTestCase(ConnectingTestCase):
decorate_all_tests(RangeCasterTestCase, skip_if_no_range)
class TestFastExecute(ConnectingTestCase):
def setUp(self):
super(TestFastExecute, self).setUp()
cur = self.conn.cursor()
cur.execute(
"create table testfast (id serial primary key, date date, val int)")
def test_paginate(self):
def pag(seq):
return psycopg2.extras._paginate(seq, 100)
self.assertEqual(list(pag([])), [])
self.assertEqual(list(pag([1])), [[1]])
self.assertEqual(list(pag(range(99))), [list(range(99))])
self.assertEqual(list(pag(range(100))), [list(range(100))])
self.assertEqual(list(pag(range(101))), [list(range(100)), [100]])
self.assertEqual(
list(pag(range(200))), [list(range(100)), list(range(100, 200))])
self.assertEqual(
list(pag(range(1000))),
[list(range(i * 100, (i + 1) * 100)) for i in range(10)])
def test_execute_batch_empty(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
[])
cur.execute("select * from testfast order by id")
self.assertEqual(cur.fetchall(), [])
def test_execute_batch_one(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
iter([(1, 10)]))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(1, 10)])
def test_execute_batch_tuples(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, date, val) values (%s, %s, %s)",
((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_execute_batch_many(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
((i, i * 10) for i in range(1000)))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
def test_execute_batch_pages(self):
cur = self.conn.cursor()
psycopg2.extras.execute_batch(cur,
"insert into testfast (id, val) values (%s, %s)",
((i, i * 10) for i in range(25)),
page_size=10)
# last command was 5 statements
self.assertEqual(sum(c == ';' for c in cur.query), 4)
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
def test_execute_values_empty(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
[])
cur.execute("select * from testfast order by id")
self.assertEqual(cur.fetchall(), [])
def test_execute_values_one(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
iter([(1, 10)]))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(1, 10)])
def test_execute_values_tuples(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, date, val) values %s",
((i, date(2017, 1, i + 1), i * 10) for i in range(10)))
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_execute_values_dicts(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, date, val) values %s",
(dict(id=i, date=date(2017, 1, i + 1), val=i * 10, foo="bar")
for i in range(10)),
template='(%(id)s, %(date)s, %(val)s)')
cur.execute("select id, date, val from testfast order by id")
self.assertEqual(cur.fetchall(),
[(i, date(2017, 1, i + 1), i * 10) for i in range(10)])
def test_execute_values_many(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
((i, i * 10) for i in range(1000)))
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)])
def test_execute_values_pages(self):
cur = self.conn.cursor()
psycopg2.extras.execute_values(cur,
"insert into testfast (id, val) values %s",
((i, i * 10) for i in range(25)),
page_size=10)
# last statement was 5 tuples (one parens is for the fields list)
self.assertEqual(sum(c == '(' for c in cur.query), 6)
cur.execute("select id, val from testfast order by id")
self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)])
def test_suite():
return unittest.TestLoader().loadTestsFromName(__name__)