Bug: @task.skip_if decorator fails when used with @task.virtualenv in Airflow TaskFlow API #43354
Comments
In this example, I believe the correct syntax should be
Sorry, the intention is not to skip the task but to execute it; I got mixed up with another bug report. I have updated the text above.
I've investigated the root cause of the issue. The problem occurs in the rendered Python script (as shown below) that executes in the virtual environment through jinja templating. Specifically, in the Script section (where DAG authors define their code), we're using

from __future__ import annotations

import pickle
import sys

if sys.version_info >= (3,6):
    try:
        from airflow.plugins_manager import integrate_macros_plugins
        integrate_macros_plugins()
    except ImportError:
        pass

# Script
@task.skip_if(lambda x: False)
def potentially_skipped_task():
    import requests
    import time
    print("This task should be skipped, but it might run anyway!")
    response = requests.get("https://example.com")
    print(f"Response status code: {response.status_code}")

# monkey patching for the cases when python_callable is part of the dag module.
import types
unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354 = types.ModuleType("unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354")
unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354.potentially_skipped_task = potentially_skipped_task
sys.modules["unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354"] = unusual_prefix_df02535c9d01616ab041af6470774738d51e9a41_43354

arg_dict = {"args": [], "kwargs": {}}

# Read string args
with open(sys.argv[3], "r") as file:
    virtualenv_string_args = list(map(lambda x: x.strip(), list(file)))

try:
    res = potentially_skipped_task(*arg_dict["args"], **arg_dict["kwargs"])
except Exception as e:
    with open(sys.argv[4], "w") as file:
        file.write(str(e))
    raise

# Write output
with open(sys.argv[2], "wb") as file:
    if res is not None:
        pickle.dump(res, file)
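To illustrate why a script shaped like this fails, here is a minimal sketch that does not need Airflow. It executes a small source string, modelled on the Script section above, in an empty namespace where the name `task` was never imported. The names `rendered_source`, `stripped_source`, and `run_rendered` are hypothetical and exist only for this illustration.

```python
# Source shaped like the "Script" section of the rendered file: the
# @task.virtualenv decorator is gone, but @task.skip_if is still present.
rendered_source = '''
@task.skip_if(lambda x: False)
def potentially_skipped_task():
    return "ran"
'''

# The same function with every decorator removed, for comparison.
stripped_source = '''
def potentially_skipped_task():
    return "ran"
'''


def run_rendered(source: str) -> str:
    """Execute the source in an empty namespace, like a fresh interpreter would."""
    namespace: dict = {}
    try:
        exec(source, namespace)
    except NameError as exc:
        # Evaluating the decorator expression looks up `task`, which is undefined.
        return f"NameError: {exc}"
    return namespace["potentially_skipped_task"]()


result_with_decorator = run_rendered(rendered_source)
result_without_decorator = run_rendered(stripped_source)
print(result_with_decorator)
print(result_without_decorator)
```

The decorator expression is evaluated when the `def` statement runs, so the failure happens before the function body is ever called.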
I just found that #41832 covers this issue, so I expect a patch in v2.10.3. cc @phi-friday
Closing as fixed in 2.10.3
In [2]: import airflow
/Users/***/git/python/repo/***/.venv/lib/python3.12/site-packages/airflow/configuration.py:859 FutureWarning: section/key [core/sql_alchemy_conn] has been deprecated, you should use[database/sql_alchemy_conn] instead. Please update your `conf.get*` call to use the new name
In [3]: airflow.__version__
Out[3]: '2.10.3'
In [4]: from airflow.utils.decorators import remove_task_decorator
In [5]: import inspect
In [6]: source = inspect.getsource(remove_task_decorator)
In [7]: print(source)
def remove_task_decorator(python_source: str, task_decorator_name: str) -> str:
    """
    Remove @task or similar decorators as well as @setup and @teardown.

    :param python_source: The python source code
    :param task_decorator_name: the decorator name

    TODO: Python 3.9+: Rewrite this to use ast.parse and ast.unparse
    """

    def _remove_task_decorator(py_source, decorator_name):
        # if no line starts with @decorator_name, we can early exit
        for line in py_source.split("\n"):
            if line.startswith(decorator_name):
                break
        else:
            return python_source
        split = python_source.split(decorator_name, 1)
        before_decorator, after_decorator = split[0], split[1]
        if after_decorator[0] == "(":
            after_decorator = _balance_parens(after_decorator)
        if after_decorator[0] == "\n":
            after_decorator = after_decorator[1:]
        return before_decorator + after_decorator

    decorators = ["@setup", "@teardown", task_decorator_name]
    for decorator in decorators:
        python_source = _remove_task_decorator(python_source, decorator)
    return python_source

This does not appear to have been fixed in 2.10.3.
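The TODO in the docstring above mentions `ast.parse` and `ast.unparse`. As a rough sketch of what such an approach could look like (this is not Airflow's implementation and not the change made in #41832), the following drops every decorator whose root name is `task`, `setup`, or `teardown`, and keeps all others. The function names `strip_task_decorators` and `_root_name`, and the sample source, are hypothetical. It assumes Python 3.9+ for `ast.unparse`.

```python
from __future__ import annotations

import ast


def _root_name(decorator: ast.expr) -> str | None:
    """Return the leftmost name of a decorator, e.g. 'task' for @task.skip_if(...)."""
    target = decorator.func if isinstance(decorator, ast.Call) else decorator
    while isinstance(target, ast.Attribute):
        target = target.value
    return target.id if isinstance(target, ast.Name) else None


def strip_task_decorators(
    python_source: str,
    root_names: tuple[str, ...] = ("task", "setup", "teardown"),
) -> str:
    """Remove decorators rooted at any of `root_names` from all function definitions."""
    tree = ast.parse(python_source)
    for node in ast.walk(tree):
        if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
            node.decorator_list = [
                d for d in node.decorator_list if _root_name(d) not in root_names
            ]
    return ast.unparse(tree)


sample = '''
import functools

@task.virtualenv(requirements=["requests"])
@task.skip_if(lambda x: False)
@functools.cache
def potentially_skipped_task():
    return 1
'''

cleaned = strip_task_decorators(sample)
print(cleaned)
```

Note that `ast.unparse` regenerates the source, so comments and original formatting are lost; whether that is acceptable for the rendered script is a separate design question.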
Apache Airflow version
2.10.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When using the @task.skip_if decorator in combination with the @task.virtualenv decorator in an Airflow DAG using the TaskFlow API, the task fails with a NameError: name 'task' is not defined error.
What you think should happen instead?
The task should execute.
How to reproduce
Run the following DAG:
Operating System
Ubuntu 22
Versions of Apache Airflow Providers
No response
Deployment
Docker-Compose
Deployment details
LocalExecutor
Anything else?
Error logs: