Skip to content

Commit 93872da

Browse files
committed
Add AIR005: flag parse-time Variable uses
1 parent 50f8602 commit 93872da

12 files changed

Lines changed: 399 additions & 0 deletions
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
from airflow import DAG
2+
from airflow.models import Variable
3+
from airflow.sdk import Variable as SdkVariable
4+
from airflow.decorators import task
5+
from airflow.operators.bash import BashOperator
6+
from airflow.models.baseoperator import BaseOperator
7+
8+
9+
# Violations (should trigger AIR005):
10+
11+
# Variable.get() at module level
12+
foo = Variable.get("foo") # AIR005
13+
bar = SdkVariable.get("bar") # AIR005
14+
15+
# Variable.get() in operator constructor arguments
16+
BashOperator(
17+
task_id="bad_inline",
18+
bash_command=f"echo {Variable.get('foo')}", # AIR005
19+
)
20+
21+
BashOperator(
22+
task_id="bad_env",
23+
bash_command="echo $FOO",
24+
env={"FOO": Variable.get("foo")}, # AIR005
25+
)
26+
27+
# Variable.get() in a regular (non-task) function
28+
def helper():
29+
return Variable.get("foo") # AIR005
30+
31+
32+
# No violations:
33+
34+
# Variable.get() inside a @task-decorated function
35+
@task
36+
def my_task():
37+
var = Variable.get("foo")
38+
print(var)
39+
40+
41+
# Variable.get() inside a @task() with parentheses
42+
@task()
43+
def my_task_with_parens():
44+
var = Variable.get("foo")
45+
print(var)
46+
47+
48+
# Variable.get() inside a @task.branch-decorated function
49+
@task.branch
50+
def my_branch_task():
51+
if Variable.get("flag") == "true":
52+
return ["downstream"]
53+
return []
54+
55+
56+
# Variable.get() inside an operator's execute() method
57+
class MyOperator(BaseOperator):
58+
def execute(self, context):
59+
var = Variable.get("foo")
60+
print(var)
61+
62+
63+
# Variable.get() inside pre_execute / post_execute
64+
class AnotherOperator(BaseOperator):
65+
def pre_execute(self, context):
66+
var = Variable.get("foo")
67+
print(var)
68+
69+
def post_execute(self, context, result):
70+
var = Variable.get("foo")
71+
print(var)
72+
73+
74+
# Jinja template usage (no Variable.get() call at all)
75+
BashOperator(
76+
task_id="good",
77+
bash_command="echo $FOO",
78+
env={"FOO": "{{ var.value.foo }}"},
79+
)
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
from airflow.decorators import dag, task
2+
from airflow.models import Variable
3+
4+
var = Variable.get("foo") # AIR005
5+
6+
7+
@dag
8+
def my_dag():
9+
@task
10+
def my_task():
11+
Variable.get("bar") # OK - inside task
12+
13+
my_task()
14+
15+
16+
my_dag()
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
from airflow.models import Variable
2+
3+
# No Dag import → not a Dag file → no violations
4+
foo = Variable.get("foo")
5+
6+
def helper():
7+
return Variable.get("bar")

