Skip to content

Commit

Permalink
Issue #936 ad-hoc attempt to first run input staging job
Browse files Browse the repository at this point in the history
  • Loading branch information
soxofaan committed Dec 2, 2024
1 parent eca7a61 commit 1899538
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 7 deletions.
23 changes: 23 additions & 0 deletions openeogeotrellis/deploy/kube.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,29 @@ def on_started():
}
)

@app.route("/tmp/ogd936", methods=["GET"])
def tmp_ogd936():
    """
    Temporary playground endpoint for Calrissian based CWL job management.

    Runs two Kubernetes jobs back to back in the "calrissian-demo-project"
    namespace (first the input staging job, then the CWL job itself),
    waiting for each one, and reports the result of the last job.
    """
    import kubernetes
    from openeogeotrellis.integrations import calrissian

    kubernetes.config.load_incluster_config()
    namespace = "calrissian-demo-project"

    # Order matters: input staging has to be launched (and waited for)
    # before the CWL job.
    manifest_builders = (
        calrissian.create_input_staging_job_body,
        calrissian.create_cwl_job_body,
    )
    res = None
    for build_manifest in manifest_builders:
        res = calrissian.launch_cwl_job_and_wait(
            body=build_manifest(namespace=namespace),
            namespace=namespace,
        )

    return f"Hello from the backend: {res!r}"

host = os.environ.get('SPARK_LOCAL_IP', None)
if host is None:
host, _ = get_socket()
Expand Down
91 changes: 84 additions & 7 deletions openeogeotrellis/integrations/calrissian.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,90 @@ class VolumeInfo:
read_only: Optional[bool] = None


def create_input_staging_job_body(
    *,
    namespace: Optional[str] = None,
) -> "kubernetes.client.V1Job":
    """
    Build the manifest of the input staging job
    that puts the CWL resources on the input data volume.

    The job runs a single ``alpine:3`` container that downloads the
    ``calrissian-resources.tar.gz`` archive and unpacks it into
    ``/calrissian/input-data``, which is where the ``calrissian-input-data``
    persistent volume claim is mounted.

    :param namespace: Kubernetes namespace to define the job in.
        Falls back to ``calrissian_namespace`` from the backend config.
    :return: the job manifest (this function only builds it,
        it does not submit anything to the cluster).
    :raises ValueError: if no namespace is given nor configured.
    """
    import kubernetes.client

    name = generate_unique_id(prefix="cjs")
    namespace = namespace or get_backend_config().calrissian_namespace
    if not namespace:
        # Explicit check instead of `assert`, which is stripped under `python -O`.
        raise ValueError("No namespace given or configured for the input staging job.")

    # TODO: config for this?
    security_context = kubernetes.client.V1SecurityContext(run_as_user=1000, run_as_group=1000)

    volumes = [
        VolumeInfo(
            name="calrissian-input-data",
            claim_name="calrissian-input-data",
            mount_path="/calrissian/input-data",
            # The staging job extracts the resources archive *into* this volume,
            # so it must be mounted writable here.
            read_only=False,
        ),
    ]

    # Shell steps, joined with "; " into a single `sh -c` script.
    # `set -e` makes the script stop at the first failing step.
    staging_script = "; ".join(
        [
            "set -euxo pipefail",
            "wget -O /tmp/calrissian-resources.tar.gz https://artifactory.vgt.vito.be/artifactory/auxdata-public/openeo/calrissian-resources/calrissian-resources.tar.gz",
            "tar -xzvf /tmp/calrissian-resources.tar.gz -C /calrissian/input-data",
            "ls -al /calrissian/input-data",
        ]
    )

    container = kubernetes.client.V1Container(
        name="calrissian-input-staging",
        image="alpine:3",
        security_context=security_context,
        command=["/bin/sh"],
        args=["-c", staging_script],
        volume_mounts=[
            kubernetes.client.V1VolumeMount(
                name=v.name,
                mount_path=v.mount_path,
                read_only=v.read_only,
            )
            for v in volumes
        ],
    )
    body = kubernetes.client.V1Job(
        metadata=kubernetes.client.V1ObjectMeta(
            name=name,
            namespace=namespace,
        ),
        spec=kubernetes.client.V1JobSpec(
            template=kubernetes.client.V1PodTemplateSpec(
                spec=kubernetes.client.V1PodSpec(
                    containers=[container],
                    restart_policy="Never",
                    volumes=[
                        kubernetes.client.V1Volume(
                            name=v.name,
                            persistent_volume_claim=kubernetes.client.V1PersistentVolumeClaimVolumeSource(
                                claim_name=v.claim_name,
                                read_only=v.read_only,
                            ),
                        )
                        for v in volumes
                    ],
                )
            )
        ),
    )
    return body

def create_cwl_job_body(
*,
namespace: Optional[str] = None,
name: Optional[str] = None,
# TODO: arguments to set an actual CWL workflow and inputs
) -> "kubernetes.client.V1Job":
import kubernetes.client

name = name or generate_unique_id(prefix="cj")
name = generate_unique_id(prefix="cj")
namespace = namespace or get_backend_config().calrissian_namespace
container_image = get_backend_config().calrissian_image
if not namespace or not container_image:
Expand Down Expand Up @@ -54,6 +129,7 @@ def create_cwl_job_body(
),
]


calrissian_arguments = [
"--max-ram",
"2G",
Expand All @@ -64,12 +140,13 @@ def create_cwl_job_body(
"/calrissian/tmpout/",
"--outdir",
"/calrissian/output-data/",
"/calrissian/input-data/hello-workflow.cwl",
"/calrissian/input-data/hello-input.yaml",
"/calrissian/input-data/hello-world.cwl",
"--message",
"Hello EO world!",
]

container = kubernetes.client.V1Container(
name="calrissian",
name="calrissian-job",
image=container_image,
security_context=security_context,
command=["calrissian"],
Expand Down Expand Up @@ -122,8 +199,8 @@ def launch_cwl_job_and_wait(
body: "kubernetes.client.V1Job",
*,
namespace: str,
sleep: float = 10,
timeout: float = 300,
sleep: float = 5,
timeout: float = 60,
) -> "kubernetes.client.V1Job":
import kubernetes.client

Expand Down
15 changes: 15 additions & 0 deletions openeogeotrellis/integrations/resources/calrissian/hello-world.cwl
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
# Minimal "hello world" CWL command line tool:
# echoes a message and captures the standard output in a file.
# NOTE(review): nesting indentation of the keys below is not visible in this view;
# the comments assume the usual CWL layout — confirm against the actual file.
cwlVersion: v1.2
class: CommandLineTool
# The command to execute.
baseCommand: echo
# File name under which the standard output of the command is captured.
stdout: hello-stdout.txt

inputs:
# Input "message": a string (with a default value),
# bound to the command line at position 1.
message:
type: string
default: "Hello World"
inputBinding:
position: 1

outputs:
# Output "stdout", declared with type `stdout`
# (presumably the file captured through the top-level `stdout` field above).
stdout:
type: stdout

0 comments on commit 1899538

Please sign in to comment.