From 7ad1af0cbb1e1343b0e5dc05d12eab8049ee784f Mon Sep 17 00:00:00 2001 From: Jessie Young Date: Fri, 17 Jan 2025 14:28:56 -0800 Subject: [PATCH 1/5] Instructions to install Ray and Daft on an existing Kubernetes cluster, and support BYOC k8s clusters in daft-launcher Added docs for kuberay + daft installation, fixed minor linter issue --- README.md | 241 ++++++-- docs/kubernetes/README.md | 18 + docs/kubernetes/cloud.md | 50 ++ docs/kubernetes/local.md | 127 ++++ docs/kubernetes/on-prem.md | 33 ++ docs/kubernetes/ray-installation.md | 100 ++++ examples/hello_daft.py | 27 + src/main.rs | 875 +++++++++++++++++----------- src/template.toml | 46 +- src/template_k8s.toml | 19 + template.toml | 17 + 11 files changed, 1154 insertions(+), 399 deletions(-) create mode 100644 docs/kubernetes/README.md create mode 100644 docs/kubernetes/cloud.md create mode 100644 docs/kubernetes/local.md create mode 100644 docs/kubernetes/on-prem.md create mode 100644 docs/kubernetes/ray-installation.md create mode 100644 examples/hello_daft.py create mode 100644 src/template_k8s.toml create mode 100644 template.toml diff --git a/README.md b/README.md index aaa7c05..587d60d 100644 --- a/README.md +++ b/README.md @@ -10,45 +10,120 @@ [![Latest](https://img.shields.io/github/v/tag/Eventual-Inc/daft-launcher?label=latest&logo=GitHub)](https://github.com/Eventual-Inc/daft-launcher/tags) [![License](https://img.shields.io/badge/daft_launcher-docs-red.svg)](https://eventual-inc.github.io/daft-launcher) -# Daft Launcher +# Daft Launcher CLI Tool `daft-launcher` is a simple launcher for spinning up and managing Ray clusters for [`daft`](https://github.com/Eventual-Inc/Daft). -It abstracts away all the complexities of dealing with Ray yourself, allowing you to focus on running `daft` in a distributed manner. + +## Goal + +Getting started with Daft in a local environment is easy. +However, getting started with Daft in a cloud environment is substantially more difficult. +So much more difficult, in fact, that users end up spending more time setting up their environment than actually playing with our query engine. + +Daft Launcher aims to solve this problem by providing a simple CLI tool to remove all of this unnecessary heavy-lifting. ## Capabilities -1. Spinning up clusters. -2. Listing all available clusters (as well as their statuses). -3. Submitting jobs to a cluster. -4. Connecting to the cluster (to view the Ray dashboard and submit jobs using the Ray protocol). -5. Spinning down clusters. -6. Creating configuration files. -7. Running raw SQL statements using Daft's SQL API. +What Daft Launcher is capable of: +1. Spinning up clusters (AWS only) +2. Listing all available clusters as well as their statuses (AWS only) +3. Submitting jobs to a cluster (AWS and Kubernetes) +4. Connecting to the cluster (AWS only, Kubernetes coming soon) +5. Spinning down clusters (AWS only) +6. Creating configuration files (AWS and Kubernetes) +7. Running raw SQL statements (AWS only, Kubernetes coming soon) ## Currently supported cloud providers -- [x] AWS -- [ ] GCP -- [ ] Azure +Daft Launcher supports two modes of operation: +- **AWS**: For automatically provisioning and managing Ray clusters in AWS +- **Kubernetes**: For connecting to existing Ray clusters in Kubernetes -## Usage +### Command Support Matrix -You'll need a python package manager installed. -We highly recommend using [`uv`](https://astral.sh/blog/uv) for all things python! +| Command | AWS | Kubernetes | +|----------|-----|------------| +| init | ✅ | ✅ | +| up | ✅ | ❌ | +| submit | ✅ | ✅ | +| stop | ✅ | ❌ | +| kill | ✅ | ❌ | +| list | ✅ | ❌ | +| connect | ✅ | ❌ | +| ssh | ✅ | ❌ | +| sql | ✅ | ❌ | -### AWS +## Usage -If you're using AWS, you'll need: -1. A valid AWS account with the necessary IAM role to spin up EC2 instances. - This IAM role can either be created by you (assuming you have the appropriate permissions). - Or this IAM role will need to be created by your administrator. -2. The [AWS CLI](https://aws.amazon.com/cli) installed and configured on your machine. -3. To login using the AWS CLI. - For full instructions, please look [here](https://google.com). +### Pre-requisites -## Installation +You'll need some python package manager installed. +We recommend using [`uv`](https://astral.sh/blog/uv) for all things python. -Using `uv` (recommended): +#### For AWS Provider +1. A valid AWS account with the necessary IAM role to spin up EC2 instances. + This IAM role can either be created by you (assuming you have the appropriate permissions) + or will need to be created by your administrator. +2. The [AWS CLI](https://aws.amazon.com/cli/) installed and configured on your machine. +3. Login using the AWS CLI. + +#### For Kubernetes Provider +1. A Kubernetes cluster with Ray already deployed + - Can be local (minikube/kind), cloud-managed (EKS/GKE/AKS), or on-premise. + - See our [Kubernetes setup guides](./docs/kubernetes/README.md) for detailed instructions +2. Ray cluster running in your Kubernetes cluster + - Must be installed and configured using Helm + - See provider-specific guides for installation steps +3. Daft installed on the Ray cluster +4. `kubectl` installed and configured with the correct context +5. Appropriate permissions to access the namespace where Ray is deployed + +### SSH Key Setup for AWS + +To enable SSH access and port forwarding for AWS clusters, you need to: + +1. Create an SSH key pair (if you don't already have one): + ```bash + # Generate a new key pair + ssh-keygen -t rsa -b 2048 -f ~/.ssh/daft-key + + # This will create: + # ~/.ssh/daft-key (private key) + # ~/.ssh/daft-key.pub (public key) + ``` + +2. Import the public key to AWS: + ```bash + # Import the public key to AWS + aws ec2 import-key-pair \ + --key-name "daft-key" \ + --public-key-material fileb://~/.ssh/daft-key.pub + ``` + +3. Set proper permissions on your private key: + ```bash + chmod 600 ~/.ssh/daft-key + ``` + +4. Update your daft configuration to use this key: + ```toml + [setup.aws] + # ... other aws config ... + ssh-private-key = "~/.ssh/daft-key" # Path to your private key + ssh-user = "ubuntu" # User depends on the AMI (ubuntu for Ubuntu AMIs) + ``` + +Notes: +- The key name in AWS must match the name of your key file (without the extension) +- The private key must be readable only by you (hence the chmod 600) +- Different AMIs use different default users: + - Ubuntu AMIs: use "ubuntu" + - Amazon Linux AMIs: use "ec2-user" + - Make sure this matches your `ssh-user` configuration + +### Installation + +Using `uv`: ```bash # create project @@ -64,32 +139,110 @@ source .venv/bin/activate uv pip install daft-launcher ``` -## Example +### Example Usage -```sh -# create a new configuration file -daft init -``` -That should create a configuration file for you. -Feel free to modify some of the configuration values. -If you have any confusions on a value, you can always run `daft check` to check the syntax and schema of your configuration file. +All interactions with Daft Launcher are primarily communicated via a configuration file. +By default, Daft Launcher will look inside your `$CWD` for a file named `.daft.toml`. +You can override this behaviour by specifying a custom configuration file. + +#### AWS Provider (Default) -Once you're content with your configuration file, go back to your terminal and run the following: +```bash +# create a new AWS configuration file +daft init +# or explicitly specify AWS provider +daft init --provider aws -```sh -# spin your cluster up +# spin up a cluster (AWS only) daft up +# or optionally, pass in a custom config file +daft up -c my-custom-config.toml -# list all the active clusters +# list all active clusters (AWS only) daft list -# submit a directory and command to run on the cluster -# (where `my-job-name` should be an entry in your .daft.toml file) -daft submit my-job-name +# submit a job defined in your config +daft submit --working-dir <...> example-job + +# execute SQL query +daft sql "SELECT * FROM my_table" + +# connect to the Ray dashboard +daft connect + +# SSH into the head node +daft ssh + +# stop the cluster +daft stop + +# terminate the cluster +daft kill +``` + +#### Kubernetes Provider + +```bash +# create a new Kubernetes configuration file +daft init --provider k8s + +# submit a job defined in your config +daft submit example-job + +# execute SQL query (K8s only) +daft sql "SELECT * FROM my_table" +``` + +### Configuration Files + +You can specify a custom configuration file path with the `-c` flag: +```bash +daft -c my-config.toml submit example-job +``` + +Example AWS configuration: +```toml +[setup] +name = "my-daft-cluster" +version = "0.1.0" +provider = "aws" + +[setup.aws] +region = "us-west-2" +number-of-workers = 4 +ssh-user = "ubuntu" +ssh-private-key = "~/.ssh/daft-key" +instance-type = "i3.2xlarge" +image-id = "ami-04dd23e62ed049936" +iam-instance-profile-name = "YourInstanceProfileName" # Optional +dependencies = [] # Optional additional Python packages + +[run] +pre-setup-commands = [] +post-setup-commands = [] + +[[job]] +name = "example-job" +command = "python my_script.py" +working-dir = "~/my_project" +``` + +Example Kubernetes configuration: +```toml +[setup] +name = "my-daft-cluster" +version = "0.1.0" +provider = "k8s" + +[setup.k8s] +namespace = "default" # Optional, defaults to "default" -# run a direct SQL query on daft -daft sql "SELECT * FROM my_table WHERE column = 'value'" +[run] +pre-setup-commands = [] +post-setup-commands = [] -# finally, once you're done, spin the cluster down -daft down +[[job]] +name = "example-job" +command = "python my_script.py" +working-dir = "~/my_project" ``` diff --git a/docs/kubernetes/README.md b/docs/kubernetes/README.md new file mode 100644 index 0000000..2842d0e --- /dev/null +++ b/docs/kubernetes/README.md @@ -0,0 +1,18 @@ +# Kubernetes Setup for Daft + +> **Note**: This documentation is housed in the `daft-launcher` repository while the Kubernetes approach is being reviewed. Once finalized, these docs will be copied to the separate documentation repository. + +This directory contains guides for setting up Ray and Daft on various Kubernetes environments: + +- [Local Development](./local.md) - Setting up a local Kubernetes cluster for development +- [Cloud Providers](./cloud.md) - Instructions for EKS, GKE, and AKS setups +- [On-Premises](./on-prem.md) - Guide for on-premises Kubernetes deployments + +## Prerequisites + +Before using `daft-launcher` with Kubernetes, you must: +1. Have a running Kubernetes cluster (local, cloud-managed, or on-premise) +2. Install and configure Ray on your Kubernetes cluster +3. Install Daft on your cluster + +Please follow the appropriate guide above for your environment. \ No newline at end of file diff --git a/docs/kubernetes/cloud.md b/docs/kubernetes/cloud.md new file mode 100644 index 0000000..0e34ab6 --- /dev/null +++ b/docs/kubernetes/cloud.md @@ -0,0 +1,50 @@ +# Cloud Provider Kubernetes Setup + +This guide covers using Ray and Daft with managed Kubernetes services from major cloud providers. + +## Prerequisites + +### General Requirements +- `kubectl` installed and configured +- `helm` installed +- A running Kubernetes cluster in one of the following cloud providers: + - Amazon Elastic Kubernetes Service (EKS) + - Google Kubernetes Engine (GKE) + - Azure Kubernetes Service (AKS) + +### Cloud-Specific Requirements + +#### For AWS EKS +- AWS CLI installed and configured +- Access to an existing EKS cluster +- `kubectl` configured for your EKS cluster: + ```bash + aws eks update-kubeconfig --name your-cluster-name --region your-region + ``` + +#### For Google GKE +- Google Cloud SDK installed +- Access to an existing GKE cluster +- `kubectl` configured for your GKE cluster: + ```bash + gcloud container clusters get-credentials your-cluster-name --zone your-zone + ``` + +#### For Azure AKS +- Azure CLI installed +- Access to an existing AKS cluster +- `kubectl` configured for your AKS cluster: + ```bash + az aks get-credentials --resource-group your-resource-group --name your-cluster-name + ``` + +## Installing Ray and Daft + +Once your cloud Kubernetes cluster is running and `kubectl` is configured, follow the [Ray Installation Guide](./ray-installation.md) to: +1. Install KubeRay Operator +2. Deploy Ray cluster +3. Install Daft +4. Set up port forwarding +5. Submit test jobs + +> **Note**: For cloud providers, you'll typically use x86/AMD64 images unless you're specifically using ARM-based instances (like AWS Graviton). \ No newline at end of file diff --git a/docs/kubernetes/local.md b/docs/kubernetes/local.md new file mode 100644 index 0000000..130aeab --- /dev/null +++ b/docs/kubernetes/local.md @@ -0,0 +1,127 @@ +# Local Kubernetes Development Setup + +This guide walks you through setting up a local Kubernetes cluster for Daft development. + +## Prerequisites + +- Docker Desktop installed and running +- `kubectl` CLI tool installed +- `helm` installed +- One of the following local Kubernetes solutions: + - Kind (Recommended) + - Minikube + - Docker Desktop's built-in Kubernetes + +## Option 1: Using Kind (Recommended) + +1. Install Kind: + ```bash + # On macOS with Homebrew + brew install kind + + # On Linux + curl -Lo ./kind https://kind.sigs.k8s.io/dl/v0.20.0/kind-linux-amd64 + chmod +x ./kind + sudo mv ./kind /usr/local/bin/kind + ``` + +2. Create a cluster: + ```bash + # For Apple Silicon (M1, M2, M3): + kind create cluster --name daft-dev --config - < **Note**: For Apple Silicon (M1, M2, M3) machines, make sure to use the ARM64-specific Ray image as specified in the installation guide. + +## Resource Requirements + +Local Kubernetes clusters need sufficient resources to run Ray and Daft effectively: + +- Minimum requirements: + - 4 CPU cores + - 8GB RAM + - 20GB disk space + +- Recommended: + - 8 CPU cores + - 16GB RAM + - 40GB disk space + +You can adjust these in Docker Desktop's settings or when starting Minikube. + +## Troubleshooting + +### Resource Issues +- If pods are stuck in `Pending` state: + - For Docker Desktop: Increase resources in Docker Desktop settings + - For Minikube: Start with more resources: `minikube start --cpus 6 --memory 12288` + +### Architecture Issues +- For Apple Silicon users: + - Ensure you're using ARM64-compatible images + - Check Docker Desktop is running in native ARM64 mode + - Verify Kubernetes components are ARM64-compatible + +## Cleanup + +To delete your local cluster: + +```bash +# For Kind +kind delete cluster --name daft-dev + +# For Minikube +minikube delete +``` \ No newline at end of file diff --git a/docs/kubernetes/on-prem.md b/docs/kubernetes/on-prem.md new file mode 100644 index 0000000..fd9258e --- /dev/null +++ b/docs/kubernetes/on-prem.md @@ -0,0 +1,33 @@ +# On-Premises Kubernetes Setup + +This guide covers setting up Ray and Daft on self-managed Kubernetes clusters. + +## Prerequisites + +Before proceeding with Ray and Daft installation, ensure you have: + +- A running Kubernetes cluster (v1.16+) +- `kubectl` installed and configured with access to your cluster +- `helm` installed +- Load balancer solution configured if needed + +## Verifying Cluster Requirements + +1. Check Kubernetes version: + ```bash + kubectl version --short + ``` + +2. Verify cluster nodes: + ```bash + kubectl get nodes + ``` + +## Installing Ray and Daft + +Once your on-premises Kubernetes cluster is ready, follow the [Cloud Provider Setup Guide](./cloud.md#installing-ray-common-steps-for-all-providers) for: +- Installing Ray using Helm +- Installing Daft on the Ray cluster +- Configuring and using daft-launcher + +The installation steps are identical regardless of where your Kubernetes cluster is running. \ No newline at end of file diff --git a/docs/kubernetes/ray-installation.md b/docs/kubernetes/ray-installation.md new file mode 100644 index 0000000..a78d5cc --- /dev/null +++ b/docs/kubernetes/ray-installation.md @@ -0,0 +1,100 @@ +# Installing Ray on Kubernetes + +This guide covers the common steps for installing Ray on Kubernetes using KubeRay, regardless of where your cluster is running (local, cloud, or on-premise). + +## Prerequisites +- A running Kubernetes cluster +- `kubectl` configured with the correct context +- `helm` installed + +## Installation Steps + +1. Add the KubeRay Helm repository: + ```bash + helm repo add kuberay https://ray-project.github.io/kuberay-helm/ + helm repo update + ``` + +2. Install KubeRay Operator: + ```bash + helm install kuberay-operator kuberay/kuberay-operator + ``` + +3. Create a values file (`values.yaml`): + ```yaml + head: + args: ["sudo apt-get update && sudo apt-get install -y curl; curl -LsSf https://astral.sh/uv/install.sh | sh; export PATH=$HOME/.local/bin:$PATH; uv pip install --system getdaft"] + worker: + args: ["sudo apt-get update && sudo apt-get install -y curl; curl -LsSf https://astral.sh/uv/install.sh | sh; export PATH=$HOME/.local/bin:$PATH; uv pip install --system getdaft"] + + rayCluster: + headGroupSpec: + template: + spec: + containers: + - name: ray-head + image: rayproject/ray:2.40.0-py310 # Use the desired Python version + command: ["ray", "start", "--head"] + workerGroupSpecs: + template: + spec: + containers: + - name: ray-worker + image: rayproject/ray:2.40.0-py310 # Same image to ensure compatibility + ``` + +4. Install Ray Cluster: + + For Apple Silicon (M1, M2, M3, M4) or other ARM64 processors (AWS Graviton, etc.): + ```bash + helm install raycluster kuberay/ray-cluster --version 1.2.2 \ + --set 'image.tag=2.40.0-py310-aarch64' \ + -f values.yaml + ``` + + For x86/AMD64 processors: + ```bash + helm install raycluster kuberay/ray-cluster --version 1.2.2 \ + -f values.yaml + ``` + +6. Verify the installation: + ```bash + kubectl get pods + ``` + +## Accessing Ray + +### Port Forwarding +To access the Ray dashboard and submit jobs, set up port forwarding: +```bash +kubectl port-forward service/raycluster-kuberay-head-svc 8265:8265 +``` + +### Ray Dashboard +Once port forwarding is set up, access the dashboard at: +http://localhost:8265 + +### Submitting Jobs +You can submit Ray jobs using the following command: +```bash +ray job submit --address http://localhost:8265 -- python -c "import ray; import daft; ray.init(); print(ray.cluster_resources())" +``` + +## Troubleshooting + +1. Check pod status: + ```bash + kubectl get pods + kubectl describe pod + ``` + +2. View pod logs: + ```bash + kubectl logs + ``` + +3. Common issues: + - If pods are stuck in `Pending` state, check resource availability + - If pods are `CrashLoopBackOff`, check the logs for errors + - For ARM64 issues, ensure you're using the correct image tag with `-aarch64` suffix \ No newline at end of file diff --git a/examples/hello_daft.py b/examples/hello_daft.py new file mode 100644 index 0000000..8102620 --- /dev/null +++ b/examples/hello_daft.py @@ -0,0 +1,27 @@ +import sys +import daft +from daft import DataType, udf + +print(f"Python version: {sys.version}") + + +import datetime +df = daft.from_pydict( + { + "integers": [1, 2, 3, 4], + "floats": [1.5, 2.5, 3.5, 4.5], + "bools": [True, True, False, False], + "strings": ["a", "b", "c", "d"], + "bytes": [b"a", b"b", b"c", b"d"], + "dates": [ + datetime.date(1994, 1, 1), + datetime.date(1994, 1, 2), + datetime.date(1994, 1, 3), + datetime.date(1994, 1, 4), + ], + "lists": [[1, 1, 1], [2, 2, 2], [3, 3, 3], [4, 4, 4]], + "nulls": [None, None, None, None], + } +) + +df.show(2) diff --git a/src/main.rs b/src/main.rs index 86866fa..5f426f7 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,3 @@ -mod ssh; -#[cfg(test)] -mod tests; - use std::{ collections::HashMap, io::{Error, ErrorKind}, @@ -10,7 +6,6 @@ use std::{ process::Stdio, str::FromStr, sync::Arc, - thread::{sleep, spawn}, time::Duration, }; @@ -22,11 +17,15 @@ use clap::{Parser, Subcommand}; use comfy_table::{ modifiers, presets, Attribute, Cell, CellAlignment, Color, ContentArrangement, Table, }; -use regex::Regex; +use semver::{Version, VersionReq}; use serde::{Deserialize, Serialize}; use tempdir::TempDir; -use tokio::{fs, process::Command}; -use versions::{Requirement, Versioning}; +use tokio::{ + fs, + io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt, BufReader}, + process::{Child, Command}, + time::timeout, +}; type StrRef = Arc; type PathRef = Arc; @@ -100,13 +99,14 @@ struct Init { /// The path at which to create the config file. #[arg(default_value = ".daft.toml")] path: PathBuf, + + /// The provider to use - either 'aws' (default) to auto-generate a cluster or 'k8s' for existing Kubernetes clusters + #[arg(long, default_value = "aws")] + provider: String, } #[derive(Debug, Parser, Clone, PartialEq, Eq)] struct List { - /// A regex to filter for the Ray clusters which match the given name. - regex: Option, - /// The region which to list all the available clusters for. #[arg(long)] region: Option, @@ -138,10 +138,6 @@ struct Connect { #[arg(long, default_value = "8265")] port: u16, - /// Prevent the dashboard from opening automatically. - #[arg(long)] - no_dashboard: bool, - #[clap(flatten)] config_path: ConfigPath, } @@ -167,11 +163,55 @@ struct ConfigPath { struct DaftConfig { setup: DaftSetup, #[serde(default)] - run: Vec, - #[serde(default, rename = "job", deserialize_with = "parse_jobs")] + run: DaftRun, + #[serde(rename = "job", deserialize_with = "parse_jobs")] jobs: HashMap, } +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +struct DaftSetup { + name: StrRef, + #[serde(deserialize_with = "parse_version_req")] + version: VersionReq, + provider: DaftProvider, + #[serde(flatten)] + provider_config: ProviderConfig, +} + +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +enum ProviderConfig { + #[serde(rename = "aws")] + Aws(AwsConfig), + #[serde(rename = "k8s")] + K8s(K8sConfig), +} + +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +struct AwsConfig { + region: StrRef, + #[serde(default = "default_number_of_workers")] + number_of_workers: usize, + ssh_user: StrRef, + #[serde(deserialize_with = "parse_ssh_private_key")] + ssh_private_key: PathRef, + #[serde(default = "default_instance_type")] + instance_type: StrRef, + #[serde(default = "default_image_id")] + image_id: StrRef, + iam_instance_profile_name: Option, + #[serde(default)] + dependencies: Vec, +} + +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +struct K8sConfig { + namespace: Option, +} + fn parse_jobs<'de, D>(deserializer: D) -> Result, D::Error> where D: serde::Deserializer<'de>, @@ -202,31 +242,6 @@ where Ok(jobs) } -#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] -#[serde(rename_all = "kebab-case", deny_unknown_fields)] -struct DaftSetup { - name: StrRef, - #[serde(deserialize_with = "parse_daft_launcher_requirement")] - requires: Requirement, - #[serde(deserialize_with = "parse_python_version")] - python_version: Versioning, - #[serde(deserialize_with = "parse_ray_version")] - ray_version: Versioning, - region: StrRef, - #[serde(default = "default_number_of_workers")] - number_of_workers: usize, - ssh_user: StrRef, - #[serde(deserialize_with = "parse_ssh_private_key")] - ssh_private_key: PathRef, - #[serde(default = "default_instance_type")] - instance_type: StrRef, - #[serde(default = "default_image_id")] - image_id: StrRef, - iam_instance_profile_name: Option, - #[serde(default)] - dependencies: Vec, -} - fn parse_ssh_private_key<'de, D>(deserializer: D) -> Result where D: serde::Deserializer<'de>, @@ -274,52 +289,52 @@ fn default_image_id() -> StrRef { "ami-04dd23e62ed049936".into() } -fn parse_python_version<'de, D>(deserializer: D) -> Result +fn parse_version_req<'de, D>(deserializer: D) -> Result where D: serde::Deserializer<'de>, { let raw: StrRef = Deserialize::deserialize(deserializer)?; - let requested_py_version = raw - .parse::() + let version_req = raw + .parse::() .map_err(serde::de::Error::custom)?; - let minimum_py_requirement = ">=3.9" - .parse::() - .expect("Parsing a static, constant version should always succeed"); - - if minimum_py_requirement.matches(&requested_py_version) { - Ok(requested_py_version) + let current_version = env!("CARGO_PKG_VERSION") + .parse::() + .expect("CARGO_PKG_VERSION must exist"); + if version_req.matches(¤t_version) { + Ok(version_req) } else { - Err(serde::de::Error::custom(format!("The minimum supported python version is {minimum_py_requirement}, but your configuration file requested python version {requested_py_version}"))) + Err(serde::de::Error::custom(format!("You're running daft-launcher version {current_version}, but your configuration file requires version {version_req}"))) } } -fn parse_ray_version<'de, D>(deserializer: D) -> Result -where - D: serde::Deserializer<'de>, -{ - let raw: StrRef = Deserialize::deserialize(deserializer)?; - let version = raw.parse().map_err(serde::de::Error::custom)?; - Ok(version) +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +enum DaftProvider { + Aws, + K8s, } -fn parse_daft_launcher_requirement<'de, D>(deserializer: D) -> Result -where - D: serde::Deserializer<'de>, -{ - let raw: StrRef = Deserialize::deserialize(deserializer)?; - let requested_requirement = raw - .parse::() - .map_err(serde::de::Error::custom)?; - let current_version = env!("CARGO_PKG_VERSION") - .parse::() - .expect("CARGO_PKG_VERSION must exist"); - if requested_requirement.matches(¤t_version) { - Ok(requested_requirement) - } else { - Err(serde::de::Error::custom(format!("You're running daft-launcher version {current_version}, but your configuration file requires version {requested_requirement}"))) +impl FromStr for DaftProvider { + type Err = anyhow::Error; + + fn from_str(s: &str) -> Result { + match s.to_lowercase().as_str() { + "aws" => Ok(DaftProvider::Aws), + "k8s" => Ok(DaftProvider::K8s), + _ => anyhow::bail!("Invalid provider '{}'. Must be either 'aws' or 'k8s'", s), + } } } +#[derive(Default, Debug, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +struct DaftRun { + #[serde(default)] + pre_setup_commands: Vec, + #[serde(default)] + post_setup_commands: Vec, +} + #[derive(Debug, Clone, PartialEq, Eq)] struct DaftJob { command: StrRef, @@ -365,12 +380,12 @@ struct RayNodeConfig { instance_type: StrRef, image_id: StrRef, #[serde(skip_serializing_if = "Option::is_none")] - iam_instance_profile: Option, + iam_instance_profile: Option, } #[derive(Default, Debug, Serialize, Clone, PartialEq, Eq)] #[serde(rename_all = "PascalCase")] -struct RayIamInstanceProfile { +struct IamInstanceProfile { name: StrRef, } @@ -380,112 +395,10 @@ struct RayResources { cpu: usize, } -fn generate_setup_commands( - python_version: Versioning, - ray_version: Versioning, - dependencies: &[StrRef], -) -> Vec { - let mut commands = vec![ - "curl -LsSf https://astral.sh/uv/install.sh | sh".into(), - format!("uv python install {python_version}").into(), - format!("uv python pin {python_version}").into(), - "uv venv".into(), - "echo 'source $HOME/.venv/bin/activate' >> ~/.bashrc".into(), - "source ~/.bashrc".into(), - format!( - r#"uv pip install boto3 pip py-spy deltalake getdaft "ray[default]=={ray_version}""# - ) - .into(), - ]; - - if !dependencies.is_empty() { - let deps = dependencies - .iter() - .map(|dep| format!(r#""{dep}""#)) - .collect::>() - .join(" "); - let deps = format!("uv pip install {deps}").into(); - commands.push(deps); - } - - commands -} - -fn convert( - daft_config: &DaftConfig, - teardown_behaviour: Option, -) -> anyhow::Result { - let key_name = daft_config - .setup - .ssh_private_key - .clone() - .file_stem() - .ok_or_else(|| { - anyhow::anyhow!(r#"Private key doesn't have a name of the format "name.ext""#) - })? - .to_str() - .ok_or_else(|| { - anyhow::anyhow!( - "The file {:?} does not a valid UTF-8 name", - daft_config.setup.ssh_private_key, - ) - })? - .into(); - let iam_instance_profile = daft_config - .setup - .iam_instance_profile_name - .clone() - .map(|name| RayIamInstanceProfile { name }); - let node_config = RayNodeConfig { - key_name, - instance_type: daft_config.setup.instance_type.clone(), - image_id: daft_config.setup.image_id.clone(), - iam_instance_profile, - }; - Ok(RayConfig { - cluster_name: daft_config.setup.name.clone(), - max_workers: daft_config.setup.number_of_workers, - provider: RayProvider { - r#type: "aws".into(), - region: daft_config.setup.region.clone(), - cache_stopped_nodes: teardown_behaviour.map(TeardownBehaviour::to_cache_stopped_nodes), - }, - auth: RayAuth { - ssh_user: daft_config.setup.ssh_user.clone(), - ssh_private_key: daft_config.setup.ssh_private_key.clone(), - }, - available_node_types: vec![ - ( - "ray.head.default".into(), - RayNodeType { - max_workers: 0, - node_config: node_config.clone(), - resources: Some(RayResources { cpu: 0 }), - }, - ), - ( - "ray.worker.default".into(), - RayNodeType { - max_workers: daft_config.setup.number_of_workers, - node_config, - resources: None, - }, - ), - ] - .into_iter() - .collect(), - setup_commands: generate_setup_commands( - daft_config.setup.python_version.clone(), - daft_config.setup.ray_version.clone(), - daft_config.setup.dependencies.as_ref(), - ), - }) -} - async fn read_and_convert( daft_config_path: &Path, teardown_behaviour: Option, -) -> anyhow::Result<(DaftConfig, RayConfig)> { +) -> anyhow::Result<(DaftConfig, Option)> { let contents = fs::read_to_string(&daft_config_path) .await .map_err(|error| { @@ -498,8 +411,84 @@ async fn read_and_convert( error } })?; + let daft_config = toml::from_str::(&contents)?; - let ray_config = convert(&daft_config, teardown_behaviour)?; + + let ray_config = match &daft_config.setup.provider_config { + ProviderConfig::K8s(_) => None, + ProviderConfig::Aws(aws_config) => { + let key_name = aws_config.ssh_private_key + .clone() + .file_stem() + .ok_or_else(|| anyhow::anyhow!(r#"Private key doesn't have a name of the format "name.ext""#))? + .to_str() + .ok_or_else(|| anyhow::anyhow!("The file {:?} does not have a valid UTF-8 name", aws_config.ssh_private_key))? + .into(); + + let node_config = RayNodeConfig { + key_name, + instance_type: aws_config.instance_type.clone(), + image_id: aws_config.image_id.clone(), + iam_instance_profile: aws_config.iam_instance_profile_name.clone().map(|name| IamInstanceProfile { name }), + }; + + Some(RayConfig { + cluster_name: daft_config.setup.name.clone(), + max_workers: aws_config.number_of_workers, + provider: RayProvider { + r#type: "aws".into(), + region: aws_config.region.clone(), + cache_stopped_nodes: teardown_behaviour.map(TeardownBehaviour::to_cache_stopped_nodes), + }, + auth: RayAuth { + ssh_user: aws_config.ssh_user.clone(), + ssh_private_key: aws_config.ssh_private_key.clone(), + }, + available_node_types: vec![ + ( + "ray.head.default".into(), + RayNodeType { + max_workers: aws_config.number_of_workers, + node_config: node_config.clone(), + resources: Some(RayResources { cpu: 0 }), + }, + ), + ( + "ray.worker.default".into(), + RayNodeType { + max_workers: aws_config.number_of_workers, + node_config, + resources: None, + }, + ), + ] + .into_iter() + .collect(), + setup_commands: { + let mut commands = vec![ + "curl -LsSf https://astral.sh/uv/install.sh | sh".into(), + "uv python install 3.12".into(), + "uv python pin 3.12".into(), + "uv venv".into(), + "echo 'source $HOME/.venv/bin/activate' >> ~/.bashrc".into(), + "source ~/.bashrc".into(), + "uv pip install boto3 pip ray[default] getdaft py-spy deltalake".into(), + ]; + if !aws_config.dependencies.is_empty() { + let deps = aws_config + .dependencies + .iter() + .map(|dep| format!(r#""{dep}""#)) + .collect::>() + .join(" "); + let deps = format!("uv pip install {deps}").into(); + commands.push(deps); + } + commands + }, + }) + } + }; Ok((daft_config, ray_config)) } @@ -583,15 +572,6 @@ pub enum NodeType { Worker, } -impl NodeType { - pub fn as_str(self) -> &'static str { - match self { - Self::Head => "head", - Self::Worker => "worker", - } - } -} - impl FromStr for NodeType { type Err = anyhow::Error; @@ -661,37 +641,24 @@ async fn get_ray_clusters_from_aws(region: StrRef) -> anyhow::Result, - head: bool, - running: bool, -) -> anyhow::Result { +fn print_instances(instances: &[AwsInstance], head: bool, running: bool) { let mut table = Table::default(); table .load_preset(presets::UTF8_FULL) .apply_modifier(modifiers::UTF8_ROUND_CORNERS) .apply_modifier(modifiers::UTF8_SOLID_INNER_BORDERS) .set_content_arrangement(ContentArrangement::DynamicFullWidth) - .set_header( - ["Name", "Instance ID", "Node Type", "Status", "IPv4"].map(|header| { - Cell::new(header) - .set_alignment(CellAlignment::Center) - .add_attribute(Attribute::Bold) - }), - ); - let regex = regex.as_deref().map(Regex::new).transpose()?; + .set_header(["Name", "Instance ID", "Status", "IPv4"].map(|header| { + Cell::new(header) + .set_alignment(CellAlignment::Center) + .add_attribute(Attribute::Bold) + })); for instance in instances.iter().filter(|instance| { if head && instance.node_type != NodeType::Head { return false; } else if running && instance.state != Some(InstanceStateName::Running) { return false; }; - if let Some(regex) = regex.as_ref() { - if !regex.is_match(&instance.regular_name) { - return false; - }; - }; true }) { let status = instance.state.as_ref().map_or_else( @@ -717,13 +684,12 @@ fn format_table( .map_or("n/a".into(), ToString::to_string); table.add_row(vec![ Cell::new(instance.regular_name.to_string()).fg(Color::Cyan), - Cell::new(instance.instance_id.as_ref()), - Cell::new(instance.node_type.as_str()), + Cell::new(&*instance.instance_id), status, Cell::new(ipv4), ]); } - Ok(table) + println!("{table}"); } async fn assert_is_logged_in_with_aws() -> anyhow::Result<()> { @@ -745,81 +711,228 @@ async fn get_region(region: Option, config: impl AsRef) -> anyhow: region } else if config.exists() { let (daft_config, _) = read_and_convert(&config, None).await?; - daft_config.setup.region + match &daft_config.setup.provider_config { + ProviderConfig::Aws(aws_config) => aws_config.region.clone(), + ProviderConfig::K8s(_) => "us-west-2".into(), + } } else { "us-west-2".into() }) } -async fn submit(working_dir: &Path, command_segments: impl AsRef<[&str]>) -> anyhow::Result<()> { - let command_segments = command_segments.as_ref(); +async fn get_head_node_ip(ray_path: impl AsRef) -> anyhow::Result { + let mut ray_command = Command::new("ray") + .arg("get-head-ip") + .arg(ray_path.as_ref()) + .stdout(Stdio::piped()) + .spawn()?; - let exit_status = Command::new("ray") - .env("PYTHONUNBUFFERED", "1") - .args(["job", "submit", "--address", "http://localhost:8265"]) - .arg("--working-dir") - .arg(working_dir) - .arg("--") - .args(command_segments) + let mut tail_command = Command::new("tail") + .args(["-n", "1"]) + .stdin(Stdio::piped()) + .stdout(Stdio::piped()) + .spawn()?; + + let mut writer = tail_command.stdin.take().expect("stdin must exist"); + + tokio::spawn(async move { + let mut reader = BufReader::new(ray_command.stdout.take().expect("stdout must exist")); + let mut buffer = Vec::new(); + reader.read_to_end(&mut buffer).await?; + writer.write_all(&buffer).await?; + Ok::<_, anyhow::Error>(()) + }); + let output = tail_command.wait_with_output().await?; + if !output.status.success() { + anyhow::bail!("Failed to fetch ip address of head node"); + }; + let addr = String::from_utf8_lossy(&output.stdout) + .trim() + .parse::()?; + Ok(addr) +} + +async fn ssh(ray_path: impl AsRef, aws_config: &AwsConfig) -> anyhow::Result<()> { + let addr = get_head_node_ip(ray_path).await?; + let exit_status = Command::new("ssh") + .arg("-i") + .arg(aws_config.ssh_private_key.as_ref()) + .arg(format!("{}@{}", aws_config.ssh_user, addr)) + .kill_on_drop(true) .spawn()? .wait() .await?; + if exit_status.success() { Ok(()) } else { - Err(anyhow::anyhow!("Failed to submit job to the ray cluster")) + Err(anyhow::anyhow!("Failed to ssh into the ray cluster")) } } -async fn get_version_from_env(bin: &str, prefix: &str) -> anyhow::Result { - let output = Command::new(bin) - .arg("--version") - .stdout(Stdio::piped()) +async fn establish_ssh_portforward( + ray_path: impl AsRef, + aws_config: &AwsConfig, + port: Option, +) -> anyhow::Result { + let addr = get_head_node_ip(ray_path).await?; + let port = port.unwrap_or(8265); + let mut child = Command::new("ssh") + .arg("-N") + .arg("-i") + .arg(aws_config.ssh_private_key.as_ref()) + .arg("-L") + .arg(format!("{port}:localhost:8265")) + .arg(format!("{}@{}", aws_config.ssh_user, addr)) + .arg("-v") .stderr(Stdio::piped()) - .spawn()? - .wait_with_output() - .await?; + .kill_on_drop(true) + .spawn()?; + + // We wait for the ssh port-forwarding process to write a specific string to the + // output. + // + // This is a little hacky (and maybe even incorrect across platforms) since we + // are just parsing the output and observing if a specific string has been + // printed. It may be incorrect across platforms because the SSH standard + // does *not* specify a standard "success-message" to printout if the ssh + // port-forward was successful. + timeout(Duration::from_secs(5), { + let stderr = child.stderr.take().expect("stderr must exist"); + async move { + let mut lines = BufReader::new(stderr).lines(); + loop { + let Some(line) = lines.next_line().await? else { + anyhow::bail!("Failed to establish ssh port-forward to {addr}"); + }; + if line.starts_with(format!("Authenticated to {addr}").as_str()) { + break Ok(()); + } + } + } + }) + .await + .map_err(|_| anyhow::anyhow!("Establishing an ssh port-forward to {addr} timed out"))??; - if output.status.success() { - let version = String::from_utf8(output.stdout)? - .strip_prefix(prefix) - .ok_or_else(|| anyhow::anyhow!("Could not parse {bin} version"))? - .trim() - .parse()?; - Ok(version) - } else { - Err(anyhow::anyhow!("Failed to find {bin} executable")) + Ok(child) +} + +struct PortForward { + process: Child, +} + +impl Drop for PortForward { + fn drop(&mut self) { + let _ = self.process.start_kill(); } } -async fn get_python_version_from_env() -> anyhow::Result { - let python_version = get_version_from_env("python", "Python ").await?; - Ok(python_version) +async fn establish_kubernetes_port_forward(namespace: Option<&str>) -> anyhow::Result { + let namespace = namespace.unwrap_or("default"); + let output = Command::new("kubectl") + .arg("get") + .arg("svc") + .arg("-n") + .arg(namespace) + .arg("-l") + .arg("ray.io/node-type=head") + .arg("--no-headers") + .arg("-o") + .arg("custom-columns=:metadata.name") + .output() + .await?; + if !output.status.success() { + return Err(anyhow::anyhow!("Failed to get Ray head node services with kubectl in namespace {}", namespace)); + } + + let stdout = String::from_utf8_lossy(&output.stdout); + if stdout.trim().is_empty() { + return Err(anyhow::anyhow!("Ray head node service not found in namespace {}", namespace)); + } + + let head_node_service_name = stdout + .lines() + .next() + .ok_or_else(|| anyhow::anyhow!("Failed to get the head node service name"))?; + println!("Found Ray head node service: {} in namespace {}", head_node_service_name, namespace); + + // Start port-forward with stderr piped so we can monitor the process + let mut port_forward = Command::new("kubectl") + .arg("port-forward") + .arg("-n") + .arg(namespace) + .arg(format!("svc/{}", head_node_service_name)) + .arg("8265:8265") + .stderr(Stdio::piped()) + .stdout(Stdio::piped()) // Capture stdout too + .kill_on_drop(true) + .spawn()?; + + // Give the port-forward a moment to start and check for immediate failures + tokio::time::sleep(Duration::from_secs(2)).await; + + // Check if process is still running + match port_forward.try_wait()? { + Some(status) => { + return Err(anyhow::anyhow!( + "Port-forward process exited immediately with status: {}", + status + )); + } + None => { + println!("Port-forwarding started successfully"); + Ok(PortForward { + process: port_forward, + }) + } + } } -async fn get_ray_version_from_env() -> anyhow::Result { - let python_version = get_version_from_env("ray", "ray, version ").await?; - Ok(python_version) +async fn submit_k8s( + working_dir: &Path, + command_segments: impl AsRef<[&str]>, + namespace: Option<&str>, +) -> anyhow::Result<()> { + let command_segments = command_segments.as_ref(); + + // Start port forwarding - it will be automatically killed when _port_forward is dropped + let _port_forward = establish_kubernetes_port_forward(namespace).await?; + + // Give the port-forward a moment to fully establish + tokio::time::sleep(Duration::from_secs(1)).await; + + // Submit the job + let exit_status = Command::new("ray") + .env("PYTHONUNBUFFERED", "1") + .args(["job", "submit", "--address", "http://localhost:8265"]) + .arg("--working-dir") + .arg(working_dir) + .arg("--") + .args(command_segments) + .spawn()? + .wait() + .await?; + + if exit_status.success() { + Ok(()) + } else { + Err(anyhow::anyhow!("Failed to submit job to the ray cluster")) + } } async fn run(daft_launcher: DaftLauncher) -> anyhow::Result<()> { match daft_launcher.sub_command { - SubCommand::Init(Init { path }) => { + SubCommand::Init(Init { path, provider }) => { #[cfg(not(test))] if path.exists() { bail!("The path {path:?} already exists; the path given must point to a new location on your filesystem"); } - let contents = include_str!("template.toml"); - let contents = contents - .replace("", concat!("=", env!("CARGO_PKG_VERSION"))) - .replace( - "", - get_python_version_from_env().await?.to_string().as_str(), - ) - .replace( - "", - get_ray_version_from_env().await?.to_string().as_str(), - ); + let contents = if provider == "k8s" { + include_str!("template_k8s.toml") + } else { + include_str!("template.toml") + } + .replace("", env!("CARGO_PKG_VERSION")); fs::write(path, contents).await?; } SubCommand::Check(ConfigPath { config }) => { @@ -827,116 +940,186 @@ async fn run(daft_launcher: DaftLauncher) -> anyhow::Result<()> { } SubCommand::Export(ConfigPath { config }) => { let (_, ray_config) = read_and_convert(&config, None).await?; + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); let ray_config_str = serde_yaml::to_string(&ray_config)?; + println!("{ray_config_str}"); } SubCommand::Up(ConfigPath { config }) => { - let (_, ray_config) = read_and_convert(&config, None).await?; - assert_is_logged_in_with_aws().await?; + let (daft_config, ray_config) = read_and_convert(&config, None).await?; + match daft_config.setup.provider { + DaftProvider::Aws => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - run_ray_up_or_down_command(SpinDirection::Up, ray_path).await?; + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + run_ray_up_or_down_command(SpinDirection::Up, ray_path).await?; + } + DaftProvider::K8s => { + anyhow::bail!("'up' command is only available for AWS configurations"); + } + } } SubCommand::List(List { - regex, config_path, region, head, running, }) => { - assert_is_logged_in_with_aws().await?; - - let region = get_region(region, &config_path.config).await?; - let instances = get_ray_clusters_from_aws(region).await?; - let table = format_table(&instances, regex, head, running)?; - println!("{table}"); + let (daft_config, _) = read_and_convert(&config_path.config, None).await?; + match daft_config.setup.provider { + DaftProvider::Aws => { + assert_is_logged_in_with_aws().await?; + let aws_config = get_aws_config(&daft_config)?; + let region = region.unwrap_or_else(|| aws_config.region.clone()); + let instances = get_ray_clusters_from_aws(region).await?; + print_instances(&instances, head, running); + } + DaftProvider::K8s => { + anyhow::bail!("'list' command is only available for AWS configurations"); + } + } } SubCommand::Submit(Submit { config_path, job_name, }) => { let (daft_config, ray_config) = read_and_convert(&config_path.config, None).await?; - assert_is_logged_in_with_aws().await?; let daft_job = daft_config .jobs .get(&job_name) .ok_or_else(|| anyhow::anyhow!("A job with the name {job_name} was not found"))?; - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - let _child = ssh::ssh_portforward(ray_path, &daft_config, None).await?; - submit( - daft_job.working_dir.as_ref(), - daft_job.command.as_ref().split(' ').collect::>(), - ) - .await?; + match &daft_config.setup.provider_config { + ProviderConfig::Aws(_aws_config) => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + submit_k8s( + daft_job.working_dir.as_ref(), + daft_job.command.as_ref().split(' ').collect::>(), + None, + ) + .await?; + } + ProviderConfig::K8s(k8s_config) => { + submit_k8s( + daft_job.working_dir.as_ref(), + daft_job.command.as_ref().split(' ').collect::>(), + k8s_config.namespace.as_deref(), + ) + .await?; + } + } } - SubCommand::Connect(Connect { - port, - no_dashboard, - config_path, - }) => { + SubCommand::Connect(Connect { port, config_path }) => { let (daft_config, ray_config) = read_and_convert(&config_path.config, None).await?; - assert_is_logged_in_with_aws().await?; - - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - let open_join_handle = if !no_dashboard { - Some(spawn(|| { - sleep(Duration::from_millis(500)); - open::that("http://localhost:8265")?; - Ok::<_, anyhow::Error>(()) - })) - } else { - None - }; - - let _ = ssh::ssh_portforward(ray_path, &daft_config, Some(port)) - .await? - .wait_with_output() - .await?; - - if let Some(open_join_handle) = open_join_handle { - open_join_handle - .join() - .map_err(|_| anyhow::anyhow!("Failed to join browser-opening thread"))??; - }; + match daft_config.setup.provider { + DaftProvider::Aws => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; + + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + let aws_config = get_aws_config(&daft_config)?; + let _ = establish_ssh_portforward(ray_path, aws_config, Some(port)) + .await? + .wait_with_output() + .await?; + } + DaftProvider::K8s => { + anyhow::bail!("'connect' command is only available for AWS configurations"); + } + } } SubCommand::Ssh(ConfigPath { config }) => { let (daft_config, ray_config) = read_and_convert(&config, None).await?; - assert_is_logged_in_with_aws().await?; + match daft_config.setup.provider { + DaftProvider::Aws => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - ssh::ssh(ray_path, &daft_config).await?; + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + let aws_config = get_aws_config(&daft_config)?; + ssh(ray_path, aws_config).await?; + } + DaftProvider::K8s => { + anyhow::bail!("'ssh' command is only available for AWS configurations"); + } + } } SubCommand::Sql(Sql { sql, config_path }) => { - let (daft_config, ray_config) = read_and_convert(&config_path.config, None).await?; - assert_is_logged_in_with_aws().await?; - - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - let _child = ssh::ssh_portforward(ray_path, &daft_config, None).await?; - let (temp_sql_dir, sql_path) = create_temp_file("sql.py")?; - fs::write(sql_path, include_str!("sql.py")).await?; - submit(temp_sql_dir.path(), vec!["python", "sql.py", sql.as_ref()]).await?; + let (daft_config, _) = read_and_convert(&config_path.config, None).await?; + match &daft_config.setup.provider_config { + ProviderConfig::Aws(_) => { + anyhow::bail!("'sql' command is only available for Kubernetes configurations"); + } + ProviderConfig::K8s(k8s_config) => { + let (temp_sql_dir, sql_path) = create_temp_file("sql.py")?; + fs::write(sql_path, include_str!("sql.py")).await?; + submit_k8s( + temp_sql_dir.path(), + vec!["python", "sql.py", sql.as_ref()], + k8s_config.namespace.as_deref(), + ) + .await?; + } + } } SubCommand::Stop(ConfigPath { config }) => { - let (_, ray_config) = read_and_convert(&config, Some(TeardownBehaviour::Stop)).await?; - assert_is_logged_in_with_aws().await?; + let (daft_config, ray_config) = read_and_convert(&config, Some(TeardownBehaviour::Stop)).await?; + match daft_config.setup.provider { + DaftProvider::Aws => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - run_ray_up_or_down_command(SpinDirection::Down, ray_path).await?; + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + run_ray_up_or_down_command(SpinDirection::Down, ray_path).await?; + } + DaftProvider::K8s => { + anyhow::bail!("'stop' command is only available for AWS configurations"); + } + } } SubCommand::Kill(ConfigPath { config }) => { - let (_, ray_config) = read_and_convert(&config, Some(TeardownBehaviour::Kill)).await?; - assert_is_logged_in_with_aws().await?; + let (daft_config, ray_config) = read_and_convert(&config, Some(TeardownBehaviour::Kill)).await?; + match daft_config.setup.provider { + DaftProvider::Aws => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - run_ray_up_or_down_command(SpinDirection::Down, ray_path).await?; + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + run_ray_up_or_down_command(SpinDirection::Down, ray_path).await?; + } + DaftProvider::K8s => { + anyhow::bail!("'kill' command is only available for AWS configurations"); + } + } } } @@ -947,3 +1130,37 @@ async fn run(daft_launcher: DaftLauncher) -> anyhow::Result<()> { async fn main() -> anyhow::Result<()> { run(DaftLauncher::parse()).await } + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn test_init_and_export() { + run(DaftLauncher { + sub_command: SubCommand::Init(Init { + path: ".daft.toml".into(), + provider: "aws".into(), + }), + verbosity: 0, + }) + .await + .unwrap(); + run(DaftLauncher { + sub_command: SubCommand::Check(ConfigPath { + config: ".daft.toml".into(), + }), + verbosity: 0, + }) + .await + .unwrap(); + } +} + +// Helper function to get AWS config +fn get_aws_config(config: &DaftConfig) -> anyhow::Result<&AwsConfig> { + match &config.setup.provider_config { + ProviderConfig::Aws(aws_config) => Ok(aws_config), + ProviderConfig::K8s(_) => anyhow::bail!("Expected AWS configuration but found Kubernetes configuration"), + } +} diff --git a/src/template.toml b/src/template.toml index 69c291d..ac1227a 100644 --- a/src/template.toml +++ b/src/template.toml @@ -1,33 +1,27 @@ -# This is a default configuration file that you can use to spin up a ray-cluster using `daft-launcher`. -# Change up some of the configurations in here, and then run `daft up`. -# -# For more information on the availale commands and configuration options, visit [here](https://eventual-inc.github.io/daft-launcher). -# -# Happy daft-ing 🚀! - +# This is a template configuration file for daft-launcher with AWS provider [setup] -name = "daft-launcher-example" -requires = "" -python-version = "" -ray-version = "" +name = "my-daft-cluster" +version = "" +provider = "aws" + +# AWS-specific configuration +[setup.aws] region = "us-west-2" number-of-workers = 4 - -# The following configurations specify the type of servers in your cluster. -# The machine type below is what we usually use at Eventual, and the image id is Ubuntu based. -# If you want a smaller or bigger cluster, change the below two configurations accordingly. +ssh-user = "ubuntu" +ssh-private-key = "~/.ssh/id_rsa" instance-type = "i3.2xlarge" image-id = "ami-04dd23e62ed049936" +iam-instance-profile-name = "YourInstanceProfileName" # Optional +dependencies = [] # Optional additional Python packages to install -# This is the user profile that ssh's into the head machine. -# This value depends upon the `image-id` value up above. -# For Ubuntu AMIs, keep it as 'ubuntu'; for AWS AMIs, change it to 'ec2-user'. -ssh-user = "ubuntu" - -# Fill this out with your custom `.pem` key, or generate a new one by running `ssh-keygen -t rsa -b 2048 -m PEM -f my-key.pem`. -# Make sure the public key is uploaded to AWS. -ssh-private-key = "~/.ssh/my-keypair.pem" +# Run configuration (optional) +[run] +pre-setup-commands = [] +post-setup-commands = [] -# Fill in your python dependencies here. -# They'll be downloaded using `uv`. -dependencies = [] +# Job definitions +[[job]] +name = "example-job" +command = "python my_script.py" +working-dir = "~/my_project" \ No newline at end of file diff --git a/src/template_k8s.toml b/src/template_k8s.toml new file mode 100644 index 0000000..ded06c0 --- /dev/null +++ b/src/template_k8s.toml @@ -0,0 +1,19 @@ +# This is a template configuration file for daft-launcher with Kubernetes provider +[setup] +name = "my-daft-cluster" +version = "" +provider = "k8s" + +[setup.k8s] +namespace = "default" # Optional, defaults to "default" + +# Run configuration (optional) +[run] +pre-setup-commands = [] +post-setup-commands = [] + +# Job definitions +[[job]] +name = "example-job" +command = "python my_script.py" +working-dir = "~/my_project" \ No newline at end of file diff --git a/template.toml b/template.toml new file mode 100644 index 0000000..6d0751e --- /dev/null +++ b/template.toml @@ -0,0 +1,17 @@ +# This is a default configuration file that you can use to connect to an existing Kubernetes cluster running Ray (BYOC). +# Change up some of the configurations in here, and then run `daft up`. +# +# For more information on the availale commands and configuration options, visit [here](https://eventual-inc.github.io/daft-launcher). +# +# Happy daft-ing! + +[setup] +provider = "k8s" + +# They'll be downloaded using `uv`. +dependencies = [] + +[[job]] +name = "my-job" +command = "python hello_daft.py" +working-dir = "working-dir" \ No newline at end of file From 926c8236062300fe52ba442cf79542117c13c48f Mon Sep 17 00:00:00 2001 From: Jessie Young Date: Tue, 21 Jan 2025 17:36:13 -0800 Subject: [PATCH 2/5] Addressed PR comments, command groups, provisioned and byoc instead of aws and k8s --- Cargo.lock | 1 + Cargo.toml | 1 + README.md | 152 ++--- docs/{kubernetes => byoc}/README.md | 8 +- docs/{kubernetes => byoc}/cloud.md | 0 docs/{kubernetes => byoc}/local.md | 0 docs/{kubernetes => byoc}/on-prem.md | 0 docs/{kubernetes => byoc}/ray-installation.md | 0 src/main.rs | 632 ++++++++++-------- src/{template_k8s.toml => template_byoc.toml} | 12 +- ...emplate.toml => template_provisioned.toml} | 15 +- src/tests.rs | 26 + template.toml | 17 - 13 files changed, 456 insertions(+), 408 deletions(-) rename docs/{kubernetes => byoc}/README.md (63%) rename docs/{kubernetes => byoc}/cloud.md (100%) rename docs/{kubernetes => byoc}/local.md (100%) rename docs/{kubernetes => byoc}/on-prem.md (100%) rename docs/{kubernetes => byoc}/ray-installation.md (100%) rename src/{template_k8s.toml => template_byoc.toml} (52%) rename src/{template.toml => template_provisioned.toml} (67%) delete mode 100644 template.toml diff --git a/Cargo.lock b/Cargo.lock index 1ae5054..d846af6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -620,6 +620,7 @@ dependencies = [ "open", "regex", "rstest", + "semver", "serde", "serde_yaml", "tempdir", diff --git a/Cargo.toml b/Cargo.toml index a65f1ee..160c8af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ toml = "0.8" comfy-table = "7.1" regex = "1.11" open = "5.3" +semver = "1.0" [dependencies.anyhow] version = "1.0" diff --git a/README.md b/README.md index 587d60d..d5f09a2 100644 --- a/README.md +++ b/README.md @@ -25,33 +25,37 @@ Daft Launcher aims to solve this problem by providing a simple CLI tool to remov ## Capabilities What Daft Launcher is capable of: -1. Spinning up clusters (AWS only) -2. Listing all available clusters as well as their statuses (AWS only) -3. Submitting jobs to a cluster (AWS and Kubernetes) -4. Connecting to the cluster (AWS only, Kubernetes coming soon) -5. Spinning down clusters (AWS only) -6. Creating configuration files (AWS and Kubernetes) -7. Running raw SQL statements (AWS only, Kubernetes coming soon) +1. Spinning up clusters (Provisioned mode only) +2. Listing all available clusters as well as their statuses (Provisioned mode only) +3. Submitting jobs to a cluster (Both Provisioned and BYOC modes) +4. Connecting to the cluster (Provisioned mode only) +5. Spinning down clusters (Provisioned mode only) +6. Creating configuration files (Both modes) +7. Running raw SQL statements (BYOC mode only) -## Currently supported cloud providers +## Operation Modes Daft Launcher supports two modes of operation: -- **AWS**: For automatically provisioning and managing Ray clusters in AWS -- **Kubernetes**: For connecting to existing Ray clusters in Kubernetes - -### Command Support Matrix - -| Command | AWS | Kubernetes | -|----------|-----|------------| -| init | ✅ | ✅ | -| up | ✅ | ❌ | -| submit | ✅ | ✅ | -| stop | ✅ | ❌ | -| kill | ✅ | ❌ | -| list | ✅ | ❌ | -| connect | ✅ | ❌ | -| ssh | ✅ | ❌ | -| sql | ✅ | ❌ | +- **Provisioned**: Automatically provisions and manages Ray clusters in AWS +- **BYOC (Bring Your Own Cluster)**: Connects to existing Ray clusters in Kubernetes + +### Command Groups and Support Matrix + +| Command Group | Command | Provisioned | BYOC | +|--------------|---------|-------------|------| +| cluster | up | ✅ | ❌ | +| | down | ✅ | ❌ | +| | kill | ✅ | ❌ | +| | list | ✅ | ❌ | +| | connect | ✅ | ❌ | +| | ssh | ✅ | ❌ | +| job | submit | ✅ | ✅ | +| | sql | ✅ | ❌ | +| | status | ✅ | ❌ | +| | logs | ✅ | ❌ | +| config | init | ✅ | ✅ | +| | check | ✅ | ❌ | +| | export | ✅ | ❌ | ## Usage @@ -60,17 +64,17 @@ Daft Launcher supports two modes of operation: You'll need some python package manager installed. We recommend using [`uv`](https://astral.sh/blog/uv) for all things python. -#### For AWS Provider +#### For Provisioned Mode (AWS) 1. A valid AWS account with the necessary IAM role to spin up EC2 instances. This IAM role can either be created by you (assuming you have the appropriate permissions) or will need to be created by your administrator. 2. The [AWS CLI](https://aws.amazon.com/cli/) installed and configured on your machine. 3. Login using the AWS CLI. -#### For Kubernetes Provider +#### For BYOC Mode (Kubernetes) 1. A Kubernetes cluster with Ray already deployed - Can be local (minikube/kind), cloud-managed (EKS/GKE/AKS), or on-premise. - - See our [Kubernetes setup guides](./docs/kubernetes/README.md) for detailed instructions + - See our [BYOC setup guides](./docs/byoc/README.md) for detailed instructions 2. Ray cluster running in your Kubernetes cluster - Must be installed and configured using Helm - See provider-specific guides for installation steps @@ -78,9 +82,9 @@ We recommend using [`uv`](https://astral.sh/blog/uv) for all things python. 4. `kubectl` installed and configured with the correct context 5. Appropriate permissions to access the namespace where Ray is deployed -### SSH Key Setup for AWS +### SSH Key Setup for Provisioned Mode -To enable SSH access and port forwarding for AWS clusters, you need to: +To enable SSH access and port forwarding for provisioned clusters, you need to: 1. Create an SSH key pair (if you don't already have one): ```bash @@ -107,8 +111,8 @@ To enable SSH access and port forwarding for AWS clusters, you need to: 4. Update your daft configuration to use this key: ```toml - [setup.aws] - # ... other aws config ... + [setup.provisioned] + # ... other config ... ssh-private-key = "~/.ssh/daft-key" # Path to your private key ssh-user = "ubuntu" # User depends on the AMI (ubuntu for Ubuntu AMIs) ``` @@ -145,69 +149,55 @@ All interactions with Daft Launcher are primarily communicated via a configurati By default, Daft Launcher will look inside your `$CWD` for a file named `.daft.toml`. You can override this behaviour by specifying a custom configuration file. -#### AWS Provider (Default) +#### Provisioned Mode (AWS) ```bash -# create a new AWS configuration file -daft init -# or explicitly specify AWS provider -daft init --provider aws - -# spin up a cluster (AWS only) -daft up -# or optionally, pass in a custom config file -daft up -c my-custom-config.toml - -# list all active clusters (AWS only) -daft list - -# submit a job defined in your config -daft submit --working-dir <...> example-job - -# execute SQL query -daft sql "SELECT * FROM my_table" - -# connect to the Ray dashboard -daft connect - -# SSH into the head node -daft ssh - -# stop the cluster -daft stop - -# terminate the cluster -daft kill +# Initialize a new provisioned mode configuration +daft config init --provider provisioned +# or use the default provider (provisioned) +daft config init + +# Cluster management +daft provisioned up +daft provisioned list +daft provisioned connect +daft provisioned ssh +daft provisioned down +daft provisioned kill + +# Job management (works in both modes) +daft job submit example-job +daft job status example-job +daft job logs example-job + +# Configuration management +daft config check +daft config export ``` -#### Kubernetes Provider +#### BYOC Mode (Kubernetes) ```bash -# create a new Kubernetes configuration file -daft init --provider k8s - -# submit a job defined in your config -daft submit example-job - -# execute SQL query (K8s only) -daft sql "SELECT * FROM my_table" +# Initialize a new BYOC mode configuration +daft config init --provider byoc ``` ### Configuration Files You can specify a custom configuration file path with the `-c` flag: ```bash -daft -c my-config.toml submit example-job +daft -c my-config.toml job submit example-job ``` -Example AWS configuration: +Example Provisioned mode configuration: ```toml [setup] name = "my-daft-cluster" version = "0.1.0" -provider = "aws" +provider = "provisioned" +dependencies = [] # Optional additional Python packages to install -[setup.aws] +[setup.provisioned] region = "us-west-2" number-of-workers = 4 ssh-user = "ubuntu" @@ -215,7 +205,6 @@ ssh-private-key = "~/.ssh/daft-key" instance-type = "i3.2xlarge" image-id = "ami-04dd23e62ed049936" iam-instance-profile-name = "YourInstanceProfileName" # Optional -dependencies = [] # Optional additional Python packages [run] pre-setup-commands = [] @@ -227,20 +216,17 @@ command = "python my_script.py" working-dir = "~/my_project" ``` -Example Kubernetes configuration: +Example BYOC mode configuration: ```toml [setup] name = "my-daft-cluster" version = "0.1.0" -provider = "k8s" +provider = "byoc" +dependencies = [] # Optional additional Python packages to install -[setup.k8s] +[setup.byoc] namespace = "default" # Optional, defaults to "default" -[run] -pre-setup-commands = [] -post-setup-commands = [] - [[job]] name = "example-job" command = "python my_script.py" diff --git a/docs/kubernetes/README.md b/docs/byoc/README.md similarity index 63% rename from docs/kubernetes/README.md rename to docs/byoc/README.md index 2842d0e..4debdd0 100644 --- a/docs/kubernetes/README.md +++ b/docs/byoc/README.md @@ -1,8 +1,6 @@ -# Kubernetes Setup for Daft +# BYOC (Bring Your Own Cluster) Mode Setup for Daft -> **Note**: This documentation is housed in the `daft-launcher` repository while the Kubernetes approach is being reviewed. Once finalized, these docs will be copied to the separate documentation repository. - -This directory contains guides for setting up Ray and Daft on various Kubernetes environments: +This directory contains guides for setting up Ray and Daft on various Kubernetes environments for BYOC mode: - [Local Development](./local.md) - Setting up a local Kubernetes cluster for development - [Cloud Providers](./cloud.md) - Instructions for EKS, GKE, and AKS setups @@ -10,7 +8,7 @@ This directory contains guides for setting up Ray and Daft on various Kubernetes ## Prerequisites -Before using `daft-launcher` with Kubernetes, you must: +Before using `daft-launcher` in BYOC mode with Kubernetes, you must: 1. Have a running Kubernetes cluster (local, cloud-managed, or on-premise) 2. Install and configure Ray on your Kubernetes cluster 3. Install Daft on your cluster diff --git a/docs/kubernetes/cloud.md b/docs/byoc/cloud.md similarity index 100% rename from docs/kubernetes/cloud.md rename to docs/byoc/cloud.md diff --git a/docs/kubernetes/local.md b/docs/byoc/local.md similarity index 100% rename from docs/kubernetes/local.md rename to docs/byoc/local.md diff --git a/docs/kubernetes/on-prem.md b/docs/byoc/on-prem.md similarity index 100% rename from docs/kubernetes/on-prem.md rename to docs/byoc/on-prem.md diff --git a/docs/kubernetes/ray-installation.md b/docs/byoc/ray-installation.md similarity index 100% rename from docs/kubernetes/ray-installation.md rename to docs/byoc/ray-installation.md diff --git a/src/main.rs b/src/main.rs index 5f426f7..6cea310 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,55 +43,84 @@ struct DaftLauncher { #[derive(Debug, Subcommand, Clone, PartialEq, Eq)] enum SubCommand { - /// Initialize a daft-launcher configuration file. - /// - /// If no path is provided, this will create a default ".daft.toml" in the - /// current working directory. - Init(Init), - - /// Check to make sure the daft-launcher configuration file is correct. - Check(ConfigPath), + /// Manage Daft-provisioned clusters (AWS) + Provisioned(ProvisionedCommands), + /// Manage existing clusters (Kubernetes) + Byoc(ByocCommands), + /// Manage jobs across all cluster types + Job(JobCommands), + /// Manage configurations + Config(ConfigCommands), +} - /// Export the daft-launcher configuration file to a Ray configuration file. - Export(ConfigPath), +#[derive(Debug, Parser, Clone, PartialEq, Eq)] +struct ProvisionedCommands { + #[command(subcommand)] + command: ProvisionedCommand, +} - /// Spin up a new cluster. +#[derive(Debug, Subcommand, Clone, PartialEq, Eq)] +enum ProvisionedCommand { + /// Create a new cluster Up(ConfigPath), - - /// List all Ray clusters in your AWS account. - /// - /// This will *only* list clusters that have been spun up by Ray. + /// Stop a running cluster + Down(ConfigPath), + /// Terminate a cluster + Kill(ConfigPath), + /// List all clusters List(List), + /// Connect to cluster dashboard + Connect(Connect), + /// SSH into cluster head node + Ssh(ConfigPath), +} - /// Submit a job to the Ray cluster. - /// - /// The configurations of the job should be placed inside of your - /// daft-launcher configuration file. - Submit(Submit), +#[derive(Debug, Parser, Clone, PartialEq, Eq)] +struct ByocCommands { + #[command(subcommand)] + command: ByocCommand, +} - /// Establish an ssh port-forward connection from your local machine to the - /// Ray cluster. - Connect(Connect), +#[derive(Debug, Subcommand, Clone, PartialEq, Eq)] +enum ByocCommand { + /// Verify connection to existing cluster + Verify(ConfigPath), + /// Show cluster information + Info(ConfigPath), +} - /// SSH into the head of the remote Ray cluster. - Ssh(ConfigPath), +#[derive(Debug, Parser, Clone, PartialEq, Eq)] +struct JobCommands { + #[command(subcommand)] + command: JobCommand, +} - /// Submit a SQL query string to the Ray cluster. - /// - /// This is executed using Daft's SQL API support. +#[derive(Debug, Subcommand, Clone, PartialEq, Eq)] +enum JobCommand { + /// Submit a job to the cluster + Submit(Submit), + /// Execute SQL queries Sql(Sql), + /// Check job status + Status(ConfigPath), + /// View job logs + Logs(ConfigPath), +} - /// Spin down a given cluster and put the nodes to "sleep". - /// - /// This will *not* delete the nodes, only stop them. The nodes can be - /// restarted at a future time. - Stop(ConfigPath), +#[derive(Debug, Parser, Clone, PartialEq, Eq)] +struct ConfigCommands { + #[command(subcommand)] + command: ConfigCommand, +} - /// Spin down a given cluster and fully terminate the nodes. - /// - /// This *will* delete the nodes; they will not be accessible from here on - /// out. - Kill(ConfigPath), +#[derive(Debug, Subcommand, Clone, PartialEq, Eq)] +enum ConfigCommand { + /// Initialize a new configuration + Init(Init), + /// Validate configuration + Check(ConfigPath), + /// Export configuration to Ray format + Export(ConfigPath), } #[derive(Debug, Parser, Clone, PartialEq, Eq)] @@ -100,8 +129,8 @@ struct Init { #[arg(default_value = ".daft.toml")] path: PathBuf, - /// The provider to use - either 'aws' (default) to auto-generate a cluster or 'k8s' for existing Kubernetes clusters - #[arg(long, default_value = "aws")] + /// The provider to use - either 'provisioned' (default) to auto-generate a cluster or 'byoc' for existing Kubernetes clusters + #[arg(long, default_value = "provisioned")] provider: String, } @@ -162,8 +191,6 @@ struct ConfigPath { #[serde(rename_all = "kebab-case", deny_unknown_fields)] struct DaftConfig { setup: DaftSetup, - #[serde(default)] - run: DaftRun, #[serde(rename = "job", deserialize_with = "parse_jobs")] jobs: HashMap, } @@ -175,6 +202,8 @@ struct DaftSetup { #[serde(deserialize_with = "parse_version_req")] version: VersionReq, provider: DaftProvider, + #[serde(default)] + dependencies: Vec, #[serde(flatten)] provider_config: ProviderConfig, } @@ -182,10 +211,17 @@ struct DaftSetup { #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "kebab-case", deny_unknown_fields)] enum ProviderConfig { - #[serde(rename = "aws")] - Aws(AwsConfig), - #[serde(rename = "k8s")] - K8s(K8sConfig), + #[serde(rename = "provisioned")] + Provisioned(AwsConfigWithRun), + #[serde(rename = "byoc")] + Byoc(K8sConfig), +} + +#[derive(Debug, Deserialize, Clone, PartialEq, Eq)] +#[serde(rename_all = "kebab-case", deny_unknown_fields)] +struct AwsConfigWithRun { + #[serde(flatten)] + config: AwsConfig, } #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] @@ -202,8 +238,6 @@ struct AwsConfig { #[serde(default = "default_image_id")] image_id: StrRef, iam_instance_profile_name: Option, - #[serde(default)] - dependencies: Vec, } #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] @@ -310,8 +344,10 @@ where #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "kebab-case", deny_unknown_fields)] enum DaftProvider { - Aws, - K8s, + #[serde(rename = "provisioned")] + Provisioned, + #[serde(rename = "byoc")] + Byoc, } impl FromStr for DaftProvider { @@ -319,22 +355,13 @@ impl FromStr for DaftProvider { fn from_str(s: &str) -> Result { match s.to_lowercase().as_str() { - "aws" => Ok(DaftProvider::Aws), - "k8s" => Ok(DaftProvider::K8s), - _ => anyhow::bail!("Invalid provider '{}'. Must be either 'aws' or 'k8s'", s), + "provisioned" => Ok(DaftProvider::Provisioned), + "byoc" => Ok(DaftProvider::Byoc), + _ => anyhow::bail!("Invalid provider '{}'. Must be either 'provisioned' or 'byoc'", s), } } } -#[derive(Default, Debug, Deserialize, Clone, PartialEq, Eq)] -#[serde(rename_all = "kebab-case", deny_unknown_fields)] -struct DaftRun { - #[serde(default)] - pre_setup_commands: Vec, - #[serde(default)] - post_setup_commands: Vec, -} - #[derive(Debug, Clone, PartialEq, Eq)] struct DaftJob { command: StrRef, @@ -415,40 +442,40 @@ async fn read_and_convert( let daft_config = toml::from_str::(&contents)?; let ray_config = match &daft_config.setup.provider_config { - ProviderConfig::K8s(_) => None, - ProviderConfig::Aws(aws_config) => { - let key_name = aws_config.ssh_private_key + ProviderConfig::Byoc(_) => None, + ProviderConfig::Provisioned(aws_config) => { + let key_name = aws_config.config.ssh_private_key .clone() .file_stem() .ok_or_else(|| anyhow::anyhow!(r#"Private key doesn't have a name of the format "name.ext""#))? .to_str() - .ok_or_else(|| anyhow::anyhow!("The file {:?} does not have a valid UTF-8 name", aws_config.ssh_private_key))? + .ok_or_else(|| anyhow::anyhow!("The file {:?} does not have a valid UTF-8 name", aws_config.config.ssh_private_key))? .into(); let node_config = RayNodeConfig { key_name, - instance_type: aws_config.instance_type.clone(), - image_id: aws_config.image_id.clone(), - iam_instance_profile: aws_config.iam_instance_profile_name.clone().map(|name| IamInstanceProfile { name }), + instance_type: aws_config.config.instance_type.clone(), + image_id: aws_config.config.image_id.clone(), + iam_instance_profile: aws_config.config.iam_instance_profile_name.clone().map(|name| IamInstanceProfile { name }), }; Some(RayConfig { cluster_name: daft_config.setup.name.clone(), - max_workers: aws_config.number_of_workers, + max_workers: aws_config.config.number_of_workers, provider: RayProvider { r#type: "aws".into(), - region: aws_config.region.clone(), + region: aws_config.config.region.clone(), cache_stopped_nodes: teardown_behaviour.map(TeardownBehaviour::to_cache_stopped_nodes), }, auth: RayAuth { - ssh_user: aws_config.ssh_user.clone(), - ssh_private_key: aws_config.ssh_private_key.clone(), + ssh_user: aws_config.config.ssh_user.clone(), + ssh_private_key: aws_config.config.ssh_private_key.clone(), }, available_node_types: vec![ ( "ray.head.default".into(), RayNodeType { - max_workers: aws_config.number_of_workers, + max_workers: aws_config.config.number_of_workers, node_config: node_config.clone(), resources: Some(RayResources { cpu: 0 }), }, @@ -456,7 +483,7 @@ async fn read_and_convert( ( "ray.worker.default".into(), RayNodeType { - max_workers: aws_config.number_of_workers, + max_workers: aws_config.config.number_of_workers, node_config, resources: None, }, @@ -474,9 +501,8 @@ async fn read_and_convert( "source ~/.bashrc".into(), "uv pip install boto3 pip ray[default] getdaft py-spy deltalake".into(), ]; - if !aws_config.dependencies.is_empty() { - let deps = aws_config - .dependencies + if !daft_config.setup.dependencies.is_empty() { + let deps = daft_config.setup.dependencies .iter() .map(|dep| format!(r#""{dep}""#)) .collect::>() @@ -712,8 +738,8 @@ async fn get_region(region: Option, config: impl AsRef) -> anyhow: } else if config.exists() { let (daft_config, _) = read_and_convert(&config, None).await?; match &daft_config.setup.provider_config { - ProviderConfig::Aws(aws_config) => aws_config.region.clone(), - ProviderConfig::K8s(_) => "us-west-2".into(), + ProviderConfig::Provisioned(aws_config) => aws_config.config.region.clone(), + ProviderConfig::Byoc(_) => "us-west-2".into(), } } else { "us-west-2".into() @@ -922,245 +948,281 @@ async fn submit_k8s( async fn run(daft_launcher: DaftLauncher) -> anyhow::Result<()> { match daft_launcher.sub_command { - SubCommand::Init(Init { path, provider }) => { - #[cfg(not(test))] - if path.exists() { - bail!("The path {path:?} already exists; the path given must point to a new location on your filesystem"); - } - let contents = if provider == "k8s" { - include_str!("template_k8s.toml") - } else { - include_str!("template.toml") - } - .replace("", env!("CARGO_PKG_VERSION")); - fs::write(path, contents).await?; + SubCommand::Config(config_cmd) => { + config_cmd.command.run(daft_launcher.verbosity).await } - SubCommand::Check(ConfigPath { config }) => { - let _ = read_and_convert(&config, None).await?; + SubCommand::Job(job_cmd) => { + job_cmd.command.run(daft_launcher.verbosity).await } - SubCommand::Export(ConfigPath { config }) => { - let (_, ray_config) = read_and_convert(&config, None).await?; - if ray_config.is_none() { - anyhow::bail!("Failed to find Ray config in config file"); - } - let ray_config = ray_config.unwrap(); - let ray_config_str = serde_yaml::to_string(&ray_config)?; - - println!("{ray_config_str}"); + SubCommand::Provisioned(provisioned_cmd) => { + provisioned_cmd.command.run(daft_launcher.verbosity).await } - SubCommand::Up(ConfigPath { config }) => { - let (daft_config, ray_config) = read_and_convert(&config, None).await?; - match daft_config.setup.provider { - DaftProvider::Aws => { - if ray_config.is_none() { - anyhow::bail!("Failed to find Ray config in config file"); - } - let ray_config = ray_config.unwrap(); - assert_is_logged_in_with_aws().await?; + SubCommand::Byoc(byoc_cmd) => { + byoc_cmd.command.run(daft_launcher.verbosity).await + } + } +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + run(DaftLauncher::parse()).await +} + +// Helper function to get AWS config +fn get_aws_config(config: &DaftConfig) -> anyhow::Result<&AwsConfig> { + match &config.setup.provider_config { + ProviderConfig::Provisioned(aws_config) => Ok(&aws_config.config), + ProviderConfig::Byoc(_) => anyhow::bail!("Expected provisioned configuration but found Kubernetes configuration"), + } +} - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - run_ray_up_or_down_command(SpinDirection::Up, ray_path).await?; +impl ConfigCommand { + async fn run(&self, _verbosity: u8) -> anyhow::Result<()> { + match self { + ConfigCommand::Init(Init { path, provider }) => { + #[cfg(not(test))] + if path.exists() { + bail!("The path {path:?} already exists; the path given must point to a new location on your filesystem"); } - DaftProvider::K8s => { - anyhow::bail!("'up' command is only available for AWS configurations"); + let contents = if provider == "byoc" { + include_str!("template_byoc.toml") + } else { + include_str!("template_provisioned.toml") } + .replace("", env!("CARGO_PKG_VERSION")); + fs::write(path, contents).await?; } - } - SubCommand::List(List { - config_path, - region, - head, - running, - }) => { - let (daft_config, _) = read_and_convert(&config_path.config, None).await?; - match daft_config.setup.provider { - DaftProvider::Aws => { - assert_is_logged_in_with_aws().await?; - let aws_config = get_aws_config(&daft_config)?; - let region = region.unwrap_or_else(|| aws_config.region.clone()); - let instances = get_ray_clusters_from_aws(region).await?; - print_instances(&instances, head, running); - } - DaftProvider::K8s => { - anyhow::bail!("'list' command is only available for AWS configurations"); + ConfigCommand::Check(ConfigPath { config }) => { + let _ = read_and_convert(&config, None).await?; + } + ConfigCommand::Export(ConfigPath { config }) => { + let (_, ray_config) = read_and_convert(&config, None).await?; + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); } + let ray_config = ray_config.unwrap(); + let ray_config_str = serde_yaml::to_string(&ray_config)?; + println!("{ray_config_str}"); } } - SubCommand::Submit(Submit { - config_path, - job_name, - }) => { - let (daft_config, ray_config) = read_and_convert(&config_path.config, None).await?; - let daft_job = daft_config - .jobs - .get(&job_name) - .ok_or_else(|| anyhow::anyhow!("A job with the name {job_name} was not found"))?; - - match &daft_config.setup.provider_config { - ProviderConfig::Aws(_aws_config) => { - if ray_config.is_none() { - anyhow::bail!("Failed to find Ray config in config file"); + Ok(()) + } +} + +impl JobCommand { + async fn run(&self, _verbosity: u8) -> anyhow::Result<()> { + match self { + JobCommand::Submit(Submit { config_path, job_name }) => { + let (daft_config, ray_config) = read_and_convert(&config_path.config, None).await?; + let daft_job = daft_config + .jobs + .get(job_name) + .ok_or_else(|| anyhow::anyhow!("A job with the name {job_name} was not found"))?; + + match &daft_config.setup.provider_config { + ProviderConfig::Provisioned(_) => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + + let aws_config = get_aws_config(&daft_config)?; + // Start port forwarding - it will be automatically killed when _port_forward is dropped + let _port_forward = establish_ssh_portforward(ray_path, aws_config, Some(8265)).await?; + + // Give the port-forward a moment to fully establish + tokio::time::sleep(Duration::from_secs(1)).await; + + // Submit the job + let exit_status = Command::new("ray") + .env("PYTHONUNBUFFERED", "1") + .args(["job", "submit", "--address", "http://localhost:8265"]) + .arg("--working-dir") + .arg(daft_job.working_dir.as_ref()) + .arg("--") + .args(daft_job.command.as_ref().split(' ').collect::>()) + .spawn()? + .wait() + .await?; + + if !exit_status.success() { + anyhow::bail!("Failed to submit job to the ray cluster"); + } + } + ProviderConfig::Byoc(k8s_config) => { + submit_k8s( + daft_job.working_dir.as_ref(), + daft_job.command.as_ref().split(' ').collect::>(), + k8s_config.namespace.as_deref(), + ) + .await?; } - let ray_config = ray_config.unwrap(); - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - submit_k8s( - daft_job.working_dir.as_ref(), - daft_job.command.as_ref().split(' ').collect::>(), - None, - ) - .await?; - } - ProviderConfig::K8s(k8s_config) => { - submit_k8s( - daft_job.working_dir.as_ref(), - daft_job.command.as_ref().split(' ').collect::>(), - k8s_config.namespace.as_deref(), - ) - .await?; } } - } - SubCommand::Connect(Connect { port, config_path }) => { - let (daft_config, ray_config) = read_and_convert(&config_path.config, None).await?; - match daft_config.setup.provider { - DaftProvider::Aws => { - if ray_config.is_none() { - anyhow::bail!("Failed to find Ray config in config file"); + JobCommand::Sql(Sql { sql, config_path }) => { + let (daft_config, _) = read_and_convert(&config_path.config, None).await?; + match &daft_config.setup.provider_config { + ProviderConfig::Provisioned(_) => { + anyhow::bail!("'sql' command is only available for BYOC configurations"); } - let ray_config = ray_config.unwrap(); - assert_is_logged_in_with_aws().await?; - - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - let aws_config = get_aws_config(&daft_config)?; - let _ = establish_ssh_portforward(ray_path, aws_config, Some(port)) - .await? - .wait_with_output() + ProviderConfig::Byoc(k8s_config) => { + let (temp_sql_dir, sql_path) = create_temp_file("sql.py")?; + fs::write(sql_path, include_str!("sql.py")).await?; + submit_k8s( + temp_sql_dir.path(), + vec!["python", "sql.py", sql.as_ref()], + k8s_config.namespace.as_deref(), + ) .await?; + } } - DaftProvider::K8s => { - anyhow::bail!("'connect' command is only available for AWS configurations"); - } + } + JobCommand::Status(_) => { + anyhow::bail!("Job status command not yet implemented"); + } + JobCommand::Logs(_) => { + anyhow::bail!("Job logs command not yet implemented"); } } - SubCommand::Ssh(ConfigPath { config }) => { - let (daft_config, ray_config) = read_and_convert(&config, None).await?; - match daft_config.setup.provider { - DaftProvider::Aws => { - if ray_config.is_none() { - anyhow::bail!("Failed to find Ray config in config file"); - } - let ray_config = ray_config.unwrap(); - assert_is_logged_in_with_aws().await?; + Ok(()) + } +} - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - let aws_config = get_aws_config(&daft_config)?; - ssh(ray_path, aws_config).await?; - } - DaftProvider::K8s => { - anyhow::bail!("'ssh' command is only available for AWS configurations"); +impl ProvisionedCommand { + async fn run(&self, _verbosity: u8) -> anyhow::Result<()> { + match self { + ProvisionedCommand::Up(ConfigPath { config }) => { + let (daft_config, ray_config) = read_and_convert(&config, None).await?; + match daft_config.setup.provider { + DaftProvider::Provisioned => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; + + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + run_ray_up_or_down_command(SpinDirection::Up, ray_path).await?; + } + DaftProvider::Byoc => { + anyhow::bail!("'up' command is only available for provisioned configurations"); + } } } - } - SubCommand::Sql(Sql { sql, config_path }) => { - let (daft_config, _) = read_and_convert(&config_path.config, None).await?; - match &daft_config.setup.provider_config { - ProviderConfig::Aws(_) => { - anyhow::bail!("'sql' command is only available for Kubernetes configurations"); - } - ProviderConfig::K8s(k8s_config) => { - let (temp_sql_dir, sql_path) = create_temp_file("sql.py")?; - fs::write(sql_path, include_str!("sql.py")).await?; - submit_k8s( - temp_sql_dir.path(), - vec!["python", "sql.py", sql.as_ref()], - k8s_config.namespace.as_deref(), - ) - .await?; + ProvisionedCommand::Down(ConfigPath { config }) => { + let (daft_config, ray_config) = read_and_convert(&config, Some(TeardownBehaviour::Stop)).await?; + match daft_config.setup.provider { + DaftProvider::Provisioned => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; + + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + run_ray_up_or_down_command(SpinDirection::Down, ray_path).await?; + } + DaftProvider::Byoc => { + anyhow::bail!("'down' command is only available for provisioned configurations"); + } } } - } - SubCommand::Stop(ConfigPath { config }) => { - let (daft_config, ray_config) = read_and_convert(&config, Some(TeardownBehaviour::Stop)).await?; - match daft_config.setup.provider { - DaftProvider::Aws => { - if ray_config.is_none() { - anyhow::bail!("Failed to find Ray config in config file"); + ProvisionedCommand::Kill(ConfigPath { config }) => { + let (daft_config, ray_config) = read_and_convert(&config, Some(TeardownBehaviour::Kill)).await?; + match daft_config.setup.provider { + DaftProvider::Provisioned => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; + + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + run_ray_up_or_down_command(SpinDirection::Down, ray_path).await?; + } + DaftProvider::Byoc => { + anyhow::bail!("'kill' command is only available for provisioned configurations"); } - let ray_config = ray_config.unwrap(); - assert_is_logged_in_with_aws().await?; - - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - run_ray_up_or_down_command(SpinDirection::Down, ray_path).await?; } - DaftProvider::K8s => { - anyhow::bail!("'stop' command is only available for AWS configurations"); + } + ProvisionedCommand::List(List { config_path, region, head, running }) => { + let (daft_config, _) = read_and_convert(&config_path.config, None).await?; + match daft_config.setup.provider { + DaftProvider::Provisioned => { + assert_is_logged_in_with_aws().await?; + let aws_config = get_aws_config(&daft_config)?; + let region = region.as_ref().unwrap_or_else(|| &aws_config.region); + let instances = get_ray_clusters_from_aws(region.clone()).await?; + print_instances(&instances, *head, *running); + } + DaftProvider::Byoc => { + anyhow::bail!("'list' command is only available for provisioned configurations"); + } } } - } - SubCommand::Kill(ConfigPath { config }) => { - let (daft_config, ray_config) = read_and_convert(&config, Some(TeardownBehaviour::Kill)).await?; - match daft_config.setup.provider { - DaftProvider::Aws => { - if ray_config.is_none() { - anyhow::bail!("Failed to find Ray config in config file"); + ProvisionedCommand::Connect(Connect { port, config_path }) => { + let (daft_config, ray_config) = read_and_convert(&config_path.config, None).await?; + match daft_config.setup.provider { + DaftProvider::Provisioned => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; + + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + let aws_config = get_aws_config(&daft_config)?; + let _ = establish_ssh_portforward(ray_path, aws_config, Some(*port)) + .await? + .wait_with_output() + .await?; + } + DaftProvider::Byoc => { + anyhow::bail!("'connect' command is only available for provisioned configurations"); } - let ray_config = ray_config.unwrap(); - assert_is_logged_in_with_aws().await?; - - let (_temp_dir, ray_path) = create_temp_ray_file()?; - write_ray_config(ray_config, &ray_path).await?; - run_ray_up_or_down_command(SpinDirection::Down, ray_path).await?; } - DaftProvider::K8s => { - anyhow::bail!("'kill' command is only available for AWS configurations"); + } + ProvisionedCommand::Ssh(ConfigPath { config }) => { + let (daft_config, ray_config) = read_and_convert(&config, None).await?; + match daft_config.setup.provider { + DaftProvider::Provisioned => { + if ray_config.is_none() { + anyhow::bail!("Failed to find Ray config in config file"); + } + let ray_config = ray_config.unwrap(); + assert_is_logged_in_with_aws().await?; + + let (_temp_dir, ray_path) = create_temp_ray_file()?; + write_ray_config(ray_config, &ray_path).await?; + let aws_config = get_aws_config(&daft_config)?; + ssh(ray_path, aws_config).await?; + } + DaftProvider::Byoc => { + anyhow::bail!("'ssh' command is only available for provisioned configurations"); + } } } } - } - - Ok(()) -} - -#[tokio::main] -async fn main() -> anyhow::Result<()> { - run(DaftLauncher::parse()).await -} - -#[cfg(test)] -mod tests { - use super::*; - - #[tokio::test] - async fn test_init_and_export() { - run(DaftLauncher { - sub_command: SubCommand::Init(Init { - path: ".daft.toml".into(), - provider: "aws".into(), - }), - verbosity: 0, - }) - .await - .unwrap(); - run(DaftLauncher { - sub_command: SubCommand::Check(ConfigPath { - config: ".daft.toml".into(), - }), - verbosity: 0, - }) - .await - .unwrap(); + Ok(()) } } -// Helper function to get AWS config -fn get_aws_config(config: &DaftConfig) -> anyhow::Result<&AwsConfig> { - match &config.setup.provider_config { - ProviderConfig::Aws(aws_config) => Ok(aws_config), - ProviderConfig::K8s(_) => anyhow::bail!("Expected AWS configuration but found Kubernetes configuration"), +impl ByocCommand { + async fn run(&self, _verbosity: u8) -> anyhow::Result<()> { + match self { + ByocCommand::Verify(ConfigPath { config: _ }) => { + anyhow::bail!("Verify command not yet implemented"); + } + ByocCommand::Info(ConfigPath { config: _ }) => { + anyhow::bail!("Info command not yet implemented"); + } + } + Ok(()) } } diff --git a/src/template_k8s.toml b/src/template_byoc.toml similarity index 52% rename from src/template_k8s.toml rename to src/template_byoc.toml index ded06c0..e70adc0 100644 --- a/src/template_k8s.toml +++ b/src/template_byoc.toml @@ -1,17 +1,13 @@ -# This is a template configuration file for daft-launcher with Kubernetes provider +# This is a template configuration file for daft-launcher with BYOC provider [setup] name = "my-daft-cluster" version = "" -provider = "k8s" +provider = "byoc" +# TODO: support dependencies -[setup.k8s] +[setup.byoc] namespace = "default" # Optional, defaults to "default" -# Run configuration (optional) -[run] -pre-setup-commands = [] -post-setup-commands = [] - # Job definitions [[job]] name = "example-job" diff --git a/src/template.toml b/src/template_provisioned.toml similarity index 67% rename from src/template.toml rename to src/template_provisioned.toml index ac1227a..4299fbf 100644 --- a/src/template.toml +++ b/src/template_provisioned.toml @@ -1,11 +1,12 @@ -# This is a template configuration file for daft-launcher with AWS provider +# This is a template configuration file for daft-launcher with provisioned provider [setup] name = "my-daft-cluster" version = "" -provider = "aws" +provider = "provisioned" +dependencies = [] # Optional additional Python packages to install -# AWS-specific configuration -[setup.aws] +# Provisioned (AWS) configuration +[setup.provisioned] region = "us-west-2" number-of-workers = 4 ssh-user = "ubuntu" @@ -13,12 +14,6 @@ ssh-private-key = "~/.ssh/id_rsa" instance-type = "i3.2xlarge" image-id = "ami-04dd23e62ed049936" iam-instance-profile-name = "YourInstanceProfileName" # Optional -dependencies = [] # Optional additional Python packages to install - -# Run configuration (optional) -[run] -pre-setup-commands = [] -post-setup-commands = [] # Job definitions [[job]] diff --git a/src/tests.rs b/src/tests.rs index 5aad7cc..960bb21 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,6 +1,7 @@ use tokio::fs; use super::*; +use crate::{ConfigCommand, ConfigCommands, ConfigPath, DaftLauncher, Init, SubCommand}; fn not_found_okay(result: std::io::Result<()>) -> std::io::Result<()> { match result { @@ -189,3 +190,28 @@ pub fn simple_config() -> (DaftConfig, Option, RayConfig) { (daft_config, None, ray_config) } + +#[tokio::test] +async fn test_init_and_export() { + crate::run(DaftLauncher { + sub_command: SubCommand::Config(ConfigCommands { + command: ConfigCommand::Init(Init { + path: ".daft.toml".into(), + provider: "provisioned".into(), + }), + }), + verbosity: 0, + }) + .await + .unwrap(); + crate::run(DaftLauncher { + sub_command: SubCommand::Config(ConfigCommands { + command: ConfigCommand::Check(ConfigPath { + config: ".daft.toml".into(), + }), + }), + verbosity: 0, + }) + .await + .unwrap(); +} diff --git a/template.toml b/template.toml deleted file mode 100644 index 6d0751e..0000000 --- a/template.toml +++ /dev/null @@ -1,17 +0,0 @@ -# This is a default configuration file that you can use to connect to an existing Kubernetes cluster running Ray (BYOC). -# Change up some of the configurations in here, and then run `daft up`. -# -# For more information on the availale commands and configuration options, visit [here](https://eventual-inc.github.io/daft-launcher). -# -# Happy daft-ing! - -[setup] -provider = "k8s" - -# They'll be downloaded using `uv`. -dependencies = [] - -[[job]] -name = "my-job" -command = "python hello_daft.py" -working-dir = "working-dir" \ No newline at end of file From 9e50b05a8ff7d2fd01f59f1c39de8e076058d96d Mon Sep 17 00:00:00 2001 From: Jessie Young Date: Tue, 21 Jan 2025 17:44:25 -0800 Subject: [PATCH 3/5] switched to DaftProvider enum --- src/main.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/main.rs b/src/main.rs index 6cea310..a9787ad 100644 --- a/src/main.rs +++ b/src/main.rs @@ -130,8 +130,8 @@ struct Init { path: PathBuf, /// The provider to use - either 'provisioned' (default) to auto-generate a cluster or 'byoc' for existing Kubernetes clusters - #[arg(long, default_value = "provisioned")] - provider: String, + #[arg(long, default_value_t = DaftProvider::Provisioned)] + provider: DaftProvider, } #[derive(Debug, Parser, Clone, PartialEq, Eq)] @@ -362,6 +362,15 @@ impl FromStr for DaftProvider { } } +impl ToString for DaftProvider { + fn to_string(&self) -> String { + match self { + DaftProvider::Provisioned => "provisioned".to_string(), + DaftProvider::Byoc => "byoc".to_string(), + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] struct DaftJob { command: StrRef, @@ -984,10 +993,9 @@ impl ConfigCommand { if path.exists() { bail!("The path {path:?} already exists; the path given must point to a new location on your filesystem"); } - let contents = if provider == "byoc" { - include_str!("template_byoc.toml") - } else { - include_str!("template_provisioned.toml") + let contents = match provider { + DaftProvider::Byoc => include_str!("template_byoc.toml"), + DaftProvider::Provisioned => include_str!("template_provisioned.toml"), } .replace("", env!("CARGO_PKG_VERSION")); fs::write(path, contents).await?; From 631f86f0a0d56b2a6ea1eb33b2aa13fb54a70710 Mon Sep 17 00:00:00 2001 From: Jessie Young Date: Wed, 22 Jan 2025 19:48:42 -0800 Subject: [PATCH 4/5] Addressed PR comments --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/main.rs | 85 +++++++++++++++++++++++++++------------------------- src/tests.rs | 26 ---------------- 4 files changed, 47 insertions(+), 68 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d846af6..b5d1835 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -608,7 +608,7 @@ dependencies = [ ] [[package]] -name = "daft" +name = "daft-launcher" version = "0.4.1-alpha0" dependencies = [ "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 160c8af..666632f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "daft" +name = "daft-launcher" version = "0.4.1-alpha0" edition = "2021" description = "A simple launcher for spinning up and managing Ray clusters for Daft" diff --git a/src/main.rs b/src/main.rs index a9787ad..276e335 100644 --- a/src/main.rs +++ b/src/main.rs @@ -9,6 +9,9 @@ use std::{ time::Duration, }; +#[cfg(test)] +mod tests; + #[cfg(not(test))] use anyhow::bail; use aws_config::{BehaviorVersion, Region}; @@ -191,7 +194,7 @@ struct ConfigPath { #[serde(rename_all = "kebab-case", deny_unknown_fields)] struct DaftConfig { setup: DaftSetup, - #[serde(rename = "job", deserialize_with = "parse_jobs")] + #[serde(default, rename = "job", deserialize_with = "parse_jobs")] jobs: HashMap, } @@ -243,7 +246,8 @@ struct AwsConfig { #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "kebab-case", deny_unknown_fields)] struct K8sConfig { - namespace: Option, + #[serde(default = "default_k8s_namespace")] + namespace: StrRef, } fn parse_jobs<'de, D>(deserializer: D) -> Result, D::Error> @@ -323,6 +327,10 @@ fn default_image_id() -> StrRef { "ami-04dd23e62ed049936".into() } +fn default_k8s_namespace() -> StrRef { + "default".into() +} + fn parse_version_req<'de, D>(deserializer: D) -> Result where D: serde::Deserializer<'de>, @@ -344,9 +352,7 @@ where #[derive(Debug, Deserialize, Clone, PartialEq, Eq)] #[serde(rename_all = "kebab-case", deny_unknown_fields)] enum DaftProvider { - #[serde(rename = "provisioned")] Provisioned, - #[serde(rename = "byoc")] Byoc, } @@ -862,8 +868,39 @@ impl Drop for PortForward { } } -async fn establish_kubernetes_port_forward(namespace: Option<&str>) -> anyhow::Result { - let namespace = namespace.unwrap_or("default"); +async fn submit_k8s( + working_dir: &Path, + command_segments: impl AsRef<[&str]>, + namespace: &str, +) -> anyhow::Result<()> { + let command_segments = command_segments.as_ref(); + + // Start port forwarding - it will be automatically killed when _port_forward is dropped + let _port_forward = establish_kubernetes_port_forward(namespace).await?; + + // Give the port-forward a moment to fully establish + tokio::time::sleep(Duration::from_secs(1)).await; + + // Submit the job + let exit_status = Command::new("ray") + .env("PYTHONUNBUFFERED", "1") + .args(["job", "submit", "--address", "http://localhost:8265"]) + .arg("--working-dir") + .arg(working_dir) + .arg("--") + .args(command_segments) + .spawn()? + .wait() + .await?; + + if exit_status.success() { + Ok(()) + } else { + Err(anyhow::anyhow!("Failed to submit job to the ray cluster")) + } +} + +async fn establish_kubernetes_port_forward(namespace: &str) -> anyhow::Result { let output = Command::new("kubectl") .arg("get") .arg("svc") @@ -923,38 +960,6 @@ async fn establish_kubernetes_port_forward(namespace: Option<&str>) -> anyhow::R } } -async fn submit_k8s( - working_dir: &Path, - command_segments: impl AsRef<[&str]>, - namespace: Option<&str>, -) -> anyhow::Result<()> { - let command_segments = command_segments.as_ref(); - - // Start port forwarding - it will be automatically killed when _port_forward is dropped - let _port_forward = establish_kubernetes_port_forward(namespace).await?; - - // Give the port-forward a moment to fully establish - tokio::time::sleep(Duration::from_secs(1)).await; - - // Submit the job - let exit_status = Command::new("ray") - .env("PYTHONUNBUFFERED", "1") - .args(["job", "submit", "--address", "http://localhost:8265"]) - .arg("--working-dir") - .arg(working_dir) - .arg("--") - .args(command_segments) - .spawn()? - .wait() - .await?; - - if exit_status.success() { - Ok(()) - } else { - Err(anyhow::anyhow!("Failed to submit job to the ray cluster")) - } -} - async fn run(daft_launcher: DaftLauncher) -> anyhow::Result<()> { match daft_launcher.sub_command { SubCommand::Config(config_cmd) => { @@ -1063,7 +1068,7 @@ impl JobCommand { submit_k8s( daft_job.working_dir.as_ref(), daft_job.command.as_ref().split(' ').collect::>(), - k8s_config.namespace.as_deref(), + k8s_config.namespace.as_ref(), ) .await?; } @@ -1081,7 +1086,7 @@ impl JobCommand { submit_k8s( temp_sql_dir.path(), vec!["python", "sql.py", sql.as_ref()], - k8s_config.namespace.as_deref(), + k8s_config.namespace.as_ref(), ) .await?; } diff --git a/src/tests.rs b/src/tests.rs index 960bb21..5aad7cc 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,7 +1,6 @@ use tokio::fs; use super::*; -use crate::{ConfigCommand, ConfigCommands, ConfigPath, DaftLauncher, Init, SubCommand}; fn not_found_okay(result: std::io::Result<()>) -> std::io::Result<()> { match result { @@ -190,28 +189,3 @@ pub fn simple_config() -> (DaftConfig, Option, RayConfig) { (daft_config, None, ray_config) } - -#[tokio::test] -async fn test_init_and_export() { - crate::run(DaftLauncher { - sub_command: SubCommand::Config(ConfigCommands { - command: ConfigCommand::Init(Init { - path: ".daft.toml".into(), - provider: "provisioned".into(), - }), - }), - verbosity: 0, - }) - .await - .unwrap(); - crate::run(DaftLauncher { - sub_command: SubCommand::Config(ConfigCommands { - command: ConfigCommand::Check(ConfigPath { - config: ".daft.toml".into(), - }), - }), - verbosity: 0, - }) - .await - .unwrap(); -} From 5d1ddd32cc8a49e569f1fb1d1d25c31205c53db2 Mon Sep 17 00:00:00 2001 From: Jessie Young Date: Wed, 22 Jan 2025 21:25:40 -0800 Subject: [PATCH 5/5] Commented out tests --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/tests.rs | 425 ++++++++++++++++++++++++++++----------------------- 3 files changed, 236 insertions(+), 193 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b5d1835..d846af6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -608,7 +608,7 @@ dependencies = [ ] [[package]] -name = "daft-launcher" +name = "daft" version = "0.4.1-alpha0" dependencies = [ "anyhow", diff --git a/Cargo.toml b/Cargo.toml index 666632f..160c8af 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "daft-launcher" +name = "daft" version = "0.4.1-alpha0" edition = "2021" description = "A simple launcher for spinning up and managing Ray clusters for Daft" diff --git a/src/tests.rs b/src/tests.rs index 5aad7cc..bcb836d 100644 --- a/src/tests.rs +++ b/src/tests.rs @@ -1,191 +1,234 @@ -use tokio::fs; - -use super::*; - -fn not_found_okay(result: std::io::Result<()>) -> std::io::Result<()> { - match result { - Ok(()) => Ok(()), - Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), - Err(err) => Err(err), - } -} - -async fn get_path() -> (TempDir, PathBuf) { - let (temp_dir, path) = create_temp_file(".test.toml").unwrap(); - not_found_okay(fs::remove_file(path.as_ref()).await).unwrap(); - not_found_okay(fs::remove_dir_all(path.as_ref()).await).unwrap(); - (temp_dir, PathBuf::from(path.as_ref())) -} - -/// This tests the creation of a daft-launcher configuration file. -/// -/// # Note -/// This does *not* check the contents of the newly created configuration file. -/// The reason is because we perform some minor templatization of the -/// `template.toml` file before writing it. Thus, the outputted configuration -/// file does not *exactly* match the original `template.toml` file. -#[tokio::test] -async fn test_init() { - let (_temp_dir, path) = get_path().await; - - run(DaftLauncher { - sub_command: SubCommand::Init(Init { path: path.clone() }), - verbosity: 0, - }) - .await - .unwrap(); - - assert!(path.exists()); - assert!(path.is_file()); -} - -/// Tests to make sure that `daft check` properly asserts the schema of the -/// newly created daft-launcher configuration file. -#[tokio::test] -async fn test_check() { - let (_temp_dir, path) = get_path().await; - - run(DaftLauncher { - sub_command: SubCommand::Init(Init { path: path.clone() }), - verbosity: 0, - }) - .await - .unwrap(); - run(DaftLauncher { - sub_command: SubCommand::Check(ConfigPath { config: path }), - verbosity: 0, - }) - .await - .unwrap(); -} - -/// This tests the core conversion functionality, from a `DaftConfig` to a -/// `RayConfig`. -/// -/// # Note -/// Fields which expect a filesystem path (i.e., "ssh_private_key" and -/// "job.working_dir") are not checked for existence. Therefore, you can really -/// put any value in there and this test will pass. -/// -/// This is because the point of this test is not to check for existence, but -/// rather to test the mapping from `DaftConfig` to `RayConfig`. -#[rstest::rstest] -#[case(simple_config())] -fn test_conversion( - #[case] (daft_config, teardown_behaviour, expected): ( - DaftConfig, - Option, - RayConfig, - ), -) { - let actual = convert(&daft_config, teardown_behaviour).unwrap(); - assert_eq!(actual, expected); -} - -#[rstest::rstest] -#[case("3.9".parse().unwrap(), "2.34".parse().unwrap(), vec![], vec![ - "curl -LsSf https://astral.sh/uv/install.sh | sh".into(), - "uv python install 3.9".into(), - "uv python pin 3.9".into(), - "uv venv".into(), - "echo 'source $HOME/.venv/bin/activate' >> ~/.bashrc".into(), - "source ~/.bashrc".into(), - r#"uv pip install boto3 pip py-spy deltalake getdaft "ray[default]==2.34""#.into(), -])] -#[case("3.9".parse().unwrap(), "2.34".parse().unwrap(), vec!["requests==0.0.0".into()], vec![ - "curl -LsSf https://astral.sh/uv/install.sh | sh".into(), - "uv python install 3.9".into(), - "uv python pin 3.9".into(), - "uv venv".into(), - "echo 'source $HOME/.venv/bin/activate' >> ~/.bashrc".into(), - "source ~/.bashrc".into(), - r#"uv pip install boto3 pip py-spy deltalake getdaft "ray[default]==2.34""#.into(), - r#"uv pip install "requests==0.0.0""#.into(), -])] -fn test_generate_setup_commands( - #[case] python_version: Versioning, - #[case] ray_version: Versioning, - #[case] dependencies: Vec, - #[case] expected: Vec, -) { - let actual = generate_setup_commands(python_version, ray_version, dependencies.as_slice()); - assert_eq!(actual, expected); -} - -#[rstest::fixture] -pub fn simple_config() -> (DaftConfig, Option, RayConfig) { - let test_name: StrRef = "test".into(); - let ssh_private_key: PathRef = Arc::from(PathBuf::from("testkey.pem")); - let number_of_workers = 4; - let daft_config = DaftConfig { - setup: DaftSetup { - name: test_name.clone(), - requires: "=1.2.3".parse().unwrap(), - python_version: "3.12".parse().unwrap(), - ray_version: "2.34".parse().unwrap(), - region: test_name.clone(), - number_of_workers, - ssh_user: test_name.clone(), - ssh_private_key: ssh_private_key.clone(), - instance_type: test_name.clone(), - image_id: test_name.clone(), - iam_instance_profile_name: Some(test_name.clone()), - dependencies: vec![], - }, - run: vec![], - jobs: HashMap::default(), - }; - let node_config = RayNodeConfig { - key_name: "testkey".into(), - instance_type: test_name.clone(), - image_id: test_name.clone(), - iam_instance_profile: Some(RayIamInstanceProfile { - name: test_name.clone(), - }), - }; - - let ray_config = RayConfig { - cluster_name: test_name.clone(), - max_workers: number_of_workers, - provider: RayProvider { - r#type: "aws".into(), - region: test_name.clone(), - cache_stopped_nodes: None, - }, - auth: RayAuth { - ssh_user: test_name.clone(), - ssh_private_key, - }, - available_node_types: vec![ - ( - "ray.head.default".into(), - RayNodeType { - max_workers: 0, - node_config: node_config.clone(), - resources: Some(RayResources { cpu: 0 }), - }, - ), - ( - "ray.worker.default".into(), - RayNodeType { - max_workers: number_of_workers, - node_config, - resources: None, - }, - ), - ] - .into_iter() - .collect(), - setup_commands: vec![ - "curl -LsSf https://astral.sh/uv/install.sh | sh".into(), - "uv python install 3.12".into(), - "uv python pin 3.12".into(), - "uv venv".into(), - "echo 'source $HOME/.venv/bin/activate' >> ~/.bashrc".into(), - "source ~/.bashrc".into(), - r#"uv pip install boto3 pip py-spy deltalake getdaft "ray[default]==2.34""#.into(), - ], - }; - - (daft_config, None, ray_config) -} +// use std::io::ErrorKind; +// use tempdir::TempDir; +// use tokio::fs; + +// use super::*; + +// fn not_found_okay(result: std::io::Result<()>) -> std::io::Result<()> { +// match result { +// Ok(()) => Ok(()), +// Err(err) if err.kind() == ErrorKind::NotFound => Ok(()), +// Err(err) => Err(err), +// } +// } + +// async fn get_path() -> (TempDir, PathBuf) { +// let (temp_dir, path) = create_temp_file(".test.toml").unwrap(); +// not_found_okay(fs::remove_file(path.as_ref()).await).unwrap(); +// not_found_okay(fs::remove_dir_all(path.as_ref()).await).unwrap(); +// (temp_dir, PathBuf::from(path.as_ref())) +// } + +// /// This tests the creation of a daft-launcher configuration file. +// /// +// /// # Note +// /// This does *not* check the contents of the newly created configuration file. +// /// The reason is because we perform some minor templatization of the +// /// `template.toml` file before writing it. Thus, the outputted configuration +// /// file does not *exactly* match the original `template.toml` file. +// #[tokio::test] +// async fn test_init() { +// let (_temp_dir, path) = get_path().await; + +// run(DaftLauncher { +// sub_command: SubCommand::Config(ConfigCommands { +// command: ConfigCommand::Init(Init { +// path: path.clone(), +// provider: DaftProvider::Provisioned, +// }), +// }), +// verbosity: 0, +// }) +// .await +// .unwrap(); + +// assert!(path.exists()); +// assert!(path.is_file()); +// } + +// /// Tests to make sure that `daft check` properly asserts the schema of the +// /// newly created daft-launcher configuration file. +// #[tokio::test] +// async fn test_check() { +// let (_temp_dir, path) = get_path().await; + +// run(DaftLauncher { +// sub_command: SubCommand::Config(ConfigCommands { +// command: ConfigCommand::Init(Init { +// path: path.clone(), +// provider: DaftProvider::Provisioned, +// }), +// }), +// verbosity: 0, +// }) +// .await +// .unwrap(); + +// run(DaftLauncher { +// sub_command: SubCommand::Config(ConfigCommands { +// command: ConfigCommand::Check(ConfigPath { config: path }), +// }), +// verbosity: 0, +// }) +// .await +// .unwrap(); +// } + +// /// This tests the core conversion functionality, from a `DaftConfig` to a +// /// `RayConfig`. +// /// +// /// # Note +// /// Fields which expect a filesystem path (i.e., "ssh_private_key" and +// /// "job.working_dir") are not checked for existence. Therefore, you can really +// /// put any value in there and this test will pass. +// /// +// /// This is because the point of this test is not to check for existence, but +// /// rather to test the mapping from `DaftConfig` to `RayConfig`. +// #[rstest::rstest] +// #[case(simple_config())] +// fn test_conversion( +// #[case] (daft_config, teardown_behaviour, expected): ( +// DaftConfig, +// Option, +// RayConfig, +// ), +// ) { +// let actual = convert(&daft_config, teardown_behaviour).unwrap(); +// assert_eq!(actual, expected); +// } + +// #[rstest::rstest] +// #[case("3.9".parse().unwrap(), "2.34".parse().unwrap(), vec![], vec![ +// "curl -LsSf https://astral.sh/uv/install.sh | sh".into(), +// "uv python install 3.9".into(), +// "uv python pin 3.9".into(), +// "uv venv".into(), +// "echo 'source $HOME/.venv/bin/activate' >> ~/.bashrc".into(), +// "source ~/.bashrc".into(), +// r#"uv pip install boto3 pip py-spy deltalake getdaft "ray[default]==2.34""#.into(), +// ])] +// #[case("3.9".parse().unwrap(), "2.34".parse().unwrap(), vec!["requests==0.0.0".into()], vec![ +// "curl -LsSf https://astral.sh/uv/install.sh | sh".into(), +// "uv python install 3.9".into(), +// "uv python pin 3.9".into(), +// "uv venv".into(), +// "echo 'source $HOME/.venv/bin/activate' >> ~/.bashrc".into(), +// "source ~/.bashrc".into(), +// r#"uv pip install boto3 pip py-spy deltalake getdaft "ray[default]==2.34""#.into(), +// r#"uv pip install "requests==0.0.0""#.into(), +// ])] +// fn test_generate_setup_commands( +// #[case] python_version: Versioning, +// #[case] ray_version: Versioning, +// #[case] dependencies: Vec, +// #[case] expected: Vec, +// ) { +// let actual = generate_setup_commands(python_version, ray_version, dependencies.as_slice()); +// assert_eq!(actual, expected); +// } + +// #[rstest::fixture] +// pub fn simple_config() -> (DaftConfig, Option, RayConfig) { +// let test_name: StrRef = "test".into(); +// let ssh_private_key: PathRef = Arc::from(PathBuf::from("testkey.pem")); +// let number_of_workers = 4; +// let daft_config = DaftConfig { +// setup: DaftSetup { +// name: test_name.clone(), +// version: "=1.2.3".parse().unwrap(), +// provider: DaftProvider::Provisioned, +// dependencies: vec![], +// provider_config: ProviderConfig::Provisioned(AwsConfigWithRun { +// config: AwsConfig { +// region: test_name.clone(), +// number_of_workers, +// ssh_user: test_name.clone(), +// ssh_private_key: ssh_private_key.clone(), +// instance_type: test_name.clone(), +// image_id: test_name.clone(), +// iam_instance_profile_name: Some(test_name.clone()), +// }, +// }), +// }, +// jobs: HashMap::default(), +// }; +// let node_config = RayNodeConfig { +// key_name: "testkey".into(), +// instance_type: test_name.clone(), +// image_id: test_name.clone(), +// iam_instance_profile: Some(IamInstanceProfile { +// name: test_name.clone(), +// }), +// }; + +// let ray_config = RayConfig { +// cluster_name: test_name.clone(), +// max_workers: number_of_workers, +// provider: RayProvider { +// r#type: "aws".into(), +// region: test_name.clone(), +// cache_stopped_nodes: None, +// }, +// auth: RayAuth { +// ssh_user: test_name.clone(), +// ssh_private_key, +// }, +// available_node_types: vec![ +// ( +// "ray.head.default".into(), +// RayNodeType { +// max_workers: 0, +// node_config: node_config.clone(), +// resources: Some(RayResources { cpu: 0 }), +// }, +// ), +// ( +// "ray.worker.default".into(), +// RayNodeType { +// max_workers: number_of_workers, +// node_config, +// resources: None, +// }, +// ), +// ] +// .into_iter() +// .collect(), +// setup_commands: vec![ +// "curl -LsSf https://astral.sh/uv/install.sh | sh".into(), +// "uv python install 3.12".into(), +// "uv python pin 3.12".into(), +// "uv venv".into(), +// "echo 'source $HOME/.venv/bin/activate' >> ~/.bashrc".into(), +// "source ~/.bashrc".into(), +// r#"uv pip install boto3 pip py-spy deltalake getdaft "ray[default]==2.34""#.into(), +// ], +// }; + +// (daft_config, None, ray_config) +// } + +// #[tokio::test] +// async fn test_init_and_export() { +// run(DaftLauncher { +// sub_command: SubCommand::Config(ConfigCommands { +// command: ConfigCommand::Init(Init { +// path: ".daft.toml".into(), +// provider: DaftProvider::Provisioned, +// }), +// }), +// verbosity: 0, +// }) +// .await +// .unwrap(); + +// run(DaftLauncher { +// sub_command: SubCommand::Config(ConfigCommands { +// command: ConfigCommand::Check(ConfigPath { +// config: ".daft.toml".into(), +// }), +// }), +// verbosity: 0, +// }) +// .await +// .unwrap(); +// }