mirror of
https://github.com/python/cpython.git
synced 2025-07-19 17:25:54 +00:00

Skip test_perf_profiler if Python is built with the ASAN, MSAN or UBSAN sanitizer, because Python crashes randomly in this test on such builds.
358 lines
11 KiB
Python
358 lines
11 KiB
Python
import unittest
|
|
import string
|
|
import subprocess
|
|
import sys
|
|
import sysconfig
|
|
import os
|
|
import pathlib
|
|
from test import support
|
|
from test.support.script_helper import (
|
|
make_script,
|
|
assert_python_failure,
|
|
assert_python_ok,
|
|
)
|
|
from test.support.os_helper import temp_dir
|
|
|
|
|
|
# The tests below work by spawning child processes, so the whole module is
# skipped when subprocess support is unavailable.
if not support.has_subprocess_support:
    raise unittest.SkipTest("test module requires subprocess")

if support.check_sanitizer(address=True, memory=True, ub=True):
    # gh-109580: Skip the test because it crashes randomly if Python is
    # built with the ASAN, MSAN or UBSAN sanitizer.
    raise unittest.SkipTest("test crash randomly on ASAN/MSAN/UBSAN build")
|
|
|
|
|
|
def supports_trampoline_profiling():
    """Return True if the build reports perf trampoline support.

    The answer is taken from the ``PY_HAVE_PERF_TRAMPOLINE`` configuration
    variable exposed by :mod:`sysconfig`: support is reported only when the
    variable is set to a truthy value whose integer form equals 1.
    """
    perf_trampoline = sysconfig.get_config_var("PY_HAVE_PERF_TRAMPOLINE")
    if not perf_trampoline:
        # Missing (None), 0 or empty string: treated as "not supported".
        return False
    return int(perf_trampoline) == 1
|
|
|
|
|
|
# Skip the whole module when the build does not report perf trampoline
# support (see supports_trampoline_profiling()).
if not supports_trampoline_profiling():
    raise unittest.SkipTest("perf trampoline profiling not supported")
