Skip to content

Commit 383e6f9

Browse files
committed
Add AIR003: flag parse-time Variable uses
1 parent 8d5d41c commit 383e6f9

12 files changed

Lines changed: 364 additions & 0 deletions
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from airflow import DAG
2+
from airflow.models import Variable
3+
from airflow.sdk import Variable as SdkVariable
4+
from airflow.decorators import task
5+
from airflow.operators.bash import BashOperator
6+
from airflow.models.baseoperator import BaseOperator
7+
8+
9+
# Violations (should trigger AIR003):
10+
11+
# Variable.get() at module level
12+
foo = Variable.get("foo") # AIR003
13+
bar = SdkVariable.get("bar") # AIR003
14+
15+
# Variable.get() in operator constructor arguments
16+
BashOperator(
17+
task_id="bad_inline",
18+
bash_command=f"echo {Variable.get('foo')}", # AIR003
19+
)
20+
21+
BashOperator(
22+
task_id="bad_env",
23+
bash_command="echo $FOO",
24+
env={"FOO": Variable.get("foo")}, # AIR003
25+
)
26+
27+
# Variable.get() in a regular (non-task) function
28+
def helper():
29+
return Variable.get("foo") # AIR003
30+
31+
32+
# No violations:
33+
34+
# Variable.get() inside a @task-decorated function
35+
@task
36+
def my_task():
37+
var = Variable.get("foo")
38+
print(var)
39+
40+
41+
# Variable.get() inside a @task() with parentheses
42+
@task()
43+
def my_task_with_parens():
44+
var = Variable.get("foo")
45+
print(var)
46+
47+
48+
# Variable.get() inside a @task.branch-decorated function
49+
@task.branch
50+
def my_branch_task():
51+
if Variable.get("flag") == "true":
52+
return ["downstream"]
53+
return []
54+
55+
56+
# Variable.get() inside an operator's execute() method
57+
class MyOperator(BaseOperator):
58+
def execute(self, context):
59+
var = Variable.get("foo")
60+
print(var)
61+
62+
63+
# Variable.get() inside pre_execute / post_execute
64+
class AnotherOperator(BaseOperator):
65+
def pre_execute(self, context):
66+
var = Variable.get("foo")
67+
print(var)
68+
69+
def post_execute(self, context, result):
70+
var = Variable.get("foo")
71+
print(var)
72+
73+
74+
# Jinja template usage (no Variable.get() call at all)
75+
BashOperator(
76+
task_id="good",
77+
bash_command="echo $FOO",
78+
env={"FOO": "{{ var.value.foo }}"},
79+
)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from airflow.decorators import dag, task
2+
from airflow.models import Variable
3+
4+
var = Variable.get("foo") # AIR003
5+
6+
7+
@dag
8+
def my_dag():
9+
@task
10+
def my_task():
11+
Variable.get("bar") # OK - inside task
12+
13+
my_task()
14+
15+
16+
my_dag()
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from airflow.models import Variable
2+
3+
# No Dag import → not a Dag file → no violations
4+
foo = Variable.get("foo")
5+
6+
def helper():
7+
return Variable.get("bar")

