mirror of
https://github.com/astral-sh/ruff.git
synced 2025-07-07 21:25:08 +00:00

## Summary I tried running `py-fuzzer` using executables in the current working directory, but that failed with: ``` ▶ uvx --from ./python/py-fuzzer --reinstall fuzz --test-executable ./ty_feature --bin=ty --baseline-executable ./ty_main --only-new-bugs 0-500 Usage: fuzz [-h] [--only-new-bugs] [--quiet] [--test-executable TEST_EXECUTABLE] [--baseline-executable BASELINE_EXECUTABLE] --bin {ruff,ty} seeds [seeds ...] fuzz: error: Bad argument passed to `--baseline-executable`: no such file or executable PosixPath('ty_main') "Bad argument passed to `--baseline-executable`: no such file or executable PosixPath('ty_main')" ``` Using `.absolute()` on the `Path` fixes this. ## Test Plan Successful `py-fuzzer` run with the invocation above.
448 lines
15 KiB
Python
448 lines
15 KiB
Python
"""
|
|
Run a Ruff executable on randomly generated (but syntactically valid)
|
|
Python source-code files.
|
|
|
|
This script can be installed into a virtual environment using
|
|
`uv pip install -e ./python/py-fuzzer` from the Ruff repository root,
|
|
or can be run using `uvx --from ./python/py-fuzzer fuzz`
|
|
(in which case the virtual environment does not need to be activated).
|
|
|
|
Example invocations of the script using `uv`:
|
|
- Run the fuzzer on Ruff's parser using seeds 0, 1, 2, 78 and 93 to generate the code:
|
|
`uvx --from ./python/py-fuzzer fuzz --bin ruff 0-2 78 93`
|
|
- Run the fuzzer concurrently using seeds in range 0-10 inclusive,
|
|
but only reporting bugs that are new on your branch:
|
|
`uvx --from ./python/py-fuzzer fuzz --bin ruff 0-10 --only-new-bugs`
|
|
- Run the fuzzer concurrently on 10,000 different Python source-code files,
|
|
using a random selection of seeds, and only print a summary at the end
|
|
(the `shuf` command is Unix-specific):
|
|
`uvx --from ./python/py-fuzzer fuzz --bin ruff $(shuf -i 0-1000000 -n 10000) --quiet
|
|
|
|
If you make local modifications to this script, you'll need to run the above
|
|
with `--reinstall` to get your changes reflected in the uv-cached installed
|
|
package. Alternatively, if iterating quickly on changes, you can add
|
|
`--with-editable ./python/py-fuzzer`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ast
|
|
import concurrent.futures
|
|
import enum
|
|
import subprocess
|
|
import tempfile
|
|
from collections.abc import Callable
|
|
from dataclasses import KW_ONLY, dataclass
|
|
from functools import partial
|
|
from pathlib import Path
|
|
from typing import NewType, NoReturn, assert_never
|
|
|
|
from pysource_codegen import generate as generate_random_code
|
|
from pysource_minimize import CouldNotMinimize, minimize as minimize_repro
|
|
from rich_argparse import RawDescriptionRichHelpFormatter
|
|
from termcolor import colored
|
|
|
|
MinimizedSourceCode = NewType("MinimizedSourceCode", str)
|
|
Seed = NewType("Seed", int)
|
|
ExitCode = NewType("ExitCode", int)
|
|
|
|
|
|
def ty_contains_bug(code: str, *, ty_executable: Path) -> bool:
|
|
"""Return `True` if the code triggers a panic in type-checking code."""
|
|
with tempfile.TemporaryDirectory() as tempdir:
|
|
input_file = Path(tempdir, "input.py")
|
|
input_file.write_text(code)
|
|
completed_process = subprocess.run(
|
|
[ty_executable, "check", input_file], capture_output=True, text=True
|
|
)
|
|
return completed_process.returncode not in {0, 1, 2}
|
|
|
|
|
|
def ruff_contains_bug(code: str, *, ruff_executable: Path) -> bool:
|
|
"""Return `True` if the code triggers a parser error."""
|
|
completed_process = subprocess.run(
|
|
[
|
|
ruff_executable,
|
|
"check",
|
|
"--config",
|
|
"lint.select=[]",
|
|
"--no-cache",
|
|
"--target-version",
|
|
"py313",
|
|
"--preview",
|
|
"-",
|
|
],
|
|
capture_output=True,
|
|
text=True,
|
|
input=code,
|
|
)
|
|
return completed_process.returncode != 0
|
|
|
|
|
|
def contains_bug(code: str, *, executable: Executable, executable_path: Path) -> bool:
|
|
"""Return `True` if the code triggers an error."""
|
|
match executable:
|
|
case Executable.RUFF:
|
|
return ruff_contains_bug(code, ruff_executable=executable_path)
|
|
case Executable.TY:
|
|
return ty_contains_bug(code, ty_executable=executable_path)
|
|
case _ as unreachable:
|
|
assert_never(unreachable)
|
|
|
|
|
|
def contains_new_bug(
|
|
code: str,
|
|
*,
|
|
executable: Executable,
|
|
test_executable_path: Path,
|
|
baseline_executable_path: Path,
|
|
) -> bool:
|
|
"""Return `True` if the code triggers a *new* parser error.
|
|
|
|
A "new" parser error is one that exists with `test_executable`,
|
|
but did not exist with `baseline_executable`.
|
|
"""
|
|
return contains_bug(
|
|
code, executable=executable, executable_path=test_executable_path
|
|
) and not contains_bug(
|
|
code, executable=executable, executable_path=baseline_executable_path
|
|
)
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class FuzzResult:
|
|
# The seed used to generate the random Python file.
|
|
# The same seed always generates the same file.
|
|
seed: Seed
|
|
# If we found a bug, this will be the minimum Python code
|
|
# required to trigger the bug. If not, it will be `None`.
|
|
maybe_bug: MinimizedSourceCode | None
|
|
# The executable we're testing
|
|
executable: Executable
|
|
_: KW_ONLY
|
|
only_new_bugs: bool
|
|
|
|
def print_description(self, index: int, num_seeds: int) -> None:
|
|
"""Describe the results of fuzzing the parser with this seed."""
|
|
progress = f"[{index}/{num_seeds}]"
|
|
msg = (
|
|
colored(f"Ran fuzzer on seed {self.seed}", "red")
|
|
if self.maybe_bug
|
|
else colored(f"Ran fuzzer successfully on seed {self.seed}", "green")
|
|
)
|
|
print(f"{msg:<60} {progress:>15}", flush=True)
|
|
|
|
new = "new " if self.only_new_bugs else ""
|
|
|
|
if self.maybe_bug:
|
|
match self.executable:
|
|
case Executable.RUFF:
|
|
panic_message = f"The following code triggers a {new}parser bug:"
|
|
case Executable.TY:
|
|
panic_message = f"The following code triggers a {new}ty panic:"
|
|
case _ as unreachable:
|
|
assert_never(unreachable)
|
|
|
|
print(colored(panic_message, "red"))
|
|
print()
|
|
print(self.maybe_bug)
|
|
print(flush=True)
|
|
|
|
|
|
def fuzz_code(seed: Seed, args: ResolvedCliArgs) -> FuzzResult:
|
|
"""Return a `FuzzResult` instance describing the fuzzing result from this seed."""
|
|
# TODO(carljm) remove once we debug the slowness of these seeds
|
|
skip_check = seed in {120, 160, 335}
|
|
|
|
code = generate_random_code(seed)
|
|
bug_found = False
|
|
minimizer_callback: Callable[[str], bool] | None = None
|
|
|
|
if args.baseline_executable_path is None:
|
|
only_new_bugs = False
|
|
if not skip_check and contains_bug(
|
|
code, executable=args.executable, executable_path=args.test_executable_path
|
|
):
|
|
bug_found = True
|
|
minimizer_callback = partial(
|
|
contains_bug,
|
|
executable=args.executable,
|
|
executable_path=args.test_executable_path,
|
|
)
|
|
else:
|
|
only_new_bugs = True
|
|
if not skip_check and contains_new_bug(
|
|
code,
|
|
executable=args.executable,
|
|
test_executable_path=args.test_executable_path,
|
|
baseline_executable_path=args.baseline_executable_path,
|
|
):
|
|
bug_found = True
|
|
minimizer_callback = partial(
|
|
contains_new_bug,
|
|
executable=args.executable,
|
|
test_executable_path=args.test_executable_path,
|
|
baseline_executable_path=args.baseline_executable_path,
|
|
)
|
|
|
|
if not bug_found:
|
|
return FuzzResult(seed, None, args.executable, only_new_bugs=only_new_bugs)
|
|
|
|
assert minimizer_callback is not None
|
|
|
|
try:
|
|
maybe_bug = MinimizedSourceCode(minimize_repro(code, minimizer_callback))
|
|
except CouldNotMinimize as e:
|
|
# This is to double-check that there isn't a bug in
|
|
# `pysource-minimize`/`pysource-codegen`.
|
|
# `pysource-minimize` *should* never produce code that's invalid syntax.
|
|
try:
|
|
ast.parse(code)
|
|
except SyntaxError:
|
|
raise e from None
|
|
else:
|
|
maybe_bug = MinimizedSourceCode(code)
|
|
|
|
return FuzzResult(seed, maybe_bug, args.executable, only_new_bugs=only_new_bugs)
|
|
|
|
|
|
def run_fuzzer_concurrently(args: ResolvedCliArgs) -> list[FuzzResult]:
|
|
num_seeds = len(args.seeds)
|
|
print(
|
|
f"Concurrently running the fuzzer on "
|
|
f"{num_seeds} randomly generated source-code "
|
|
f"file{'s' if num_seeds != 1 else ''}..."
|
|
)
|
|
bugs: list[FuzzResult] = []
|
|
with concurrent.futures.ProcessPoolExecutor() as executor:
|
|
fuzz_result_futures = [
|
|
executor.submit(fuzz_code, seed, args) for seed in args.seeds
|
|
]
|
|
try:
|
|
for i, future in enumerate(
|
|
concurrent.futures.as_completed(fuzz_result_futures), start=1
|
|
):
|
|
fuzz_result = future.result()
|
|
if not args.quiet:
|
|
fuzz_result.print_description(i, num_seeds)
|
|
if fuzz_result.maybe_bug:
|
|
bugs.append(fuzz_result)
|
|
except KeyboardInterrupt:
|
|
print("\nShutting down the ProcessPoolExecutor due to KeyboardInterrupt...")
|
|
print("(This might take a few seconds)")
|
|
executor.shutdown(cancel_futures=True)
|
|
raise
|
|
return bugs
|
|
|
|
|
|
def run_fuzzer_sequentially(args: ResolvedCliArgs) -> list[FuzzResult]:
|
|
num_seeds = len(args.seeds)
|
|
print(
|
|
f"Sequentially running the fuzzer on "
|
|
f"{num_seeds} randomly generated source-code "
|
|
f"file{'s' if num_seeds != 1 else ''}..."
|
|
)
|
|
bugs: list[FuzzResult] = []
|
|
for i, seed in enumerate(args.seeds, start=1):
|
|
fuzz_result = fuzz_code(seed, args)
|
|
if not args.quiet:
|
|
fuzz_result.print_description(i, num_seeds)
|
|
if fuzz_result.maybe_bug:
|
|
bugs.append(fuzz_result)
|
|
return bugs
|
|
|
|
|
|
def run_fuzzer(args: ResolvedCliArgs) -> ExitCode:
|
|
if len(args.seeds) <= 5:
|
|
bugs = run_fuzzer_sequentially(args)
|
|
else:
|
|
bugs = run_fuzzer_concurrently(args)
|
|
noun_phrase = "New bugs" if args.baseline_executable_path is not None else "Bugs"
|
|
if bugs:
|
|
print(colored(f"{noun_phrase} found in the following seeds:", "red"))
|
|
print(*sorted(bug.seed for bug in bugs))
|
|
return ExitCode(1)
|
|
else:
|
|
print(colored(f"No {noun_phrase.lower()} found!", "green"))
|
|
return ExitCode(0)
|
|
|
|
|
|
def absolute_path(p: str) -> Path:
|
|
return Path(p).absolute()
|
|
|
|
|
|
def parse_seed_argument(arg: str) -> int | range:
|
|
"""Helper for argument parsing"""
|
|
if "-" in arg:
|
|
start, end = map(int, arg.split("-"))
|
|
if end <= start:
|
|
raise argparse.ArgumentTypeError(
|
|
f"Error when parsing seed argument {arg!r}: "
|
|
f"range end must be > range start"
|
|
)
|
|
seed_range = range(start, end + 1)
|
|
range_too_long = (
|
|
f"Error when parsing seed argument {arg!r}: "
|
|
f"maximum allowed range length is 1_000_000_000"
|
|
)
|
|
try:
|
|
if len(seed_range) > 1_000_000_000:
|
|
raise argparse.ArgumentTypeError(range_too_long)
|
|
except OverflowError:
|
|
raise argparse.ArgumentTypeError(range_too_long) from None
|
|
return range(int(start), int(end) + 1)
|
|
return int(arg)
|
|
|
|
|
|
class Executable(enum.StrEnum):
|
|
RUFF = "ruff"
|
|
TY = "ty"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ResolvedCliArgs:
|
|
seeds: list[Seed]
|
|
_: KW_ONLY
|
|
executable: Executable
|
|
test_executable_path: Path
|
|
baseline_executable_path: Path | None
|
|
quiet: bool
|
|
|
|
|
|
def parse_args() -> ResolvedCliArgs:
|
|
"""Parse command-line arguments"""
|
|
parser = argparse.ArgumentParser(
|
|
description=__doc__, formatter_class=RawDescriptionRichHelpFormatter
|
|
)
|
|
parser.add_argument(
|
|
"seeds",
|
|
type=parse_seed_argument,
|
|
nargs="+",
|
|
help="Either a single seed, or an inclusive range of seeds in the format `0-5`",
|
|
)
|
|
parser.add_argument(
|
|
"--only-new-bugs",
|
|
action="store_true",
|
|
help=(
|
|
"Only report bugs if they exist on the current branch, "
|
|
"but *didn't* exist on the released version "
|
|
"installed into the Python environment we're running in"
|
|
),
|
|
)
|
|
parser.add_argument(
|
|
"--quiet",
|
|
action="store_true",
|
|
help="Print fewer things to the terminal while running the fuzzer",
|
|
)
|
|
parser.add_argument(
|
|
"--test-executable",
|
|
help=(
|
|
"Executable to test. "
|
|
"Defaults to a fresh build of the currently checked-out branch."
|
|
),
|
|
type=absolute_path,
|
|
)
|
|
parser.add_argument(
|
|
"--baseline-executable",
|
|
help=(
|
|
"Executable to compare results against. "
|
|
"Defaults to whatever version is installed "
|
|
"in the Python environment."
|
|
),
|
|
type=absolute_path,
|
|
)
|
|
parser.add_argument(
|
|
"--bin",
|
|
help="Which executable to test.",
|
|
required=True,
|
|
choices=[member.value for member in Executable],
|
|
)
|
|
|
|
args = parser.parse_args()
|
|
|
|
executable = Executable(args.bin)
|
|
|
|
if args.baseline_executable:
|
|
if not args.only_new_bugs:
|
|
parser.error(
|
|
"Specifying `--baseline-executable` has no effect "
|
|
"unless `--only-new-bugs` is also specified"
|
|
)
|
|
try:
|
|
subprocess.run(
|
|
[args.baseline_executable, "--version"], check=True, capture_output=True
|
|
)
|
|
except FileNotFoundError:
|
|
parser.error(
|
|
f"Bad argument passed to `--baseline-executable`: "
|
|
f"no such file or executable {args.baseline_executable!r}"
|
|
)
|
|
elif args.only_new_bugs:
|
|
try:
|
|
version_proc = subprocess.run(
|
|
[executable, "--version"], text=True, capture_output=True, check=True
|
|
)
|
|
except FileNotFoundError:
|
|
parser.error(
|
|
"`--only-new-bugs` was specified without specifying a baseline "
|
|
f"executable, and no released version of `{executable}` appears to be "
|
|
"installed in your Python environment"
|
|
)
|
|
else:
|
|
if not args.quiet:
|
|
version = version_proc.stdout.strip().split(" ")[1]
|
|
print(
|
|
f"`--only-new-bugs` was specified without specifying a baseline "
|
|
f"executable; falling back to using `{executable}=={version}` as "
|
|
f"the baseline (the version of `{executable}` installed in your "
|
|
f"current Python environment)"
|
|
)
|
|
|
|
if not args.test_executable:
|
|
print(
|
|
"Running `cargo build --release` since no test executable was specified...",
|
|
flush=True,
|
|
)
|
|
cmd: list[str] = [
|
|
"cargo",
|
|
"build",
|
|
"--release",
|
|
"--locked",
|
|
"--color",
|
|
"always",
|
|
"--bin",
|
|
executable,
|
|
]
|
|
try:
|
|
subprocess.run(cmd, check=True, capture_output=True, text=True)
|
|
except subprocess.CalledProcessError as e:
|
|
print(e.stderr)
|
|
raise
|
|
args.test_executable = Path("target", "release", executable)
|
|
assert args.test_executable.is_file()
|
|
|
|
seed_arguments: list[range | int] = args.seeds
|
|
seen_seeds: set[int] = set()
|
|
for arg in seed_arguments:
|
|
if isinstance(arg, int):
|
|
seen_seeds.add(arg)
|
|
else:
|
|
seen_seeds.update(arg)
|
|
|
|
return ResolvedCliArgs(
|
|
sorted(map(Seed, seen_seeds)),
|
|
quiet=args.quiet,
|
|
executable=executable,
|
|
test_executable_path=args.test_executable,
|
|
baseline_executable_path=args.baseline_executable,
|
|
)
|
|
|
|
|
|
def main() -> NoReturn:
|
|
args = parse_args()
|
|
raise SystemExit(run_fuzzer(args))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|