gh-74028: add buffersize parameter to concurrent.futures.Executor.map for lazier behavior (#125663)

`concurrent.futures.Executor.map` now supports limiting the number of submitted
tasks whose results have not yet been yielded via the new `buffersize` parameter.
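
A minimal usage sketch of the behavior described above (the `square` worker and the pool size are hypothetical, and it requires a Python version that includes this change): with `buffersize`, `map` can consume an otherwise unbounded input lazily.

from concurrent.futures import ThreadPoolExecutor
from itertools import count, islice

def square(i):
    # Hypothetical worker used only for this sketch.
    return i * i

with ThreadPoolExecutor(max_workers=4) as executor:
    # Without buffersize, map() would try to collect the infinite
    # count() iterator up front and never return.  With buffersize=8,
    # at most 8 submitted-but-unyielded results exist at any time.
    results = executor.map(square, count(), buffersize=8)
    print(list(islice(results, 5)))  # [0, 1, 4, 9, 16]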

---------

Co-authored-by: Bénédikt Tran <10796600+picnixz@users.noreply.github.com>
Author: Enzo Bonnal, 2025-03-13 10:57:53 +00:00 (committed by GitHub)
Parent: e98d321bef
Commit: a005835f69
6 changed files with 128 additions and 7 deletions

Doc/library/concurrent.futures.rst

@@ -40,11 +40,14 @@ Executor Objects
future = executor.submit(pow, 323, 1235)
print(future.result())
.. method:: map(fn, *iterables, timeout=None, chunksize=1)
.. method:: map(fn, *iterables, timeout=None, chunksize=1, buffersize=None)
Similar to :func:`map(fn, *iterables) <map>` except:
* the *iterables* are collected immediately rather than lazily;
* The *iterables* are collected immediately rather than lazily, unless a
*buffersize* is specified to limit the number of submitted tasks whose
results have not yet been yielded. If the buffer is full, iteration over
the *iterables* pauses until a result is yielded from the buffer.
* *fn* is executed asynchronously and several calls to
*fn* may be made concurrently.
@@ -68,7 +71,10 @@ Executor Objects
*chunksize* has no effect.
.. versionchanged:: 3.5
Added the *chunksize* argument.
Added the *chunksize* parameter.
.. versionchanged:: next
Added the *buffersize* parameter.
.. method:: shutdown(wait=True, *, cancel_futures=False)
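
A quick sketch of the pausing behavior documented above (illustrative values; `inputs` is a plain iterator shared with the caller only to make the effect visible): only the initial `buffersize` items are drawn from the input until results are requested.

from concurrent.futures import ThreadPoolExecutor

inputs = iter(range(100))
with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(str, inputs, buffersize=3)
    # map() pulled only the first three items (0, 1, 2) when it was
    # called; the rest stay in `inputs` until results are consumed.
    print(next(inputs))   # 3
    print(next(results))  # 0  (the string '0'; one more input is now backfilled)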

Doc/whatsnew/3.14.rst

@@ -465,6 +465,13 @@ contextvars
* Support context manager protocol by :class:`contextvars.Token`.
(Contributed by Andrew Svetlov in :gh:`129889`.)
* Add the optional ``buffersize`` parameter to
:meth:`concurrent.futures.Executor.map` to limit the number of submitted
tasks whose results have not yet been yielded. If the buffer is full,
iteration over the *iterables* pauses until a result is yielded from the
buffer.
(Contributed by Enzo Bonnal and Josh Rosenberg in :gh:`74028`.)
ctypes
------

Lib/concurrent/futures/_base.py

@@ -8,6 +8,8 @@ import logging
import threading
import time
import types
import weakref
from itertools import islice
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
@@ -572,7 +574,7 @@ class Executor(object):
"""
raise NotImplementedError()
def map(self, fn, *iterables, timeout=None, chunksize=1):
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
"""Returns an iterator equivalent to map(fn, iter).
Args:
@@ -584,6 +586,11 @@
before being passed to a child process. This argument is only
used by ProcessPoolExecutor; it is ignored by
ThreadPoolExecutor.
buffersize: The number of submitted tasks whose results have not
yet been yielded. If the buffer is full, iteration over the
iterables pauses until a result is yielded from the buffer.
If None, all input elements are eagerly collected, and a task is
submitted for each.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -594,10 +601,25 @@
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if buffersize is not None and not isinstance(buffersize, int):
raise TypeError("buffersize must be an integer or None")
if buffersize is not None and buffersize < 1:
raise ValueError("buffersize must be None or > 0")
if timeout is not None:
end_time = timeout + time.monotonic()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
zipped_iterables = zip(*iterables)
if buffersize:
fs = collections.deque(
self.submit(fn, *args) for args in islice(zipped_iterables, buffersize)
)
else:
fs = [self.submit(fn, *args) for args in zipped_iterables]
# Use a weak reference to ensure that the executor can be garbage
# collected independently of the result_iterator closure.
executor_weakref = weakref.ref(self)
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
@@ -606,6 +628,12 @@
# reverse to keep finishing order
fs.reverse()
while fs:
if (
buffersize
and (executor := executor_weakref())
and (args := next(zipped_iterables, None))
):
fs.appendleft(executor.submit(fn, *args))
# Careful not to keep a reference to the popped future
if timeout is None:
yield _result_or_cancel(fs.pop())
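
The backfill loop above is a bounded-lookahead pattern: prefill a deque with up to `buffersize` futures, then submit one replacement task for each result handed out. A simplified standalone sketch of the idea (no timeout handling, no weak reference; `bounded_map` is an illustrative name, not the stdlib implementation):

import collections
from itertools import islice

def bounded_map(executor, fn, iterable, buffersize):
    # Yield fn(x) in input order while keeping at most `buffersize`
    # submitted-but-unyielded futures alive at any time.
    args = iter(iterable)
    # Prefill the buffer with up to `buffersize` tasks.
    buffer = collections.deque(
        executor.submit(fn, arg) for arg in islice(args, buffersize)
    )
    while buffer:
        future = buffer.popleft()
        try:
            # Top the buffer back up so workers stay busy while we
            # block on the oldest outstanding result.
            buffer.append(executor.submit(fn, next(args)))
        except StopIteration:
            pass
        yield future.result()

The stdlib version additionally supports a timeout and holds only a weak reference to the executor, so an abandoned result iterator does not prevent the executor from being garbage collected.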

Lib/concurrent/futures/process.py

@@ -813,7 +813,7 @@ class ProcessPoolExecutor(_base.Executor):
return f
submit.__doc__ = _base.Executor.submit.__doc__
def map(self, fn, *iterables, timeout=None, chunksize=1):
def map(self, fn, *iterables, timeout=None, chunksize=1, buffersize=None):
"""Returns an iterator equivalent to map(fn, iter).
Args:
@@ -824,6 +824,11 @@ class ProcessPoolExecutor(_base.Executor):
chunksize: If greater than one, the iterables will be chopped into
chunks of size chunksize and submitted to the process pool.
If set to one, the items in the list will be sent one at a time.
buffersize: The number of submitted tasks whose results have not
yet been yielded. If the buffer is full, iteration over the
iterables pauses until a result is yielded from the buffer.
If None, all input elements are eagerly collected, and a task is
submitted for each.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
@@ -839,7 +844,8 @@ class ProcessPoolExecutor(_base.Executor):
results = super().map(partial(_process_chunk, fn),
itertools.batched(zip(*iterables), chunksize),
timeout=timeout)
timeout=timeout,
buffersize=buffersize)
return _chain_from_iterable_of_lists(results)
def shutdown(self, wait=True, *, cancel_futures=False):
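
Because ProcessPoolExecutor.map submits whole chunks, the buffered unit here is a chunk of `chunksize` input items, so roughly buffersize * chunksize inputs are read ahead of the consumer. A hedged sketch (the `cube` worker is hypothetical; the __main__ guard matters on platforms that spawn worker processes):

from concurrent.futures import ProcessPoolExecutor
from itertools import count, islice

def cube(i):
    # Hypothetical worker; defined at module level so it can be
    # pickled and sent to worker processes.
    return i ** 3

if __name__ == "__main__":
    with ProcessPoolExecutor(max_workers=2) as executor:
        # Each task carries a chunk of 10 inputs and at most 4 chunks
        # are in flight, so only about 40 items are drawn from the
        # infinite count() ahead of what the caller has consumed.
        results = executor.map(cube, count(), chunksize=10, buffersize=4)
        print(list(islice(results, 5)))  # [0, 1, 8, 27, 64]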

Lib/test/test_concurrent_futures/executor.py

@@ -1,7 +1,9 @@
import itertools
import threading
import time
import weakref
from concurrent import futures
from operator import add
from test import support
from test.support import Py_GIL_DISABLED
@@ -73,6 +75,74 @@ class ExecutorTest:
# take longer than the specified timeout.
self.assertIn(results, ([None, None], [None], []))
def test_map_buffersize_type_validation(self):
for buffersize in ("foo", 2.0):
with self.subTest(buffersize=buffersize):
with self.assertRaisesRegex(
TypeError,
"buffersize must be an integer or None",
):
self.executor.map(str, range(4), buffersize=buffersize)
def test_map_buffersize_value_validation(self):
for buffersize in (0, -1):
with self.subTest(buffersize=buffersize):
with self.assertRaisesRegex(
ValueError,
"buffersize must be None or > 0",
):
self.executor.map(str, range(4), buffersize=buffersize)
def test_map_buffersize(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
with self.subTest(buffersize=buffersize):
res = self.executor.map(str, ints, buffersize=buffersize)
self.assertListEqual(list(res), ["0", "1", "2", "3"])
def test_map_buffersize_on_multiple_iterables(self):
ints = range(4)
for buffersize in (1, 2, len(ints), len(ints) * 2):
with self.subTest(buffersize=buffersize):
res = self.executor.map(add, ints, ints, buffersize=buffersize)
self.assertListEqual(list(res), [0, 2, 4, 6])
def test_map_buffersize_on_infinite_iterable(self):
res = self.executor.map(str, itertools.count(), buffersize=2)
self.assertEqual(next(res, None), "0")
self.assertEqual(next(res, None), "1")
self.assertEqual(next(res, None), "2")
def test_map_buffersize_on_multiple_infinite_iterables(self):
res = self.executor.map(
add,
itertools.count(),
itertools.count(),
buffersize=2
)
self.assertEqual(next(res, None), 0)
self.assertEqual(next(res, None), 2)
self.assertEqual(next(res, None), 4)
def test_map_buffersize_on_empty_iterable(self):
res = self.executor.map(str, [], buffersize=2)
self.assertIsNone(next(res, None))
def test_map_buffersize_without_iterable(self):
res = self.executor.map(str, buffersize=2)
self.assertIsNone(next(res, None))
def test_map_buffersize_when_buffer_is_full(self):
ints = iter(range(4))
buffersize = 2
self.executor.map(str, ints, buffersize=buffersize)
self.executor.shutdown(wait=True) # wait for tasks to complete
self.assertEqual(
next(ints),
buffersize,
msg="should have fetched only `buffersize` elements from `ints`.",
)
def test_shutdown_race_issue12456(self):
# Issue #12456: race condition at shutdown where trying to post a
# sentinel in the call queue blocks (the queue is full while processes

Misc/NEWS.d entry (new file)

@@ -0,0 +1,4 @@
Add the optional ``buffersize`` parameter to
:meth:`concurrent.futures.Executor.map` to limit the number of submitted tasks
whose results have not yet been yielded. If the buffer is full, iteration over
the *iterables* pauses until a result is yielded from the buffer.