mirror of
https://github.com/python/cpython.git
synced 2025-08-04 00:48:58 +00:00

When monitoring LINE events, instrument all instructions that can have a predecessor on a different line. Then check that a new line has been hit in the instrumentation code. This brings the behavior closer to that of 3.11, simplifying implementation and porting of tools.
1210 lines
39 KiB
Python
1210 lines
39 KiB
Python
"""Test suite for the sys.monitoring."""
|
|
|
|
import collections
|
|
import functools
|
|
import operator
|
|
import sys
|
|
import types
|
|
import unittest
|
|
|
|
|
|
# Two-element iterable that drives the loop in floop().
PAIR = (0, 1)


# NOTE: the LINE-event tests assert on the line numbers of the statements
# in f1() and floop(), so keep each body starting on the line directly
# after its ``def`` (no docstrings or comments inside).
def f1():
    pass


# Calls two builtins; used by the CALL / C_RETURN counting tests.
def f2():
    len([])
    sys.getsizeof(0)


def floop():
    for item in PAIR:
        pass


# Generator that yields twice; g1() drives it to exhaustion.
def gen():
    yield
    yield


def g1():
    for _ in gen():
        pass


# Tool ids claimed by the tests in this file.
TEST_TOOL = 2
TEST_TOOL2 = 3
TEST_TOOL3 = 4
|
|
|
|
class MonitoringBasicTest(unittest.TestCase):
    """Smoke tests for the sys.monitoring namespace and tool-id handling."""

    # Every public name must be reachable; a missing one raises
    # AttributeError and fails the test.
    def test_has_objects(self):
        m = sys.monitoring
        for attr in (
            "events",
            "use_tool_id",
            "free_tool_id",
            "get_tool",
            "get_events",
            "set_events",
            "get_local_events",
            "set_local_events",
            "register_callback",
            "restart_events",
            "DISABLE",
            "MISSING",
        ):
            getattr(m, attr)
        m.events.NO_EVENTS

    # Claim a tool id, toggle events on it, then release it again.
    def test_tool(self):
        mon = sys.monitoring
        mon.use_tool_id(TEST_TOOL, "MonitoringTest.Tool")
        self.assertEqual(mon.get_tool(TEST_TOOL), "MonitoringTest.Tool")
        mon.set_events(TEST_TOOL, 15)
        self.assertEqual(mon.get_events(TEST_TOOL), 15)
        mon.set_events(TEST_TOOL, 0)
        # C_RETURN and C_RAISE are rejected when requested on their own.
        for lone_event in (mon.events.C_RETURN, mon.events.C_RAISE):
            with self.assertRaises(ValueError):
                mon.set_events(TEST_TOOL, lone_event)
        mon.free_tool_id(TEST_TOOL)
        self.assertEqual(mon.get_tool(TEST_TOOL), None)
        # Once the id is released, setting events on it must fail.
        with self.assertRaises(ValueError):
            mon.set_events(TEST_TOOL, mon.events.CALL)
|
|
|
|
|
|
class MonitoringTestBase:
    """Mixin that claims the three test tool ids around every test.

    Both setUp() and tearDown() also assert that no events are enabled
    for tool ids 0-5, so a test that leaks monitoring state is reported.
    """

    def setUp(self):
        # Check that a previous test hasn't left monitoring on.
        for tool in range(6):
            self.assertEqual(sys.monitoring.get_events(tool), 0)
        claims = (
            (TEST_TOOL, "test "),
            (TEST_TOOL2, "test2 "),
            (TEST_TOOL3, "test3 "),
        )
        # All three ids must be free before any of them is claimed.
        for tool_id, _ in claims:
            self.assertIs(sys.monitoring.get_tool(tool_id), None)
        owner = self.__class__.__name__
        for tool_id, prefix in claims:
            sys.monitoring.use_tool_id(tool_id, prefix + owner)

    def tearDown(self):
        # Check that test hasn't left monitoring on.
        for tool in range(6):
            self.assertEqual(sys.monitoring.get_events(tool), 0)
        for tool_id in (TEST_TOOL, TEST_TOOL2, TEST_TOOL3):
            sys.monitoring.free_tool_id(tool_id)
|
|
|
|
|
|
class MonitoringCountTest(MonitoringTestBase, unittest.TestCase):
    """Count how often a single event fires while a function runs."""

    # Helper: run ``func`` with ``event`` monitored and check that the
    # callback fired ``expected`` times, then check that it no longer fires
    # once the callback has been unregistered.
    #
    # Events are enabled globally, so this frame is monitored as well.
    # Keep the order of the statements between enabling and disabling the
    # events unchanged: the counter is deliberately reset right before
    # each call of ``func``.
    def check_event_count(self, func, event, expected):

        class Counter:
            def __init__(self):
                self.count = 0

            def __call__(self, *args):
                self.count += 1

        counter = Counter()
        # C_RETURN / C_RAISE are not enabled directly; CALL is enabled
        # instead for those two events.
        enabled = E.CALL if event in (E.C_RETURN, E.C_RAISE) else event
        sys.monitoring.register_callback(TEST_TOOL, event, counter)
        sys.monitoring.set_events(TEST_TOOL, enabled)
        self.assertEqual(counter.count, 0)
        counter.count = 0
        func()
        self.assertEqual(counter.count, expected)
        # register_callback() hands back the callback it replaced.
        prev = sys.monitoring.register_callback(TEST_TOOL, event, None)
        counter.count = 0
        func()
        self.assertEqual(counter.count, 0)
        self.assertEqual(prev, counter)
        sys.monitoring.set_events(TEST_TOOL, 0)

    def test_start_count(self):
        self.check_event_count(f1, E.PY_START, 1)

    def test_resume_count(self):
        self.check_event_count(g1, E.PY_RESUME, 2)

    def test_return_count(self):
        self.check_event_count(f1, E.PY_RETURN, 1)

    def test_call_count(self):
        self.check_event_count(f2, E.CALL, 3)

    def test_c_return_count(self):
        self.check_event_count(f2, E.C_RETURN, 2)
