Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Removed some old code related to data deps #782

Merged
merged 1 commit into from
Oct 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ anyhow = "1"
nix = { version = "0.29", features = ["process", "signal"] }
bstr = { version = "1.9", features = ["serde"] }
psutil = "3"
thin-vec = { version = "0.2", features = ["serde"] }


[profile.release]
panic = "abort"
Expand Down
1 change: 1 addition & 0 deletions crates/hyperqueue/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ nix = { workspace = true }
bstr = { workspace = true }
psutil = { workspace = true }
byteorder = { workspace = true }
thin-vec = { workspace = true }

humantime = "2"
num_cpus = "1"
Expand Down
3 changes: 2 additions & 1 deletion crates/hyperqueue/src/server/client/submit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use std::time::Duration;
use bstr::BString;
use tako::Map;
use tako::Set;
use thin_vec::ThinVec;

use tako::gateway::{
FromGatewayMessage, NewTasksMessage, ResourceRequestVariants, SharedTaskConfiguration,
Expand Down Expand Up @@ -272,7 +273,7 @@ fn build_tasks_array(
let build_task_conf = |body: Box<[u8]>, tako_id: TakoTaskId| TaskConfiguration {
id: tako_id,
shared_data_index: 0,
task_deps: Vec::new(),
task_deps: ThinVec::new(),
body,
};

Expand Down
3 changes: 2 additions & 1 deletion crates/tako/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ tracing = { workspace = true }
nix = { workspace = true }
bstr = { workspace = true }
psutil = { workspace = true }
thin-vec = { workspace = true }


hashbrown = { version = "0.15", features = ["serde", "inline-more"], default-features = false }
tracing-subscriber = { version = "0.3", features = ["json"] }
priority-queue = "2"
bitflags = "2"
fxhash = "0.2"
thin-vec = "0.2"
derive_more = { version = "1", features = ["add", "add_assign", "sum"] }

[dev-dependencies]
Expand Down
3 changes: 2 additions & 1 deletion crates/tako/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use crate::task::SerializedTaskContext;
use crate::{InstanceId, Map, Priority, TaskId, WorkerId};
use smallvec::{smallvec, SmallVec};
use std::time::Duration;
use thin_vec::ThinVec;

#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq, Hash)]
pub struct ResourceRequestEntry {
Expand Down Expand Up @@ -116,7 +117,7 @@ pub struct TaskConfiguration {
/// Index into NewTasksMessage::shared_data that contains the shared data for this task.
pub shared_data_index: u32,

pub task_deps: Vec<TaskId>,
pub task_deps: ThinVec<TaskId>,

/// Opaque data that is passed by the gateway user to task launchers.
#[serde(with = "serde_bytes")]
Expand Down
7 changes: 3 additions & 4 deletions crates/tako/src/internal/scheduler/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,13 @@ fn crawl<F1: Fn(&Task) -> &Set<TaskId>>(tasks: &mut TaskMap, predecessor_fn: F1)
let task = tasks.get_task_mut(task_id);
task.set_scheduler_priority(level + 1);

for ti in task.inputs.iter() {
let input_id = ti.task();
for t in task.task_deps.iter() {
let v: &mut u32 = neighbours
.get_mut(&input_id)
.get_mut(t)
.expect("Couldn't find task neighbour in level computation");
if *v <= 1 {
assert_eq!(*v, 1);
stack.push(input_id);
stack.push(*t);
} else {
*v -= 1;
}
Expand Down
6 changes: 3 additions & 3 deletions crates/tako/src/internal/scheduler/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ impl SchedulerState {
try_prev_worker: bool, // Enable heuristics that tries to fit tasks on fewer workers
) -> Option<WorkerId> {
// Fast path
if try_prev_worker && task.inputs.is_empty() {
if try_prev_worker && task.task_deps.is_empty() {
// Note: We are *not* using "is_capable_to_run" but "have_immediate_resources_for_rq",
// because we want to enable fast path only if task can be directly executed
// We want to avoid creation of overloaded
Expand Down Expand Up @@ -256,7 +256,7 @@ impl SchedulerState {
worker_id
);
}
(task.inputs.clone(), assigned_worker)
(task.task_deps.clone(), assigned_worker)
};

let (tasks, workers) = core.split_tasks_workers_mut();
Expand Down Expand Up @@ -419,7 +419,7 @@ impl SchedulerState {
let task = tasks.get_task_mut(task_id);
if task.is_sn_running()
|| (not_overloaded
&& (task.is_fresh() || !task.inputs.is_empty())
&& (task.is_fresh() || !task.task_deps.is_empty())
&& worker.has_time_to_run_for_rqv(&task.configuration.resources, now))
{
continue;
Expand Down
58 changes: 2 additions & 56 deletions crates/tako/src/internal/server/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,57 +11,8 @@ use crate::internal::scheduler::query::compute_new_worker_query;
use crate::internal::server::comm::{Comm, CommSender, CommSenderRef};
use crate::internal::server::core::{Core, CoreRef};
use crate::internal::server::reactor::{on_cancel_tasks, on_new_tasks};
use crate::internal::server::task::{Task, TaskConfiguration, TaskInput, TaskRuntimeState};
use crate::internal::server::task::{Task, TaskConfiguration, TaskRuntimeState};
use std::rc::Rc;
use thin_vec::ThinVec;

/*pub(crate) async fn client_connection_handler(
core_ref: CoreRef,
comm_ref: CommSenderRef,
listener: UnixListener,
client_sender: UnboundedSender<ToGatewayMessage>,
client_receiver: UnboundedReceiver<ToGatewayMessage>,
) {
if let Ok((stream, _)) = listener.accept().await {
let framed = make_protocol_builder().new_framed(stream);
let (sender, mut receiver) = framed.split();
let send_loop = forward_queue_to_sink(client_receiver, sender, |msg| {
rmp_serde::to_vec_named(&msg).unwrap().into()
});
{
let core = core_ref.get();
let mut comm = comm_ref.get_mut();
for worker in core.get_workers() {
comm.send_client_worker_new(worker.id, &worker.configuration);
}
}
let receive_loop = async move {
while let Some(data) = receiver.next().await {
// TODO: Instead of unwrap, send error message to client
let data = data.unwrap();
let message: Result<FromGatewayMessage, _> = rmp_serde::from_slice(&data);
let error = match message {
Ok(message) => {
process_client_message(&core_ref, &comm_ref, &client_sender, message).await
}
Err(error) => Some(format!("Invalid format of message: {}", error)),
};
if let Some(message) = error {
client_sender
.send(ToGatewayMessage::Error(ErrorResponse { message }))
.unwrap();
}
}
};
tokio::select! {
r = send_loop => { r.unwrap() },
() = receive_loop => {},
}
} else {
panic!("Invalid connection from client");
}
log::info!("Client connection terminated");
}*/

fn create_task_configuration(
core_ref: &mut Core,
Expand Down Expand Up @@ -238,12 +189,7 @@ fn handle_new_tasks(
return Some(format!("Invalid configuration index {idx}"));
}
let conf = &configurations[idx];
let inputs: ThinVec<_> = task
.task_deps
.iter()
.map(|&task_id| TaskInput::new_task_dependency(task_id))
.collect();
let task = Task::new(task.id, inputs, conf.clone(), task.body);
let task = Task::new(task.id, task.task_deps, conf.clone(), task.body);
tasks.push(task);
}
if !msg.adjust_instance_id.is_empty() {
Expand Down
12 changes: 6 additions & 6 deletions crates/tako/src/internal/server/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,8 @@ impl Core {
#[cfg(test)]
pub fn sanity_check(&self) {
let fw_check = |task: &Task| {
for input in &task.inputs {
assert!(self.tasks.get_task(input.task()).is_finished());
for task_dep in &task.task_deps {
assert!(self.tasks.get_task(*task_dep).is_finished());
}
for &task_id in task.get_consumers() {
assert!(self.tasks.get_task(task_id).is_waiting());
Expand Down Expand Up @@ -398,8 +398,8 @@ impl Core {
match &task.state {
TaskRuntimeState::Waiting(winfo) => {
let mut count = 0;
for ti in &task.inputs {
if !self.tasks.get_task(ti.task()).is_finished() {
for task_dep in &task.task_deps {
if !self.tasks.get_task(*task_dep).is_finished() {
count += 1;
}
}
Expand All @@ -425,8 +425,8 @@ impl Core {
}

TaskRuntimeState::Finished(_) => {
for ti in &task.inputs {
assert!(self.tasks.get_task(ti.task()).is_finished());
for task_dep in &task.task_deps {
assert!(self.tasks.get_task(*task_dep).is_finished());
}
}
TaskRuntimeState::RunningMultiNode(ws) => {
Expand Down
14 changes: 3 additions & 11 deletions crates/tako/src/internal/server/reactor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,11 +167,8 @@ pub(crate) fn on_new_tasks(core: &mut Core, comm: &mut impl Comm, new_tasks: Vec
let mut task = task_map.remove(&task_id).unwrap();

let mut count = 0;
for ti in task.inputs.iter() {
let input_id = ti.task();
let task_dep = task_map
.get_mut(&input_id)
.unwrap_or_else(|| core.get_task_mut(input_id));
for t in task.task_deps.iter() {
let task_dep = task_map.get_mut(t).unwrap_or_else(|| core.get_task_mut(*t));
task_dep.add_consumer(task.id);
if !task_dep.is_finished() {
count += 1
Expand Down Expand Up @@ -555,12 +552,7 @@ pub(crate) fn on_cancel_tasks(
}

fn unregister_as_consumer(core: &mut Core, comm: &mut impl Comm, task_id: TaskId) {
let inputs: Vec<TaskId> = core
.get_task(task_id)
.inputs
.iter()
.map(|ti| ti.task())
.collect();
let inputs: Vec<TaskId> = core.get_task(task_id).task_deps.iter().copied().collect();
for input_id in inputs {
let input = core.get_task_mut(input_id);
assert!(input.remove_consumer(task_id));
Expand Down
38 changes: 3 additions & 35 deletions crates/tako/src/internal/server/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,38 +54,6 @@ bitflags::bitflags! {
}
}

#[derive(Clone)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct TaskInput {
task: TaskId,
output_id: u32, // MAX = pure dependency on task, not real output id
}

impl TaskInput {
pub fn new(task: TaskId, output_id: u32) -> Self {
TaskInput { task, output_id }
}

pub fn new_task_dependency(task: TaskId) -> Self {
TaskInput {
task,
output_id: u32::MAX,
}
}

pub fn task(&self) -> TaskId {
self.task
}

pub fn output_id(&self) -> Option<u32> {
if self.output_id == u32::MAX {
None
} else {
Some(self.output_id)
}
}
}

#[derive(Debug)]
#[cfg_attr(test, derive(Eq, PartialEq))]
pub struct TaskConfiguration {
Expand All @@ -101,7 +69,7 @@ pub struct Task {
pub id: TaskId,
pub state: TaskRuntimeState,
consumers: Set<TaskId>,
pub inputs: ThinVec<TaskInput>,
pub task_deps: ThinVec<TaskId>,
pub flags: TaskFlags,
pub configuration: Rc<TaskConfiguration>,
pub scheduler_priority: Priority,
Expand All @@ -123,7 +91,7 @@ impl fmt::Debug for Task {
impl Task {
pub fn new(
id: TaskId,
inputs: ThinVec<TaskInput>,
dependencies: ThinVec<TaskId>,
configuration: Rc<TaskConfiguration>,
body: Box<[u8]>,
) -> Self {
Expand All @@ -134,7 +102,7 @@ impl Task {

Self {
id,
inputs,
task_deps: dependencies,
flags,
configuration,
body,
Expand Down
3 changes: 2 additions & 1 deletion crates/tako/src/internal/tests/integration/utils/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::time::Duration;
use crate::internal::common::index::ItemId;
use derive_builder::Builder;
use smallvec::smallvec;
use thin_vec::ThinVec;

use crate::gateway::{
ResourceRequest, ResourceRequestEntry, ResourceRequestVariants, SharedTaskConfiguration,
Expand Down Expand Up @@ -119,7 +120,7 @@ pub fn build_task_def_from_config(
TaskConfiguration {
id: TaskId::new(id.unwrap_or(1) as <TaskId as ItemId>::IdType),
shared_data_index: 0,
task_deps: Vec::new(),
task_deps: ThinVec::new(),
body: body.into_boxed_slice(),
},
conf,
Expand Down
Loading
Loading