mirror of
https://github.com/python/cpython.git
synced 2025-08-04 08:59:19 +00:00
gh-112278: Add retry in WMI tests in case of slow initialization (GH-113154)
This commit is contained in:
parent
8f8f0f97e1
commit
d1a2adfb08
1 changed files with 19 additions and 7 deletions
|
@ -1,17 +1,29 @@
|
|||
# Test the internal _wmi module on Windows
|
||||
# This is used by the platform module, and potentially others
|
||||
|
||||
import time
|
||||
import unittest
|
||||
from test.support import import_helper, requires_resource
|
||||
from test.support import import_helper, requires_resource, LOOPBACK_TIMEOUT
|
||||
|
||||
|
||||
# Do this first so test will be skipped if module doesn't exist
|
||||
_wmi = import_helper.import_module('_wmi', required_on=['win'])
|
||||
|
||||
|
||||
def wmi_exec_query(query):
|
||||
# gh-112278: WMI maybe slow response when first call.
|
||||
try:
|
||||
return _wmi.exec_query(query)
|
||||
except WindowsError as e:
|
||||
if e.winerror != 258:
|
||||
raise
|
||||
time.sleep(LOOPBACK_TIMEOUT)
|
||||
return _wmi.exec_query(query)
|
||||
|
||||
|
||||
class WmiTests(unittest.TestCase):
|
||||
def test_wmi_query_os_version(self):
|
||||
r = _wmi.exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
|
||||
r = wmi_exec_query("SELECT Version FROM Win32_OperatingSystem").split("\0")
|
||||
self.assertEqual(1, len(r))
|
||||
k, eq, v = r[0].partition("=")
|
||||
self.assertEqual("=", eq, r[0])
|
||||
|
@ -28,7 +40,7 @@ class WmiTests(unittest.TestCase):
|
|||
def test_wmi_query_error(self):
|
||||
# Invalid queries fail with OSError
|
||||
try:
|
||||
_wmi.exec_query("SELECT InvalidColumnName FROM InvalidTableName")
|
||||
wmi_exec_query("SELECT InvalidColumnName FROM InvalidTableName")
|
||||
except OSError as ex:
|
||||
if ex.winerror & 0xFFFFFFFF == 0x80041010:
|
||||
# This is the expected error code. All others should fail the test
|
||||
|
@ -42,7 +54,7 @@ class WmiTests(unittest.TestCase):
|
|||
def test_wmi_query_not_select(self):
|
||||
# Queries other than SELECT are blocked to avoid potential exploits
|
||||
with self.assertRaises(ValueError):
|
||||
_wmi.exec_query("not select, just in case someone tries something")
|
||||
wmi_exec_query("not select, just in case someone tries something")
|
||||
|
||||
@requires_resource('cpu')
|
||||
def test_wmi_query_overflow(self):
|
||||
|
@ -50,11 +62,11 @@ class WmiTests(unittest.TestCase):
|
|||
# Test multiple times to ensure consistency
|
||||
for _ in range(2):
|
||||
with self.assertRaises(OSError):
|
||||
_wmi.exec_query("SELECT * FROM CIM_DataFile")
|
||||
wmi_exec_query("SELECT * FROM CIM_DataFile")
|
||||
|
||||
def test_wmi_query_multiple_rows(self):
|
||||
# Multiple instances should have an extra null separator
|
||||
r = _wmi.exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
|
||||
r = wmi_exec_query("SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000")
|
||||
self.assertFalse(r.startswith("\0"), r)
|
||||
self.assertFalse(r.endswith("\0"), r)
|
||||
it = iter(r.split("\0"))
|
||||
|
@ -69,6 +81,6 @@ class WmiTests(unittest.TestCase):
|
|||
from concurrent.futures import ThreadPoolExecutor
|
||||
query = "SELECT ProcessId FROM Win32_Process WHERE ProcessId < 1000"
|
||||
with ThreadPoolExecutor(4) as pool:
|
||||
task = [pool.submit(_wmi.exec_query, query) for _ in range(32)]
|
||||
task = [pool.submit(wmi_exec_query, query) for _ in range(32)]
|
||||
for t in task:
|
||||
self.assertRegex(t.result(), "ProcessId=")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue