Skip to content

Commit

Permalink
Test for columnar reachability
Browse files: browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Sep 5, 2024
1 parent e79b3ee commit c979fde
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 25 deletions.
2 changes: 2 additions & 0 deletions timely/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ timely_logging = { path = "../logging", version = "0.12" }
timely_communication = { path = "../communication", version = "0.12", default-features = false }
timely_container = { path = "../container", version = "0.12" }
crossbeam-channel = "0.5.0"
columnar = { git = "https://github.com/frankmcsherry/columnar" }


[dev-dependencies]
# timely_sort="0.1.6"
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/event_driven.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ fn main() {

println!("{:?}\tdataflows built ({} x {})", timer.elapsed(), dataflows, length);

for round in 0 .. {
for round in 0 .. 10 {
let dataflow = round % dataflows;
if record {
inputs[dataflow].send(());
Expand Down
2 changes: 1 addition & 1 deletion timely/src/progress/frontier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -248,7 +248,7 @@ impl<T> Antichain<T> {
/// let mut frontier = Antichain::from_elem(2);
/// assert_eq!(frontier.elements(), &[2]);
///```
#[inline] pub fn elements(&self) -> &[T] { &self[..] }
#[inline] pub fn elements(&self) -> &Vec<T> { &self.elements }

/// Reveals the elements in the antichain.
///
Expand Down
109 changes: 86 additions & 23 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@
use std::collections::{BinaryHeap, HashMap, VecDeque};
use std::cmp::Reverse;

use columnar::{Columnar, Len, Index};
use columnar::ColumnVec;

use crate::progress::Timestamp;
use crate::progress::{Source, Target};
use crate::progress::ChangeBatch;
Expand All @@ -84,6 +87,62 @@ use crate::progress::frontier::{Antichain, MutableAntichain};
use crate::progress::timestamp::PathSummary;


use vec_antichain::VecAntichain;

/// A stand-in for `Vec<Antichain<T>>`.
///
/// The module defines `VecAntichain`, a newtype around `columnar::ColumnVec`
/// that accepts `Antichain<T>` items through the `Columnar` trait and exposes
/// them again through `Len`, `Index` and `IndexMut`. All implemented methods
/// simply forward to the wrapped `ColumnVec`.
mod vec_antichain {

use columnar::{Columnar, Len, Index, IndexMut};
use columnar::{ColumnVec, Slice};

use crate::progress::Antichain;

/// A columnar container of antichains, backed by a single `ColumnVec`.
///
/// Note that the type parameter is the *container* used for the antichain
/// elements (the impls below name it `TC`), not the element type itself: the
/// `Columnar<Antichain<T>>` impl requires `TC: Columnar<T>`.
#[derive(Clone, Debug)]
pub struct VecAntichain<T> (ColumnVec<T>);

impl<TC: Default> Default for VecAntichain<TC> {
/// Wraps the default value of the inner `ColumnVec`.
fn default() -> Self {
Self (Default::default())
}
}

impl<TC> Len for VecAntichain<TC> {
/// Forwards to the inner `ColumnVec`'s `len`.
// Presumably this is the number of antichains copied in so far — confirm against `columnar`.
#[inline(always)] fn len(&self) -> usize { self.0.len() }
}

impl<TC> Index for VecAntichain<TC> {
/// The borrowed view handed out for one entry: a `Slice` over a shared
/// reference to the element container.
type Index<'a> = Slice<&'a TC> where TC: 'a;

/// Forwards to the inner `ColumnVec`'s `index`.
// NOTE(review): out-of-range behavior is whatever `ColumnVec::index` does; not visible here.
#[inline(always)]
fn index(&self, index: usize) -> Self::Index<'_> {
self.0.index(index)
}
}
impl<TC> IndexMut for VecAntichain<TC> {
/// The mutable counterpart of `Index::Index`: a `Slice` over an exclusive
/// reference to the element container.
type IndexMut<'a> = Slice<&'a mut TC> where TC: 'a;

/// Forwards to the inner `ColumnVec`'s `index_mut`.
#[inline(always)]
fn index_mut(&mut self, index: usize) -> Self::IndexMut<'_> {
self.0.index_mut(index)
}
}

impl<T, TC: Columnar<T>> Columnar<Antichain<T>> for VecAntichain<TC> {
/// Copies `item` into the container by handing its `elements()` to the
/// inner `ColumnVec`'s `copy`.
// Presumably this appends the antichain's elements as one new entry — confirm against `columnar`.
#[inline(always)]
fn copy(&mut self, item: &Antichain<T>) {
self.0.copy(item.elements());
}
/// Not implemented: calling this panics via `unimplemented!()`.
// TODO: provide a real implementation (likely by forwarding to the inner container).
fn clear(&mut self) {
unimplemented!()
}
/// Not implemented: calling this panics via `unimplemented!()`.
// TODO: provide a real implementation (likely by forwarding to the inner container).
fn heap_size(&self) -> (usize, usize) {
unimplemented!()
}
}
}



/// A topology builder, which can summarize reachability along paths.
///
/// A `Builder` takes descriptions of the nodes and edges in a graph, and compiles
Expand Down Expand Up @@ -132,43 +191,42 @@ pub struct Builder<T: Timestamp> {
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `get_internal_summary`, as if we simply appended
/// all of the summaries for the hosted nodes.
pub nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
nodes: ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
/// Direct connections from sources to targets.
///
/// Edges do not affect timestamps, so we only need to know the connectivity.
/// Indexed by operator index then output port.
pub edges: Vec<Vec<Vec<Target>>>,
edges: Vec<Vec<Vec<Target>>>,
/// Numbers of inputs and outputs for each node.
pub shape: Vec<(usize, usize)>,
shape: Vec<(usize, usize)>,
}

impl<T: Timestamp> Builder<T> {

/// Create a new empty topology builder.
pub fn new() -> Self {
Builder {
nodes: Vec::new(),
nodes: Default::default(),
edges: Vec::new(),
shape: Vec::new(),
}
}

/// Add links internal to operators.
///
/// This method overwrites any existing summary, instead of anything more sophisticated.
/// Nodes must be added in strictly increasing order of `index`.
pub fn add_node(&mut self, index: usize, inputs: usize, outputs: usize, summary: Vec<Vec<Antichain<T::Summary>>>) {

// Assert that all summaries exist.
debug_assert_eq!(inputs, summary.len());
for x in summary.iter() { debug_assert_eq!(outputs, x.len()); }

while self.nodes.len() <= index {
self.nodes.push(Vec::new());
self.edges.push(Vec::new());
self.shape.push((0, 0));
}
assert_eq!(self.nodes.len(), index);

self.nodes.push(summary);
self.edges.push(Vec::new());
self.shape.push((0, 0));

self.nodes[index] = summary;
if self.edges[index].len() != outputs {
self.edges[index] = vec![Vec::new(); outputs];
}
Expand Down Expand Up @@ -287,7 +345,7 @@ impl<T: Timestamp> Builder<T> {
in_degree.entry(target).or_insert(0);
for (output, summaries) in outputs.iter().enumerate() {
let source = Location::new_source(index, output);
for summary in summaries.elements().iter() {
for summary in summaries.iter() {
if summary == &Default::default() {
*in_degree.entry(source).or_insert(0) += 1;
}
Expand Down Expand Up @@ -322,9 +380,9 @@ impl<T: Timestamp> Builder<T> {
}
},
Port::Target(port) => {
for (output, summaries) in self.nodes[node][port].iter().enumerate() {
for (output, summaries) in self.nodes.index(node).index(port).iter().enumerate() {
let source = Location::new_source(node, output);
for summary in summaries.elements().iter() {
for summary in summaries.iter() {
if summary == &Default::default() {
*in_degree.get_mut(&source).unwrap() -= 1;
if in_degree[&source] == 0 {
Expand Down Expand Up @@ -361,12 +419,12 @@ pub struct Tracker<T:Timestamp> {
/// Indexed by operator index, then input port, then output port. This is the
/// same format returned by `get_internal_summary`, as if we simply appended
/// all of the summaries for the hosted nodes.
nodes: Vec<Vec<Vec<Antichain<T::Summary>>>>,
nodes: ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
/// Direct connections from sources to targets.
///
/// Edges do not affect timestamps, so we only need to know the connectivity.
/// Indexed by operator index then output port.
edges: Vec<Vec<Vec<Target>>>,
edges: ColumnVec<ColumnVec<Vec<Target>>>,

// TODO: All of the sizes of these allocations are static (except internal to `ChangeBatch`).
// It seems we should be able to flatten most of these so that there are a few allocations
Expand Down Expand Up @@ -544,10 +602,15 @@ impl<T:Timestamp> Tracker<T> {
let scope_outputs = builder.shape[0].0;
let output_changes = vec![ChangeBatch::new(); scope_outputs];

let mut edges: ColumnVec<ColumnVec<Vec<Target>>> = Default::default();
for edge in builder.edges {
edges.push(edge);
}

let tracker =
Tracker {
nodes: builder.nodes,
edges: builder.edges,
edges,
per_operator,
target_changes: ChangeBatch::new(),
source_changes: ChangeBatch::new(),
Expand Down Expand Up @@ -663,10 +726,10 @@ impl<T:Timestamp> Tracker<T> {
.update_iter(Some((time, diff)));

for (time, diff) in changes {
let nodes = &self.nodes[location.node][port_index];
let nodes = &self.nodes.index(location.node).index(port_index);
for (output_port, summaries) in nodes.iter().enumerate() {
let source = Location { node: location.node, port: Port::Source(output_port) };
for summary in summaries.elements().iter() {
for summary in summaries.iter() {
if let Some(new_time) = summary.results_in(&time) {
self.worklist.push(Reverse((new_time, source, diff)));
}
Expand All @@ -686,7 +749,7 @@ impl<T:Timestamp> Tracker<T> {
.update_iter(Some((time, diff)));

for (time, diff) in changes {
for new_target in self.edges[location.node][port_index].iter() {
for new_target in self.edges.index(location.node).index(port_index).iter() {
self.worklist.push(Reverse((
time.clone(),
Location::from(*new_target),
Expand Down Expand Up @@ -738,7 +801,7 @@ impl<T:Timestamp> Tracker<T> {
/// Graph locations may be missing from the output, in which case they have no
/// paths to scope outputs.
fn summarize_outputs<T: Timestamp>(
nodes: &Vec<Vec<Vec<Antichain<T::Summary>>>>,
nodes: &ColumnVec<ColumnVec<VecAntichain<Vec<T::Summary>>>>,
edges: &Vec<Vec<Vec<Target>>>,
) -> HashMap<Location, Vec<Antichain<T::Summary>>>
{
Expand Down Expand Up @@ -780,7 +843,7 @@ fn summarize_outputs<T: Timestamp>(
Port::Source(output_port) => {

// Consider each input port of the associated operator.
for (input_port, summaries) in nodes[location.node].iter().enumerate() {
for (input_port, summaries) in nodes.index(location.node).iter().enumerate() {

// Determine the current path summaries from the input port.
let location = Location { node: location.node, port: Port::Target(input_port) };
Expand All @@ -792,7 +855,7 @@ fn summarize_outputs<T: Timestamp>(
while antichains.len() <= output { antichains.push(Antichain::new()); }

// Combine each operator-internal summary to the output with `summary`.
for operator_summary in summaries[output_port].elements().iter() {
for operator_summary in summaries.index(output_port).iter() {
if let Some(combined) = operator_summary.followed_by(&summary) {
if antichains[output].insert(combined.clone()) {
worklist.push_back((location, output, combined));
Expand Down

0 comments on commit c979fde

Please sign in to comment.