merge heads

This commit is contained in:
Benjamin Peterson 2011-07-15 21:10:44 -05:00
commit 7dc35f6fea
10 changed files with 88 additions and 48 deletions

View file

@ -205,6 +205,8 @@ def _queue_management_worker(executor_reference,
nb_children_alive = sum(p.is_alive() for p in processes.values()) nb_children_alive = sum(p.is_alive() for p in processes.values())
for i in range(0, nb_children_alive): for i in range(0, nb_children_alive):
call_queue.put_nowait(None) call_queue.put_nowait(None)
# Release the queue's resources as soon as possible.
call_queue.close()
# If .join() is not called on the created processes then # If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS X. # some multiprocessing.Queue methods may deadlock on Mac OS X.
for p in processes.values(): for p in processes.values():
@ -239,14 +241,14 @@ def _queue_management_worker(executor_reference,
# locks may be in a dirty state and block forever. # locks may be in a dirty state and block forever.
for p in processes.values(): for p in processes.values():
p.terminate() p.terminate()
for p in processes.values(): shutdown_worker()
p.join()
return return
if isinstance(result_item, int): if isinstance(result_item, int):
# Clean shutdown of a worker using its PID # Clean shutdown of a worker using its PID
# (avoids marking the executor broken) # (avoids marking the executor broken)
assert shutting_down() assert shutting_down()
del processes[result_item] p = processes.pop(result_item)
p.join()
if not processes: if not processes:
shutdown_worker() shutdown_worker()
return return
@ -334,6 +336,10 @@ class ProcessPoolExecutor(_base.Executor):
# because futures in the call queue cannot be cancelled. # because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers + self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS) EXTRA_QUEUED_CALLS)
# Killed worker processes can produce spurious "broken pipe"
# tracebacks in the queue's own worker thread. But we detect killed
# processes anyway, so silence the tracebacks.
self._call_queue._ignore_epipe = True
self._result_queue = SimpleQueue() self._result_queue = SimpleQueue()
self._work_ids = queue.Queue() self._work_ids = queue.Queue()
self._queue_management_thread = None self._queue_management_thread = None

View file

@ -41,6 +41,7 @@ import collections
import time import time
import atexit import atexit
import weakref import weakref
import errno
from queue import Empty, Full from queue import Empty, Full
import _multiprocessing import _multiprocessing
@ -67,6 +68,8 @@ class Queue(object):
else: else:
self._wlock = Lock() self._wlock = Lock()
self._sem = BoundedSemaphore(maxsize) self._sem = BoundedSemaphore(maxsize)
# For use by concurrent.futures
self._ignore_epipe = False
self._after_fork() self._after_fork()
@ -178,7 +181,7 @@ class Queue(object):
self._thread = threading.Thread( self._thread = threading.Thread(
target=Queue._feed, target=Queue._feed,
args=(self._buffer, self._notempty, self._send, args=(self._buffer, self._notempty, self._send,
self._wlock, self._writer.close), self._wlock, self._writer.close, self._ignore_epipe),
name='QueueFeederThread' name='QueueFeederThread'
) )
self._thread.daemon = True self._thread.daemon = True
@ -229,7 +232,7 @@ class Queue(object):
notempty.release() notempty.release()
@staticmethod @staticmethod
def _feed(buffer, notempty, send, writelock, close): def _feed(buffer, notempty, send, writelock, close, ignore_epipe):
debug('starting thread to feed data to pipe') debug('starting thread to feed data to pipe')
from .util import is_exiting from .util import is_exiting
@ -271,6 +274,8 @@ class Queue(object):
except IndexError: except IndexError:
pass pass
except Exception as e: except Exception as e:
if ignore_epipe and getattr(e, 'errno', 0) == errno.EPIPE:
return
# Since this runs in a daemon thread the resources it uses # Since this runs in a daemon thread the resources it uses
# may be become unusable while the process is cleaning up. # may be become unusable while the process is cleaning up.
# We ignore errors which happen after the process has # We ignore errors which happen after the process has

View file

@ -5,15 +5,18 @@
import os import os
import sys import sys
import unittest import unittest
from test.support import run_unittest, reap_children from test.support import run_unittest, reap_children, reap_threads
@reap_threads
def test_main(): def test_main():
start_dir = os.path.dirname(__file__) try:
top_dir = os.path.dirname(os.path.dirname(start_dir)) start_dir = os.path.dirname(__file__)
test_loader = unittest.TestLoader() top_dir = os.path.dirname(os.path.dirname(start_dir))
run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir)) test_loader = unittest.TestLoader()
reap_children() run_unittest(test_loader.discover(start_dir, top_level_dir=top_dir))
finally:
reap_children()
if __name__ == '__main__': if __name__ == '__main__':

View file

@ -24,9 +24,15 @@ import sysconfig
import logging.handlers import logging.handlers
try: try:
import _thread import _thread, threading
except ImportError: except ImportError:
_thread = None _thread = None
threading = None
try:
import multiprocessing.process
except ImportError:
multiprocessing = None
try: try:
import zlib import zlib
@ -1358,19 +1364,20 @@ def modules_cleanup(oldmodules):
def threading_setup(): def threading_setup():
if _thread: if _thread:
return _thread._count(), return _thread._count(), threading._dangling.copy()
else: else:
return 1, return 1, ()
def threading_cleanup(nb_threads): def threading_cleanup(*original_values):
if not _thread: if not _thread:
return return
_MAX_COUNT = 10 _MAX_COUNT = 10
for count in range(_MAX_COUNT): for count in range(_MAX_COUNT):
n = _thread._count() values = _thread._count(), threading._dangling
if n == nb_threads: if values == original_values:
break break
time.sleep(0.1) time.sleep(0.1)
gc_collect()
# XXX print a warning in case of failure? # XXX print a warning in case of failure?
def reap_threads(func): def reap_threads(func):

View file

@ -634,7 +634,8 @@ def test_main():
ThreadPoolAsCompletedTests, ThreadPoolAsCompletedTests,
FutureTests, FutureTests,
ProcessPoolShutdownTest, ProcessPoolShutdownTest,
ThreadPoolShutdownTest) ThreadPoolShutdownTest,
)
finally: finally:
test.support.reap_children() test.support.reap_children()

View file

