Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[airflow] Avoid implicit DAG schedule (AIR301) #14581

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions crates/ruff_linter/resources/test/fixtures/airflow/AIR301.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from airflow import DAG, dag

DAG(dag_id="class_default_schedule")

DAG(dag_id="class_schedule", schedule="@hourly")


@dag()
def decorator_default_schedule():
pass


@dag(schedule="0 * * * *")
def decorator_schedule():
pass
7 changes: 5 additions & 2 deletions crates/ruff_linter/src/checkers/ast/analyze/expression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use ruff_text_size::Ranged;
use crate::checkers::ast::Checker;
use crate::registry::Rule;
use crate::rules::{
flake8_2020, flake8_async, flake8_bandit, flake8_boolean_trap, flake8_bugbear, flake8_builtins,
flake8_comprehensions, flake8_datetimez, flake8_debugger, flake8_django,
airflow, flake8_2020, flake8_async, flake8_bandit, flake8_boolean_trap, flake8_bugbear,
flake8_builtins, flake8_comprehensions, flake8_datetimez, flake8_debugger, flake8_django,
flake8_future_annotations, flake8_gettext, flake8_implicit_str_concat, flake8_logging,
flake8_logging_format, flake8_pie, flake8_print, flake8_pyi, flake8_pytest_style, flake8_self,
flake8_simplify, flake8_tidy_imports, flake8_type_checking, flake8_use_pathlib, flynt, numpy,
Expand Down Expand Up @@ -1061,6 +1061,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) {
if checker.enabled(Rule::UnrawRePattern) {
ruff::rules::unraw_re_pattern(checker, call);
}
if checker.enabled(Rule::AirflowDagNoScheduleArgument) {
airflow::rules::dag_no_schedule_argument(checker, expr);
}
}
Expr::Dict(dict) => {
if checker.any_enabled(&[
Expand Down
1 change: 1 addition & 0 deletions crates/ruff_linter/src/codes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1032,6 +1032,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> {

// airflow
(Airflow, "001") => (RuleGroup::Stable, rules::airflow::rules::AirflowVariableNameTaskIdMismatch),
(Airflow, "301") => (RuleGroup::Preview, rules::airflow::rules::AirflowDagNoScheduleArgument),

// perflint
(Perflint, "101") => (RuleGroup::Stable, rules::perflint::rules::UnnecessaryListCast),
Expand Down
1 change: 1 addition & 0 deletions crates/ruff_linter/src/rules/airflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ mod tests {
use crate::{assert_messages, settings};

#[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))]
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR301.py"))]
fn rules(rule_code: Rule, path: &Path) -> Result<()> {
let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy());
let diagnostics = test_path(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
use ruff_diagnostics::{Diagnostic, Violation};
use ruff_macros::{derive_message_formats, violation};
use ruff_python_ast::Expr;
use ruff_python_ast::{self as ast};
use ruff_python_semantic::Modules;
use ruff_text_size::Ranged;

use crate::checkers::ast::Checker;

/// ## What it does
/// Checks for a `DAG()` class or `@dag()` decorator without an explicit
/// `schedule` parameter.
///
/// ## Why is this bad?
/// The default `schedule` value on Airflow 2 is `timedelta(days=1)`, which is
/// almost never what a user is looking for. Airflow 3 changes this the default
/// to *None*, and would break existing DAGs using the implicit default.
Comment on lines +16 to +17
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we tell a bit more about how it breaks existing DAGS that use the implicit default. Will it fail with an exception or is it just that the default is different?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just the default is different—your DAGs will simply stop doing anything silently. I’ll add a paragraph to call this out.

///
/// If your DAG does not have an explicit `schedule` argument, Airflow 2
/// schedules a run for it every day (at the time determined by `start_date`).
/// Such a DAG will no longer be scheduled on Airflow 3 at all, without any
/// exceptions or other messages visible to the user.
///
/// ## Example
/// ```python
/// from airflow import DAG
///
///
/// # Using the implicit default schedule.
/// dag = DAG(dag_id="my_dag")
/// ```
///
/// Use instead:
/// ```python
/// from datetime import timedelta
///
/// from airflow import DAG
///
///
/// dag = DAG(dag_id="my_dag", schedule=timedelta(days=1))
/// ```
#[violation]
pub struct AirflowDagNoScheduleArgument;

impl Violation for AirflowDagNoScheduleArgument {
#[derive_message_formats]
fn message(&self) -> String {
"DAG should have an explicit `schedule` argument".to_string()
}
}

/// AIR301
pub(crate) fn dag_no_schedule_argument(checker: &mut Checker, expr: &Expr) {
if !checker.semantic().seen_module(Modules::AIRFLOW) {
return;
}

// Don't check non-call expressions.
let Expr::Call(ast::ExprCall {
func, arguments, ..
}) = expr
else {
return;
};

// We don't do anything unless this is a `DAG` (class) or `dag` (decorator
// function) from Airflow.
if !checker
.semantic()
.resolve_qualified_name(func)
.is_some_and(|qualname| matches!(qualname.segments(), ["airflow", .., "DAG" | "dag"]))
{
return;
}

// If there's a `schedule` keyword argument, we are good.
if arguments.find_keyword("schedule").is_some() {
return;
}

// Produce a diagnostic when the `schedule` keyword argument is not found.
let diagnostic = Diagnostic::new(AirflowDagNoScheduleArgument, expr.range());
checker.diagnostics.push(diagnostic);
}
2 changes: 2 additions & 0 deletions crates/ruff_linter/src/rules/airflow/rules/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
pub(crate) use dag_schedule_argument::*;
pub(crate) use task_variable_name::*;

mod dag_schedule_argument;
mod task_variable_name;
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
---
source: crates/ruff_linter/src/rules/airflow/mod.rs
---
AIR301.py:3:1: AIR301 DAG should have an explicit `schedule` argument
|
1 | from airflow import DAG, dag
2 |
3 | DAG(dag_id="class_default_schedule")
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ AIR301
4 |
5 | DAG(dag_id="class_schedule", schedule="@hourly")
|

AIR301.py:8:2: AIR301 DAG should have an explicit `schedule` argument
|
8 | @dag()
| ^^^^^ AIR301
9 | def decorator_default_schedule():
10 | pass
|
2 changes: 2 additions & 0 deletions crates/ruff_python_semantic/src/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1289,6 +1289,7 @@ impl<'a> SemanticModel<'a> {
"typing" => self.seen.insert(Modules::TYPING),
"typing_extensions" => self.seen.insert(Modules::TYPING_EXTENSIONS),
"attr" | "attrs" => self.seen.insert(Modules::ATTRS),
"airflow" => self.seen.insert(Modules::AIRFLOW),
_ => {}
}
}
Expand Down Expand Up @@ -1860,6 +1861,7 @@ bitflags! {
const FLASK = 1 << 24;
const ATTRS = 1 << 25;
const REGEX = 1 << 26;
const AIRFLOW = 1 << 27;
}
}

Expand Down
3 changes: 3 additions & 0 deletions ruff.schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading