From 1a4b2c1ba4bc94d2ff0b7afda24b997e2ca8c173 Mon Sep 17 00:00:00 2001 From: Sarit Kumar Si Date: Mon, 26 Apr 2021 19:41:13 -0700 Subject: [PATCH] Added new tasks as examples --- source-code/dags/hello-world.py | 10 +++--- source-code/dags/parallel-tasks.py | 54 ++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 5 deletions(-) create mode 100644 source-code/dags/parallel-tasks.py diff --git a/source-code/dags/hello-world.py b/source-code/dags/hello-world.py index 35050bf..bd110c2 100644 --- a/source-code/dags/hello-world.py +++ b/source-code/dags/hello-world.py @@ -23,22 +23,22 @@ description=f"Hello World!!!", ) as dag: - t1 = DummyOperator( + start = DummyOperator( task_id='Start', ) - t2 = BashOperator( + t1 = BashOperator( task_id='Trigger_Job', bash_command='curl "${PDI_CONN_STR}/kettle/executeJob/?rep=test-repo&job=/helloworld/helloworld-job"' ) - t3 = BashOperator( + t2 = BashOperator( task_id='Trigger_Transformation', bash_command='curl "${PDI_CONN_STR}/kettle/executeTrans/?rep=test-repo&trans=/helloworld/helloworld-trans"' ) - t4 = DummyOperator( + stop = DummyOperator( task_id='Stop', ) - t1 >> [t2, t3] >> t4 \ No newline at end of file + start >> [t1, t2] >> stop \ No newline at end of file diff --git a/source-code/dags/parallel-tasks.py b/source-code/dags/parallel-tasks.py new file mode 100644 index 0000000..5c2e80f --- /dev/null +++ b/source-code/dags/parallel-tasks.py @@ -0,0 +1,54 @@ +# To illustrate how we can trigger a job/transformation in the PDI container via Carte APIs +# Reference: https://help.pentaho.com/Documentation/9.1/Developer_center/REST_API_Reference/Carte + +from airflow import DAG +from airflow.utils.dates import days_ago +from airflow.operators.bash_operator import BashOperator +from airflow.operators.dummy import DummyOperator + +args = { + "owner": "airflow", + "start_date": days_ago(1), + "depends_on_past": False, + "wait_for_downstream": False, + "catchup": False, +} + + +with DAG( + dag_id="hello-world", + default_args=args, + schedule_interval=None, + catchup=False, + description=f"Hello World!!!", +) as dag: + + start = DummyOperator( + task_id='Start', + ) + + t1 = BashOperator( + task_id='Trigger_Job1', + bash_command='curl "${PDI_CONN_STR}/kettle/executeJob/?rep=test-repo&job=/helloworld/helloworld-job"' + ) + + t2 = BashOperator( + task_id='Trigger_Job2', + bash_command='curl "${PDI_CONN_STR}/kettle/executeJob/?rep=test-repo&job=/helloworld/helloworld-job"' + ) + + t3 = BashOperator( + task_id='Trigger_Job3', + bash_command='curl "${PDI_CONN_STR}/kettle/executeJob/?rep=test-repo&job=/helloworld/helloworld-job"' + ) + + t4 = BashOperator( + task_id='Trigger_Transformation', + bash_command='curl "${PDI_CONN_STR}/kettle/executeTrans/?rep=test-repo&trans=/helloworld/helloworld-trans"' + ) + + stop = DummyOperator( + task_id='Stop', + ) + + start >> [t1, t2, t3, t4] >> stop \ No newline at end of file