diff --git a/timely/Cargo.toml b/timely/Cargo.toml index 9d8b68aad..a74f2f831 100644 --- a/timely/Cargo.toml +++ b/timely/Cargo.toml @@ -31,6 +31,8 @@ timely_logging = { path = "../logging", version = "0.12" } timely_communication = { path = "../communication", version = "0.12", default-features = false } timely_container = { path = "../container", version = "0.12" } crossbeam-channel = "0.5.0" +columnar = { git = "https://github.com/frankmcsherry/columnar" } + [dev-dependencies] # timely_sort="0.1.6" diff --git a/timely/examples/event_driven.rs b/timely/examples/event_driven.rs index ee3b52515..886d6b35b 100644 --- a/timely/examples/event_driven.rs +++ b/timely/examples/event_driven.rs @@ -34,7 +34,7 @@ fn main() { println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length); - for round in 0 .. { + for round in 0 .. 10 { let dataflow = round % dataflows; if record { inputs[dataflow].send(()); diff --git a/timely/src/progress/frontier.rs b/timely/src/progress/frontier.rs index 202c6e983..394c79a51 100644 --- a/timely/src/progress/frontier.rs +++ b/timely/src/progress/frontier.rs @@ -248,7 +248,7 @@ impl Antichain { /// let mut frontier = Antichain::from_elem(2); /// assert_eq!(frontier.elements(), &[2]); ///``` - #[inline] pub fn elements(&self) -> &[T] { &self[..] } + #[inline] pub fn elements(&self) -> &Vec { &self.elements } /// Reveals the elements in the antichain. /// diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 5a1f1f6f6..95fcea51c 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -75,6 +75,9 @@ use std::collections::{BinaryHeap, HashMap, VecDeque}; use std::cmp::Reverse; +use columnar::{Columnar, Len, Index}; +use columnar::ColumnVec; + use crate::progress::Timestamp; use crate::progress::{Source, Target}; use crate::progress::ChangeBatch; @@ -84,6 +87,62 @@ use crate::progress::frontier::{Antichain, MutableAntichain}; use crate::progress::timestamp::PathSummary; +use vec_antichain::VecAntichain; + +/// A stand-in for `Vec>`. +mod vec_antichain { + + use columnar::{Columnar, Len, Index, IndexMut}; + use columnar::{ColumnVec, Slice}; + + use crate::progress::Antichain; + + #[derive(Clone, Debug)] + pub struct VecAntichain (ColumnVec); + + impl Default for VecAntichain { + fn default() -> Self { + Self (Default::default()) + } + } + + impl Len for VecAntichain { + #[inline(always)] fn len(&self) -> usize { self.0.len() } + } + + impl Index for VecAntichain { + type Index<'a> = Slice<&'a TC> where TC: 'a; + + #[inline(always)] + fn index(&self, index: usize) -> Self::Index<'_> { + self.0.index(index) + } + } + impl IndexMut for VecAntichain { + type IndexMut<'a> = Slice<&'a mut TC> where TC: 'a; + + #[inline(always)] + fn index_mut(&mut self, index: usize) -> Self::IndexMut<'_> { + self.0.index_mut(index) + } + } + + impl> Columnar> for VecAntichain { + #[inline(always)] + fn copy(&mut self, item: &Antichain) { + self.0.copy(item.elements()); + } + fn clear(&mut self) { + unimplemented!() + } + fn heap_size(&self) -> (usize, usize) { + unimplemented!() + } + } +} + + + /// A topology builder, which can summarize reachability along paths. /// /// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles @@ -132,14 +191,14 @@ pub struct Builder { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - pub nodes: Vec>>>, + nodes: ColumnVec>>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. - pub edges: Vec>>, + edges: Vec>>, /// Numbers of inputs and outputs for each node. - pub shape: Vec<(usize, usize)>, + shape: Vec<(usize, usize)>, } impl Builder { @@ -147,7 +206,7 @@ impl Builder { /// Create a new empty topology builder. pub fn new() -> Self { Builder { - nodes: Vec::new(), + nodes: Default::default(), edges: Vec::new(), shape: Vec::new(), } @@ -155,20 +214,19 @@ impl Builder { /// Add links internal to operators. /// - /// This method overwrites any existing summary, instead of anything more sophisticated. + /// Nodes must be added in strictly increasing order of `index`. pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec>>) { // Assert that all summaries exist. debug_assert_eq!(inputs, summary.len()); for x in summary.iter() { debug_assert_eq!(outputs, x.len()); } - while self.nodes.len() <= index { - self.nodes.push(Vec::new()); - self.edges.push(Vec::new()); - self.shape.push((0, 0)); - } + assert_eq!(self.nodes.len(), index); + + self.nodes.push(summary); + self.edges.push(Vec::new()); + self.shape.push((0, 0)); - self.nodes[index] = summary; if self.edges[index].len() != outputs { self.edges[index] = vec![Vec::new(); outputs]; } @@ -287,7 +345,7 @@ impl Builder { in_degree.entry(target).or_insert(0); for (output, summaries) in outputs.iter().enumerate() { let source = Location::new_source(index, output); - for summary in summaries.elements().iter() { + for summary in summaries.iter() { if summary == &Default::default() { *in_degree.entry(source).or_insert(0) += 1; } @@ -322,9 +380,9 @@ impl Builder { } }, Port::Target(port) => { - for (output, summaries) in self.nodes[node][port].iter().enumerate() { + for (output, summaries) in self.nodes.index(node).index(port).iter().enumerate() { let source = Location::new_source(node, output); - for summary in summaries.elements().iter() { + for summary in summaries.iter() { if summary == &Default::default() { *in_degree.get_mut(&source).unwrap() -= 1; if in_degree[&source] == 0 { @@ -361,12 +419,12 @@ pub struct Tracker { /// Indexed by operator index, then input port, then output port. This is the /// same format returned by `get_internal_summary`, as if we simply appended /// all of the summaries for the hosted nodes. - nodes: Vec>>>, + nodes: ColumnVec>>>, /// Direct connections from sources to targets. /// /// Edges do not affect timestamps, so we only need to know the connectivity. /// Indexed by operator index then output port. - edges: Vec>>, + edges: ColumnVec>>, // TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`). // It seems we should be able to flatten most of these so that there are a few allocations @@ -544,10 +602,15 @@ impl Tracker { let scope_outputs = builder.shape[0].0; let output_changes = vec![ChangeBatch::new(); scope_outputs]; + let mut edges: ColumnVec>> = Default::default(); + for edge in builder.edges { + edges.push(edge); + } + let tracker = Tracker { nodes: builder.nodes, - edges: builder.edges, + edges, per_operator, target_changes: ChangeBatch::new(), source_changes: ChangeBatch::new(), @@ -663,10 +726,10 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - let nodes = &self.nodes[location.node][port_index]; + let nodes = &self.nodes.index(location.node).index(port_index); for (output_port, summaries) in nodes.iter().enumerate() { let source = Location { node: location.node, port: Port::Source(output_port) }; - for summary in summaries.elements().iter() { + for summary in summaries.iter() { if let Some(new_time) = summary.results_in(&time) { self.worklist.push(Reverse((new_time, source, diff))); } @@ -686,7 +749,7 @@ impl Tracker { .update_iter(Some((time, diff))); for (time, diff) in changes { - for new_target in self.edges[location.node][port_index].iter() { + for new_target in self.edges.index(location.node).index(port_index).iter() { self.worklist.push(Reverse(( time.clone(), Location::from(*new_target), @@ -738,7 +801,7 @@ impl Tracker { /// Graph locations may be missing from the output, in which case they have no /// paths to scope outputs. fn summarize_outputs( - nodes: &Vec>>>, + nodes: &ColumnVec>>>, edges: &Vec>>, ) -> HashMap>> { @@ -780,7 +843,7 @@ fn summarize_outputs( Port::Source(output_port) => { // Consider each input port of the associated operator. - for (input_port, summaries) in nodes[location.node].iter().enumerate() { + for (input_port, summaries) in nodes.index(location.node).iter().enumerate() { // Determine the current path summaries from the input port. let location = Location { node: location.node, port: Port::Target(input_port) }; @@ -792,7 +855,7 @@ fn summarize_outputs( while antichains.len() <= output { antichains.push(Antichain::new()); } // Combine each operator-internal summary to the output with `summary`. - for operator_summary in summaries[output_port].elements().iter() { + for operator_summary in summaries.index(output_port).iter() { if let Some(combined) = operator_summary.followed_by(&summary) { if antichains[output].insert(combined.clone()) { worklist.push_back((location, output, combined));