Skip to content

Commit

Permalink
Adding FileSensor example
Browse files Browse the repository at this point in the history
  • Loading branch information
romanoa77 committed Apr 6, 2024
1 parent d0183bc commit a054a67
Showing 1 changed file with 32 additions and 0 deletions.
32 changes: 32 additions & 0 deletions dags_airflow_ale_fsensor_exmp.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
"""
This is an example using a FileSensor
"""

import airflow.utils.dates
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.sensors.filesystem import FileSensor

dag = DAG(
dag_id="fsensor_exmp",
start_date=airflow.utils.dates.days_ago(3),
schedule_interval=None,
description="A demonstration DAG using FileSensor.",
#The execution doesn't proceed if a task fails
default_args={"depends_on_past": True},
)

create_metrics = DummyOperator(task_id="create_metrics", dag=dag)

for f_id in [1, 2, 3]:
wait = FileSensor(
task_id=f"wait_for_file_{f_id}",
filepath=f"/data/f_{f_id}/data.csv",
dag=dag,
)
copy = DummyOperator(task_id=f"copy_to_raw_supermarket_{f_id}", dag=dag)
process = DummyOperator(task_id=f"process_supermarket_{f_id}", dag=dag)
wait >> copy >> process >> create_metrics

0 comments on commit a054a67

Please sign in to comment.