mirror of
				https://github.com/python/cpython.git
				synced 2025-11-03 19:34:08 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			130 lines
		
	
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			130 lines
		
	
	
	
		
			4.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright 2009 Brian Quinlan. All Rights Reserved.
 | 
						|
# Licensed to PSF under a Contributor Agreement.
 | 
						|
 | 
						|
"""Implements ThreadPoolExecutor."""
 | 
						|
 | 
						|
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
 | 
						|
 | 
						|
import atexit
 | 
						|
from concurrent.futures import _base
 | 
						|
import queue
 | 
						|
import threading
 | 
						|
import weakref
 | 
						|
 | 
						|
# Workers are created as daemon threads. This is done to allow the interpreter
 | 
						|
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
 | 
						|
# pool (i.e. shutdown() was not called). However, allowing workers to die with
 | 
						|
# the interpreter has two undesirable properties:
 | 
						|
#   - The workers would still be running during interpretor shutdown,
 | 
						|
#     meaning that they would fail in unpredictable ways.
 | 
						|
#   - The workers could be killed while evaluating a work item, which could
 | 
						|
#     be bad if the callable being evaluated has external side-effects e.g.
 | 
						|
#     writing to a file.
 | 
						|
#
 | 
						|
# To work around this problem, an exit handler is installed which tells the
 | 
						|
# workers to exit when their work queues are empty and then waits until the
 | 
						|
# threads finish.
 | 
						|
 | 
						|
_threads_queues = weakref.WeakKeyDictionary()
 | 
						|
_shutdown = False
 | 
						|
 | 
						|
def _python_exit():
 | 
						|
    global _shutdown
 | 
						|
    _shutdown = True
 | 
						|
    items = list(_threads_queues.items())
 | 
						|
    for t, q in items:
 | 
						|
        q.put(None)
 | 
						|
    for t, q in items:
 | 
						|
        t.join()
 | 
						|
 | 
						|
atexit.register(_python_exit)
 | 
						|
 | 
						|
class _WorkItem(object):
 | 
						|
    def __init__(self, future, fn, args, kwargs):
 | 
						|
        self.future = future
 | 
						|
        self.fn = fn
 | 
						|
        self.args = args
 | 
						|
        self.kwargs = kwargs
 | 
						|
 | 
						|
    def run(self):
 | 
						|
        if not self.future.set_running_or_notify_cancel():
 | 
						|
            return
 | 
						|
 | 
						|
        try:
 | 
						|
            result = self.fn(*self.args, **self.kwargs)
 | 
						|
        except BaseException as e:
 | 
						|
            self.future.set_exception(e)
 | 
						|
        else:
 | 
						|
            self.future.set_result(result)
 | 
						|
 | 
						|
def _worker(executor_reference, work_queue):
 | 
						|
    try:
 | 
						|
        while True:
 | 
						|
            work_item = work_queue.get(block=True)
 | 
						|
            if work_item is not None:
 | 
						|
                work_item.run()
 | 
						|
                continue
 | 
						|
            executor = executor_reference()
 | 
						|
            # Exit if:
 | 
						|
            #   - The interpreter is shutting down OR
 | 
						|
            #   - The executor that owns the worker has been collected OR
 | 
						|
            #   - The executor that owns the worker has been shutdown.
 | 
						|
            if _shutdown or executor is None or executor._shutdown:
 | 
						|
                # Notice other workers
 | 
						|
                work_queue.put(None)
 | 
						|
                return
 | 
						|
            del executor
 | 
						|
    except BaseException:
 | 
						|
        _base.LOGGER.critical('Exception in worker', exc_info=True)
 | 
						|
 | 
						|
class ThreadPoolExecutor(_base.Executor):
 | 
						|
    def __init__(self, max_workers):
 | 
						|
        """Initializes a new ThreadPoolExecutor instance.
 | 
						|
 | 
						|
        Args:
 | 
						|
            max_workers: The maximum number of threads that can be used to
 | 
						|
                execute the given calls.
 | 
						|
        """
 | 
						|
        self._max_workers = max_workers
 | 
						|
        self._work_queue = queue.Queue()
 | 
						|
        self._threads = set()
 | 
						|
        self._shutdown = False
 | 
						|
        self._shutdown_lock = threading.Lock()
 | 
						|
 | 
						|
    def submit(self, fn, *args, **kwargs):
 | 
						|
        with self._shutdown_lock:
 | 
						|
            if self._shutdown:
 | 
						|
                raise RuntimeError('cannot schedule new futures after shutdown')
 | 
						|
 | 
						|
            f = _base.Future()
 | 
						|
            w = _WorkItem(f, fn, args, kwargs)
 | 
						|
 | 
						|
            self._work_queue.put(w)
 | 
						|
            self._adjust_thread_count()
 | 
						|
            return f
 | 
						|
    submit.__doc__ = _base.Executor.submit.__doc__
 | 
						|
 | 
						|
    def _adjust_thread_count(self):
 | 
						|
        # When the executor gets lost, the weakref callback will wake up
 | 
						|
        # the worker threads.
 | 
						|
        def weakref_cb(_, q=self._work_queue):
 | 
						|
            q.put(None)
 | 
						|
        # TODO(bquinlan): Should avoid creating new threads if there are more
 | 
						|
        # idle threads than items in the work queue.
 | 
						|
        if len(self._threads) < self._max_workers:
 | 
						|
            t = threading.Thread(target=_worker,
 | 
						|
                                 args=(weakref.ref(self, weakref_cb),
 | 
						|
                                       self._work_queue))
 | 
						|
            t.daemon = True
 | 
						|
            t.start()
 | 
						|
            self._threads.add(t)
 | 
						|
            _threads_queues[t] = self._work_queue
 | 
						|
 | 
						|
    def shutdown(self, wait=True):
 | 
						|
        with self._shutdown_lock:
 | 
						|
            self._shutdown = True
 | 
						|
            self._work_queue.put(None)
 | 
						|
        if wait:
 | 
						|
            for t in self._threads:
 | 
						|
                t.join()
 | 
						|
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
 |