|
|
|
|
|
|
E = sys.monitoring.events
|
|
|
|
SIMPLE_EVENTS = [
|
|
(E.PY_START, "start"),
|
|
(E.PY_RESUME, "resume"),
|
|
(E.PY_RETURN, "return"),
|
|
(E.PY_YIELD, "yield"),
|
|
(E.JUMP, "jump"),
|
|
(E.BRANCH, "branch"),
|
|
(E.RAISE, "raise"),
|
|
(E.PY_UNWIND, "unwind"),
|
|
(E.EXCEPTION_HANDLED, "exception_handled"),
|
|
(E.C_RAISE, "c_raise"),
|
|
(E.C_RETURN, "c_return"),
|
|
]
|
|
|
|
SIMPLE_EVENT_SET = functools.reduce(operator.or_, [ev for (ev, _) in SIMPLE_EVENTS], 0) | E.CALL
|
|
|
|
|
|
# Fixture functions for MonitoringEventsTest.  Each one carries an
# ``events`` attribute listing the event labels that check_events()
# compares against when no explicit expectation is passed.

def just_pass():
    pass

just_pass.events = ["py_call", "start", "return"]


def just_raise():
    raise Exception

just_raise.events = ['py_call', "start", "raise", "unwind"]


def just_call():
    len([])

just_call.events = ['py_call', "start", "c_call", "c_return", "return"]


def caught():
    try:
        1/0
    except Exception:
        pass

caught.events = [
    'py_call',
    "start",
    "raise",
    "exception_handled",
    "branch",
    "return",
]


def nested_call():
    just_pass()

nested_call.events = [
    "py_call",
    "start",
    "py_call",
    "start",
    "return",
    "return",
]
|
|
|
|
# Callables classified as "py_call" by the CALL callbacks below; anything
# else is recorded as "c_call".
PY_CALLABLES = (types.FunctionType, types.MethodType)


class MonitoringEventsBase(MonitoringTestBase):
    """Helpers that record the labels of all simple events for a call."""

    def gather_events(self, func):
        """Run ``func`` with SIMPLE_EVENT_SET enabled and return the labels.

        Returns the list of event labels in firing order, without the
        final entry (the call to ``sys.monitoring.set_events`` that turns
        monitoring off again).  Any exception raised by ``func`` is
        swallowed.  All callbacks are unregistered before returning.
        """
        events = []
        for event, event_name in SIMPLE_EVENTS:
            # event_name is bound as a default so each callback keeps its
            # own label rather than the loop's last value.
            def record(*args, event_name=event_name):
                events.append(event_name)
            sys.monitoring.register_callback(TEST_TOOL, event, record)

        def record_call(code, offset, obj, arg):
            if isinstance(obj, PY_CALLABLES):
                events.append("py_call")
            else:
                events.append("c_call")

        sys.monitoring.register_callback(TEST_TOOL, E.CALL, record_call)
        sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET)
        # Start from a fresh list.  This must stay a rebinding: calling
        # events.clear() here would itself be a monitored C call.
        events = []
        # Deliberately a bare ``except``: a typed clause would add an
        # exception-match branch to this (monitored) frame.
        try:
            func()
        except:
            pass
        sys.monitoring.set_events(TEST_TOOL, 0)
        #Remove the final event, the call to `sys.monitoring.set_events`
        events = events[:-1]
        # Monitoring is off now, so unregistering cannot add events.  Do
        # not leave this test's closures installed for later tests.
        for event, _ in SIMPLE_EVENTS:
            sys.monitoring.register_callback(TEST_TOOL, event, None)
        sys.monitoring.register_callback(TEST_TOOL, E.CALL, None)
        return events

    def check_events(self, func, expected=None):
        """Assert that running ``func`` produces the ``expected`` labels.

        When ``expected`` is None, ``func.events`` is used instead.
        """
        events = self.gather_events(func)
        if expected is None:
            expected = func.events
        self.assertEqual(events, expected)
|
|
|
|
|
|
class MonitoringEventsTest(MonitoringEventsBase, unittest.TestCase):
    """Compare recorded event labels with each fixture's ``events`` list."""

    def test_just_pass(self):
        self.check_events(just_pass)

    # NOTE(review): any Exception raised by check_events() - which would
    # include a failed assertion - is swallowed here, so the only check
    # that can fail is that monitoring was switched off again.  Confirm
    # that this is intended.
    def test_just_raise(self):
        try:
            self.check_events(just_raise)
        except Exception:
            pass
        enabled = sys.monitoring.get_events(TEST_TOOL)
        self.assertEqual(enabled, 0)

    def test_just_call(self):
        self.check_events(just_call)

    def test_caught(self):
        self.check_events(caught)

    def test_nested_call(self):
        self.check_events(nested_call)
