Issue #27664: Add to concurrent.futures.thread.ThreadPoolExecutor()

the ability to specify a thread name prefix.
This commit is contained in:
Gregory P. Smith 2016-08-07 10:19:20 -07:00
parent d0d24fd1ae
commit 50abe877ee
4 changed files with 40 additions and 4 deletions

View file

@ -124,7 +124,7 @@ And::
executor.submit(wait_on_future) executor.submit(wait_on_future)
.. class:: ThreadPoolExecutor(max_workers=None) .. class:: ThreadPoolExecutor(max_workers=None, thread_name_prefix='')
An :class:`Executor` subclass that uses a pool of at most *max_workers* An :class:`Executor` subclass that uses a pool of at most *max_workers*
threads to execute calls asynchronously. threads to execute calls asynchronously.
@ -137,6 +137,10 @@ And::
should be higher than the number of workers should be higher than the number of workers
for :class:`ProcessPoolExecutor`. for :class:`ProcessPoolExecutor`.
.. versionadded:: 3.6
The *thread_name_prefix* argument was added to allow users to
control the threading.Thread names for worker threads created by
the pool for easier debugging.
.. _threadpoolexecutor-example: .. _threadpoolexecutor-example:

View file

@ -81,12 +81,13 @@ def _worker(executor_reference, work_queue):
_base.LOGGER.critical('Exception in worker', exc_info=True) _base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor): class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers=None): def __init__(self, max_workers=None, thread_name_prefix=''):
"""Initializes a new ThreadPoolExecutor instance. """Initializes a new ThreadPoolExecutor instance.
Args: Args:
max_workers: The maximum number of threads that can be used to max_workers: The maximum number of threads that can be used to
execute the given calls. execute the given calls.
thread_name_prefix: An optional name prefix to give our threads.
""" """
if max_workers is None: if max_workers is None:
# Use this number because ThreadPoolExecutor is often # Use this number because ThreadPoolExecutor is often
@ -100,6 +101,7 @@ class ThreadPoolExecutor(_base.Executor):
self._threads = set() self._threads = set()
self._shutdown = False self._shutdown = False
self._shutdown_lock = threading.Lock() self._shutdown_lock = threading.Lock()
self._thread_name_prefix = thread_name_prefix
def submit(self, fn, *args, **kwargs): def submit(self, fn, *args, **kwargs):
with self._shutdown_lock: with self._shutdown_lock:
@ -121,8 +123,11 @@ class ThreadPoolExecutor(_base.Executor):
q.put(None) q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more # TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue. # idle threads than items in the work queue.
if len(self._threads) < self._max_workers: num_threads = len(self._threads)
t = threading.Thread(target=_worker, if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb), args=(weakref.ref(self, weakref_cb),
self._work_queue)) self._work_queue))
t.daemon = True t.daemon = True

View file

@ -154,6 +154,30 @@ class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, unittest.Tes
for t in threads: for t in threads:
t.join() t.join()
def test_thread_names_assigned(self):
executor = futures.ThreadPoolExecutor(
max_workers=5, thread_name_prefix='SpecialPool')
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
for t in threads:
self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
t.join()
def test_thread_names_default(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
for t in threads:
# We don't particularly care what the default name is, just that
# it has a default name implying that it is a ThreadPoolExecutor
# followed by what looks like a thread number.
self.assertRegex(t.name, r'^.*ThreadPoolExecutor.*_[0-4]$')
t.join()
class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase): class ProcessPoolShutdownTest(ProcessPoolMixin, ExecutorShutdownTest, unittest.TestCase):
def _prime_executor(self): def _prime_executor(self):

View file

@ -43,6 +43,9 @@ Core and Builtins
Library Library
------- -------
- Issue #27664: Add to concurrent.futures.thread.ThreadPoolExecutor()
the ability to specify a thread name prefix.
- Issue #26750: unittest.mock.create_autospec() now works properly for - Issue #26750: unittest.mock.create_autospec() now works properly for
subclasses of property() and other data descriptors. Removes the never subclasses of property() and other data descriptors. Removes the never
publicly used, never documented unittest.mock.DescriptorTypes tuple. publicly used, never documented unittest.mock.DescriptorTypes tuple.