""" Run the parser on randomly generated (but syntactically valid) Python source-code files. To install all dependencies for this script into an environment using `uv`, run: uv pip install -r scripts/fuzz-parser/requirements.txt Example invocations of the script: - Run the fuzzer using seeds 0, 1, 2, 78 and 93 to generate the code: `python scripts/fuzz-parser/fuzz.py 0-2 78 93` - Run the fuzzer concurrently using seeds in range 0-10 inclusive, but only reporting bugs that are new on your branch: `python scripts/fuzz-parser/fuzz.py 0-10 --new-bugs-only` - Run the fuzzer concurrently on 10,000 different Python source-code files, using a random selection of seeds, and only print a summary at the end (the `shuf` command is Unix-specific): `python scripts/fuzz-parser/fuzz.py $(shuf -i 0-1000000 -n 10000) --quiet """ from __future__ import annotations import argparse import concurrent.futures import os.path import subprocess from dataclasses import KW_ONLY, dataclass from functools import partial from typing import NewType from pysource_codegen import generate as generate_random_code from pysource_minimize import minimize as minimize_repro from rich_argparse import RawDescriptionRichHelpFormatter from termcolor import colored MinimizedSourceCode = NewType("MinimizedSourceCode", str) Seed = NewType("Seed", int) ExitCode = NewType("ExitCode", int) def contains_bug(code: str, *, ruff_executable: str) -> bool: """Return `True` if the code triggers a parser error.""" completed_process = subprocess.run( [ruff_executable, "check", "--config", "lint.select=[]", "--no-cache", "-"], capture_output=True, text=True, input=code, ) return completed_process.returncode != 0 def contains_new_bug( code: str, *, test_executable: str, baseline_executable: str ) -> bool: """Return `True` if the code triggers a *new* parser error. A "new" parser error is one that exists with `test_executable`, but did not exist with `baseline_executable`. """ return contains_bug(code, ruff_executable=test_executable) and not contains_bug( code, ruff_executable=baseline_executable ) @dataclass(slots=True) class FuzzResult: # The seed used to generate the random Python file. # The same seed always generates the same file. seed: Seed # If we found a bug, this will be the minimum Python code # required to trigger the bug. If not, it will be `None`. maybe_bug: MinimizedSourceCode | None def print_description(self, index: int, num_seeds: int) -> None: """Describe the results of fuzzing the parser with this seed.""" progress = f"[{index}/{num_seeds}]" msg = ( colored(f"Ran fuzzer on seed {self.seed}", "red") if self.maybe_bug else colored(f"Ran fuzzer successfully on seed {self.seed}", "green") ) print(f"{msg:<60} {progress:>15}", flush=True) if self.maybe_bug: print(colored("The following code triggers a bug:", "red")) print() print(self.maybe_bug) print(flush=True) def fuzz_code( seed: Seed, *, test_executable: str, baseline_executable: str, only_new_bugs: bool, ) -> FuzzResult: """Return a `FuzzResult` instance describing the fuzzing result from this seed.""" code = generate_random_code(seed) has_bug = ( contains_new_bug( code, test_executable=test_executable, baseline_executable=baseline_executable, ) if only_new_bugs else contains_bug(code, ruff_executable=test_executable) ) if has_bug: maybe_bug = MinimizedSourceCode( minimize_repro(code, partial(contains_bug, ruff_executable=test_executable)) ) else: maybe_bug = None return FuzzResult(seed, maybe_bug) def run_fuzzer_concurrently(args: ResolvedCliArgs) -> list[FuzzResult]: num_seeds = len(args.seeds) print( f"Concurrently running the fuzzer on " f"{num_seeds} randomly generated source-code files..." ) bugs: list[FuzzResult] = [] with concurrent.futures.ProcessPoolExecutor() as executor: fuzz_result_futures = [ executor.submit( fuzz_code, seed, test_executable=args.test_executable, baseline_executable=args.baseline_executable, only_new_bugs=args.only_new_bugs, ) for seed in args.seeds ] try: for i, future in enumerate( concurrent.futures.as_completed(fuzz_result_futures), start=1 ): fuzz_result = future.result() if not args.quiet: fuzz_result.print_description(i, num_seeds) if fuzz_result.maybe_bug: bugs.append(fuzz_result) except KeyboardInterrupt: print("\nShutting down the ProcessPoolExecutor due to KeyboardInterrupt...") print("(This might take a few seconds)") executor.shutdown(cancel_futures=True) raise return bugs def run_fuzzer_sequentially(args: ResolvedCliArgs) -> list[FuzzResult]: num_seeds = len(args.seeds) print( f"Sequentially running the fuzzer on " f"{num_seeds} randomly generated source-code files..." ) bugs: list[FuzzResult] = [] for i, seed in enumerate(args.seeds, start=1): fuzz_result = fuzz_code( seed, test_executable=args.test_executable, baseline_executable=args.baseline_executable, only_new_bugs=args.only_new_bugs, ) if not args.quiet: fuzz_result.print_description(i, num_seeds) if fuzz_result.maybe_bug: bugs.append(fuzz_result) return bugs def main(args: ResolvedCliArgs) -> ExitCode: if len(args.seeds) <= 5: bugs = run_fuzzer_sequentially(args) else: bugs = run_fuzzer_concurrently(args) noun_phrase = "New bugs" if args.only_new_bugs else "Bugs" if bugs: print(colored(f"{noun_phrase} found in the following seeds:", "red")) print(*sorted(bug.seed for bug in bugs)) return ExitCode(1) else: print(colored(f"No {noun_phrase.lower()} found!", "green")) return ExitCode(0) def parse_seed_argument(arg: str) -> int | range: """Helper for argument parsing""" if "-" in arg: start, end = map(int, arg.split("-")) if end <= start: raise argparse.ArgumentTypeError( f"Error when parsing seed argument {arg!r}: " f"range end must be > range start" ) seed_range = range(start, end + 1) range_too_long = ( f"Error when parsing seed argument {arg!r}: " f"maximum allowed range length is 1_000_000_000" ) try: if len(seed_range) > 1_000_000_000: raise argparse.ArgumentTypeError(range_too_long) except OverflowError: raise argparse.ArgumentTypeError(range_too_long) from None return range(int(start), int(end) + 1) return int(arg) @dataclass(slots=True) class ResolvedCliArgs: seeds: list[Seed] _: KW_ONLY test_executable: str baseline_executable: str only_new_bugs: bool quiet: bool def parse_args() -> ResolvedCliArgs: """Parse command-line arguments""" parser = argparse.ArgumentParser( description=__doc__, formatter_class=RawDescriptionRichHelpFormatter ) parser.add_argument( "seeds", type=parse_seed_argument, nargs="+", help="Either a single seed, or an inclusive range of seeds in the format `0-5`", ) parser.add_argument( "--only-new-bugs", action="store_true", help=( "Only report bugs if they exist on the current branch, " "but *didn't* exist on the released version of Ruff " "installed into the Python environment we're running in" ), ) parser.add_argument( "--quiet", action="store_true", help="Print fewer things to the terminal while running the fuzzer", ) parser.add_argument( "--test-executable", help=( "`ruff` executable to test. " "Defaults to a fresh build of the currently checked-out branch." ), ) parser.add_argument( "--baseline-executable", help=( "`ruff` executable to compare results against. " "Defaults to whatever `ruff` version is installed " "in the Python environment." ), ) args = parser.parse_args() if args.baseline_executable: if not args.only_new_bugs: parser.error( "Specifying `--baseline-executable` has no effect " "unless `--only-new-bugs` is also specified" ) try: subprocess.run( [args.baseline_executable, "--version"], check=True, capture_output=True ) except FileNotFoundError: parser.error( f"Bad argument passed to `--baseline-executable`: " f"no such file or executable {args.baseline_executable!r}" ) elif args.only_new_bugs: try: ruff_version_proc = subprocess.run( ["ruff", "--version"], text=True, capture_output=True, check=True ) except FileNotFoundError: parser.error( "`--only-new-bugs` was specified without specifying a baseline " "executable, and no released version of Ruff appears to be installed " "in your Python environment" ) else: if not args.quiet: ruff_version = ruff_version_proc.stdout.strip().split(" ")[1] print( f"`--only-new-bugs` was specified without specifying a baseline " f"executable; falling back to using `ruff=={ruff_version}` as the " f"baseline (the version of Ruff installed in your current Python " f"environment)" ) args.baseline_executable = "ruff" if not args.test_executable: print( "Running `cargo build --release` since no test executable was specified...", flush=True, ) try: subprocess.run( ["cargo", "build", "--release", "--locked", "--color", "always"], check=True, capture_output=True, text=True, ) except subprocess.CalledProcessError as e: print(e.stderr) raise args.test_executable = os.path.join("target", "release", "ruff") assert os.path.exists(args.test_executable) seed_arguments: list[range | int] = args.seeds seen_seeds: set[int] = set() for arg in seed_arguments: if isinstance(arg, int): seen_seeds.add(arg) else: seen_seeds.update(arg) return ResolvedCliArgs( sorted(map(Seed, seen_seeds)), only_new_bugs=args.only_new_bugs, quiet=args.quiet, test_executable=args.test_executable, baseline_executable=args.baseline_executable, ) if __name__ == "__main__": args = parse_args() raise SystemExit(main(args))