mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 19:34:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			639 lines
		
	
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			639 lines
		
	
	
	
		
			22 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
import test.support
 | 
						|
 | 
						|
# Skip tests if _multiprocessing wasn't built.
 | 
						|
test.support.import_module('_multiprocessing')
 | 
						|
# Skip tests if sem_open implementation is broken.
 | 
						|
test.support.import_module('multiprocessing.synchronize')
 | 
						|
# import threading after _multiprocessing to raise a more revelant error
 | 
						|
# message: "No module named _multiprocessing". _multiprocessing is not compiled
 | 
						|
# without thread support.
 | 
						|
test.support.import_module('threading')
 | 
						|
 | 
						|
from test.script_helper import assert_python_ok
 | 
						|
 | 
						|
import sys
 | 
						|
import threading
 | 
						|
import time
 | 
						|
import unittest
 | 
						|
 | 
						|
from concurrent import futures
 | 
						|
from concurrent.futures._base import (
 | 
						|
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future)
 | 
						|
import concurrent.futures.process
 | 
						|
 | 
						|
 | 
						|
def create_future(state=PENDING, exception=None, result=None):
 | 
						|
    f = Future()
 | 
						|
    f._state = state
 | 
						|
    f._exception = exception
 | 
						|
    f._result = result
 | 
						|
    return f
 | 
						|
 | 
						|
 | 
						|
PENDING_FUTURE = create_future(state=PENDING)
 | 
						|
RUNNING_FUTURE = create_future(state=RUNNING)
 | 
						|
CANCELLED_FUTURE = create_future(state=CANCELLED)
 | 
						|
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
 | 
						|
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
 | 
						|
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
 | 
						|
 | 
						|
 | 
						|
def mul(x, y):
 | 
						|
    return x * y
 | 
						|
 | 
						|
 | 
						|
def sleep_and_raise(t):
 | 
						|
    time.sleep(t)
 | 
						|
    raise Exception('this is an exception')
 | 
						|
 | 
						|
def sleep_and_print(t, msg):
 | 
						|
    time.sleep(t)
 | 
						|
    print(msg)
 | 
						|
    sys.stdout.flush()
 | 
						|
 | 
						|
 | 
						|
class ExecutorMixin:
 | 
						|
    worker_count = 5
 | 
						|
 | 
						|
    def setUp(self):
 | 
						|
        self.t1 = time.time()
 | 
						|
        try:
 | 
						|
            self.executor = self.executor_type(max_workers=self.worker_count)
 | 
						|
        except NotImplementedError as e:
 | 
						|
            self.skipTest(str(e))
 | 
						|
        self._prime_executor()
 | 
						|
 | 
						|
    def tearDown(self):
 | 
						|
        self.executor.shutdown(wait=True)
 | 
						|
        dt = time.time() - self.t1
 | 
						|
        if test.support.verbose:
 | 
						|
            print("%.2fs" % dt, end=' ')
 | 
						|
        self.assertLess(dt, 60, "synchronization issue: test lasted too long")
 | 
						|
 | 
						|
    def _prime_executor(self):
 | 
						|
        # Make sure that the executor is ready to do work before running the
 | 
						|
        # tests. This should reduce the probability of timeouts in the tests.
 | 
						|
        futures = [self.executor.submit(time.sleep, 0.1)
 | 
						|
                   for _ in range(self.worker_count)]
 | 
						|
 | 
						|
        for f in futures:
 | 
						|
            f.result()
 | 
						|
 | 
						|
 | 
						|
class ThreadPoolMixin(ExecutorMixin):
 | 
						|
    executor_type = futures.ThreadPoolExecutor
 | 
						|
 | 
						|
 | 
						|
class ProcessPoolMixin(ExecutorMixin):
 | 
						|
    executor_type = futures.ProcessPoolExecutor
 | 
						|
 | 
						|
 | 
						|
