mirror of
https://github.com/python/cpython.git
synced 2025-08-03 16:39:00 +00:00
Recorded merge of revisions 78508-78509 via svnmerge from
svn+ssh://pythondev@svn.python.org/python/trunk ........ r78508 | florent.xicluna | 2010-02-27 20:20:50 +0100 (sam, 27 fév 2010) | 2 lines Clean test_subprocess: use assertRaises, skipIf, skipUnless helpers and a custom helper assertStderrEqual. ........ r78509 | florent.xicluna | 2010-02-27 22:15:27 +0100 (sam, 27 fév 2010) | 2 lines Fix an oversight in r78508: p.wait() should be compared to 0 ........
This commit is contained in:
parent
09259e2043
commit
b1e94e8af5
1 changed files with 290 additions and 313 deletions
|
@ -21,37 +21,30 @@ if mswindows:
|
|||
else:
|
||||
SETBINARY = ''
|
||||
|
||||
# In a debug build, stuff like "[6580 refs]" is printed to stderr at
|
||||
# shutdown time. That frustrates tests trying to check stderr produced
|
||||
# from a spawned Python process.
|
||||
def remove_stderr_debug_decorations(stderr):
|
||||
return re.sub("\[\d+ refs\]\r?\n?$", "", stderr.decode()).encode()
|
||||
#return re.sub(r"\[\d+ refs\]\r?\n?$", "", stderr)
|
||||
|
||||
try:
|
||||
mkstemp = tempfile.mkstemp
|
||||
except AttributeError:
|
||||
# tempfile.mkstemp is not available
|
||||
def mkstemp():
|
||||
"""Replacement for mkstemp, calling mktemp."""
|
||||
fname = tempfile.mktemp()
|
||||
return os.open(fname, os.O_RDWR|os.O_CREAT), fname
|
||||
|
||||
|
||||
class ProcessTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Try to minimize the number of children we have so this test
|
||||
# doesn't crash on some buildbots (Alphas in particular).
|
||||
if hasattr(support, "reap_children"):
|
||||
support.reap_children()
|
||||
support.reap_children()
|
||||
|
||||
def tearDown(self):
|
||||
# Try to minimize the number of children we have so this test
|
||||
# doesn't crash on some buildbots (Alphas in particular).
|
||||
if hasattr(support, "reap_children"):
|
||||
support.reap_children()
|
||||
def assertStderrEqual(self, stderr, expected, msg=None):
|
||||
# In a debug build, stuff like "[6580 refs]" is printed to stderr at
|
||||
# shutdown time. That frustrates tests trying to check stderr produced
|
||||
# from a spawned Python process.
|
||||
actual = re.sub("\[\d+ refs\]\r?\n?$", "", stderr.decode()).encode()
|
||||
self.assertEqual(actual, expected, msg)
|
||||
|
||||
def mkstemp(self):
|
||||
"""wrapper for mkstemp, calling mktemp if mkstemp is not available"""
|
||||
if hasattr(tempfile, "mkstemp"):
|
||||
return tempfile.mkstemp()
|
||||
else:
|
||||
fname = tempfile.mktemp()
|
||||
return os.open(fname, os.O_RDWR|os.O_CREAT), fname
|
||||
|
||||
#
|
||||
# Generic tests
|
||||
#
|
||||
def test_call_seq(self):
|
||||
# call() function with sequence argument
|
||||
rc = subprocess.call([sys.executable, "-c",
|
||||
|
@ -66,13 +59,10 @@ class ProcessTestCase(unittest.TestCase):
|
|||
|
||||
def test_check_call_nonzero(self):
|
||||
# check_call() function with non-zero return code
|
||||
try:
|
||||
with self.assertRaises(subprocess.CalledProcessError) as c:
|
||||
subprocess.check_call([sys.executable, "-c",
|
||||
"import sys; sys.exit(47)"])
|
||||
except subprocess.CalledProcessError as e:
|
||||
self.assertEqual(e.returncode, 47)
|
||||
else:
|
||||
self.fail("Expected CalledProcessError")
|
||||
self.assertEqual(c.exception.returncode, 47)
|
||||
|
||||
def test_check_output(self):
|
||||
# check_output() function with zero return code
|
||||
|
@ -82,13 +72,10 @@ class ProcessTestCase(unittest.TestCase):
|
|||
|
||||
def test_check_output_nonzero(self):
|
||||
# check_call() function with non-zero return code
|
||||
try:
|
||||
with self.assertRaises(subprocess.CalledProcessError) as c:
|
||||
subprocess.check_output(
|
||||
[sys.executable, "-c", "import sys; sys.exit(5)"])
|
||||
except subprocess.CalledProcessError as e:
|
||||
self.assertEqual(e.returncode, 5)
|
||||
else:
|
||||
self.fail("Expected CalledProcessError")
|
||||
self.assertEqual(c.exception.returncode, 5)
|
||||
|
||||
def test_check_output_stderr(self):
|
||||
# check_output() function stderr redirected to stdout
|
||||
|
@ -99,14 +86,12 @@ class ProcessTestCase(unittest.TestCase):
|
|||
|
||||
def test_check_output_stdout_arg(self):
|
||||
# check_output() function stderr redirected to stdout
|
||||
try:
|
||||
with self.assertRaises(ValueError) as c:
|
||||
output = subprocess.check_output(
|
||||
[sys.executable, "-c", "print('will not be run')"],
|
||||
stdout=sys.stdout)
|
||||
except ValueError as e:
|
||||
self.assertIn('stdout', e.args[0])
|
||||
else:
|
||||
self.fail("Expected ValueError when stdout arg supplied.")
|
||||
self.assertIn('stdout', c.exception.args[0])
|
||||
|
||||
def test_call_kwargs(self):
|
||||
# call() function with keyword args
|
||||
|
@ -227,8 +212,7 @@ class ProcessTestCase(unittest.TestCase):
|
|||
p = subprocess.Popen([sys.executable, "-c",
|
||||
'import sys; sys.stderr.write("strawberry")'],
|
||||
stderr=subprocess.PIPE)
|
||||
self.assertEqual(remove_stderr_debug_decorations(p.stderr.read()),
|
||||
b"strawberry")
|
||||
self.assertStderrEqual(p.stderr.read(), b"strawberry")
|
||||
|
||||
def test_stderr_filedes(self):
|
||||
# stderr is set to open file descriptor
|
||||
|
@ -239,8 +223,7 @@ class ProcessTestCase(unittest.TestCase):
|
|||
stderr=d)
|
||||
p.wait()
|
||||
os.lseek(d, 0, 0)
|
||||
self.assertEqual(remove_stderr_debug_decorations(os.read(d, 1024)),
|
||||
b"strawberry")
|
||||
self.assertStderrEqual(os.read(d, 1024), b"strawberry")
|
||||
|
||||
def test_stderr_fileobj(self):
|
||||
# stderr is set to open file object
|
||||
|
@ -250,8 +233,7 @@ class ProcessTestCase(unittest.TestCase):
|
|||
stderr=tf)
|
||||
p.wait()
|
||||
tf.seek(0)
|
||||
self.assertEqual(remove_stderr_debug_decorations(tf.read()),
|
||||
b"strawberry")
|
||||
self.assertStderrEqual(tf.read(), b"strawberry")
|
||||
|
||||
def test_stdout_stderr_pipe(self):
|
||||
# capture stdout and stderr to the same pipe
|
||||
|
@ -262,9 +244,7 @@ class ProcessTestCase(unittest.TestCase):
|
|||
'sys.stderr.write("orange")'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT)
|
||||
output = p.stdout.read()
|
||||
stripped = remove_stderr_debug_decorations(output)
|
||||
self.assertEqual(stripped, b"appleorange")
|
||||
self.assertStderrEqual(p.stdout.read(), b"appleorange")
|
||||
|
||||
def test_stdout_stderr_file(self):
|
||||
# capture stdout and stderr to the same open file
|
||||
|
@ -278,15 +258,13 @@ class ProcessTestCase(unittest.TestCase):
|
|||
stderr=tf)
|
||||
p.wait()
|
||||
tf.seek(0)
|
||||
output = tf.read()
|
||||
stripped = remove_stderr_debug_decorations(output)
|
||||
self.assertEqual(stripped, b"appleorange")
|
||||
self.assertStderrEqual(tf.read(), b"appleorange")
|
||||
|
||||
def test_stdout_filedes_of_stdout(self):
|
||||
# stdout is set to 1 (#1531862).
|
||||
cmd = r"import sys, os; sys.exit(os.write(sys.stdout.fileno(), b'.\n'))"
|
||||
rc = subprocess.call([sys.executable, "-c", cmd], stdout=1)
|
||||
self.assertEquals(rc, 2)
|
||||
self.assertEqual(rc, 2)
|
||||
|
||||
def test_cwd(self):
|
||||
tmpdir = tempfile.gettempdir()
|
||||
|
@ -337,9 +315,7 @@ class ProcessTestCase(unittest.TestCase):
|
|||
stderr=subprocess.PIPE)
|
||||
(stdout, stderr) = p.communicate()
|
||||
self.assertEqual(stdout, None)
|
||||
# When running with a pydebug build, the # of references is outputted
|
||||
# to stderr, so just check if stderr at least started with "pinapple"
|
||||
self.assertEqual(remove_stderr_debug_decorations(stderr), b"pineapple")
|
||||
self.assertStderrEqual(stderr, b"pineapple")
|
||||
|
||||
def test_communicate(self):
|
||||
p = subprocess.Popen([sys.executable, "-c",
|
||||
|
@ -351,24 +327,24 @@ class ProcessTestCase(unittest.TestCase):
|
|||
stderr=subprocess.PIPE)
|
||||
(stdout, stderr) = p.communicate(b"banana")
|
||||
self.assertEqual(stdout, b"banana")
|
||||
self.assertEqual(remove_stderr_debug_decorations(stderr),
|
||||
b"pineapple")
|
||||
self.assertStderrEqual(stderr, b"pineapple")
|
||||
|
||||
# This test is Linux specific for simplicity to at least have
|
||||
# some coverage. It is not a platform specific bug.
|
||||
if os.path.isdir('/proc/%d/fd' % os.getpid()):
|
||||
# Test for the fd leak reported in http://bugs.python.org/issue2791.
|
||||
def test_communicate_pipe_fd_leak(self):
|
||||
fd_directory = '/proc/%d/fd' % os.getpid()
|
||||
num_fds_before_popen = len(os.listdir(fd_directory))
|
||||
p = subprocess.Popen([sys.executable, '-c', 'print()'],
|
||||
stdout=subprocess.PIPE)
|
||||
p.communicate()
|
||||
num_fds_after_communicate = len(os.listdir(fd_directory))
|
||||
del p
|
||||
num_fds_after_destruction = len(os.listdir(fd_directory))
|
||||
self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
|
||||
self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
|
||||
@unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()),
|
||||
"Linux specific")
|
||||
# Test for the fd leak reported in http://bugs.python.org/issue2791.
|
||||
def test_communicate_pipe_fd_leak(self):
|
||||
fd_directory = '/proc/%d/fd' % os.getpid()
|
||||
num_fds_before_popen = len(os.listdir(fd_directory))
|
||||
p = subprocess.Popen([sys.executable, "-c", "print()"],
|
||||
stdout=subprocess.PIPE)
|
||||
p.communicate()
|
||||
num_fds_after_communicate = len(os.listdir(fd_directory))
|
||||
del p
|
||||
num_fds_after_destruction = len(os.listdir(fd_directory))
|
||||
self.assertEqual(num_fds_before_popen, num_fds_after_destruction)
|
||||
self.assertEqual(num_fds_before_popen, num_fds_after_communicate)
|
||||
|
||||
def test_communicate_returns(self):
|
||||
# communicate() should return None if no redirection is active
|
||||
|
@ -412,7 +388,7 @@ class ProcessTestCase(unittest.TestCase):
|
|||
p.stdin.write(b"banana")
|
||||
(stdout, stderr) = p.communicate(b"split")
|
||||
self.assertEqual(stdout, b"bananasplit")
|
||||
self.assertEqual(remove_stderr_debug_decorations(stderr), b"")
|
||||
self.assertStderrEqual(stderr, b"")
|
||||
|
||||
def test_universal_newlines(self):
|
||||
p = subprocess.Popen([sys.executable, "-c",
|
||||
|
@ -503,7 +479,7 @@ class ProcessTestCase(unittest.TestCase):
|
|||
# but, based on system scheduling we can't control, it's possible
|
||||
# poll() never returned None. It "should be" very rare that it
|
||||
# didn't go around at least twice.
|
||||
self.assertTrue(count >= 2)
|
||||
self.assertGreaterEqual(count, 2)
|
||||
# Subsequent invocations should just return the returncode
|
||||
self.assertEqual(p.poll(), 0)
|
||||
|
||||
|
@ -519,12 +495,8 @@ class ProcessTestCase(unittest.TestCase):
|
|||
def test_invalid_bufsize(self):
|
||||
# an invalid type of the bufsize argument should raise
|
||||
# TypeError.
|
||||
try:
|
||||
with self.assertRaises(TypeError):
|
||||
subprocess.Popen([sys.executable, "-c", "pass"], "orange")
|
||||
except TypeError:
|
||||
pass
|
||||
else:
|
||||
self.fail("Expected TypeError")
|
||||
|
||||
def test_bufsize_is_none(self):
|
||||
# bufsize=None should be the same as bufsize=0.
|
||||
|
@ -541,292 +513,297 @@ class ProcessTestCase(unittest.TestCase):
|
|||
# value for that limit, but Windows has 2048, so we loop
|
||||
# 1024 times (each call leaked two fds).
|
||||
for i in range(1024):
|
||||
try:
|
||||
# Windows raises IOError. Others raise OSError.
|
||||
with self.assertRaises(EnvironmentError) as c:
|
||||
subprocess.Popen(['nonexisting_i_hope'],
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.PIPE)
|
||||
# Windows raises IOError
|
||||
except (IOError, OSError) as err:
|
||||
if err.errno != 2: # ignore "no such file"
|
||||
raise
|
||||
if c.exception.errno != 2: # ignore "no such file"
|
||||
raise c.exception
|
||||
|
||||
#
|
||||
# POSIX tests
|
||||
#
|
||||
if not mswindows:
|
||||
def test_exceptions(self):
|
||||
# caught & re-raised exceptions
|
||||
try:
|
||||
p = subprocess.Popen([sys.executable, "-c", ""],
|
||||
cwd="/this/path/does/not/exist")
|
||||
except OSError as e:
|
||||
# The attribute child_traceback should contain "os.chdir"
|
||||
# somewhere.
|
||||
self.assertNotEqual(e.child_traceback.find("os.chdir"), -1)
|
||||
else:
|
||||
self.fail("Expected OSError")
|
||||
|
||||
def _suppress_core_files(self):
|
||||
"""Try to prevent core files from being created.
|
||||
Returns previous ulimit if successful, else None.
|
||||
"""
|
||||
try:
|
||||
import resource
|
||||
old_limit = resource.getrlimit(resource.RLIMIT_CORE)
|
||||
resource.setrlimit(resource.RLIMIT_CORE, (0,0))
|
||||
return old_limit
|
||||
except (ImportError, ValueError, resource.error):
|
||||
return None
|
||||
# context manager
|
||||
class _SuppressCoreFiles(object):
|
||||
"""Try to prevent core files from being created."""
|
||||
old_limit = None
|
||||
|
||||
def _unsuppress_core_files(self, old_limit):
|
||||
"""Return core file behavior to default."""
|
||||
if old_limit is None:
|
||||
return
|
||||
try:
|
||||
import resource
|
||||
resource.setrlimit(resource.RLIMIT_CORE, old_limit)
|
||||
except (ImportError, ValueError, resource.error):
|
||||
return
|
||||
def __enter__(self):
|
||||
"""Try to save previous ulimit, then set it to (0, 0)."""
|
||||
try:
|
||||
import resource
|
||||
self.old_limit = resource.getrlimit(resource.RLIMIT_CORE)
|
||||
resource.setrlimit(resource.RLIMIT_CORE, (0, 0))
|
||||
except (ImportError, ValueError, resource.error):
|
||||
pass
|
||||
|
||||
def test_run_abort(self):
|
||||
# returncode handles signal termination
|
||||
old_limit = self._suppress_core_files()
|
||||
try:
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "import os; os.abort()"])
|
||||
finally:
|
||||
self._unsuppress_core_files(old_limit)
|
||||
p.wait()
|
||||
self.assertEqual(-p.returncode, signal.SIGABRT)
|
||||
def __exit__(self, *args):
|
||||
"""Return core file behavior to default."""
|
||||
if self.old_limit is None:
|
||||
return
|
||||
try:
|
||||
import resource
|
||||
resource.setrlimit(resource.RLIMIT_CORE, self.old_limit)
|
||||
except (ImportError, ValueError, resource.error):
|
||||
pass
|
||||
|
||||
def test_preexec(self):
|
||||
# preexec function
|
||||
|
||||
@unittest.skipIf(sys.platform == "win32", "POSIX specific tests")
|
||||
class POSIXProcessTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Try to minimize the number of children we have so this test
|
||||
# doesn't crash on some buildbots (Alphas in particular).
|
||||
support.reap_children()
|
||||
|
||||
def test_exceptions(self):
|
||||
# caught & re-raised exceptions
|
||||
with self.assertRaises(OSError) as c:
|
||||
p = subprocess.Popen([sys.executable, "-c", ""],
|
||||
cwd="/this/path/does/not/exist")
|
||||
# The attribute child_traceback should contain "os.chdir" somewhere.
|
||||
self.assertIn("os.chdir", c.exception.child_traceback)
|
||||
|
||||
def test_run_abort(self):
|
||||
# returncode handles signal termination
|
||||
with _SuppressCoreFiles():
|
||||
p = subprocess.Popen([sys.executable, "-c",
|
||||
'import sys,os;'
|
||||
'sys.stdout.write(os.getenv("FRUIT"))'],
|
||||
stdout=subprocess.PIPE,
|
||||
preexec_fn=lambda: os.putenv("FRUIT",
|
||||
"apple"))
|
||||
self.assertEqual(p.stdout.read(), b"apple")
|
||||
|
||||
def test_args_string(self):
|
||||
# args is a string
|
||||
fd, fname = self.mkstemp()
|
||||
# reopen in text mode
|
||||
with open(fd, "w") as fobj:
|
||||
fobj.write("#!/bin/sh\n")
|
||||
fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
|
||||
sys.executable)
|
||||
os.chmod(fname, 0o700)
|
||||
p = subprocess.Popen(fname)
|
||||
'import os; os.abort()'])
|
||||
p.wait()
|
||||
os.remove(fname)
|
||||
self.assertEqual(p.returncode, 47)
|
||||
self.assertEqual(-p.returncode, signal.SIGABRT)
|
||||
|
||||
def test_invalid_args(self):
|
||||
# invalid arguments should raise ValueError
|
||||
self.assertRaises(ValueError, subprocess.call,
|
||||
[sys.executable,
|
||||
"-c", "import sys; sys.exit(47)"],
|
||||
startupinfo=47)
|
||||
self.assertRaises(ValueError, subprocess.call,
|
||||
[sys.executable,
|
||||
"-c", "import sys; sys.exit(47)"],
|
||||
creationflags=47)
|
||||
def test_preexec(self):
|
||||
# preexec function
|
||||
p = subprocess.Popen([sys.executable, "-c",
|
||||
'import sys,os;'
|
||||
'sys.stdout.write(os.getenv("FRUIT"))'],
|
||||
stdout=subprocess.PIPE,
|
||||
preexec_fn=lambda: os.putenv("FRUIT", "apple"))
|
||||
self.assertEqual(p.stdout.read(), b"apple")
|
||||
|
||||
def test_shell_sequence(self):
|
||||
# Run command through the shell (sequence)
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "apple"
|
||||
p = subprocess.Popen(["echo $FRUIT"], shell=1,
|
||||
stdout=subprocess.PIPE,
|
||||
env=newenv)
|
||||
self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
|
||||
def test_args_string(self):
|
||||
# args is a string
|
||||
fd, fname = mkstemp()
|
||||
# reopen in text mode
|
||||
with open(fd, "w") as fobj:
|
||||
fobj.write("#!/bin/sh\n")
|
||||
fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
|
||||
sys.executable)
|
||||
os.chmod(fname, 0o700)
|
||||
p = subprocess.Popen(fname)
|
||||
p.wait()
|
||||
os.remove(fname)
|
||||
self.assertEqual(p.returncode, 47)
|
||||
|
||||
def test_shell_string(self):
|
||||
# Run command through the shell (string)
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "apple"
|
||||
p = subprocess.Popen("echo $FRUIT", shell=1,
|
||||
stdout=subprocess.PIPE,
|
||||
env=newenv)
|
||||
self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
|
||||
def test_invalid_args(self):
|
||||
# invalid arguments should raise ValueError
|
||||
self.assertRaises(ValueError, subprocess.call,
|
||||
[sys.executable, "-c",
|
||||
"import sys; sys.exit(47)"],
|
||||
startupinfo=47)
|
||||
self.assertRaises(ValueError, subprocess.call,
|
||||
[sys.executable, "-c",
|
||||
"import sys; sys.exit(47)"],
|
||||
creationflags=47)
|
||||
|
||||
def test_call_string(self):
|
||||
# call() function with string argument on UNIX
|
||||
fd, fname = self.mkstemp()
|
||||
# reopen in text mode
|
||||
with open(fd, "w") as fobj:
|
||||
fobj.write("#!/bin/sh\n")
|
||||
fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
|
||||
sys.executable)
|
||||
os.chmod(fname, 0o700)
|
||||
rc = subprocess.call(fname)
|
||||
os.remove(fname)
|
||||
self.assertEqual(rc, 47)
|
||||
def test_shell_sequence(self):
|
||||
# Run command through the shell (sequence)
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "apple"
|
||||
p = subprocess.Popen(["echo $FRUIT"], shell=1,
|
||||
stdout=subprocess.PIPE,
|
||||
env=newenv)
|
||||
self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
|
||||
|
||||
def DISABLED_test_send_signal(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "input()"])
|
||||
def test_shell_string(self):
|
||||
# Run command through the shell (string)
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "apple"
|
||||
p = subprocess.Popen("echo $FRUIT", shell=1,
|
||||
stdout=subprocess.PIPE,
|
||||
env=newenv)
|
||||
self.assertEqual(p.stdout.read().strip(b" \t\r\n\f"), b"apple")
|
||||
|
||||
self.assertTrue(p.poll() is None, p.poll())
|
||||
p.send_signal(signal.SIGINT)
|
||||
self.assertNotEqual(p.wait(), 0)
|
||||
def test_call_string(self):
|
||||
# call() function with string argument on UNIX
|
||||
fd, fname = mkstemp()
|
||||
# reopen in text mode
|
||||
with open(fd, "w") as fobj:
|
||||
fobj.write("#!/bin/sh\n")
|
||||
fobj.write("exec '%s' -c 'import sys; sys.exit(47)'\n" %
|
||||
sys.executable)
|
||||
os.chmod(fname, 0o700)
|
||||
rc = subprocess.call(fname)
|
||||
os.remove(fname)
|
||||
self.assertEqual(rc, 47)
|
||||
|
||||
def DISABLED_test_kill(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "input()"])
|
||||
@unittest.skip("See issue #2777")
|
||||
def test_send_signal(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "input()"])
|
||||
|
||||
self.assertTrue(p.poll() is None, p.poll())
|
||||
p.kill()
|
||||
self.assertEqual(p.wait(), -signal.SIGKILL)
|
||||
self.assertIs(p.poll(), None)
|
||||
p.send_signal(signal.SIGINT)
|
||||
self.assertIsNot(p.wait(), None)
|
||||
|
||||
def DISABLED_test_terminate(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "input()"])
|
||||
@unittest.skip("See issue #2777")
|
||||
def test_kill(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "input()"])
|
||||
|
||||
self.assertTrue(p.poll() is None, p.poll())
|
||||
p.terminate()
|
||||
self.assertEqual(p.wait(), -signal.SIGTERM)
|
||||
self.assertIs(p.poll(), None)
|
||||
p.kill()
|
||||
self.assertEqual(p.wait(), -signal.SIGKILL)
|
||||
|
||||
#
|
||||
# Windows tests
|
||||
#
|
||||
if mswindows:
|
||||
def test_startupinfo(self):
|
||||
# startupinfo argument
|
||||
# We uses hardcoded constants, because we do not want to
|
||||
# depend on win32all.
|
||||
STARTF_USESHOWWINDOW = 1
|
||||
SW_MAXIMIZE = 3
|
||||
startupinfo = subprocess.STARTUPINFO()
|
||||
startupinfo.dwFlags = STARTF_USESHOWWINDOW
|
||||
startupinfo.wShowWindow = SW_MAXIMIZE
|
||||
# Since Python is a console process, it won't be affected
|
||||
# by wShowWindow, but the argument should be silently
|
||||
# ignored
|
||||
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
@unittest.skip("See issue #2777")
|
||||
def test_terminate(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "input()"])
|
||||
|
||||
self.assertIs(p.poll(), None)
|
||||
p.terminate()
|
||||
self.assertEqual(p.wait(), -signal.SIGTERM)
|
||||
|
||||
|
||||
@unittest.skipUnless(sys.platform == "win32", "Windows specific tests")
|
||||
class Win32ProcessTestCase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
# Try to minimize the number of children we have so this test
|
||||
# doesn't crash on some buildbots (Alphas in particular).
|
||||
support.reap_children()
|
||||
|
||||
def test_startupinfo(self):
|
||||
# startupinfo argument
|
||||
# We uses hardcoded constants, because we do not want to
|
||||
# depend on win32all.
|
||||
STARTF_USESHOWWINDOW = 1
|
||||
SW_MAXIMIZE = 3
|
||||
startupinfo = subprocess.STARTUPINFO()
|
||||
startupinfo.dwFlags = STARTF_USESHOWWINDOW
|
||||
startupinfo.wShowWindow = SW_MAXIMIZE
|
||||
# Since Python is a console process, it won't be affected
|
||||
# by wShowWindow, but the argument should be silently
|
||||
# ignored
|
||||
subprocess.call([sys.executable, "-c", "import sys; sys.exit(0)"],
|
||||
startupinfo=startupinfo)
|
||||
|
||||
def test_creationflags(self):
|
||||
# creationflags argument
|
||||
CREATE_NEW_CONSOLE = 16
|
||||
sys.stderr.write(" a DOS box should flash briefly ...\n")
|
||||
subprocess.call(sys.executable +
|
||||
' -c "import time; time.sleep(0.25)"',
|
||||
creationflags=CREATE_NEW_CONSOLE)
|
||||
def test_creationflags(self):
|
||||
# creationflags argument
|
||||
CREATE_NEW_CONSOLE = 16
|
||||
sys.stderr.write(" a DOS box should flash briefly ...\n")
|
||||
subprocess.call(sys.executable +
|
||||
' -c "import time; time.sleep(0.25)"',
|
||||
creationflags=CREATE_NEW_CONSOLE)
|
||||
|
||||
def test_invalid_args(self):
|
||||
# invalid arguments should raise ValueError
|
||||
self.assertRaises(ValueError, subprocess.call,
|
||||
[sys.executable,
|
||||
"-c", "import sys; sys.exit(47)"],
|
||||
preexec_fn=lambda: 1)
|
||||
self.assertRaises(ValueError, subprocess.call,
|
||||
[sys.executable,
|
||||
"-c", "import sys; sys.exit(47)"],
|
||||
stdout=subprocess.PIPE,
|
||||
def test_invalid_args(self):
|
||||
# invalid arguments should raise ValueError
|
||||
self.assertRaises(ValueError, subprocess.call,
|
||||
[sys.executable, "-c",
|
||||
"import sys; sys.exit(47)"],
|
||||
preexec_fn=lambda: 1)
|
||||
self.assertRaises(ValueError, subprocess.call,
|
||||
[sys.executable, "-c",
|
||||
"import sys; sys.exit(47)"],
|
||||
stdout=subprocess.PIPE,
|
||||
close_fds=True)
|
||||
|
||||
def test_close_fds(self):
|
||||
# close file descriptors
|
||||
rc = subprocess.call([sys.executable, "-c",
|
||||
"import sys; sys.exit(47)"],
|
||||
close_fds=True)
|
||||
self.assertEqual(rc, 47)
|
||||
|
||||
def test_close_fds(self):
|
||||
# close file descriptors
|
||||
rc = subprocess.call([sys.executable, "-c",
|
||||
"import sys; sys.exit(47)"],
|
||||
close_fds=True)
|
||||
self.assertEqual(rc, 47)
|
||||
def test_shell_sequence(self):
|
||||
# Run command through the shell (sequence)
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "physalis"
|
||||
p = subprocess.Popen(["set"], shell=1,
|
||||
stdout=subprocess.PIPE,
|
||||
env=newenv)
|
||||
self.assertIn(b"physalis", p.stdout.read())
|
||||
|
||||
def test_shell_sequence(self):
|
||||
# Run command through the shell (sequence)
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "physalis"
|
||||
p = subprocess.Popen(["set"], shell=1,
|
||||
stdout=subprocess.PIPE,
|
||||
env=newenv)
|
||||
self.assertNotEqual(p.stdout.read().find(b"physalis"), -1)
|
||||
def test_shell_string(self):
|
||||
# Run command through the shell (string)
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "physalis"
|
||||
p = subprocess.Popen("set", shell=1,
|
||||
stdout=subprocess.PIPE,
|
||||
env=newenv)
|
||||
self.assertIn(b"physalis", p.stdout.read())
|
||||
|
||||
def test_shell_string(self):
|
||||
# Run command through the shell (string)
|
||||
newenv = os.environ.copy()
|
||||
newenv["FRUIT"] = "physalis"
|
||||
p = subprocess.Popen("set", shell=1,
|
||||
stdout=subprocess.PIPE,
|
||||
env=newenv)
|
||||
self.assertNotEqual(p.stdout.read().find(b"physalis"), -1)
|
||||
def test_call_string(self):
|
||||
# call() function with string argument on Windows
|
||||
rc = subprocess.call(sys.executable +
|
||||
' -c "import sys; sys.exit(47)"')
|
||||
self.assertEqual(rc, 47)
|
||||
|
||||
def test_call_string(self):
|
||||
# call() function with string argument on Windows
|
||||
rc = subprocess.call(sys.executable +
|
||||
' -c "import sys; sys.exit(47)"')
|
||||
self.assertEqual(rc, 47)
|
||||
@unittest.skip("See issue #2777")
|
||||
def test_send_signal(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "input()"])
|
||||
|
||||
def DISABLED_test_send_signal(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "input()"])
|
||||
self.assertIs(p.poll(), None)
|
||||
p.send_signal(signal.SIGTERM)
|
||||
self.assertNotEqual(p.wait(), 0)
|
||||
|
||||
self.assertTrue(p.poll() is None, p.poll())
|
||||
p.send_signal(signal.SIGTERM)
|
||||
self.assertNotEqual(p.wait(), 0)
|
||||
@unittest.skip("See issue #2777")
|
||||
def test_kill(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "input()"])
|
||||
|
||||
def DISABLED_test_kill(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "input()"])
|
||||
self.assertIs(p.poll(), None)
|
||||
p.kill()
|
||||
self.assertNotEqual(p.wait(), 0)
|
||||
|
||||
self.assertTrue(p.poll() is None, p.poll())
|
||||
p.kill()
|
||||
self.assertNotEqual(p.wait(), 0)
|
||||
@unittest.skip("See issue #2777")
|
||||
def test_terminate(self):
|
||||
p = subprocess.Popen([sys.executable, "-c", "input()"])
|
||||
|
||||
def DISABLED_test_terminate(self):
|
||||
p = subprocess.Popen([sys.executable,
|
||||
"-c", "input()"])
|
||||
self.assertIs(p.poll(), None)
|
||||
p.terminate()
|
||||
self.assertNotEqual(p.wait(), 0)
|
||||
|
||||
self.assertTrue(p.poll() is None, p.poll())
|
||||
p.terminate()
|
||||
self.assertNotEqual(p.wait(), 0)
|
||||
|
||||
class CommandTests(unittest.TestCase):
|
||||
# The module says:
|
||||
# "NB This only works (and is only relevant) for UNIX."
|
||||
#
|
||||
# Actually, getoutput should work on any platform with an os.popen, but
|
||||
# I'll take the comment as given, and skip this suite.
|
||||
if os.name == 'posix':
|
||||
@unittest.skipUnless(os.name != 'posix', "only relevant for UNIX")
|
||||
class CommandTests(unittest.TestCase):
|
||||
def test_getoutput(self):
|
||||
self.assertEqual(subprocess.getoutput('echo xyzzy'), 'xyzzy')
|
||||
self.assertEqual(subprocess.getstatusoutput('echo xyzzy'),
|
||||
(0, 'xyzzy'))
|
||||
|
||||
def test_getoutput(self):
|
||||
self.assertEquals(subprocess.getoutput('echo xyzzy'), 'xyzzy')
|
||||
self.assertEquals(subprocess.getstatusoutput('echo xyzzy'),
|
||||
(0, 'xyzzy'))
|
||||
# we use mkdtemp in the next line to create an empty directory
|
||||
# under our exclusive control; from that, we can invent a pathname
|
||||
# that we _know_ won't exist. This is guaranteed to fail.
|
||||
dir = None
|
||||
try:
|
||||
dir = tempfile.mkdtemp()
|
||||
name = os.path.join(dir, "foo")
|
||||
|
||||
# we use mkdtemp in the next line to create an empty directory
|
||||
# under our exclusive control; from that, we can invent a pathname
|
||||
# that we _know_ won't exist. This is guaranteed to fail.
|
||||
dir = None
|
||||
try:
|
||||
dir = tempfile.mkdtemp()
|
||||
name = os.path.join(dir, "foo")
|
||||
|
||||
status, output = subprocess.getstatusoutput('cat ' + name)
|
||||
self.assertNotEquals(status, 0)
|
||||
finally:
|
||||
if dir is not None:
|
||||
os.rmdir(dir)
|
||||
status, output = subprocess.getstatusoutput('cat ' + name)
|
||||
self.assertNotEqual(status, 0)
|
||||
finally:
|
||||
if dir is not None:
|
||||
os.rmdir(dir)
|
||||
|
||||
|
||||
unit_tests = [ProcessTestCase, CommandTests]
|
||||
@unittest.skipUnless(getattr(subprocess, '_has_poll', False),
|
||||
"poll system call not supported")
|
||||
class ProcessTestCaseNoPoll(ProcessTestCase):
|
||||
def setUp(self):
|
||||
subprocess._has_poll = False
|
||||
ProcessTestCase.setUp(self)
|
||||
|
||||
if getattr(subprocess, '_has_poll', False):
|
||||
class ProcessTestCaseNoPoll(ProcessTestCase):
|
||||
def setUp(self):
|
||||
subprocess._has_poll = False
|
||||
ProcessTestCase.setUp(self)
|
||||
|
||||
def tearDown(self):
|
||||
subprocess._has_poll = True
|
||||
ProcessTestCase.tearDown(self)
|
||||
|
||||
unit_tests.append(ProcessTestCaseNoPoll)
|
||||
def tearDown(self):
|
||||
subprocess._has_poll = True
|
||||
ProcessTestCase.tearDown(self)
|
||||
|
||||
|
||||
def test_main():
|
||||
unit_tests = (ProcessTestCase,
|
||||
POSIXProcessTestCase,
|
||||
Win32ProcessTestCase,
|
||||
CommandTests,
|
||||
ProcessTestCaseNoPoll)
|
||||
|
||||
support.run_unittest(*unit_tests)
|
||||
support.reap_children()
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue