From 917679079376345e202b016f69e188e186fa83e2 Mon Sep 17 00:00:00 2001 From: Petros Angelatos Date: Fri, 21 Apr 2023 12:50:29 +0200 Subject: [PATCH] track scheduling frontier This PR introduces a scheduling frontier which is a frontier that tracks the dataflows that can be scheduled in the cluster. This is accomplished using a timestamp that can be one of two variants: * An `Installed(id)` variant, which represents the fact that `id` might be scheduled. * A `Future(lower)` variant, which represents the fact that dataflows with `id >= lower` might be scheduled. Each worker initializes a local `MutableAntichain` with `num_worker` copies of `Future(0)` elements (the minimum) which represents the fact that any worker might schedule any id. A channel is allocated through the allocator so that changes to that local view of the frontier can be communicated between all the workers. When a dataflow is installed in a worker `Installed(id)` and `Future(id+1)` elements are inserted into the frontier and a `Future(id)` element is retracted. When a dataflow naturally terminates an `Installed(id)` element is retracted from the frontier. When a dataflow is dropped through `drop_dataflow` is is first moved into a frozen dataflow list but all its resources are maintained. Then `Installed(id)` is retracted from the frontier to let the other workers know that it will not be scheduled anymore. Is is ensured that only one of the two paths above are taken to ensure that we only retract a `Installed(id)` element once. Eventually all workers will retract their copies of a `Dataflow(id)`, either because it was frozen or because it naturally terminated. When this happens `id` stops being beyond the global scheduling frontier and at that moment each worker is free to drop the dataflow and its associated resources since no worker will scheduled it again. Co-authored-by: Jan Teske Signed-off-by: Petros Angelatos --- timely/src/worker.rs | 173 ++++++++++++++++++++++++++++++++++++++----- 1 file changed, 154 insertions(+), 19 deletions(-) diff --git a/timely/src/worker.rs b/timely/src/worker.rs index 153f5c775..21b5c021b 100644 --- a/timely/src/worker.rs +++ b/timely/src/worker.rs @@ -2,6 +2,7 @@ use std::rc::Rc; use std::cell::{RefCell, RefMut}; +use std::cmp::Ordering; use std::any::Any; use std::str::FromStr; use std::time::{Instant, Duration}; @@ -13,10 +14,14 @@ use crate::communication::{Allocate, Data, Push, Pull}; use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller}; use crate::scheduling::{Schedule, Scheduler, Activations}; use crate::progress::timestamp::{Refines}; -use crate::progress::SubgraphBuilder; +use crate::progress::{ChangeBatch, SubgraphBuilder}; use crate::progress::operate::Operate; +use crate::progress::frontier::{AntichainRef, MutableAntichain}; use crate::dataflow::scopes::Child; use crate::logging::TimelyLogger; +use crate::order::PartialOrder; + +const SCHEDULING_CHANNEL: usize = 0; /// Different ways in which timely's progress tracking can work. /// @@ -216,12 +221,16 @@ pub struct Worker { identifiers: Rc>, // dataflows: Rc>>, dataflows: Rc>>, + frozen_dataflows: Rc>>, + scheduling_frontier: Rc>, + dataflow_counter: Rc>, logging: Rc>>, activations: Rc>, active_dataflows: Vec, + // Temporary storage for channel identifiers during dataflow construction. // These are then associated with a dataflow once constructed. temp_channel_ids: Rc>>, @@ -260,16 +269,21 @@ impl Scheduler for Worker { impl Worker { /// Allocates a new `Worker` bound to a channel allocator. - pub fn new(config: Config, c: A) -> Worker { + pub fn new(config: Config, mut c: A) -> Worker { let now = Instant::now(); let index = c.index(); + + let scheduling_frontier = SchedulingFrontier::new(&mut c); + Worker { config, timer: now, paths: Default::default(), allocator: Rc::new(RefCell::new(c)), - identifiers: Default::default(), + identifiers: Rc::new(RefCell::new(SCHEDULING_CHANNEL + 1)), dataflows: Default::default(), + frozen_dataflows: Default::default(), + scheduling_frontier: Rc::new(RefCell::new(scheduling_frontier)), dataflow_counter: Default::default(), logging: Rc::new(RefCell::new(crate::logging_core::Registry::new(now, index))), activations: Rc::new(RefCell::new(Activations::new(now))), @@ -332,6 +346,7 @@ impl Worker { /// ``` pub fn step_or_park(&mut self, duration: Option) -> bool { + let mut activate_scheduling_frontier = false; { // Process channel events. Activate responders. let mut allocator = self.allocator.borrow_mut(); allocator.receive(); @@ -345,13 +360,18 @@ impl Worker { // on the basis of non-empty channels. // TODO: This is a sloppy way to deal // with channels that may not be alloc'd. - if let Some(path) = paths.get(&channel) { + if channel == SCHEDULING_CHANNEL { + activate_scheduling_frontier = true; + } else if let Some(path) = paths.get(&channel) { self.activations .borrow_mut() .activate(&path[..]); } } } + if activate_scheduling_frontier { + self.scheduling_frontier.borrow_mut().step(); + } // Organize activations. self.activations @@ -400,15 +420,35 @@ impl Worker { paths.remove(&channel); } entry.remove_entry(); + self.scheduling_frontier.borrow_mut().update([(DataflowId::Installed(index), -1)]); } } } } + // Drop all frozen dataflows that are not beyond the scheduling frontier + { + let mut paths = self.paths.borrow_mut(); + let mut frozen_dataflows = self.frozen_dataflows.borrow_mut(); + let scheduling_frontier = self.scheduling_frontier.borrow(); + frozen_dataflows.retain(|id, dataflow| { + if !scheduling_frontier.frontier().less_equal(&DataflowId::Installed(*id)) { + // Garbage collect channel_id to path information. + for channel in dataflow.channel_ids.drain(..) { + paths.remove(&channel); + } + false + } else { + true + } + }); + } + + // Clean up, indicate if dataflows remain. self.logging.borrow_mut().flush(); self.allocator.borrow_mut().release(); - !self.dataflows.borrow().is_empty() + !self.dataflows.borrow().is_empty() || !self.frozen_dataflows.borrow().is_empty() } /// Calls `self.step()` as long as `func` evaluates to true. @@ -671,17 +711,17 @@ impl Worker { /// Drops an identified dataflow. /// - /// This method removes the identified dataflow, which will no longer be scheduled. - /// Various other resources will be cleaned up, though the method is currently in - /// public beta rather than expected to work. Please report all crashes and unmet - /// expectations! - pub fn drop_dataflow(&mut self, dataflow_identifier: usize) { - if let Some(mut entry) = self.dataflows.borrow_mut().remove(&dataflow_identifier) { - // Garbage collect channel_id to path information. - let mut paths = self.paths.borrow_mut(); - for channel in entry.channel_ids.drain(..) { - paths.remove(&channel); - } + /// This method immediately stops scheduling the the identified dataflow. Once all other + /// workers also stop scheduling the identified dataflow, due to an explicit drop or due to + /// graceful termination, the worker will proceed with dropping any resources held by this + /// dataflow. + /// + /// If the identified dataflow is in the process of being constructed, this function is a + /// no-op. + pub fn drop_dataflow(&mut self, id: usize) { + if let Some(entry) = self.dataflows.borrow_mut().remove(&id) { + self.frozen_dataflows.borrow_mut().insert(id, entry); + self.scheduling_frontier.borrow_mut().update([(DataflowId::Installed(id), -1)]); } } @@ -693,20 +733,27 @@ impl Worker { *self.dataflow_counter.borrow() } - /// List the current dataflow indices. + /// List the current dataflow indices that may be scheduled. pub fn installed_dataflows(&self) -> Vec { self.dataflows.borrow().keys().cloned().collect() } /// True if there is at least one dataflow under management. pub fn has_dataflows(&self) -> bool { - !self.dataflows.borrow().is_empty() + !self.dataflows.borrow().is_empty() || !self.frozen_dataflows.borrow().is_empty() } // Acquire a new distinct dataflow identifier. fn allocate_dataflow_index(&mut self) -> usize { *self.dataflow_counter.borrow_mut() += 1; - *self.dataflow_counter.borrow() - 1 + let new_id = *self.dataflow_counter.borrow() - 1; + + self.scheduling_frontier.borrow_mut().update([ + (DataflowId::Installed(new_id), 1), + (DataflowId::Future(new_id + 1), 1), + (DataflowId::Future(new_id), -1) + ]); + new_id } } @@ -721,6 +768,8 @@ impl Clone for Worker { allocator: self.allocator.clone(), identifiers: self.identifiers.clone(), dataflows: self.dataflows.clone(), + frozen_dataflows: self.frozen_dataflows.clone(), + scheduling_frontier: self.scheduling_frontier.clone(), dataflow_counter: self.dataflow_counter.clone(), logging: self.logging.clone(), activations: self.activations.clone(), @@ -776,3 +825,89 @@ impl Drop for Wrapper { self.resources = None; } } + + +#[derive(Debug, Eq, PartialEq, Clone, Abomonation, Serialize, Deserialize)] +/// A partial order representing the scheduling frontier. +enum DataflowId { + /// A lower bound on identifiers of future dataflows. + Future(usize), + /// An installed dataflow identifier. + Installed(usize), +} + +// A manual implementation of Ord to ensure that it is compatible with the PartialOrder below +impl Ord for DataflowId { + fn cmp(&self, other: &Self) -> Ordering { + match (self, other) { + (Self::Installed(this), Self::Installed(other)) => this.cmp(other), + (Self::Future(this), Self::Future(other)) => this.cmp(other), + (Self::Future(_), Self::Installed(_)) => Ordering::Less, + (Self::Installed(_), Self::Future(_)) => Ordering::Greater, + } + } +} +impl PartialOrd for DataflowId { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Default for DataflowId { + fn default() -> Self { + Self::Future(0) + } +} + +impl PartialOrder for DataflowId { + fn less_equal(&self, other: &Self) -> bool { + match (self, other) { + (Self::Future(lower), Self::Installed(id)) => lower <= id, + (Self::Installed(this), Self::Installed(other)) => this == other, + (Self::Future(this), Self::Future(other)) => this <= other, + (Self::Installed(_), Self::Future(_)) => false, + } + } +} + +/// Keeps track of the scheduling frontier and broadcasts any local updates to it to the other +/// workers. +struct SchedulingFrontier { + frontier: MutableAntichain, + pushers: Vec>>>>, + puller: Box>>>, +} + +impl SchedulingFrontier { + fn new(alloc: &mut A) -> Self { + let (pushers, puller) = alloc.allocate(SCHEDULING_CHANNEL); + let mut frontier = MutableAntichain::new(); + frontier.update_iter([(DataflowId::default(), alloc.peers() as i64)]); + Self { + frontier, + pushers, + puller, + } + } + + fn frontier(&self) -> AntichainRef<'_, DataflowId> { + self.frontier.frontier() + } + + fn step(&mut self) { + // TODO: reuse allocations + while let Some(mut msg) = self.puller.recv() { + self.frontier.update_iter(msg.as_mut().iter().cloned()); + } + } + + fn update>(&mut self, iter: I) { + let mut change_batch = ChangeBatch::new(); + change_batch.extend(iter.into_iter()); + // TODO: reduce clones and consolidate updates into one push + for pusher in self.pushers.iter_mut() { + pusher.send(Message::from_typed(change_batch.clone())); + pusher.done(); + } + } +}