mirror of
https://github.com/python/cpython.git
synced 2025-10-24 07:26:11 +00:00
576 lines
14 KiB
Python
576 lines
14 KiB
Python
"""This script contains the actual auditing tests.
|
|
|
|
It should not be imported directly, but should be run by the test_audit
|
|
module with arguments identifying each test.
|
|
|
|
"""
|
|
|
|
import contextlib
|
|
import os
|
|
import sys
|
|
|
|
|
|
class TestHook:
|
|
"""Used in standard hook tests to collect any logged events.
|
|
|
|
Should be used in a with block to ensure that it has no impact
|
|
after the test completes.
|
|
"""
|
|
|
|
def __init__(self, raise_on_events=None, exc_type=RuntimeError):
|
|
self.raise_on_events = raise_on_events or ()
|
|
self.exc_type = exc_type
|
|
self.seen = []
|
|
self.closed = False
|
|
|
|
def __enter__(self, *a):
|
|
sys.addaudithook(self)
|
|
return self
|
|
|
|
def __exit__(self, *a):
|
|
self.close()
|
|
|
|
def close(self):
|
|
self.closed = True
|
|
|
|
@property
|
|
def seen_events(self):
|
|
return [i[0] for i in self.seen]
|
|
|
|
def __call__(self, event, args):
|
|
if self.closed:
|
|
return
|
|
self.seen.append((event, args))
|
|
if event in self.raise_on_events:
|
|
raise self.exc_type("saw event " + event)
|
|
|
|
|
|
# Simple helpers, since we are not in unittest here
|
|
def assertEqual(x, y):
|
|
if x != y:
|
|
raise AssertionError(f"{x!r} should equal {y!r}")
|
|
|
|
|
|
def assertIn(el, series):
|
|
if el not in series:
|
|
raise AssertionError(f"{el!r} should be in {series!r}")
|
|
|
|
|
|
def assertNotIn(el, series):
|
|
if el in series:
|
|
raise AssertionError(f"{el!r} should not be in {series!r}")
|
|
|
|
|
|
def assertSequenceEqual(x, y):
|
|
if len(x) != len(y):
|
|
raise AssertionError(f"{x!r} should equal {y!r}")
|
|
if any(ix != iy for ix, iy in zip(x, y)):
|
|
raise AssertionError(f"{x!r} should equal {y!r}")
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def assertRaises(ex_type):
|
|
try:
|
|
yield
|
|
assert False, f"expected {ex_type}"
|
|
except BaseException as ex:
|
|
if isinstance(ex, AssertionError):
|
|
raise
|
|
assert type(ex) is ex_type, f"{ex} should be {ex_type}"
|
|
|
|
|
|
def test_basic():
|
|
with TestHook() as hook:
|
|
sys.audit("test_event", 1, 2, 3)
|
|
assertEqual(hook.seen[0][0], "test_event")
|
|
assertEqual(hook.seen[0][1], (1, 2, 3))
|
|
|
|
|
|
def test_block_add_hook():
|
|
# Raising an exception should prevent a new hook from being added,
|
|
# but will not propagate out.
|
|
with TestHook(raise_on_events="sys.addaudithook") as hook1:
|
|
with TestHook() as hook2:
|
|
sys.audit("test_event")
|
|
assertIn("test_event", hook1.seen_events)
|
|
assertNotIn("test_event", hook2.seen_events)
|
|
|
|
|
|
def test_block_add_hook_baseexception():
|
|
# Raising BaseException will propagate out when adding a hook
|
|
with assertRaises(BaseException):
|
|
with TestHook(
|
|
raise_on_events="sys.addaudithook", exc_type=BaseException
|
|
) as hook1:
|
|
# Adding this next hook should raise BaseException
|
|
with TestHook() as hook2:
|
|
pass
|
|
|
|
|
|
def test_marshal():
|
|
import marshal
|
|
o = ("a", "b", "c", 1, 2, 3)
|
|
payload = marshal.dumps(o)
|
|
|
|
with TestHook() as hook:
|
|
assertEqual(o, marshal.loads(marshal.dumps(o)))
|
|
|
|
try:
|
|
with open("test-marshal.bin", "wb") as f:
|
|
marshal.dump(o, f)
|
|
with open("test-marshal.bin", "rb") as f:
|
|
assertEqual(o, marshal.load(f))
|
|
finally:
|
|
os.unlink("test-marshal.bin")
|
|
|
|
actual = [(a[0], a[1]) for e, a in hook.seen if e == "marshal.dumps"]
|
|
assertSequenceEqual(actual, [(o, marshal.version)] * 2)
|
|
|
|
actual = [a[0] for e, a in hook.seen if e == "marshal.loads"]
|
|
assertSequenceEqual(actual, [payload])
|
|
|
|
actual = [e for e, a in hook.seen if e == "marshal.load"]
|
|
assertSequenceEqual(actual, ["marshal.load"])
|
|
|
|
|
|
def test_pickle():
|
|
import pickle
|
|
|
|
class PicklePrint:
|
|
def __reduce_ex__(self, p):
|
|
return str, ("Pwned!",)
|
|
|
|
payload_1 = pickle.dumps(PicklePrint())
|
|
payload_2 = pickle.dumps(("a", "b", "c", 1, 2, 3))
|
|
|
|
# Before we add the hook, ensure our malicious pickle loads
|
|
assertEqual("Pwned!", pickle.loads(payload_1))
|
|
|
|
with TestHook(raise_on_events="pickle.find_class") as hook:
|
|
with assertRaises(RuntimeError):
|
|
# With the hook enabled, loading globals is not allowed
|
|
pickle.loads(payload_1)
|
|
# pickles with no globals are okay
|
|
pickle.loads(payload_2)
|
|
|
|
|
|
def test_monkeypatch():
|
|
class A:
|
|
pass
|
|
|
|
class B:
|
|
pass
|
|
|
|
class C(A):
|
|
pass
|
|
|
|
a = A()
|
|
|
|
with TestHook() as hook:
|
|
# Catch name changes
|
|
C.__name__ = "X"
|
|
# Catch type changes
|
|
C.__bases__ = (B,)
|
|
# Ensure bypassing __setattr__ is still caught
|
|
type.__dict__["__bases__"].__set__(C, (B,))
|
|
# Catch attribute replacement
|
|
C.__init__ = B.__init__
|
|
# Catch attribute addition
|
|
C.new_attr = 123
|
|
# Catch class changes
|
|
a.__class__ = B
|
|
|
|
actual = [(a[0], a[1]) for e, a in hook.seen if e == "object.__setattr__"]
|
|
assertSequenceEqual(
|
|
[(C, "__name__"), (C, "__bases__"), (C, "__bases__"), (a, "__class__")], actual
|
|
)
|
|
|
|
|
|
def test_open(testfn):
|
|
# SSLContext.load_dh_params uses _Py_fopen_obj rather than normal open()
|
|
try:
|
|
import ssl
|
|
|
|
load_dh_params = ssl.create_default_context().load_dh_params
|
|
except ImportError:
|
|
load_dh_params = None
|
|
|
|
# Try a range of "open" functions.
|
|
# All of them should fail
|
|
with TestHook(raise_on_events={"open"}) as hook:
|
|
for fn, *args in [
|
|
(open, testfn, "r"),
|
|
(open, sys.executable, "rb"),
|
|
(open, 3, "wb"),
|
|
(open, testfn, "w", -1, None, None, None, False, lambda *a: 1),
|
|
(load_dh_params, testfn),
|
|
]:
|
|
if not fn:
|
|
continue
|
|
with assertRaises(RuntimeError):
|
|
fn(*args)
|
|
|
|
actual_mode = [(a[0], a[1]) for e, a in hook.seen if e == "open" and a[1]]
|
|
actual_flag = [(a[0], a[2]) for e, a in hook.seen if e == "open" and not a[1]]
|
|
assertSequenceEqual(
|
|
[
|
|
i
|
|
for i in [
|
|
(testfn, "r"),
|
|
(sys.executable, "r"),
|
|
(3, "w"),
|
|
(testfn, "w"),
|
|
(testfn, "rb") if load_dh_params else None,
|
|
]
|
|
if i is not None
|
|
],
|
|
actual_mode,
|
|
)
|
|
assertSequenceEqual([], actual_flag)
|
|
|
|
|
|
def test_cantrace():
|
|
traced = []
|
|
|
|
def trace(frame, event, *args):
|
|
if frame.f_code == TestHook.__call__.__code__:
|
|
traced.append(event)
|
|
|
|
old = sys.settrace(trace)
|
|
try:
|
|
with TestHook() as hook:
|
|
# No traced call
|
|
eval("1")
|
|
|
|
# No traced call
|
|
hook.__cantrace__ = False
|
|
eval("2")
|
|
|
|
# One traced call
|
|
hook.__cantrace__ = True
|
|
eval("3")
|
|
|
|
# Two traced calls (writing to private member, eval)
|
|
hook.__cantrace__ = 1
|
|
eval("4")
|
|
|
|
# One traced call (writing to private member)
|
|
hook.__cantrace__ = 0
|
|
finally:
|
|
sys.settrace(old)
|
|
|
|
assertSequenceEqual(["call"] * 4, traced)
|
|
|
|
|
|
def test_mmap():
|
|
import mmap
|
|
|
|
with TestHook() as hook:
|
|
mmap.mmap(-1, 8)
|
|
assertEqual(hook.seen[0][1][:2], (-1, 8))
|
|
|
|
|
|
def test_excepthook():
|
|
def excepthook(exc_type, exc_value, exc_tb):
|
|
if exc_type is not RuntimeError:
|
|
sys.__excepthook__(exc_type, exc_value, exc_tb)
|
|
|
|
def hook(event, args):
|
|
if event == "sys.excepthook":
|
|
if not isinstance(args[2], args[1]):
|
|
raise TypeError(f"Expected isinstance({args[2]!r}, " f"{args[1]!r})")
|
|
if args[0] != excepthook:
|
|
raise ValueError(f"Expected {args[0]} == {excepthook}")
|
|
print(event, repr(args[2]))
|
|
|
|
sys.addaudithook(hook)
|
|
sys.excepthook = excepthook
|
|
raise RuntimeError("fatal-error")
|
|
|
|
|
|
def test_unraisablehook():
|
|
from _testcapi import err_formatunraisable
|
|
|
|
def unraisablehook(hookargs):
|
|
pass
|
|
|
|
def hook(event, args):
|
|
if event == "sys.unraisablehook":
|
|
if args[0] != unraisablehook:
|
|
raise ValueError(f"Expected {args[0]} == {unraisablehook}")
|
|
print(event, repr(args[1].exc_value), args[1].err_msg)
|
|
|
|
sys.addaudithook(hook)
|
|
sys.unraisablehook = unraisablehook
|
|
err_formatunraisable(RuntimeError("nonfatal-error"),
|
|
"Exception ignored for audit hook test")
|
|
|
|
|
|
def test_winreg():
|
|
from winreg import OpenKey, EnumKey, CloseKey, HKEY_LOCAL_MACHINE
|
|
|
|
def hook(event, args):
|
|
if not event.startswith("winreg."):
|
|
return
|
|
print(event, *args)
|
|
|
|
sys.addaudithook(hook)
|
|
|
|
k = OpenKey(HKEY_LOCAL_MACHINE, "Software")
|
|
EnumKey(k, 0)
|
|
try:
|
|
EnumKey(k, 10000)
|
|
except OSError:
|
|
pass
|
|
else:
|
|
raise RuntimeError("Expected EnumKey(HKLM, 10000) to fail")
|
|
|
|
kv = k.Detach()
|
|
CloseKey(kv)
|
|
|
|
|
|
def test_socket():
|
|
import socket
|
|
|
|
def hook(event, args):
|
|
if event.startswith("socket."):
|
|
print(event, *args)
|
|
|
|
sys.addaudithook(hook)
|
|
|
|
socket.gethostname()
|
|
|
|
# Don't care if this fails, we just want the audit message
|
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
try:
|
|
# Don't care if this fails, we just want the audit message
|
|
sock.bind(('127.0.0.1', 8080))
|
|
except Exception:
|
|
pass
|
|
finally:
|
|
sock.close()
|
|
|
|
|
|
def test_gc():
|
|
import gc
|
|
|
|
def hook(event, args):
|
|
if event.startswith("gc."):
|
|
print(event, *args)
|
|
|
|
sys.addaudithook(hook)
|
|
|
|
gc.get_objects(generation=1)
|
|
|
|
x = object()
|
|
y = [x]
|
|
|
|
gc.get_referrers(x)
|
|
gc.get_referents(y)
|
|
|
|
|
|
def test_http_client():
|
|
import http.client
|
|
|
|
def hook(event, args):
|
|
if event.startswith("http.client."):
|
|
print(event, *args[1:])
|
|
|
|
sys.addaudithook(hook)
|
|
|
|
conn = http.client.HTTPConnection('www.python.org')
|
|
try:
|
|
conn.request('GET', '/')
|
|
except OSError:
|
|
print('http.client.send', '[cannot send]')
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
def test_sqlite3():
|
|
import sqlite3
|
|
|
|
def hook(event, *args):
|
|
if event.startswith("sqlite3."):
|
|
print(event, *args)
|
|
|
|
sys.addaudithook(hook)
|
|
cx1 = sqlite3.connect(":memory:")
|
|
cx2 = sqlite3.Connection(":memory:")
|
|
|
|
# Configured without --enable-loadable-sqlite-extensions
|
|
try:
|
|
if hasattr(sqlite3.Connection, "enable_load_extension"):
|
|
cx1.enable_load_extension(False)
|
|
try:
|
|
cx1.load_extension("test")
|
|
except sqlite3.OperationalError:
|
|
pass
|
|
else:
|
|
raise RuntimeError("Expected sqlite3.load_extension to fail")
|
|
finally:
|
|
cx1.close()
|
|
cx2.close()
|
|
|
|
def test_sys_getframe():
|
|
import sys
|
|
|
|
def hook(event, args):
|
|
if event.startswith("sys."):
|
|
print(event, args[0].f_code.co_name)
|
|
|
|
sys.addaudithook(hook)
|
|
sys._getframe()
|
|
|
|
|
|
def test_sys_getframemodulename():
|
|
import sys
|
|
|
|
def hook(event, args):
|
|
if event.startswith("sys."):
|
|
print(event, *args)
|
|
|
|
sys.addaudithook(hook)
|
|
sys._getframemodulename()
|
|
|
|
|
|
def test_threading():
|
|
import _thread
|
|
|
|
def hook(event, args):
|
|
if event.startswith(("_thread.", "cpython.PyThreadState", "test.")):
|
|
print(event, args)
|
|
|
|
sys.addaudithook(hook)
|
|
|
|
lock = _thread.allocate_lock()
|
|
lock.acquire()
|
|
|
|
class test_func:
|
|
def __repr__(self): return "<test_func>"
|
|
def __call__(self):
|
|
sys.audit("test.test_func")
|
|
lock.release()
|
|
|
|
i = _thread.start_new_thread(test_func(), ())
|
|
lock.acquire()
|
|
|
|
handle = _thread.start_joinable_thread(test_func())
|
|
handle.join()
|
|
|
|
|
|
def test_threading_abort():
|
|
# Ensures that aborting PyThreadState_New raises the correct exception
|
|
import _thread
|
|
|
|
class ThreadNewAbortError(Exception):
|
|
pass
|
|
|
|
def hook(event, args):
|
|
if event == "cpython.PyThreadState_New":
|
|
raise ThreadNewAbortError()
|
|
|
|
sys.addaudithook(hook)
|
|
|
|
try:
|
|
_thread.start_new_thread(lambda: None, ())
|
|
except ThreadNewAbortError:
|
|
# Other exceptions are raised and the test will fail
|
|
pass
|
|
|
|
|
|
def test_wmi_exec_query():
|
|
import _wmi
|
|
|
|
def hook(event, args):
|
|
if event.startswith("_wmi."):
|
|
print(event, args[0])
|
|
|
|
sys.addaudithook(hook)
|
|
try:
|
|
_wmi.exec_query("SELECT * FROM Win32_OperatingSystem")
|
|
except WindowsError as e:
|
|
# gh-112278: WMI may be slow response when first called, but we still
|
|
# get the audit event, so just ignore the timeout
|
|
if e.winerror != 258:
|
|
raise
|
|
|
|
def test_syslog():
|
|
import syslog
|
|
|
|
def hook(event, args):
|
|
if event.startswith("syslog."):
|
|
print(event, *args)
|
|
|
|
sys.addaudithook(hook)
|
|
syslog.openlog('python')
|
|
syslog.syslog('test')
|
|
syslog.setlogmask(syslog.LOG_DEBUG)
|
|
syslog.closelog()
|
|
# implicit open
|
|
syslog.syslog('test2')
|
|
# open with default ident
|
|
syslog.openlog(logoption=syslog.LOG_NDELAY, facility=syslog.LOG_LOCAL0)
|
|
sys.argv = None
|
|
syslog.openlog()
|
|
syslog.closelog()
|
|
|
|
|
|
def test_not_in_gc():
|
|
import gc
|
|
|
|
hook = lambda *a: None
|
|
sys.addaudithook(hook)
|
|
|
|
for o in gc.get_objects():
|
|
if isinstance(o, list):
|
|
assert hook not in o
|
|
|
|
|
|
def test_time(mode):
|
|
import time
|
|
|
|
def hook(event, args):
|
|
if event.startswith("time."):
|
|
if mode == 'print':
|
|
print(event, *args)
|
|
elif mode == 'fail':
|
|
raise AssertionError('hook failed')
|
|
sys.addaudithook(hook)
|
|
|
|
time.sleep(0)
|
|
time.sleep(0.0625) # 1/16, a small exact float
|
|
try:
|
|
time.sleep(-1)
|
|
except ValueError:
|
|
pass
|
|
|
|
def test_sys_monitoring_register_callback():
|
|
import sys
|
|
|
|
def hook(event, args):
|
|
if event.startswith("sys.monitoring"):
|
|
print(event, args)
|
|
|
|
sys.addaudithook(hook)
|
|
sys.monitoring.register_callback(1, 1, None)
|
|
|
|
|
|
def test_winapi_createnamedpipe(pipe_name):
|
|
import _winapi
|
|
|
|
def hook(event, args):
|
|
if event == "_winapi.CreateNamedPipe":
|
|
print(event, args)
|
|
|
|
sys.addaudithook(hook)
|
|
_winapi.CreateNamedPipe(pipe_name, _winapi.PIPE_ACCESS_DUPLEX, 8, 2, 0, 0, 0, 0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
from test.support import suppress_msvcrt_asserts
|
|
|
|
suppress_msvcrt_asserts()
|
|
|
|
test = sys.argv[1]
|
|
globals()[test](*sys.argv[2:])
|