Issue #11815: Remove dead code in concurrent.futures (since a blocking Queue

cannot raise queue.Empty).
This commit is contained in:
Antoine Pitrou 2011-04-12 17:48:46 +02:00
parent 35af9536ad
commit 27be5da831
2 changed files with 28 additions and 51 deletions

View file

@ -104,7 +104,7 @@ class _CallItem(object):
self.args = args self.args = args
self.kwargs = kwargs self.kwargs = kwargs
def _process_worker(call_queue, result_queue, shutdown): def _process_worker(call_queue, result_queue):
"""Evaluates calls from call_queue and places the results in result_queue. """Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a separate process. This worker is run in a separate process.
@ -118,24 +118,19 @@ def _process_worker(call_queue, result_queue, shutdown):
worker that it should exit when call_queue is empty. worker that it should exit when call_queue is empty.
""" """
while True: while True:
call_item = call_queue.get(block=True)
if call_item is None:
# Wake up queue management thread
result_queue.put(None)
return
try: try:
call_item = call_queue.get(block=True) r = call_item.fn(*call_item.args, **call_item.kwargs)
except queue.Empty: except BaseException as e:
if shutdown.is_set(): result_queue.put(_ResultItem(call_item.work_id,
return exception=e))
else: else:
if call_item is None: result_queue.put(_ResultItem(call_item.work_id,
# Wake up queue management thread result=r))
result_queue.put(None)
return
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
else:
result_queue.put(_ResultItem(call_item.work_id,
result=r))
def _add_call_item_to_queue(pending_work_items, def _add_call_item_to_queue(pending_work_items,
work_ids, work_ids,
@ -179,8 +174,7 @@ def _queue_manangement_worker(executor_reference,
pending_work_items, pending_work_items,
work_ids_queue, work_ids_queue,
call_queue, call_queue,
result_queue, result_queue):
shutdown_process_event):
"""Manages the communication between this process and the worker processes. """Manages the communication between this process and the worker processes.
This function is run in a local thread. This function is run in a local thread.
@ -198,9 +192,6 @@ def _queue_manangement_worker(executor_reference,
derived from _WorkItems for processing by the process workers. derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers. process workers.
shutdown_process_event: A multiprocessing.Event used to signal the
process workers that they should exit when their work queue is
empty.
""" """
nb_shutdown_processes = 0 nb_shutdown_processes = 0
def shutdown_one_process(): def shutdown_one_process():
@ -213,20 +204,16 @@ def _queue_manangement_worker(executor_reference,
work_ids_queue, work_ids_queue,
call_queue) call_queue)
try: result_item = result_queue.get(block=True)
result_item = result_queue.get(block=True) if result_item is not None:
except queue.Empty: work_item = pending_work_items[result_item.work_id]
pass del pending_work_items[result_item.work_id]
else:
if result_item is not None:
work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
if result_item.exception: if result_item.exception:
work_item.future.set_exception(result_item.exception) work_item.future.set_exception(result_item.exception)
else: else:
work_item.future.set_result(result_item.result) work_item.future.set_result(result_item.result)
continue continue
# If we come here, we either got a timeout or were explicitly woken up. # If we come here, we either got a timeout or were explicitly woken up.
# In either case, check whether we should start shutting down. # In either case, check whether we should start shutting down.
executor = executor_reference() executor = executor_reference()
@ -238,8 +225,6 @@ def _queue_manangement_worker(executor_reference,
# Since no new work items can be added, it is safe to shutdown # Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items. # this thread if there are no pending work items.
if not pending_work_items: if not pending_work_items:
shutdown_process_event.set()
while nb_shutdown_processes < len(processes): while nb_shutdown_processes < len(processes):
shutdown_one_process() shutdown_one_process()
# If .join() is not called on the created processes then # If .join() is not called on the created processes then
@ -306,7 +291,6 @@ class ProcessPoolExecutor(_base.Executor):
# Shutdown is a two-step process. # Shutdown is a two-step process.
self._shutdown_thread = False self._shutdown_thread = False
self._shutdown_process_event = multiprocessing.Event()
self._shutdown_lock = threading.Lock() self._shutdown_lock = threading.Lock()
self._queue_count = 0 self._queue_count = 0
self._pending_work_items = {} self._pending_work_items = {}
@ -324,8 +308,7 @@ class ProcessPoolExecutor(_base.Executor):
self._pending_work_items, self._pending_work_items,
self._work_ids, self._work_ids,
self._call_queue, self._call_queue,
self._result_queue, self._result_queue))
self._shutdown_process_event))
self._queue_management_thread.daemon = True self._queue_management_thread.daemon = True
self._queue_management_thread.start() self._queue_management_thread.start()
_threads_queues[self._queue_management_thread] = self._result_queue _threads_queues[self._queue_management_thread] = self._result_queue
@ -335,8 +318,7 @@ class ProcessPoolExecutor(_base.Executor):
p = multiprocessing.Process( p = multiprocessing.Process(
target=_process_worker, target=_process_worker,
args=(self._call_queue, args=(self._call_queue,
self._result_queue, self._result_queue))
self._shutdown_process_event))
p.start() p.start()
self._processes.add(p) self._processes.add(p)
@ -372,7 +354,6 @@ class ProcessPoolExecutor(_base.Executor):
self._queue_management_thread = None self._queue_management_thread = None
self._call_queue = None self._call_queue = None
self._result_queue = None self._result_queue = None
self._shutdown_process_event = None
self._processes = None self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__ shutdown.__doc__ = _base.Executor.shutdown.__doc__

View file

@ -60,14 +60,10 @@ class _WorkItem(object):
def _worker(executor_reference, work_queue): def _worker(executor_reference, work_queue):
try: try:
while True: while True:
try: work_item = work_queue.get(block=True)
work_item = work_queue.get(block=True) if work_item is not None:
except queue.Empty: work_item.run()
pass continue
else:
if work_item is not None:
work_item.run()
continue
executor = executor_reference() executor = executor_reference()
# Exit if: # Exit if:
# - The interpreter is shutting down OR # - The interpreter is shutting down OR