|
|
|
|
# Events handled by the ``up`` callback (pops a frame) in
# SimulateProfileTest.test_frame_stack.
UP_EVENTS = (
    E.C_RETURN,
    E.C_RAISE,
    E.PY_RETURN,
    E.PY_UNWIND,
    E.PY_YIELD,
)
# Events handled by the ``down`` callback (pushes a frame).
DOWN_EVENTS = (
    E.PY_START,
    E.PY_RESUME,
)
|
|
|
|
from test.profilee import testfunc
|
|
|
|
class SimulateProfileTest(MonitoringEventsBase, unittest.TestCase):
    """Check that events pair up the way a profiler needs them to."""

    # Every "down" style event must be matched by an "up" style event.
    def test_balanced(self):
        tally = collections.Counter(self.gather_events(testfunc))
        self.assertEqual(tally["c_call"], tally["c_return"])
        self.assertEqual(tally["start"], tally["return"] + tally["unwind"])
        self.assertEqual(tally["raise"], tally["exception_handled"] + tally["unwind"])

    # Maintain a shadow frame stack from the callbacks and check that it
    # always agrees with the frame the event was reported for.
    def test_frame_stack(self):
        self.maxDiff = None
        stack = []
        errors = []
        seen = set()

        # The callbacks use sys._getframe(1), i.e. the frame the event
        # fired in, so they have to be registered directly (no wrappers).
        def up(*args):
            frame = sys._getframe(1)
            if stack:
                expected = stack.pop()
                if frame != expected:
                    errors.append(f" Popping {frame} expected {expected}")
            else:
                errors.append("empty")

        def down(*args):
            frame = sys._getframe(1)
            stack.append(frame)
            seen.add(frame.f_code)

        def call(code, offset, target, arg):
            # Non-Python callables get no PY_START, so push for them here.
            if not isinstance(target, PY_CALLABLES):
                stack.append(sys._getframe(1))

        handlers = [(ev, up) for ev in UP_EVENTS]
        handlers += [(ev, down) for ev in DOWN_EVENTS]
        handlers.append((E.CALL, call))
        for ev, handler in handlers:
            sys.monitoring.register_callback(TEST_TOOL, ev, handler)
        # The next three statements run with monitoring on; the final
        # set_events() call is what leaves this frame on the stack.
        sys.monitoring.set_events(TEST_TOOL, SIMPLE_EVENT_SET)
        testfunc()
        sys.monitoring.set_events(TEST_TOOL, 0)
        self.assertEqual(errors, [])
        self.assertEqual(stack, [sys._getframe()])
        self.assertEqual(len(seen), 9)
|
|
|
|
|
|
class CounterWithDisable:
    """Callback that counts its calls and can ask to be disabled.

    While ``disable`` is true, each call returns ``sys.monitoring.DISABLE``;
    otherwise it returns None.
    """

    def __init__(self):
        self.disable = False
        self.count = 0

    def __call__(self, *args):
        self.count += 1
        return sys.monitoring.DISABLE if self.disable else None
|
|
|
|
|
|
class RecorderWithDisable:
    """Callback that appends its second argument to a shared list.

    While ``disable`` is true, each call returns ``sys.monitoring.DISABLE``;
    otherwise it returns None.
    """

    def __init__(self, events):
        self.disable = False
        self.events = events

    def __call__(self, code, event):
        self.events.append(event)
        return sys.monitoring.DISABLE if self.disable else None
|
|
|
|
|
|
class MontoringDisableAndRestartTest(MonitoringTestBase, unittest.TestCase):
    """Tests for returning DISABLE from a callback and restart_events()."""

    # A callback that returns DISABLE is called once more for f1 (the call
    # in which it returns DISABLE) and then not again.
    #
    # Cleanup lives in ``finally`` so that a failed assertion cannot leave
    # PY_START enabled or the callback installed for later tests.
    def test_disable(self):
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            self.assertEqual(counter.count, 0)
            # The assertion above is itself monitored; reset before f1().
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            counter.disable = True
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 0)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.restart_events()

    # After restart_events() a previously disabled location fires again.
    def test_restart(self):
        try:
            counter = CounterWithDisable()
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, counter)
            sys.monitoring.set_events(TEST_TOOL, E.PY_START)
            counter.disable = True
            f1()
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 0)
            sys.monitoring.restart_events()
            counter.count = 0
            f1()
            self.assertEqual(counter.count, 1)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.PY_START, None)
            sys.monitoring.restart_events()
