Mirror of https://github.com/django-components/django-components.git, synced 2025-07-08 01:45:00 +00:00
chore: util to manage URLs in the codebase (#1179)

* chore: util to manage URLs in the codebase
* docs: mention validate_links and supported_versions in docs
* refactor: fix linter errors
parent 5f4fbe76e5
commit ccf02fa316
67 changed files with 678 additions and 309 deletions
scripts/validate_links.py (new file, 406 lines)
@@ -0,0 +1,406 @@
"""
|
||||
validate_links.py - URL checker and rewriter for the codebase.
|
||||
|
||||
This script scans all files in the repository (respecting .gitignore and IGNORED_PATHS),
|
||||
finds all URLs, validates them (including checking for HTML fragments), and can optionally
|
||||
rewrite URLs in-place using a configurable mapping.
|
||||
|
||||
Features:
|
||||
- Finds all URLs in code, markdown, and docstrings.
|
||||
- Validates URLs by making GET requests (with caching and rate limiting).
|
||||
- Uses BeautifulSoup to check for HTML fragments (e.g., #section) in the target page.
|
||||
- Outputs a summary table of all issues (invalid, broken, missing fragment, etc).
|
||||
- Can output the summary table to a file with `-o`/`--output`.
|
||||
- Can rewrite URLs in-place using URL_REWRITE_MAP (supports both prefix and regex mapping).
|
||||
- Supports dry-run mode for rewrites with `--dry-run`.
|
||||
|
||||
Usage:
|
||||
|
||||
# Validate all links and print summary to stdout
|
||||
python scripts/validate_links.py
|
||||
|
||||
# Output summary table to a file
|
||||
python scripts/validate_links.py -o link_report.txt
|
||||
|
||||
# Rewrite URLs using URL_REWRITE_MAP (in-place)
|
||||
python scripts/validate_links.py --rewrite
|
||||
|
||||
# Show what would be rewritten, but do not write files
|
||||
python scripts/validate_links.py --rewrite --dry-run
|
||||
|
||||
Configuration:
|
||||
- IGNORED_PATHS: List of files/dirs to skip (in addition to .gitignore)
|
||||
- URL_REWRITE_MAP: Dict of {prefix or regex: replacement} for rewriting URLs
|
||||
|
||||
See the code for more details and examples.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import re
|
||||
import requests
|
||||
import sys
|
||||
import time
|
||||
from collections import defaultdict, deque
|
||||
from pathlib import Path
|
||||
from typing import DefaultDict, Deque, Dict, List, Tuple, Union
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from bs4 import BeautifulSoup
|
||||
import pathspec
|
||||
|
||||
from django_components.util.misc import format_as_ascii_table
|
||||
|
||||
# This script relies on .gitignore to know which files to search for URLs,
|
||||
# and which files to ignore.
|
||||
#
|
||||
# If there are files / dirs that you need to ignore, but they are not (or cannot be)
|
||||
# included in .gitignore, you can add them here.
|
||||
IGNORED_PATHS = [
|
||||
"package-lock.json",
|
||||
"package.json",
|
||||
"yarn.lock",
|
||||
"mdn_complete_page.html",
|
||||
"supported_versions.py",
|
||||
# Ignore auto-generated files
|
||||
"node_modules",
|
||||
"node_modules/",
|
||||
".asv/",
|
||||
"__snapshots__/",
|
||||
"docs/benchmarks/",
|
||||
".git/",
|
||||
"*.min.js",
|
||||
"*.min.css",
|
||||
]
|
||||
|
||||
# Domains that are not real and should be ignored.
|
||||
IGNORE_DOMAINS = [
|
||||
"127.0.0.1",
|
||||
"localhost",
|
||||
"0.0.0.0",
|
||||
"example.com",
|
||||
]
|
||||
|
||||
# This allows us to rewrite URLs across the codebase.
|
||||
# - If key is a str, it's a prefix and the value is the new prefix.
|
||||
# - If key is a re.Pattern, it's a regex and the value is the replacement string.
|
||||
URL_REWRITE_MAP: Dict[Union[str, re.Pattern], str] = {
|
||||
# Example with regex and capture groups
|
||||
# re.compile(r"https://github.com/old-org/([^/]+)/"): r"https://github.com/new-org/\1/",
|
||||
# Update all Django docs URLs to 5.2
|
||||
re.compile(r"https://docs.djangoproject.com/en/([^/]+)/"): "https://docs.djangoproject.com/en/5.2/",
|
||||
}
|
||||
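
# Illustration only (not part of the shipped mapping): a plain-string key acts as a
# prefix rewrite, so an entry of the form below would swap the leading prefix of any
# matching URL. The URLs here are hypothetical placeholders.
#
#     "https://old-docs.example.com/": "https://new-docs.example.com/",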


REQUEST_TIMEOUT = 8  # seconds
REQUEST_DELAY = 0.5  # seconds between requests


# Simple regex for URLs to scan for
URL_REGEX = re.compile(r'https?://[^\s\'"\)\]]+')

# Detailed regex for URLs to validate
# See https://stackoverflow.com/a/7160778/9788634
URL_VALIDATOR_REGEX = re.compile(
    r"^(?:http|ftp)s?://"  # http:// or https://
    r"(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+(?:[A-Z]{2,6}\.?|[A-Z0-9-]{2,}\.?)|"  # domain...
    r"localhost|"  # localhost...
    r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})"  # ...or ip
    r"(?::\d+)?"  # optional port
    r"(?:/?|[/?]\S+)$",
    re.IGNORECASE,
)
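
# Rough intuition (illustrative): URL_REGEX is intentionally loose and only stops at
# whitespace, quotes, and closing brackets, so it also picks up strings like
# "https://foo" (no dot in the host). URL_VALIDATOR_REGEX then rejects those, and they
# are reported as INVALID_URL instead of being fetched.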


def is_binary_file(filepath: Path) -> bool:
    try:
        with open(filepath, "rb") as f:
            chunk = f.read(1024)
            if b"\0" in chunk:
                return True
    except Exception:
        return True
    return False


def load_gitignore(root: Path) -> pathspec.PathSpec:
    gitignore = root / ".gitignore"
    patterns = []
    if gitignore.exists():
        with open(gitignore) as f:
            patterns = f.read().splitlines()
    # Add additional ignored paths
    patterns += IGNORED_PATHS
    return pathspec.PathSpec.from_lines("gitwildmatch", patterns)
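
# Quick sanity check of the combined spec (illustrative, assuming the defaults above):
#
#     spec = load_gitignore(Path("."))
#     spec.match_file("node_modules/react/index.js")  # True  -> ignored
#     spec.match_file("docs/overview.md")             # False -> scanned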


# Recursively find all files not ignored by .gitignore
def find_files(root: Path, spec: pathspec.PathSpec) -> List[Path]:
    files = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Remove ignored dirs in-place
        rel_dir = os.path.relpath(dirpath, root)
        if rel_dir == ".":
            rel_dir = ""
        ignored_dirs = [d for d in dirnames if spec.match_file(os.path.join(rel_dir, d))]
        for d in ignored_dirs:
            dirnames.remove(d)
        for filename in filenames:
            rel_file = os.path.join(rel_dir, filename)
            if not spec.match_file(rel_file):
                files.append(Path(dirpath) / filename)
    return files
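
# Note: pruning `dirnames` in place above is what stops os.walk() from descending into
# ignored directories (e.g. node_modules/ or .git/) at all, rather than filtering their
# files one by one afterwards.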


# Extract URLs from a file
def extract_urls_from_file(filepath: Path) -> List[Tuple[str, int, str, str]]:
    urls = []
    try:
        with open(filepath, encoding="utf-8", errors="replace") as f:
            for i, line in enumerate(f, 1):
                for match in URL_REGEX.finditer(line):
                    url = match.group(0)
                    urls.append((str(filepath), i, line.rstrip(), url))
    except Exception as e:
        print(f"[WARN] Could not read {filepath}: {e}", file=sys.stderr)
    return urls
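
# Each returned entry is (file path, 1-based line number, line text without trailing
# whitespace, URL). Illustrative example: a README line such as
#     "See https://docs.djangoproject.com/en/5.2/ for details."
# would yield ("README.md", 12, <that line>, "https://docs.djangoproject.com/en/5.2/").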


def get_base_url(url: str) -> str:
    """Return the URL without the fragment."""
    return url.split("#", 1)[0]


def pick_next_url(domains, domain_to_urls, last_request_time):
    """
    Pick the next (domain, url) to fetch, respecting REQUEST_DELAY per domain.
    Returns (domain, url) or None if all are on cooldown or empty.
    """
    now = time.time()
    for domain in domains:
        if not domain_to_urls[domain]:
            continue
        since_last = now - last_request_time[domain]
        if since_last >= REQUEST_DELAY:
            url = domain_to_urls[domain].popleft()
            return domain, url
    return None
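
# Illustrative timing: with REQUEST_DELAY = 0.5, a domain whose last request went out
# 0.2 s ago is skipped this round; if every domain with queued URLs is still cooling
# down, the caller sleeps until the soonest one becomes eligible (see validate_urls below).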


def validate_urls(all_urls):
    """
    For each unique base URL, make a GET request (with caching).
    Print progress for each request (including cache hits).
    If a URL is invalid, print a warning and skip fetching.
    Skip URLs whose netloc matches IGNORE_DOMAINS.
    Use round-robin scheduling per domain, with cooldown.
    """
    url_cache: Dict[str, Union[requests.Response, Exception, str]] = {}
    unique_base_urls = sorted(set(get_base_url(url) for _, _, _, url in all_urls))

    # NOTE: Originally we fetched the URLs one after another. The issue with this was that
    # a few large domains (GitHub, MDN, Django docs, etc.) account for a lot of the URLs,
    # so we ended up with a lot of 429 errors.
    #
    # The current approach is to group the URLs by domain, and then fetch them round-robin
    # across domains, preferentially fetching from domains with the most URLs (if not on cooldown).
    # This way we spread the load over the domains and avoid hitting the rate limits.

    # Group URLs by domain
    domain_to_urls: DefaultDict[str, Deque[str]] = defaultdict(deque)
    for url in unique_base_urls:
        parsed = urlparse(url)
        if parsed.hostname and any(parsed.hostname == d for d in IGNORE_DOMAINS):
            url_cache[url] = "SKIPPED"
            continue
        domain_to_urls[parsed.netloc].append(url)

    # Sort domains by number of URLs (descending)
    domains = sorted(domain_to_urls, key=lambda d: -len(domain_to_urls[d]))
    last_request_time = {domain: 0.0 for domain in domains}
    total_urls = sum(len(q) for q in domain_to_urls.values())
    done_count = 0

    print(f"\nValidating {total_urls} unique base URLs (round-robin by domain)...")
    while any(domain_to_urls.values()):
        pick = pick_next_url(domains, domain_to_urls, last_request_time)
        if pick is None:
            # All domains are on cooldown, sleep until the soonest one is ready
            soonest = min(
                (last_request_time[d] + REQUEST_DELAY for d in domains if domain_to_urls[d]),
                default=time.time() + REQUEST_DELAY,
            )
            sleep_time = max(soonest - time.time(), 0.05)
            time.sleep(sleep_time)
            continue
        domain, url = pick

        # Classify and fetch
        if url in url_cache:
            print(f"[done {done_count + 1}/{total_urls}] {url} (cache hit)")
            done_count += 1
            continue
        if not URL_VALIDATOR_REGEX.match(url):
            url_cache[url] = "INVALID_URL"
            print(f"[done {done_count + 1}/{total_urls}] {url} WARNING: Invalid URL format, not fetched.")
            done_count += 1
            continue

        print(f"[done {done_count + 1}/{total_urls}] {url} ...", end=" ")
        try:
            resp = requests.get(
                url, timeout=REQUEST_TIMEOUT, headers={"User-Agent": "django-components-link-checker/0.1"}
            )
            url_cache[url] = resp
            print(f"{resp.status_code}")
        except Exception as err:
            url_cache[url] = err
            print(f"ERROR: {err}")

        last_request_time[domain] = time.time()
        done_count += 1
    return url_cache
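
# The returned url_cache maps each base URL to one of: a requests.Response, the
# Exception raised while fetching it, or the sentinel strings "SKIPPED" / "INVALID_URL".
# The categorization step in main() relies on exactly these shapes.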


def check_fragment_in_html(html: str, fragment: str) -> bool:
    """Return True if id=fragment exists in the HTML."""
    print(f"Checking fragment {fragment} in HTML...")
    soup = BeautifulSoup(html, "html.parser")
    return bool(soup.find(id=fragment))
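
# Illustrative: for a page containing <h2 id="installation">...</h2>,
#     check_fragment_in_html(resp.text, "installation")  -> True
#     check_fragment_in_html(resp.text, "usage")         -> False (fragment reported as missing)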


def rewrite_url(url: str) -> Union[Tuple[None, None], Tuple[str, Union[str, re.Pattern]]]:
    """Return (new_url, mapping_key) if a mapping applies, else (None, None)."""
    for key, repl in URL_REWRITE_MAP.items():
        if isinstance(key, str):
            if url.startswith(key):
                return url.replace(key, repl, 1), key
        elif isinstance(key, re.Pattern):
            if key.search(url):
                return key.sub(repl, url), key
        else:
            raise ValueError(f"Invalid key type: {type(key)}")
    return None, None
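
# Illustrative, given the default URL_REWRITE_MAP above:
#     rewrite_url("https://docs.djangoproject.com/en/4.2/ref/settings/")
#     -> ("https://docs.djangoproject.com/en/5.2/ref/settings/", <compiled pattern>)
#     rewrite_url("https://example.org/")
#     -> (None, None)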


def output_summary(errors: List[Tuple[str, int, str, str, str]], output: str):
    # Format the errors into a table
    headers = ["Type", "Details", "File", "URL"]
    data = [
        {"File": file + "#" + str(lineno), "Type": errtype, "URL": url, "Details": details}
        for file, lineno, errtype, url, details in errors
    ]
    table = format_as_ascii_table(data, headers, include_headers=True)

    # Output summary to file if specified
    if output:
        output_path = Path(output)
        output_path.write_text(table + "\n", encoding="utf-8")
    else:
        print(table + "\n")
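
# The summary is rendered with format_as_ascii_table using the headers above; each row
# shows the error type, the details (e.g. "Status 404" or a missing fragment), the
# "file#line" location, and the offending URL. The exact column layout is up to the helper.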


# TODO: Run this as a test in CI?
# NOTE: At v0.140 there were ~800 URL instances in total, ~300 unique URLs, and the script took 4 min.
def main():
    parser = argparse.ArgumentParser(description="Validate links and fragments in the codebase.")
    parser.add_argument(
        "-o", "--output", type=str, help="Output summary table to file (suppress stdout except errors)"
    )
    parser.add_argument("--rewrite", action="store_true", help="Rewrite URLs using URL_REWRITE_MAP and update files")
    parser.add_argument(
        "--dry-run", action="store_true", help="Show what would be changed by --rewrite, but do not write files"
    )
    args = parser.parse_args()

    root = Path(os.getcwd())
    spec = load_gitignore(root)

    files = find_files(root, spec)
    print(f"Scanning {len(files)} files...")

    all_urls: List[Tuple[str, int, str, str]] = []
    for f in files:
        if is_binary_file(f):
            continue
        all_urls.extend(extract_urls_from_file(f))

    # HTTP request and caching step
    url_cache = validate_urls(all_urls)

    # --- URL rewriting logic ---
    if args.rewrite:
        # Group by file for efficient rewriting
        file_to_lines: Dict[str, List[str]] = {}
        for f in files:
            try:
                with open(f, encoding="utf-8", errors="replace") as fh:
                    file_to_lines[str(f)] = fh.readlines()
            except Exception:
                continue

        rewrites = []
        for file, lineno, line, url in all_urls:
            new_url, mapping_key = rewrite_url(url)
            if not new_url or new_url == url:
                continue

            # Rewrite in memory, so we can have dry-run mode
            lines = file_to_lines[file]
            idx = lineno - 1
            old_line = lines[idx]
            new_line = old_line.replace(url, new_url)
            if old_line != new_line:
                lines[idx] = new_line
                rewrites.append((file, lineno, url, new_url, mapping_key))

        # Write back or dry-run
        if args.dry_run:
            for file, lineno, old, new, _ in rewrites:
                print(f"[DRY-RUN] {file}#{lineno}: {old} -> {new}")
        else:
            # Write each modified file back only once
            for file in dict.fromkeys(file for file, _, _, _, _ in rewrites):
                lines = file_to_lines[file]
                Path(file).write_text("".join(lines), encoding="utf-8")
            for file, lineno, old, new, _ in rewrites:
                print(f"[REWRITE] {file}#{lineno}: {old} -> {new}")

        return  # After rewriting, skip error reporting

    # --- Categorize the results / errors ---
    errors = []
    for file, lineno, line, url in all_urls:
        base_url = get_base_url(url)
        fragment = url.split("#", 1)[1] if "#" in url else None
        cache_val = url_cache.get(base_url)

        if cache_val == "SKIPPED":
            continue
        elif cache_val == "INVALID_URL":
            errors.append((file, lineno, "INVALID", url, "Invalid URL format"))
            continue
        elif isinstance(cache_val, Exception):
            errors.append((file, lineno, "ERROR", url, str(cache_val)))
            continue
        elif hasattr(cache_val, "status_code") and getattr(cache_val, "status_code", 0) != 200:
            errors.append((file, lineno, "ERROR_HTTP", url, f"Status {getattr(cache_val, 'status_code', '?')}"))
            continue
        elif fragment and hasattr(cache_val, "text"):
            content_type = cache_val.headers.get("Content-Type", "")
            if "html" not in content_type:
                errors.append((file, lineno, "ERROR_FRAGMENT", url, "Not HTML content"))
                continue
            if not check_fragment_in_html(cache_val.text, fragment):
                errors.append((file, lineno, "ERROR_FRAGMENT", url, f"Fragment '#{fragment}' not found"))

    if not errors:
        print("\nAll links and fragments are valid!")
        return

    # Format the errors into a table
    output_summary(errors, args.output)


if __name__ == "__main__":
    main()