diff --git a/benchmarks/src/monitoring/record.py b/benchmarks/src/monitoring/record.py
index d89b72a87..9d31a7e6f 100644
--- a/benchmarks/src/monitoring/record.py
+++ b/benchmarks/src/monitoring/record.py
@@ -82,7 +82,10 @@ def record_processes(processes: List[psutil.Process]) -> Dict[str, ProcessRecord
                 children_system=cpu_times.children_system,
             )
             data[str(process.pid)] = ProcessRecord(
-                rss=memory_info.rss, vm=memory_info.vms, cpu=cpu_utilization, cpu_times=cpu_times
+                rss=memory_info.rss,
+                vm=memory_info.vms,
+                cpu=cpu_utilization,
+                cpu_times=cpu_times,
             )
         except BaseException as e:
             logging.error(e)
diff --git a/crates/tako/src/internal/messages/worker.rs b/crates/tako/src/internal/messages/worker.rs
index fc4285d7a..04c4a5285 100644
--- a/crates/tako/src/internal/messages/worker.rs
+++ b/crates/tako/src/internal/messages/worker.rs
@@ -73,9 +73,6 @@ pub enum ToWorkerMessage {
 #[derive(Deserialize, Serialize, Debug)]
 pub struct TaskFinishedMsg {
     pub id: TaskId,
-    pub size: u64,
-    /*#[serde(with = "serde_bytes")]
-    pub r#type: Vec<u8>,*/
 }
 
 #[derive(Deserialize, Serialize, Debug)]
@@ -141,7 +138,6 @@ pub enum FromWorkerMessage {
     TaskFinished(TaskFinishedMsg),
     TaskFailed(TaskFailedMsg),
     TaskRunning(TaskRunningMsg),
-    DataDownloaded(DataDownloadedMsg),
     StealResponse(StealResponseMsg),
     Overview(WorkerOverview),
     Heartbeat,
diff --git a/crates/tako/src/internal/scheduler/mod.rs b/crates/tako/src/internal/scheduler/mod.rs
index da0b49489..4765ccab3 100644
--- a/crates/tako/src/internal/scheduler/mod.rs
+++ b/crates/tako/src/internal/scheduler/mod.rs
@@ -2,4 +2,3 @@ pub mod metrics;
 pub mod multinode;
 pub(crate) mod query;
 pub mod state;
-mod utils;
diff --git a/crates/tako/src/internal/scheduler/state.rs b/crates/tako/src/internal/scheduler/state.rs
index 756b8b786..3f6f1daf6 100644
--- a/crates/tako/src/internal/scheduler/state.rs
+++ b/crates/tako/src/internal/scheduler/state.rs
@@ -1,25 +1,22 @@
-use std::cmp::{Ordering, Reverse};
+use std::cmp::Reverse;
 use std::rc::Rc;
 use std::time::{Duration, Instant};
 
 use tokio::sync::Notify;
 use tokio::time::sleep;
 
-//use crate::internal::common::trace::trace_time;
 use crate::internal::common::Map;
 use crate::internal::messages::worker::{TaskIdsMsg, ToWorkerMessage};
 use crate::internal::scheduler::multinode::MultiNodeAllocator;
 use crate::internal::server::comm::{Comm, CommSenderRef};
 use crate::internal::server::core::{Core, CoreRef};
 use crate::internal::server::task::{Task, TaskRuntimeState};
-use crate::internal::server::taskmap::TaskMap;
 use crate::internal::server::worker::Worker;
 use crate::internal::server::workerload::ResourceRequestLowerBound;
 use crate::internal::server::workermap::WorkerMap;
 use crate::{TaskId, WorkerId};
 
 use super::metrics::compute_b_level_metric;
-use super::utils::task_transfer_cost;
 
 // Long duration - 1 year
 const LONG_DURATION: std::time::Duration = std::time::Duration::from_secs(365 * 24 * 60 * 60);
@@ -98,7 +95,6 @@ impl SchedulerState {
     fn choose_worker_for_task(
         &mut self,
         task: &Task,
-        taskmap: &TaskMap,
         worker_map: &Map<WorkerId, Worker>,
         try_prev_worker: bool, // Enable heuristics that tries to fit tasks on fewer workers
     ) -> Option<WorkerId> {
@@ -119,22 +115,11 @@ impl SchedulerState {
         self.tmp_workers.clear(); // This has to be called AFTER fast path
 
-        let mut costs = u64::MAX;
         for worker in worker_map.values() {
             if !worker.is_capable_to_run_rqv(&task.configuration.resources, self.now) {
                 continue;
             }
-
-            let c = task_transfer_cost(taskmap, task, worker.id);
-            match c.cmp(&costs) {
-                Ordering::Less => {
-                    costs = c;
-                    self.tmp_workers.clear();
-                    self.tmp_workers.push(worker.id);
-                }
-                Ordering::Equal => self.tmp_workers.push(worker.id),
-                Ordering::Greater => { /* Do nothing */ }
-            }
+            self.tmp_workers.push(worker.id)
         }
 
         self.pick_worker()
     }
@@ -251,7 +236,7 @@ impl SchedulerState {
     }
 
     pub fn assign(&mut self, core: &mut Core, task_id: TaskId, worker_id: WorkerId) {
-        let (inputs, assigned_worker) = {
+        {
             let (tasks, workers) = core.split_tasks_workers_mut();
             let task = tasks.get_task(task_id);
             let assigned_worker = task.get_assigned_worker();
@@ -274,14 +259,6 @@ impl SchedulerState {
             (task.inputs.clone(), assigned_worker)
         };
 
-        for ti in inputs.into_iter() {
-            let input = core.get_task_mut(ti.task());
-            if let Some(wr) = assigned_worker {
-                input.remove_future_placement(wr);
-            }
-            input.set_future_placement(worker_id);
-        }
-
         let (tasks, workers) = core.split_tasks_workers_mut();
         let task = tasks.get_task_mut(task_id);
         workers.get_worker_mut(worker_id).insert_sn_task(task);
@@ -395,12 +372,7 @@ impl SchedulerState {
             }
 
             if let Some(task) = core.find_task(task_id) {
-                self.choose_worker_for_task(
-                    task,
-                    core.task_map(),
-                    core.get_worker_map(),
-                    try_prev_worker,
-                )
+                self.choose_worker_for_task(task, core.get_worker_map(), try_prev_worker)
             } else {
                 continue;
             }
@@ -491,7 +463,7 @@ impl SchedulerState {
             // and tN is the lowest cost to schedule here
             ts.sort_by_cached_key(|&task_id| {
                 let task = task_map.get_task(task_id);
-                let mut cost = task_transfer_cost(task_map, task, worker.id);
+                let mut cost = 0; // TODO: Transfer costs
                 if !task.is_fresh() && task.get_assigned_worker() != Some(worker.id) {
                     cost += 10_000_000;
                 }
diff --git a/crates/tako/src/internal/scheduler/utils.rs b/crates/tako/src/internal/scheduler/utils.rs
deleted file mode 100644
index a3de4a4ed..000000000
--- a/crates/tako/src/internal/scheduler/utils.rs
+++ /dev/null
@@ -1,22 +0,0 @@
-use crate::internal::server::task::Task;
-use crate::internal::server::taskmap::TaskMap;
-use crate::WorkerId;
-
-pub(crate) fn task_transfer_cost(taskmap: &TaskMap, task: &Task, worker_id: WorkerId) -> u64 {
-    // TODO: For large number of inputs, only sample inputs
-    task.inputs
-        .iter()
-        .take(512)
-        .map(|ti| {
-            let t = taskmap.get_task(ti.task());
-            let info = t.data_info().unwrap();
-            if info.placement.contains(&worker_id) {
-                0u64
-            } else if info.future_placement.contains_key(&worker_id) {
-                1u64
-            } else {
-                info.data_info.size
-            }
-        })
-        .sum()
-}
diff --git a/crates/tako/src/internal/server/client.rs b/crates/tako/src/internal/server/client.rs
index dd2e6d550..ace22f8b9 100644
--- a/crates/tako/src/internal/server/client.rs
+++ b/crates/tako/src/internal/server/client.rs
@@ -242,7 +242,7 @@ fn handle_new_tasks(
         .shared_data
         .into_iter()
         .map(|c| {
-            assert!(c.n_outputs == 0 || c.n_outputs == 1); // TODO: Implementation for more outputs
+            assert_eq!(c.n_outputs, 0); // TODO: Implementation for more outputs
             let keep = c.keep;
             let observe = c.observe;
             (Rc::new(create_task_configuration(core, c)), keep, observe)
diff --git a/crates/tako/src/internal/server/reactor.rs b/crates/tako/src/internal/server/reactor.rs
index 9bfc0ee65..1be8a16bd 100644
--- a/crates/tako/src/internal/server/reactor.rs
+++ b/crates/tako/src/internal/server/reactor.rs
@@ -7,8 +7,8 @@ use crate::internal::messages::worker::{
 };
 use crate::internal::server::comm::Comm;
 use crate::internal::server::core::Core;
-use crate::internal::server::task::{DataInfo, Task, TaskRuntimeState};
 use crate::internal::server::task::{FinishInfo, WaitingInfo};
+use crate::internal::server::task::{Task, TaskRuntimeState};
 use crate::internal::server::worker::Worker;
 use crate::internal::server::workermap::WorkerMap;
 use crate::{TaskId, WorkerId};
@@ -84,13 +84,7 @@ pub(crate) fn on_remove_worker(
                     continue;
                 }
             }
-            TaskRuntimeState::Finished(finfo) => {
-                finfo.future_placement.remove(&worker_id);
-                finfo.placement.remove(&worker_id);
-                if finfo.placement.is_empty() {
-                    todo!();
-                    // We have lost last worker that have this data
-                }
+            TaskRuntimeState::Finished(_finfo) => {
                 continue;
             }
             TaskRuntimeState::RunningMultiNode(ws) => {
@@ -306,11 +300,7 @@ pub(crate) fn on_task_finished(
             placement.insert(worker_id);
         }
 
-        task.state = TaskRuntimeState::Finished(FinishInfo {
-            data_info: DataInfo { size: msg.size },
-            placement,
-            future_placement: Default::default(),
-        });
+        task.state = TaskRuntimeState::Finished(FinishInfo {});
         comm.ask_for_scheduling();
 
         if task.is_observed() {
@@ -601,25 +591,6 @@ pub(crate) fn on_cancel_tasks(
     (to_unregister.into_iter().collect(), already_finished)
 }
 
-pub(crate) fn on_tasks_transferred(core: &mut Core, worker_id: WorkerId, task_id: TaskId) {
-    log::debug!("Task id={} transferred to worker={}", task_id, worker_id);
-    // TODO handle the race when task is removed from server before this message arrives
-    if let Some(task) = core.find_task_mut(task_id) {
-        match &mut task.state {
-            TaskRuntimeState::Finished(ref mut winfo) => {
-                winfo.placement.insert(worker_id);
-            }
-            TaskRuntimeState::Waiting(_)
-            | TaskRuntimeState::Running { .. }
-            | TaskRuntimeState::RunningMultiNode(_)
-            | TaskRuntimeState::Assigned(_)
-            | TaskRuntimeState::Stealing(_, _) => {
-                panic!("Invalid task state");
-            }
-        };
-    }
-}
-
 fn unregister_as_consumer(core: &mut Core, comm: &mut impl Comm, task_id: TaskId) {
     let inputs: Vec<TaskId> = core
         .get_task(task_id)
@@ -640,7 +611,7 @@ fn remove_task_if_possible(core: &mut Core, _comm: &mut impl Comm, task_id: Task
     }
 
     match core.remove_task(task_id) {
-        TaskRuntimeState::Finished(finfo) => finfo.placement,
+        TaskRuntimeState::Finished(_finfo) => { /* Ok */ }
         _ => unreachable!(),
     };
     log::debug!("Task id={task_id} is no longer needed");
diff --git a/crates/tako/src/internal/server/rpc.rs b/crates/tako/src/internal/server/rpc.rs
index 79cd8395f..96f9f236f 100644
--- a/crates/tako/src/internal/server/rpc.rs
+++ b/crates/tako/src/internal/server/rpc.rs
@@ -20,7 +20,7 @@ use crate::internal::server::comm::{Comm, CommSenderRef};
 use crate::internal::server::core::CoreRef;
 use crate::internal::server::reactor::{
     on_new_worker, on_remove_worker, on_steal_response, on_task_error, on_task_finished,
-    on_task_running, on_tasks_transferred,
+    on_task_running,
 };
 use crate::internal::server::worker::Worker;
 use crate::internal::transfer::auth::{
@@ -275,9 +275,6 @@ pub(crate) async fn worker_receive_loop<
             FromWorkerMessage::TaskFailed(msg) => {
                 on_task_error(&mut core, &mut *comm, worker_id, msg.id, msg.info);
             }
-            FromWorkerMessage::DataDownloaded(msg) => {
-                on_tasks_transferred(&mut core, worker_id, msg.id)
-            }
             FromWorkerMessage::StealResponse(msg) => {
                 on_steal_response(&mut core, &mut *comm, worker_id, msg)
             }
diff --git a/crates/tako/src/internal/server/task.rs b/crates/tako/src/internal/server/task.rs
index 8672c6e63..4b6ecb888 100644
--- a/crates/tako/src/internal/server/task.rs
+++ b/crates/tako/src/internal/server/task.rs
@@ -4,19 +4,13 @@ use std::time::Duration;
 use thin_vec::ThinVec;
 
 use crate::internal::common::stablemap::ExtractKey;
-use crate::internal::common::{Map, Set};
+use crate::internal::common::Set;
 use crate::internal::messages::worker::{ComputeTaskMsg, ToWorkerMessage};
 use crate::internal::server::taskmap::TaskMap;
 use crate::WorkerId;
 use crate::{static_assert_size, TaskId};
 use crate::{InstanceId, Priority};
 
-#[derive(Debug)]
-#[cfg_attr(test, derive(Eq, PartialEq))]
-pub struct DataInfo {
-    pub size: u64,
-}
-
 #[cfg_attr(test, derive(Eq, PartialEq))]
 pub struct WaitingInfo {
     pub unfinished_deps: u32,
@@ -24,11 +18,7 @@
 }
 
 #[cfg_attr(test, derive(Eq, PartialEq))]
-pub struct FinishInfo {
-    pub data_info: DataInfo,
-    pub placement: Set<WorkerId>,
-    pub future_placement: Map<WorkerId, u32>,
-}
+pub struct FinishInfo {}
 
 #[cfg_attr(test, derive(Eq, PartialEq))]
 pub enum TaskRuntimeState {
@@ -350,14 +340,6 @@
         )
     }
 
-    #[inline]
-    pub(crate) fn data_info(&self) -> Option<&FinishInfo> {
-        match &self.state {
-            TaskRuntimeState::Finished(finfo) => Some(finfo),
-            _ => None,
-        }
-    }
-
     #[inline]
     pub(crate) fn get_assigned_worker(&self) -> Option<WorkerId> {
         match &self.state {
@@ -371,42 +353,6 @@
         }
     }
 
-    #[cfg(test)]
-    #[inline]
-    pub(crate) fn get_placement(&self) -> Option<&Set<WorkerId>> {
-        match &self.state {
-            TaskRuntimeState::Finished(finfo) => Some(&finfo.placement),
-            _ => None,
-        }
-    }
-
-    pub(crate) fn remove_future_placement(&mut self, worker_id: WorkerId) {
-        match &mut self.state {
-            TaskRuntimeState::Finished(finfo) => {
-                let count = finfo.future_placement.get_mut(&worker_id).unwrap();
-                if *count <= 1 {
-                    assert_ne!(*count, 0);
-                    finfo.future_placement.remove(&worker_id);
-                } else {
-                    *count -= 1;
-                }
-            }
-            _ => {
-                unreachable!()
-            }
-        }
-    }
-
-    #[inline]
-    pub(crate) fn set_future_placement(&mut self, worker_id: WorkerId) {
-        match self.state {
-            TaskRuntimeState::Finished(ref mut finfo) => {
-                (*finfo.future_placement.entry(worker_id).or_insert(0)) += 1;
-            }
-            _ => unreachable!(),
-        }
-    }
-
     #[inline]
     pub(crate) fn get_scheduler_priority(&self) -> i32 {
         /*match self.state {
diff --git a/crates/tako/src/internal/tests/test_reactor.rs b/crates/tako/src/internal/tests/test_reactor.rs
index 60046a105..b2a2f437c 100644
--- a/crates/tako/src/internal/tests/test_reactor.rs
+++ b/crates/tako/src/internal/tests/test_reactor.rs
@@ -14,7 +14,6 @@ use crate::internal::server::core::Core;
 use crate::internal::server::reactor::{
     on_cancel_tasks, on_new_tasks, on_new_worker, on_remove_worker, on_reset_keep_flag,
     on_set_observe_flag, on_steal_response, on_task_error, on_task_finished, on_task_running,
-    on_tasks_transferred,
 };
 use crate::internal::server::task::{Task, TaskRuntimeState};
 use crate::internal::server::worker::Worker;
@@ -260,10 +259,7 @@ fn test_assignments_and_finish() {
         &mut core,
         &mut comm,
         100.into(),
-        TaskFinishedMsg {
-            id: 15.into(),
-            size: 301,
-        },
+        TaskFinishedMsg { id: 15.into() },
     );
 
     assert!(core.find_task(15.into()).is_none());
@@ -283,10 +279,7 @@ fn test_assignments_and_finish() {
         &mut core,
         &mut comm,
         101.into(),
-        TaskFinishedMsg {
-            id: 12.into(),
-            size: 5000,
-        },
+        TaskFinishedMsg { id: 12.into() },
     );
 
     assert!(core.get_task(12.into()).is_finished());
@@ -303,10 +296,7 @@ fn test_assignments_and_finish() {
         &mut core,
         &mut comm,
         100.into(),
-        TaskFinishedMsg {
-            id: 11.into(),
-            size: 1000,
-        },
+        TaskFinishedMsg { id: 11.into() },
     );
 
     comm.check_need_scheduling();
@@ -329,10 +319,7 @@ fn test_assignments_and_finish() {
         &mut core,
         &mut comm,
         101.into(),
-        TaskFinishedMsg {
-            id: 13.into(),
-            size: 1000,
-        },
+        TaskFinishedMsg { id: 13.into() },
    );
 
     comm.check_need_scheduling();
@@ -387,33 +374,6 @@ fn test_running_task_on_error() {
     core.sanity_check();
 }
 
-#[test]
-fn test_running_task_on_task_transferred_invalid() {
-    let mut core = Core::default();
-    create_test_workers(&mut core, &[1, 1, 1]);
-    on_tasks_transferred(&mut core, 102.into(), 42.into());
-    core.sanity_check();
-}
-
-#[test]
-fn test_running_task_on_task_transferred() {
-    let mut core = Core::default();
-    create_test_workers(&mut core, &[1, 1, 1]);
-    submit_example_1(&mut core);
-    start_and_finish_on_worker(&mut core, 11, 100, 1000);
-    start_and_finish_on_worker(&mut core, 12, 101, 1000);
-    start_on_worker(&mut core, 13, 101);
-
-    on_tasks_transferred(&mut core, 101.into(), 11.into());
-
-    let ws = core.get_task(11.into()).get_placement().unwrap().clone();
-    let mut set = Set::new();
-    set.insert(WorkerId::new(100));
-    set.insert(WorkerId::new(101));
-    assert_eq!(ws, set);
-    core.sanity_check();
-}
-
 #[test]
 fn test_steal_tasks_ok() {
     let mut core = Core::default();
@@ -529,10 +489,7 @@ fn finish_task_without_outputs() {
         &mut core,
         &mut comm,
         100.into(),
-        TaskFinishedMsg {
-            id: 1.into(),
-            size: 0,
-        },
+        TaskFinishedMsg { id: 1.into() },
     );
     comm.check_need_scheduling();
     comm.emptiness_check();
@@ -850,10 +807,7 @@ fn test_finished_before_steal_response() {
         &mut core,
         &mut comm,
         101.into(),
-        TaskFinishedMsg {
-            id: 1.into(),
-            size: 0,
-        },
+        TaskFinishedMsg { id: 1.into() },
     );
 
     comm.check_need_scheduling();
@@ -939,10 +893,7 @@ fn test_after_cancel_messages() {
         &mut core,
         &mut comm,
         101.into(),
-        TaskFinishedMsg {
-            id: 1.into(),
-            size: 100,
-        },
+        TaskFinishedMsg { id: 1.into() },
     );
 
     comm.emptiness_check();
diff --git a/crates/tako/src/internal/tests/test_scheduler_sn.rs b/crates/tako/src/internal/tests/test_scheduler_sn.rs
index fd5cf388e..21985db10 100644
--- a/crates/tako/src/internal/tests/test_scheduler_sn.rs
+++ b/crates/tako/src/internal/tests/test_scheduler_sn.rs
@@ -206,102 +206,6 @@ fn test_no_deps_distribute_with_balance() {
     core.sanity_check();
 }
 
-#[test]
-fn test_minimal_transfer_no_balance1() {
-    /*11  12
-        \  / \
-        13   14
-
-        11 - is big on W100
-        12 - is small on W101
-    */
-
-    let mut core = Core::default();
-    create_test_workers(&mut core, &[2, 2, 2]);
-    submit_example_1(&mut core);
-    start_and_finish_on_worker(&mut core, 11, 100, 10000);
-    start_and_finish_on_worker(&mut core, 12, 101, 1000);
-
-    let mut scheduler = create_test_scheduler();
-    let mut comm = create_test_comm();
-    scheduler.run_scheduling(&mut core, &mut comm);
-
-    comm.take_worker_msgs(100, 1);
-    comm.take_worker_msgs(101, 1);
-
-    check_task_has_worker(&core, 13, 100);
-    check_task_has_worker(&core, 14, 101);
-
-    comm.emptiness_check();
-    core.sanity_check();
-}
-
-#[test]
-fn test_minimal_transfer_no_balance2() {
-    /*11  12
-        \  / \
-        13   14
-
-        11 - is small on W100
-        12 - is big on W102
-    */
-    //setup_logging();
-    let mut core = Core::default();
-    create_test_workers(&mut core, &[2, 2, 2]);
-    submit_example_1(&mut core);
-    start_and_finish_on_worker(&mut core, 11, 100, 1000);
-    start_and_finish_on_worker(&mut core, 12, 101, 10000);
-
-    let mut scheduler = create_test_scheduler();
-    let mut comm = create_test_comm();
-    scheduler.run_scheduling(&mut core, &mut comm);
-
-    comm.take_worker_msgs(101, 2);
-
-    assert_eq!(
-        core.get_task(13.into()).get_assigned_worker().unwrap(),
-        WorkerId(101)
-    );
-    assert_eq!(
-        core.get_task(14.into()).get_assigned_worker().unwrap(),
-        WorkerId(101)
-    );
-
-    comm.emptiness_check();
-    core.sanity_check();
-}
-
-#[test]
-fn test_minimal_transfer_after_balance() {
-    /*11  12
-        \  / \
-        13   14
-
-        11 - is on W100
-        12 - is on W100
-    */
-
-    //setup_logging();
-    let mut core = Core::default();
-    create_test_workers(&mut core, &[1, 1]);
-    submit_example_1(&mut core);
-    start_and_finish_on_worker(&mut core, 11, 100, 10000);
-    start_and_finish_on_worker(&mut core, 12, 100, 10000);
-
-    let mut scheduler = create_test_scheduler();
-    let mut comm = create_test_comm();
-    scheduler.run_scheduling(&mut core, &mut comm);
-
-    comm.take_worker_msgs(100, 1);
-    comm.take_worker_msgs(101, 1);
-
-    check_task_has_worker(&core, 13, 100);
-    check_task_has_worker(&core, 14, 101);
-
-    comm.emptiness_check();
-    core.sanity_check();
-}
-
 #[test]
 fn test_resource_balancing1() {
     let mut rt = TestEnv::new();
diff --git a/crates/tako/src/internal/tests/utils/schedule.rs b/crates/tako/src/internal/tests/utils/schedule.rs
index e1b595b4b..ee88421b5 100644
--- a/crates/tako/src/internal/tests/utils/schedule.rs
+++ b/crates/tako/src/internal/tests/utils/schedule.rs
@@ -141,10 +141,7 @@ pub fn finish_on_worker<W: Into<WorkerId>, T: Into<TaskId>>(
         core,
         &mut comm,
         worker_id.into(),
-        TaskFinishedMsg {
-            id: task_id.into(),
-            size,
-        },
+        TaskFinishedMsg { id: task_id.into() },
     );
 }
 
diff --git a/crates/tako/src/internal/worker/reactor.rs b/crates/tako/src/internal/worker/reactor.rs
index 84dd1cca8..9ddece1c7 100644
--- a/crates/tako/src/internal/worker/reactor.rs
+++ b/crates/tako/src/internal/worker/reactor.rs
@@ -102,7 +102,7 @@ async fn handle_task_future(task_future: TaskFuture, state_ref: WorkerStateRef,
     match result {
         Ok(TaskResult::Finished) => {
             log::debug!("Inner task finished id={}", task_id);
-            state.finish_task(task_id, 0);
+            state.finish_task(task_id);
         }
         Ok(TaskResult::Canceled) => {
             log::debug!("Inner task canceled id={}", task_id);
diff --git a/crates/tako/src/internal/worker/state.rs b/crates/tako/src/internal/worker/state.rs
index a7f3661cf..7f772dc96 100644
--- a/crates/tako/src/internal/worker/state.rs
+++ b/crates/tako/src/internal/worker/state.rs
@@ -228,9 +228,9 @@ impl WorkerState {
         self.running_tasks.insert(task_id);
     }
 
-    pub fn finish_task(&mut self, task_id: TaskId, size: u64) {
+    pub fn finish_task(&mut self, task_id: TaskId) {
         self.remove_task(task_id, true);
-        let message = FromWorkerMessage::TaskFinished(TaskFinishedMsg { id: task_id, size });
+        let message = FromWorkerMessage::TaskFinished(TaskFinishedMsg { id: task_id });
         self.comm.send_message_to_server(message);
     }
 
diff --git a/tests/pyapi/test_dependencies.py b/tests/pyapi/test_dependencies.py
index 90be69e3d..c9625c9d8 100644
--- a/tests/pyapi/test_dependencies.py
+++ b/tests/pyapi/test_dependencies.py
@@ -2,6 +2,8 @@
 from ..utils import wait_for_job_state
 from ..utils.cmd import bash
 from . import prepare_job_client
+from hyperqueue import Client, Job
+import time
 
 
 def test_single_dep(hq_env: HqEnv):
@@ -38,3 +40,20 @@ def test_dep_failed(hq_env: HqEnv):
     assert table.get_row_value("1") == "CANCELED"
     assert table.get_row_value("2") == "CANCELED"
     assert table.get_row_value("3") == "FINISHED"
+
+
+def test_kill_worker_with_deps(hq_env: HqEnv):
+    hq_env.start_server()
+    hq_env.start_worker(cpus="4")
+    hq_env.start_worker(cpus="4")
+    client = Client(hq_env.server_dir)
+
+    job = Job()
+    jobs = [job.program(bash("sleep 1")) for _ in range(16)]
+    job.program(bash("sleep 1"), deps=jobs)
+    submitted_job = client.submit(job)
+    wait_for_job_state(hq_env, submitted_job.id, "RUNNING")
+    hq_env.kill_worker(1)
+    hq_env.kill_worker(2)
+    time.sleep(2.0)
+    wait_for_job_state(hq_env, submitted_job.id, "WAITING")