mirror of
https://github.com/python/cpython.git
synced 2025-07-30 22:54:16 +00:00

pstats is really useful for profiling and printing the output of the execution of some block of code, but on multiple occasions I've wanted to access this output directly as an easily usable dictionary that I can further analyze or manipulate. The proposal is to add a function called get_profile_dict inside of pstats that'll automatically return this data in an easily accessible dict. The output of the following script: ``` import cProfile, pstats import pprint from pstats import func_std_string, f8 def fib(n): if n == 0: return 0 if n == 1: return 1 return fib(n-1) + fib(n-2) pr = cProfile.Profile() pr.enable() fib(5) pr.create_stats() ps = pstats.Stats(pr).sort_stats('tottime', 'cumtime') def get_profile_dict(self, keys_filter=None): """ Returns a dict where the key is a function name and the value is a dict with the following keys: - ncalls - tottime - percall_tottime - cumtime - percall_cumtime - file_name - line_number keys_filter can be optionally set to limit the key-value pairs in the retrieved dict. 
""" pstats_dict = {} func_list = self.fcn_list[:] if self.fcn_list else list(self.stats.keys()) if not func_list: return pstats_dict pstats_dict["total_tt"] = float(f8(self.total_tt)) for func in func_list: cc, nc, tt, ct, callers = self.stats[func] file, line, func_name = func ncalls = str(nc) if nc == cc else (str(nc) + '/' + str(cc)) tottime = float(f8(tt)) percall_tottime = -1 if nc == 0 else float(f8(tt/nc)) cumtime = float(f8(ct)) percall_cumtime = -1 if cc == 0 else float(f8(ct/cc)) func_dict = { "ncalls": ncalls, "tottime": tottime, # time spent in this function alone "percall_tottime": percall_tottime, "cumtime": cumtime, # time spent in the function plus all functions that this function called, "percall_cumtime": percall_cumtime, "file_name": file, "line_number": line } func_dict_filtered = func_dict if not keys_filter else { key: func_dict[key] for key in keys_filter } pstats_dict[func_name] = func_dict_filtered return pstats_dict pp = pprint.PrettyPrinter(depth=6) pp.pprint(get_profile_dict(ps)) ``` will produce: ``` {"<method 'disable' of '_lsprof.Profiler' objects>": {'cumtime': 0.0, 'file_name': '~', 'line_number': 0, 'ncalls': '1', 'percall_cumtime': 0.0, 'percall_tottime': 0.0, 'tottime': 0.0}, 'create_stats': {'cumtime': 0.0, 'file_name': '/usr/local/Cellar/python/3.7.4/Frameworks/Python.framework/Versions/3.7/lib/python3.7/cProfile.py', 'line_number': 50, 'ncalls': '1', 'percall_cumtime': 0.0, 'percall_tottime': 0.0, 'tottime': 0.0}, 'fib': {'cumtime': 0.0, 'file_name': 'get_profile_dict.py', 'line_number': 5, 'ncalls': '15/1', 'percall_cumtime': 0.0, 'percall_tottime': 0.0, 'tottime': 0.0}, 'total_tt': 0.0} ``` As an example, this can be used to generate a stacked column chart using various visualization tools which will assist in easily identifying program bottlenecks. https://bugs.python.org/issue37958 Automerge-Triggered-By: @gpshead
100 lines
3.4 KiB
Python
100 lines
3.4 KiB
Python
import unittest
|
|
|
|
from test import support
|
|
from io import StringIO
|
|
from pstats import SortKey
|
|
|
|
import pstats
|
|
import time
|
|
import cProfile
|
|
|
|
class AddCallersTestCase(unittest.TestCase):
    """Checks how pstats.add_callers merges two caller mappings."""

    def test_combine_results(self):
        # Keys present in both mappings have their call data added
        # together; keys present only in the source are carried over
        # unchanged.  See issue1269.

        # Tuple-valued entries: the new format used by the cProfile module.
        merged = pstats.add_callers(
            {"a": (1, 2, 3, 4)},
            {"a": (1, 2, 3, 4), "b": (5, 6, 7, 8)},
        )
        expected = {'a': (2, 4, 6, 8), 'b': (5, 6, 7, 8)}
        self.assertEqual(merged, expected)

        # Integer-valued entries: the old format used by the profile module.
        merged = pstats.add_callers({"a": 1}, {"a": 1, "b": 5})
        self.assertEqual(merged, {'a': 2, 'b': 5})
|
|
|
|
|
|
class StatsTestCase(unittest.TestCase):
|
|
def setUp(self):
|
|
stats_file = support.findfile('pstats.pck')
|
|
self.stats = pstats.Stats(stats_file)
|
|
|
|
def test_add(self):
|
|
stream = StringIO()
|
|
stats = pstats.Stats(stream=stream)
|
|
stats.add(self.stats, self.stats)
|
|
|
|
def test_sort_stats_int(self):
|
|
valid_args = {-1: 'stdname',
|
|
0: 'calls',
|
|
1: 'time',
|
|
2: 'cumulative'}
|
|
for arg_int, arg_str in valid_args.items():
|
|
self.stats.sort_stats(arg_int)
|
|
self.assertEqual(self.stats.sort_type,
|
|
self.stats.sort_arg_dict_default[arg_str][-1])
|
|
|
|
def test_sort_stats_string(self):
|
|
for sort_name in ['calls', 'ncalls', 'cumtime', 'cumulative',
|
|
'filename', 'line', 'module', 'name', 'nfl', 'pcalls',
|
|
'stdname', 'time', 'tottime']:
|
|
self.stats.sort_stats(sort_name)
|
|
self.assertEqual(self.stats.sort_type,
|
|
self.stats.sort_arg_dict_default[sort_name][-1])
|
|
|
|
def test_sort_stats_partial(self):
|
|
sortkey = 'filename'
|
|
for sort_name in ['f', 'fi', 'fil', 'file', 'filen', 'filena',
|
|
'filenam', 'filename']:
|
|
self.stats.sort_stats(sort_name)
|
|
self.assertEqual(self.stats.sort_type,
|
|
self.stats.sort_arg_dict_default[sortkey][-1])
|
|
|
|
def test_sort_stats_enum(self):
|
|
for member in SortKey:
|
|
self.stats.sort_stats(member)
|
|
self.assertEqual(
|
|
self.stats.sort_type,
|
|
self.stats.sort_arg_dict_default[member.value][-1])
|
|
|
|
def test_sort_starts_mix(self):
|
|
self.assertRaises(TypeError, self.stats.sort_stats,
|
|
'calls',
|
|
SortKey.TIME)
|
|
self.assertRaises(TypeError, self.stats.sort_stats,
|
|
SortKey.TIME,
|
|
'calls')
|
|
|
|
def test_get_stats_profile(self):
|
|
def pass1(): pass
|
|
def pass2(): pass
|
|
def pass3(): pass
|
|
|
|
pr = cProfile.Profile()
|
|
pr.enable()
|
|
pass1()
|
|
pass2()
|
|
pass3()
|
|
pr.create_stats()
|
|
ps = pstats.Stats(pr)
|
|
|
|
stats_profile = ps.get_stats_profile()
|
|
funcs_called = set(stats_profile.func_profiles.keys())
|
|
self.assertIn('pass1', funcs_called)
|
|
self.assertIn('pass2', funcs_called)
|
|
self.assertIn('pass3', funcs_called)
|
|
|
|
# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|