mirror of
				https://github.com/python/cpython.git
				synced 2025-10-26 16:27:06 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			140 lines
		
	
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			140 lines
		
	
	
	
		
			4.8 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| # Copyright 2009 Brian Quinlan. All Rights Reserved.
 | |
| # Licensed to PSF under a Contributor Agreement.
 | |
| 
 | |
| """Implements ThreadPoolExecutor."""
 | |
| 
 | |
| __author__ = 'Brian Quinlan (brian@sweetapp.com)'
 | |
| 
 | |
| import atexit
 | |
| from concurrent.futures import _base
 | |
| import queue
 | |
| import threading
 | |
| import weakref
 | |
| import os
 | |
| 
 | |
| # Workers are created as daemon threads. This is done to allow the interpreter
 | |
| # to exit when there are still idle threads in a ThreadPoolExecutor's thread
 | |
| # pool (i.e. shutdown() was not called). However, allowing workers to die with
 | |
| # the interpreter has two undesirable properties:
 | |
| #   - The workers would still be running during interpretor shutdown,
 | |
| #     meaning that they would fail in unpredictable ways.
 | |
| #   - The workers could be killed while evaluating a work item, which could
 | |
| #     be bad if the callable being evaluated has external side-effects e.g.
 | |
| #     writing to a file.
 | |
| #
 | |
| # To work around this problem, an exit handler is installed which tells the
 | |
| # workers to exit when their work queues are empty and then waits until the
 | |
| # threads finish.
 | |
| 
 | |
| _threads_queues = weakref.WeakKeyDictionary()
 | |
| _shutdown = False
 | |
| 
 | |
| def _python_exit():
 | |
|     global _shutdown
 | |
|     _shutdown = True
 | |
|     items = list(_threads_queues.items())
 | |
|     for t, q in items:
 | |
|         q.put(None)
 | |
|     for t, q in items:
 | |
|         t.join()
 | |
| 
 | |
| atexit.register(_python_exit)
 | |
| 
 | |
| class _WorkItem(object):
 | |
|     def __init__(self, future, fn, args, kwargs):
 | |
|         self.future = future
 | |
|         self.fn = fn
 | |
|         self.args = args
 | |
|         self.kwargs = kwargs
 | |
| 
 | |
|     def run(self):
 | |
|         if not self.future.set_running_or_notify_cancel():
 | |
|             return
 | |
| 
 | |
|         try:
 | |
|             result = self.fn(*self.args, **self.kwargs)
 | |
|         except BaseException as e:
 | |
|             self.future.set_exception(e)
 | |
|         else:
 | |
|             self.future.set_result(result)
 | |
| 
 | |
| def _worker(executor_reference, work_queue):
 | |
|     try:
 | |
|         while True:
 | |
|             work_item = work_queue.get(block=True)
 | |
|             if work_item is not None:
 | |
|                 work_item.run()
 | |
|                 # Delete references to object. See issue16284
 | |
|                 del work_item
 | |
|                 continue
 | |
|             executor = executor_reference()
 | |
|             # Exit if:
 | |
|             #   - The interpreter is shutting down OR
 | |
|             #   - The executor that owns the worker has been collected OR
 | |
|             #   - The executor that owns the worker has been shutdown.
 | |
|             if _shutdown or executor is None or executor._shutdown:
 | |
|                 # Notice other workers
 | |
|                 work_queue.put(None)
 | |
|                 return
 | |
|             del executor
 | |
|     except BaseException:
 | |
|         _base.LOGGER.critical('Exception in worker', exc_info=True)
 | |
| 
 | |
| class ThreadPoolExecutor(_base.Executor):
 | |
|     def __init__(self, max_workers=None):
 | |
|         """Initializes a new ThreadPoolExecutor instance.
 | |
| 
 | |
|         Args:
 | |
|             max_workers: The maximum number of threads that can be used to
 | |
|                 execute the given calls.
 | |
|         """
 | |
|         if max_workers is None:
 | |
|             # Use this number because ThreadPoolExecutor is often
 | |
|             # used to overlap I/O instead of CPU work.
 | |
|             max_workers = (os.cpu_count() or 1) * 5
 | |
|         if max_workers <= 0:
 | |
|             raise ValueError("max_workers must be greater than 0")
 | |
| 
 | |
|         self._max_workers = max_workers
 | |
|         self._work_queue = queue.Queue()
 | |
|         self._threads = set()
 | |
|         self._shutdown = False
 | |
|         self._shutdown_lock = threading.Lock()
 | |
| 
 | |
|     def submit(self, fn, *args, **kwargs):
 | |
|         with self._shutdown_lock:
 | |
|             if self._shutdown:
 | |
|                 raise RuntimeError('cannot schedule new futures after shutdown')
 | |
| 
 | |
|             f = _base.Future()
 | |
|             w = _WorkItem(f, fn, args, kwargs)
 | |
| 
 | |
|             self._work_queue.put(w)
 | |
|             self._adjust_thread_count()
 | |
|             return f
 | |
|     submit.__doc__ = _base.Executor.submit.__doc__
 | |
| 
 | |
|     def _adjust_thread_count(self):
 | |
|         # When the executor gets lost, the weakref callback will wake up
 | |
|         # the worker threads.
 | |
|         def weakref_cb(_, q=self._work_queue):
 | |
|             q.put(None)
 | |
|         # TODO(bquinlan): Should avoid creating new threads if there are more
 | |
|         # idle threads than items in the work queue.
 | |
|         if len(self._threads) < self._max_workers:
 | |
|             t = threading.Thread(target=_worker,
 | |
|                                  args=(weakref.ref(self, weakref_cb),
 | |
|                                        self._work_queue))
 | |
|             t.daemon = True
 | |
|             t.start()
 | |
|             self._threads.add(t)
 | |
|             _threads_queues[t] = self._work_queue
 | |
| 
 | |
|     def shutdown(self, wait=True):
 | |
|         with self._shutdown_lock:
 | |
|             self._shutdown = True
 | |
|             self._work_queue.put(None)
 | |
|         if wait:
 | |
|             for t in self._threads:
 | |
|                 t.join()
 | |
|     shutdown.__doc__ = _base.Executor.shutdown.__doc__
 | 
