From 101ef073d012b47f0d43a9ec02e9ac5e5157aee6 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Thu, 5 Sep 2024 15:46:41 -0400 Subject: [PATCH 1/2] Test for columnar reachability --- timely/Cargo.toml | 1 + timely/examples/event_driven.rs | 2 +- timely/src/progress/frontier.rs | 2 +- timely/src/progress/reachability.rs | 109 ++++++++++++++++++++++------ 4 files changed, 89 insertions(+), 25 deletions(-) diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 5deb5fed8..b9cbaf6c7 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -28,6 +28,7 @@ timely_communication = { path = "../communication", version = "0.12", default-fe timely_container = { path = "../container", version = "0.12" } crossbeam-channel = "0.5.0" smallvec = { version = "1.13.2", features = ["serde", "const_generics"] } +columnar = { git = "https://github.com/frankmcsherry/columnar" } [dev-dependencies] # timely_sort="0.1.6" diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index ee3b52515..886d6b35b 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -34,7 +34,7 @@ fn main() { println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length); - for round in 0 .. { + for round in 0 .. 10 { let dataflow = round % dataflows; if record { inputs[dataflow].send(()); diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 82f2bbc8b..51473d8b3 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -255,7 +255,7 @@ impl Antichain { /// let mut frontier = Antichain::from_elem(2); /// assert_eq!(frontier.elements(), &[2]); ///``` - #[inline] pub fn elements(&self) -> &[T] { &self[..] } + #[inline] pub fn elements(&self) -> &Vec { &self.elements } /// Reveals the elements in the antichain. /// diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 0380216c8..e75488cce 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -75,6 +75,9 @@ use std::collections::{BinaryHeap, HashMap, VecDeque}; use std::cmp::Reverse; +use columnar::{Columnar, Len, Index}; +use columnar::ColumnVec; + use crate::progress::Timestamp; use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; @@ -84,6 +87,62 @@ use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::progress::timestamp::PathSummary; +use vec_antichain::VecAntichain; + +/// A stand-in for `Vec>`. +mod vec_antichain { + + use columnar::{Columnar, Len, Index, IndexMut}; + use columnar::{ColumnVec, Slice}; + + use crate::progress::Antichain; + + #[derive(Clone, Debug)] + pub struct VecAntichain (ColumnVec); + + impl Default for VecAntichain { + fn default() -> Self { + Self (Default::default()) + } + } + + impl Len for VecAntichain { + #[inline(always)] fn len(&self) -> usize { self.0.len() } + } + + impl Index for VecAntichain { + type Index<'a> = Slice<&'a TC> where TC: 'a; + + #[inline(always)] + fn index(&self, index: usize) -> Self::Index<'_> { + self.0.index(index) + } + } + impl IndexMut for VecAntichain { + type IndexMut<'a> = Slice<&'a mut TC> where TC: 'a; + + #[inline(always)] + fn index_mut(&mut self, index: usize) -> Self::IndexMut<'_> { + self.0.index_mut(index) + } + } + + impl> Columnar> for VecAntichain { + #[inline(always)] + fn copy(&mut self, item: &Antichain) { + self.0.copy(item.elements()); + } + fn clear(&mut self) { + unimplemented!() + } + fn heap_size(&self) -> (usize, usize) { + unimplemented!() + } + } +} + + + /// A topology builder, which can summarize reachability along paths. /// /// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles @@ -132,14 +191,14 @@ pub struct Builder { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - pub nodes: Vec>>>, + nodes: ColumnVec>>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. - pub edges: Vec>>, + edges: Vec>>, /// Numbers of inputs and outputs for each node. - pub shape: Vec<(usize, usize)>, + shape: Vec<(usize, usize)>, } impl Builder { @@ -147,7 +206,7 @@ impl Builder { /// Create a new empty topology builder. pub fn new() -> Self { Builder { - nodes: Vec::new(), + nodes: Default::default(), edges: Vec::new(), shape: Vec::new(), } @@ -155,20 +214,19 @@ impl Builder { /// Add links internal to operators. /// - /// This method overwrites any existing summary, instead of anything more sophisticated. + /// Nodes must be added in strictly increasing order of `index`. pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec>>) { // Assert that all summaries exist. debug_assert_eq!(inputs, summary.len()); for x in summary.iter() { debug_assert_eq!(outputs, x.len()); } - while self.nodes.len() <= index { - self.nodes.push(Vec::new()); - self.edges.push(Vec::new()); - self.shape.push((0, 0)); - } + assert_eq!(self.nodes.len(), index); + + self.nodes.push(summary); + self.edges.push(Vec::new()); + self.shape.push((0, 0)); - self.nodes[index] = summary; if self.edges[index].len() != outputs { self.edges[index] = vec![Vec::new(); outputs]; } @@ -287,7 +345,7 @@ impl Builder { in_degree.entry(target).or_insert(0); for (output, summaries) in outputs.iter().enumerate() { let source = Location::new_source(index, output); - for summary in summaries.elements().iter() { + for summary in summaries.iter() { if summary == &Default::default() { *in_degree.entry(source).or_insert(0) += 1; } @@ -322,9 +380,9 @@ impl Builder { } }, Port::Target(port) => { - for (output, summaries) in self.nodes[node][port].iter().enumerate() { + for (output, summaries) in self.nodes.index(node).index(port).iter().enumerate() { let source = Location::new_source(node, output); - for summary in summaries.elements().iter() { + for summary in summaries.iter() { if summary == &Default::default() { *in_degree.get_mut(&source).unwrap() -= 1; if in_degree[&source] == 0 { @@ -361,12 +419,12 @@ pub struct Tracker { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - nodes: Vec>>>, + nodes: ColumnVec>>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. - edges: Vec>>, + edges: ColumnVec>>, // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`). // It seems we should be able to flatten most of these so that there are a few allocations @@ -544,10 +602,15 @@ impl Tracker { let scope_outputs = builder.shape[0].0; let output_changes = vec![ChangeBatch::new(); scope_outputs]; + let mut edges: ColumnVec>> = Default::default(); + for edge in builder.edges { + edges.push(edge); + } + let tracker = Tracker { nodes: builder.nodes, - edges: builder.edges, + edges, per_operator, target_changes: ChangeBatch::new(), source_changes: ChangeBatch::new(), @@ -663,10 +726,10 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - let nodes = &self.nodes[location.node][port_index]; + let nodes = &self.nodes.index(location.node).index(port_index); for (output_port, summaries) in nodes.iter().enumerate() { let source = Location { node: location.node, port: Port::Source(output_port) }; - for summary in summaries.elements().iter() { + for summary in summaries.iter() { if let Some(new_time) = summary.results_in(&time) { self.worklist.push(Reverse((new_time, source, diff))); } @@ -686,7 +749,7 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - for new_target in self.edges[location.node][port_index].iter() { + for new_target in self.edges.index(location.node).index(port_index).iter() { self.worklist.push(Reverse(( time.clone(), Location::from(*new_target), @@ -738,7 +801,7 @@ impl Tracker { /// Graph locations may be missing from the output, in which case they have no /// paths to scope outputs. fn summarize_outputs( - nodes: &Vec>>>, + nodes: &ColumnVec>>>, edges: &Vec>>, ) -> HashMap>> { @@ -780,7 +843,7 @@ fn summarize_outputs( Port::Source(output_port) => { // Consider each input port of the associated operator. - for (input_port, summaries) in nodes[location.node].iter().enumerate() { + for (input_port, summaries) in nodes.index(location.node).iter().enumerate() { // Determine the current path summaries from the input port. let location = Location { node: location.node, port: Port::Target(input_port) }; @@ -792,7 +855,7 @@ fn summarize_outputs( while antichains.len() <= output { antichains.push(Antichain::new()); } // Combine each operator-internal summary to the output with `summary`. - for operator_summary in summaries[output_port].elements().iter() { + for operator_summary in summaries.index(output_port).iter() { if let Some(combined) = operator_summary.followed_by(&summary) { if antichains[output].insert(combined.clone()) { worklist.push_back((location, output, combined)); From f46d8a94f59f288c04f5b6b0ac22f47b2d1ffc8c Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Mon, 30 Sep 2024 20:03:10 -0400 Subject: [PATCH 2/2] rebase and update --- timely/src/progress/frontier.rs | 2 +- timely/src/progress/reachability.rs | 96 +++++++++++++---------------- 2 files changed, 43 insertions(+), 55 deletions(-) diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 51473d8b3..82f2bbc8b 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -255,7 +255,7 @@ impl Antichain { /// let mut frontier = Antichain::from_elem(2); /// assert_eq!(frontier.elements(), &[2]); ///``` - #[inline] pub fn elements(&self) -> &Vec { &self.elements } + #[inline] pub fn elements(&self) -> &[T] { &self[..] } /// Reveals the elements in the antichain. /// diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index e75488cce..c223c448f 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -75,8 +75,8 @@ use std::collections::{BinaryHeap, HashMap, VecDeque}; use std::cmp::Reverse; -use columnar::{Columnar, Len, Index}; -use columnar::ColumnVec; +use columnar::{Len, Index}; +use columnar::Vecs; use crate::progress::Timestamp; use crate::progress::{Source, Target}; @@ -87,56 +87,43 @@ use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::progress::timestamp::PathSummary; -use vec_antichain::VecAntichain; +use antichains::Antichains; /// A stand-in for `Vec>`. -mod vec_antichain { +mod antichains { - use columnar::{Columnar, Len, Index, IndexMut}; - use columnar::{ColumnVec, Slice}; + use columnar::{Len, Index, Push}; + use columnar::Vecs; use crate::progress::Antichain; #[derive(Clone, Debug)] - pub struct VecAntichain (ColumnVec); + pub struct Antichains (Vecs>); - impl Default for VecAntichain { + impl Default for Antichains { fn default() -> Self { Self (Default::default()) } } - impl Len for VecAntichain { + impl Len for Antichains { #[inline(always)] fn len(&self) -> usize { self.0.len() } } - impl Index for VecAntichain { - type Index<'a> = Slice<&'a TC> where TC: 'a; - + impl Push> for Antichains { #[inline(always)] - fn index(&self, index: usize) -> Self::Index<'_> { - self.0.index(index) + fn push(&mut self, item: Antichain) { + columnar::Push::extend(&mut self.0.values, item); + self.0.bounds.push(self.0.values.len()); } } - impl IndexMut for VecAntichain { - type IndexMut<'a> = Slice<&'a mut TC> where TC: 'a; - #[inline(always)] - fn index_mut(&mut self, index: usize) -> Self::IndexMut<'_> { - self.0.index_mut(index) - } - } + impl<'a, T> Index for &'a Antichains { + type Ref = <&'a Vecs> as Index>::Ref; - impl> Columnar> for VecAntichain { #[inline(always)] - fn copy(&mut self, item: &Antichain) { - self.0.copy(item.elements()); - } - fn clear(&mut self) { - unimplemented!() - } - fn heap_size(&self) -> (usize, usize) { - unimplemented!() + fn get(&self, index: usize) -> Self::Ref { + (&self.0).get(index) } } } @@ -191,7 +178,7 @@ pub struct Builder { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - nodes: ColumnVec>>>, + nodes: Vecs>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. @@ -223,6 +210,7 @@ impl Builder { assert_eq!(self.nodes.len(), index); + use columnar::Push; self.nodes.push(summary); self.edges.push(Vec::new()); self.shape.push((0, 0)); @@ -328,7 +316,7 @@ impl Builder { // Load edges as default summaries. for (index, ports) in self.edges.iter().enumerate() { - for (output, targets) in ports.iter().enumerate() { + for (output, targets) in (*ports).iter().enumerate() { let source = Location::new_source(index, output); in_degree.entry(source).or_insert(0); for &target in targets.iter() { @@ -339,13 +327,13 @@ impl Builder { } // Load default intra-node summaries. - for (index, summary) in self.nodes.iter().enumerate() { - for (input, outputs) in summary.iter().enumerate() { + for (index, summary) in (&self.nodes).into_iter().enumerate() { + for (input, outputs) in summary.into_iter().enumerate() { let target = Location::new_target(index, input); in_degree.entry(target).or_insert(0); - for (output, summaries) in outputs.iter().enumerate() { + for (output, summaries) in outputs.into_iter().enumerate() { let source = Location::new_source(index, output); - for summary in summaries.iter() { + for summary in summaries.into_iter() { if summary == &Default::default() { *in_degree.entry(source).or_insert(0) += 1; } @@ -380,9 +368,9 @@ impl Builder { } }, Port::Target(port) => { - for (output, summaries) in self.nodes.index(node).index(port).iter().enumerate() { + for (output, summaries) in (&self.nodes).get(node).get(port).into_iter().enumerate() { let source = Location::new_source(node, output); - for summary in summaries.iter() { + for summary in summaries.into_iter() { if summary == &Default::default() { *in_degree.get_mut(&source).unwrap() -= 1; if in_degree[&source] == 0 { @@ -419,12 +407,12 @@ pub struct Tracker { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - nodes: ColumnVec>>>, + nodes: Vecs>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. - edges: ColumnVec>>, + edges: Vecs>>, // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`). // It seems we should be able to flatten most of these so that there are a few allocations @@ -602,7 +590,8 @@ impl Tracker { let scope_outputs = builder.shape[0].0; let output_changes = vec![ChangeBatch::new(); scope_outputs]; - let mut edges: ColumnVec>> = Default::default(); + use columnar::Push; + let mut edges: Vecs>> = Default::default(); for edge in builder.edges { edges.push(edge); } @@ -726,10 +715,10 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - let nodes = &self.nodes.index(location.node).index(port_index); - for (output_port, summaries) in nodes.iter().enumerate() { + let nodes = &(&self.nodes).get(location.node).get(port_index); + for (output_port, summaries) in nodes.into_iter().enumerate() { let source = Location { node: location.node, port: Port::Source(output_port) }; - for summary in summaries.iter() { + for summary in summaries.into_iter() { if let Some(new_time) = summary.results_in(&time) { self.worklist.push(Reverse((new_time, source, diff))); } @@ -749,7 +738,7 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - for new_target in self.edges.index(location.node).index(port_index).iter() { + for new_target in (&self.edges).get(location.node).get(port_index).into_iter() { self.worklist.push(Reverse(( time.clone(), Location::from(*new_target), @@ -801,14 +790,14 @@ impl Tracker { /// Graph locations may be missing from the output, in which case they have no /// paths to scope outputs. fn summarize_outputs( - nodes: &ColumnVec>>>, + nodes: &Vecs>>, edges: &Vec>>, ) -> HashMap>> { // A reverse edge map, to allow us to walk back up the dataflow graph. let mut reverse = HashMap::new(); - for (node, outputs) in edges.iter().enumerate() { - for (output, targets) in outputs.iter().enumerate() { + for (node, outputs) in columnar::Index::into_iter(edges).enumerate() { + for (output, targets) in columnar::Index::into_iter(outputs).enumerate() { for target in targets.iter() { reverse.insert( Location::from(*target), @@ -822,10 +811,9 @@ fn summarize_outputs( let mut worklist = VecDeque::<(Location, usize, T::Summary)>::new(); let outputs = - edges - .iter() - .flat_map(|x| x.iter()) - .flat_map(|x| x.iter()) + columnar::Index::into_iter(edges) + .flat_map(|x| columnar::Index::into_iter(x)) + .flat_map(|x| columnar::Index::into_iter(x)) .filter(|target| target.node == 0); // The scope may have no outputs, in which case we can do no work. @@ -843,7 +831,7 @@ fn summarize_outputs( Port::Source(output_port) => { // Consider each input port of the associated operator. - for (input_port, summaries) in nodes.index(location.node).iter().enumerate() { + for (input_port, summaries) in nodes.get(location.node).into_iter().enumerate() { // Determine the current path summaries from the input port. let location = Location { node: location.node, port: Port::Target(input_port) }; @@ -855,7 +843,7 @@ fn summarize_outputs( while antichains.len() <= output { antichains.push(Antichain::new()); } // Combine each operator-internal summary to the output with `summary`. - for operator_summary in summaries.index(output_port).iter() { + for operator_summary in summaries.get(output_port).into_iter() { if let Some(combined) = operator_summary.followed_by(&summary) { if antichains[output].insert(combined.clone()) { worklist.push_back((location, output, combined));