diff --git a/NEWS b/NEWS index b4d11e64..a0f1810e 100644 --- a/NEWS +++ b/NEWS @@ -27,6 +27,12 @@ New features: - `~cursor.callproc()` now accepts a dictionary of parameters (:ticket:`#381`). - Using Python C API decoding functions and codecs caching for faster unicode encoding/decoding (:ticket:`#473`). +- `~cursor.executemany()` slowness addressed by + `~psycopg2.extras.execute_batch()` and `~psycopg2.extras.execute_values()` + (:ticket:`#491`). + +Bug fixes: + - Fixed error caused by missing decoding `~psycopg2.extras.LoggingConnection` (:ticket:`#483`). diff --git a/doc/src/cursor.rst b/doc/src/cursor.rst index aee6b465..4161f2a1 100644 --- a/doc/src/cursor.rst +++ b/doc/src/cursor.rst @@ -172,33 +172,38 @@ The ``cursor`` class .. method:: execute(operation [, parameters]) - + Prepare and execute a database operation (query or command). Parameters may be provided as sequence or mapping and will be bound to variables in the operation. Variables are specified either with positional (``%s``) or named (:samp:`%({name})s`) placeholders. See :ref:`query-parameters`. - + The method returns `!None`. If a query was executed, the returned values can be retrieved using |fetch*|_ methods. .. method:: executemany(operation, seq_of_parameters) - + Prepare a database operation (query or command) and then execute it against all parameter tuples or mappings found in the sequence `seq_of_parameters`. - + The function is mostly useful for commands that update the database: any result set returned by the query is discarded. - + Parameters are bounded to the query using the same rules described in the `~cursor.execute()` method. + .. warning:: + In its current implementation this method is not faster than + executing `~cursor.execute()` in a loop. For better performance + you can use the functions described in :ref:`fast-exec`. + .. method:: callproc(procname [, parameters]) - + Call a stored database procedure with the given name. The sequence of parameters must contain one entry for each argument that the procedure expects. Overloaded procedures are supported. Named parameters can be diff --git a/doc/src/extras.rst b/doc/src/extras.rst index d33b8eed..66be5902 100644 --- a/doc/src/extras.rst +++ b/doc/src/extras.rst @@ -974,6 +974,31 @@ converted into lists of strings. future versions. + +.. _fast-exec: + +Fast execution helpers +---------------------- + +The current implementation of `~cursor.executemany()` is (using an extremely +charitable understatement) not particularly performing. These functions can +be used to speed up the repeated execution of a statement againts a set of +parameters. By reducing the number of server roundtrips the performance can be +`orders of magnitude better`__ than using `!executemany()`. + +.. __: https://github.com/psycopg/psycopg2/issues/491#issuecomment-276551038 + + +.. autofunction:: execute_batch + + .. versionadded:: 2.7 + +.. autofunction:: execute_values + + .. versionadded:: 2.7 + + + .. index:: single: Time zones; Fractional diff --git a/lib/extras.py b/lib/extras.py index c1d15670..85963c9f 100644 --- a/lib/extras.py +++ b/lib/extras.py @@ -1141,3 +1141,83 @@ def register_composite(name, conn_or_curs, globally=False, factory=None): caster.array_typecaster, not globally and conn_or_curs or None) return caster + + +def _paginate(seq, page_size): + """Consume an iterable and return it in chunks. + + Every chunk is at most `page_size`. Never return an empty chunk. + """ + page = [] + it = iter(seq) + while 1: + try: + for i in xrange(page_size): + page.append(it.next()) + yield page + page = [] + except StopIteration: + if page: + yield page + return + + +def execute_batch(cur, sql, argslist, page_size=100): + """Execute groups of statements in fewer server roundtrips. + + Execute *sql* several times, against all parameters set (sequences or + mappings) found in *argslist*. + + The function is semantically similar to `~cursor.executemany()`, but has a + different implementation: Psycopg will join the statements into fewer + multi-statement commands, reducing the number of server roundtrips, + resulting in better performances. Every command contains at most + *page_size* statements. + + """ + for page in _paginate(argslist, page_size=page_size): + sqls = [cur.mogrify(sql, args) for args in page] + cur.execute(";".join(sqls)) + + +def execute_values(cur, sql, argslist, template=None, page_size=100): + '''Execute a statement using :sql:`VALUES` with a sequence of parameters. + + *sql* must contain a single ``%s`` placeholder, which will be replaced by a + `VALUES list`__. Every statement will contain at most *page_size* sets of + arguments. + + .. __: https://www.postgresql.org/docs/current/static/queries-values.html + + *template* is the part merged to the arguments, so it should be compatible + with the content of *argslist* (it should contain the right number of + arguments if *argslist* is a sequence of sequences, or compatible names if + *argslist* is a sequence of mappings). If not specified, assume the + arguments are sequence and use a simple positional template (i.e. + ``(%s, %s, ...)``). + + While :sql:`INSERT` is an obvious candidate for this function it is + possible to use it with other statements, for example:: + + >>> cur.execute( + ... "create table test (id int primary key, v1 int, v2 int)") + + >>> execute_values(cur, + ... "INSERT INTO test (id, v1, v2) VALUES %s", + ... [(1, 2, 3), (4, 5, 6), (7, 8, 9)]) + + >>> execute_values(cur, + ... """UPDATE test SET v1 = data.v1 FROM (VALUES %s) AS data (id, v1) + ... WHERE test.id = data.id""", + ... [(1, 20), (4, 50)]) + + >>> cur.execute("select * from test order by id") + >>> cur.fetchall() + [(1, 20, 3), (4, 50, 6), (7, 8, 9)]) + + ''' + for page in _paginate(argslist, page_size=page_size): + if template is None: + template = '(%s)' % ','.join(['%s'] * len(page[0])) + values = ",".join(cur.mogrify(template, args) for args in page) + cur.execute(sql % (values,)) diff --git a/tests/test_types_extras.py b/tests/test_types_extras.py index 8e615616..ab8e1707 100755 --- a/tests/test_types_extras.py +++ b/tests/test_types_extras.py @@ -1766,6 +1766,132 @@ class RangeCasterTestCase(ConnectingTestCase): decorate_all_tests(RangeCasterTestCase, skip_if_no_range) +class TestFastExecute(ConnectingTestCase): + def setUp(self): + super(TestFastExecute, self).setUp() + cur = self.conn.cursor() + cur.execute( + "create table testfast (id serial primary key, date date, val int)") + + def test_paginate(self): + def pag(seq): + return psycopg2.extras._paginate(seq, 100) + + self.assertEqual(list(pag([])), []) + self.assertEqual(list(pag([1])), [[1]]) + self.assertEqual(list(pag(range(99))), [list(range(99))]) + self.assertEqual(list(pag(range(100))), [list(range(100))]) + self.assertEqual(list(pag(range(101))), [list(range(100)), [100]]) + self.assertEqual( + list(pag(range(200))), [list(range(100)), list(range(100, 200))]) + self.assertEqual( + list(pag(range(1000))), + [list(range(i * 100, (i + 1) * 100)) for i in range(10)]) + + def test_execute_batch_empty(self): + cur = self.conn.cursor() + psycopg2.extras.execute_batch(cur, + "insert into testfast (id, val) values (%s, %s)", + []) + cur.execute("select * from testfast order by id") + self.assertEqual(cur.fetchall(), []) + + def test_execute_batch_one(self): + cur = self.conn.cursor() + psycopg2.extras.execute_batch(cur, + "insert into testfast (id, val) values (%s, %s)", + iter([(1, 10)])) + cur.execute("select id, val from testfast order by id") + self.assertEqual(cur.fetchall(), [(1, 10)]) + + def test_execute_batch_tuples(self): + cur = self.conn.cursor() + psycopg2.extras.execute_batch(cur, + "insert into testfast (id, date, val) values (%s, %s, %s)", + ((i, date(2017, 1, i + 1), i * 10) for i in range(10))) + cur.execute("select id, date, val from testfast order by id") + self.assertEqual(cur.fetchall(), + [(i, date(2017, 1, i + 1), i * 10) for i in range(10)]) + + def test_execute_batch_many(self): + cur = self.conn.cursor() + psycopg2.extras.execute_batch(cur, + "insert into testfast (id, val) values (%s, %s)", + ((i, i * 10) for i in range(1000))) + cur.execute("select id, val from testfast order by id") + self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)]) + + def test_execute_batch_pages(self): + cur = self.conn.cursor() + psycopg2.extras.execute_batch(cur, + "insert into testfast (id, val) values (%s, %s)", + ((i, i * 10) for i in range(25)), + page_size=10) + + # last command was 5 statements + self.assertEqual(sum(c == ';' for c in cur.query), 4) + + cur.execute("select id, val from testfast order by id") + self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)]) + + def test_execute_values_empty(self): + cur = self.conn.cursor() + psycopg2.extras.execute_values(cur, + "insert into testfast (id, val) values %s", + []) + cur.execute("select * from testfast order by id") + self.assertEqual(cur.fetchall(), []) + + def test_execute_values_one(self): + cur = self.conn.cursor() + psycopg2.extras.execute_values(cur, + "insert into testfast (id, val) values %s", + iter([(1, 10)])) + cur.execute("select id, val from testfast order by id") + self.assertEqual(cur.fetchall(), [(1, 10)]) + + def test_execute_values_tuples(self): + cur = self.conn.cursor() + psycopg2.extras.execute_values(cur, + "insert into testfast (id, date, val) values %s", + ((i, date(2017, 1, i + 1), i * 10) for i in range(10))) + cur.execute("select id, date, val from testfast order by id") + self.assertEqual(cur.fetchall(), + [(i, date(2017, 1, i + 1), i * 10) for i in range(10)]) + + def test_execute_values_dicts(self): + cur = self.conn.cursor() + psycopg2.extras.execute_values(cur, + "insert into testfast (id, date, val) values %s", + (dict(id=i, date=date(2017, 1, i + 1), val=i * 10, foo="bar") + for i in range(10)), + template='(%(id)s, %(date)s, %(val)s)') + cur.execute("select id, date, val from testfast order by id") + self.assertEqual(cur.fetchall(), + [(i, date(2017, 1, i + 1), i * 10) for i in range(10)]) + + def test_execute_values_many(self): + cur = self.conn.cursor() + psycopg2.extras.execute_values(cur, + "insert into testfast (id, val) values %s", + ((i, i * 10) for i in range(1000))) + cur.execute("select id, val from testfast order by id") + self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(1000)]) + + def test_execute_values_pages(self): + cur = self.conn.cursor() + psycopg2.extras.execute_values(cur, + "insert into testfast (id, val) values %s", + ((i, i * 10) for i in range(25)), + page_size=10) + + # last statement was 5 tuples (one parens is for the fields list) + self.assertEqual(sum(c == '(' for c in cur.query), 6) + + cur.execute("select id, val from testfast order by id") + self.assertEqual(cur.fetchall(), [(i, i * 10) for i in range(25)]) + + def test_suite(): return unittest.TestLoader().loadTestsFromName(__name__)