mirror of
https://github.com/python/cpython.git
synced 2025-09-26 18:29:57 +00:00
bpo-31904: Disable os.popen and popen test cases on VxWorks (GH-21687)
This commit is contained in:
parent
9cc8fa6ac8
commit
e1e3c2dac3
7 changed files with 58 additions and 46 deletions
|
@ -32,7 +32,7 @@ Notes on the availability of these functions:
|
||||||
objects, and result in an object of the same type, if a path or file name is
|
objects, and result in an object of the same type, if a path or file name is
|
||||||
returned.
|
returned.
|
||||||
|
|
||||||
* On VxWorks, os.fork, os.execv and os.spawn*p* are not supported.
|
* On VxWorks, os.popen, os.fork, os.execv and os.spawn*p* are not supported.
|
||||||
|
|
||||||
.. note::
|
.. note::
|
||||||
|
|
||||||
|
|
94
Lib/os.py
94
Lib/os.py
|
@ -36,7 +36,7 @@ _names = sys.builtin_module_names
|
||||||
__all__ = ["altsep", "curdir", "pardir", "sep", "pathsep", "linesep",
|
__all__ = ["altsep", "curdir", "pardir", "sep", "pathsep", "linesep",
|
||||||
"defpath", "name", "path", "devnull", "SEEK_SET", "SEEK_CUR",
|
"defpath", "name", "path", "devnull", "SEEK_SET", "SEEK_CUR",
|
||||||
"SEEK_END", "fsencode", "fsdecode", "get_exec_path", "fdopen",
|
"SEEK_END", "fsencode", "fsdecode", "get_exec_path", "fdopen",
|
||||||
"popen", "extsep"]
|
"extsep"]
|
||||||
|
|
||||||
def _exists(name):
|
def _exists(name):
|
||||||
return name in globals()
|
return name in globals()
|
||||||
|
@ -969,51 +969,55 @@ otherwise return -SIG, where SIG is the signal that killed it. """
|
||||||
|
|
||||||
__all__.extend(["spawnlp", "spawnlpe"])
|
__all__.extend(["spawnlp", "spawnlpe"])
|
||||||
|
|
||||||
|
# VxWorks has no user space shell provided. As a result, running
|
||||||
# Supply os.popen()
|
# command in a shell can't be supported.
|
||||||
def popen(cmd, mode="r", buffering=-1):
|
if sys.platform != 'vxworks':
|
||||||
if not isinstance(cmd, str):
|
# Supply os.popen()
|
||||||
raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
|
def popen(cmd, mode="r", buffering=-1):
|
||||||
if mode not in ("r", "w"):
|
if not isinstance(cmd, str):
|
||||||
raise ValueError("invalid mode %r" % mode)
|
raise TypeError("invalid cmd type (%s, expected string)" % type(cmd))
|
||||||
if buffering == 0 or buffering is None:
|
if mode not in ("r", "w"):
|
||||||
raise ValueError("popen() does not support unbuffered streams")
|
raise ValueError("invalid mode %r" % mode)
|
||||||
import subprocess, io
|
if buffering == 0 or buffering is None:
|
||||||
if mode == "r":
|
raise ValueError("popen() does not support unbuffered streams")
|
||||||
proc = subprocess.Popen(cmd,
|
import subprocess, io
|
||||||
shell=True,
|
if mode == "r":
|
||||||
stdout=subprocess.PIPE,
|
proc = subprocess.Popen(cmd,
|
||||||
bufsize=buffering)
|
shell=True,
|
||||||
return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
|
stdout=subprocess.PIPE,
|
||||||
else:
|
bufsize=buffering)
|
||||||
proc = subprocess.Popen(cmd,
|
return _wrap_close(io.TextIOWrapper(proc.stdout), proc)
|
||||||
shell=True,
|
|
||||||
stdin=subprocess.PIPE,
|
|
||||||
bufsize=buffering)
|
|
||||||
return _wrap_close(io.TextIOWrapper(proc.stdin), proc)
|
|
||||||
|
|
||||||
# Helper for popen() -- a proxy for a file whose close waits for the process
|
|
||||||
class _wrap_close:
|
|
||||||
def __init__(self, stream, proc):
|
|
||||||
self._stream = stream
|
|
||||||
self._proc = proc
|
|
||||||
def close(self):
|
|
||||||
self._stream.close()
|
|
||||||
returncode = self._proc.wait()
|
|
||||||
if returncode == 0:
|
|
||||||
return None
|
|
||||||
if name == 'nt':
|
|
||||||
return returncode
|
|
||||||
else:
|
else:
|
||||||
return returncode << 8 # Shift left to match old behavior
|
proc = subprocess.Popen(cmd,
|
||||||
def __enter__(self):
|
shell=True,
|
||||||
return self
|
stdin=subprocess.PIPE,
|
||||||
def __exit__(self, *args):
|
bufsize=buffering)
|
||||||
self.close()
|
return _wrap_close(io.TextIOWrapper(proc.stdin), proc)
|
||||||
def __getattr__(self, name):
|
|
||||||
return getattr(self._stream, name)
|
# Helper for popen() -- a proxy for a file whose close waits for the process
|
||||||
def __iter__(self):
|
class _wrap_close:
|
||||||
return iter(self._stream)
|
def __init__(self, stream, proc):
|
||||||
|
self._stream = stream
|
||||||
|
self._proc = proc
|
||||||
|
def close(self):
|
||||||
|
self._stream.close()
|
||||||
|
returncode = self._proc.wait()
|
||||||
|
if returncode == 0:
|
||||||
|
return None
|
||||||
|
if name == 'nt':
|
||||||
|
return returncode
|
||||||
|
else:
|
||||||
|
return returncode << 8 # Shift left to match old behavior
|
||||||
|
def __enter__(self):
|
||||||
|
return self
|
||||||
|
def __exit__(self, *args):
|
||||||
|
self.close()
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return getattr(self._stream, name)
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self._stream)
|
||||||
|
|
||||||
|
__all__.append("popen")
|
||||||
|
|
||||||
# Supply os.fdopen()
|
# Supply os.fdopen()
|
||||||
def fdopen(fd, *args, **kwargs):
|
def fdopen(fd, *args, **kwargs):
|
||||||
|
|
|
@ -991,6 +991,7 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol):
|
||||||
# Bug 1110478
|
# Bug 1110478
|
||||||
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
|
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
|
||||||
'requires a shell')
|
'requires a shell')
|
||||||
|
@unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
|
||||||
def test_update2(self):
|
def test_update2(self):
|
||||||
os.environ.clear()
|
os.environ.clear()
|
||||||
os.environ.update(HELLO="World")
|
os.environ.update(HELLO="World")
|
||||||
|
@ -1000,6 +1001,7 @@ class EnvironTests(mapping_tests.BasicTestMappingProtocol):
|
||||||
|
|
||||||
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
|
@unittest.skipUnless(unix_shell and os.path.exists(unix_shell),
|
||||||
'requires a shell')
|
'requires a shell')
|
||||||
|
@unittest.skipUnless(hasattr(os, 'popen'), "needs os.popen()")
|
||||||
def test_os_popen_iter(self):
|
def test_os_popen_iter(self):
|
||||||
with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
|
with os.popen("%s -c 'echo \"line1\nline2\nline3\"'"
|
||||||
% unix_shell) as popen:
|
% unix_shell) as popen:
|
||||||
|
|
|
@ -7,6 +7,9 @@ import unittest
|
||||||
from test import support
|
from test import support
|
||||||
import os, sys
|
import os, sys
|
||||||
|
|
||||||
|
if not hasattr(os, 'popen'):
|
||||||
|
raise unittest.SkipTest("need os.popen()")
|
||||||
|
|
||||||
# Test that command-lines get down as we expect.
|
# Test that command-lines get down as we expect.
|
||||||
# To do this we execute:
|
# To do this we execute:
|
||||||
# python -c "import sys;print(sys.argv)" {rest_of_commandline}
|
# python -c "import sys;print(sys.argv)" {rest_of_commandline}
|
||||||
|
|
|
@ -1045,6 +1045,7 @@ class PosixTester(unittest.TestCase):
|
||||||
|
|
||||||
|
|
||||||
@unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
|
@unittest.skipUnless(hasattr(os, 'getegid'), "test needs os.getegid()")
|
||||||
|
@unittest.skipUnless(hasattr(os, 'popen'), "test needs os.popen()")
|
||||||
def test_getgroups(self):
|
def test_getgroups(self):
|
||||||
with os.popen('id -G 2>/dev/null') as idg:
|
with os.popen('id -G 2>/dev/null') as idg:
|
||||||
groups = idg.read().strip()
|
groups = idg.read().strip()
|
||||||
|
|
|
@ -46,6 +46,7 @@ class SelectTestCase(unittest.TestCase):
|
||||||
self.assertIsNot(r, x)
|
self.assertIsNot(r, x)
|
||||||
self.assertIsNot(w, x)
|
self.assertIsNot(w, x)
|
||||||
|
|
||||||
|
@unittest.skipUnless(hasattr(os, 'popen'), "need os.popen()")
|
||||||
def test_select(self):
|
def test_select(self):
|
||||||
code = textwrap.dedent('''
|
code = textwrap.dedent('''
|
||||||
import time
|
import time
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Disable os.popen and impacted tests on VxWorks
|
Loading…
Add table
Add a link
Reference in a new issue