mirror of
				https://github.com/python/cpython.git
				synced 2025-10-24 15:36:26 +00:00 
			
		
		
		
	
		
			
				
	
	
		
			84 lines
		
	
	
	
		
			3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			84 lines
		
	
	
	
		
			3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import unittest
 | |
| import os
 | |
| import textwrap
 | |
| import importlib
 | |
| import sys
 | |
| from test.support import os_helper, SHORT_TIMEOUT
 | |
| from test.support.script_helper import make_script
 | |
| 
 | |
| import subprocess
 | |
| 
 | |
| PROCESS_VM_READV_SUPPORTED = False
 | |
| 
 | |
| try:
 | |
|     from _testexternalinspection import PROCESS_VM_READV_SUPPORTED
 | |
|     from _testexternalinspection import get_stack_trace
 | |
| except ImportError:
 | |
|     raise unittest.SkipTest("Test only runs when _testexternalinspection is available")
 | |
| 
 | |
| def _make_test_script(script_dir, script_basename, source):
 | |
|     to_return = make_script(script_dir, script_basename, source)
 | |
|     importlib.invalidate_caches()
 | |
|     return to_return
 | |
| 
 | |
| class TestGetStackTrace(unittest.TestCase):
 | |
| 
 | |
|     @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
 | |
|     @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
 | |
|     def test_remote_stack_trace(self):
 | |
|         # Spawn a process with some realistic Python code
 | |
|         script = textwrap.dedent("""\
 | |
|             import time, sys, os
 | |
|             def bar():
 | |
|                 for x in range(100):
 | |
|                     if x == 50:
 | |
|                         baz()
 | |
|             def baz():
 | |
|                 foo()
 | |
| 
 | |
|             def foo():
 | |
|                 fifo = sys.argv[1]
 | |
|                 with open(sys.argv[1], "w") as fifo:
 | |
|                     fifo.write("ready")
 | |
|                 time.sleep(1000)
 | |
| 
 | |
|             bar()
 | |
|             """)
 | |
|         stack_trace = None
 | |
|         with os_helper.temp_dir() as work_dir:
 | |
|             script_dir = os.path.join(work_dir, "script_pkg")
 | |
|             os.mkdir(script_dir)
 | |
|             fifo = f"{work_dir}/the_fifo"
 | |
|             os.mkfifo(fifo)
 | |
|             script_name = _make_test_script(script_dir, 'script', script)
 | |
|             try:
 | |
|                 p = subprocess.Popen([sys.executable, script_name,  str(fifo)])
 | |
|                 with open(fifo, "r") as fifo_file:
 | |
|                     response = fifo_file.read()
 | |
|                 self.assertEqual(response, "ready")
 | |
|                 stack_trace = get_stack_trace(p.pid)
 | |
|             except PermissionError:
 | |
|                 self.skipTest("Insufficient permissions to read the stack trace")
 | |
|             finally:
 | |
|                 os.remove(fifo)
 | |
|                 p.kill()
 | |
|                 p.terminate()
 | |
|                 p.wait(timeout=SHORT_TIMEOUT)
 | |
| 
 | |
| 
 | |
|             expected_stack_trace = [
 | |
|                 'foo',
 | |
|                 'baz',
 | |
|                 'bar',
 | |
|                 '<module>'
 | |
|             ]
 | |
|             self.assertEqual(stack_trace, expected_stack_trace)
 | |
| 
 | |
|     @unittest.skipIf(sys.platform != "darwin" and sys.platform != "linux", "Test only runs on Linux and MacOS")
 | |
|     @unittest.skipIf(sys.platform == "linux" and not PROCESS_VM_READV_SUPPORTED, "Test only runs on Linux with process_vm_readv support")
 | |
|     def test_self_trace(self):
 | |
|         stack_trace = get_stack_trace(os.getpid())
 | |
|         self.assertEqual(stack_trace[0], "test_self_trace")
 | |
| 
 | |
| if __name__ == "__main__":
 | |
|     unittest.main()
 | 