|
|
|
|
|
|
class MultipleMonitorsTest(MonitoringTestBase, unittest.TestCase):
    """Several tools monitoring at the same time."""

    # In every test the counters are reset right before f1() is called,
    # because the assertions that run while events are enabled are
    # monitored too.

    def test_two_same(self):
        tools = (TEST_TOOL, TEST_TOOL2)
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            for tool, counter in zip(tools, (counter1, counter2)):
                sys.monitoring.register_callback(tool, E.PY_START, counter)
            for tool in tools:
                sys.monitoring.set_events(tool, E.PY_START)
            for tool in tools:
                self.assertEqual(sys.monitoring.get_events(tool), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)})
            counter1.count = 0
            counter2.count = 0
            f1()
            self.assertEqual((counter1.count, counter2.count), (1, 1))
        finally:
            for tool in tools:
                sys.monitoring.set_events(tool, 0)
            for tool in tools:
                sys.monitoring.register_callback(tool, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_three_same(self):
        tools = (TEST_TOOL, TEST_TOOL2, TEST_TOOL3)
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            counter3 = CounterWithDisable()
            for tool, counter in zip(tools, (counter1, counter2, counter3)):
                sys.monitoring.register_callback(tool, E.PY_START, counter)
            for tool in tools:
                sys.monitoring.set_events(tool, E.PY_START)
            for tool in tools:
                self.assertEqual(sys.monitoring.get_events(tool), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2) | (1 << TEST_TOOL3)})
            counter1.count = 0
            counter2.count = 0
            counter3.count = 0
            f1()
            self.assertEqual(
                (counter1.count, counter2.count, counter3.count), (1, 1, 1))
        finally:
            for tool in tools:
                sys.monitoring.set_events(tool, 0)
            for tool in tools:
                sys.monitoring.register_callback(tool, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_two_different(self):
        # One tool watches function starts, the other function returns.
        watched = ((TEST_TOOL, E.PY_START), (TEST_TOOL2, E.PY_RETURN))
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            for (tool, event), counter in zip(watched, (counter1, counter2)):
                sys.monitoring.register_callback(tool, event, counter)
            for tool, event in watched:
                sys.monitoring.set_events(tool, event)
            for tool, event in watched:
                self.assertEqual(sys.monitoring.get_events(tool), event)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': 1 << TEST_TOOL, 'PY_RETURN': 1 << TEST_TOOL2})
            counter1.count = 0
            counter2.count = 0
            f1()
            self.assertEqual((counter1.count, counter2.count), (1, 1))
        finally:
            for tool, _ in watched:
                sys.monitoring.set_events(tool, 0)
            for tool, event in watched:
                sys.monitoring.register_callback(tool, event, None)
            self.assertEqual(sys.monitoring._all_events(), {})

    def test_two_with_disable(self):
        tools = (TEST_TOOL, TEST_TOOL2)
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            counter1 = CounterWithDisable()
            counter2 = CounterWithDisable()
            for tool, counter in zip(tools, (counter1, counter2)):
                sys.monitoring.register_callback(tool, E.PY_START, counter)
            for tool in tools:
                sys.monitoring.set_events(tool, E.PY_START)
            for tool in tools:
                self.assertEqual(sys.monitoring.get_events(tool), E.PY_START)
            self.assertEqual(sys.monitoring._all_events(), {'PY_START': (1 << TEST_TOOL) | (1 << TEST_TOOL2)})
            counter1.count = 0
            counter2.count = 0
            # The first tool asks to be disabled; the second keeps counting.
            counter1.disable = True
            f1()
            self.assertEqual((counter1.count, counter2.count), (1, 1))
            counter1.count = 0
            counter2.count = 0
            f1()
            self.assertEqual((counter1.count, counter2.count), (0, 1))
        finally:
            for tool in tools:
                sys.monitoring.set_events(tool, 0)
            for tool in tools:
                sys.monitoring.register_callback(tool, E.PY_START, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()
|
|
|
|
class LineMonitoringTest(MonitoringTestBase, unittest.TestCase):
    """Tests for LINE events."""

    # The first three tests monitor globally, so the lines of the test
    # method itself are reported too.  The ``start+N`` values are offsets
    # of statements inside the test method from its ``def`` line: do not
    # add or remove lines (comments included) between the ``def`` and the
    # statement that switches monitoring off.
    #
    # Line numbers inside f1() and floop() are derived from their code
    # objects rather than hard-coded, so moving those helpers within the
    # file does not break the tests.

    def test_lines_single(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.set_events(TEST_TOOL, E.LINE)
            f1()
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            start = LineMonitoringTest.test_lines_single.__code__.co_firstlineno
            # f1's single statement sits on the line after its ``def``.
            f1_body = f1.__code__.co_firstlineno + 1
            self.assertEqual(events, [start+7, f1_body, start+8])
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    def test_lines_loop(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.set_events(TEST_TOOL, E.LINE)
            floop()
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            start = LineMonitoringTest.test_lines_loop.__code__.co_firstlineno
            # floop: ``for`` on the line after ``def``, its body one below.
            loop_head = floop.__code__.co_firstlineno + 1
            loop_body = loop_head + 1
            self.assertEqual(
                events,
                [start+7, loop_head, loop_body, loop_head, loop_body,
                 loop_head, start+8])
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    def test_lines_two(self):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            events2 = []
            recorder2 = RecorderWithDisable(events2)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, recorder)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, recorder2)
            sys.monitoring.set_events(TEST_TOOL, E.LINE); sys.monitoring.set_events(TEST_TOOL2, E.LINE)
            f1()
            sys.monitoring.set_events(TEST_TOOL, 0); sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None)
            start = LineMonitoringTest.test_lines_two.__code__.co_firstlineno
            f1_body = f1.__code__.co_firstlineno + 1
            expected = [start+10, f1_body, start+11]
            self.assertEqual(events, expected)
            self.assertEqual(events2, expected)
        finally:
            sys.monitoring.set_events(TEST_TOOL, 0)
            sys.monitoring.set_events(TEST_TOOL2, 0)
            sys.monitoring.register_callback(TEST_TOOL, E.LINE, None)
            sys.monitoring.register_callback(TEST_TOOL2, E.LINE, None)
            self.assertEqual(sys.monitoring._all_events(), {})
            sys.monitoring.restart_events()

    def check_lines(self, func, expected, tool=TEST_TOOL):
        """Run ``func`` with LINE monitoring and compare the lines hit.

        ``expected`` holds line offsets relative to ``func``'s first line.
        The first and last recorded events are dropped before comparing;
        they belong to this helper, not to ``func``.
        """
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            events = []
            recorder = RecorderWithDisable(events)
            sys.monitoring.register_callback(tool, E.LINE, recorder)
            sys.monitoring.set_events(tool, E.LINE)
            func()
            sys.monitoring.set_events(tool, 0)
            sys.monitoring.register_callback(tool, E.LINE, None)
            first = func.__code__.co_firstlineno
            lines = [line - first for line in events[1:-1]]
            self.assertEqual(lines, expected)
        finally:
            sys.monitoring.set_events(tool, 0)
            # Also drop the callback if func() or an assertion failed.
            sys.monitoring.register_callback(tool, E.LINE, None)

    # The local functions below are fixtures: the expected lists are the
    # line offsets of their statements, so their layout must not change.

    def test_linear(self):

        def func():
            line = 1
            line = 2
            line = 3
            line = 4
            line = 5

        self.check_lines(func, [1,2,3,4,5])

    def test_branch(self):
        def func():
            if "true".startswith("t"):
                line = 2
                line = 3
            else:
                line = 5
            line = 6

        self.check_lines(func, [1,2,3,6])

    def test_try_except(self):

        def func1():
            try:
                line = 2
                line = 3
            except:
                line = 5
            line = 6

        self.check_lines(func1, [1,2,3,6])

        def func2():
            try:
                line = 2
                raise 3
            except:
                line = 5
            line = 6

        self.check_lines(func2, [1,2,3,4,5,6])
|
|
|
|
|
|
class ExceptionRecorder:
    """Recorder for RAISE events: appends ``("raise", <exception type>)``."""

    event_type = E.RAISE

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, exc):
        entry = ("raise", type(exc))
        self.events.append(entry)
