refactor: replace isort, black and flake8 with ruff

This commit is contained in:
Juro Oravec 2025-08-18 10:19:03 +02:00
parent 8b9b93787f
commit 53a5804706
128 changed files with 3069 additions and 2594 deletions

View file

@ -35,19 +35,22 @@ Configuration:
See the code for more details and examples.
"""
# ruff: noqa: T201,BLE001,PTH118
import argparse
import os
import re
import requests
import sys
import time
from collections import defaultdict, deque
from dataclasses import dataclass
from pathlib import Path
from typing import DefaultDict, Deque, Dict, List, Tuple, Union
from typing import DefaultDict, Deque, Dict, List, Literal, Optional, Tuple, Union
from urllib.parse import urlparse
from bs4 import BeautifulSoup
import pathspec
import requests
from bs4 import BeautifulSoup
from django_components.util.misc import format_as_ascii_table
@ -77,7 +80,7 @@ IGNORED_PATHS = [
IGNORE_DOMAINS = [
"127.0.0.1",
"localhost",
"0.0.0.0",
"0.0.0.0", # noqa: S104
"example.com",
]
@ -112,9 +115,35 @@ URL_VALIDATOR_REGEX = re.compile(
)
@dataclass
class Link:
    """One occurrence of a URL found in a scanned file."""

    # Path of the file the URL was found in (stored as a string).
    file: str
    # Line number of the occurrence; lines are counted starting from 1.
    lineno: int
    # The full URL exactly as matched, including any "#fragment" part.
    url: str
    # The URL without the fragment
    base_url: str
    # Text after the first "#" in the URL, or None when the URL has no "#".
    fragment: Optional[str]
@dataclass
class LinkRewrite:
    """Record of a single URL replacement applied to (or planned for) a file."""

    # The original link occurrence that is being rewritten.
    link: Link
    # The URL that replaces `link.url`.
    new_url: str
    # The mapping key (plain string or compiled regex) returned by `rewrite_url()`
    # alongside the new URL.
    mapping_key: Union[str, re.Pattern]
@dataclass
class LinkError:
    """A problem detected for one link occurrence."""

    # The link occurrence the problem was detected for.
    link: Link
    # Category of the problem; one of the fixed literal values below.
    error_type: Literal[
        "ERROR_FRAGMENT",
        "ERROR_HTTP",
        "ERROR_INVALID",
        "ERROR_OTHER",
    ]
    # Human-readable description of what went wrong.
    error_details: str
# Outcome of fetching, keyed by URL. Each value is one of:
# - the HTTP response that was received,
# - the exception raised while making the request,
# - a marker string for URLs that were never requested.
FetchedResults = Dict[
    str,
    Union[
        requests.Response,
        Exception,
        Literal["SKIPPED", "INVALID_URL"],
    ],
]
def is_binary_file(filepath: Path) -> bool:
try:
with open(filepath, "rb") as f:
with filepath.open("rb") as f:
chunk = f.read(1024)
if b"\0" in chunk:
return True
@ -127,7 +156,7 @@ def load_gitignore(root: Path) -> pathspec.PathSpec:
gitignore = root / ".gitignore"
patterns = []
if gitignore.exists():
with open(gitignore) as f:
with gitignore.open() as f:
patterns = f.read().splitlines()
# Add additional ignored paths
patterns += IGNORED_PATHS
@ -153,29 +182,33 @@ def find_files(root: Path, spec: pathspec.PathSpec) -> List[Path]:
# Extract URLs from a file
def extract_urls_from_file(filepath: Path) -> List[Tuple[str, int, str, str]]:
urls = []
def extract_links_from_file(filepath: Path) -> List[Link]:
urls: List[Link] = []
try:
with open(filepath, encoding="utf-8", errors="replace") as f:
with filepath.open(encoding="utf-8", errors="replace") as f:
for i, line in enumerate(f, 1):
for match in URL_REGEX.finditer(line):
url = match.group(0)
urls.append((str(filepath), i, line.rstrip(), url))
if "#" in url:
base_url, fragment = url.split("#", 1)
else:
base_url, fragment = url, None
urls.append(Link(file=str(filepath), lineno=i, url=url, base_url=base_url, fragment=fragment))
except Exception as e:
print(f"[WARN] Could not read {filepath}: {e}", file=sys.stderr)
return urls
def get_base_url(url: str) -> str:
    """Strip the fragment from a URL, i.e. drop everything from the first "#" on."""
    base, _separator, _fragment = url.partition("#")
    return base
def pick_next_url(domains, domain_to_urls, last_request_time):
"""
Pick the next (domain, url) to fetch, respecting REQUEST_DELAY per domain.
Returns (domain, url) or None if all are on cooldown or empty.
"""
# We validate the links by fetching them, reaching the (potentially 3rd party) servers.
# This can be slow, because servers may have rate limiting policies.
# So we group the URLs by domain - URLs pointing to different domains can be
# fetched in parallel. This way we can spread the load over the domains, and avoid hitting the rate limits.
# This function picks the next URL to fetch, respecting the cooldown.
def pick_next_url(
domains: List[str],
domain_to_urls: Dict[str, Deque[str]],
last_request_time: Dict[str, float],
) -> Optional[Tuple[str, str]]:
now = time.time()
for domain in domains:
if not domain_to_urls[domain]:
@ -187,16 +220,23 @@ def pick_next_url(domains, domain_to_urls, last_request_time):
return None
def validate_urls(all_urls):
def fetch_urls(links: List[Link]) -> FetchedResults:
"""
For each unique base URL, make a GET request (with caching).
For each unique URL, make a GET request (with caching).
Print progress for each request (including cache hits).
If a URL is invalid, print a warning and skip fetching.
Skip URLs whose netloc matches IGNORE_DOMAINS.
Use round-robin scheduling per domain, with cooldown.
"""
url_cache: Dict[str, Union[requests.Response, Exception, str]] = {}
unique_base_urls = sorted(set(get_base_url(url) for _, _, _, url in all_urls))
all_url_results: FetchedResults = {}
unique_base_urls = set()
base_urls_with_fragments = set()
for link in links:
unique_base_urls.add(link.base_url)
if link.fragment:
base_urls_with_fragments.add(link.base_url)
base_urls = sorted(unique_base_urls) # Ensure consistency
# NOTE: Originally we fetched the URLs one after another. But the issue with this was that
# there are a few large domains like GitHub, MDN, Django docs, etc. And there are a lot of URLs
@ -208,10 +248,10 @@ def validate_urls(all_urls):
# Group URLs by domain
domain_to_urls: DefaultDict[str, Deque[str]] = defaultdict(deque)
for url in unique_base_urls:
for url in base_urls:
parsed = urlparse(url)
if parsed.hostname and any(parsed.hostname == d for d in IGNORE_DOMAINS):
url_cache[url] = "SKIPPED"
all_url_results[url] = "SKIPPED"
continue
domain_to_urls[parsed.netloc].append(url)
@ -236,37 +276,83 @@ def validate_urls(all_urls):
domain, url = pick
# Classify and fetch
if url in url_cache:
if url in all_url_results:
print(f"[done {done_count + 1}/{total_urls}] {url} (cache hit)")
done_count += 1
continue
if not URL_VALIDATOR_REGEX.match(url):
url_cache[url] = "INVALID_URL"
all_url_results[url] = "INVALID_URL"
print(f"[done {done_count + 1}/{total_urls}] {url} WARNING: Invalid URL format, not fetched.")
done_count += 1
continue
print(f"[done {done_count + 1}/{total_urls}] {url} ...", end=" ")
method = "GET" if url in base_urls_with_fragments else "HEAD"
print(f"[done {done_count + 1}/{total_urls}] {method:<4} {url} ...", end=" ")
try:
resp = requests.get(
url, timeout=REQUEST_TIMEOUT, headers={"User-Agent": "django-components-link-checker/0.1"}
)
url_cache[url] = resp
# If there is at least one URL that specifies a fragment in the URL,
# we will fetch the full HTML with GET.
# But if there isn't any, we can simply send HEAD request instead.
if method == "GET":
resp = requests.get(
url,
allow_redirects=True,
timeout=REQUEST_TIMEOUT,
headers={"User-Agent": "django-components-link-checker/0.1"},
)
else:
resp = requests.head(
url,
allow_redirects=True,
timeout=REQUEST_TIMEOUT,
headers={"User-Agent": "django-components-link-checker/0.1"},
)
all_url_results[url] = resp
print(f"{resp.status_code}")
except Exception as err:
url_cache[url] = err
all_url_results[url] = err
print(f"ERROR: {err}")
last_request_time[domain] = time.time()
done_count += 1
return url_cache
return all_url_results
def check_fragment_in_html(html: str, fragment: str) -> bool:
    """Tell whether the HTML document contains an element whose `id` equals `fragment`."""
    print(f"Checking fragment {fragment} in HTML...")
    document = BeautifulSoup(html, "html.parser")
    # `find()` yields the first matching element, or None when nothing matches.
    target = document.find(id=fragment)
    return target is not None
def rewrite_links(links: List[Link], files: List[Path], dry_run: bool) -> None:
    """
    Rewrite the URLs of `links` inside `files`, using `rewrite_url()` to compute the new URLs.

    All edits are first applied in memory. With `dry_run=True` the planned changes
    are only printed; otherwise each modified file is written back exactly once
    and the applied changes are printed.

    Files that cannot be read are reported on stderr and skipped, together with
    any links that point into them.
    """
    # Load all files up-front, so the edits can be staged in memory.
    # `newline=""` keeps the original line endings untouched, so writing the file
    # back does not silently convert e.g. CRLF to LF (or the other way round).
    file_to_lines: Dict[str, List[str]] = {}
    for filepath in files:
        try:
            with filepath.open(encoding="utf-8", errors="replace", newline="") as f:
                file_to_lines[str(filepath)] = f.readlines()
        except Exception as e:
            print(f"[WARN] Could not read {filepath}: {e}", file=sys.stderr)
            continue

    rewrites: List[LinkRewrite] = []
    for link in links:
        new_url, mapping_key = rewrite_url(link.url)
        if not new_url or new_url == link.url or mapping_key is None:
            continue

        lines = file_to_lines.get(link.file)
        if lines is None:
            # The file could not be loaded above (already warned about), so there is nothing to edit.
            continue

        # Rewrite in memory, so we can have dry-run mode
        idx = link.lineno - 1
        old_line = lines[idx]
        new_line = old_line.replace(link.url, new_url)
        if old_line != new_line:
            lines[idx] = new_line
            rewrites.append(LinkRewrite(link=link, new_url=new_url, mapping_key=mapping_key))

    if dry_run:
        for rewrite in rewrites:
            print(f"[DRY-RUN] {rewrite.link.file}#{rewrite.link.lineno}: {rewrite.link.url} -> {rewrite.new_url}")
        return

    # Write only once per file, no matter how many links were rewritten in it.
    # `dict.fromkeys()` de-duplicates while keeping the first-seen order.
    modified_files = dict.fromkeys(rewrite.link.file for rewrite in rewrites)
    for file in modified_files:
        with Path(file).open("w", encoding="utf-8", newline="") as f:
            f.write("".join(file_to_lines[file]))

    for rewrite in rewrites:
        print(f"[REWRITE] {rewrite.link.file}#{rewrite.link.lineno}: {rewrite.link.url} -> {rewrite.new_url}")
def rewrite_url(url: str) -> Union[Tuple[None, None], Tuple[str, Union[str, re.Pattern]]]:
@ -279,16 +365,82 @@ def rewrite_url(url: str) -> Union[Tuple[None, None], Tuple[str, Union[str, re.P
if key.search(url):
return key.sub(repl, url), key
else:
raise ValueError(f"Invalid key type: {type(key)}")
raise TypeError(f"Invalid key type: {type(key)}")
return None, None
def output_summary(errors: List[Tuple[str, int, str, str, str]], output: str):
def check_links_for_errors(all_urls: List[Link], all_url_results: FetchedResults) -> List[LinkError]:
    """
    Turn the fetch results into a list of errors, one entry per broken link occurrence.

    Each link is looked up in `all_url_results` by its `base_url`. Skipped URLs
    produce no error. Raises `TypeError` if the looked-up result is of an
    unexpected type (including a missing entry).
    """
    found: List[LinkError] = []

    for link in all_urls:
        result = all_url_results.get(link.base_url)

        # URLs that were deliberately not fetched are never reported.
        if result == "SKIPPED":
            continue

        if result == "INVALID_URL":
            found.append(LinkError(link=link, error_type="ERROR_INVALID", error_details="Invalid URL format"))
            continue

        # The request itself failed.
        if isinstance(result, Exception):
            found.append(LinkError(link=link, error_type="ERROR_OTHER", error_details=str(result)))
            continue

        if not isinstance(result, requests.Response):
            raise TypeError(f"Unknown cache value type: {type(result)}")

        # From here on we know we have an HTTP response. Anything but 200 is an error.
        status = result.status_code
        if status != 200:
            found.append(LinkError(link=link, error_type="ERROR_HTTP", error_details=f"Status {status}"))
            continue

        # Successful response without a fragment - nothing more to verify.
        if not link.fragment:
            continue

        # A fragment only makes sense for HTML pages.
        if "html" not in result.headers.get("Content-Type", ""):
            found.append(LinkError(link=link, error_type="ERROR_FRAGMENT", error_details="Not HTML content"))
            continue

        # It is an HTML page, so the fragment must be present in it.
        if not check_fragment_in_html(result.text, link.fragment):
            found.append(
                LinkError(
                    link=link,
                    error_type="ERROR_FRAGMENT",
                    error_details=f"Fragment '#{link.fragment}' not found",
                )
            )

    return found
def check_fragment_in_html(html: str, fragment: str) -> bool:
    """
    Return True if `fragment` points at an element in the HTML.

    An element is considered a match if its `id` equals the fragment, or if it
    is an `<a>` element whose `name` attribute equals the fragment (legacy
    anchors). The fragment is tried as given first and then percent-decoded,
    so that e.g. `#caf%C3%A9` matches `id="café"`.
    """
    # Local import: only `urlparse` is imported from `urllib.parse` at the file level.
    from urllib.parse import unquote

    print(f"Checking fragment {fragment} in HTML...")
    soup = BeautifulSoup(html, "html.parser")

    candidates = [fragment]
    decoded = unquote(fragment)
    if decoded != fragment:
        candidates.append(decoded)

    for candidate in candidates:
        if soup.find(id=candidate) is not None:
            return True
        # `name` is also the first positional parameter of `find()` (the tag name),
        # so the HTML attribute `name` has to be passed via `attrs`.
        if soup.find("a", attrs={"name": candidate}) is not None:
            return True
    return False
def output_summary(errors: List[LinkError], output: Optional[str]) -> None:
# Format the errors into a table
headers = ["Type", "Details", "File", "URL"]
data = [
{"File": file + "#" + str(lineno), "Type": errtype, "URL": url, "Details": details}
for file, lineno, errtype, url, details in errors
{
"File": link_error.link.file + "#" + str(link_error.link.lineno),
"Type": link_error.error_type,
"URL": link_error.link.url,
"Details": link_error.error_details,
}
for link_error in errors
]
table = format_as_ascii_table(data, headers, include_headers=True)
@ -300,106 +452,59 @@ def output_summary(errors: List[Tuple[str, int, str, str, str]], output: str):
print(table + "\n")
# TODO: Run this as a test in CI?
# NOTE: At v0.140 there was ~800 URL instances total, ~300 unique URLs, and the script took 4 min.
def main():
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Validate links and fragments in the codebase.")
parser.add_argument(
"-o", "--output", type=str, help="Output summary table to file (suppress stdout except errors)"
"-o",
"--output",
type=str,
help="Output summary table to file (suppress stdout except errors)",
)
parser.add_argument("--rewrite", action="store_true", help="Rewrite URLs using URL_REWRITE_MAP and update files")
parser.add_argument(
"--dry-run", action="store_true", help="Show what would be changed by --rewrite, but do not write files"
"--dry-run",
action="store_true",
help="Show what would be changed by --rewrite, but do not write files",
)
args = parser.parse_args()
return parser.parse_args()
root = Path(os.getcwd())
# TODO: Run this as a test in CI?
# NOTE: At v0.140 there was ~800 URL instances total, ~300 unique URLs, and the script took 4 min.
def main() -> None:
args = parse_args()
# Find all relevant files
root = Path.cwd()
spec = load_gitignore(root)
files = find_files(root, spec)
print(f"Scanning {len(files)} files...")
all_urls: List[Tuple[str, int, str, str]] = []
for f in files:
if is_binary_file(f):
# Find links in those files
all_links: List[Link] = []
for filepath in files:
if is_binary_file(filepath):
continue
all_urls.extend(extract_urls_from_file(f))
all_links.extend(extract_links_from_file(filepath))
# HTTP request and caching step
url_cache = validate_urls(all_urls)
# --- URL rewriting logic ---
# Rewrite links in those files if requested
if args.rewrite:
# Group by file for efficient rewriting
file_to_lines: Dict[str, List[str]] = {}
for f in files:
try:
with open(f, encoding="utf-8", errors="replace") as fh:
file_to_lines[str(f)] = fh.readlines()
except Exception:
continue
rewrites = []
for file, lineno, line, url in all_urls:
new_url, mapping_key = rewrite_url(url)
if not new_url or new_url == url:
continue
# Rewrite in memory, so we can have dry-run mode
lines = file_to_lines[file]
idx = lineno - 1
old_line = lines[idx]
new_line = old_line.replace(url, new_url)
if old_line != new_line:
lines[idx] = new_line
rewrites.append((file, lineno, url, new_url, mapping_key))
# Write back or dry-run
if args.dry_run:
for file, lineno, old, new, _ in rewrites:
print(f"[DRY-RUN] {file}#{lineno}: {old} -> {new}")
else:
for file, _, _, _, _ in rewrites:
# Write only once per file
lines = file_to_lines[file]
Path(file).write_text("".join(lines), encoding="utf-8")
for file, lineno, old, new, _ in rewrites:
print(f"[REWRITE] {file}#{lineno}: {old} -> {new}")
rewrite_links(all_links, files, dry_run=args.dry_run)
return # After rewriting, skip error reporting
# --- Categorize the results / errors ---
errors = []
for file, lineno, line, url in all_urls:
base_url = get_base_url(url)
fragment = url.split("#", 1)[1] if "#" in url else None
cache_val = url_cache.get(base_url)
if cache_val == "SKIPPED":
continue
elif cache_val == "INVALID_URL":
errors.append((file, lineno, "INVALID", url, "Invalid URL format"))
continue
elif isinstance(cache_val, Exception):
errors.append((file, lineno, "ERROR", url, str(cache_val)))
continue
elif hasattr(cache_val, "status_code") and getattr(cache_val, "status_code", 0) != 200:
errors.append((file, lineno, "ERROR_HTTP", url, f"Status {getattr(cache_val, 'status_code', '?')}"))
continue
elif fragment and hasattr(cache_val, "text"):
content_type = cache_val.headers.get("Content-Type", "")
if "html" not in content_type:
errors.append((file, lineno, "ERROR_FRAGMENT", url, "Not HTML content"))
continue
if not check_fragment_in_html(cache_val.text, fragment):
errors.append((file, lineno, "ERROR_FRAGMENT", url, f"Fragment '#{fragment}' not found"))
# Otherwise proceed to validation of the URLs and fragments
# by first fetching the HTTP requests.
all_url_results = fetch_urls(all_links)
# After everything's fetched, check for errors.
errors = check_links_for_errors(all_links, all_url_results)
if not errors:
print("\nAll links and fragments are valid!")
return
# Format the errors into a table
output_summary(errors, args.output)
output_summary(errors, args.output or None)
if __name__ == "__main__":