From e95332b9eb8de4cdcac464ff704bf64f3285e776 Mon Sep 17 00:00:00 2001 From: Romil Bhardwaj Date: Wed, 25 Sep 2024 20:49:00 -0700 Subject: [PATCH] [Examples] Add airflow example (#3982) * Airflow example * Airflow example * Airflow example * Airflow example * wip * Update airflow examples * Update airflow examples * Update airflow examples * Add to readme * Add to readme * Add to readme * lint * updates * less salesy * comments * comments * comments --- README.md | 2 +- docs/source/docs/index.rst | 2 +- examples/airflow/README.md | 9 + examples/airflow/shared_state/README.md | 174 ++++++++++++++++++ examples/airflow/shared_state/sky-pv.yaml | 11 ++ examples/airflow/shared_state/sky-sa.yaml | 18 ++ .../airflow/shared_state/sky_k8s_example.py | 64 +++++++ .../shared_state/sky_k8s_example_xcoms.py | 87 +++++++++ examples/airflow/training_workflow/README.md | 166 +++++++++++++++++ .../training_workflow/create_gcloud_secret.sh | 30 +++ .../airflow/training_workflow/sky-sa.yaml | 18 ++ .../sky_k8s_train_pipeline.py | 87 +++++++++ 12 files changed, 666 insertions(+), 2 deletions(-) create mode 100644 examples/airflow/README.md create mode 100644 examples/airflow/shared_state/README.md create mode 100644 examples/airflow/shared_state/sky-pv.yaml create mode 100644 examples/airflow/shared_state/sky-sa.yaml create mode 100644 examples/airflow/shared_state/sky_k8s_example.py create mode 100644 examples/airflow/shared_state/sky_k8s_example_xcoms.py create mode 100644 examples/airflow/training_workflow/README.md create mode 100755 examples/airflow/training_workflow/create_gcloud_secret.sh create mode 100644 examples/airflow/training_workflow/sky-sa.yaml create mode 100644 examples/airflow/training_workflow/sky_k8s_train_pipeline.py diff --git a/README.md b/README.md index f887c6d690f..1f646b0e995 100644 --- a/README.md +++ b/README.md @@ -180,7 +180,7 @@ Runnable examples: - [LocalGPT](./llm/localgpt) - [Falcon](./llm/falcon) - Add yours here & see more in [`llm/`](./llm)! 
-- Framework examples: [PyTorch DDP](https://github.com/skypilot-org/skypilot/blob/master/examples/resnet_distributed_torch.yaml), [DeepSpeed](./examples/deepspeed-multinode/sky.yaml), [JAX/Flax on TPU](https://github.com/skypilot-org/skypilot/blob/master/examples/tpu/tpuvm_mnist.yaml), [Stable Diffusion](https://github.com/skypilot-org/skypilot/tree/master/examples/stable_diffusion), [Detectron2](https://github.com/skypilot-org/skypilot/blob/master/examples/detectron2_docker.yaml), [Distributed](https://github.com/skypilot-org/skypilot/blob/master/examples/resnet_distributed_tf_app.py) [TensorFlow](https://github.com/skypilot-org/skypilot/blob/master/examples/resnet_app_storage.yaml), [Ray Train](examples/distributed_ray_train/ray_train.yaml), [NeMo](https://github.com/skypilot-org/skypilot/blob/master/examples/nemo/nemo.yaml), [programmatic grid search](https://github.com/skypilot-org/skypilot/blob/master/examples/huggingface_glue_imdb_grid_search_app.py), [Docker](https://github.com/skypilot-org/skypilot/blob/master/examples/docker/echo_app.yaml), [Cog](https://github.com/skypilot-org/skypilot/blob/master/examples/cog/), [Unsloth](https://github.com/skypilot-org/skypilot/blob/master/examples/unsloth/unsloth.yaml), [Ollama](https://github.com/skypilot-org/skypilot/blob/master/llm/ollama), [llm.c](https://github.com/skypilot-org/skypilot/tree/master/llm/gpt-2) and [many more (`examples/`)](./examples). 
+- Framework examples: [PyTorch DDP](https://github.com/skypilot-org/skypilot/blob/master/examples/resnet_distributed_torch.yaml), [DeepSpeed](./examples/deepspeed-multinode/sky.yaml), [JAX/Flax on TPU](https://github.com/skypilot-org/skypilot/blob/master/examples/tpu/tpuvm_mnist.yaml), [Stable Diffusion](https://github.com/skypilot-org/skypilot/tree/master/examples/stable_diffusion), [Detectron2](https://github.com/skypilot-org/skypilot/blob/master/examples/detectron2_docker.yaml), [Distributed](https://github.com/skypilot-org/skypilot/blob/master/examples/resnet_distributed_tf_app.py) [TensorFlow](https://github.com/skypilot-org/skypilot/blob/master/examples/resnet_app_storage.yaml), [Ray Train](examples/distributed_ray_train/ray_train.yaml), [NeMo](https://github.com/skypilot-org/skypilot/blob/master/examples/nemo/nemo.yaml), [programmatic grid search](https://github.com/skypilot-org/skypilot/blob/master/examples/huggingface_glue_imdb_grid_search_app.py), [Docker](https://github.com/skypilot-org/skypilot/blob/master/examples/docker/echo_app.yaml), [Cog](https://github.com/skypilot-org/skypilot/blob/master/examples/cog/), [Unsloth](https://github.com/skypilot-org/skypilot/blob/master/examples/unsloth/unsloth.yaml), [Ollama](https://github.com/skypilot-org/skypilot/blob/master/llm/ollama), [llm.c](https://github.com/skypilot-org/skypilot/tree/master/llm/gpt-2), [Airflow](./examples/airflow/training_workflow) and [many more (`examples/`)](./examples). Case Studies and Integrations: [Community Spotlights](https://blog.skypilot.co/community/) diff --git a/docs/source/docs/index.rst b/docs/source/docs/index.rst index 00a645a3834..c219fcd5c85 100644 --- a/docs/source/docs/index.rst +++ b/docs/source/docs/index.rst @@ -103,7 +103,7 @@ Runnable examples: * `Falcon `_ * Add yours here & see more in `llm/ `_! 
-* Framework examples: `PyTorch DDP `_, `DeepSpeed `_, `JAX/Flax on TPU `_, `Stable Diffusion `_, `Detectron2 `_, `Distributed `_ `TensorFlow `_, `NeMo `_, `programmatic grid search `_, `Docker `_, `Cog `_, `Unsloth `_, `Ollama `_, `llm.c `__ and `many more `_. +* Framework examples: `PyTorch DDP `_, `DeepSpeed `_, `JAX/Flax on TPU `_, `Stable Diffusion `_, `Detectron2 `_, `Distributed `_ `TensorFlow `_, `NeMo `_, `programmatic grid search `_, `Docker `_, `Cog `_, `Unsloth `_, `Ollama `_, `llm.c `__, `Airflow `_ and `many more `_. Case Studies and Integrations: `Community Spotlights `_ diff --git a/examples/airflow/README.md b/examples/airflow/README.md new file mode 100644 index 00000000000..80d86f22b97 --- /dev/null +++ b/examples/airflow/README.md @@ -0,0 +1,9 @@ +# SkyPilot Airflow integration examples + +This directory contains two examples of integrating SkyPilot with Apache Airflow: +1. [training_workflow](training_workflow) + * A simple training workflow that preprocesses data, trains a model, and evaluates it. + * Showcases how SkyPilot can help easily transition from dev to production in Airflow. +2. [shared_state](shared_state) + * An example showing how SkyPilot state can be persisted across Airflow tasks. + * Useful for operating on the same shared SkyPilot clusters from different Airflow tasks. \ No newline at end of file diff --git a/examples/airflow/shared_state/README.md b/examples/airflow/shared_state/README.md new file mode 100644 index 00000000000..5f39471351a --- /dev/null +++ b/examples/airflow/shared_state/README.md @@ -0,0 +1,174 @@ +# Running SkyPilot tasks in an Airflow DAG + +SkyPilot can be used in an orchestration framework like Airflow to launch tasks as a part of a DAG. + +In this guide, we demonstrate how some simple SkyPilot operations, such as launching a cluster, getting its logs and tearing it down, can be orchestrated using Airflow. + +

+ +

+ +## Prerequisites + +* Airflow installed on a [Kubernetes cluster](https://airflow.apache.org/docs/helm-chart/stable/index.html) or [locally](https://airflow.apache.org/docs/apache-airflow/stable/start.html) (`SequentialExecutor`) +* A Kubernetes cluster to run tasks on. We'll use GKE in this example. + * You can use our guide on [setting up a Kubernetes cluster](https://skypilot.readthedocs.io/en/latest/reference/kubernetes/kubernetes-setup.html). + * A persistent volume storage class should be available that supports at least `ReadWriteOnce` access mode. GKE has this supported by default. + +## Preparing the Kubernetes Cluster + +1. Provision a service account on your Kubernetes cluster for SkyPilot to use to launch tasks. + ```bash + kubectl apply -f sky-sa.yaml + ``` + For reference, here are the contents of `sky-sa.yaml`: + ```yaml + # sky-sa.yaml + apiVersion: v1 + kind: ServiceAccount + metadata: + name: sky-airflow-sa + namespace: default + --- + apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRoleBinding + metadata: + name: sky-airflow-sa-binding + subjects: + - kind: ServiceAccount + name: sky-airflow-sa + namespace: default + roleRef: + # For minimal permissions, refer to https://skypilot.readthedocs.io/en/latest/cloud-setup/cloud-permissions/kubernetes.html + kind: ClusterRole + name: cluster-admin + apiGroup: rbac.authorization.k8s.io + ``` + +2. Provision a persistent volume for SkyPilot to store state across runs. + ```bash + kubectl apply -f sky-pv.yaml + ``` + For reference, here are the contents of `sky-pv.yaml`: + ```yaml + # sky-pv.yaml + apiVersion: v1 + kind: PersistentVolumeClaim + metadata: + name: sky-pvc + spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi # 10Gi is minimum for GKE pd-balanced + storageClassName: standard-rwo + ``` + Note: The `storageClassName` should be set to the appropriate storage class that's supported on your cluster. 
If you have many concurrent tasks, you may want to use a storage class that supports `ReadWriteMany` access mode. + +## Writing the Airflow DAG + +We provide an example DAG in `sky_k8s_example.py` that: +1. Launches a SkyPilot cluster. +2. Writes logs from the cluster to a local file +3. Checks the status of the cluster and prints to Airflow logs +4. Tears down the cluster. + +The DAG is defined in `sky_k8s_example.py`: + +```python +# sky_k8s_example.py +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator +from airflow.utils.dates import days_ago + +from kubernetes.client import models as k8s + +default_args = { + 'owner': 'airflow', + 'start_date': days_ago(1), +} + +def get_skypilot_task(task_id: str, sky_command: str): + skypilot_task = KubernetesPodOperator( + task_id=task_id, + name="skypilot-pod", + namespace="default", + image="us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:20240613", + cmds=["/bin/bash", "-i", "-c"], + arguments=[ + "chown -R 1000:1000 /home/sky/.sky /home/sky/.ssh && " + "pip install skypilot-nightly[kubernetes] && " + f"{sky_command}"], + service_account_name="sky-airflow-sa", + env_vars={"HOME": "/home/sky"}, + volumes=[ + k8s.V1Volume( + name="sky-pvc", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( + claim_name="sky-pvc" + ), + ), + ], + volume_mounts=[ + k8s.V1VolumeMount(name="sky-pvc", mount_path="/home/sky/.sky", + sub_path="sky"), + k8s.V1VolumeMount(name="sky-pvc", mount_path="/home/sky/.ssh", + sub_path="ssh"), + ], + is_delete_operator_pod=True, + get_logs=True, + ) + return skypilot_task + + +with DAG(dag_id='sky_k8s_example', + default_args=default_args, + schedule_interval=None, + catchup=False) as dag: + # Task to launch a SkyPilot cluster + cmds = ("git clone https://github.com/skypilot-org/skypilot.git && " + "sky launch -y -c train --cloud kubernetes skypilot/examples/minimal.yaml") + sky_launch = 
get_skypilot_task("sky_launch", cmds) + # Task to get the logs of the SkyPilot cluster + sky_logs = get_skypilot_task("sky_logs", "sky logs train > task_logs.txt") + # Task to get the list of SkyPilot clusters + sky_status = get_skypilot_task("sky_status", "sky status") + # Task to delete the SkyPilot cluster + sky_down = get_skypilot_task("sky_down", "sky down train") + + sky_launch >> sky_logs >> sky_status >> sky_down +``` + +## Running the DAG + +1. Copy the DAG file to the Airflow DAGs directory. + ```bash + cp sky_k8s_example.py /path/to/airflow/dags + # If your Airflow is running on Kubernetes, you may use kubectl cp to copy the file to the pod + # kubectl cp sky_k8s_example.py :/opt/airflow/dags + ``` +2. Run `airflow dags list` to confirm that the DAG is loaded. +3. Find the DAG in the Airflow UI (typically http://localhost:8080) and enable it. The UI may take a couple of minutes to reflect the changes. +4. Trigger the DAG from the Airflow UI using the `Trigger DAG` button. +5. Navigate to the run in the Airflow UI to see the DAG progress and logs of each task. + +

+ +

+

+ +

+ +## Tips + +1. **Persistent Volume**: If you have many concurrent tasks, you may want to use a storage class that supports [`ReadWriteMany`](https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes) access mode. +2. **Cloud credentials**: If you wish to run tasks on different clouds, you can configure cloud credentials in Kubernetes secrets and mount them in the Sky pod defined in the DAG. See [SkyPilot docs on setting up cloud credentials](https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#cloud-account-setup) for more on how to configure credentials in the pod. +3. **Logging**: All SkyPilot logs are written to container stdout, which is captured as task logs in Airflow and displayed in the UI. You can also write logs to a file and read them in subsequent tasks. +4. **XComs for shared state**: Airflow also provides [XComs](https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html) for cross-task communication. [`sky_k8s_example_xcoms.py`](sky_k8s_example_xcoms.py) demonstrates how to use XComs to share state between tasks. + +## Future work: a native Airflow Executor built on SkyPilot + +SkyPilot can in the future provide a native Airflow Executor, that provides an operator similar to the `KubernetesPodOperator` but runs the task as native SkyPilot task. + +In such a setup, SkyPilot state management would no longer be required, as the executor will handle SkyPilot cluster launching and termination. 
\ No newline at end of file diff --git a/examples/airflow/shared_state/sky-pv.yaml b/examples/airflow/shared_state/sky-pv.yaml new file mode 100644 index 00000000000..c17198515c4 --- /dev/null +++ b/examples/airflow/shared_state/sky-pv.yaml @@ -0,0 +1,11 @@ +apiVersion: v1 +kind: PersistentVolumeClaim +metadata: + name: sky-pvc +spec: + accessModes: + - ReadWriteOnce + resources: + requests: + storage: 10Gi + storageClassName: standard-rwo diff --git a/examples/airflow/shared_state/sky-sa.yaml b/examples/airflow/shared_state/sky-sa.yaml new file mode 100644 index 00000000000..b791bafdec1 --- /dev/null +++ b/examples/airflow/shared_state/sky-sa.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: sky-airflow-sa + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: sky-airflow-sa-binding +subjects: +- kind: ServiceAccount + name: sky-airflow-sa + namespace: default +roleRef: + kind: ClusterRole + name: cluster-admin + apiGroup: rbac.authorization.k8s.io diff --git a/examples/airflow/shared_state/sky_k8s_example.py b/examples/airflow/shared_state/sky_k8s_example.py new file mode 100644 index 00000000000..e61b4e92e5c --- /dev/null +++ b/examples/airflow/shared_state/sky_k8s_example.py @@ -0,0 +1,64 @@ +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( + KubernetesPodOperator) +from airflow.utils.dates import days_ago +from kubernetes.client import models as k8s + +default_args = { + 'owner': 'airflow', + 'start_date': days_ago(1), +} + + +def get_skypilot_task(task_id: str, sky_command: str): + skypilot_task = KubernetesPodOperator( + task_id=task_id, + name="skypilot-pod", + namespace="default", + image= + "us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:20240613", + cmds=["/bin/bash", "-i", "-c"], + arguments=[ + "chown -R 1000:1000 /home/sky/.sky /home/sky/.ssh && " + "pip install skypilot-nightly[kubernetes] && " + 
f"{sky_command}" + ], + service_account_name="sky-airflow-sa", + env_vars={"HOME": "/home/sky"}, + volumes=[ + k8s.V1Volume( + name="sky-pvc", + persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource( + claim_name="sky-pvc"), + ), + ], + volume_mounts=[ + k8s.V1VolumeMount(name="sky-pvc", + mount_path="/home/sky/.sky", + sub_path="sky"), + k8s.V1VolumeMount(name="sky-pvc", + mount_path="/home/sky/.ssh", + sub_path="ssh"), + ], + is_delete_operator_pod=True, + get_logs=True, + ) + return skypilot_task + + +with DAG(dag_id='sky_k8s_example', + default_args=default_args, + schedule_interval=None, + catchup=False) as dag: + # Task to launch a SkyPilot cluster + sky_launch = get_skypilot_task( + "sky_launch", + "sky launch -y -c train --cloud kubernetes -- echo training the model") + # Task to get the logs of the SkyPilot cluster + sky_logs = get_skypilot_task("sky_logs", "sky logs train > task_logs.txt") + # Task to get the list of SkyPilot clusters + sky_status = get_skypilot_task("sky_status", "sky status") + # Task to delete the SkyPilot cluster + sky_down = get_skypilot_task("sky_down", "sky down train") + + sky_launch >> sky_logs >> sky_status >> sky_down diff --git a/examples/airflow/shared_state/sky_k8s_example_xcoms.py b/examples/airflow/shared_state/sky_k8s_example_xcoms.py new file mode 100644 index 00000000000..3bbac3299b3 --- /dev/null +++ b/examples/airflow/shared_state/sky_k8s_example_xcoms.py @@ -0,0 +1,87 @@ +# This is a WIP example that uses xcom serialization to pass state.db and sky keys between tasks. +# This should not require PVCs to be mounted to the pod, and should be able to run on any Kubernetes cluster. 
+from airflow import DAG +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( + KubernetesPodOperator) +from airflow.utils.dates import days_ago + +default_args = { + 'owner': 'airflow', + 'start_date': days_ago(1), +} + + +def get_skypilot_task(task_id: str, + sky_command: str, + previous_task_id: str = None, + serialize_xcom: bool = False): + cmds = [ + "/bin/bash", "-i", "-c", + "chown -R 1000:1000 /home/sky/.sky /home/sky/.ssh && " + ] + + if previous_task_id is not None: + # Deserialize state.db and sky keys from xcom (if needed) + # TODO(romilb): Implement this using {{ ti.xcom_pull() }} templating + cmds.append(' echo \'{{ ti.xcom_pull(task_ids="' + previous_task_id + + '")["state_db"] }}\' > /home/sky/.sky/state.db &&' + ' echo \'{{ ti.xcom_pull(task_ids="' + previous_task_id + + '")["sky_key"] }}\' > /home/sky/.ssh/sky-key &&' + ' echo \'{{ ti.xcom_pull(task_ids="' + previous_task_id + + '")["sky_key_pub"] }}\' > /home/sky/.ssh/sky-key.pub') + + cmds.append( + f"pip install skypilot-nightly[kubernetes] && {sky_command} && ") + + if serialize_xcom: + # Serialize state.db and sky keys into xcom + cmds.append( + 'echo \'{"state_db": "$(cat /home/sky/.sky/state.db)", ' + '"sky_key": "$(cat /home/sky/.ssh/sky-key)", ' + '"sky_key_pub": "$(cat /home/sky/.ssh/sky-key.pub)"}\' > /airflow/xcom/return.json' + ) + + task = KubernetesPodOperator( + task_id=task_id, + name="skypilot-pod", + namespace="default", + image= + "us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:20240613", + cmds=["/bin/bash", "-i", "-c"], + arguments=["".join(cmds)], + service_account_name="sky-airflow-sa", + env_vars={"HOME": "/home/sky"}, + is_delete_operator_pod=True, + get_logs=True, + do_xcom_push=serialize_xcom # Only push XCom if we're serializing data + ) + return task + + +with DAG(dag_id='sky_k8s_example_xcoms', + default_args=default_args, + schedule_interval=None, + catchup=False) as dag: + # Task to launch a SkyPilot cluster + sky_launch = 
get_skypilot_task( + "sky_launch", + "sky launch -y -c train --cloud kubernetes -- echo training the model", + previous_task_id=None, + serialize_xcom=True) + # Task to get the logs of the SkyPilot cluster + sky_logs = get_skypilot_task("sky_logs", + "sky logs train > task_logs.txt", + previous_task_id='sky_launch', + serialize_xcom=True) + # Task to get the list of SkyPilot clusters + sky_status = get_skypilot_task("sky_status", + "sky status", + previous_task_id='sky_logs', + serialize_xcom=True) + # Task to delete the SkyPilot cluster + sky_down = get_skypilot_task("sky_down", + "sky down train", + previous_task_id='sky_status', + serialize_xcom=False) + + sky_launch >> sky_logs >> sky_status >> sky_down diff --git a/examples/airflow/training_workflow/README.md b/examples/airflow/training_workflow/README.md new file mode 100644 index 00000000000..dad08d8d3b0 --- /dev/null +++ b/examples/airflow/training_workflow/README.md @@ -0,0 +1,166 @@ +# Running SkyPilot tasks in Airflow + + +In this guide, we show how a training workflow involving data preprocessing, training and evaluation can be first easily developed with SkyPilot, and then orchestrated in Airflow. + +

+ +

+ +**💡 Tip:** SkyPilot also supports defining and running pipelines without Airflow. Check out [Jobs Pipelines](https://skypilot.readthedocs.io/en/latest/examples/managed-jobs.html#job-pipelines) for more information. + +## Why use SkyPilot with Airflow? +In AI workflows, **the transition from development to production is hard**. + +Workflow development happens ad-hoc, with a lot of interaction required +with the code and data. When moving this to an Airflow DAG in production, managing dependencies, environments and the +infra requirements of the workflow gets complex. Porting the code to Airflow requires significant time to test and +validate any changes, often requiring re-writing the code as Airflow operators. + +**SkyPilot seamlessly bridges the dev -> production gap**. + +SkyPilot can operate on any of your infra, allowing you to package and run the same code that you ran during development on a +production Airflow cluster. Behind the scenes, SkyPilot handles environment setup, dependency management, and infra orchestration, allowing you to focus on your code. + +Here's how you can use SkyPilot to take your dev workflows to production in Airflow: +1. **Define and test your workflow as SkyPilot tasks**. + - Use `sky launch` and [Sky VSCode integration](https://skypilot.readthedocs.io/en/latest/examples/interactive-development.html#dev-vscode) to run, debug and iterate on your code. +2. **Orchestrate SkyPilot tasks in Airflow** by invoking `sky launch` on their YAMLs as a task in the Airflow DAG. + - Airflow does the scheduling, logging, and monitoring, while SkyPilot handles the infra setup and task execution. + + +## Prerequisites + +* Airflow installed on a [Kubernetes cluster](https://airflow.apache.org/docs/helm-chart/stable/index.html) or [locally](https://airflow.apache.org/docs/apache-airflow/stable/start.html) (`SequentialExecutor`) +* A Kubernetes cluster to run tasks on. We'll use GKE in this example. 
+* A Google Cloud account with GCS access to store the data for the tasks. + * Follow [SkyPilot instructions](https://skypilot.readthedocs.io/en/latest/getting-started/installation.html#google-cloud-platform-gcp) to set up Google Cloud credentials. + +## Preparing the Kubernetes Cluster + +1. Provision a service account on your Kubernetes cluster for SkyPilot to use to launch tasks. + ```bash + kubectl apply -f sky-sa.yaml + ``` + For reference, here are the contents of `sky-sa.yaml`: + ```yaml + # sky-sa.yaml + apiVersion: v1 + kind: ServiceAccount + metadata: + name: sky-airflow-sa + namespace: default + --- + apiVersion: rbac.authorization.k8s.io/v1 + kind: ClusterRoleBinding + metadata: + name: sky-airflow-sa-binding + subjects: + - kind: ServiceAccount + name: sky-airflow-sa + namespace: default + roleRef: + # For minimal permissions, refer to https://skypilot.readthedocs.io/en/latest/cloud-setup/cloud-permissions/kubernetes.html + kind: ClusterRole + name: cluster-admin + apiGroup: rbac.authorization.k8s.io + ``` + +2. We will store intermediate task outputs in a Google Cloud Storage bucket. Use the following command to create a unique bucket: + ```bash + gsutil mb gs://<bucket-name> + ``` + Take note of the bucket name, as it will be used in the task YAMLs. + +3. To provide SkyPilot GCP access, we will create GCP credentials as secrets that will be mounted in SkyPilot's pods. We provide a helper script `create_gcloud_secret.sh` to create the secret: + ```bash + ./create_gcloud_secret.sh + ``` + You can also use other methods, such as GKE workload identity federation, to provide SkyPilot pods access to GCP credentials. + +## Defining the tasks + +We will define the following tasks to mock a training workflow: +1. `data_preprocessing.yaml`: Generates data and writes it to a bucket. +2. `train.yaml`: Trains a model on the data in the bucket. +3. `eval.yaml`: Evaluates the model and writes evaluation results to the bucket. 
+ +We have defined these tasks in the [mock_train_workflow](https://github.com/romilbhardwaj/mock_train_workflow) repository. Clone the repository and follow the instructions in the README to run the tasks. + +When developing the workflow, you can run the tasks independently using `sky launch`: + +```bash +git clone https://github.com/romilbhardwaj/mock_train_workflow.git +cd mock_train_workflow +# Run the data preprocessing task, replacing <bucket-name> with the bucket you created above +sky launch -c data --env DATA_BUCKET_URL=gs://<bucket-name> data_preprocessing.yaml +``` + +The train and eval steps can be run in a similar way: + +```bash +# Run the train task +sky launch -c train --env DATA_BUCKET_URL=gs://<bucket-name> train.yaml +``` + +Hint: You can use `ssh` and VSCode to [interactively develop](https://skypilot.readthedocs.io/en/latest/examples/interactive-development.html) and debug the tasks. + +Note: `eval` can be optionally run on the same cluster as `train` with `sky exec`. Refer to the `shared_state` Airflow example on how to do this. + +## Writing the Airflow DAG + +Once we have developed the tasks, we can seamlessly port them to Airflow. + +1. **No changes required to our tasks -** we use the same YAMLs we wrote in the previous step to create an Airflow DAG in `sky_k8s_train_pipeline.py`. +2. **Airflow native logging** - SkyPilot logs are written to container stdout, which is captured as task logs in Airflow and displayed in the UI. +3. **Easy debugging** - If a task fails, you can independently run the task using `sky launch` to debug the issue. SkyPilot will recreate the environment in which the task failed. + +Here's a snippet of the DAG declaration in `sky_k8s_train_pipeline.py`: +```python +with DAG(dag_id='sky_k8s_train_pipeline', ...) as dag: + # Make sure bucket exists with gsutil mb -l us-central1 gs://<bucket-name> + bucket_url = "gs://sky-data-demo" + + # Launch data preprocessing task. We use --down to clean up the SkyPilot cluster after the task is done. 
+ data_preprocess = get_skypilot_task("sky_data_preprocess", + f"sky launch -y -c data --down --cloud kubernetes --env DATA_BUCKET_URL={bucket_url} mock_train_workflow/data_preprocessing.yaml") + + # Task to train the model + train = get_skypilot_task("sky_train", + f"sky launch -y -c train --down --cloud kubernetes --env DATA_BUCKET_URL={bucket_url} mock_train_workflow/train.yaml") + + # Task to evaluate the trained model. This can optionally be run on the same cluster as the training task using `sky exec` + eval = get_skypilot_task("sky_eval", + f"sky launch -y -c eval --down --cloud kubernetes --env DATA_BUCKET_URL={bucket_url} mock_train_workflow/eval.yaml") + + data_preprocess >> train >> eval +``` + +Behind the scenes, the `get_skypilot_task` helper uses the `KubernetesPodOperator` to run the `sky` CLI in an ephemeral pod. All clusters are set to auto-down after the task is done, so no dangling clusters are left behind. + +## Running the DAG + +1. Copy the DAG file to the Airflow DAGs directory. + ```bash + cp sky_k8s_train_pipeline.py /path/to/airflow/dags + # If your Airflow is running on Kubernetes, you may use kubectl cp to copy the file to the pod + # kubectl cp sky_k8s_train_pipeline.py <airflow-pod-name>:/opt/airflow/dags + ``` +2. Run `airflow dags list` to confirm that the DAG is loaded. +3. Find the DAG in the Airflow UI (typically http://localhost:8080) and enable it. The UI may take a couple of minutes to reflect the changes. +4. Trigger the DAG from the Airflow UI using the `Trigger DAG` button. +5. Navigate to the run in the Airflow UI to see the DAG progress and logs of each task. + +

+ +

+

+ +

+ +## Future work: a native Airflow Executor built on SkyPilot + +Currently this example relies on a helper `get_skypilot_task` method to wrap SkyPilot invocation in a `KubernetesPodOperator`, but in the future SkyPilot can +provide a native Airflow Executor. + +In such a setup, SkyPilot state management would also not be required, as the executor will handle SkyPilot cluster launching and termination. \ No newline at end of file diff --git a/examples/airflow/training_workflow/create_gcloud_secret.sh b/examples/airflow/training_workflow/create_gcloud_secret.sh new file mode 100755 index 00000000000..fa9e7d902a9 --- /dev/null +++ b/examples/airflow/training_workflow/create_gcloud_secret.sh @@ -0,0 +1,30 @@ +#!/bin/bash + +# Define variables +GCLOUD_DIR="$HOME/.config/gcloud" +TAR_FILE="gcloud-config.tar.gz" +SECRET_NAME="gcloud-secret" + +# List of files and directories to include in the tarball +FILES_TO_TAR=( + "credentials.db" + "access_tokens.db" + "configurations" + "legacy_credentials" + "active_config" + "application_default_credentials.json" +) + +# Create a tarball with the specified files and directories +echo "Creating tarball..." +tar -czvf $TAR_FILE -C $GCLOUD_DIR "${FILES_TO_TAR[@]}" + +# Create the Kubernetes Secret using the tarball +echo "Creating Kubernetes secret..." +kubectl create secret generic $SECRET_NAME --from-file=gcloud-config.tar.gz=$TAR_FILE + +# Remove the tarball after the secret is created +echo "Cleaning up tarball..." +rm -f $TAR_FILE + +echo "Secret '$SECRET_NAME' created successfully and temporary tarball removed." 
diff --git a/examples/airflow/training_workflow/sky-sa.yaml b/examples/airflow/training_workflow/sky-sa.yaml new file mode 100644 index 00000000000..b791bafdec1 --- /dev/null +++ b/examples/airflow/training_workflow/sky-sa.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: ServiceAccount +metadata: + name: sky-airflow-sa + namespace: default +--- +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: sky-airflow-sa-binding +subjects: +- kind: ServiceAccount + name: sky-airflow-sa + namespace: default +roleRef: + kind: ClusterRole + name: cluster-admin + apiGroup: rbac.authorization.k8s.io diff --git a/examples/airflow/training_workflow/sky_k8s_train_pipeline.py b/examples/airflow/training_workflow/sky_k8s_train_pipeline.py new file mode 100644 index 00000000000..ca00926aed9 --- /dev/null +++ b/examples/airflow/training_workflow/sky_k8s_train_pipeline.py @@ -0,0 +1,87 @@ +from airflow import DAG +from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import ( + KubernetesPodOperator) +from airflow.utils.dates import days_ago +from kubernetes.client import models as k8s + +default_args = { + 'owner': 'airflow', + 'start_date': days_ago(1), +} + + +def get_skypilot_task(task_id: str, sky_command: str): + INIT_COMMANDS = ( + # Install gcloud CLI and source the bashrc for accessing buckets in tasks + 'sudo conda install -y -c conda-forge google-cloud-sdk ') + + # Install SkyPilot and clone the mock train workflow repo + # In your workflow, you can have skypilot and the code baked into the image + SETUP_COMMAND = ( + "pip install skypilot-nightly[kubernetes,gcp] &&" + "git clone https://github.com/romilbhardwaj/mock_train_workflow.git /home/sky/mock_train_workflow" + ) + + # Command to extract the gcloud secrets tarball + EXTRACT_GCLOUD = ( + "mkdir -p /home/sky/.config/gcloud && " + "tar -xzf /tmp/gcloud-secrets/gcloud-config.tar.gz -C /home/sky/.config/gcloud " + ) + + skypilot_task = KubernetesPodOperator( + task_id=task_id, + 
name="skypilot-pod", + namespace="default", + image= + "us-central1-docker.pkg.dev/skypilot-375900/skypilotk8s/skypilot:20240613", + cmds=["/bin/bash", "-i", "-c"], + arguments=[ + f"{INIT_COMMANDS} && " + f"{EXTRACT_GCLOUD} && " + f"{SETUP_COMMAND} && " + f"{sky_command}" + ], + service_account_name="sky-airflow-sa", + env_vars={"HOME": "/home/sky"}, + volumes=[ + k8s.V1Volume( + name="gcloud-secret-volume", + secret=k8s.V1SecretVolumeSource(secret_name="gcloud-secret"), + ), + ], + volume_mounts=[ + k8s.V1VolumeMount(name="gcloud-secret-volume", + mount_path="/tmp/gcloud-secrets"), + ], + is_delete_operator_pod=True, + get_logs=True, + ) + return skypilot_task + + +with DAG(dag_id='sky_k8s_train_pipeline', + default_args=default_args, + schedule_interval=None, + catchup=False) as dag: + # Make sure bucket exists with gsutil mb -l us-central1 gs:// + bucket_url = "gs://sky-data-demo" + + # Launch data preprocessing task. We use --down to clean up the SkyPilot cluster after the task is done. + data_preprocess = get_skypilot_task( + "sky_data_preprocess", + f"sky launch -y -c data --down --cloud kubernetes --env DATA_BUCKET_URL={bucket_url} mock_train_workflow/data_preprocessing.yaml" + ) + + # Task to train the model + train = get_skypilot_task( + "sky_train", + f"sky launch -y -c train --down --cloud kubernetes --env DATA_BUCKET_URL={bucket_url} mock_train_workflow/train.yaml" + ) + + # Task to evaluate the trained model. This can optionally be run on the same cluster as the training task using `sky exec` + eval = get_skypilot_task( + "sky_eval", + f"sky launch -y -c eval --down --cloud kubernetes --env DATA_BUCKET_URL={bucket_url} mock_train_workflow/eval.yaml" + ) + + data_preprocess >> train >> eval