diff --git a/ext/scheduler/airflow2/resources/base_dag.py b/ext/scheduler/airflow2/resources/base_dag.py index 1939cf1aa6..76d5124665 100644 --- a/ext/scheduler/airflow2/resources/base_dag.py +++ b/ext/scheduler/airflow2/resources/base_dag.py @@ -13,14 +13,10 @@ from __lib import optimus_sla_miss_notify, SuperKubernetesPodOperator, \ SuperExternalTaskSensor, ExternalHttpSensor -from __lib import JOB_START_EVENT_NAME, \ - JOB_END_EVENT_NAME, \ - log_start_event, \ - log_success_event, \ +from __lib import log_success_event, \ log_retry_event, \ log_failure_event, \ - EVENT_NAMES, \ - log_job_end, log_job_start + EVENT_NAMES SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60)) SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60)) @@ -69,23 +65,6 @@ ] ) -publish_job_start_event = PythonOperator( - task_id = JOB_START_EVENT_NAME, - python_callable = log_job_start, - provide_context=True, - depends_on_past=False, - dag=dag - ) - -publish_job_end_event = PythonOperator( - task_id = JOB_END_EVENT_NAME, - python_callable = log_job_end, - provide_context=True, - trigger_rule= 'all_success', - depends_on_past=False, - dag=dag - ) - {{$baseTaskSchema := .Job.Task.Unit.Info -}} {{- $setCPURequest := not (empty .Metadata.Resource.Request.CPU) -}} {{- $setMemoryRequest := not (empty .Metadata.Resource.Request.Memory) -}} @@ -299,33 +278,33 @@ # upstream sensors -> base transformation task {{- range $i, $t := $.Job.Dependencies }} -publish_job_start_event >> wait_{{ $t.Job.Name | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} +wait_{{ $t.Job.Name | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} {{- end}} {{- range $_, $t := $.Job.ExternalDependencies.HTTPDependencies }} -publish_job_start_event >> wait_{{ $t.Name }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} +wait_{{ $t.Name }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} {{- end}} {{- range $_, $dependency := $.Job.ExternalDependencies.OptimusDependencies}} {{ $identity := print $dependency.Name "-" $dependency.ProjectName "-" $dependency.JobName }} -publish_job_start_event >> wait_{{ $identity | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} +wait_{{ $identity | replace "-" "__dash__" | replace "." "__dot__" }} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} {{- end}} {{if and (not $.Job.Dependencies) (not $.Job.ExternalDependencies.HTTPDependencies) (not $.Job.ExternalDependencies.OptimusDependencies)}} # if no sensor and dependency is configured -publish_job_start_event >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} +transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} {{end}} # post completion hook -transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> publish_job_end_event +transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} # set inter-dependencies between task and hooks {{- range $_, $task := .Job.Hooks }} {{- $hookSchema := $task.Unit.Info }} {{- if eq $hookSchema.HookType $.HookTypePre }} -publish_job_start_event >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} +hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} {{- end -}} {{- if eq $hookSchema.HookType $.HookTypePost }} -transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> publish_job_end_event +transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} {{- end -}} {{- if eq $hookSchema.HookType $.HookTypeFail }} -transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> publish_job_end_event +transformation_{{$baseTaskSchema.Name | replace "-" "__dash__" | replace "." "__dot__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} {{- end -}} {{- end }} @@ -334,7 +313,7 @@ {{- $hookSchema := $t.Unit.Info }} {{- range $_, $depend := $t.DependsOn }} {{- $dependHookSchema := $depend.Unit.Info }} -hook_{{$dependHookSchema.Name | replace "-" "__dash__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} >> publish_job_end_event +hook_{{$dependHookSchema.Name | replace "-" "__dash__"}} >> hook_{{$hookSchema.Name | replace "-" "__dash__"}} {{- end }} {{- end }} @@ -349,7 +328,7 @@ {{- $fhookSchema := $ftask.Unit.Info }} {{- if eq $fhookSchema.HookType $.HookTypeFail }} hook_{{$fhookSchema.Name | replace "-" "__dash__"}}, {{- end -}} {{- end -}} -] >> publish_job_end_event +] {{- end -}} diff --git a/ext/scheduler/airflow2/resources/expected_compiled_template.py b/ext/scheduler/airflow2/resources/expected_compiled_template.py index adfb0f58c0..e131d0bf3c 100644 --- a/ext/scheduler/airflow2/resources/expected_compiled_template.py +++ b/ext/scheduler/airflow2/resources/expected_compiled_template.py @@ -13,14 +13,10 @@ from __lib import optimus_sla_miss_notify, SuperKubernetesPodOperator, \ SuperExternalTaskSensor, ExternalHttpSensor -from __lib import JOB_START_EVENT_NAME, \ - JOB_END_EVENT_NAME, \ - log_start_event, \ - log_success_event, \ +from __lib import log_success_event, \ log_retry_event, \ log_failure_event, \ - EVENT_NAMES, \ - log_job_end, log_job_start + EVENT_NAMES SENSOR_DEFAULT_POKE_INTERVAL_IN_SECS = int(Variable.get("sensor_poke_interval_in_secs", default_var=15 * 60)) SENSOR_DEFAULT_TIMEOUT_IN_SECS = int(Variable.get("sensor_timeout_in_secs", default_var=15 * 60 * 60)) @@ -61,23 +57,6 @@ ] ) -publish_job_start_event = PythonOperator( - task_id = JOB_START_EVENT_NAME, - python_callable = log_job_start, - provide_context=True, - depends_on_past=False, - dag=dag - ) - -publish_job_end_event = PythonOperator( - task_id = JOB_END_EVENT_NAME, - python_callable = log_job_end, - provide_context=True, - trigger_rule= 'all_success', - depends_on_past=False, - dag=dag - ) - JOB_DIR = "/data" IMAGE_PULL_POLICY="IfNotPresent" @@ -315,22 +294,22 @@ #################################### # upstream sensors -> base transformation task -publish_job_start_event >> wait_foo__dash__intra__dash__dep__dash__job >> transformation_bq -publish_job_start_event >> wait_foo__dash__inter__dash__dep__dash__job >> transformation_bq +wait_foo__dash__intra__dash__dep__dash__job >> transformation_bq +wait_foo__dash__inter__dash__dep__dash__job >> transformation_bq -publish_job_start_event >> wait_external__dash__optimus__dash__foo__dash__external__dash__optimus__dash__project__dash__foo__dash__external__dash__optimus__dash__dep__dash__job >> transformation_bq +wait_external__dash__optimus__dash__foo__dash__external__dash__optimus__dash__project__dash__foo__dash__external__dash__optimus__dash__dep__dash__job >> transformation_bq # post completion hook -transformation_bq >> publish_job_end_event +transformation_bq # set inter-dependencies between task and hooks -publish_job_start_event >> hook_transporter >> transformation_bq -transformation_bq >> hook_predator >> publish_job_end_event -transformation_bq >> hook_hook__dash__for__dash__fail >> publish_job_end_event +hook_transporter >> transformation_bq +transformation_bq >> hook_predator +transformation_bq >> hook_hook__dash__for__dash__fail # set inter-dependencies between hooks and hooks -hook_transporter >> hook_predator >> publish_job_end_event +hook_transporter >> hook_predator # arrange failure hook after post hooks -hook_predator >> [ hook_hook__dash__for__dash__fail,] >> publish_job_end_event \ No newline at end of file +hook_predator >> [ hook_hook__dash__for__dash__fail,] \ No newline at end of file