Skip to content

Commit

Permalink
fix : job_run_input call from airflow worker & fix airflow restart is…
Browse files Browse the repository at this point in the history
…sue in dev (#686)
  • Loading branch information
smarchint authored Dec 9, 2022
1 parent 2c3e2b8 commit 5116e0d
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 5 deletions.
7 changes: 5 additions & 2 deletions dev/airflow.values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,13 @@ scheduler:
hostPath:
path: /tmp/colima/dags
type: Directory

livenessProbe:
command: ["bash", "-c", "airflow jobs check --job-type SchedulerJob --allow-multiple --limit 100"]
initialDelaySeconds: 10
timeoutSeconds: 60
config:
core:
dags_folder: /opt/airflow/dags/dags
dags_folder: /opt/airflow/dags
webserver:
expose_config: 'True' # by default this is 'False'

Expand Down
8 changes: 5 additions & 3 deletions ext/scheduler/airflow2/resources/__lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ def __init__(self,
optimus_namespacename,
optimus_jobname,
optimus_jobtype,
optimus_instancename,
*args,
**kwargs):
super(SuperKubernetesPodOperator, self).__init__(*args, **kwargs)
Expand All @@ -94,6 +95,7 @@ def __init__(self,
self.optimus_hostname = optimus_hostname
self.optimus_namespacename = optimus_namespacename
self.optimus_jobname = optimus_jobname
self.optimus_instancename = optimus_instancename
self.optimus_projectname = optimus_projectname
self.optimus_jobtype = optimus_jobtype
self._optimus_client = OptimusAPIClient(optimus_hostname)
Expand All @@ -106,7 +108,7 @@ def render_init_containers(self, context):

def fetch_env_from_optimus(self, context):
scheduled_at = context["next_execution_date"].strftime(TIMESTAMP_FORMAT)
job_meta = self._optimus_client.get_job_run_input(scheduled_at, self.optimus_projectname, self.optimus_jobname, self.optimus_jobtype)
job_meta = self._optimus_client.get_job_run_input(scheduled_at, self.optimus_projectname, self.optimus_jobname, self.optimus_jobtype, self.optimus_instancename)
return [
k8s.V1EnvVar(name=key,value=val) for key, val in job_meta["envs"].items()
] + [
Expand Down Expand Up @@ -245,10 +247,10 @@ def get_task_window(self, scheduled_at: str, version: int, window_size: str, win
return response.json()


def get_job_run_input(self, execution_date: str, project_name: str, job_name: str, job_type: str) -> dict:
def get_job_run_input(self, execution_date: str, project_name: str, job_name: str, job_type: str, instance_name: str) -> dict:
response = requests.post(url="{}/api/v1beta1/project/{}/job/{}/run_input".format(self.host, project_name, job_name),
json={'scheduled_at': execution_date,
'instance_name': job_name,
'instance_name': instance_name,
'instance_type': "TYPE_" + job_type.upper()})

self._raise_error_if_request_failed(response)
Expand Down
2 changes: 2 additions & 0 deletions ext/scheduler/airflow2/resources/base_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@
optimus_namespacename="{{$.Namespace.Name}}",
optimus_jobname="{{.Job.Name}}",
optimus_jobtype="{{$.InstanceTypeTask}}",
optimus_instancename="{{$baseTaskSchema.Name}}",
image_pull_policy=IMAGE_PULL_POLICY,
namespace = conf.get('kubernetes', 'namespace', fallback="default"),
image = {{ $baseTaskSchema.Image | quote}},
Expand Down Expand Up @@ -207,6 +208,7 @@
optimus_namespacename="{{$.Namespace.Name}}",
optimus_jobname="{{$.Job.Name}}",
optimus_jobtype="{{$.InstanceTypeHook}}",
optimus_instancename="{{$hookSchema.Name}}",
image_pull_policy=IMAGE_PULL_POLICY,
namespace = conf.get('kubernetes', 'namespace', fallback="default"),
image = "{{ $hookSchema.Image }}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@
optimus_namespacename="bar-namespace",
optimus_jobname="foo",
optimus_jobtype="task",
optimus_instancename="bq",
image_pull_policy=IMAGE_PULL_POLICY,
namespace = conf.get('kubernetes', 'namespace', fallback="default"),
image = "example.io/namespace/image:latest",
Expand Down Expand Up @@ -162,6 +163,7 @@
optimus_namespacename="bar-namespace",
optimus_jobname="foo",
optimus_jobtype="hook",
optimus_instancename="transporter",
image_pull_policy=IMAGE_PULL_POLICY,
namespace = conf.get('kubernetes', 'namespace', fallback="default"),
image = "example.io/namespace/hook-image:latest",
Expand Down Expand Up @@ -198,6 +200,7 @@
optimus_namespacename="bar-namespace",
optimus_jobname="foo",
optimus_jobtype="hook",
optimus_instancename="predator",
image_pull_policy=IMAGE_PULL_POLICY,
namespace = conf.get('kubernetes', 'namespace', fallback="default"),
image = "example.io/namespace/predator-image:latest",
Expand Down Expand Up @@ -234,6 +237,7 @@
optimus_namespacename="bar-namespace",
optimus_jobname="foo",
optimus_jobtype="hook",
optimus_instancename="hook-for-fail",
image_pull_policy=IMAGE_PULL_POLICY,
namespace = conf.get('kubernetes', 'namespace', fallback="default"),
image = "example.io/namespace/fail-image:latest",
Expand Down

0 comments on commit 5116e0d

Please sign in to comment.