Skip to content

Commit

Permalink
Removed obsole part of scheduler that caused a crash
Browse files Browse the repository at this point in the history
  • Loading branch information
spirali committed Feb 9, 2024
1 parent 2771807 commit b405aed
Show file tree
Hide file tree
Showing 15 changed files with 47 additions and 314 deletions.
5 changes: 4 additions & 1 deletion benchmarks/src/monitoring/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,10 @@ def record_processes(processes: List[psutil.Process]) -> Dict[str, ProcessRecord
children_system=cpu_times.children_system,
)
data[str(process.pid)] = ProcessRecord(
rss=memory_info.rss, vm=memory_info.vms, cpu=cpu_utilization, cpu_times=cpu_times
rss=memory_info.rss,
vm=memory_info.vms,
cpu=cpu_utilization,
cpu_times=cpu_times,
)
except BaseException as e:
logging.error(e)
Expand Down
4 changes: 0 additions & 4 deletions crates/tako/src/internal/messages/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,6 @@ pub enum ToWorkerMessage {
#[derive(Deserialize, Serialize, Debug)]
pub struct TaskFinishedMsg {
pub id: TaskId,
pub size: u64,
/*#[serde(with = "serde_bytes")]
pub r#type: Vec<u8>,*/
}

#[derive(Deserialize, Serialize, Debug)]
Expand Down Expand Up @@ -141,7 +138,6 @@ pub enum FromWorkerMessage {
TaskFinished(TaskFinishedMsg),
TaskFailed(TaskFailedMsg),
TaskRunning(TaskRunningMsg),
DataDownloaded(DataDownloadedMsg),
StealResponse(StealResponseMsg),
Overview(WorkerOverview),
Heartbeat,
Expand Down
1 change: 0 additions & 1 deletion crates/tako/src/internal/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,3 @@ pub mod metrics;
pub mod multinode;
pub(crate) mod query;
pub mod state;
mod utils;
38 changes: 5 additions & 33 deletions crates/tako/src/internal/scheduler/state.rs
Original file line number Diff line number Diff line change
@@ -1,25 +1,22 @@
use std::cmp::{Ordering, Reverse};
use std::cmp::Reverse;
use std::rc::Rc;
use std::time::{Duration, Instant};

use tokio::sync::Notify;
use tokio::time::sleep;

//use crate::internal::common::trace::trace_time;
use crate::internal::common::Map;
use crate::internal::messages::worker::{TaskIdsMsg, ToWorkerMessage};
use crate::internal::scheduler::multinode::MultiNodeAllocator;
use crate::internal::server::comm::{Comm, CommSenderRef};
use crate::internal::server::core::{Core, CoreRef};
use crate::internal::server::task::{Task, TaskRuntimeState};
use crate::internal::server::taskmap::TaskMap;
use crate::internal::server::worker::Worker;
use crate::internal::server::workerload::ResourceRequestLowerBound;
use crate::internal::server::workermap::WorkerMap;
use crate::{TaskId, WorkerId};

use super::metrics::compute_b_level_metric;
use super::utils::task_transfer_cost;

// Long duration - 1 year
const LONG_DURATION: std::time::Duration = std::time::Duration::from_secs(365 * 24 * 60 * 60);
Expand Down Expand Up @@ -98,7 +95,6 @@ impl SchedulerState {
fn choose_worker_for_task(
&mut self,
task: &Task,
taskmap: &TaskMap,
worker_map: &Map<WorkerId, Worker>,
try_prev_worker: bool, // Enable heuristics that tries to fit tasks on fewer workers
) -> Option<WorkerId> {
Expand All @@ -119,22 +115,11 @@ impl SchedulerState {

self.tmp_workers.clear(); // This has to be called AFTER fast path

let mut costs = u64::MAX;
for worker in worker_map.values() {
if !worker.is_capable_to_run_rqv(&task.configuration.resources, self.now) {
continue;
}

let c = task_transfer_cost(taskmap, task, worker.id);
match c.cmp(&costs) {
Ordering::Less => {
costs = c;
self.tmp_workers.clear();
self.tmp_workers.push(worker.id);
}
Ordering::Equal => self.tmp_workers.push(worker.id),
Ordering::Greater => { /* Do nothing */ }
}
self.tmp_workers.push(worker.id)
}
self.pick_worker()
}
Expand Down Expand Up @@ -251,7 +236,7 @@ impl SchedulerState {
}

pub fn assign(&mut self, core: &mut Core, task_id: TaskId, worker_id: WorkerId) {
let (inputs, assigned_worker) = {
{
let (tasks, workers) = core.split_tasks_workers_mut();
let task = tasks.get_task(task_id);
let assigned_worker = task.get_assigned_worker();
Expand All @@ -274,14 +259,6 @@ impl SchedulerState {
(task.inputs.clone(), assigned_worker)
};

for ti in inputs.into_iter() {
let input = core.get_task_mut(ti.task());
if let Some(wr) = assigned_worker {
input.remove_future_placement(wr);
}
input.set_future_placement(worker_id);
}

let (tasks, workers) = core.split_tasks_workers_mut();
let task = tasks.get_task_mut(task_id);
workers.get_worker_mut(worker_id).insert_sn_task(task);
Expand Down Expand Up @@ -395,12 +372,7 @@ impl SchedulerState {
}

if let Some(task) = core.find_task(task_id) {
self.choose_worker_for_task(
task,
core.task_map(),
core.get_worker_map(),
try_prev_worker,
)
self.choose_worker_for_task(task, core.get_worker_map(), try_prev_worker)
} else {
continue;
}
Expand Down Expand Up @@ -491,7 +463,7 @@ impl SchedulerState {
// and tN is the lowest cost to schedule here
ts.sort_by_cached_key(|&task_id| {
let task = task_map.get_task(task_id);
let mut cost = task_transfer_cost(task_map, task, worker.id);
let mut cost = 0; // TODO: Transfer costs
if !task.is_fresh() && task.get_assigned_worker() != Some(worker.id) {
cost += 10_000_000;
}
Expand Down
22 changes: 0 additions & 22 deletions crates/tako/src/internal/scheduler/utils.rs

This file was deleted.

2 changes: 1 addition & 1 deletion crates/tako/src/internal/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ fn handle_new_tasks(
.shared_data
.into_iter()
.map(|c| {
assert!(c.n_outputs == 0 || c.n_outputs == 1); // TODO: Implementation for more outputs
assert_eq!(c.n_outputs, 0); // TODO: Implementation for more outputs
let keep = c.keep;
let observe = c.observe;
(Rc::new(create_task_configuration(core, c)), keep, observe)
Expand Down
37 changes: 4 additions & 33 deletions crates/tako/src/internal/server/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use crate::internal::messages::worker::{
};
use crate::internal::server::comm::Comm;
use crate::internal::server::core::Core;
use crate::internal::server::task::{DataInfo, Task, TaskRuntimeState};
use crate::internal::server::task::{FinishInfo, WaitingInfo};
use crate::internal::server::task::{Task, TaskRuntimeState};
use crate::internal::server::worker::Worker;
use crate::internal::server::workermap::WorkerMap;
use crate::{TaskId, WorkerId};
Expand Down Expand Up @@ -84,13 +84,7 @@ pub(crate) fn on_remove_worker(
continue;
}
}
TaskRuntimeState::Finished(finfo) => {
finfo.future_placement.remove(&worker_id);
finfo.placement.remove(&worker_id);
if finfo.placement.is_empty() {
todo!();
// We have lost last worker that have this data
}
TaskRuntimeState::Finished(_finfo) => {
continue;
}
TaskRuntimeState::RunningMultiNode(ws) => {
Expand Down Expand Up @@ -306,11 +300,7 @@ pub(crate) fn on_task_finished(
placement.insert(worker_id);
}

task.state = TaskRuntimeState::Finished(FinishInfo {
data_info: DataInfo { size: msg.size },
placement,
future_placement: Default::default(),
});
task.state = TaskRuntimeState::Finished(FinishInfo {});
comm.ask_for_scheduling();

if task.is_observed() {
Expand Down Expand Up @@ -601,25 +591,6 @@ pub(crate) fn on_cancel_tasks(
(to_unregister.into_iter().collect(), already_finished)
}

pub(crate) fn on_tasks_transferred(core: &mut Core, worker_id: WorkerId, task_id: TaskId) {
log::debug!("Task id={} transferred to worker={}", task_id, worker_id);
// TODO handle the race when task is removed from server before this message arrives
if let Some(task) = core.find_task_mut(task_id) {
match &mut task.state {
TaskRuntimeState::Finished(ref mut winfo) => {
winfo.placement.insert(worker_id);
}
TaskRuntimeState::Waiting(_)
| TaskRuntimeState::Running { .. }
| TaskRuntimeState::RunningMultiNode(_)
| TaskRuntimeState::Assigned(_)
| TaskRuntimeState::Stealing(_, _) => {
panic!("Invalid task state");
}
};
}
}

fn unregister_as_consumer(core: &mut Core, comm: &mut impl Comm, task_id: TaskId) {
let inputs: Vec<TaskId> = core
.get_task(task_id)
Expand All @@ -640,7 +611,7 @@ fn remove_task_if_possible(core: &mut Core, _comm: &mut impl Comm, task_id: Task
}

match core.remove_task(task_id) {
TaskRuntimeState::Finished(finfo) => finfo.placement,
TaskRuntimeState::Finished(_finfo) => { /* Ok */ }
_ => unreachable!(),
};
log::debug!("Task id={task_id} is no longer needed");
Expand Down
5 changes: 1 addition & 4 deletions crates/tako/src/internal/server/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use crate::internal::server::comm::{Comm, CommSenderRef};
use crate::internal::server::core::CoreRef;
use crate::internal::server::reactor::{
on_new_worker, on_remove_worker, on_steal_response, on_task_error, on_task_finished,
on_task_running, on_tasks_transferred,
on_task_running,
};
use crate::internal::server::worker::Worker;
use crate::internal::transfer::auth::{
Expand Down Expand Up @@ -275,9 +275,6 @@ pub(crate) async fn worker_receive_loop<
FromWorkerMessage::TaskFailed(msg) => {
on_task_error(&mut core, &mut *comm, worker_id, msg.id, msg.info);
}
FromWorkerMessage::DataDownloaded(msg) => {
on_tasks_transferred(&mut core, worker_id, msg.id)
}
FromWorkerMessage::StealResponse(msg) => {
on_steal_response(&mut core, &mut *comm, worker_id, msg)
}
Expand Down
58 changes: 2 additions & 56 deletions crates/tako/src/internal/server/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,31 +4,21 @@ use std::time::Duration;
use thin_vec::ThinVec;

use crate::internal::common::stablemap::ExtractKey;
use crate::internal::common::{Map, Set};
use crate::internal::common::Set;
use crate::internal::messages::worker::{ComputeTaskMsg, ToWorkerMessage};
use crate::internal::server::taskmap::TaskMap;
use crate::WorkerId;
use crate::{static_assert_size, TaskId};
use crate::{InstanceId, Priority};

#[derive(Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct DataInfo {
pub size: u64,
}

#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct WaitingInfo {
pub unfinished_deps: u32,
// pub scheduler_metric: i32,
}

#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct FinishInfo {
pub data_info: DataInfo,
pub placement: Set<WorkerId>,
pub future_placement: Map<WorkerId, u32>,
}
pub struct FinishInfo {}

#[cfg_attr(test, derive(Eq, PartialEq))]
pub enum TaskRuntimeState {
Expand Down Expand Up @@ -350,14 +340,6 @@ impl Task {
)
}

#[inline]
pub(crate) fn data_info(&self) -> Option<&FinishInfo> {
match &self.state {
TaskRuntimeState::Finished(finfo) => Some(finfo),
_ => None,
}
}

#[inline]
pub(crate) fn get_assigned_worker(&self) -> Option<WorkerId> {
match &self.state {
Expand All @@ -371,42 +353,6 @@ impl Task {
}
}

#[cfg(test)]
#[inline]
pub(crate) fn get_placement(&self) -> Option<&Set<WorkerId>> {
match &self.state {
TaskRuntimeState::Finished(finfo) => Some(&finfo.placement),
_ => None,
}
}

pub(crate) fn remove_future_placement(&mut self, worker_id: WorkerId) {
match &mut self.state {
TaskRuntimeState::Finished(finfo) => {
let count = finfo.future_placement.get_mut(&worker_id).unwrap();
if *count <= 1 {
assert_ne!(*count, 0);
finfo.future_placement.remove(&worker_id);
} else {
*count -= 1;
}
}
_ => {
unreachable!()
}
}
}

#[inline]
pub(crate) fn set_future_placement(&mut self, worker_id: WorkerId) {
match self.state {
TaskRuntimeState::Finished(ref mut finfo) => {
(*finfo.future_placement.entry(worker_id).or_insert(0)) += 1;
}
_ => unreachable!(),
}
}

#[inline]
pub(crate) fn get_scheduler_priority(&self) -> i32 {
/*match self.state {
Expand Down
Loading

0 comments on commit b405aed

Please sign in to comment.