mirror of
https://github.com/python/cpython.git
synced 2025-11-25 21:11:09 +00:00
After approval from Anthony, merge the tim-current_frames
branch into the trunk. This adds a new sys._current_frames() function, which returns a dict mapping thread id to topmost thread stack frame.
This commit is contained in:
parent
2b221ed657
commit
32a8361f2d
6 changed files with 166 additions and 11 deletions
|
|
@ -237,6 +237,67 @@ class SysModuleTest(unittest.TestCase):
|
|||
is sys._getframe().f_code
|
||||
)
|
||||
|
||||
# sys._current_frames() is a CPython-only gimmick.
|
||||
def test_current_frames(self):
|
||||
import threading, thread
|
||||
import traceback
|
||||
|
||||
# Spawn a thread that blocks at a known place. Then the main
|
||||
# thread does sys._current_frames(), and verifies that the frames
|
||||
# returned make sense.
|
||||
entered_g = threading.Event()
|
||||
leave_g = threading.Event()
|
||||
thread_info = [] # the thread's id
|
||||
|
||||
def f123():
|
||||
g456()
|
||||
|
||||
def g456():
|
||||
thread_info.append(thread.get_ident())
|
||||
entered_g.set()
|
||||
leave_g.wait()
|
||||
|
||||
t = threading.Thread(target=f123)
|
||||
t.start()
|
||||
entered_g.wait()
|
||||
|
||||
# At this point, t has finished its entered_g.set(), and is blocked
|
||||
# in its leave_g.wait().
|
||||
self.assertEqual(len(thread_info), 1)
|
||||
thread_id = thread_info[0]
|
||||
|
||||
d = sys._current_frames()
|
||||
|
||||
main_id = thread.get_ident()
|
||||
self.assert_(main_id in d)
|
||||
self.assert_(thread_id in d)
|
||||
|
||||
# Verify that the captured main-thread frame is _this_ frame.
|
||||
frame = d.pop(main_id)
|
||||
self.assert_(frame is sys._getframe())
|
||||
|
||||
# Verify that the captured thread frame is blocked in g456, called
|
||||
# from f123. This is a litte tricky, since various bits of
|
||||
# threading.py are also in the thread's call stack.
|
||||
frame = d.pop(thread_id)
|
||||
stack = traceback.extract_stack(frame)
|
||||
for i, (filename, lineno, funcname, sourceline) in enumerate(stack):
|
||||
if funcname == "f123":
|
||||
break
|
||||
else:
|
||||
self.fail("didn't find f123() on thread's call stack")
|
||||
|
||||
self.assertEqual(sourceline, "g456()")
|
||||
|
||||
# And the next record must be for g456().
|
||||
filename, lineno, funcname, sourceline = stack[i+1]
|
||||
self.assertEqual(funcname, "g456")
|
||||
self.assertEqual(sourceline, "leave_g.wait()")
|
||||
|
||||
# Reap the spawned thread.
|
||||
leave_g.set()
|
||||
t.join()
|
||||
|
||||
def test_attributes(self):
|
||||
self.assert_(isinstance(sys.api_version, int))
|
||||
self.assert_(isinstance(sys.argv, list))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue