Fix issue #20367: concurrent.futures.as_completed() for duplicate arguments.

Patch by Glenn Langford.
This commit is contained in:
Guido van Rossum 2014-01-26 09:57:51 -08:00
parent 252fd0c24b
commit e6994ff6e3
4 changed files with 16 additions and 3 deletions

View file

@ -371,7 +371,8 @@ Module Functions
Returns an iterator over the :class:`Future` instances (possibly created by Returns an iterator over the :class:`Future` instances (possibly created by
different :class:`Executor` instances) given by *fs* that yields futures as different :class:`Executor` instances) given by *fs* that yields futures as
they complete (finished or were cancelled). Any futures that completed they complete (finished or were cancelled). Any futures given by *fs* that
are duplicated will be returned once. Any futures that completed
before :func:`as_completed` is called will be yielded first. The returned before :func:`as_completed` is called will be yielded first. The returned
iterator raises a :exc:`TimeoutError` if :meth:`~iterator.__next__` is iterator raises a :exc:`TimeoutError` if :meth:`~iterator.__next__` is
called and the result isn't available after *timeout* seconds from the called and the result isn't available after *timeout* seconds from the

View file

@ -181,7 +181,8 @@ def as_completed(fs, timeout=None):
Returns: Returns:
An iterator that yields the given Futures as they complete (finished or An iterator that yields the given Futures as they complete (finished or
cancelled). cancelled). If any given Futures are duplicated, they will be returned
once.
Raises: Raises:
TimeoutError: If the entire result iterator could not be generated TimeoutError: If the entire result iterator could not be generated
@ -190,11 +191,12 @@ def as_completed(fs, timeout=None):
if timeout is not None: if timeout is not None:
end_time = timeout + time.time() end_time = timeout + time.time()
fs = set(fs)
with _AcquireFutures(fs): with _AcquireFutures(fs):
finished = set( finished = set(
f for f in fs f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = set(fs) - finished pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED) waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
try: try:

View file

@ -350,6 +350,13 @@ class AsCompletedTests:
SUCCESSFUL_FUTURE]), SUCCESSFUL_FUTURE]),
completed_futures) completed_futures)
def test_duplicate_futures(self):
# Issue 20367. Duplicate futures should not raise exceptions or give
# duplicate responses.
future1 = self.executor.submit(time.sleep, 2)
completed = [f for f in futures.as_completed([future1,future1])]
self.assertEqual(len(completed), 1)
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase): class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests, unittest.TestCase):
pass pass

View file

@ -36,6 +36,9 @@ Core and Builtins
Library Library
------- -------
- Issue #20367: Fix behavior of concurrent.futures.as_completed() for
duplicate arguments. Patch by Glenn Langford.
- Issue #8260: The read(), readline() and readlines() methods of - Issue #8260: The read(), readline() and readlines() methods of
codecs.StreamReader returned incomplete data when were called after codecs.StreamReader returned incomplete data when were called after
readline() or read(size). Based on patch by Amaury Forgeot d'Arc. readline() or read(size). Based on patch by Amaury Forgeot d'Arc.