Skip to content

Commit 631f86f

Browse files
committed
Addressed PR comments
1 parent 9e50b05 commit 631f86f

File tree

4 files changed

+47
-68
lines changed

4 files changed

+47
-68
lines changed

Cargo.lock

+1-1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
[package]
2-
name = "daft"
2+
name = "daft-launcher"
33
version = "0.4.1-alpha0"
44
edition = "2021"
55
description = "A simple launcher for spinning up and managing Ray clusters for Daft"

src/main.rs

+45-40
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ use std::{
99
time::Duration,
1010
};
1111

12+
#[cfg(test)]
13+
mod tests;
14+
1215
#[cfg(not(test))]
1316
use anyhow::bail;
1417
use aws_config::{BehaviorVersion, Region};
@@ -191,7 +194,7 @@ struct ConfigPath {
191194
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
192195
struct DaftConfig {
193196
setup: DaftSetup,
194-
#[serde(rename = "job", deserialize_with = "parse_jobs")]
197+
#[serde(default, rename = "job", deserialize_with = "parse_jobs")]
195198
jobs: HashMap<StrRef, DaftJob>,
196199
}
197200

@@ -243,7 +246,8 @@ struct AwsConfig {
243246
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
244247
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
245248
struct K8sConfig {
246-
namespace: Option<StrRef>,
249+
#[serde(default = "default_k8s_namespace")]
250+
namespace: StrRef,
247251
}
248252

249253
fn parse_jobs<'de, D>(deserializer: D) -> Result<HashMap<StrRef, DaftJob>, D::Error>
@@ -323,6 +327,10 @@ fn default_image_id() -> StrRef {
323327
"ami-04dd23e62ed049936".into()
324328
}
325329

330+
fn default_k8s_namespace() -> StrRef {
331+
"default".into()
332+
}
333+
326334
fn parse_version_req<'de, D>(deserializer: D) -> Result<VersionReq, D::Error>
327335
where
328336
D: serde::Deserializer<'de>,
@@ -344,9 +352,7 @@ where
344352
#[derive(Debug, Deserialize, Clone, PartialEq, Eq)]
345353
#[serde(rename_all = "kebab-case", deny_unknown_fields)]
346354
enum DaftProvider {
347-
#[serde(rename = "provisioned")]
348355
Provisioned,
349-
#[serde(rename = "byoc")]
350356
Byoc,
351357
}
352358

@@ -862,8 +868,39 @@ impl Drop for PortForward {
862868
}
863869
}
864870

865-
async fn establish_kubernetes_port_forward(namespace: Option<&str>) -> anyhow::Result<PortForward> {
866-
let namespace = namespace.unwrap_or("default");
871+
async fn submit_k8s(
872+
working_dir: &Path,
873+
command_segments: impl AsRef<[&str]>,
874+
namespace: &str,
875+
) -> anyhow::Result<()> {
876+
let command_segments = command_segments.as_ref();
877+
878+
// Start port forwarding - it will be automatically killed when _port_forward is dropped
879+
let _port_forward = establish_kubernetes_port_forward(namespace).await?;
880+
881+
// Give the port-forward a moment to fully establish
882+
tokio::time::sleep(Duration::from_secs(1)).await;
883+
884+
// Submit the job
885+
let exit_status = Command::new("ray")
886+
.env("PYTHONUNBUFFERED", "1")
887+
.args(["job", "submit", "--address", "http://localhost:8265"])
888+
.arg("--working-dir")
889+
.arg(working_dir)
890+
.arg("--")
891+
.args(command_segments)
892+
.spawn()?
893+
.wait()
894+
.await?;
895+
896+
if exit_status.success() {
897+
Ok(())
898+
} else {
899+
Err(anyhow::anyhow!("Failed to submit job to the ray cluster"))
900+
}
901+
}
902+
903+
async fn establish_kubernetes_port_forward(namespace: &str) -> anyhow::Result<PortForward> {
867904
let output = Command::new("kubectl")
868905
.arg("get")
869906
.arg("svc")
@@ -923,38 +960,6 @@ async fn establish_kubernetes_port_forward(namespace: Option<&str>) -> anyhow::R
923960
}
924961
}
925962

926-
async fn submit_k8s(
927-
working_dir: &Path,
928-
command_segments: impl AsRef<[&str]>,
929-
namespace: Option<&str>,
930-
) -> anyhow::Result<()> {
931-
let command_segments = command_segments.as_ref();
932-
933-
// Start port forwarding - it will be automatically killed when _port_forward is dropped
934-
let _port_forward = establish_kubernetes_port_forward(namespace).await?;
935-
936-
// Give the port-forward a moment to fully establish
937-
tokio::time::sleep(Duration::from_secs(1)).await;
938-
939-
// Submit the job
940-
let exit_status = Command::new("ray")
941-
.env("PYTHONUNBUFFERED", "1")
942-
.args(["job", "submit", "--address", "http://localhost:8265"])
943-
.arg("--working-dir")
944-
.arg(working_dir)
945-
.arg("--")
946-
.args(command_segments)
947-
.spawn()?
948-
.wait()
949-
.await?;
950-
951-
if exit_status.success() {
952-
Ok(())
953-
} else {
954-
Err(anyhow::anyhow!("Failed to submit job to the ray cluster"))
955-
}
956-
}
957-
958963
async fn run(daft_launcher: DaftLauncher) -> anyhow::Result<()> {
959964
match daft_launcher.sub_command {
960965
SubCommand::Config(config_cmd) => {
@@ -1063,7 +1068,7 @@ impl JobCommand {
10631068
submit_k8s(
10641069
daft_job.working_dir.as_ref(),
10651070
daft_job.command.as_ref().split(' ').collect::<Vec<_>>(),
1066-
k8s_config.namespace.as_deref(),
1071+
k8s_config.namespace.as_ref(),
10671072
)
10681073
.await?;
10691074
}
@@ -1081,7 +1086,7 @@ impl JobCommand {
10811086
submit_k8s(
10821087
temp_sql_dir.path(),
10831088
vec!["python", "sql.py", sql.as_ref()],
1084-
k8s_config.namespace.as_deref(),
1089+
k8s_config.namespace.as_ref(),
10851090
)
10861091
.await?;
10871092
}

src/tests.rs

-26
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
use tokio::fs;
22

33
use super::*;
4-
use crate::{ConfigCommand, ConfigCommands, ConfigPath, DaftLauncher, Init, SubCommand};
54

65
fn not_found_okay(result: std::io::Result<()>) -> std::io::Result<()> {
76
match result {
@@ -190,28 +189,3 @@ pub fn simple_config() -> (DaftConfig, Option<TeardownBehaviour>, RayConfig) {
190189

191190
(daft_config, None, ray_config)
192191
}
193-
194-
#[tokio::test]
195-
async fn test_init_and_export() {
196-
crate::run(DaftLauncher {
197-
sub_command: SubCommand::Config(ConfigCommands {
198-
command: ConfigCommand::Init(Init {
199-
path: ".daft.toml".into(),
200-
provider: "provisioned".into(),
201-
}),
202-
}),
203-
verbosity: 0,
204-
})
205-
.await
206-
.unwrap();
207-
crate::run(DaftLauncher {
208-
sub_command: SubCommand::Config(ConfigCommands {
209-
command: ConfigCommand::Check(ConfigPath {
210-
config: ".daft.toml".into(),
211-
}),
212-
}),
213-
verbosity: 0,
214-
})
215-
.await
216-
.unwrap();
217-
}

0 commit comments

Comments
 (0)