|
|
|
|
class CheckEvents(MonitoringTestBase, unittest.TestCase):
    """Base class providing check_events() for the recorder-based tests."""

    # Run ``func`` with one callback per recorder class installed for
    # ``tool`` and compare everything they recorded with ``expected``.
    # Each recorder class is instantiated with the shared result list and
    # registered for its ``event_type``.
    #
    # Events are enabled globally, so this method is monitored as well and
    # subclasses list ('line', 'check_events', 10) / (..., 11) in their
    # expectations.  ``func()`` must therefore stay exactly 10 lines below
    # the ``def`` and the following ``set_events(tool, 0)`` 11 lines below
    # it - which is also why this description is a comment and not a
    # docstring.
    def check_events(self, func, expected, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            recorded = []
            mask = 0
            for recorder_cls in recorders:
                mask |= recorder_cls.event_type
                sys.monitoring.register_callback(
                    tool, recorder_cls.event_type, recorder_cls(recorded))
            sys.monitoring.set_events(tool, mask)
            func()
            sys.monitoring.set_events(tool, 0)
            self.assertEqual(recorded, expected)
        finally:
            # Runs on success and on failure alike.
            sys.monitoring.set_events(tool, 0)
            for recorder_cls in recorders:
                sys.monitoring.register_callback(tool, recorder_cls.event_type, None)
|
|
|
|
class StopiterationRecorder(ExceptionRecorder):
    """Same record format as ExceptionRecorder, for STOP_ITERATION events."""

    event_type = E.STOP_ITERATION


class ExceptionMontoringTest(CheckEvents):
    """Tests for the RAISE and STOP_ITERATION events."""

    recorder = ExceptionRecorder

    def test_simple_try_except(self):

        def func1():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        raised_key_error = [("raise", KeyError)]
        self.check_events(func1, raised_key_error)

        def gen():
            yield 1
            return 2

        # Exhausting the generator in a ``for`` loop is reported through
        # STOP_ITERATION.
        def implicit_stop_iteration():
            for _ in gen():
                pass

        raised_stop_iteration = [("raise", StopIteration)]
        self.check_events(
            implicit_stop_iteration,
            raised_stop_iteration,
            recorders=(StopiterationRecorder,),
        )
|
|
|
|
class LineRecorder:
    """Recorder for LINE events.

    Appends ``("line", <code name>, <line offset>)`` where the offset is
    relative to the code object's ``co_firstlineno``.
    """

    event_type = E.LINE

    def __init__(self, events):
        self.events = events

    def __call__(self, code, line):
        relative = line - code.co_firstlineno
        self.events.append(("line", code.co_name, relative))
|
|
|
|
class CallRecorder:
    """Recorder for CALL events: appends ``("call", <callee name>, <arg>)``."""

    event_type = E.CALL

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, func, arg):
        entry = ("call", func.__name__, arg)
        self.events.append(entry)
|
|
|
|
class CEventRecorder:
    """Shared implementation for the C_RETURN / C_RAISE recorders.

    Subclasses supply ``event_name``; each call appends
    ``(event_name, <callee name>, <arg>)``.
    """

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, func, arg):
        entry = (self.event_name, func.__name__, arg)
        self.events.append(entry)
|
|
|
|
class CReturnRecorder(CEventRecorder):
    """Records C_RETURN events under the label "C return"."""

    event_type = E.C_RETURN
    event_name = "C return"


class CRaiseRecorder(CEventRecorder):
    """Records C_RAISE events under the label "C raise"."""

    event_type = E.C_RAISE
    event_name = "C raise"