crates/ruff_linter/src/checkers/ast/analyze/expression.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1272,6 +1272,9 @@ pub(crate) fn expression(expr: &Expr, checker: &Checker) {
12721272
if checker.is_rule_enabled(Rule::AirflowDagNoScheduleArgument) {
12731273
airflow::rules::dag_no_schedule_argument(checker, expr);
12741274
}
1275+
if checker.is_rule_enabled(Rule::AirflowVariableGetOutsideTask) {
1276+
airflow::rules::variable_get_outside_task(checker, expr);
1277+
}
12751278
if checker.is_rule_enabled(Rule::UnnecessaryRegularExpression) {
12761279
ruff::rules::unnecessary_regular_expression(checker, call);
12771280
}

crates/ruff_linter/src/codes.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1127,6 +1127,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> {
11271127
// airflow
11281128
(Airflow, "001") => rules::airflow::rules::AirflowVariableNameTaskIdMismatch,
11291129
(Airflow, "002") => rules::airflow::rules::AirflowDagNoScheduleArgument,
1130+
(Airflow, "005") => rules::airflow::rules::AirflowVariableGetOutsideTask,
11301131
(Airflow, "301") => rules::airflow::rules::Airflow3Removal,
11311132
(Airflow, "302") => rules::airflow::rules::Airflow3MovedToProvider,
11321133
(Airflow, "303") => rules::airflow::rules::Airflow3IncompatibleFunctionSignature,

crates/ruff_linter/src/rules/airflow/mod.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,12 @@ mod tests {
1515

1616
#[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))]
1717
#[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR002.py"))]
18+
#[test_case(Rule::AirflowVariableGetOutsideTask, Path::new("AIR005.py"))]
19+
#[test_case(Rule::AirflowVariableGetOutsideTask, Path::new("AIR005_no_dag.py"))]
20+
#[test_case(
21+
Rule::AirflowVariableGetOutsideTask,
22+
Path::new("AIR005_dag_decorator.py")
23+
)]
1824
#[test_case(Rule::Airflow3Removal, Path::new("AIR301_args.py"))]
1925
#[test_case(Rule::Airflow3Removal, Path::new("AIR301_names.py"))]
2026
#[test_case(Rule::Airflow3Removal, Path::new("AIR301_names_fix.py"))]

crates/ruff_linter/src/rules/airflow/rules/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ pub(crate) use removal_in_3::*;
66
pub(crate) use suggested_to_move_to_provider_in_3::*;
77
pub(crate) use suggested_to_update_3_0::*;
88
pub(crate) use task_variable_name::*;
9+
pub(crate) use variable_get_outside_task::*;
910

1011
mod dag_schedule_argument;
1112
mod function_signature_change_in_3;
@@ -15,3 +16,4 @@ mod removal_in_3;
1516
mod suggested_to_move_to_provider_in_3;
1617
mod suggested_to_update_3_0;
1718
mod task_variable_name;
19+
mod variable_get_outside_task;
Lines changed: 218 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,218 @@
1+
use ruff_macros::{ViolationMetadata, derive_message_formats};
2+
use ruff_python_ast::helpers::map_callable;
3+
use ruff_python_ast::{self as ast, Expr, StmtFunctionDef};
4+
use ruff_python_semantic::{BindingKind, Imported, Modules, ScopeKind, SemanticModel};
5+
use ruff_text_size::Ranged;
6+
7+
use crate::Violation;
8+
use crate::checkers::ast::Checker;
9+
use crate::rules::airflow::helpers::is_airflow_task;
10+
11+
/// ## What it does
12+
/// Checks for `Variable.get()` calls outside of Airflow task execution
13+
/// context (i.e., outside `@task`-decorated functions and operator
14+
/// `execute()` methods).
15+
///
16+
/// ## Why is this bad?
17+
/// Calling `Variable.get()` at module level or in operator constructor
18+
/// arguments causes a database query every time the Dag file is parsed
19+
/// by the scheduler. This can degrade Dag parsing performance and, in
20+
/// some cases, cause the Dag file to time out before it is fully parsed.
21+
///
22+
/// Instead, pass Airflow Variables to operators via Jinja templates
23+
/// (`{{ var.value.my_var }}` or `{{ var.json.my_var }}`), which defer
24+
/// the lookup until task execution.
25+
///
26+
/// `Variable.get()` inside `@task`-decorated functions and operator
27+
/// `execute()` methods is fine because those only run during task
28+
/// execution, not during Dag parsing.
29+
///
30+
/// Note that this rule may produce false positives for helper functions
31+
/// that are only invoked at task execution time (e.g., passed as
32+
/// `python_callable` to `PythonOperator`). In such cases, suppress the
33+
/// diagnostic with `# noqa: AIR005`.
34+
///
35+
/// ## Example
36+
/// ```python
37+
/// from airflow.sdk import Variable
38+
/// from airflow.operators.bash import BashOperator
39+
///
40+
///
41+
/// foo = Variable.get("foo")
42+
/// BashOperator(task_id="bad", bash_command="echo $FOO", env={"FOO": foo})
43+
/// ```
44+
///
45+
/// Use instead:
46+
/// ```python
47+
/// from airflow.operators.bash import BashOperator
48+
///
49+
///
50+
/// BashOperator(
51+
/// task_id="good",
52+
/// bash_command="echo $FOO",
53+
/// env={"FOO": "{{ var.value.foo }}"},
54+
/// )
55+
/// ```
56+
#[derive(ViolationMetadata)]
57+
#[violation_metadata(preview_since = "NEXT_RUFF_VERSION")]
58+
pub(crate) struct AirflowVariableGetOutsideTask {
59+
in_function: bool,
60+
}
61+
62+
impl Violation for AirflowVariableGetOutsideTask {
63+
#[derive_message_formats]
64+
fn message(&self) -> String {
65+
if self.in_function {
66+
"`Variable.get()` outside of a task; move into a `@task`-decorated function".to_string()
67+
} else {
68+
"`Variable.get()` outside of a task; use Jinja templates instead".to_string()
69+
}
70+
}
71+
}
72+
73+
/// AIR005
74+
pub(crate) fn variable_get_outside_task(checker: &Checker, expr: &Expr) {
75+
if !checker.semantic().seen_module(Modules::AIRFLOW) {
76+
return;
77+
}
78+
79+
let Expr::Call(ast::ExprCall { func, .. }) = expr else {
80+
return;
81+
};
82+
83+
if !is_variable_get(func, checker.semantic()) {
84+
return;
85+
}
86+
87+
if !is_dag_file(checker.semantic()) {
88+
return;
89+
}
90+
91+
if in_task_execution_context(checker.semantic()) {
92+
return;
93+
}
94+
95+
let in_function = checker
96+
.semantic()
97+
.current_statements()
98+
.any(ast::Stmt::is_function_def_stmt);
99+
100+
checker.report_diagnostic(AirflowVariableGetOutsideTask { in_function }, expr.range());
101+
}
102+
103+
/// Returns `true` if the file defines a Dag, either through a `DAG()` call
104+
/// (assignment or context manager) or a `@dag`-decorated function. Falls back
105+
/// to checking for `DAG`/`dag` imports from airflow as a heuristic for cases
106+
/// where the Dag definition appears later in the file.
107+
fn is_dag_file(semantic: &SemanticModel) -> bool {
108+
semantic.global_scope().binding_ids().any(|binding_id| {
109+
let binding = semantic.binding(binding_id);
110+
111+
match &binding.kind {
112+
// `@dag def my_dag(): ...`
113+
BindingKind::FunctionDefinition(_) => binding.source.is_some_and(|source| {
114+
let stmt = semantic.statement(source);
115+
stmt.as_function_def_stmt().is_some_and(|func_def| {
116+
func_def.decorator_list.iter().any(|decorator| {
117+
let expr = map_callable(&decorator.expression);
118+
semantic
119+
.resolve_qualified_name(expr)
120+
.is_some_and(|qn| matches!(qn.segments(), ["airflow", .., "dag"]))
121+
})
122+
})
123+
}),
124+
125+
// `dag = DAG(...)` or `with DAG(...) as dag:`
126+
BindingKind::Assignment | BindingKind::WithItemVar => {
127+
binding.source.is_some_and(|source| {
128+
let stmt = semantic.statement(source);
129+
has_dag_call(stmt, semantic)
130+
})
131+
}
132+
133+
// Fallback: import of `DAG` or `dag` from airflow (covers cases where
134+
// the Dag definition appears later in the file than the Variable.get() call).
135+
_ => binding.as_any_import().is_some_and(|import| {
136+
matches!(
137+
import.qualified_name().segments(),
138+
["airflow", .., "DAG" | "dag"]
139+
)
140+
}),
141+
}
142+
})
143+
}
144+
145+
/// Returns `true` if the statement contains a `DAG()` call.
146+
fn has_dag_call(stmt: &ast::Stmt, semantic: &SemanticModel) -> bool {
147+
let call = match stmt {
148+
ast::Stmt::Assign(ast::StmtAssign { value, .. }) => value.as_call_expr(),
149+
ast::Stmt::With(ast::StmtWith { items, .. }) => items
150+
.iter()
151+
.find_map(|item| item.context_expr.as_call_expr()),
152+
_ => None,
153+
};
154+
155+
call.is_some_and(|call| {
156+
semantic
157+
.resolve_qualified_name(&call.func)
158+
.is_some_and(|qn| matches!(qn.segments(), ["airflow", .., "DAG"]))
159+
})
160+
}
161+
162+
/// Returns `true` if `func` resolves to `Variable.get`.
163+
fn is_variable_get(func: &Expr, semantic: &SemanticModel) -> bool {
164+
semantic
165+
.resolve_qualified_name(func)
166+
.is_some_and(|qualified_name| {
167+
matches!(
168+
qualified_name.segments(),
169+
["airflow", "models" | "sdk", .., "Variable", "get"]
170+
)
171+
})
172+
}
173+
174+
/// Returns `true` if the current location is inside a `@task`-decorated function
175+
/// or a task-execution-time method on an Airflow operator subclass.
176+
fn in_task_execution_context(semantic: &SemanticModel) -> bool {
177+
semantic
178+
.current_statements()
179+
.find_map(|stmt| stmt.as_function_def_stmt())
180+
.is_some_and(|function_def| {
181+
is_airflow_task(function_def, semantic)
182+
|| is_operator_task_method(function_def, semantic)
183+
})
184+
}
185+
186+
/// Returns `true` if the function is a task-execution-time method (`execute`,
187+
/// `pre_execute`, or `post_execute`) defined inside a class that inherits from
188+
/// an Airflow operator.
189+
///
190+
// This is similar to `helpers::is_method_in_subclass` but can't reuse it
191+
// directly because we're called from inside the function body (need to walk up
192+
// to the parent class scope), whereas `is_method_in_subclass` expects to already
193+
// be at the class scope.
194+
fn is_operator_task_method(function_def: &StmtFunctionDef, semantic: &SemanticModel) -> bool {
195+
if !matches!(
196+
function_def.name.as_str(),
197+
"execute" | "pre_execute" | "post_execute"
198+
) {
199+
return false;
200+
}
201+
202+
let Some(parent_scope) = semantic.first_non_type_parent_scope(semantic.current_scope()) else {
203+
return false;
204+
};
205+
206+
let ScopeKind::Class(class_def) = parent_scope.kind else {
207+
return false;
208+
};
209+
210+
class_def.bases().iter().any(|base| {
211+
semantic.resolve_qualified_name(base).is_some_and(|qn| {
212+
matches!(
213+
qn.segments(),
214+
["airflow", "models" | "sdk", .., "BaseOperator"]
215+
)
216+
})
217+
})
218+
}

0 commit comments

Comments
 (0)