#!/usr/bin/env python3 """Check two versions of ruff against a corpus of open-source code. Example usage: scripts/check_ecosystem.py """ from __future__ import annotations import argparse import asyncio import difflib import heapq import json import logging import re import tempfile import time from asyncio.subprocess import PIPE, create_subprocess_exec from contextlib import asynccontextmanager, nullcontext from pathlib import Path from signal import SIGINT, SIGTERM from typing import TYPE_CHECKING, NamedTuple, Self if TYPE_CHECKING: from collections.abc import AsyncIterator, Iterator, Sequence logger = logging.getLogger(__name__) class Repository(NamedTuple): """A GitHub repository at a specific ref.""" org: str repo: str ref: str | None select: str = "" ignore: str = "" exclude: str = "" # Generating fixes is slow and verbose show_fixes: bool = False @asynccontextmanager async def clone(self: Self, checkout_dir: Path) -> AsyncIterator[Path]: """Shallow clone this repository to a temporary directory.""" if checkout_dir.exists(): logger.debug(f"Reusing {self.org}/{self.repo}") yield Path(checkout_dir) return logger.debug(f"Cloning {self.org}/{self.repo}") git_command = [ "git", "clone", "--config", "advice.detachedHead=false", "--quiet", "--depth", "1", "--no-tags", ] if self.ref: git_command.extend(["--branch", self.ref]) git_command.extend( [ f"https://github.com/{self.org}/{self.repo}", checkout_dir, ], ) process = await create_subprocess_exec(*git_command) status_code = await process.wait() logger.debug( f"Finished cloning {self.org}/{self.repo} with status {status_code}", ) yield Path(checkout_dir) REPOSITORIES: list[Repository] = [ Repository("apache", "airflow", "main", select="ALL"), Repository("bokeh", "bokeh", "branch-3.2", select="ALL"), Repository("pypa", "build", "main"), Repository("pypa", "cibuildwheel", "main"), Repository("DisnakeDev", "disnake", "master"), Repository("scikit-build", "scikit-build", "main"), Repository("scikit-build", "scikit-build-core", "main"), Repository("python", "typeshed", "main", select="PYI"), Repository("zulip", "zulip", "main", select="ALL"), ] SUMMARY_LINE_RE = re.compile(r"^(Found \d+ error.*)|(.*potentially fixable with.*)$") class RuffError(Exception): """An error reported by ruff.""" async def check( *, ruff: Path, path: Path, name: str, select: str = "", ignore: str = "", exclude: str = "", show_fixes: bool = False, ) -> Sequence[str]: """Run the given ruff binary against the specified path.""" logger.debug(f"Checking {name} with {ruff}") ruff_args = ["check", "--no-cache", "--exit-zero"] if select: ruff_args.extend(["--select", select]) if ignore: ruff_args.extend(["--ignore", ignore]) if exclude: ruff_args.extend(["--exclude", exclude]) if show_fixes: ruff_args.extend(["--show-fixes", "--ecosystem-ci"]) start = time.time() proc = await create_subprocess_exec( ruff.absolute(), *ruff_args, ".", stdout=PIPE, stderr=PIPE, cwd=path, ) result, err = await proc.communicate() end = time.time() logger.debug(f"Finished checking {name} with {ruff} in {end - start:.2f}") if proc.returncode != 0: raise RuffError(err.decode("utf8")) lines = [ line for line in result.decode("utf8").splitlines() if not SUMMARY_LINE_RE.match(line) ] return sorted(lines) class Diff(NamedTuple): """A diff between two runs of ruff.""" removed: set[str] added: set[str] def __bool__(self: Self) -> bool: """Return true if this diff is non-empty.""" return bool(self.removed or self.added) def __iter__(self: Self) -> Iterator[str]: """Iterate through the changed lines in diff format.""" for line in heapq.merge(sorted(self.removed), sorted(self.added)): if line in self.removed: yield f"- {line}" else: yield f"+ {line}" async def compare( ruff1: Path, ruff2: Path, repo: Repository, checkouts: Path | None = None, ) -> Diff | None: """Check a specific repository against two versions of ruff.""" removed, added = set(), set() # Allows to keep the checkouts locations if checkouts: checkout_parent = checkouts.joinpath(repo.org) # Don't create the repodir itself, we need that for checking for existing # clones checkout_parent.mkdir(exist_ok=True, parents=True) location_context = nullcontext(checkout_parent) else: location_context = tempfile.TemporaryDirectory() with location_context as checkout_parent: checkout_dir = Path(checkout_parent).joinpath(repo.repo) async with repo.clone(checkout_dir) as path: try: async with asyncio.TaskGroup() as tg: check1 = tg.create_task( check( ruff=ruff1, path=path, name=f"{repo.org}/{repo.repo}", select=repo.select, ignore=repo.ignore, exclude=repo.exclude, show_fixes=repo.show_fixes, ), ) check2 = tg.create_task( check( ruff=ruff2, path=path, name=f"{repo.org}/{repo.repo}", select=repo.select, ignore=repo.ignore, exclude=repo.exclude, show_fixes=repo.show_fixes, ), ) except ExceptionGroup as e: raise e.exceptions[0] from e for line in difflib.ndiff(check1.result(), check2.result()): if line.startswith("- "): removed.add(line[2:]) elif line.startswith("+ "): added.add(line[2:]) return Diff(removed, added) def read_projects_jsonl(projects_jsonl: Path) -> dict[tuple[str, str], Repository]: """Read either of the two formats of https://github.com/akx/ruff-usage-aggregate.""" repositories = {} for line in projects_jsonl.read_text().splitlines(): data = json.loads(line) # Check the input format. if "items" in data: for item in data["items"]: # Pick only the easier case for now. if item["path"] != "pyproject.toml": continue repository = item["repository"] assert re.fullmatch(r"[a-zA-Z0-9_.-]+", repository["name"]), repository[ "name" ] # GitHub doesn't give us any branch or pure rev info. This would give # us the revision, but there's no way with git to just do # `git clone --depth 1` with a specific ref. # `ref = item["url"].split("?ref=")[1]` would be exact repositories[(repository["owner"], repository["repo"])] = Repository( repository["owner"]["login"], repository["name"], None, select=repository.get("select"), ignore=repository.get("ignore"), exclude=repository.get("exclude"), ) else: assert "owner" in data, "Unknown ruff-usage-aggregate format" # Pick only the easier case for now. if data["path"] != "pyproject.toml": continue repositories[(data["owner"], data["repo"])] = Repository( data["owner"], data["repo"], data.get("ref"), select=data.get("select"), ignore=data.get("ignore"), exclude=data.get("exclude"), ) return repositories async def main( *, ruff1: Path, ruff2: Path, projects_jsonl: Path | None, checkouts: Path | None = None, ) -> None: """Check two versions of ruff against a corpus of open-source code.""" if projects_jsonl: repositories = read_projects_jsonl(projects_jsonl) else: repositories = {(repo.org, repo.repo): repo for repo in REPOSITORIES} logger.debug(f"Checking {len(repositories)} projects") results = await asyncio.gather( *[compare(ruff1, ruff2, repo, checkouts) for repo in repositories.values()], return_exceptions=True, ) diffs = dict(zip(repositories, results, strict=True)) total_removed = total_added = 0 errors = 0 for diff in diffs.values(): if isinstance(diff, Exception): errors += 1 else: total_removed += len(diff.removed) total_added += len(diff.added) if total_removed == 0 and total_added == 0 and errors == 0: print("\u2705 ecosystem check detected no changes.") else: rule_changes: dict[str, tuple[int, int]] = {} changes = f"(+{total_added}, -{total_removed}, {errors} error(s))" print(f"\u2139\ufe0f ecosystem check **detected changes**. {changes}") print() for (org, repo), diff in diffs.items(): if isinstance(diff, Exception): changes = "error" print(f"
{repo} ({changes})") repo = repositories[(org, repo)] print( f"https://github.com/{repo.org}/{repo.repo} ref {repo.ref} " f"select {repo.select} ignore {repo.ignore} exclude {repo.exclude}", ) print("

") print() print("```") print(str(diff)) print("```") print() print("

") print("
") elif diff: changes = f"+{len(diff.added)}, -{len(diff.removed)}" print(f"
{repo} ({changes})") print("

") print() diff_str = "\n".join(diff) print("```diff") print(diff_str) print("```") print() print("

") print("
") # Count rule changes for line in diff_str.splitlines(): # Find rule change for current line or construction # + /::: matches = re.search(r": ([A-Z]{1,3}[0-9]{3,4})", line) if matches is None: # Handle case where there are no regex matches e.g. # + "?application=AIRFLOW&authenticator=TEST_AUTH&role=TEST_ROLE&warehouse=TEST_WAREHOUSE" # noqa: E501, ERA001 # Which was found in local testing continue rule_code = matches.group(1) # Get current additions and removals for this rule current_changes = rule_changes.get(rule_code, (0, 0)) # Check if addition or removal depending on the first character if line[0] == "+": current_changes = (current_changes[0] + 1, current_changes[1]) elif line[0] == "-": current_changes = (current_changes[0], current_changes[1] + 1) rule_changes[rule_code] = current_changes else: continue if len(rule_changes.keys()) > 0: print(f"Rules changed: {len(rule_changes.keys())}") print() print("| Rule | Changes | Additions | Removals |") print("| ---- | ------- | --------- | -------- |") for rule, (additions, removals) in sorted( rule_changes.items(), key=lambda x: (x[1][0] + x[1][1]), reverse=True, ): print(f"| {rule} | {additions + removals} | {additions} | {removals} |") logger.debug(f"Finished {len(repositories)} repositories") if __name__ == "__main__": parser = argparse.ArgumentParser( description="Check two versions of ruff against a corpus of open-source code.", epilog="scripts/check_ecosystem.py ", ) parser.add_argument( "--projects", type=Path, help=( "Optional JSON files to use over the default repositories. " "Supports both github_search_*.jsonl and known-github-tomls.jsonl." ), ) parser.add_argument( "--checkouts", type=Path, help=( "Location for the git checkouts, in case you want to save them" " (defaults to temporary directory)" ), ) parser.add_argument( "-v", "--verbose", action="store_true", help="Activate debug logging", ) parser.add_argument( "ruff1", type=Path, ) parser.add_argument( "ruff2", type=Path, ) args = parser.parse_args() if args.verbose: logging.basicConfig(level=logging.DEBUG) else: logging.basicConfig(level=logging.INFO) loop = asyncio.get_event_loop() main_task = asyncio.ensure_future( main( ruff1=args.ruff1, ruff2=args.ruff2, projects_jsonl=args.projects, checkouts=args.checkouts, ), ) # https://stackoverflow.com/a/58840987/3549270 for signal in [SIGINT, SIGTERM]: loop.add_signal_handler(signal, main_task.cancel) try: loop.run_until_complete(main_task) finally: loop.close()