# Recorder set used by TestManyEvents and TestLocalEvents.
MANY_RECORDERS = (
    ExceptionRecorder,
    CallRecorder,
    LineRecorder,
    CReturnRecorder,
    CRaiseRecorder,
)
|
|
|
|
class TestManyEvents(CheckEvents):
    """Interleaving of raise, call, line and C return/raise events."""

    # The local functions are fixtures: the expected 'line' entries are
    # the line offsets of their statements, so keep their layout as is.
    # The 'check_events' entries come from CheckEvents.check_events, which
    # is monitored too; the trailing 'call' is its set_events() call.

    def test_simple(self):

        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        expected = [
            ('line', 'check_events', 10),
            ('call', 'func1', sys.monitoring.MISSING),
            ('line', 'func1', 1),
            ('line', 'func1', 2),
            ('line', 'func1', 3),
            ('line', 'check_events', 11),
            ('call', 'set_events', 2),
        ]
        self.check_events(func1, expected, recorders=MANY_RECORDERS)

    def test_c_call(self):

        def func2():
            line1 = 1
            [].append(2)
            line3 = 3

        expected = [
            ('line', 'check_events', 10),
            ('call', 'func2', sys.monitoring.MISSING),
            ('line', 'func2', 1),
            ('line', 'func2', 2),
            ('call', 'append', [2]),
            ('C return', 'append', [2]),
            ('line', 'func2', 3),
            ('line', 'check_events', 11),
            ('call', 'set_events', 2),
        ]
        self.check_events(func2, expected, recorders=MANY_RECORDERS)

    def test_try_except(self):

        def func3():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        expected = [
            ('line', 'check_events', 10),
            ('call', 'func3', sys.monitoring.MISSING),
            ('line', 'func3', 1),
            ('line', 'func3', 2),
            ('line', 'func3', 3),
            ('raise', KeyError),
            ('line', 'func3', 4),
            ('line', 'func3', 5),
            ('line', 'func3', 6),
            ('line', 'check_events', 11),
            ('call', 'set_events', 2),
        ]
        self.check_events(func3, expected, recorders=MANY_RECORDERS)
|
|
|
|
class InstructionRecorder:
    """Recorder for INSTRUCTION events.

    Appends ``("instruction", <code name>, <offset>)`` for every code
    object except those named "check_events".
    """

    event_type = E.INSTRUCTION

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset):
        # Filter out instructions in check_events to lower noise
        if code.co_name == "check_events":
            return
        self.events.append(("instruction", code.co_name, offset))


# Recorder set used by TestLineAndInstructionEvents.
LINE_AND_INSTRUCTION_RECORDERS = (InstructionRecorder, LineRecorder)
|
|
|
|
class TestLineAndInstructionEvents(CheckEvents):
    """LINE and INSTRUCTION events monitored together."""

    maxDiff = None

    # The local functions are fixtures: the expected entries are the line
    # offsets and bytecode offsets of exactly this code, so keep their
    # bodies unchanged.

    def test_simple(self):

        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        expected = [
            ('line', 'check_events', 10),
            ('line', 'func1', 1),
            ('instruction', 'func1', 2),
            ('instruction', 'func1', 4),
            ('line', 'func1', 2),
            ('instruction', 'func1', 6),
            ('instruction', 'func1', 8),
            ('line', 'func1', 3),
            ('instruction', 'func1', 10),
            ('instruction', 'func1', 12),
            ('instruction', 'func1', 14),
            ('line', 'check_events', 11),
        ]
        self.check_events(
            func1, expected, recorders=LINE_AND_INSTRUCTION_RECORDERS)

    def test_c_call(self):

        def func2():
            line1 = 1
            [].append(2)
            line3 = 3

        expected = [
            ('line', 'check_events', 10),
            ('line', 'func2', 1),
            ('instruction', 'func2', 2),
            ('instruction', 'func2', 4),
            ('line', 'func2', 2),
            ('instruction', 'func2', 6),
            ('instruction', 'func2', 8),
            ('instruction', 'func2', 28),
            ('instruction', 'func2', 30),
            ('instruction', 'func2', 38),
            ('line', 'func2', 3),
            ('instruction', 'func2', 40),
            ('instruction', 'func2', 42),
            ('instruction', 'func2', 44),
            ('line', 'check_events', 11),
        ]
        self.check_events(
            func2, expected, recorders=LINE_AND_INSTRUCTION_RECORDERS)

    def test_try_except(self):

        def func3():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        expected = [
            ('line', 'check_events', 10),
            ('line', 'func3', 1),
            ('instruction', 'func3', 2),
            ('line', 'func3', 2),
            ('instruction', 'func3', 4),
            ('instruction', 'func3', 6),
            ('line', 'func3', 3),
            ('instruction', 'func3', 8),
            ('instruction', 'func3', 18),
            ('instruction', 'func3', 20),
            ('line', 'func3', 4),
            ('instruction', 'func3', 22),
            ('line', 'func3', 5),
            ('instruction', 'func3', 24),
            ('instruction', 'func3', 26),
            ('instruction', 'func3', 28),
            ('line', 'func3', 6),
            ('instruction', 'func3', 30),
            ('instruction', 'func3', 32),
            ('instruction', 'func3', 34),
            ('line', 'check_events', 11),
        ]
        self.check_events(
            func3, expected, recorders=LINE_AND_INSTRUCTION_RECORDERS)

    # The same events must be reported again after restart_events().
    def test_with_restart(self):
        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        expected = [
            ('line', 'check_events', 10),
            ('line', 'func1', 1),
            ('instruction', 'func1', 2),
            ('instruction', 'func1', 4),
            ('line', 'func1', 2),
            ('instruction', 'func1', 6),
            ('instruction', 'func1', 8),
            ('line', 'func1', 3),
            ('instruction', 'func1', 10),
            ('instruction', 'func1', 12),
            ('instruction', 'func1', 14),
            ('line', 'check_events', 11),
        ]

        self.check_events(
            func1, expected, recorders=LINE_AND_INSTRUCTION_RECORDERS)

        sys.monitoring.restart_events()

        self.check_events(
            func1, expected, recorders=LINE_AND_INSTRUCTION_RECORDERS)
|
|
|
|
class TestInstallIncrementallly(MonitoringTestBase, unittest.TestCase):
    """Enable events one at a time and check that none of them is lost."""

    def check_events(self, func, must_include, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
        """Run ``func`` and assert that every ``must_include`` entry fired.

        The event set is widened one recorder at a time, in the order
        given by ``recorders``; the callbacks are registered afterwards.
        Only membership is checked, not order, and extra events are
        ignored.
        """
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            event_list = []
            all_events = 0
            for recorder in recorders:
                all_events |= recorder.event_type
                sys.monitoring.set_events(tool, all_events)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, recorder(event_list))
            func()
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)
            for line in must_include:
                self.assertIn(line, event_list)
        finally:
            sys.monitoring.set_events(tool, 0)
            for recorder in recorders:
                sys.monitoring.register_callback(tool, recorder.event_type, None)

    @staticmethod
    def func1():
        line1 = 1

    # co_firstlineno of a decorated function is the line of its first
    # decorator, so the single statement of func1 is at offset 2.
    MUST_INCLUDE_LI = [
            ('instruction', 'func1', 2),
            ('line', 'func1', 2),
            ('instruction', 'func1', 4),
            ('instruction', 'func1', 6)]

    def test_line_then_instruction(self):
        recorders = [ LineRecorder, InstructionRecorder ]
        self.check_events(self.func1,
                          recorders = recorders, must_include = self.MUST_INCLUDE_LI)

    def test_instruction_then_line(self):
        recorders = [ InstructionRecorder, LineRecorder ]
        self.check_events(self.func1,
                          recorders = recorders, must_include = self.MUST_INCLUDE_LI)

    @staticmethod
    def func2():
        len(())

    MUST_INCLUDE_CI = [
            ('instruction', 'func2', 2),
            ('call', 'func2', sys.monitoring.MISSING),
            ('call', 'len', ()),
            ('instruction', 'func2', 12),
            ('instruction', 'func2', 14)]

    # These two were previously named like the line tests above and so
    # replaced them in the class namespace; they now have their own names.
    def test_call_then_instruction(self):
        recorders = [ CallRecorder, InstructionRecorder ]
        self.check_events(self.func2,
                          recorders = recorders, must_include = self.MUST_INCLUDE_CI)

    def test_instruction_then_call(self):
        recorders = [ InstructionRecorder, CallRecorder ]
        self.check_events(self.func2,
                          recorders = recorders, must_include = self.MUST_INCLUDE_CI)
|
|
|
|
class TestLocalEvents(MonitoringTestBase, unittest.TestCase):
    """Events enabled for a single code object via set_local_events()."""

    def check_events(self, func, expected, tool=TEST_TOOL, recorders=(ExceptionRecorder,)):
        """Run ``func`` with local events on its code object only.

        One callback per recorder class is registered for ``tool``; what
        they record must equal ``expected``.
        """
        target = func.__code__
        try:
            self.assertEqual(sys.monitoring._all_events(), {})
            recorded = []
            mask = 0
            for recorder_cls in recorders:
                mask |= recorder_cls.event_type
                sys.monitoring.register_callback(
                    tool, recorder_cls.event_type, recorder_cls(recorded))
            sys.monitoring.set_local_events(tool, target, mask)
            func()
            sys.monitoring.set_local_events(tool, target, 0)
            self.assertEqual(recorded, expected)
        finally:
            # Runs on success and on failure alike.
            sys.monitoring.set_local_events(tool, target, 0)
            for recorder_cls in recorders:
                sys.monitoring.register_callback(tool, recorder_cls.event_type, None)

    # The local functions are fixtures: the expected 'line' entries are
    # the line offsets of their statements, so keep their layout as is.

    def test_simple(self):

        def func1():
            line1 = 1
            line2 = 2
            line3 = 3

        expected = [
            ('line', 'func1', 1),
            ('line', 'func1', 2),
            ('line', 'func1', 3),
        ]
        self.check_events(func1, expected, recorders=MANY_RECORDERS)

    def test_c_call(self):

        def func2():
            line1 = 1
            [].append(2)
            line3 = 3

        expected = [
            ('line', 'func2', 1),
            ('line', 'func2', 2),
            ('call', 'append', [2]),
            ('C return', 'append', [2]),
            ('line', 'func2', 3),
        ]
        self.check_events(func2, expected, recorders=MANY_RECORDERS)

    def test_try_except(self):

        def func3():
            try:
                line = 2
                raise KeyError
            except:
                line = 5
            line = 6

        expected = [
            ('line', 'func3', 1),
            ('line', 'func3', 2),
            ('line', 'func3', 3),
            ('raise', KeyError),
            ('line', 'func3', 4),
            ('line', 'func3', 5),
            ('line', 'func3', 6),
        ]
        self.check_events(func3, expected, recorders=MANY_RECORDERS)
|
|
|
|
|
|
def line_from_offset(code, offset):
|
|
for start, end, line in code.co_lines():
|
|
if start <= offset < end:
|
|
if line is None:
|
|
return f"[offset={offset}]"
|
|
return line - code.co_firstlineno
|
|
return -1
|
|
|
|
class JumpRecorder:
    """Recorder for JUMP events.

    Appends ``(name, <code name>, <source line>, <destination line>)``,
    with both offsets translated by line_from_offset().
    """

    event_type = E.JUMP
    name = "jump"

    def __init__(self, events):
        self.events = events

    def __call__(self, code, from_, to):
        src_line, dst_line = (
            line_from_offset(code, from_),
            line_from_offset(code, to),
        )
        self.events.append((self.name, code.co_name, src_line, dst_line))
|
|
|
|
|
|
class BranchRecorder(JumpRecorder):
    """Same record format as JumpRecorder, for BRANCH events."""

    event_type = E.BRANCH
    name = "branch"


class ReturnRecorder:
    """Recorder for PY_RETURN events: appends ``("return", <value>)``."""

    event_type = E.PY_RETURN

    def __init__(self, events):
        self.events = events

    def __call__(self, code, offset, val):
        entry = ("return", val)
        self.events.append(entry)


# Recorder sets used by TestBranchAndJumpEvents.
JUMP_AND_BRANCH_RECORDERS = (JumpRecorder, BranchRecorder)
JUMP_BRANCH_AND_LINE_RECORDERS = (JumpRecorder, BranchRecorder, LineRecorder)
FLOW_AND_LINE_RECORDERS = (
    JumpRecorder,
    BranchRecorder,
    LineRecorder,
    ExceptionRecorder,
    ReturnRecorder,
)
|
|
|
|
class TestBranchAndJumpEvents(CheckEvents):
|
|
maxDiff = None
|
|
|
|
def test_loop(self):
|
|
|
|
def func():
|
|
x = 1
|
|
for a in range(2):
|
|
if a:
|
|
x = 4
|
|
else:
|
|
x = 6
|
|
|
|
self.check_events(func, recorders = JUMP_AND_BRANCH_RECORDERS, expected = [
|
|
('branch', 'func', 2, 2),
|
|
('branch', 'func', 3, 6),
|
|
('jump', 'func', 6, 2),
|
|
('branch', 'func', 2, 2),
|
|
('branch', 'func', 3, 4),
|
|
('jump', 'func', 4, 2),
|
|
('branch', 'func', 2, 2)])
|
|
|
|
self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [
|
|
('line', 'check_events', 10),
|
|
('line', 'func', 1),
|
|
('line', 'func', 2),
|
|
('branch', 'func', 2, 2),
|
|
('line', 'func', 3),
|
|
('branch', 'func', 3, 6),
|
|
('line', 'func', 6),
|
|
('jump', 'func', 6, 2),
|
|
('line', 'func', 2),
|
|
('branch', 'func', 2, 2),
|
|
('line', 'func', 3),
|
|
('branch', 'func', 3, 4),
|
|
('line', 'func', 4),
|
|
('jump', 'func', 4, 2),
|
|
('line', 'func', 2),
|
|
('branch', 'func', 2, 2),
|
|
('line', 'check_events', 11)])
|
|
|
|
def test_except_star(self):
|
|
|
|
class Foo:
|
|
def meth(self):
|
|
pass
|
|
|
|
def func():
|
|
try:
|
|
try:
|
|
raise KeyError
|
|
except* Exception as e:
|
|
f = Foo(); f.meth()
|
|
except KeyError:
|
|
pass
|
|
|
|
|
|
self.check_events(func, recorders = JUMP_BRANCH_AND_LINE_RECORDERS, expected = [
|
|
('line', 'check_events', 10),
|
|
('line', 'func', 1),
|
|
('line', 'func', 2),
|
|
('line', 'func', 3),
|
|
('line', 'func', 4),
|
|
('branch', 'func', 4, 4),
|
|
('line', 'func', 5),
|
|
('line', 'meth', 1),
|
|
('jump', 'func', 5, 5),
|
|
('jump', 'func', 5, '[offset=114]'),
|
|
('branch', 'func', '[offset=120]', '[offset=122]'),
|
|
('line', 'check_events', 11)])
|
|
|
|
self.check_events(func, recorders = FLOW_AND_LINE_RECORDERS, expected = [
|
|
('line', 'check_events', 10),
|
|
('line', 'func', 1),
|
|
('line', 'func', 2),
|
|
('line', 'func', 3),
|
|
('raise', KeyError),
|
|
('line', 'func', 4),
|
|
('branch', 'func', 4, 4),
|
|
('line', 'func', 5),
|
|
('line', 'meth', 1),
|
|
('return', None),
|
|
('jump', 'func', 5, 5),
|
|
('jump', 'func', 5, '[offset=114]'),
|
|
('branch', 'func', '[offset=120]', '[offset=122]'),
|
|
('return', None),
|
|
('line', 'check_events', 11)])
|
|
|
|
class TestSetGetEvents(MonitoringTestBase, unittest.TestCase):
    """Round-trip checks for the global and per-code event setters."""

    # Enable PY_START for each tool in turn, then clear it again in the
    # same order, reading the value back after every change.
    def test_global(self):
        tools = (TEST_TOOL, TEST_TOOL2)
        for tool in tools:
            sys.monitoring.set_events(tool, E.PY_START)
            self.assertEqual(sys.monitoring.get_events(tool), E.PY_START)
        for tool in tools:
            sys.monitoring.set_events(tool, 0)
            self.assertEqual(sys.monitoring.get_events(tool), 0)

    # Same round trip, for the events local to f1's code object.
    def test_local(self):
        code = f1.__code__
        tools = (TEST_TOOL, TEST_TOOL2)
        for tool in tools:
            sys.monitoring.set_local_events(tool, code, E.PY_START)
            self.assertEqual(sys.monitoring.get_local_events(tool, code), E.PY_START)
        for tool in tools:
            sys.monitoring.set_local_events(tool, code, 0)
            self.assertEqual(sys.monitoring.get_local_events(tool, code), 0)
|
|
|
|
# NOTE(review): unittest.TestCase is listed before MonitoringTestBase here,
# unlike in the other test classes.  TestCase's own setUp/tearDown
# therefore come first in the MRO, so the tool-id bookkeeping in
# MonitoringTestBase presumably does not run for this class - confirm that
# this is intended.
class TestUninitialized(unittest.TestCase, MonitoringTestBase):
    """Querying local events on a code object that was never monitored."""

    @staticmethod
    def f():
        pass

    def test_get_local_events_uninitialized(self):
        code = self.f.__code__
        self.assertEqual(sys.monitoring.get_local_events(TEST_TOOL, code), 0)
|