use std::borrow::Cow; use std::path::Path; use anyhow::{Result, anyhow}; use colored::Colorize; use itertools::Itertools; use ruff_python_parser::semantic_errors::SemanticSyntaxError; use rustc_hash::FxBuildHasher; use ruff_db::diagnostic::{Diagnostic, SecondaryCode}; use ruff_notebook::Notebook; use ruff_python_ast::{ModModule, PySourceType, PythonVersion}; use ruff_python_codegen::Stylist; use ruff_python_index::Indexer; use ruff_python_parser::{ParseError, ParseOptions, Parsed, UnsupportedSyntaxError}; use ruff_source_file::SourceFile; use crate::checkers::ast::{LintContext, check_ast}; use crate::checkers::filesystem::check_file_path; use crate::checkers::imports::check_imports; use crate::checkers::noqa::check_noqa; use crate::checkers::physical_lines::check_physical_lines; use crate::checkers::tokens::check_tokens; use crate::directives::Directives; use crate::doc_lines::{doc_lines_from_ast, doc_lines_from_tokens}; use crate::fix::{FixResult, fix_file}; use crate::message::create_syntax_error_diagnostic; use crate::noqa::add_noqa; use crate::package::PackageRoot; use crate::preview::is_py314_support_enabled; use crate::registry::Rule; #[cfg(any(feature = "test-rules", test))] use crate::rules::ruff::rules::test_rules::{self, TEST_RULES, TestRule}; use crate::settings::types::UnsafeFixes; use crate::settings::{LinterSettings, TargetVersion, flags}; use crate::source_kind::SourceKind; use crate::{Locator, directives, fs, warn_user_once}; pub(crate) mod float; pub struct LinterResult { /// A collection of diagnostic messages generated by the linter. pub diagnostics: Vec, /// Flag indicating that the parsed source code does not contain any /// [`ParseError`]s has_valid_syntax: bool, /// Flag indicating that the parsed source code does not contain any [`ParseError`]s, /// [`UnsupportedSyntaxError`]s, or [`SemanticSyntaxError`]s. has_no_syntax_errors: bool, } impl LinterResult { /// Returns `true` if the parsed source code contains any [`ParseError`]s *or* /// [`UnsupportedSyntaxError`]s. /// /// See [`LinterResult::has_invalid_syntax`] for a version specific to [`ParseError`]s. pub fn has_syntax_errors(&self) -> bool { !self.has_no_syntax_errors() } /// Returns `true` if the parsed source code does not contain any [`ParseError`]s *or* /// [`UnsupportedSyntaxError`]s. /// /// See [`LinterResult::has_valid_syntax`] for a version specific to [`ParseError`]s. pub fn has_no_syntax_errors(&self) -> bool { self.has_valid_syntax() && self.has_no_syntax_errors } /// Returns `true` if the parsed source code is valid i.e., it has no [`ParseError`]s. /// /// Note that this does not include version-related [`UnsupportedSyntaxError`]s. /// /// See [`LinterResult::has_no_syntax_errors`] for a version that takes these into account. pub fn has_valid_syntax(&self) -> bool { self.has_valid_syntax } /// Returns `true` if the parsed source code is invalid i.e., it has [`ParseError`]s. /// /// Note that this does not include version-related [`UnsupportedSyntaxError`]s. /// /// See [`LinterResult::has_no_syntax_errors`] for a version that takes these into account. pub fn has_invalid_syntax(&self) -> bool { !self.has_valid_syntax() } } #[derive(Debug, Default, PartialEq)] struct FixCount { rule_name: &'static str, count: usize, } /// A mapping from a noqa code to the corresponding lint name and a count of applied fixes. #[derive(Debug, Default, PartialEq)] pub struct FixTable(hashbrown::HashMap); impl FixTable { pub fn counts(&self) -> impl Iterator { self.0.values().map(|fc| fc.count) } pub fn entry<'a>(&'a mut self, code: &'a SecondaryCode) -> FixTableEntry<'a> { FixTableEntry(self.0.entry_ref(code)) } pub fn iter(&self) -> impl Iterator { self.0 .iter() .map(|(code, FixCount { rule_name, count })| (code, *rule_name, *count)) } pub fn keys(&self) -> impl Iterator { self.0.keys() } pub fn is_empty(&self) -> bool { self.0.is_empty() } } pub struct FixTableEntry<'a>( hashbrown::hash_map::EntryRef<'a, 'a, SecondaryCode, SecondaryCode, FixCount, FxBuildHasher>, ); impl<'a> FixTableEntry<'a> { pub fn or_default(self, rule_name: &'static str) -> &'a mut usize { &mut (self .0 .or_insert(FixCount { rule_name, count: 0, }) .count) } } pub struct FixerResult<'a> { /// The result returned by the linter, after applying any fixes. pub result: LinterResult, /// The resulting source code, after applying any fixes. pub transformed: Cow<'a, SourceKind>, /// The number of fixes applied for each [`Rule`]. pub fixed: FixTable, } /// Generate [`Diagnostic`]s from the source code contents at the given `Path`. #[expect(clippy::too_many_arguments)] pub fn check_path( path: &Path, package: Option>, locator: &Locator, stylist: &Stylist, indexer: &Indexer, directives: &Directives, settings: &LinterSettings, noqa: flags::Noqa, source_kind: &SourceKind, source_type: PySourceType, parsed: &Parsed, target_version: TargetVersion, ) -> Vec { // Aggregate all diagnostics. let mut context = LintContext::new(path, locator.contents(), settings); // Aggregate all semantic syntax errors. let mut semantic_syntax_errors = vec![]; let tokens = parsed.tokens(); let comment_ranges = indexer.comment_ranges(); // Collect doc lines. This requires a rare mix of tokens (for comments) and AST // (for docstrings), which demands special-casing at this level. let use_doc_lines = context.is_rule_enabled(Rule::DocLineTooLong); let mut doc_lines = vec![]; if use_doc_lines { doc_lines.extend(doc_lines_from_tokens(tokens)); } // Run the token-based rules. if context .iter_enabled_rules() .any(|rule_code| rule_code.lint_source().is_tokens()) { check_tokens( tokens, path, locator, indexer, stylist, source_type, source_kind.as_ipy_notebook().map(Notebook::cell_offsets), &mut context, ); } // Run the filesystem-based rules. if context .iter_enabled_rules() .any(|rule_code| rule_code.lint_source().is_filesystem()) { check_file_path( path, package, locator, comment_ranges, settings, target_version.linter_version(), &context, ); } // Run the logical line-based rules. if context .iter_enabled_rules() .any(|rule_code| rule_code.lint_source().is_logical_lines()) { crate::checkers::logical_lines::check_logical_lines( tokens, locator, indexer, stylist, settings, &context, ); } // Run the AST-based rules only if there are no syntax errors. if parsed.has_valid_syntax() { let cell_offsets = source_kind.as_ipy_notebook().map(Notebook::cell_offsets); let notebook_index = source_kind.as_ipy_notebook().map(Notebook::index); semantic_syntax_errors.extend(check_ast( parsed, locator, stylist, indexer, &directives.noqa_line_for, settings, noqa, path, package, source_type, cell_offsets, notebook_index, target_version, &context, )); let use_imports = !directives.isort.skip_file && context .iter_enabled_rules() .any(|rule_code| rule_code.lint_source().is_imports()); if use_imports || use_doc_lines { if use_imports { check_imports( parsed, locator, indexer, &directives.isort, settings, stylist, package, source_type, cell_offsets, target_version.linter_version(), &context, ); } if use_doc_lines { doc_lines.extend(doc_lines_from_ast(parsed.suite(), locator)); } } } // Deduplicate and reorder any doc lines. if use_doc_lines { doc_lines.sort_unstable(); doc_lines.dedup(); } // Run the lines-based rules. if context .iter_enabled_rules() .any(|rule_code| rule_code.lint_source().is_physical_lines()) { check_physical_lines(locator, stylist, indexer, &doc_lines, settings, &context); } // Raise violations for internal test rules #[cfg(any(feature = "test-rules", test))] { for test_rule in TEST_RULES { if !context.is_rule_enabled(*test_rule) { continue; } match test_rule { Rule::StableTestRule => { test_rules::StableTestRule::diagnostic(locator, comment_ranges, &context); } Rule::StableTestRuleSafeFix => { test_rules::StableTestRuleSafeFix::diagnostic( locator, comment_ranges, &context, ); } Rule::StableTestRuleUnsafeFix => test_rules::StableTestRuleUnsafeFix::diagnostic( locator, comment_ranges, &context, ), Rule::StableTestRuleDisplayOnlyFix => { test_rules::StableTestRuleDisplayOnlyFix::diagnostic( locator, comment_ranges, &context, ); } Rule::PreviewTestRule => { test_rules::PreviewTestRule::diagnostic(locator, comment_ranges, &context); } Rule::DeprecatedTestRule => { test_rules::DeprecatedTestRule::diagnostic(locator, comment_ranges, &context); } Rule::AnotherDeprecatedTestRule => { test_rules::AnotherDeprecatedTestRule::diagnostic( locator, comment_ranges, &context, ); } Rule::RemovedTestRule => { test_rules::RemovedTestRule::diagnostic(locator, comment_ranges, &context); } Rule::AnotherRemovedTestRule => test_rules::AnotherRemovedTestRule::diagnostic( locator, comment_ranges, &context, ), Rule::RedirectedToTestRule => { test_rules::RedirectedToTestRule::diagnostic(locator, comment_ranges, &context); } Rule::RedirectedFromTestRule => test_rules::RedirectedFromTestRule::diagnostic( locator, comment_ranges, &context, ), Rule::RedirectedFromPrefixTestRule => { test_rules::RedirectedFromPrefixTestRule::diagnostic( locator, comment_ranges, &context, ); } _ => unreachable!("All test rules must have an implementation"), } } } // Enforce `noqa` directives. if noqa.is_enabled() || context .iter_enabled_rules() .any(|rule_code| rule_code.lint_source().is_noqa()) { let ignored = check_noqa( &mut context, path, locator, comment_ranges, &directives.noqa_line_for, parsed.has_valid_syntax(), settings, ); if noqa.is_enabled() { for index in ignored.iter().rev() { context.as_mut_vec().swap_remove(*index); } } } let (mut diagnostics, source_file) = context.into_parts(); if !parsed.has_valid_syntax() { // Avoid fixing in case the source code contains syntax errors. for diagnostic in &mut diagnostics { diagnostic.remove_fix(); } } let syntax_errors = parsed.unsupported_syntax_errors(); diagnostics_to_messages( diagnostics, parsed.errors(), syntax_errors, &semantic_syntax_errors, directives, &source_file, ) } const MAX_ITERATIONS: usize = 100; /// Add any missing `# noqa` pragmas to the source code at the given `Path`. pub fn add_noqa_to_path( path: &Path, package: Option>, source_kind: &SourceKind, source_type: PySourceType, settings: &LinterSettings, ) -> Result { // Parse once. let target_version = settings.resolve_target_version(path); let parsed = parse_unchecked_source(source_kind, source_type, target_version.parser_version()); // Map row and column locations to byte slices (lazily). let locator = Locator::new(source_kind.source_code()); // Detect the current code style (lazily). let stylist = Stylist::from_tokens(parsed.tokens(), locator.contents()); // Extra indices from the code. let indexer = Indexer::from_tokens(parsed.tokens(), locator.contents()); // Extract the `# noqa` and `# isort: skip` directives from the source. let directives = directives::extract_directives( parsed.tokens(), directives::Flags::from_settings(settings), &locator, &indexer, ); // Generate diagnostics, ignoring any existing `noqa` directives. let diagnostics = check_path( path, package, &locator, &stylist, &indexer, &directives, settings, flags::Noqa::Disabled, source_kind, source_type, &parsed, target_version, ); // Add any missing `# noqa` pragmas. // TODO(dhruvmanila): Add support for Jupyter Notebooks add_noqa( path, &diagnostics, &locator, indexer.comment_ranges(), &settings.external, &directives.noqa_line_for, stylist.line_ending(), ) } /// Generate a [`Diagnostic`] for each diagnostic triggered by the given source code. pub fn lint_only( path: &Path, package: Option>, settings: &LinterSettings, noqa: flags::Noqa, source_kind: &SourceKind, source_type: PySourceType, source: ParseSource, ) -> LinterResult { let target_version = settings.resolve_target_version(path); if matches!(target_version, TargetVersion(Some(PythonVersion::PY314))) && !is_py314_support_enabled(settings) { warn_user_once!( "Support for Python 3.14 is in preview and may undergo breaking changes. Enable `preview` to remove this warning." ); } let parsed = source.into_parsed(source_kind, source_type, target_version.parser_version()); // Map row and column locations to byte slices (lazily). let locator = Locator::new(source_kind.source_code()); // Detect the current code style (lazily). let stylist = Stylist::from_tokens(parsed.tokens(), locator.contents()); // Extra indices from the code. let indexer = Indexer::from_tokens(parsed.tokens(), locator.contents()); // Extract the `# noqa` and `# isort: skip` directives from the source. let directives = directives::extract_directives( parsed.tokens(), directives::Flags::from_settings(settings), &locator, &indexer, ); // Generate diagnostics. let diagnostics = check_path( path, package, &locator, &stylist, &indexer, &directives, settings, noqa, source_kind, source_type, &parsed, target_version, ); LinterResult { has_valid_syntax: parsed.has_valid_syntax(), has_no_syntax_errors: !diagnostics.iter().any(Diagnostic::is_invalid_syntax), diagnostics, } } /// Convert various error types into a single collection of diagnostics. /// /// Also use `directives` to attach noqa offsets to lint diagnostics. fn diagnostics_to_messages( diagnostics: Vec, parse_errors: &[ParseError], unsupported_syntax_errors: &[UnsupportedSyntaxError], semantic_syntax_errors: &[SemanticSyntaxError], directives: &Directives, source_file: &SourceFile, ) -> Vec { parse_errors .iter() .map(|parse_error| { create_syntax_error_diagnostic(source_file.clone(), &parse_error.error, parse_error) }) .chain(unsupported_syntax_errors.iter().map(|syntax_error| { create_syntax_error_diagnostic(source_file.clone(), syntax_error, syntax_error) })) .chain( semantic_syntax_errors .iter() .map(|error| create_syntax_error_diagnostic(source_file.clone(), error, error)), ) .chain(diagnostics.into_iter().map(|mut diagnostic| { let noqa_offset = directives .noqa_line_for .resolve(diagnostic.expect_range().start()); diagnostic.set_noqa_offset(noqa_offset); diagnostic })) .collect() } /// Generate `Diagnostic`s from source code content, iteratively fixing /// until stable. pub fn lint_fix<'a>( path: &Path, package: Option>, noqa: flags::Noqa, unsafe_fixes: UnsafeFixes, settings: &LinterSettings, source_kind: &'a SourceKind, source_type: PySourceType, ) -> Result> { let mut transformed = Cow::Borrowed(source_kind); // Track the number of fixed errors across iterations. let mut fixed = FixTable::default(); // As an escape hatch, bail after 100 iterations. let mut iterations = 0; // Track whether the _initial_ source code has valid syntax. let mut has_valid_syntax = false; // Track whether the _initial_ source code has no unsupported syntax errors. let mut has_no_syntax_errors = false; let target_version = settings.resolve_target_version(path); if matches!(target_version, TargetVersion(Some(PythonVersion::PY314))) && !is_py314_support_enabled(settings) { warn_user_once!( "Support for Python 3.14 is in preview and may undergo breaking changes. Enable `preview` to remove this warning." ); } // Continuously fix until the source code stabilizes. loop { // Parse once. let parsed = parse_unchecked_source(&transformed, source_type, target_version.parser_version()); // Map row and column locations to byte slices (lazily). let locator = Locator::new(transformed.source_code()); // Detect the current code style (lazily). let stylist = Stylist::from_tokens(parsed.tokens(), locator.contents()); // Extra indices from the code. let indexer = Indexer::from_tokens(parsed.tokens(), locator.contents()); // Extract the `# noqa` and `# isort: skip` directives from the source. let directives = directives::extract_directives( parsed.tokens(), directives::Flags::from_settings(settings), &locator, &indexer, ); // Generate diagnostics. let diagnostics = check_path( path, package, &locator, &stylist, &indexer, &directives, settings, noqa, &transformed, source_type, &parsed, target_version, ); if iterations == 0 { has_valid_syntax = parsed.has_valid_syntax(); has_no_syntax_errors = !diagnostics.iter().any(Diagnostic::is_invalid_syntax); } else { // If the source code had no syntax errors on the first pass, but // does on a subsequent pass, then we've introduced a // syntax error. Return the original code. if has_valid_syntax && has_no_syntax_errors { if let Some(error) = parsed.errors().first() { report_fix_syntax_error(path, transformed.source_code(), error, fixed.keys()); return Err(anyhow!("Fix introduced a syntax error")); } } } // Apply fix. if let Some(FixResult { code: fixed_contents, fixes: applied, source_map, }) = fix_file(&diagnostics, &locator, unsafe_fixes) { if iterations < MAX_ITERATIONS { // Count the number of fixed errors. for (rule, name, count) in applied.iter() { *fixed.entry(rule).or_default(name) += count; } transformed = Cow::Owned(transformed.updated(fixed_contents, &source_map)); // Increment the iteration count. iterations += 1; // Re-run the linter pass (by avoiding the return). continue; } report_failed_to_converge_error(path, transformed.source_code(), &diagnostics); } return Ok(FixerResult { result: LinterResult { diagnostics, has_valid_syntax, has_no_syntax_errors, }, transformed, fixed, }); } } fn collect_rule_codes(rules: impl IntoIterator) -> String where T: Ord + PartialEq + std::fmt::Display, { rules.into_iter().sorted_unstable().dedup().join(", ") } #[expect(clippy::print_stderr)] fn report_failed_to_converge_error(path: &Path, transformed: &str, diagnostics: &[Diagnostic]) { let codes = collect_rule_codes(diagnostics.iter().filter_map(Diagnostic::secondary_code)); if cfg!(debug_assertions) { eprintln!( "{}{} Failed to converge after {} iterations in `{}` with rule codes {}:---\n{}\n---", "debug error".red().bold(), ":".bold(), MAX_ITERATIONS, fs::relativize_path(path), codes, transformed, ); } else { eprintln!( r#" {}{} Failed to converge after {} iterations. This indicates a bug in Ruff. If you could open an issue at: https://github.com/astral-sh/ruff/issues/new?title=%5BInfinite%20loop%5D ...quoting the contents of `{}`, the rule codes {}, along with the `pyproject.toml` settings and executed command, we'd be very appreciative! "#, "error".red().bold(), ":".bold(), MAX_ITERATIONS, fs::relativize_path(path), codes ); } } #[expect(clippy::print_stderr)] fn report_fix_syntax_error<'a>( path: &Path, transformed: &str, error: &ParseError, rules: impl IntoIterator, ) { let codes = collect_rule_codes(rules); if cfg!(debug_assertions) { eprintln!( "{}{} Fix introduced a syntax error in `{}` with rule codes {}: {}\n---\n{}\n---", "error".red().bold(), ":".bold(), fs::relativize_path(path), codes, error, transformed, ); } else { eprintln!( r#" {}{} Fix introduced a syntax error. Reverting all changes. This indicates a bug in Ruff. If you could open an issue at: https://github.com/astral-sh/ruff/issues/new?title=%5BFix%20error%5D ...quoting the contents of `{}`, the rule codes {}, along with the `pyproject.toml` settings and executed command, we'd be very appreciative! "#, "error".red().bold(), ":".bold(), fs::relativize_path(path), codes, ); } } #[derive(Debug, Clone)] pub enum ParseSource { /// Parse the [`Parsed`] from the given source code. None, /// Use the precomputed [`Parsed`]. Precomputed(Parsed), } impl ParseSource { /// Consumes the [`ParseSource`] and returns the parsed [`Parsed`], parsing the source code if /// necessary. fn into_parsed( self, source_kind: &SourceKind, source_type: PySourceType, target_version: PythonVersion, ) -> Parsed { match self { ParseSource::None => parse_unchecked_source(source_kind, source_type, target_version), ParseSource::Precomputed(parsed) => parsed, } } } /// Like [`ruff_python_parser::parse_unchecked_source`] but with an additional [`PythonVersion`] /// argument. fn parse_unchecked_source( source_kind: &SourceKind, source_type: PySourceType, target_version: PythonVersion, ) -> Parsed { let options = ParseOptions::from(source_type).with_target_version(target_version); // SAFETY: Safe because `PySourceType` always parses to a `ModModule`. See // `ruff_python_parser::parse_unchecked_source`. We use `parse_unchecked` (and thus // have to unwrap) in order to pass the `PythonVersion` via `ParseOptions`. ruff_python_parser::parse_unchecked(source_kind.source_code(), options) .try_into_module() .expect("PySourceType always parses into a module") } #[cfg(test)] mod tests { use std::path::Path; use anyhow::Result; use ruff_python_ast::{PySourceType, PythonVersion}; use ruff_python_codegen::Stylist; use ruff_python_index::Indexer; use ruff_python_parser::ParseOptions; use ruff_python_trivia::textwrap::dedent; use test_case::test_case; use ruff_db::diagnostic::Diagnostic; use ruff_notebook::{Notebook, NotebookError}; use crate::linter::check_path; use crate::registry::Rule; use crate::settings::LinterSettings; use crate::source_kind::SourceKind; use crate::test::{TestedNotebook, assert_notebook_path, test_contents, test_snippet}; use crate::{Locator, assert_diagnostics, directives, settings}; /// Construct a path to a Jupyter notebook in the `resources/test/fixtures/jupyter` directory. fn notebook_path(path: impl AsRef) -> std::path::PathBuf { Path::new("../ruff_notebook/resources/test/fixtures/jupyter").join(path) } #[test] fn test_import_sorting() -> Result<(), NotebookError> { let actual = notebook_path("isort.ipynb"); let expected = notebook_path("isort_expected.ipynb"); let TestedNotebook { diagnostics, source_notebook, .. } = assert_notebook_path( &actual, expected, &LinterSettings::for_rule(Rule::UnsortedImports), )?; assert_diagnostics!(diagnostics, actual, source_notebook); Ok(()) } #[test] fn test_ipy_escape_command() -> Result<(), NotebookError> { let actual = notebook_path("ipy_escape_command.ipynb"); let expected = notebook_path("ipy_escape_command_expected.ipynb"); let TestedNotebook { diagnostics, source_notebook, .. } = assert_notebook_path( &actual, expected, &LinterSettings::for_rule(Rule::UnusedImport), )?; assert_diagnostics!(diagnostics, actual, source_notebook); Ok(()) } #[test] fn test_unused_variable() -> Result<(), NotebookError> { let actual = notebook_path("unused_variable.ipynb"); let expected = notebook_path("unused_variable_expected.ipynb"); let TestedNotebook { diagnostics, source_notebook, .. } = assert_notebook_path( &actual, expected, &LinterSettings::for_rule(Rule::UnusedVariable), )?; assert_diagnostics!(diagnostics, actual, source_notebook); Ok(()) } #[test] fn test_undefined_name() -> Result<(), NotebookError> { let actual = notebook_path("undefined_name.ipynb"); let expected = notebook_path("undefined_name.ipynb"); let TestedNotebook { diagnostics, source_notebook, .. } = assert_notebook_path( &actual, expected, &LinterSettings::for_rule(Rule::UndefinedName), )?; assert_diagnostics!(diagnostics, actual, source_notebook); Ok(()) } #[test] fn test_json_consistency() -> Result<()> { let actual_path = notebook_path("before_fix.ipynb"); let expected_path = notebook_path("after_fix.ipynb"); let TestedNotebook { linted_notebook: fixed_notebook, .. } = assert_notebook_path( actual_path, &expected_path, &LinterSettings::for_rule(Rule::UnusedImport), )?; let mut writer = Vec::new(); fixed_notebook.write(&mut writer)?; let actual = String::from_utf8(writer)?; let expected = std::fs::read_to_string(expected_path)?; assert_eq!(actual, expected); Ok(()) } #[test] fn test_vscode_language_id() -> Result<()> { let actual = notebook_path("vscode_language_id.ipynb"); let expected = notebook_path("vscode_language_id_expected.ipynb"); let TestedNotebook { diagnostics, source_notebook, .. } = assert_notebook_path( &actual, expected, &LinterSettings::for_rule(Rule::UnusedImport), )?; assert_diagnostics!(diagnostics, actual, source_notebook); Ok(()) } #[test_case(Path::new("before_fix.ipynb"), true; "trailing_newline")] #[test_case(Path::new("no_trailing_newline.ipynb"), false; "no_trailing_newline")] fn test_trailing_newline(path: &Path, trailing_newline: bool) -> Result<()> { let notebook = Notebook::from_path(¬ebook_path(path))?; assert_eq!(notebook.trailing_newline(), trailing_newline); let mut writer = Vec::new(); notebook.write(&mut writer)?; let string = String::from_utf8(writer)?; assert_eq!(string.ends_with('\n'), trailing_newline); Ok(()) } // Version <4.5, don't emit cell ids #[test_case(Path::new("no_cell_id.ipynb"), false; "no_cell_id")] // Version 4.5, cell ids are missing and need to be added #[test_case(Path::new("add_missing_cell_id.ipynb"), true; "add_missing_cell_id")] fn test_cell_id(path: &Path, has_id: bool) -> Result<()> { let source_notebook = Notebook::from_path(¬ebook_path(path))?; let source_kind = SourceKind::ipy_notebook(source_notebook); let (_, transformed) = test_contents( &source_kind, path, &LinterSettings::for_rule(Rule::UnusedImport), ); let linted_notebook = transformed.into_owned().expect_ipy_notebook(); let mut writer = Vec::new(); linted_notebook.write(&mut writer)?; let actual = String::from_utf8(writer)?; if has_id { assert!(actual.contains(r#""id": ""#)); } else { assert!(!actual.contains(r#""id":"#)); } Ok(()) } /// Wrapper around `test_contents_syntax_errors` for testing a snippet of code instead of a /// file. fn test_snippet_syntax_errors(contents: &str, settings: &LinterSettings) -> Vec { let contents = dedent(contents); test_contents_syntax_errors( &SourceKind::Python(contents.to_string()), Path::new(""), settings, ) } /// A custom test runner that prints syntax errors in addition to other diagnostics. Adapted /// from `flakes` in pyflakes/mod.rs. fn test_contents_syntax_errors( source_kind: &SourceKind, path: &Path, settings: &LinterSettings, ) -> Vec { let source_type = PySourceType::from(path); let target_version = settings.resolve_target_version(path); let options = ParseOptions::from(source_type).with_target_version(target_version.parser_version()); let parsed = ruff_python_parser::parse_unchecked(source_kind.source_code(), options) .try_into_module() .expect("PySourceType always parses into a module"); let locator = Locator::new(source_kind.source_code()); let stylist = Stylist::from_tokens(parsed.tokens(), locator.contents()); let indexer = Indexer::from_tokens(parsed.tokens(), locator.contents()); let directives = directives::extract_directives( parsed.tokens(), directives::Flags::from_settings(settings), &locator, &indexer, ); let mut diagnostics = check_path( path, None, &locator, &stylist, &indexer, &directives, settings, settings::flags::Noqa::Enabled, source_kind, source_type, &parsed, target_version, ); diagnostics.sort_by_key(|diagnostic| diagnostic.expect_range().start()); diagnostics } #[test_case( "async_in_sync_error_on_310", "async def f(): return [[x async for x in foo(n)] for n in range(3)]", PythonVersion::PY310, "AsyncComprehensionOutsideAsyncFunction" )] #[test_case( "async_in_sync_okay_on_311", "async def f(): return [[x async for x in foo(n)] for n in range(3)]", PythonVersion::PY311, "AsyncComprehensionOutsideAsyncFunction" )] #[test_case( "async_in_sync_okay_on_310", "async def test(): return [[x async for x in elements(n)] async for n in range(3)]", PythonVersion::PY310, "AsyncComprehensionOutsideAsyncFunction" )] #[test_case( "deferred_function_body", " async def f(): [x for x in foo()] and [x async for x in foo()] async def f(): def g(): ... [x async for x in foo()] ", PythonVersion::PY310, "AsyncComprehensionOutsideAsyncFunction" )] #[test_case( "async_in_sync_false_positive", "[x async for x in y]", PythonVersion::PY310, "AsyncComprehensionOutsideAsyncFunction" )] #[test_case( "rebound_comprehension", "[x:= 2 for x in range(2)]", PythonVersion::PY310, "ReboundComprehensionVariable" )] #[test_case( "duplicate_type_param", "class C[T, T]: pass", PythonVersion::PY312, "DuplicateTypeParameter" )] #[test_case( "multiple_case_assignment", " match x: case [a, a]: pass case _: pass ", PythonVersion::PY310, "MultipleCaseAssignment" )] #[test_case( "duplicate_match_key", " match x: case {'key': 1, 'key': 2}: pass ", PythonVersion::PY310, "DuplicateMatchKey" )] #[test_case( "duplicate_match_class_attribute", " match x: case Point(x=1, x=2): pass ", PythonVersion::PY310, "DuplicateMatchClassAttribute" )] #[test_case( "invalid_star_expression", " def func(): return *x ", PythonVersion::PY310, "InvalidStarExpression" )] #[test_case( "invalid_star_expression_for", " for *x in range(10): pass ", PythonVersion::PY310, "InvalidStarExpression" )] #[test_case( "invalid_star_expression_yield", " def func(): yield *x ", PythonVersion::PY310, "InvalidStarExpression" )] #[test_case( "irrefutable_case_pattern_wildcard", " match value: case _: pass case 1: pass ", PythonVersion::PY310, "IrrefutableCasePattern" )] #[test_case( "irrefutable_case_pattern_capture", " match value: case irrefutable: pass case 1: pass ", PythonVersion::PY310, "IrrefutableCasePattern" )] #[test_case( "single_starred_assignment", "*a = [1, 2, 3, 4]", PythonVersion::PY310, "SingleStarredAssignment" )] #[test_case( "write_to_debug", " __debug__ = False ", PythonVersion::PY310, "WriteToDebug" )] #[test_case( "write_to_debug_in_function_param", " def process(__debug__): pass ", PythonVersion::PY310, "WriteToDebug" )] #[test_case( "write_to_debug_class_type_param", " class Generic[__debug__]: pass ", PythonVersion::PY312, "WriteToDebug" )] #[test_case( "invalid_expression_yield_in_type_param", " type X[T: (yield 1)] = int ", PythonVersion::PY312, "InvalidExpression" )] #[test_case( "invalid_expression_yield_in_type_alias", " type Y = (yield 1) ", PythonVersion::PY312, "InvalidExpression" )] #[test_case( "invalid_expression_walrus_in_return_annotation", " def f[T](x: int) -> (y := 3): return x ", PythonVersion::PY312, "InvalidExpression" )] #[test_case( "invalid_expression_yield_from_in_base_class", " class C[T]((yield from [object])): pass ", PythonVersion::PY312, "InvalidExpression" )] fn test_semantic_errors( name: &str, contents: &str, python_version: PythonVersion, error_type: &str, ) { let snapshot = format!("semantic_syntax_error_{error_type}_{name}_{python_version}"); let diagnostics = test_snippet_syntax_errors( contents, &LinterSettings { rules: settings::rule_table::RuleTable::empty(), unresolved_target_version: python_version.into(), preview: settings::types::PreviewMode::Enabled, ..Default::default() }, ); assert_diagnostics!(snapshot, diagnostics); } #[test_case(PythonVersion::PY310)] #[test_case(PythonVersion::PY311)] fn test_async_comprehension_notebook(python_version: PythonVersion) -> Result<()> { let snapshot = format!("async_comprehension_in_sync_comprehension_notebook_{python_version}"); let path = Path::new("resources/test/fixtures/syntax_errors/async_comprehension.ipynb"); let diagnostics = test_contents_syntax_errors( &SourceKind::ipy_notebook(Notebook::from_path(path)?), path, &LinterSettings { unresolved_target_version: python_version.into(), rules: settings::rule_table::RuleTable::empty(), preview: settings::types::PreviewMode::Enabled, ..Default::default() }, ); assert_diagnostics!(snapshot, diagnostics); Ok(()) } #[test_case(Rule::LateFutureImport, Path::new("late_future_import.py"))] #[test_case(Rule::YieldOutsideFunction, Path::new("yield_scope.py"))] #[test_case(Rule::ReturnOutsideFunction, Path::new("return_outside_function.py"))] #[test_case( Rule::LoadBeforeGlobalDeclaration, Path::new("load_before_global_declaration.py") )] #[test_case(Rule::AwaitOutsideAsync, Path::new("await_outside_async_function.py"))] #[test_case(Rule::AwaitOutsideAsync, Path::new("async_comprehension.py"))] fn test_syntax_errors(rule: Rule, path: &Path) -> Result<()> { let snapshot = path.to_string_lossy().to_string(); let path = Path::new("resources/test/fixtures/syntax_errors").join(path); let diagnostics = test_contents_syntax_errors( &SourceKind::Python(std::fs::read_to_string(&path)?), &path, &LinterSettings::for_rule(rule), ); insta::with_settings!({filters => vec![(r"\\", "/")]}, { assert_diagnostics!(snapshot, diagnostics); }); Ok(()) } #[test] fn test_await_scope_notebook() -> Result<()> { let path = Path::new("resources/test/fixtures/syntax_errors/await_scope.ipynb"); let TestedNotebook { diagnostics, source_notebook, .. } = assert_notebook_path( path, path, &LinterSettings::for_rule(Rule::YieldOutsideFunction), )?; assert_diagnostics!(diagnostics, path, source_notebook); Ok(()) } const PYI019_EXAMPLE: &str = r#" from typing import TypeVar T = TypeVar("T", bound="_NiceReprEnum") class C: def __new__(cls: type[T]) -> T: return cls "#; #[test_case( "pyi019_adds_typing_extensions", PYI019_EXAMPLE, &LinterSettings { unresolved_target_version: PythonVersion::PY310.into(), typing_extensions: true, ..LinterSettings::for_rule(Rule::CustomTypeVarForSelf) } )] #[test_case( "pyi019_does_not_add_typing_extensions", PYI019_EXAMPLE, &LinterSettings { unresolved_target_version: PythonVersion::PY310.into(), typing_extensions: false, ..LinterSettings::for_rule(Rule::CustomTypeVarForSelf) } )] #[test_case( "pyi019_adds_typing_without_extensions_disabled", PYI019_EXAMPLE, &LinterSettings { unresolved_target_version: PythonVersion::PY311.into(), typing_extensions: true, ..LinterSettings::for_rule(Rule::CustomTypeVarForSelf) } )] #[test_case( "pyi019_adds_typing_with_extensions_disabled", PYI019_EXAMPLE, &LinterSettings { unresolved_target_version: PythonVersion::PY311.into(), typing_extensions: false, ..LinterSettings::for_rule(Rule::CustomTypeVarForSelf) } )] #[test_case( "pyi034_disabled", " class C: def __new__(cls) -> C: ... ", &LinterSettings { unresolved_target_version: PythonVersion { major: 3, minor: 10 }.into(), typing_extensions: false, ..LinterSettings::for_rule(Rule::NonSelfReturnType) } )] #[test_case( "fast002_disabled", r#" from fastapi import Depends, FastAPI app = FastAPI() @app.get("/items/") async def read_items(commons: dict = Depends(common_parameters)): return commons "#, &LinterSettings { unresolved_target_version: PythonVersion { major: 3, minor: 8 }.into(), typing_extensions: false, ..LinterSettings::for_rule(Rule::FastApiNonAnnotatedDependency) } )] fn test_disabled_typing_extensions(name: &str, contents: &str, settings: &LinterSettings) { let snapshot = format!("disabled_typing_extensions_{name}"); let diagnostics = test_snippet(contents, settings); assert_diagnostics!(snapshot, diagnostics); } #[test_case( "pyi026_disabled", "Vector = list[float]", &LinterSettings { unresolved_target_version: PythonVersion { major: 3, minor: 9 }.into(), typing_extensions: false, ..LinterSettings::for_rule(Rule::TypeAliasWithoutAnnotation) } )] fn test_disabled_typing_extensions_pyi(name: &str, contents: &str, settings: &LinterSettings) { let snapshot = format!("disabled_typing_extensions_pyi_{name}"); let path = Path::new(".pyi"); let contents = dedent(contents); let diagnostics = test_contents(&SourceKind::Python(contents.into_owned()), path, settings).0; assert_diagnostics!(snapshot, diagnostics); } }