mirror of
https://github.com/astral-sh/ruff.git
synced 2025-10-16 21:38:11 +00:00

## Summary A [passing comment](https://github.com/astral-sh/ruff/pull/19711#issuecomment-3169312014) led me to explore why we didn't report a class attribute as possibly unbound if it was a method and defined in two different conditional branches. I found that the reason was because of our handling of "conflicting declarations" in `place_from_declarations`. It returned a `Result` which would be `Err` in case of conflicting declarations. But we only actually care about conflicting declarations when we are actually doing type inference on that scope and might emit a diagnostic about it. And in all cases (including that one), we want to otherwise proceed with the union of the declared types, as if there was no conflict. In several cases we were failing to handle the union of declared types in the same way as a normal declared type if there was a declared-types conflict. The `Result` return type made this mistake really easy to make, as we'd match on e.g. `Ok(Place::Type(...))` and do one thing, then match on `Err(...)` and do another, even though really both of those cases should be handled the same. This PR refactors `place_from_declarations` to instead return a struct which always represents the declared type we should use in the same way, as well as carrying the conflicting declared types, if any. This struct has a method to allow us to explicitly ignore the declared-types conflict (which is what we want in most cases), as well as a method to get the declared type and the conflict information, in the case where we want to emit a diagnostic on the conflict. ## Test Plan Existing CI; added a test showing that we now understand a multiply-conditionally-defined method as possibly-unbound. This does trigger issues on a couple new fuzzer seeds, but the issues are just new instances of an already-known (and rarely occurring) problem which I already plan to address in a future PR, so I think it's OK to land as-is. 
I happened to build this initially on top of https://github.com/astral-sh/ruff/pull/19711, which adds invalid-await diagnostics, so I also updated some invalid-syntax tests to not await on an invalid type, since the purpose of those tests is to check the syntactic location of the `await`, not the validity of the awaited type.
448 lines
15 KiB
Python
448 lines
15 KiB
Python
"""
|
|
Run a Ruff executable on randomly generated (but syntactically valid)
|
|
Python source-code files.
|
|
|
|
This script can be installed into a virtual environment using
|
|
`uv pip install -e ./python/py-fuzzer` from the Ruff repository root,
|
|
or can be run using `uvx --from ./python/py-fuzzer fuzz`
|
|
(in which case the virtual environment does not need to be activated).
|
|
|
|
Example invocations of the script using `uv`:
|
|
- Run the fuzzer on Ruff's parser using seeds 0, 1, 2, 78 and 93 to generate the code:
|
|
`uvx --from ./python/py-fuzzer fuzz --bin ruff 0-2 78 93`
|
|
- Run the fuzzer concurrently using seeds in range 0-10 inclusive,
|
|
but only reporting bugs that are new on your branch:
|
|
`uvx --from ./python/py-fuzzer fuzz --bin ruff 0-10 --only-new-bugs`
|
|
- Run the fuzzer concurrently on 10,000 different Python source-code files,
|
|
using a random selection of seeds, and only print a summary at the end
|
|
(the `shuf` command is Unix-specific):
|
|
`uvx --from ./python/py-fuzzer fuzz --bin ruff $(shuf -i 0-1000000 -n 10000) --quiet`
|
|
|
|
If you make local modifications to this script, you'll need to run the above
|
|
with `--reinstall` to get your changes reflected in the uv-cached installed
|
|
package. Alternatively, if iterating quickly on changes, you can add
|
|
`--with-editable ./python/py-fuzzer`.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import ast
|
|
import concurrent.futures
|
|
import enum
|
|
import subprocess
|
|
import tempfile
|
|
from collections.abc import Callable
|
|
from dataclasses import KW_ONLY, dataclass
|
|
from functools import partial
|
|
from pathlib import Path
|
|
from typing import NewType, NoReturn, assert_never
|
|
|
|
from pysource_codegen import generate as generate_random_code
|
|
from pysource_minimize import CouldNotMinimize, minimize as minimize_repro
|
|
from rich_argparse import RawDescriptionRichHelpFormatter
|
|
from termcolor import colored
|
|
|
|
# Distinct static types for the values passed around by the fuzzer.
# `NewType` aliases only exist for the type checker: at runtime each one
# simply returns the value it is called with.

# Source code that has been reduced to a minimal bug-triggering reproduction.
MinimizedSourceCode = NewType("MinimizedSourceCode", str)

# Seed handed to the random source-code generator.
Seed = NewType("Seed", int)

# Process exit status produced by a fuzzer run.
ExitCode = NewType("ExitCode", int)
|
|
|
|
|
|
def ty_contains_bug(code: str, *, ty_executable: Path) -> bool:
    """Return `True` if the code triggers a panic in type-checking code.

    The code is written to a throwaway `input.py` inside a temporary
    directory and `<ty_executable> check input.py` is run on it.

    Args:
        code: Python source code to check.
        ty_executable: Path to the ty executable to run.

    Returns:
        `False` when the process exits with status 0, 1 or 2 (treated as
        ordinary, non-crashing outcomes); `True` for any other exit status.
    """
    with tempfile.TemporaryDirectory() as tempdir:
        input_file = Path(tempdir, "input.py")
        # Python source files are UTF-8 by default, so write the file as
        # UTF-8 explicitly rather than with whatever the platform's locale
        # encoding happens to be (which may be unable to encode the
        # generated code, or may produce bytes that are not valid UTF-8).
        input_file.write_text(code, encoding="utf-8")
        completed_process = subprocess.run(
            [ty_executable, "check", input_file], capture_output=True, text=True
        )
    return completed_process.returncode not in {0, 1, 2}
|
|
|
|
|
|
def ruff_contains_bug(code: str, *, ruff_executable: Path) -> bool:
    """Return `True` if the code triggers a parser error.

    Runs `<ruff_executable> check` with the code supplied on standard input
    and reports whether the process exited with a non-zero status.

    Args:
        code: Python source code to check.
        ruff_executable: Path to the Ruff executable to run.

    Returns:
        `True` if Ruff exits with a non-zero status, `False` otherwise.
    """
    command = [
        ruff_executable,
        "check",
        # NOTE(review): an empty rule selection presumably leaves syntax
        # errors as the only thing that can make the check fail -- confirm.
        "--config",
        "lint.select=[]",
        "--no-cache",
        "--target-version",
        "py313",
        "--preview",
        "-",
    ]
    completed_process = subprocess.run(
        command,
        capture_output=True,
        text=True,
        # Send the source as UTF-8 (the default encoding of Python source)
        # instead of relying on the platform's locale encoding.
        encoding="utf-8",
        input=code,
    )
    return completed_process.returncode != 0
|
|
|
|
|
|
def contains_bug(code: str, *, executable: Executable, executable_path: Path) -> bool:
|
|
"""Return `True` if the code triggers an error."""
|
|
match executable:
|
|
case Executable.RUFF:
|
|
return ruff_contains_bug(code, ruff_executable=executable_path)
|
|
case Executable.TY:
|
|
return ty_contains_bug(code, ty_executable=executable_path)
|
|
case _ as unreachable:
|
|
assert_never(unreachable)
|
|
|
|
|
|
def contains_new_bug(
    code: str,
    *,
    executable: Executable,
    test_executable_path: Path,
    baseline_executable_path: Path,
) -> bool:
    """Return `True` if the code triggers a *new* parser error.

    A "new" parser error is one that exists with `test_executable`,
    but did not exist with `baseline_executable`.
    """
    fails_on_test = contains_bug(
        code, executable=executable, executable_path=test_executable_path
    )
    if not fails_on_test:
        # Nothing to compare: the baseline executable is not run at all.
        return False

    fails_on_baseline = contains_bug(
        code, executable=executable, executable_path=baseline_executable_path
    )
    return not fails_on_baseline
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class FuzzResult:
|
|
# The seed used to generate the random Python file.
|
|
# The same seed always generates the same file.
|
|
seed: Seed
|
|
# If we found a bug, this will be the minimum Python code
|
|
# required to trigger the bug. If not, it will be `None`.
|
|
maybe_bug: MinimizedSourceCode | None
|
|
# The executable we're testing
|
|
executable: Executable
|
|
_: KW_ONLY
|
|
only_new_bugs: bool
|
|
|
|
def print_description(self, index: int, num_seeds: int) -> None:
|
|
"""Describe the results of fuzzing the parser with this seed."""
|
|
progress = f"[{index}/{num_seeds}]"
|
|
msg = (
|
|
colored(f"Ran fuzzer on seed {self.seed}", "red")
|
|
if self.maybe_bug
|
|
else colored(f"Ran fuzzer successfully on seed {self.seed}", "green")
|
|
)
|
|
print(f"{msg:<60} {progress:>15}", flush=True)
|
|
|
|
new = "new " if self.only_new_bugs else ""
|
|
|
|
if self.maybe_bug:
|
|
match self.executable:
|
|
case Executable.RUFF:
|
|
panic_message = f"The following code triggers a {new}parser bug:"
|
|
case Executable.TY:
|
|
panic_message = f"The following code triggers a {new}ty panic:"
|
|
case _ as unreachable:
|
|
assert_never(unreachable)
|
|
|
|
print(colored(panic_message, "red"))
|
|
print()
|
|
print(self.maybe_bug)
|
|
print(flush=True)
|
|
|
|
|
|
def fuzz_code(seed: Seed, args: ResolvedCliArgs) -> FuzzResult:
    """Return a `FuzzResult` instance describing the fuzzing result from this seed.

    Generates random code from `seed`, checks whether it triggers a bug
    (a *new* bug when a baseline executable is configured) and, if so,
    minimizes the code to a smaller reproduction.
    """
    # TODO(carljm) remove once we debug the slowness of these seeds
    skip_check = seed in {120, 160, 314, 335}

    code = generate_random_code(seed)

    # The same predicate is used both for the initial check and as the
    # callback that drives the minimizer.
    baseline_path = args.baseline_executable_path
    if baseline_path is None:
        only_new_bugs = False
        triggers_bug = partial(
            contains_bug,
            executable=args.executable,
            executable_path=args.test_executable_path,
        )
    else:
        only_new_bugs = True
        triggers_bug = partial(
            contains_new_bug,
            executable=args.executable,
            test_executable_path=args.test_executable_path,
            baseline_executable_path=baseline_path,
        )

    if skip_check or not triggers_bug(code):
        return FuzzResult(seed, None, args.executable, only_new_bugs=only_new_bugs)

    try:
        minimized = minimize_repro(code, triggers_bug)
    except CouldNotMinimize as e:
        # This is to double-check that there isn't a bug in
        # `pysource-minimize`/`pysource-codegen`.
        # `pysource-minimize` *should* never produce code that's invalid syntax.
        try:
            ast.parse(code)
        except SyntaxError:
            raise e from None
        # The code is valid but could not be reduced: report it unminimized.
        minimized = code

    return FuzzResult(
        seed,
        MinimizedSourceCode(minimized),
        args.executable,
        only_new_bugs=only_new_bugs,
    )
|
|
|
|
|
|
def run_fuzzer_concurrently(args: ResolvedCliArgs) -> list[FuzzResult]:
    """Fuzz every seed in `args.seeds` using a pool of worker processes.

    Results are handled in completion order. Returns the results that
    contain a bug.
    """
    num_seeds = len(args.seeds)
    plural = "" if num_seeds == 1 else "s"
    print(
        "Concurrently running the fuzzer on "
        f"{num_seeds} randomly generated source-code file{plural}..."
    )

    bugs: list[FuzzResult] = []
    with concurrent.futures.ProcessPoolExecutor() as executor:
        pending = [executor.submit(fuzz_code, seed, args) for seed in args.seeds]
        try:
            completed = concurrent.futures.as_completed(pending)
            for position, future in enumerate(completed, start=1):
                outcome = future.result()
                if not args.quiet:
                    outcome.print_description(position, num_seeds)
                if outcome.maybe_bug:
                    bugs.append(outcome)
        except KeyboardInterrupt:
            print("\nShutting down the ProcessPoolExecutor due to KeyboardInterrupt...")
            print("(This might take a few seconds)")
            # Drop the work that has not started yet before propagating.
            executor.shutdown(cancel_futures=True)
            raise
    return bugs
|
|
|
|
|
|
def run_fuzzer_sequentially(args: ResolvedCliArgs) -> list[FuzzResult]:
    """Fuzz every seed in `args.seeds` one after another, in order.

    Returns the results that contain a bug.
    """
    num_seeds = len(args.seeds)
    plural = "" if num_seeds == 1 else "s"
    print(
        "Sequentially running the fuzzer on "
        f"{num_seeds} randomly generated source-code file{plural}..."
    )

    bugs: list[FuzzResult] = []
    for position, seed in enumerate(args.seeds, start=1):
        outcome = fuzz_code(seed, args)
        if not args.quiet:
            outcome.print_description(position, num_seeds)
        if outcome.maybe_bug:
            bugs.append(outcome)
    return bugs
|
|
|
|
|
|
def run_fuzzer(args: ResolvedCliArgs) -> ExitCode:
    """Run the fuzzer over all requested seeds and print a summary.

    Five seeds or fewer are fuzzed sequentially; larger runs are fuzzed
    concurrently. Returns exit code 1 if any bug was found, otherwise 0.
    """
    if len(args.seeds) <= 5:
        runner = run_fuzzer_sequentially
    else:
        runner = run_fuzzer_concurrently
    bugs = runner(args)

    noun_phrase = "Bugs" if args.baseline_executable_path is None else "New bugs"

    if not bugs:
        print(colored(f"No {noun_phrase.lower()} found!", "green"))
        return ExitCode(0)

    print(colored(f"{noun_phrase} found in the following seeds:", "red"))
    print(*sorted(bug.seed for bug in bugs))
    return ExitCode(1)
|
|
|
|
|
|
def absolute_path(p: str) -> Path:
    """Convert a path given as a string into an absolute `Path`.

    Used as the `type=` converter for the executable-path options.
    """
    path = Path(p)
    return path.absolute()
|
|
|
|
|
|
def parse_seed_argument(arg: str) -> int | range:
|
|
"""Helper for argument parsing"""
|
|
if "-" in arg:
|
|
start, end = map(int, arg.split("-"))
|
|
if end <= start:
|
|
raise argparse.ArgumentTypeError(
|
|
f"Error when parsing seed argument {arg!r}: "
|
|
f"range end must be > range start"
|
|
)
|
|
seed_range = range(start, end + 1)
|
|
range_too_long = (
|
|
f"Error when parsing seed argument {arg!r}: "
|
|
f"maximum allowed range length is 1_000_000_000"
|
|
)
|
|
try:
|
|
if len(seed_range) > 1_000_000_000:
|
|
raise argparse.ArgumentTypeError(range_too_long)
|
|
except OverflowError:
|
|
raise argparse.ArgumentTypeError(range_too_long) from None
|
|
return range(int(start), int(end) + 1)
|
|
return int(arg)
|
|
|
|
|
|
class Executable(enum.StrEnum):
|
|
RUFF = "ruff"
|
|
TY = "ty"
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class ResolvedCliArgs:
|
|
seeds: list[Seed]
|
|
_: KW_ONLY
|
|
executable: Executable
|
|
test_executable_path: Path
|
|
baseline_executable_path: Path | None
|
|
quiet: bool
|
|
|
|
|
|
def parse_args() -> ResolvedCliArgs:
    """Parse command-line arguments

    Besides parsing, this validates the executable options and fills in
    their defaults:

    - With `--baseline-executable`, the executable is probed by running it
      with `--version`.
    - With `--only-new-bugs` but no `--baseline-executable`, the executable
      named by `--bin`, as found on `PATH`, becomes the baseline.
    - Without `--test-executable`, `cargo build --release` is run and the
      freshly built binary under `target/release` becomes the test
      executable.

    Returns:
        The resolved arguments, with the seeds de-duplicated and sorted.

    Raises:
        SystemExit: Via `parser.error()` when the arguments are invalid.
        subprocess.CalledProcessError: If a `--version` probe or the
            `cargo build` exits with a non-zero status.
    """
    parser = argparse.ArgumentParser(
        description=__doc__, formatter_class=RawDescriptionRichHelpFormatter
    )
    parser.add_argument(
        "seeds",
        type=parse_seed_argument,
        nargs="+",
        help="Either a single seed, or an inclusive range of seeds in the format `0-5`",
    )
    parser.add_argument(
        "--only-new-bugs",
        action="store_true",
        help=(
            "Only report bugs if they exist on the current branch, "
            "but *didn't* exist on the released version "
            "installed into the Python environment we're running in"
        ),
    )
    parser.add_argument(
        "--quiet",
        action="store_true",
        help="Print fewer things to the terminal while running the fuzzer",
    )
    parser.add_argument(
        "--test-executable",
        help=(
            "Executable to test. "
            "Defaults to a fresh build of the currently checked-out branch."
        ),
        type=absolute_path,
    )
    parser.add_argument(
        "--baseline-executable",
        help=(
            "Executable to compare results against. "
            "Defaults to whatever version is installed "
            "in the Python environment."
        ),
        type=absolute_path,
    )
    parser.add_argument(
        "--bin",
        help="Which executable to test.",
        required=True,
        choices=[member.value for member in Executable],
    )

    args = parser.parse_args()

    executable = Executable(args.bin)

    if args.baseline_executable:
        if not args.only_new_bugs:
            parser.error(
                "Specifying `--baseline-executable` has no effect "
                "unless `--only-new-bugs` is also specified"
            )
        try:
            subprocess.run(
                [args.baseline_executable, "--version"], check=True, capture_output=True
            )
        except FileNotFoundError:
            parser.error(
                f"Bad argument passed to `--baseline-executable`: "
                f"no such file or executable {args.baseline_executable!r}"
            )
    elif args.only_new_bugs:
        try:
            version_proc = subprocess.run(
                [executable, "--version"], text=True, capture_output=True, check=True
            )
        except FileNotFoundError:
            parser.error(
                "`--only-new-bugs` was specified without specifying a baseline "
                f"executable, and no released version of `{executable}` appears to be "
                "installed in your Python environment"
            )
        else:
            if not args.quiet:
                version = version_proc.stdout.strip().split(" ")[1]
                print(
                    f"`--only-new-bugs` was specified without specifying a baseline "
                    f"executable; falling back to using `{executable}=={version}` as "
                    f"the baseline (the version of `{executable}` installed in your "
                    f"current Python environment)"
                )
            # Actually record the fallback baseline. Without this the
            # baseline stays `None` and *all* bugs get reported, not just
            # the new ones. The bare name is looked up on `PATH` whenever it
            # is executed, exactly like the `--version` probe above.
            args.baseline_executable = Path(executable)

    if not args.test_executable:
        print(
            "Running `cargo build --release` since no test executable was specified...",
            flush=True,
        )
        cmd: list[str] = [
            "cargo",
            "build",
            "--release",
            "--locked",
            "--color",
            "always",
            "--bin",
            executable,
        ]
        try:
            subprocess.run(cmd, check=True, capture_output=True, text=True)
        except subprocess.CalledProcessError as e:
            # The build output was captured, so show it before propagating.
            print(e.stderr)
            raise
        args.test_executable = Path("target", "release", executable)
        assert args.test_executable.is_file()

    # Flatten single seeds and seed ranges into one de-duplicated set.
    seed_arguments: list[range | int] = args.seeds
    seen_seeds: set[int] = set()
    for arg in seed_arguments:
        if isinstance(arg, int):
            seen_seeds.add(arg)
        else:
            seen_seeds.update(arg)

    return ResolvedCliArgs(
        sorted(map(Seed, seen_seeds)),
        quiet=args.quiet,
        executable=executable,
        test_executable_path=args.test_executable,
        baseline_executable_path=args.baseline_executable,
    )
|
|
|
|
|
|
def main() -> NoReturn:
    """Parse the command-line arguments, run the fuzzer, and exit.

    Always raises `SystemExit`, carrying the exit code that `run_fuzzer`
    returned.
    """
    exit_code = run_fuzzer(parse_args())
    raise SystemExit(exit_code)
|
|
|
|
|
|
# Run the fuzzer only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
|