|
|
|
|
|
|
class TestPerfTrampoline(unittest.TestCase):
    """Check the ``/tmp/perf-<pid>.map`` files written by child interpreters.

    Each test runs a small script in a child process with the perf
    trampoline enabled (through ``-Xperf`` or the ``sys`` API) and inspects
    the map file named after the child's pid.
    """

    def setUp(self):
        super().setUp()
        # Remember the map files that already exist so that tearDown() only
        # removes the ones that appeared while the test was running.
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = (
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
        )
        for file in files_to_delete:
            file.unlink()

    def test_trampoline_works(self):
        # Run a script with -Xperf and check that foo, bar and baz are all
        # listed in the child's perf map file.
        code = """if 1:
            def foo():
                pass

            def bar():
                foo()

            def baz():
                bar()

            baz()
            """
        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            with subprocess.Popen(
                [sys.executable, "-Xperf", script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as process:
                stdout, stderr = process.communicate()

            self.assertEqual(stderr, "")
            self.assertEqual(stdout, "")

            perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
            self.assertTrue(perf_file.exists())
            perf_file_contents = perf_file.read_text()
            perf_lines = perf_file_contents.splitlines()
            expected_symbols = [
                f"py::foo:{script}",
                f"py::bar:{script}",
                f"py::baz:{script}",
            ]
            for expected_symbol in expected_symbols:
                perf_line = next(
                    (line for line in perf_lines if expected_symbol in line),
                    None,
                )
                self.assertIsNotNone(
                    perf_line,
                    f"Could not find {expected_symbol} in perf file",
                )
                # The first space-separated field of a map line is the
                # address; it must be bare hexadecimal without a 0x prefix.
                perf_addr = perf_line.split(" ")[0]
                self.assertFalse(
                    perf_addr.startswith("0x"),
                    "Address should not be prefixed with 0x",
                )
                self.assertTrue(
                    set(perf_addr).issubset(string.hexdigits),
                    "Address should contain only hex characters",
                )

    def test_trampoline_works_with_forks(self):
        # The script forks: the child prints its pid and calls the *_fork
        # functions, the parent waits for it and exits with its status.
        # Both processes must end up with their own map file.
        code = """if 1:
            import os, sys

            def foo_fork():
                pass

            def bar_fork():
                foo_fork()

            def baz_fork():
                bar_fork()

            def foo():
                pid = os.fork()
                if pid == 0:
                    print(os.getpid())
                    baz_fork()
                else:
                    _, status = os.waitpid(-1, 0)
                    sys.exit(status)

            def bar():
                foo()

            def baz():
                bar()

            baz()
            """
        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            with subprocess.Popen(
                [sys.executable, "-Xperf", script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as process:
                stdout, stderr = process.communicate()

            self.assertEqual(process.returncode, 0)
            self.assertEqual(stderr, "")
            # The only output of the script is the pid printed by the child.
            child_pid = int(stdout.strip())
            perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
            perf_child_file = pathlib.Path(f"/tmp/perf-{child_pid}.map")
            self.assertTrue(perf_file.exists())
            self.assertTrue(perf_child_file.exists())

            perf_file_contents = perf_file.read_text()
            self.assertIn(f"py::foo:{script}", perf_file_contents)
            self.assertIn(f"py::bar:{script}", perf_file_contents)
            self.assertIn(f"py::baz:{script}", perf_file_contents)

            child_perf_file_contents = perf_child_file.read_text()
            self.assertIn(f"py::foo_fork:{script}", child_perf_file_contents)
            self.assertIn(f"py::bar_fork:{script}", child_perf_file_contents)
            self.assertIn(f"py::baz_fork:{script}", child_perf_file_contents)

    def test_sys_api(self):
        # The script toggles the trampoline through the sys API: foo() is
        # called while it is deactivated and must be absent from the map,
        # whereas spam(), bar() and baz() must be present.
        code = """if 1:
            import sys
            def foo():
                pass

            def spam():
                pass

            def bar():
                sys.deactivate_stack_trampoline()
                foo()
                sys.activate_stack_trampoline("perf")
                spam()

            def baz():
                bar()

            sys.activate_stack_trampoline("perf")
            baz()
            """
        with temp_dir() as script_dir:
            script = make_script(script_dir, "perftest", code)
            with subprocess.Popen(
                [sys.executable, script],
                text=True,
                stderr=subprocess.PIPE,
                stdout=subprocess.PIPE,
            ) as process:
                stdout, stderr = process.communicate()

            self.assertEqual(stderr, "")
            self.assertEqual(stdout, "")

            perf_file = pathlib.Path(f"/tmp/perf-{process.pid}.map")
            self.assertTrue(perf_file.exists())
            perf_file_contents = perf_file.read_text()
            self.assertNotIn(f"py::foo:{script}", perf_file_contents)
            self.assertIn(f"py::spam:{script}", perf_file_contents)
            self.assertIn(f"py::bar:{script}", perf_file_contents)
            self.assertIn(f"py::baz:{script}", perf_file_contents)

    def test_sys_api_with_existing_trampoline(self):
        # Activating the "perf" backend twice in a row must succeed.
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("perf")
            sys.activate_stack_trampoline("perf")
            """
        assert_python_ok("-c", code)

    def test_sys_api_with_invalid_trampoline(self):
        # An unknown backend name must make the interpreter fail and
        # report the offending name on stderr.
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("invalid")
            """
        rc, out, err = assert_python_failure("-c", code)
        self.assertIn("invalid backend: invalid", err.decode())

    def test_sys_api_get_status(self):
        # is_stack_trampoline_active() must follow activate/deactivate.
        code = """if 1:
            import sys
            sys.activate_stack_trampoline("perf")
            assert sys.is_stack_trampoline_active() is True
            sys.deactivate_stack_trampoline()
            assert sys.is_stack_trampoline_active() is False
            """
        assert_python_ok("-c", code)
|
|
|
|
|
|
def is_unwinding_reliable():
    """Return True if the core build flags keep frame pointers.

    Looks for ``no-omit-frame-pointer`` in the ``PY_CORE_CFLAGS``
    configuration variable. The perf invocations in this file record call
    graphs with ``--call-graph=fp``, i.e. through frame pointers.
    """
    cflags = sysconfig.get_config_var("PY_CORE_CFLAGS")
    if not cflags:
        # Variable missing or empty: nothing can be said about the build,
        # so report unwinding as unreliable.
        return False
    return "no-omit-frame-pointer" in cflags
|
|
|
|
|
|
def perf_command_works():
    """Return True if a usable ``perf`` command is available.

    Two checks are made:

    1. ``perf --help`` must run and mention ``perf.data``.
    2. A trial ``perf record`` of a one-line Python program must succeed
       and its combined output must contain the program's ``hello``.

    Any failure to launch or run either command yields False.
    """
    try:
        cmd = ["perf", "--help"]
        stdout = subprocess.check_output(cmd, text=True)
    except (subprocess.SubprocessError, OSError):
        return False

    # perf version does not return a version number on Fedora. Use presence
    # of "perf.data" in help as indicator that it's perf from Linux tools.
    if "perf.data" not in stdout:
        return False

    # Check that we can run a simple perf run
    with temp_dir() as script_dir:
        try:
            output_file = os.path.join(script_dir, "perf_output.perf")
            cmd = (
                "perf",
                "record",
                "-g",
                "--call-graph=fp",
                "-o",
                output_file,
                "--",
                sys.executable,
                "-c",
                'print("hello")',
            )
            # stderr is merged into stdout so a single string holds
            # everything perf and the child program printed.
            stdout = subprocess.check_output(
                cmd, cwd=script_dir, text=True, stderr=subprocess.STDOUT
            )
        except (subprocess.SubprocessError, OSError):
            return False

        if "hello" not in stdout:
            return False

    return True
|
|
|
|
|
|
def run_perf(cwd, *args, **env_vars):
    """Profile a command with ``perf record`` and return the ``perf script`` dump.

    Parameters:
        cwd: directory in which the ``perf_output.perf`` data file is
            written. It is only used to build that path; the perf commands
            themselves run in the caller's current working directory.
        *args: the command line to profile, appended after ``--``.
        **env_vars: extra environment variables. When any are given, both
            perf commands run with a copy of ``os.environ`` updated with
            them; otherwise the environment is inherited unchanged.

    Returns:
        A ``(stdout, stderr)`` tuple of strings from ``perf script``,
        decoded as UTF-8 with undecodable bytes replaced.

    Raises:
        ValueError: ``perf record`` exited with a non-zero status (its
            captured stderr is printed first).
        subprocess.CalledProcessError: ``perf script`` exited with a
            non-zero status.
    """
    if env_vars:
        env = os.environ.copy()
        env.update(env_vars)
    else:
        env = None
    output_file = os.path.join(cwd, "perf_output.perf")
    record_cmd = (
        "perf", "record", "-g", "--call-graph=fp", "-o", output_file, "--",
    )
    proc = subprocess.run(
        record_cmd + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
    )
    if proc.returncode:
        print(proc.stderr)
        raise ValueError(f"Perf failed with return code {proc.returncode}")

    # Turn the recorded data into a textual report.
    proc = subprocess.run(
        ("perf", "script", "-i", output_file),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        env=env,
        check=True,
    )
    return (
        proc.stdout.decode("utf-8", "replace"),
        proc.stderr.decode("utf-8", "replace"),
    )
|
|
|
|
|
|
@unittest.skipUnless(perf_command_works(), "perf command doesn't work")
@unittest.skipUnless(is_unwinding_reliable(), "Unwinding is unreliable")
class TestPerfProfiler(unittest.TestCase):
    """Profile a script with the real ``perf`` tool and inspect the report.

    The class is skipped unless ``perf`` is usable and the interpreter was
    built with ``no-omit-frame-pointer`` in its core C flags.
    """

    def setUp(self):
        super().setUp()
        # Remember the map files that already exist so that tearDown() only
        # removes the ones that appeared while the test was running.
        self.perf_files = set(pathlib.Path("/tmp/").glob("perf-*.map"))

    def tearDown(self) -> None:
        super().tearDown()
        files_to_delete = (
            set(pathlib.Path("/tmp/").glob("perf-*.map")) - self.perf_files
        )
        for file in files_to_delete:
            file.unlink()

    def test_python_calls_appear_in_the_stack_if_perf_activated(self):
        # With -Xperf, the perf report must name the Python functions.
        with temp_dir() as script_dir:
            code = """if 1:
                def foo(n):
                    x = 0
                    for i in range(n):
                        x += i

                def bar(n):
                    foo(n)

                def baz(n):
                    bar(n)

                baz(10000000)
                """
            script = make_script(script_dir, "perftest", code)
            stdout, stderr = run_perf(
                script_dir, sys.executable, "-Xperf", script
            )
            self.assertEqual(stderr, "")

            self.assertIn(f"py::foo:{script}", stdout)
            self.assertIn(f"py::bar:{script}", stdout)
            self.assertIn(f"py::baz:{script}", stdout)

    def test_python_calls_do_not_appear_in_the_stack_if_perf_activated(self):
        # NOTE: despite the "_if_perf_activated" suffix, this test runs the
        # script *without* -Xperf and checks that no Python function names
        # show up in the report. The name is kept so that existing test
        # identifiers remain valid.
        with temp_dir() as script_dir:
            code = """if 1:
                def foo(n):
                    x = 0
                    for i in range(n):
                        x += i

                def bar(n):
                    foo(n)

                def baz(n):
                    bar(n)

                baz(10000000)
                """
            script = make_script(script_dir, "perftest", code)
            stdout, stderr = run_perf(script_dir, sys.executable, script)
            self.assertEqual(stderr, "")

            self.assertNotIn(f"py::foo:{script}", stdout)
            self.assertNotIn(f"py::bar:{script}", stdout)
            self.assertNotIn(f"py::baz:{script}", stdout)
|
|
|
|
|
|
# Allow running this test module directly as a script.
if __name__ == "__main__":
    unittest.main()
|