Use WeakSets rather than manual pruning to prevent unbounded growth of dead thread references.

This commit is contained in:
Brian Quinlan 2011-03-20 13:11:11 +11:00
parent 833d91204f
commit 142fad4b6b
2 changed files with 8 additions and 40 deletions

View file

@ -66,29 +66,15 @@ import weakref
# workers to exit when their work queues are empty and then waits until the # workers to exit when their work queues are empty and then waits until the
# threads/processes finish. # threads/processes finish.
_thread_references = set() _live_threads = weakref.WeakSet()
_shutdown = False _shutdown = False
def _python_exit(): def _python_exit():
global _shutdown global _shutdown
_shutdown = True _shutdown = True
for thread_reference in _thread_references: for thread in _live_threads:
thread = thread_reference()
if thread is not None:
thread.join() thread.join()
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
>>> ... t = ThreadPoolExecutor(max_workers=5)
>>> ... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
# Controls how many more calls than processes will be queued in the call queue. # Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for # A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently # work while a larger number will make Future.cancel() succeed less frequently
@ -279,7 +265,6 @@ class ProcessPoolExecutor(_base.Executor):
worker processes will be created as the machine has processors. worker processes will be created as the machine has processors.
""" """
_check_system_limits() _check_system_limits()
_remove_dead_thread_references()
if max_workers is None: if max_workers is None:
self._max_workers = multiprocessing.cpu_count() self._max_workers = multiprocessing.cpu_count()
@ -316,7 +301,7 @@ class ProcessPoolExecutor(_base.Executor):
self._shutdown_process_event)) self._shutdown_process_event))
self._queue_management_thread.daemon = True self._queue_management_thread.daemon = True
self._queue_management_thread.start() self._queue_management_thread.start()
_thread_references.add(weakref.ref(self._queue_management_thread)) _live_threads.add(self._queue_management_thread)
def _adjust_process_count(self): def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers): for _ in range(len(self._processes), self._max_workers):

View file

@ -25,29 +25,14 @@ import weakref
# workers to exit when their work queues are empty and then waits until the # workers to exit when their work queues are empty and then waits until the
# threads finish. # threads finish.
_thread_references = set() _live_threads = weakref.WeakSet()
_shutdown = False _shutdown = False
def _python_exit(): def _python_exit():
global _shutdown global _shutdown
_shutdown = True _shutdown = True
for thread_reference in _thread_references: for thread in _live_threads:
thread = thread_reference()
if thread is not None:
thread.join() thread.join()
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
... t = ThreadPoolExecutor(max_workers=5)
... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
atexit.register(_python_exit) atexit.register(_python_exit)
class _WorkItem(object): class _WorkItem(object):
@ -95,8 +80,6 @@ class ThreadPoolExecutor(_base.Executor):
max_workers: The maximum number of threads that can be used to max_workers: The maximum number of threads that can be used to
execute the given calls. execute the given calls.
""" """
_remove_dead_thread_references()
self._max_workers = max_workers self._max_workers = max_workers
self._work_queue = queue.Queue() self._work_queue = queue.Queue()
self._threads = set() self._threads = set()
@ -125,7 +108,7 @@ class ThreadPoolExecutor(_base.Executor):
t.daemon = True t.daemon = True
t.start() t.start()
self._threads.add(t) self._threads.add(t)
_thread_references.add(weakref.ref(t)) _live_threads.add(t)
def shutdown(self, wait=True): def shutdown(self, wait=True):
with self._shutdown_lock: with self._shutdown_lock: