cpython/Lib/test/libregrtest/win_utils.py
Jeremy Kloth 19058b9f62
bpo-44336: Prevent tests hanging on child process handles on Windows (GH-26578)
Replace the child process `typeperf.exe` with a daemon thread that reads the performance counters directly.  This prevents the issues that arise from inherited handles in grandchild processes (see issue37531 for discussion).

We only use the load tracker when running tests in multiprocess mode. This prevents inadvertent interactions with tests expecting a single threaded environment.  Displaying load is really only helpful for buildbots running in multiprocess mode anyway.
2022-03-22 00:06:55 +00:00

124 lines
4.4 KiB
Python

import _overlapped
import _thread
import _winapi
import math
import struct
import winreg
# Seconds per measurement, i.e. the time between two consecutive samples of
# the 'Processor Queue Length' counter.
SAMPLING_INTERVAL = 1
# Exponential damping factor to compute exponentially weighted moving average
# on 1 minute (60 seconds).  This is exp(-SAMPLING_INTERVAL / 60): the weight
# kept by the previous load value at each update, while the new sample is
# weighted by (1 - LOAD_FACTOR_1).
LOAD_FACTOR_1 = 1 / math.exp(SAMPLING_INTERVAL / 60)
# Initialize the load using the arithmetic mean of the first NVALUE values
# of the Processor Queue Length; no load is reported before that.
NVALUE = 5
class WindowsLoadTracker:
    """
    Track the system load on Windows from the performance counters.

    This class asynchronously reads the performance counters to calculate
    the system load on Windows.  A "raw" thread is used here to prevent
    interference with the test suite's cases for the threading module.

    The load is an exponentially weighted moving average of the
    'Processor Queue Length' counter.  It is seeded with the arithmetic
    mean of the first NVALUE samples; until then getloadavg() returns None.

    Call close() to stop the update thread and release the event handles.
    """

    # Class-level defaults keep __del__ safe on an instance whose __init__
    # raised before the event handles were created.
    _running = None
    _stopped = None

    def __init__(self):
        """
        Create the stop/stopped events and start the update thread.

        Raises PermissionError if access to the performance data is not
        allowed.
        """
        # Pre-flight test for access to the performance data;
        # `PermissionError` will be raised if not allowed
        winreg.QueryInfoKey(winreg.HKEY_PERFORMANCE_DATA)
        self._values = []
        self._load = None
        # manual-reset events, initially unsignaled:
        #   _running is set to ask the update thread to quit,
        #   _stopped is set by the update thread when it has finished
        self._running = _overlapped.CreateEvent(None, True, False, None)
        self._stopped = _overlapped.CreateEvent(None, True, False, None)
        try:
            _thread.start_new_thread(self._update_load, (), {})
        except BaseException:
            # No thread will ever signal `_stopped`: drop it now so that
            # __del__ only closes `_running` instead of waiting forever.
            _winapi.CloseHandle(self._stopped)
            self._stopped = None
            raise

    def _update_load(self,
                     # localize module access to prevent shutdown errors
                     _wait=_winapi.WaitForSingleObject,
                     _signal=_overlapped.SetEvent,
                     _interval_ms=int(SAMPLING_INTERVAL * 1000)):
        """
        Body of the update thread: sample the load once per interval.

        The wait returns a non-zero value (timeout) while `_running` is
        unsignaled and zero once it has been signaled.
        """
        try:
            # run until signaled to stop
            while _wait(self._running, _interval_ms):
                self._calculate_load()
        finally:
            # notify stopped, even when sampling raised, so that
            # close()/__del__ never block on a dead thread
            _signal(self._stopped)

    def _calculate_load(self,
                        # localize module access to prevent shutdown errors
                        _query=winreg.QueryValueEx,
                        _hkey=winreg.HKEY_PERFORMANCE_DATA,
                        _unpack=struct.unpack_from):
        """
        Read one 'Processor Queue Length' sample and fold it into the load.

        Returns without updating anything when the counter definition is
        not found in the performance data.
        """
        # get the 'System' object
        data, _ = _query(_hkey, '2')
        # All fields read below are DWORDs (32-bit); the explicit '<'
        # formats select little-endian standard sizes without padding.
        # PERF_DATA_BLOCK {
        #   WCHAR Signature[4]      8 +
        #   DWORD LittleEndian      4 +
        #   DWORD Version           4 +
        #   DWORD Revision          4 +
        #   DWORD TotalByteLength   4 +
        #   DWORD HeaderLength      = 24 byte offset
        #   ...
        # }
        obj_start, = _unpack('<L', data, 24)
        # PERF_OBJECT_TYPE {
        #   DWORD TotalByteLength
        #   DWORD DefinitionLength
        #   DWORD HeaderLength
        #   ...
        # }
        data_start, defn_start = _unpack('<4xLL', data, obj_start)
        data_base = obj_start + data_start
        defn_base = obj_start + defn_start
        # find the 'Processor Queue Length' counter (index=44)
        while defn_base < data_base:
            # PERF_COUNTER_DEFINITION {
            #   DWORD ByteLength
            #   DWORD CounterNameTitleIndex
            #   ... [7 DWORDs/28 bytes]
            #   DWORD CounterOffset
            # }
            size, idx, offset = _unpack('<LL28xL', data, defn_base)
            if idx == 44:
                counter_offset = data_base + offset
                # the counter is known to be PERF_COUNTER_RAWCOUNT (DWORD)
                processor_queue_length, = _unpack('<L', data, counter_offset)
                break
            if not size:
                # a zero-length definition would never advance the scan
                return
            defn_base += size
        else:
            # counter definition not present in this snapshot
            return
        # We use an exponentially weighted moving average, imitating the
        # load calculation on Unix systems.
        # https://en.wikipedia.org/wiki/Load_(computing)#Unix-style_load_calculation
        # https://en.wikipedia.org/wiki/Moving_average#Exponential_moving_average
        if self._load is not None:
            self._load = (self._load * LOAD_FACTOR_1
                          + processor_queue_length * (1.0 - LOAD_FACTOR_1))
        else:
            # still seeding: keep every sample and publish the mean as soon
            # as NVALUE of them have been collected
            self._values.append(processor_queue_length)
            if len(self._values) >= NVALUE:
                self._load = sum(self._values) / len(self._values)

    def close(self, kill=True):
        """
        Stop the update thread and release the event handles.

        `kill` is not used by this implementation.  Calling close() more
        than once is harmless.
        """
        self.__del__()

    def __del__(self,
                # localize module access to prevent shutdown errors
                _wait=_winapi.WaitForSingleObject,
                _close=_winapi.CloseHandle,
                _signal=_overlapped.SetEvent):
        """Stop the update thread (if any) and close the event handles."""
        running = self._running
        if running is None:
            # never initialized, or already closed
            return
        stopped = self._stopped
        if stopped is not None:
            # tell the update thread to quit
            _signal(running)
            # wait for the update thread to signal done
            _wait(stopped, -1)
            _close(stopped)
        # cleanup events; the attributes are reset only after the thread
        # has finished, since it reads `self._running` on every iteration
        _close(running)
        self._running = self._stopped = None

    def getloadavg(self):
        """Return the current load average, or None if not yet available."""
        return self._load