class ExecutorShutdownTest(unittest.TestCase):
 | 
						|
    def test_run_after_shutdown(self):
 | 
						|
        self.executor.shutdown()
 | 
						|
        self.assertRaises(RuntimeError,
 | 
						|
                          self.executor.submit,
 | 
						|
                          pow, 2, 5)
 | 
						|
 | 
						|
    def test_interpreter_shutdown(self):
 | 
						|
        # Test the atexit hook for shutdown of worker threads and processes
 | 
						|
        rc, out, err = assert_python_ok('-c', """if 1:
 | 
						|
            from concurrent.futures import {executor_type}
 | 
						|
            from time import sleep
 | 
						|
            from test.test_concurrent_futures import sleep_and_print
 | 
						|
            t = {executor_type}(5)
 | 
						|
            t.submit(sleep_and_print, 1.0, "apple")
 | 
						|
            """.format(executor_type=self.executor_type.__name__))
 | 
						|
        # Errors in atexit hooks don't change the process exit code, check
 | 
						|
        # stderr manually.
 | 
						|
        self.assertFalse(err)
 | 
						|
        self.assertEqual(out.strip(), b"apple")
 | 
						|
 | 
						|
    def test_hang_issue12364(self):
 | 
						|
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
 | 
						|
        self.executor.shutdown()
 | 
						|
        for f in fs:
 | 
						|
            f.result()
 | 
						|
 | 
						|
 | 
						|
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest):
 | 
						|
    def _prime_executor(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def test_threads_terminate(self):
 | 
						|
        self.executor.submit(mul, 21, 2)
 | 
						|
        self.executor.submit(mul, 6, 7)
 | 
						|
        self.executor.submit(mul, 3, 14)
 | 
						|
        self.assertEqual(len(self.executor._threads), 3)
 | 
						|
        self.executor.shutdown()
 | 
						|
        for t in self.executor._threads:
 | 
						|
            t.join()
 | 
						|
 | 
						|
    def test_context_manager_shutdown(self):
 | 
						|
        with futures.ThreadPoolExecutor(max_workers=5) as e:
 | 
						|
            executor = e
 | 
						|
            self.assertEqual(list(e.map(abs, range(-5, 5))),
 | 
						|
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
 | 
						|
 | 
						|
        for t in executor._threads:
 | 
						|
            t.join()
 | 
						|
 | 
						|
    def test_del_shutdown(self):
 | 
						|
        executor = futures.ThreadPoolExecutor(max_workers=5)
 | 
						|
        executor.map(abs, range(-5, 5))
 | 
						|
        threads = executor._threads
 | 
						|
        del executor
 | 
						|
 | 
						|
        for t in threads:
 | 
						|
            t.join()
 | 
						|
 | 
						|
 | 
						|
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest):
 | 
						|
    def _prime_executor(self):
 | 
						|
        pass
 | 
						|
 | 
						|
    def test_processes_terminate(self):
 | 
						|
        self.executor.submit(mul, 21, 2)
 | 
						|
        self.executor.submit(mul, 6, 7)
 | 
						|
        self.executor.submit(mul, 3, 14)
 | 
						|
        self.assertEqual(len(self.executor._processes), 5)
 | 
						|
        processes = self.executor._processes
 | 
						|
        self.executor.shutdown()
 | 
						|
 | 
						|
        for p in processes:
 | 
						|
            p.join()
 | 
						|
 | 
						|
    def test_context_manager_shutdown(self):
 | 
						|
        with futures.ProcessPoolExecutor(max_workers=5) as e:
 | 
						|
            processes = e._processes
 | 
						|
            self.assertEqual(list(e.map(abs, range(-5, 5))),
 | 
						|
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
 | 
						|
 | 
						|
        for p in processes:
 | 
						|
            p.join()
 | 
						|
 | 
						|
    def test_del_shutdown(self):
 | 
						|
        executor = futures.ProcessPoolExecutor(max_workers=5)
 | 
						|
        list(executor.map(abs, range(-5, 5)))
 | 
						|
        queue_management_thread = executor._queue_management_thread
 | 
						|
        processes = executor._processes
 | 
						|
        del executor
 | 
						|
 | 
						|
        queue_management_thread.join()
 | 
						|
        for p in processes:
 | 
						|
            p.join()
 | 
						|
 | 
						|
 | 
						|
class WaitTests(unittest.TestCase):
 | 
						|
 | 
						|
    def test_first_completed(self):
 | 
						|
        future1 = self.executor.submit(mul, 21, 2)
 | 
						|
        future2 = self.executor.submit(time.sleep, 1.5)
 | 
						|
 | 
						|
        done, not_done = futures.wait(
 | 
						|
                [CANCELLED_FUTURE, future1, future2],
 | 
						|
                 return_when=futures.FIRST_COMPLETED)
 | 
						|
 | 
						|
        self.assertEqual(set([future1]), done)
 | 
						|
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)
 | 
						|
 | 
						|
    def test_first_completed_some_already_completed(self):
 | 
						|
        future1 = self.executor.submit(time.sleep, 1.5)
 | 
						|
 | 
						|
        finished, pending = futures.wait(
 | 
						|
                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
 | 
						|
                 return_when=futures.FIRST_COMPLETED)
 | 
						|
 | 
						|
        self.assertEqual(
 | 
						|
                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
 | 
						|
                finished)
 | 
						|
        self.assertEqual(set([future1]), pending)
 | 
						|
 | 
						|
    def test_first_exception(self):
 | 
						|
        future1 = self.executor.submit(mul, 2, 21)
 | 
						|
        future2 = self.executor.submit(sleep_and_raise, 1.5)
 | 
						|
        future3 = self.executor.submit(time.sleep, 3)
 | 
						|
 | 
						|
        finished, pending = futures.wait(
 | 
						|
                [future1, future2, future3],
 | 
						|
                return_when=futures.FIRST_EXCEPTION)
 | 
						|
 | 
						|
        self.assertEqual(set([future1, future2]), finished)
 | 
						|
        self.assertEqual(set([future3]), pending)
 | 
						|
 | 
						|
    def test_first_exception_some_already_complete(self):
 | 
						|
        future1 = self.executor.submit(divmod, 21, 0)
 | 
						|
        future2 = self.executor.submit(time.sleep, 1.5)
 | 
						|
 | 
						|
        finished, pending = futures.wait(
 | 
						|
                [SUCCESSFUL_FUTURE,
 | 
						|
                 CANCELLED_FUTURE,
 | 
						|
                 CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                 future1, future2],
 | 
						|
                return_when=futures.FIRST_EXCEPTION)
 | 
						|
 | 
						|
        self.assertEqual(set([SUCCESSFUL_FUTURE,
 | 
						|
                              CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                              future1]), finished)
 | 
						|
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)
 | 
						|
 | 
						|
    def test_first_exception_one_already_failed(self):
 | 
						|
        future1 = self.executor.submit(time.sleep, 2)
 | 
						|
 | 
						|
        finished, pending = futures.wait(
 | 
						|
                 [EXCEPTION_FUTURE, future1],
 | 
						|
                 return_when=futures.FIRST_EXCEPTION)
 | 
						|
 | 
						|
        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
 | 
						|
        self.assertEqual(set([future1]), pending)
 | 
						|
 | 
						|
    def test_all_completed(self):
 | 
						|
        future1 = self.executor.submit(divmod, 2, 0)
 | 
						|
        future2 = self.executor.submit(mul, 2, 21)
 | 
						|
 | 
						|
        finished, pending = futures.wait(
 | 
						|
                [SUCCESSFUL_FUTURE,
 | 
						|
                 CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                 EXCEPTION_FUTURE,
 | 
						|
                 future1,
 | 
						|
                 future2],
 | 
						|
                return_when=futures.ALL_COMPLETED)
 | 
						|
 | 
						|
        self.assertEqual(set([SUCCESSFUL_FUTURE,
 | 
						|
                              CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                              EXCEPTION_FUTURE,
 | 
						|
                              future1,
 | 
						|
                              future2]), finished)
 | 
						|
        self.assertEqual(set(), pending)
 | 
						|
 | 
						|
    def test_timeout(self):
 | 
						|
        future1 = self.executor.submit(mul, 6, 7)
 | 
						|
        future2 = self.executor.submit(time.sleep, 3)
 | 
						|
 | 
						|
        finished, pending = futures.wait(
 | 
						|
                [CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                 EXCEPTION_FUTURE,
 | 
						|
                 SUCCESSFUL_FUTURE,
 | 
						|
                 future1, future2],
 | 
						|
                timeout=1.5,
 | 
						|
                return_when=futures.ALL_COMPLETED)
 | 
						|
 | 
						|
        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                              EXCEPTION_FUTURE,
 | 
						|
                              SUCCESSFUL_FUTURE,
 | 
						|
                              future1]), finished)
 | 
						|
        self.assertEqual(set([future2]), pending)
 | 
						|
 | 
						|
 | 
						|
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests):
 | 
						|
 | 
						|
    def test_pending_calls_race(self):
 | 
						|
        # Issue #14406: multi-threaded race condition when waiting on all
 | 
						|
        # futures.
 | 
						|
        event = threading.Event()
 | 
						|
        def future_func():
 | 
						|
            event.wait()
 | 
						|
        oldswitchinterval = sys.getswitchinterval()
 | 
						|
        sys.setswitchinterval(1e-6)
 | 
						|
        try:
 | 
						|
            fs = {self.executor.submit(future_func) for i in range(100)}
 | 
						|
            event.set()
 | 
						|
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
 | 
						|
        finally:
 | 
						|
            sys.setswitchinterval(oldswitchinterval)
 | 
						|
 | 
						|
 | 
						|
