ruff/python/py-fuzzer/fuzz.py
Carl Meyer 3d2485eb1b
[ty] fix more ecosystem/fuzzer panics with fixpoint (#17758)
## Summary

Add cycle handling for `try_metaclass` and `pep695_generic_context`
queries, as well as adjusting the cycle handling for `try_mro` to ensure
that it short-circuits on cycles and won't grow MROs indefinitely.

This reduces the number of failing fuzzer seeds from 68 to 17. The
latter count includes fuzzer seeds 120, 160, and 335, all of which
previously panicked but now either hang or are very slow; I've
temporarily skipped those seeds in the fuzzer until I can dig into that
slowness further.

This also allows us to move some more ecosystem projects from `bad.txt`
to `good.txt`, which I've done in
https://github.com/astral-sh/ruff/pull/17903

## Test Plan

Added mdtests.
2025-05-08 20:36:20 -07:00

444 lines
15 KiB
Python

"""
Run a Ruff executable on randomly generated (but syntactically valid)
Python source-code files.
This script can be installed into a virtual environment using
`uv pip install -e ./python/py-fuzzer` from the Ruff repository root,
or can be run using `uvx --from ./python/py-fuzzer fuzz`
(in which case the virtual environment does not need to be activated).
Example invocations of the script using `uv`:
- Run the fuzzer on Ruff's parser using seeds 0, 1, 2, 78 and 93 to generate the code:
`uvx --from ./python/py-fuzzer fuzz --bin ruff 0-2 78 93`
- Run the fuzzer concurrently using seeds in range 0-10 inclusive,
but only reporting bugs that are new on your branch:
`uvx --from ./python/py-fuzzer fuzz --bin ruff 0-10 --only-new-bugs`
- Run the fuzzer concurrently on 10,000 different Python source-code files,
using a random selection of seeds, and only print a summary at the end
(the `shuf` command is Unix-specific):
`uvx --from ./python/py-fuzzer fuzz --bin ruff $(shuf -i 0-1000000 -n 10000) --quiet`
If you make local modifications to this script, you'll need to run the above
with `--reinstall` to get your changes reflected in the uv-cached installed
package. Alternatively, if iterating quickly on changes, you can add
`--with-editable ./python/py-fuzzer`.
"""
from __future__ import annotations
import argparse
import ast
import concurrent.futures
import enum
import subprocess
import tempfile
from collections.abc import Callable
from dataclasses import KW_ONLY, dataclass
from functools import partial
from pathlib import Path
from typing import NewType, NoReturn, assert_never
from pysource_codegen import generate as generate_random_code
from pysource_minimize import CouldNotMinimize, minimize as minimize_repro
from rich_argparse import RawDescriptionRichHelpFormatter
from termcolor import colored
# Distinct nominal types (via `NewType`) so that a type checker can tell
# these values apart from a plain `str`/`int`.
#
# Source code that reproduces a bug: the output of `minimize_repro`,
# or the unminimized generated code when minimization is not possible.
MinimizedSourceCode = NewType("MinimizedSourceCode", str)
# The seed handed to `generate_random_code` to produce a source file.
Seed = NewType("Seed", int)
# The process exit status returned by `run_fuzzer`.
ExitCode = NewType("ExitCode", int)
def ty_contains_bug(code: str, *, ty_executable: Path) -> bool:
    """Return `True` if the code triggers a panic in type-checking code.

    The code is written to `input.py` inside a fresh temporary directory
    and `<ty_executable> check input.py` is run on it. Exit codes 0, 1 and 2
    are treated as "no bug"; any other exit code is reported as a bug.
    """
    with tempfile.TemporaryDirectory() as tempdir:
        input_file = Path(tempdir, "input.py")
        # Python source files are UTF-8 by default, so write the file as UTF-8
        # explicitly. Relying on the locale encoding could mis-encode (or fail
        # to encode) generated code that contains non-ASCII characters.
        input_file.write_text(code, encoding="utf-8")
        completed_process = subprocess.run(
            [ty_executable, "check", input_file],
            capture_output=True,
            text=True,
            # Decode the captured output independently of the locale as well.
            encoding="utf-8",
        )
    return completed_process.returncode not in {0, 1, 2}
def ruff_contains_bug(code: str, *, ruff_executable: Path) -> bool:
    """Return `True` if the code triggers a parser error.

    The code is piped to `<ruff_executable> check -` on stdin with an empty
    lint selection (`lint.select=[]`), no cache, preview mode enabled and a
    `py313` target version. Any non-zero exit code is reported as a bug.
    """
    completed_process = subprocess.run(
        [
            ruff_executable,
            "check",
            "--config",
            "lint.select=[]",
            "--no-cache",
            "--target-version",
            "py313",
            "--preview",
            "-",
        ],
        capture_output=True,
        text=True,
        # Python source is UTF-8 by default, so send the code over stdin as
        # UTF-8 explicitly. Relying on the locale encoding could mis-encode
        # (or fail to encode) generated code containing non-ASCII characters.
        encoding="utf-8",
        input=code,
    )
    return completed_process.returncode != 0
def contains_bug(code: str, *, executable: Executable, executable_path: Path) -> bool:
"""Return `True` if the code triggers an error."""
match executable:
case Executable.RUFF:
return ruff_contains_bug(code, ruff_executable=executable_path)
case Executable.TY:
return ty_contains_bug(code, ty_executable=executable_path)
case _ as unreachable:
assert_never(unreachable)
def contains_new_bug(
    code: str,
    *,
    executable: Executable,
    test_executable_path: Path,
    baseline_executable_path: Path,
) -> bool:
    """Return `True` if the code triggers a *new* parser error.

    A "new" parser error is one that exists with `test_executable`,
    but did not exist with `baseline_executable`.
    """
    fails_on_test = contains_bug(
        code, executable=executable, executable_path=test_executable_path
    )
    if not fails_on_test:
        # No bug on the test executable: the baseline never needs to run.
        return False
    fails_on_baseline = contains_bug(
        code, executable=executable, executable_path=baseline_executable_path
    )
    return not fails_on_baseline
@dataclass(slots=True)
class FuzzResult:
# The seed used to generate the random Python file.
# The same seed always generates the same file.
seed: Seed
# If we found a bug, this will be the minimum Python code
# required to trigger the bug. If not, it will be `None`.
maybe_bug: MinimizedSourceCode | None
# The executable we're testing
executable: Executable
_: KW_ONLY
only_new_bugs: bool
def print_description(self, index: int, num_seeds: int) -> None:
"""Describe the results of fuzzing the parser with this seed."""
progress = f"[{index}/{num_seeds}]"
msg = (
colored(f"Ran fuzzer on seed {self.seed}", "red")
if self.maybe_bug
else colored(f"Ran fuzzer successfully on seed {self.seed}", "green")
)
print(f"{msg:<60} {progress:>15}", flush=True)
new = "new " if self.only_new_bugs else ""
if self.maybe_bug:
match self.executable:
case Executable.RUFF:
panic_message = f"The following code triggers a {new}parser bug:"
case Executable.TY:
panic_message = f"The following code triggers a {new}ty panic:"
case _ as unreachable:
assert_never(unreachable)
print(colored(panic_message, "red"))
print()
print(self.maybe_bug)
print(flush=True)
def fuzz_code(seed: Seed, args: ResolvedCliArgs) -> FuzzResult:
    """Return a `FuzzResult` instance describing the fuzzing result from this seed.

    Random code is generated from `seed` and checked against the test
    executable (and against the baseline executable too, if one is set).
    When a bug is found, the code is minimized before being returned.
    """
    # TODO(carljm) remove once we debug the slowness of these seeds
    skip_check = seed in {120, 160, 335}

    code = generate_random_code(seed)

    # Pick the bug predicate once: it serves both as the initial check
    # and as the callback that drives minimization.
    bug_checker: Callable[[str], bool]
    if args.baseline_executable_path is None:
        only_new_bugs = False
        bug_checker = partial(
            contains_bug,
            executable=args.executable,
            executable_path=args.test_executable_path,
        )
    else:
        only_new_bugs = True
        bug_checker = partial(
            contains_new_bug,
            executable=args.executable,
            test_executable_path=args.test_executable_path,
            baseline_executable_path=args.baseline_executable_path,
        )

    if skip_check or not bug_checker(code):
        return FuzzResult(seed, None, args.executable, only_new_bugs=only_new_bugs)

    try:
        minimized = minimize_repro(code, bug_checker)
    except CouldNotMinimize as minimize_error:
        # This is to double-check that there isn't a bug in
        # `pysource-minimize`/`pysource-codegen`.
        # `pysource-minimize` *should* never produce code that's invalid syntax.
        try:
            ast.parse(code)
        except SyntaxError:
            raise minimize_error from None
        # The code is valid but could not be shrunk: report it unminimized.
        reproducer = MinimizedSourceCode(code)
    else:
        reproducer = MinimizedSourceCode(minimized)

    return FuzzResult(seed, reproducer, args.executable, only_new_bugs=only_new_bugs)
def run_fuzzer_concurrently(args: ResolvedCliArgs) -> list[FuzzResult]:
    """Fuzz every seed in `args.seeds` using a process pool.

    Results are reported in completion order (unless `args.quiet` is set).
    Returns only the results for which a bug was found.
    """
    num_seeds = len(args.seeds)
    print(
        f"Concurrently running the fuzzer on "
        f"{num_seeds} randomly generated source-code "
        f"file{'s' if num_seeds != 1 else ''}..."
    )

    found: list[FuzzResult] = []
    with concurrent.futures.ProcessPoolExecutor() as pool:
        pending = [pool.submit(fuzz_code, seed, args) for seed in args.seeds]
        completed = concurrent.futures.as_completed(pending)
        try:
            for position, done in enumerate(completed, start=1):
                outcome = done.result()
                if not args.quiet:
                    outcome.print_description(position, num_seeds)
                if outcome.maybe_bug:
                    found.append(outcome)
        except KeyboardInterrupt:
            print("\nShutting down the ProcessPoolExecutor due to KeyboardInterrupt...")
            print("(This might take a few seconds)")
            # Drop the work that has not started yet, then propagate the interrupt.
            pool.shutdown(cancel_futures=True)
            raise
    return found
def run_fuzzer_sequentially(args: ResolvedCliArgs) -> list[FuzzResult]:
    """Fuzz every seed in `args.seeds` one after another, in order.

    Each result is reported as soon as it is available (unless `args.quiet`
    is set). Returns only the results for which a bug was found.
    """
    num_seeds = len(args.seeds)
    print(
        f"Sequentially running the fuzzer on "
        f"{num_seeds} randomly generated source-code "
        f"file{'s' if num_seeds != 1 else ''}..."
    )

    found: list[FuzzResult] = []
    # Lazy generator: each seed is fuzzed only when the loop reaches it.
    outcomes = (fuzz_code(seed, args) for seed in args.seeds)
    for position, outcome in enumerate(outcomes, start=1):
        if not args.quiet:
            outcome.print_description(position, num_seeds)
        if outcome.maybe_bug:
            found.append(outcome)
    return found
def run_fuzzer(args: ResolvedCliArgs) -> ExitCode:
    """Run the fuzzer over all seeds and print a summary.

    Five seeds or fewer are fuzzed sequentially; more than that are fuzzed
    concurrently. Returns exit code 1 if any bug was found, 0 otherwise.
    """
    runner = run_fuzzer_sequentially if len(args.seeds) <= 5 else run_fuzzer_concurrently
    bugs = runner(args)

    noun_phrase = "Bugs" if args.baseline_executable_path is None else "New bugs"

    if not bugs:
        print(colored(f"No {noun_phrase.lower()} found!", "green"))
        return ExitCode(0)

    print(colored(f"{noun_phrase} found in the following seeds:", "red"))
    print(*sorted(bug.seed for bug in bugs))
    return ExitCode(1)
def parse_seed_argument(arg: str) -> int | range:
"""Helper for argument parsing"""
if "-" in arg:
start, end = map(int, arg.split("-"))
if end <= start:
raise argparse.ArgumentTypeError(
f"Error when parsing seed argument {arg!r}: "
f"range end must be > range start"
)
seed_range = range(start, end + 1)
range_too_long = (
f"Error when parsing seed argument {arg!r}: "
f"maximum allowed range length is 1_000_000_000"
)
try:
if len(seed_range) > 1_000_000_000:
raise argparse.ArgumentTypeError(range_too_long)
except OverflowError:
raise argparse.ArgumentTypeError(range_too_long) from None
return range(int(start), int(end) + 1)
return int(arg)
class Executable(enum.StrEnum):
RUFF = "ruff"
TY = "ty"
@dataclass(slots=True)
class ResolvedCliArgs:
seeds: list[Seed]
_: KW_ONLY
executable: Executable
test_executable_path: Path
baseline_executable_path: Path | None
quiet: bool
def parse_args() -> ResolvedCliArgs:
    """Parse command-line arguments

    Beyond plain parsing, this:

    - checks that a `--baseline-executable` can be run, or, when only
      `--only-new-bugs` is given, falls back to the executable of the same
      name found on `PATH` as the baseline;
    - builds the test executable with `cargo build --release` when no
      `--test-executable` is given;
    - flattens the seed arguments into a sorted list of unique seeds.

    Invalid combinations of arguments are reported through `parser.error`,
    which exits the process.
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=RawDescriptionRichHelpFormatter
    )
    parser.add_argument(
        "seeds",
        type=parse_seed_argument,
        nargs="+",
        help="Either a single seed, or an inclusive range of seeds in the format `0-5`",
    )
    parser.add_argument(
        "--only-new-bugs",
        action="store_true",
        help=(
            "Only report bugs if they exist on the current branch, "
            "but *didn't* exist on the released version "
            "installed into the Python environment we're running in"
        ),
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Print fewer things to the terminal while running the fuzzer",
    )
    parser.add_argument(
        "--test-executable",
        help=(
            "Executable to test. "
            "Defaults to a fresh build of the currently checked-out branch."
        ),
        type=Path,
    )
    parser.add_argument(
        "--baseline-executable",
        help=(
            "Executable to compare results against. "
            "Defaults to whatever version is installed "
            "in the Python environment."
        ),
        type=Path,
    )
    parser.add_argument(
        "--bin",
        help="Which executable to test.",
        required=True,
        choices=[member.value for member in Executable],
    )

    args = parser.parse_args()
    executable = Executable(args.bin)

    if args.baseline_executable:
        if not args.only_new_bugs:
            parser.error(
                "Specifying `--baseline-executable` has no effect "
                "unless `--only-new-bugs` is also specified"
            )
        try:
            subprocess.run(
                [args.baseline_executable, "--version"], check=True, capture_output=True
            )
        except FileNotFoundError:
            parser.error(
                f"Bad argument passed to `--baseline-executable`: "
                f"no such file or executable {args.baseline_executable!r}"
            )
    elif args.only_new_bugs:
        try:
            version_proc = subprocess.run(
                [executable, "--version"], text=True, capture_output=True, check=True
            )
        except FileNotFoundError:
            parser.error(
                "`--only-new-bugs` was specified without specifying a baseline "
                f"executable, and no released version of `{executable}` appears to be "
                "installed in your Python environment"
            )
        else:
            if not args.quiet:
                version = version_proc.stdout.strip().split(" ")[1]
                print(
                    f"`--only-new-bugs` was specified without specifying a baseline "
                    f"executable; falling back to using `{executable}=={version}` as "
                    f"the baseline (the version of `{executable}` installed in your "
                    f"current Python environment)"
                )
            # Actually record the fallback. Without this the baseline stays
            # `None` and `--only-new-bugs` is silently ignored. A bare name
            # (no directory part) is resolved via `PATH` when it is executed,
            # exactly like the `--version` probe above.
            args.baseline_executable = Path(executable.value)

    if not args.test_executable:
        print(
            "Running `cargo build --release` since no test executable was specified...",
            flush=True,
        )
        cmd: list[str] = [
            "cargo",
            "build",
            "--release",
            "--locked",
            "--color",
            "always",
            "--bin",
            executable,
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            # Surface cargo's captured error output before propagating the failure.
            print(e.stderr)
            raise
        args.test_executable = Path("target", "release", executable)
        assert args.test_executable.is_file()

    # Each positional argument is either a single seed or a range of seeds;
    # collect them into a set so that overlapping arguments are deduplicated.
    seed_arguments: list[range | int] = args.seeds
    seen_seeds: set[int] = set()
    for arg in seed_arguments:
        if isinstance(arg, int):
            seen_seeds.add(arg)
        else:
            seen_seeds.update(arg)

    return ResolvedCliArgs(
        sorted(map(Seed, seen_seeds)),
        quiet=args.quiet,
        executable=executable,
        test_executable_path=args.test_executable,
        baseline_executable_path=args.baseline_executable,
    )
def main() -> NoReturn:
    """Entry point: parse the CLI arguments, run the fuzzer, and exit.

    The process exit status is the exit code returned by `run_fuzzer`.
    """
    exit_code = run_fuzzer(parse_args())
    raise SystemExit(exit_code)


if __name__ == "__main__":
    main()