[airflow] Avoid implicit DAG schedule (AIR301) (#14581)

This commit is contained in:
Tzu-ping Chung 2024-11-26 20:38:18 +08:00 committed by GitHub
parent f3dac27e9a
commit fbff4dec3a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 133 additions and 2 deletions

View file

@ -0,0 +1,15 @@
from airflow import DAG, dag
DAG(dag_id="class_default_schedule")
DAG(dag_id="class_schedule", schedule="@hourly")
@dag()
def decorator_default_schedule():
pass
@dag(schedule="0 * * * *")
def decorator_schedule():
pass

View file

@ -11,8 +11,8 @@ use ruff_text_size::Ranged;
use crate::checkers::ast::Checker;
use crate::registry::Rule;
use crate::rules::{
flake8_2020, flake8_async, flake8_bandit, flake8_boolean_trap, flake8_bugbear, flake8_builtins,
flake8_comprehensions, flake8_datetimez, flake8_debugger, flake8_django,
airflow, flake8_2020, flake8_async, flake8_bandit, flake8_boolean_trap, flake8_bugbear,
flake8_builtins, flake8_comprehensions, flake8_datetimez, flake8_debugger, flake8_django,
flake8_future_annotations, flake8_gettext, flake8_implicit_str_concat, flake8_logging,
flake8_logging_format, flake8_pie, flake8_print, flake8_pyi, flake8_pytest_style, flake8_self,
flake8_simplify, flake8_tidy_imports, flake8_type_checking, flake8_use_pathlib, flynt, numpy,
@ -1070,6 +1070,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
if checker.enabled(Rule::UnrawRePattern) {
ruff::rules::unraw_re_pattern(checker, call);
}
if checker.enabled(Rule::AirflowDagNoScheduleArgument) {
airflow::rules::dag_no_schedule_argument(checker, expr);
}
}
Expr::Dict(dict) => {
if checker.any_enabled(&[

View file

@ -1033,6 +1033,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> {
// airflow
(Airflow, "001") => (RuleGroup::Stable, rules::airflow::rules::AirflowVariableNameTaskIdMismatch),
(Airflow, "301") => (RuleGroup::Preview, rules::airflow::rules::AirflowDagNoScheduleArgument),
// perflint
(Perflint, "101") => (RuleGroup::Stable, rules::perflint::rules::UnnecessaryListCast),

View file

@ -13,6 +13,7 @@ mod tests {
use crate::{assert_messages, settings};
#[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))]
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR301.py"))]
fn rules(rule_code: Rule, path: &Path) -> Result<()> {
let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy());
let diagnostics = test_path(

View file

@ -0,0 +1,84 @@
use ruff_diagnostics::{Diagnostic, Violation};
use ruff_macros::{derive_message_formats, violation};
use ruff_python_ast::Expr;
use ruff_python_ast::{self as ast};
use ruff_python_semantic::Modules;
use ruff_text_size::Ranged;
use crate::checkers::ast::Checker;
/// ## What it does
/// Checks for a `DAG()` class or `@dag()` decorator without an explicit
/// `schedule` parameter.
///
/// ## Why is this bad?
/// The default `schedule` value on Airflow 2 is `timedelta(days=1)`, which is
/// almost never what a user is looking for. Airflow 3 changes this the default
/// to *None*, and would break existing DAGs using the implicit default.
///
/// If your DAG does not have an explicit `schedule` argument, Airflow 2
/// schedules a run for it every day (at the time determined by `start_date`).
/// Such a DAG will no longer be scheduled on Airflow 3 at all, without any
/// exceptions or other messages visible to the user.
///
/// ## Example
/// ```python
/// from airflow import DAG
///
///
/// # Using the implicit default schedule.
/// dag = DAG(dag_id="my_dag")
/// ```
///
/// Use instead:
/// ```python
/// from datetime import timedelta
///
/// from airflow import DAG
///
///
/// dag = DAG(dag_id="my_dag", schedule=timedelta(days=1))
/// ```
#[violation]
pub struct AirflowDagNoScheduleArgument;
impl Violation for AirflowDagNoScheduleArgument {
#[derive_message_formats]
fn message(&self) -> String {
"DAG should have an explicit `schedule` argument".to_string()
}
}
/// AIR301
pub(crate) fn dag_no_schedule_argument(checker: &mut Checker, expr: &Expr) {
if !checker.semantic().seen_module(Modules::AIRFLOW) {
return;
}
// Don't check non-call expressions.
let Expr::Call(ast::ExprCall {
func, arguments, ..
}) = expr
else {
return;
};
// We don't do anything unless this is a `DAG` (class) or `dag` (decorator
// function) from Airflow.
if !checker
.semantic()
.resolve_qualified_name(func)
.is_some_and(|qualname| matches!(qualname.segments(), ["airflow", .., "DAG" | "dag"]))
{
return;
}
// If there's a `schedule` keyword argument, we are good.
if arguments.find_keyword("schedule").is_some() {
return;
}
// Produce a diagnostic when the `schedule` keyword argument is not found.
let diagnostic = Diagnostic::new(AirflowDagNoScheduleArgument, expr.range());
checker.diagnostics.push(diagnostic);
}

View file

@ -1,3 +1,5 @@
pub(crate) use dag_schedule_argument::*;
pub(crate) use task_variable_name::*;
mod dag_schedule_argument;
mod task_variable_name;

View file

@ -0,0 +1,20 @@
---
source: crates/ruff_linter/src/rules/airflow/mod.rs
---
AIR301.py:3:1: AIR301 DAG should have an explicit `schedule` argument
|
1 | from airflow import DAG, dag
2 |
3 | DAG(dag_id="class_default_schedule")
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR301
4 |
5 | DAG(dag_id="class_schedule", schedule="@hourly")
|
AIR301.py:8:2: AIR301 DAG should have an explicit `schedule` argument
|
8 | @dag()
| ^^^^^ AIR301
9 | def decorator_default_schedule():
10 | pass
|