class ProcessPoolWaitTests(ProcessPoolMixin, WaitTests):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class AsCompletedTests(unittest.TestCase):
 | 
						|
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
 | 
						|
    def test_no_timeout(self):
 | 
						|
        future1 = self.executor.submit(mul, 2, 21)
 | 
						|
        future2 = self.executor.submit(mul, 7, 6)
 | 
						|
 | 
						|
        completed = set(futures.as_completed(
 | 
						|
                [CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                 EXCEPTION_FUTURE,
 | 
						|
                 SUCCESSFUL_FUTURE,
 | 
						|
                 future1, future2]))
 | 
						|
        self.assertEqual(set(
 | 
						|
                [CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                 EXCEPTION_FUTURE,
 | 
						|
                 SUCCESSFUL_FUTURE,
 | 
						|
                 future1, future2]),
 | 
						|
                completed)
 | 
						|
 | 
						|
    def test_zero_timeout(self):
 | 
						|
        future1 = self.executor.submit(time.sleep, 2)
 | 
						|
        completed_futures = set()
 | 
						|
        try:
 | 
						|
            for future in futures.as_completed(
 | 
						|
                    [CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                     EXCEPTION_FUTURE,
 | 
						|
                     SUCCESSFUL_FUTURE,
 | 
						|
                     future1],
 | 
						|
                    timeout=0):
 | 
						|
                completed_futures.add(future)
 | 
						|
        except futures.TimeoutError:
 | 
						|
            pass
 | 
						|
 | 
						|
        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
 | 
						|
                              EXCEPTION_FUTURE,
 | 
						|
                              SUCCESSFUL_FUTURE]),
 | 
						|
                         completed_futures)
 | 
						|
 | 
						|
 | 
						|
