GH-121970: Extract `audit_events` into a new extension (#122325)

This commit is contained in:
Adam Turner 2024-07-30 04:49:00 +01:00 committed by GitHub
parent ac8da34621
commit 11ad731f4f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 263 additions and 207 deletions

View file

@ -20,6 +20,7 @@ from pyspecific import SOURCE_URI
# --------------------- # ---------------------
extensions = [ extensions = [
'audit_events',
'c_annotations', 'c_annotations',
'glossary_search', 'glossary_search',
'lexers', 'lexers',

View file

@ -0,0 +1,262 @@
"""Support for documenting audit events."""
from __future__ import annotations
import re
from typing import TYPE_CHECKING
from docutils import nodes
from sphinx.errors import NoUri
from sphinx.locale import _ as sphinx_gettext
from sphinx.transforms.post_transforms import SphinxPostTransform
from sphinx.util import logging
from sphinx.util.docutils import SphinxDirective
if TYPE_CHECKING:
from collections.abc import Iterator
from sphinx.application import Sphinx
from sphinx.builders import Builder
from sphinx.environment import BuildEnvironment
logger = logging.getLogger(__name__)
# This list of sets are allowable synonyms for event argument names.
# If two names are in the same set, they are treated as equal for the
# purposes of warning. This won't help if the number of arguments is
# different!
_SYNONYMS = [
frozenset({"file", "path", "fd"}),
]
class AuditEvents:
def __init__(self) -> None:
self.events: dict[str, list[str]] = {}
self.sources: dict[str, list[tuple[str, str]]] = {}
def __iter__(self) -> Iterator[tuple[str, list[str], tuple[str, str]]]:
for name, args in self.events.items():
for source in self.sources[name]:
yield name, args, source
def add_event(
self, name, args: list[str], source: tuple[str, str]
) -> None:
if name in self.events:
self._check_args_match(name, args)
else:
self.events[name] = args
self.sources.setdefault(name, []).append(source)
def _check_args_match(self, name: str, args: list[str]) -> None:
current_args = self.events[name]
msg = (
f"Mismatched arguments for audit-event {name}: "
f"{current_args!r} != {args!r}"
)
if current_args == args:
return
if len(current_args) != len(args):
logger.warning(msg)
return
for a1, a2 in zip(current_args, args, strict=False):
if a1 == a2:
continue
if any(a1 in s and a2 in s for s in _SYNONYMS):
continue
logger.warning(msg)
return
def id_for(self, name) -> str:
source_count = len(self.sources.get(name, ()))
name_clean = re.sub(r"\W", "_", name)
return f"audit_event_{name_clean}_{source_count}"
def rows(self) -> Iterator[tuple[str, list[str], list[tuple[str, str]]]]:
for name in sorted(self.events.keys()):
yield name, self.events[name], self.sources[name]
def initialise_audit_events(app: Sphinx) -> None:
"""Initialise the audit_events attribute on the environment."""
if not hasattr(app.env, "audit_events"):
app.env.audit_events = AuditEvents()
def audit_events_purge(
app: Sphinx, env: BuildEnvironment, docname: str
) -> None:
"""This is to remove traces of removed documents from env.audit_events."""
fresh_audit_events = AuditEvents()
for name, args, (doc, target) in env.audit_events:
if doc != docname:
fresh_audit_events.add_event(name, args, (doc, target))
def audit_events_merge(
app: Sphinx,
env: BuildEnvironment,
docnames: list[str],
other: BuildEnvironment,
) -> None:
"""In Sphinx parallel builds, this merges audit_events from subprocesses."""
for name, args, source in other.audit_events:
env.audit_events.add_event(name, args, source)
class AuditEvent(SphinxDirective):
has_content = True
required_arguments = 1
optional_arguments = 2
final_argument_whitespace = True
_label = [
sphinx_gettext(
"Raises an :ref:`auditing event <auditing>` "
"{name} with no arguments."
),
sphinx_gettext(
"Raises an :ref:`auditing event <auditing>` "
"{name} with argument {args}."
),
sphinx_gettext(
"Raises an :ref:`auditing event <auditing>` "
"{name} with arguments {args}."
),
]
def run(self) -> list[nodes.paragraph]:
name = self.arguments[0]
if len(self.arguments) >= 2 and self.arguments[1]:
args = [
arg
for argument in self.arguments[1].strip("'\"").split(",")
if (arg := argument.strip())
]
else:
args = []
ids = []
try:
target = self.arguments[2].strip("\"'")
except (IndexError, TypeError):
target = None
if not target:
target = self.env.audit_events.id_for(name)
ids.append(target)
self.env.audit_events.add_event(name, args, (self.env.docname, target))
node = nodes.paragraph("", classes=["audit-hook"], ids=ids)
self.set_source_info(node)
if self.content:
self.state.nested_parse(self.content, self.content_offset, node)
else:
num_args = min(2, len(args))
text = self._label[num_args].format(
name=f"``{name}``",
args=", ".join(f"``{a}``" for a in args),
)
parsed, messages = self.state.inline_text(text, self.lineno)
node += parsed
node += messages
return [node]
class audit_event_list(nodes.General, nodes.Element): # noqa: N801
pass
class AuditEventListDirective(SphinxDirective):
def run(self) -> list[audit_event_list]:
return [audit_event_list()]
class AuditEventListTransform(SphinxPostTransform):
default_priority = 500
def run(self) -> None:
if self.document.next_node(audit_event_list) is None:
return
table = self._make_table(self.app.builder, self.env.docname)
for node in self.document.findall(audit_event_list):
node.replace_self(table)
def _make_table(self, builder: Builder, docname: str) -> nodes.table:
table = nodes.table(cols=3)
group = nodes.tgroup(
"",
nodes.colspec(colwidth=30),
nodes.colspec(colwidth=55),
nodes.colspec(colwidth=15),
cols=3,
)
head = nodes.thead()
body = nodes.tbody()
table += group
group += head
group += body
head += nodes.row(
"",
nodes.entry("", nodes.paragraph("", "Audit event")),
nodes.entry("", nodes.paragraph("", "Arguments")),
nodes.entry("", nodes.paragraph("", "References")),
)
for name, args, sources in builder.env.audit_events.rows():
body += self._make_row(builder, docname, name, args, sources)
return table
@staticmethod
def _make_row(
builder: Builder,
docname: str,
name: str,
args: list[str],
sources: list[tuple[str, str]],
) -> nodes.row:
row = nodes.row()
name_node = nodes.paragraph("", nodes.Text(name))
row += nodes.entry("", name_node)
args_node = nodes.paragraph()
for arg in args:
args_node += nodes.literal(arg, arg)
args_node += nodes.Text(", ")
if len(args_node.children) > 0:
args_node.children.pop() # remove trailing comma
row += nodes.entry("", args_node)
backlinks_node = nodes.paragraph()
backlinks = enumerate(sorted(set(sources)), start=1)
for i, (doc, label) in backlinks:
if isinstance(label, str):
ref = nodes.reference("", f"[{i}]", internal=True)
try:
target = (
f"{builder.get_relative_uri(docname, doc)}#{label}"
)
except NoUri:
continue
else:
ref["refuri"] = target
backlinks_node += ref
row += nodes.entry("", backlinks_node)
return row
def setup(app: Sphinx):
app.add_directive("audit-event", AuditEvent)
app.add_directive("audit-event-table", AuditEventListDirective)
app.add_post_transform(AuditEventListTransform)
app.connect("builder-inited", initialise_audit_events)
app.connect("env-purge-doc", audit_events_purge)
app.connect("env-merge-info", audit_events_merge)
return {
"version": "1.0",
"parallel_read_safe": True,
"parallel_write_safe": True,
}

View file

@ -15,7 +15,6 @@ from os import getenv, path
from time import asctime from time import asctime
from pprint import pformat from pprint import pformat
import sphinx
from docutils import nodes from docutils import nodes
from docutils.io import StringOutput from docutils.io import StringOutput
from docutils.parsers.rst import directives from docutils.parsers.rst import directives
@ -24,7 +23,6 @@ from sphinx import addnodes
from sphinx.builders import Builder from sphinx.builders import Builder
from sphinx.domains.changeset import VersionChange, versionlabels, versionlabel_classes from sphinx.domains.changeset import VersionChange, versionlabels, versionlabel_classes
from sphinx.domains.python import PyFunction, PyMethod, PyModule from sphinx.domains.python import PyFunction, PyMethod, PyModule
from sphinx.errors import NoUri
from sphinx.locale import _ as sphinx_gettext from sphinx.locale import _ as sphinx_gettext
from sphinx.util import logging from sphinx.util import logging
from sphinx.util.docutils import SphinxDirective from sphinx.util.docutils import SphinxDirective
@ -184,142 +182,6 @@ class Availability(SphinxDirective):
return platforms return platforms
# Support for documenting audit event
def audit_events_purge(app, env, docname):
"""This is to remove from env.all_audit_events old traces of removed
documents.
"""
if not hasattr(env, 'all_audit_events'):
return
fresh_all_audit_events = {}
for name, event in env.all_audit_events.items():
event["source"] = [(d, t) for d, t in event["source"] if d != docname]
if event["source"]:
# Only keep audit_events that have at least one source.
fresh_all_audit_events[name] = event
env.all_audit_events = fresh_all_audit_events
def audit_events_merge(app, env, docnames, other):
"""In Sphinx parallel builds, this merges env.all_audit_events from
subprocesses.
all_audit_events is a dict of names, with values like:
{'source': [(docname, target), ...], 'args': args}
"""
if not hasattr(other, 'all_audit_events'):
return
if not hasattr(env, 'all_audit_events'):
env.all_audit_events = {}
for name, value in other.all_audit_events.items():
if name in env.all_audit_events:
env.all_audit_events[name]["source"].extend(value["source"])
else:
env.all_audit_events[name] = value
class AuditEvent(SphinxDirective):
has_content = True
required_arguments = 1
optional_arguments = 2
final_argument_whitespace = True
_label = [
sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with no arguments."),
sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with argument {args}."),
sphinx_gettext("Raises an :ref:`auditing event <auditing>` {name} with arguments {args}."),
]
@property
def logger(self):
cls = type(self)
return logging.getLogger(cls.__module__ + "." + cls.__name__)
def run(self):
name = self.arguments[0]
if len(self.arguments) >= 2 and self.arguments[1]:
args = (a.strip() for a in self.arguments[1].strip("'\"").split(","))
args = [a for a in args if a]
else:
args = []
label = self._label[min(2, len(args))]
text = label.format(name="``{}``".format(name),
args=", ".join("``{}``".format(a) for a in args if a))
if not hasattr(self.env, 'all_audit_events'):
self.env.all_audit_events = {}
new_info = {
'source': [],
'args': args
}
info = self.env.all_audit_events.setdefault(name, new_info)
if info is not new_info:
if not self._do_args_match(info['args'], new_info['args']):
self.logger.warning(
"Mismatched arguments for audit-event {}: {!r} != {!r}"
.format(name, info['args'], new_info['args'])
)
ids = []
try:
target = self.arguments[2].strip("\"'")
except (IndexError, TypeError):
target = None
if not target:
target = "audit_event_{}_{}".format(
re.sub(r'\W', '_', name),
len(info['source']),
)
ids.append(target)
info['source'].append((self.env.docname, target))
pnode = nodes.paragraph(text, classes=["audit-hook"], ids=ids)
pnode.line = self.lineno
if self.content:
self.state.nested_parse(self.content, self.content_offset, pnode)
else:
n, m = self.state.inline_text(text, self.lineno)
pnode.extend(n + m)
return [pnode]
# This list of sets are allowable synonyms for event argument names.
# If two names are in the same set, they are treated as equal for the
# purposes of warning. This won't help if number of arguments is
# different!
_SYNONYMS = [
{"file", "path", "fd"},
]
def _do_args_match(self, args1, args2):
if args1 == args2:
return True
if len(args1) != len(args2):
return False
for a1, a2 in zip(args1, args2):
if a1 == a2:
continue
if any(a1 in s and a2 in s for s in self._SYNONYMS):
continue
return False
return True
class audit_event_list(nodes.General, nodes.Element):
pass
class AuditEventListDirective(SphinxDirective):
def run(self):
return [audit_event_list('')]
# Support for documenting decorators # Support for documenting decorators
class PyDecoratorMixin(object): class PyDecoratorMixin(object):
@ -583,70 +445,6 @@ def parse_monitoring_event(env, sig, signode):
return sig return sig
def process_audit_events(app, doctree, fromdocname):
for node in doctree.findall(audit_event_list):
break
else:
return
env = app.builder.env
table = nodes.table(cols=3)
group = nodes.tgroup(
'',
nodes.colspec(colwidth=30),
nodes.colspec(colwidth=55),
nodes.colspec(colwidth=15),
cols=3,
)
head = nodes.thead()
body = nodes.tbody()
table += group
group += head
group += body
row = nodes.row()
row += nodes.entry('', nodes.paragraph('', nodes.Text('Audit event')))
row += nodes.entry('', nodes.paragraph('', nodes.Text('Arguments')))
row += nodes.entry('', nodes.paragraph('', nodes.Text('References')))
head += row
for name in sorted(getattr(env, "all_audit_events", ())):
audit_event = env.all_audit_events[name]
row = nodes.row()
node = nodes.paragraph('', nodes.Text(name))
row += nodes.entry('', node)
node = nodes.paragraph()
for i, a in enumerate(audit_event['args']):
if i:
node += nodes.Text(", ")
node += nodes.literal(a, nodes.Text(a))
row += nodes.entry('', node)
node = nodes.paragraph()
backlinks = enumerate(sorted(set(audit_event['source'])), start=1)
for i, (doc, label) in backlinks:
if isinstance(label, str):
ref = nodes.reference("", nodes.Text("[{}]".format(i)), internal=True)
try:
ref['refuri'] = "{}#{}".format(
app.builder.get_relative_uri(fromdocname, doc),
label,
)
except NoUri:
continue
node += ref
row += nodes.entry('', node)
body += row
for node in doctree.findall(audit_event_list):
node.replace_self(table)
def patch_pairindextypes(app, _env) -> None: def patch_pairindextypes(app, _env) -> None:
"""Remove all entries from ``pairindextypes`` before writing POT files. """Remove all entries from ``pairindextypes`` before writing POT files.
@ -676,8 +474,6 @@ def setup(app):
app.add_role('gh', gh_issue_role) app.add_role('gh', gh_issue_role)
app.add_directive('impl-detail', ImplementationDetail) app.add_directive('impl-detail', ImplementationDetail)
app.add_directive('availability', Availability) app.add_directive('availability', Availability)
app.add_directive('audit-event', AuditEvent)
app.add_directive('audit-event-table', AuditEventListDirective)
app.add_directive('deprecated-removed', DeprecatedRemoved) app.add_directive('deprecated-removed', DeprecatedRemoved)
app.add_builder(PydocTopicsBuilder) app.add_builder(PydocTopicsBuilder)
app.add_object_type('opcode', 'opcode', '%s (opcode)', parse_opcode_signature) app.add_object_type('opcode', 'opcode', '%s (opcode)', parse_opcode_signature)
@ -692,7 +488,4 @@ def setup(app):
app.add_directive_to_domain('py', 'abstractmethod', PyAbstractMethod) app.add_directive_to_domain('py', 'abstractmethod', PyAbstractMethod)
app.add_directive('miscnews', MiscNews) app.add_directive('miscnews', MiscNews)
app.connect('env-check-consistency', patch_pairindextypes) app.connect('env-check-consistency', patch_pairindextypes)
app.connect('doctree-resolved', process_audit_events)
app.connect('env-merge-info', audit_events_merge)
app.connect('env-purge-doc', audit_events_purge)
return {'version': '1.0', 'parallel_read_safe': True} return {'version': '1.0', 'parallel_read_safe': True}