Skip to content

Commit

Permalink
Implement first Airflow upgrade rule
Browse files Browse the repository at this point in the history
  • Loading branch information
uranusjr committed Nov 19, 2024
1 parent f8c2025 commit c5c2075
Show file tree
Hide file tree
Showing 8 changed files with 207 additions and 3 deletions.
30 changes: 30 additions & 0 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR201.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
from airflow import DAG, dag
from airflow.timetables.trigger import CronTriggerTimetable

DAG(dag_id="class_default_schedule")

DAG(dag_id="class_schedule", schedule="@hourly")

DAG(dag_id="class_timetable", timetable=CronTriggerTimetable())

DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")


@dag()
def decorator_default_schedule():
pass


@dag(schedule="0 * * * *")
def decorator_schedule():
pass


@dag(timetable=CronTriggerTimetable())
def decorator_timetable():
pass


@dag(schedule_interval="0 * * * *")
def decorator_schedule_interval():
pass
7 changes: 5 additions & 2 deletions crates/ruff_linter/src/checkers/ast/analyze/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use ruff_text_size::Ranged;
use crate::checkers::ast::Checker;
use crate::registry::Rule;
use crate::rules::{
flake8_2020, flake8_async, flake8_bandit, flake8_boolean_trap, flake8_bugbear, flake8_builtins,
flake8_comprehensions, flake8_datetimez, flake8_debugger, flake8_django,
airflow, flake8_2020, flake8_async, flake8_bandit, flake8_boolean_trap, flake8_bugbear,
flake8_builtins, flake8_comprehensions, flake8_datetimez, flake8_debugger, flake8_django,
flake8_future_annotations, flake8_gettext, flake8_implicit_str_concat, flake8_logging,
flake8_logging_format, flake8_pie, flake8_print, flake8_pyi, flake8_pytest_style, flake8_self,
flake8_simplify, flake8_tidy_imports, flake8_type_checking, flake8_use_pathlib, flynt, numpy,
Expand Down Expand Up @@ -1058,6 +1058,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
if checker.enabled(Rule::MapIntVersionParsing) {
ruff::rules::map_int_version_parsing(checker, call);
}
if checker.enabled(Rule::AirflowDagNoScheduleArgument) {
airflow::rules::dag_no_schedule_argument(checker, expr);
}
}
Expr::Dict(dict) => {
if checker.any_enabled(&[
Expand Down
1 change: 1 addition & 0 deletions crates/ruff_linter/src/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> {

// airflow
(Airflow, "001") => (RuleGroup::Stable, rules::airflow::rules::AirflowVariableNameTaskIdMismatch),
(Airflow, "201") => (RuleGroup::Preview, rules::airflow::rules::AirflowDagNoScheduleArgument),

// perflint
(Perflint, "101") => (RuleGroup::Stable, rules::perflint::rules::UnnecessaryListCast),
Expand Down
1 change: 1 addition & 0 deletions crates/ruff_linter/src/rules/airflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod tests {
use crate::{assert_messages, settings};

#[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))]
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR201.py"))]
fn rules(rule_code: Rule, path: &Path) -> Result<()> {
let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy());
let diagnostics = test_path(
Expand Down
110 changes: 110 additions & 0 deletions crates/ruff_linter/src/rules/airflow/rules/dag_schedule_argument.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
use ruff_diagnostics::{Diagnostic, Violation};
use ruff_macros::{derive_message_formats, violation};
use ruff_python_ast::Expr;
use ruff_python_ast::{self as ast, Keyword};
use ruff_text_size::Ranged;

use crate::checkers::ast::Checker;

/// ## What it does
/// Checks that the `DAG()` class or a `@dag()` decorator has an explicit
/// `schedule` parameter.
///
/// ## Why is this bad?
/// The default `schedule` value on Airflow 2 is `timedelta(days=1)`, which is
/// almost never what a user is looking for. Airflow 3 changes this the default
/// to *None*, and would break existing DAGs using the implicit default.
///
/// Airflow 2 also provides alternative arguments `schedule_interval` and
/// `timetable` to specify the DAG schedule. They existed for backward
/// compatibility, and have been removed from Airflow 3.
///
/// ## Example
/// ```python
/// from airflow import DAG
///
///
/// # Using the implicit default schedule.
/// dag1 = DAG(dag_id="my_dag_1")
///
/// # Using a deprecated argument to set schedule.
/// dag2 = DAG(dag_id="my_dag_2", schedule_interval="@daily")
/// ```
///
/// Use instead:
/// ```python
/// from datetime import timedelta
///
/// from airflow import DAG
///
///
/// dag1 = DAG(dag_id="my_dag_1", schedule=timedelta(days=1))
/// dag2 = DAG(dag_id="my_dag_2", schedule="@daily")
/// ```
#[violation]
pub struct AirflowDagNoScheduleArgument {
deprecated_argument: Option<String>,
}

impl Violation for AirflowDagNoScheduleArgument {
#[derive_message_formats]
fn message(&self) -> String {
let AirflowDagNoScheduleArgument {
deprecated_argument,
} = self;
match deprecated_argument {
Some(argument) => {
format!("argument `{argument}` is deprecated; use `schedule` instead")
}
None => "DAG should have an explicit `schedule` argument".to_string(),
}
}
}

/// AIR201
pub(crate) fn dag_no_schedule_argument(checker: &mut Checker, expr: &Expr) {
// Don't check non-call expressions.
let Expr::Call(ast::ExprCall { arguments, .. }) = expr else {
return;
};

// We don't do anything unless this is a `DAG` (class) or `dag` (decorator
// function) from Airflow.
if !checker
.semantic()
.resolve_qualified_name(expr)
.is_some_and(|qualname| matches!(qualname.segments(), ["airflow", .., "DAG" | "dag"]))
{
return;
}

// If there's a `schedule` keyword argument, we are good.
if arguments.find_keyword("schedule").is_some() {
return;
}

// Produce a diagnostic on either a deprecated schedule keyword argument,
// or no schedule-related keyword arguments at all.
let diagnostic = if let Some(keyword) = arguments.keywords.iter().find(|keyword| {
let Keyword { arg, .. } = keyword;
arg.as_ref()
.is_some_and(|arg| arg == "timetable" || arg == "schedule_interval")
}) {
// A deprecated argument is used.
Diagnostic::new(
AirflowDagNoScheduleArgument {
deprecated_argument: keyword.arg.as_ref().map(ToString::to_string),
},
keyword.range(),
)
} else {
// The implicit default is used.
Diagnostic::new(
AirflowDagNoScheduleArgument {
deprecated_argument: None,
},
expr.range(),
)
};
checker.diagnostics.push(diagnostic);
}
2 changes: 2 additions & 0 deletions crates/ruff_linter/src/rules/airflow/rules/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub(crate) use dag_schedule_argument::*;
pub(crate) use task_variable_name::*;

mod dag_schedule_argument;
mod task_variable_name;
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
---
source: crates/ruff_linter/src/rules/airflow/mod.rs
---
AIR201.py:4:1: AIR201 DAG should have an explicit `schedule` argument
|
2 | from airflow.timetables.trigger import CronTriggerTimetable
3 |
4 | DAG(dag_id="class_default_schedule")
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR201
5 |
6 | DAG(dag_id="class_schedule", schedule="@hourly")
|

AIR201.py:8:31: AIR201 argument `timetable` is deprecated; use `schedule` instead
|
6 | DAG(dag_id="class_schedule", schedule="@hourly")
7 |
8 | DAG(dag_id="class_timetable", timetable=CronTriggerTimetable())
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR201
9 |
10 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
|

AIR201.py:10:39: AIR201 argument `schedule_interval` is deprecated; use `schedule` instead
|
8 | DAG(dag_id="class_timetable", timetable=CronTriggerTimetable())
9 |
10 | DAG(dag_id="class_schedule_interval", schedule_interval="@hourly")
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR201
|

AIR201.py:13:2: AIR201 DAG should have an explicit `schedule` argument
|
13 | @dag()
| ^^^^^ AIR201
14 | def decorator_default_schedule():
15 | pass
|

AIR201.py:23:6: AIR201 argument `timetable` is deprecated; use `schedule` instead
|
23 | @dag(timetable=CronTriggerTimetable())
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR201
24 | def decorator_timetable():
25 | pass
|

AIR201.py:28:6: AIR201 argument `schedule_interval` is deprecated; use `schedule` instead
|
28 | @dag(schedule_interval="0 * * * *")
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR201
29 | def decorator_schedule_interval():
30 | pass
|
5 changes: 4 additions & 1 deletion ruff.schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit c5c2075

Please sign in to comment.