class ThreadPoolAsCompletedTests(ThreadPoolMixin, AsCompletedTests):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class ProcessPoolAsCompletedTests(ProcessPoolMixin, AsCompletedTests):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class ExecutorTest(unittest.TestCase):
 | 
						|
    # Executor.shutdown() and context manager usage is tested by
 | 
						|
    # ExecutorShutdownTest.
 | 
						|
    def test_submit(self):
 | 
						|
        future = self.executor.submit(pow, 2, 8)
 | 
						|
        self.assertEqual(256, future.result())
 | 
						|
 | 
						|
    def test_submit_keyword(self):
 | 
						|
        future = self.executor.submit(mul, 2, y=8)
 | 
						|
        self.assertEqual(16, future.result())
 | 
						|
 | 
						|
    def test_map(self):
 | 
						|
        self.assertEqual(
 | 
						|
                list(self.executor.map(pow, range(10), range(10))),
 | 
						|
                list(map(pow, range(10), range(10))))
 | 
						|
 | 
						|
    def test_map_exception(self):
 | 
						|
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
 | 
						|
        self.assertEqual(i.__next__(), (0, 1))
 | 
						|
        self.assertEqual(i.__next__(), (0, 1))
 | 
						|
        self.assertRaises(ZeroDivisionError, i.__next__)
 | 
						|
 | 
						|
    def test_map_timeout(self):
 | 
						|
        results = []
 | 
						|
        try:
 | 
						|
            for i in self.executor.map(time.sleep,
 | 
						|
                                       [0, 0, 3],
 | 
						|
                                       timeout=1.5):
 | 
						|
                results.append(i)
 | 
						|
        except futures.TimeoutError:
 | 
						|
            pass
 | 
						|
        else:
 | 
						|
            self.fail('expected TimeoutError')
 | 
						|
 | 
						|
        self.assertEqual([None, None], results)
 | 
						|
 | 
						|
 | 
						|
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class ProcessPoolExecutorTest(ProcessPoolMixin, ExecutorTest):
 | 
						|
    pass
 | 
						|
 | 
						|
 | 
						|