crates/ruff_linter/src/checkers/ast/analyze/expression.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1278,6 +1278,9 @@ pub(crate) fn expression(expr: &Expr, checker: &Checker) {
12781278
if checker.is_rule_enabled(Rule::AirflowDagNoScheduleArgument) {
12791279
airflow::rules::dag_no_schedule_argument(checker, expr);
12801280
}
1281+
if checker.is_rule_enabled(Rule::AirflowVariableGetOutsideTask) {
1282+
airflow::rules::variable_get_outside_task(checker, expr);
1283+
}
12811284
if checker.is_rule_enabled(Rule::UnnecessaryRegularExpression) {
12821285
ruff::rules::unnecessary_regular_expression(checker, call);
12831286
}

crates/ruff_linter/src/codes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1130,6 +1130,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> {
11301130
// airflow
11311131
(Airflow, "001") => rules::airflow::rules::AirflowVariableNameTaskIdMismatch,
11321132
(Airflow, "002") => rules::airflow::rules::AirflowDagNoScheduleArgument,
1133+
(Airflow, "003") => rules::airflow::rules::AirflowVariableGetOutsideTask,
11331134
(Airflow, "301") => rules::airflow::rules::Airflow3Removal,
11341135
(Airflow, "302") => rules::airflow::rules::Airflow3MovedToProvider,
11351136
(Airflow, "303") => rules::airflow::rules::Airflow3IncompatibleFunctionSignature,

crates/ruff_linter/src/rules/airflow/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ mod tests {
1515

1616
#[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))]
1717
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR002.py"))]
18+
#[test_case(Rule::AirflowVariableGetOutsideTask, Path::new("AIR003.py"))]
19+
#[test_case(Rule::AirflowVariableGetOutsideTask, Path::new("AIR003_no_dag.py"))]
20+
#[test_case(
21+
Rule::AirflowVariableGetOutsideTask,
22+
Path::new("AIR003_dag_decorator.py")
23+
)]
1824
#[test_case(Rule::Airflow3Removal, Path::new("AIR301_args.py"))]
1925
#[test_case(Rule::Airflow3Removal, Path::new("AIR301_names.py"))]
2026
#[test_case(Rule::Airflow3Removal, Path::new("AIR301_names_fix.py"))]

crates/ruff_linter/src/rules/airflow/rules/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ pub(crate) use runtime_value_in_dag_or_task::*;
77
pub(crate) use suggested_to_move_to_provider_in_3::*;
88
pub(crate) use suggested_to_update_3_0::*;
99
pub(crate) use task_variable_name::*;
10+
pub(crate) use variable_get_outside_task::*;
1011

1112
mod dag_schedule_argument;
1213
mod function_signature_change_in_3;
@@ -17,3 +18,4 @@ mod runtime_value_in_dag_or_task;
1718
mod suggested_to_move_to_provider_in_3;
1819
mod suggested_to_update_3_0;
1920
mod task_variable_name;
21+
mod variable_get_outside_task;
Lines changed: 177 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,177 @@
1+
use ruff_macros::{ViolationMetadata, derive_message_formats};
2+
use ruff_python_ast::{self as ast, Expr, StmtFunctionDef};
3+
use ruff_python_semantic::analyze::class::any_qualified_base_class;
4+
use ruff_python_semantic::{Imported, Modules, ScopeKind, SemanticModel};
5+
use ruff_text_size::Ranged;
6+
7+
use crate::Violation;
8+
use crate::checkers::ast::Checker;
9+
use crate::rules::airflow::helpers::is_airflow_task;
10+
11+
/// ## What it does
12+
/// Checks for `Variable.get()` calls outside of Airflow task execution
13+
/// context (i.e., outside `@task`-decorated functions and operator
14+
/// `execute()` methods).
15+
///
16+
/// ## Why is this bad?
17+
/// Calling `Variable.get()` at module level or in operator constructor
18+
/// arguments causes a database query every time the Dag file is parsed
19+
/// by the scheduler. This can degrade Dag parsing performance and, in
20+
/// some cases, cause the Dag file to time out before it is fully parsed.
21+
///
22+
/// Instead, pass Airflow Variables to operators via Jinja templates
23+
/// (`{{ var.value.my_var }}` or `{{ var.json.my_var }}`), which defer
24+
/// the lookup until task execution.
25+
///
26+
/// `Variable.get()` inside `@task`-decorated functions and operator
27+
/// `execute()` methods is fine because those only run during task
28+
/// execution, not during Dag parsing.
29+
///
30+
/// Note that this rule may produce false positives for helper functions
31+
/// that are only invoked at task execution time (e.g., passed as
32+
/// `python_callable` to `PythonOperator`). In such cases, suppress the
33+
/// diagnostic with `# noqa: AIR003`.
34+
///
35+
/// ## Example
36+
/// ```python
37+
/// from airflow.sdk import Variable
38+
/// from airflow.operators.bash import BashOperator
39+
///
40+
///
41+
/// foo = Variable.get("foo")
42+
/// BashOperator(task_id="bad", bash_command="echo $FOO", env={"FOO": foo})
43+
/// ```
44+
///
45+
/// Use instead:
46+
/// ```python
47+
/// from airflow.operators.bash import BashOperator
48+
///
49+
///
50+
/// BashOperator(
51+
/// task_id="good",
52+
/// bash_command="echo $FOO",
53+
/// env={"FOO": "{{ var.value.foo }}"},
54+
/// )
55+
/// ```
56+
#[derive(ViolationMetadata)]
57+
#[violation_metadata(preview_since = "NEXT_RUFF_VERSION")]
58+
pub(crate) struct AirflowVariableGetOutsideTask {
59+
in_function: bool,
60+
}
61+
62+
impl Violation for AirflowVariableGetOutsideTask {
63+
#[derive_message_formats]
64+
fn message(&self) -> String {
65+
"`Variable.get()` outside of a task".to_string()
66+
}
67+
68+
fn fix_title(&self) -> Option<String> {
69+
if self.in_function {
70+
Some("Move into a `@task`-decorated function".to_string())
71+
} else {
72+
Some("Use Jinja templates instead".to_string())
73+
}
74+
}
75+
}
76+
77+
/// AIR003
78+
pub(crate) fn variable_get_outside_task(checker: &Checker, expr: &Expr) {
79+
if !checker.semantic().seen_module(Modules::AIRFLOW) {
80+
return;
81+
}
82+
83+
let Expr::Call(ast::ExprCall { func, .. }) = expr else {
84+
return;
85+
};
86+
87+
if !is_variable_get(func, checker.semantic()) {
88+
return;
89+
}
90+
91+
if !is_dag_file(checker.semantic()) {
92+
return;
93+
}
94+
95+
if in_task_execution_context(checker.semantic()) {
96+
return;
97+
}
98+
99+
let in_function = matches!(
100+
checker.semantic().current_scope().kind,
101+
ScopeKind::Function(_) | ScopeKind::Lambda(_)
102+
);
103+
104+
checker.report_diagnostic(AirflowVariableGetOutsideTask { in_function }, expr.range());
105+
}
106+
107+
/// Returns `true` if the file imports `DAG` or `dag` from airflow, which
108+
/// indicates it is a Dag definition file.
109+
fn is_dag_file(semantic: &SemanticModel) -> bool {
110+
semantic.global_scope().binding_ids().any(|binding_id| {
111+
semantic
112+
.binding(binding_id)
113+
.as_any_import()
114+
.is_some_and(|import| {
115+
matches!(
116+
import.qualified_name().segments(),
117+
["airflow", .., "DAG" | "dag"]
118+
)
119+
})
120+
})
121+
}
122+
123+
/// Returns `true` if `func` resolves to `Variable.get`.
124+
fn is_variable_get(func: &Expr, semantic: &SemanticModel) -> bool {
125+
semantic
126+
.resolve_qualified_name(func)
127+
.is_some_and(|qualified_name| {
128+
matches!(
129+
qualified_name.segments(),
130+
["airflow", "models" | "sdk", .., "Variable", "get"]
131+
)
132+
})
133+
}
134+
135+
/// Returns `true` if the current location is inside a `@task`-decorated function
136+
/// or a task-execution-time method on an Airflow operator subclass.
137+
fn in_task_execution_context(semantic: &SemanticModel) -> bool {
138+
semantic
139+
.current_statements()
140+
.find_map(|stmt| stmt.as_function_def_stmt())
141+
.is_some_and(|function_def| {
142+
is_airflow_task(function_def, semantic)
143+
|| is_operator_task_method(function_def, semantic)
144+
})
145+
}
146+
147+
/// Returns `true` if the function is a task-execution-time method (`execute`,
148+
/// `pre_execute`, or `post_execute`) defined inside a class that inherits from
149+
/// an Airflow operator.
150+
///
151+
/// This is similar to `helpers::is_method_in_subclass` but can't reuse it
152+
/// directly because we're called from inside the function body (need to walk up
153+
/// to the parent class scope), whereas `is_method_in_subclass` expects to already
154+
/// be at the class scope.
155+
fn is_operator_task_method(function_def: &StmtFunctionDef, semantic: &SemanticModel) -> bool {
156+
if !matches!(
157+
function_def.name.as_str(),
158+
"execute" | "pre_execute" | "post_execute"
159+
) {
160+
return false;
161+
}
162+
163+
let Some(parent_scope) = semantic.first_non_type_parent_scope(semantic.current_scope()) else {
164+
return false;
165+
};
166+
167+
let ScopeKind::Class(class_def) = parent_scope.kind else {
168+
return false;
169+
};
170+
171+
any_qualified_base_class(class_def, semantic, &|qn| {
172+
matches!(
173+
qn.segments(),
174+
["airflow", "models" | "sdk", .., "BaseOperator"]
175+
)
176+
})
177+
}
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
---
2+
source: crates/ruff_linter/src/rules/airflow/mod.rs
3+
---
4+
AIR003 `Variable.get()` outside of a task
5+
--> AIR003.py:12:7
6+
|
7+
11 | # Variable.get() at module level
8+
12 | foo = Variable.get("foo") # AIR003
9+
| ^^^^^^^^^^^^^^^^^^^
10+
13 | bar = SdkVariable.get("bar") # AIR003
11+
|
12+
help: Use Jinja templates instead
13+
14+
AIR003 `Variable.get()` outside of a task
15+
--> AIR003.py:13:7
16+
|
17+
11 | # Variable.get() at module level
18+
12 | foo = Variable.get("foo") # AIR003
19+
13 | bar = SdkVariable.get("bar") # AIR003
20+
| ^^^^^^^^^^^^^^^^^^^^^^
21+
14 |
22+
15 | # Variable.get() in operator constructor arguments
23+
|
24+
help: Use Jinja templates instead
25+
26+
AIR003 `Variable.get()` outside of a task
27+
--> AIR003.py:18:26
28+
|
29+
16 | BashOperator(
30+
17 | task_id="bad_inline",
31+
18 | bash_command=f"echo {Variable.get('foo')}", # AIR003
32+
| ^^^^^^^^^^^^^^^^^^^
33+
19 | )
34+
|
35+
help: Use Jinja templates instead
36+
37+
AIR003 `Variable.get()` outside of a task
38+
--> AIR003.py:24:17
39+
|
40+
22 | task_id="bad_env",
41+
23 | bash_command="echo $FOO",
42+
24 | env={"FOO": Variable.get("foo")}, # AIR003
43+
| ^^^^^^^^^^^^^^^^^^^
44+
25 | )
45+
|
46+
help: Use Jinja templates instead
47+
48+
AIR003 `Variable.get()` outside of a task
49+
--> AIR003.py:29:12
50+
|
51+
27 | # Variable.get() in a regular (non-task) function
52+
28 | def helper():
53+
29 | return Variable.get("foo") # AIR003
54+
| ^^^^^^^^^^^^^^^^^^^
55+
|
56+
help: Move into a `@task`-decorated function
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
---
2+
source: crates/ruff_linter/src/rules/airflow/mod.rs
3+
---
4+
AIR003 `Variable.get()` outside of a task
5+
--> AIR003_dag_decorator.py:4:7
6+
|
7+
2 | from airflow.models import Variable
8+
3 |
9+
4 | var = Variable.get("foo") # AIR003
10+
| ^^^^^^^^^^^^^^^^^^^
11+
|
12+
help: Use Jinja templates instead

0 commit comments

Comments
 (0)