diff --git a/dev/airflow.values.yaml b/dev/airflow.values.yaml index 73fadc291a..49b8c1e85d 100644 --- a/dev/airflow.values.yaml +++ b/dev/airflow.values.yaml @@ -8,10 +8,13 @@ scheduler: hostPath: path: /tmp/colima/dags type: Directory - + livenessProbe: + command: ["bash", "-c", "airflow jobs check --job-type SchedulerJob --allow-multiple --limit 100"] + initialDelaySeconds: 10 + timeoutSeconds: 60 config: core: - dags_folder: /opt/airflow/dags/dags + dags_folder: /opt/airflow/dags webserver: expose_config: 'True' # by default this is 'False' diff --git a/ext/scheduler/airflow2/resources/__lib.py b/ext/scheduler/airflow2/resources/__lib.py index 03e68d36ab..df7607a75a 100644 --- a/ext/scheduler/airflow2/resources/__lib.py +++ b/ext/scheduler/airflow2/resources/__lib.py @@ -80,6 +80,7 @@ def __init__(self, optimus_namespacename, optimus_jobname, optimus_jobtype, + optimus_instancename, *args, **kwargs): super(SuperKubernetesPodOperator, self).__init__(*args, **kwargs) @@ -94,6 +95,7 @@ def __init__(self, self.optimus_hostname = optimus_hostname self.optimus_namespacename = optimus_namespacename self.optimus_jobname = optimus_jobname + self.optimus_instancename = optimus_instancename self.optimus_projectname = optimus_projectname self.optimus_jobtype = optimus_jobtype self._optimus_client = OptimusAPIClient(optimus_hostname) @@ -106,7 +108,7 @@ def render_init_containers(self, context): def fetch_env_from_optimus(self, context): scheduled_at = context["next_execution_date"].strftime(TIMESTAMP_FORMAT) - job_meta = self._optimus_client.get_job_run_input(scheduled_at, self.optimus_projectname, self.optimus_jobname, self.optimus_jobtype) + job_meta = self._optimus_client.get_job_run_input(scheduled_at, self.optimus_projectname, self.optimus_jobname, self.optimus_jobtype, self.optimus_instancename) return [ k8s.V1EnvVar(name=key,value=val) for key, val in job_meta["envs"].items() ] + [ @@ -245,10 +247,10 @@ def get_task_window(self, scheduled_at: str, version: int, window_size: str, win return response.json() - def get_job_run_input(self, execution_date: str, project_name: str, job_name: str, job_type: str) -> dict: + def get_job_run_input(self, execution_date: str, project_name: str, job_name: str, job_type: str, instance_name: str) -> dict: response = requests.post(url="{}/api/v1beta1/project/{}/job/{}/run_input".format(self.host, project_name, job_name), json={'scheduled_at': execution_date, - 'instance_name': job_name, + 'instance_name': instance_name, 'instance_type': "TYPE_" + job_type.upper()}) self._raise_error_if_request_failed(response) diff --git a/ext/scheduler/airflow2/resources/base_dag.py b/ext/scheduler/airflow2/resources/base_dag.py index 34c42aa217..d7debaabaa 100644 --- a/ext/scheduler/airflow2/resources/base_dag.py +++ b/ext/scheduler/airflow2/resources/base_dag.py @@ -159,6 +159,7 @@ optimus_namespacename="{{$.Namespace.Name}}", optimus_jobname="{{.Job.Name}}", optimus_jobtype="{{$.InstanceTypeTask}}", + optimus_instancename="{{$baseTaskSchema.Name}}", image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = {{ $baseTaskSchema.Image | quote}}, @@ -207,6 +208,7 @@ optimus_namespacename="{{$.Namespace.Name}}", optimus_jobname="{{$.Job.Name}}", optimus_jobtype="{{$.InstanceTypeHook}}", + optimus_instancename="{{$hookSchema.Name}}", image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "{{ $hookSchema.Image }}", diff --git a/ext/scheduler/airflow2/resources/expected_compiled_template.py b/ext/scheduler/airflow2/resources/expected_compiled_template.py index f139e15729..5ffd9e80b2 100644 --- a/ext/scheduler/airflow2/resources/expected_compiled_template.py +++ b/ext/scheduler/airflow2/resources/expected_compiled_template.py @@ -121,6 +121,7 @@ optimus_namespacename="bar-namespace", optimus_jobname="foo", optimus_jobtype="task", + optimus_instancename="bq", image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "example.io/namespace/image:latest", @@ -162,6 +163,7 @@ optimus_namespacename="bar-namespace", optimus_jobname="foo", optimus_jobtype="hook", + optimus_instancename="transporter", image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "example.io/namespace/hook-image:latest", @@ -198,6 +200,7 @@ optimus_namespacename="bar-namespace", optimus_jobname="foo", optimus_jobtype="hook", + optimus_instancename="predator", image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "example.io/namespace/predator-image:latest", @@ -234,6 +237,7 @@ optimus_namespacename="bar-namespace", optimus_jobname="foo", optimus_jobtype="hook", + optimus_instancename="hook-for-fail", image_pull_policy=IMAGE_PULL_POLICY, namespace = conf.get('kubernetes', 'namespace', fallback="default"), image = "example.io/namespace/fail-image:latest",