@ -1506,6 +1506,7 @@ class TestSendfile(unittest.TestCase):
raise raise
@support.reap_threads
def test_main(): def test_main():
support.run_unittest( support.run_unittest(
FileTests, FileTests,

View file

@ -15,9 +15,12 @@ import textwrap
from io import StringIO from io import StringIO
from collections import namedtuple from collections import namedtuple
from contextlib import contextmanager from contextlib import contextmanager
from test.support import TESTFN, forget, rmtree, EnvironmentVarGuard, \
reap_children, captured_output, captured_stdout, unlink
from test.script_helper import assert_python_ok
from test.support import (
TESTFN, forget, rmtree, EnvironmentVarGuard,
reap_children, reap_threads, captured_output, captured_stdout, unlink
)
from test import pydoc_mod from test import pydoc_mod
try: try:
@ -199,17 +202,14 @@ missing_pattern = "no Python documentation found for '%s'"
# output pattern for module with bad imports # output pattern for module with bad imports
badimport_pattern = "problem in %s - ImportError: No module named %r" badimport_pattern = "problem in %s - ImportError: No module named %r"
def run_pydoc(module_name, *args): def run_pydoc(module_name, *args, **env):
""" """
Runs pydoc on the specified module. Returns the stripped Runs pydoc on the specified module. Returns the stripped
output of pydoc. output of pydoc.
""" """
cmd = [sys.executable, pydoc.__file__, " ".join(args), module_name] args = args + (module_name,)
try: rc, out, err = assert_python_ok(pydoc.__file__, *args, **env)
output = subprocess.Popen(cmd, stdout=subprocess.PIPE).communicate()[0] return out.strip()
return output.strip()
finally:
reap_children()
def get_pydoc_html(module): def get_pydoc_html(module):
"Returns pydoc generated output as html" "Returns pydoc generated output as html"
@ -312,19 +312,20 @@ class PydocDocTest(unittest.TestCase):
def newdirinpath(dir): def newdirinpath(dir):
os.mkdir(dir) os.mkdir(dir)
sys.path.insert(0, dir) sys.path.insert(0, dir)
yield try:
sys.path.pop(0) yield
rmtree(dir) finally:
sys.path.pop(0)
rmtree(dir)
with newdirinpath(TESTFN), EnvironmentVarGuard() as env: with newdirinpath(TESTFN):
env['PYTHONPATH'] = TESTFN
fullmodname = os.path.join(TESTFN, modname) fullmodname = os.path.join(TESTFN, modname)
sourcefn = fullmodname + os.extsep + "py" sourcefn = fullmodname + os.extsep + "py"
for importstring, expectedinmsg in testpairs: for importstring, expectedinmsg in testpairs:
with open(sourcefn, 'w') as f: with open(sourcefn, 'w') as f:
f.write("import {}\n".format(importstring)) f.write("import {}\n".format(importstring))
try: try:
result = run_pydoc(modname).decode("ascii") result = run_pydoc(modname, PYTHONPATH=TESTFN).decode("ascii")
finally: finally:
forget(modname) forget(modname)
expected = badimport_pattern % (modname, expectedinmsg) expected = badimport_pattern % (modname, expectedinmsg)
@ -494,13 +495,17 @@ class TestHelper(unittest.TestCase):
self.assertEqual(sorted(pydoc.Helper.keywords), self.assertEqual(sorted(pydoc.Helper.keywords),
sorted(keyword.kwlist)) sorted(keyword.kwlist))
@reap_threads
def test_main(): def test_main():
test.support.run_unittest(PydocDocTest, try:
TestDescriptions, test.support.run_unittest(PydocDocTest,
PydocServerTest, TestDescriptions,
PydocUrlHandlerTest, PydocServerTest,
TestHelper, PydocUrlHandlerTest,
) TestHelper,
)
finally:
reap_children()
if __name__ == "__main__": if __name__ == "__main__":
test_main() test_main()

View file

@ -11,8 +11,8 @@ import sys
import time import time
import shutil import shutil
import unittest import unittest
from test.support import verbose, import_module, run_unittest, TESTFN from test.support import (
thread = import_module('_thread') verbose, import_module, run_unittest, TESTFN, reap_threads)
threading = import_module('threading') threading = import_module('threading')
def task(N, done, done_tasks, errors): def task(N, done, done_tasks, errors):
@ -62,7 +62,7 @@ class Finder:
def __init__(self): def __init__(self):
self.numcalls = 0 self.numcalls = 0
self.x = 0 self.x = 0
self.lock = thread.allocate_lock() self.lock = threading.Lock()
def find_module(self, name, path=None): def find_module(self, name, path=None):
# Simulate some thread-unsafe behaviour. If calls to find_module() # Simulate some thread-unsafe behaviour. If calls to find_module()
@ -113,7 +113,9 @@ class ThreadedImportTests(unittest.TestCase):
done_tasks = [] done_tasks = []
done.clear() done.clear()
for i in range(N): for i in range(N):
thread.start_new_thread(task, (N, done, done_tasks, errors,)) t = threading.Thread(target=task,
args=(N, done, done_tasks, errors,))
t.start()
done.wait(60) done.wait(60)
self.assertFalse(errors) self.assertFalse(errors)
if verbose: if verbose:
@ -203,6 +205,7 @@ class ThreadedImportTests(unittest.TestCase):
self.assertEqual(set(results), {'a', 'b'}) self.assertEqual(set(results), {'a', 'b'})
@reap_threads
def test_main(): def test_main():
run_unittest(ThreadedImportTests) run_unittest(ThreadedImportTests)

View file

@ -35,8 +35,11 @@ for name, func, args in [
("os.path.abspath", os.path.abspath, ('.',)), ("os.path.abspath", os.path.abspath, ('.',)),
]: ]:
t = Worker(func, args) try:
t.start() t = Worker(func, args)
t.join(TIMEOUT) t.start()
if t.is_alive(): t.join(TIMEOUT)
errors.append("%s appeared to hang" % name) if t.is_alive():
errors.append("%s appeared to hang" % name)
finally:
del t

View file

@ -231,6 +231,12 @@ Core and Builtins
Library Library
------- -------
- Silence spurious "broken pipe" tracebacks when shutting down a
ProcessPoolExecutor.
- Fix potential resource leaks in concurrent.futures.ProcessPoolExecutor
by joining all queues and processes when shutdown() is called.
- Issue #11603: Fix a crash when __str__ is rebound as __repr__. Patch by - Issue #11603: Fix a crash when __str__ is rebound as __repr__. Patch by
Andreas Stührk. Andreas Stührk.