class FutureTests(unittest.TestCase):
 | 
						|
    def test_done_callback_with_result(self):
 | 
						|
        callback_result = None
 | 
						|
        def fn(callback_future):
 | 
						|
            nonlocal callback_result
 | 
						|
            callback_result = callback_future.result()
 | 
						|
 | 
						|
        f = Future()
 | 
						|
        f.add_done_callback(fn)
 | 
						|
        f.set_result(5)
 | 
						|
        self.assertEqual(5, callback_result)
 | 
						|
 | 
						|
    def test_done_callback_with_exception(self):
 | 
						|
        callback_exception = None
 | 
						|
        def fn(callback_future):
 | 
						|
            nonlocal callback_exception
 | 
						|
            callback_exception = callback_future.exception()
 | 
						|
 | 
						|
        f = Future()
 | 
						|
        f.add_done_callback(fn)
 | 
						|
        f.set_exception(Exception('test'))
 | 
						|
        self.assertEqual(('test',), callback_exception.args)
 | 
						|
 | 
						|
    def test_done_callback_with_cancel(self):
 | 
						|
        was_cancelled = None
 | 
						|
        def fn(callback_future):
 | 
						|
            nonlocal was_cancelled
 | 
						|
            was_cancelled = callback_future.cancelled()
 | 
						|
 | 
						|
        f = Future()
 | 
						|
        f.add_done_callback(fn)
 | 
						|
        self.assertTrue(f.cancel())
 | 
						|
        self.assertTrue(was_cancelled)
 | 
						|
 | 
						|
    def test_done_callback_raises(self):
 | 
						|
        with test.support.captured_stderr() as stderr:
 | 
						|
            raising_was_called = False
 | 
						|
            fn_was_called = False
 | 
						|
 | 
						|
            def raising_fn(callback_future):
 | 
						|
                nonlocal raising_was_called
 | 
						|
                raising_was_called = True
 | 
						|
                raise Exception('doh!')
 | 
						|
 | 
						|
            def fn(callback_future):
 | 
						|
                nonlocal fn_was_called
 | 
						|
                fn_was_called = True
 | 
						|
 | 
						|
            f = Future()
 | 
						|
            f.add_done_callback(raising_fn)
 | 
						|
            f.add_done_callback(fn)
 | 
						|
            f.set_result(5)
 | 
						|
            self.assertTrue(raising_was_called)
 | 
						|
            self.assertTrue(fn_was_called)
 | 
						|
            self.assertIn('Exception: doh!', stderr.getvalue())
 | 
						|
 | 
						|
    def test_done_callback_already_successful(self):
 | 
						|
        callback_result = None
 | 
						|
        def fn(callback_future):
 | 
						|
            nonlocal callback_result
 | 
						|
            callback_result = callback_future.result()
 | 
						|
 | 
						|
        f = Future()
 | 
						|
        f.set_result(5)
 | 
						|
        f.add_done_callback(fn)
 | 
						|
        self.assertEqual(5, callback_result)
 | 
						|
 | 
						|
    def test_done_callback_already_failed(self):
 | 
						|
        callback_exception = None
 | 
						|
        def fn(callback_future):
 | 
						|
            nonlocal callback_exception
 | 
						|
            callback_exception = callback_future.exception()
 | 
						|
 | 
						|
        f = Future()
 | 
						|
        f.set_exception(Exception('test'))
 | 
						|
        f.add_done_callback(fn)
 | 
						|
        self.assertEqual(('test',), callback_exception.args)
 | 
						|
 | 
						|
    def test_done_callback_already_cancelled(self):
 | 
						|
        was_cancelled = None
 | 
						|
        def fn(callback_future):
 | 
						|
            nonlocal was_cancelled
 | 
						|
            was_cancelled = callback_future.cancelled()
 | 
						|
 | 
						|
        f = Future()
 | 
						|
        self.assertTrue(f.cancel())
 | 
						|
        f.add_done_callback(fn)
 | 
						|
        self.assertTrue(was_cancelled)
 | 
						|
 | 
						|
    def test_repr(self):
 | 
						|
        self.assertRegex(repr(PENDING_FUTURE),
 | 
						|
                         '<Future at 0x[0-9a-f]+ state=pending>')
 | 
						|
        self.assertRegex(repr(RUNNING_FUTURE),
 | 
						|
                         '<Future at 0x[0-9a-f]+ state=running>')
 | 
						|
        self.assertRegex(repr(CANCELLED_FUTURE),
 | 
						|
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
 | 
						|
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
 | 
						|
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
 | 
						|
        self.assertRegex(
 | 
						|
                repr(EXCEPTION_FUTURE),
 | 
						|
                '<Future at 0x[0-9a-f]+ state=finished raised IOError>')
 | 
						|
        self.assertRegex(
 | 
						|
                repr(SUCCESSFUL_FUTURE),
 | 
						|
                '<Future at 0x[0-9a-f]+ state=finished returned int>')
 | 
						|
 | 
						|
 | 
						|
    def test_cancel(self):
 | 
						|
        f1 = create_future(state=PENDING)
 | 
						|
        f2 = create_future(state=RUNNING)
 | 
						|
        f3 = create_future(state=CANCELLED)
 | 
						|
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
 | 
						|
        f5 = create_future(state=FINISHED, exception=IOError())
 | 
						|
        f6 = create_future(state=FINISHED, result=5)
 | 
						|
 | 
						|
        self.assertTrue(f1.cancel())
 | 
						|
        self.assertEqual(f1._state, CANCELLED)
 | 
						|
 | 
						|
        self.assertFalse(f2.cancel())
 | 
						|
        self.assertEqual(f2._state, RUNNING)
 | 
						|
 | 
						|
        self.assertTrue(f3.cancel())
 | 
						|
        self.assertEqual(f3._state, CANCELLED)
 | 
						|
 | 
						|
        self.assertTrue(f4.cancel())
 | 
						|
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)
 | 
						|
 | 
						|
        self.assertFalse(f5.cancel())
 | 
						|
        self.assertEqual(f5._state, FINISHED)
 | 
						|
 | 
						|
        self.assertFalse(f6.cancel())
 | 
						|
        self.assertEqual(f6._state, FINISHED)
 | 
						|
 | 
						|
    def test_cancelled(self):
 | 
						|
        self.assertFalse(PENDING_FUTURE.cancelled())
 | 
						|
        self.assertFalse(RUNNING_FUTURE.cancelled())
 | 
						|
        self.assertTrue(CANCELLED_FUTURE.cancelled())
 | 
						|
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
 | 
						|
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
 | 
						|
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
 | 
						|
 | 
						|
    def test_done(self):
 | 
						|
        self.assertFalse(PENDING_FUTURE.done())
 | 
						|
        self.assertFalse(RUNNING_FUTURE.done())
 | 
						|
        self.assertTrue(CANCELLED_FUTURE.done())
 | 
						|
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
 | 
						|
        self.assertTrue(EXCEPTION_FUTURE.done())
 | 
						|
        self.assertTrue(SUCCESSFUL_FUTURE.done())
 | 
						|
 | 
						|
    def test_running(self):
 | 
						|
        self.assertFalse(PENDING_FUTURE.running())
 | 
						|
        self.assertTrue(RUNNING_FUTURE.running())
 | 
						|
        self.assertFalse(CANCELLED_FUTURE.running())
 | 
						|
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
 | 
						|
        self.assertFalse(EXCEPTION_FUTURE.running())
 | 
						|
        self.assertFalse(SUCCESSFUL_FUTURE.running())
 | 
						|
 | 
						|
    def test_result_with_timeout(self):
 | 
						|
        self.assertRaises(futures.TimeoutError,
 | 
						|
                          PENDING_FUTURE.result, timeout=0)
 | 
						|
        self.assertRaises(futures.TimeoutError,
 | 
						|
                          RUNNING_FUTURE.result, timeout=0)
 | 
						|
        self.assertRaises(futures.CancelledError,
 | 
						|
                          CANCELLED_FUTURE.result, timeout=0)
 | 
						|
        self.assertRaises(futures.CancelledError,
 | 
						|
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
 | 
						|
        self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
 | 
						|
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
 | 
						|
 | 
						|
    def test_result_with_success(self):
 | 
						|
        # TODO(brian@sweetapp.com): This test is timing dependant.
 | 
						|
        def notification():
 | 
						|
            # Wait until the main thread is waiting for the result.
 | 
						|
            time.sleep(1)
 | 
						|
            f1.set_result(42)
 | 
						|
 | 
						|
        f1 = create_future(state=PENDING)
 | 
						|
        t = threading.Thread(target=notification)
 | 
						|
        t.start()
 | 
						|
 | 
						|
        self.assertEqual(f1.result(timeout=5), 42)
 | 
						|
 | 
						|
    def test_result_with_cancel(self):
 | 
						|
        # TODO(brian@sweetapp.com): This test is timing dependant.
 | 
						|
        def notification():
 | 
						|
            # Wait until the main thread is waiting for the result.
 | 
						|
            time.sleep(1)
 | 
						|
            f1.cancel()
 | 
						|
 | 
						|
        f1 = create_future(state=PENDING)
 | 
						|
        t = threading.Thread(target=notification)
 | 
						|
        t.start()
 | 
						|
 | 
						|
        self.assertRaises(futures.CancelledError, f1.result, timeout=5)
 | 
						|
 | 
						|
    def test_exception_with_timeout(self):
 | 
						|
        self.assertRaises(futures.TimeoutError,
 | 
						|
                          PENDING_FUTURE.exception, timeout=0)
 | 
						|
        self.assertRaises(futures.TimeoutError,
 | 
						|
                          RUNNING_FUTURE.exception, timeout=0)
 | 
						|
        self.assertRaises(futures.CancelledError,
 | 
						|
                          CANCELLED_FUTURE.exception, timeout=0)
 | 
						|
        self.assertRaises(futures.CancelledError,
 | 
						|
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
 | 
						|
        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
 | 
						|
                                   IOError))
 | 
						|
        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
 | 
						|
 | 
						|
    def test_exception_with_success(self):
 | 
						|
        def notification():
 | 
						|
            # Wait until the main thread is waiting for the exception.
 | 
						|
            time.sleep(1)
 | 
						|
            with f1._condition:
 | 
						|
                f1._state = FINISHED
 | 
						|
                f1._exception = IOError()
 | 
						|
                f1._condition.notify_all()
 | 
						|
 | 
						|
        f1 = create_future(state=PENDING)
 | 
						|
        t = threading.Thread(target=notification)
 | 
						|
        t.start()
 | 
						|
 | 
						|
        self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
 | 
						|
 | 
						|
@test.support.reap_threads
 | 
						|
def test_main():
 | 
						|
    try:
 | 
						|
        test.support.run_unittest(ProcessPoolExecutorTest,
 | 
						|
                                  ThreadPoolExecutorTest,
 | 
						|
                                  ProcessPoolWaitTests,
 | 
						|
                                  ThreadPoolWaitTests,
 | 
						|
                                  ProcessPoolAsCompletedTests,
 | 
						|
                                  ThreadPoolAsCompletedTests,
 | 
						|
                                  FutureTests,
 | 
						|
                                  ProcessPoolShutdownTest,
 | 
						|
                                  ThreadPoolShutdownTest)
 | 
						|
    finally:
 | 
						|
        test.support.reap_children()
 | 
						|
 | 
						|
if __name__ == "__main__":
 | 
						|
    test_main()
 |