Skip to content

Commit

Permalink
Pointstamps and dynamic scopes (TimelyDataflow#378)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Feb 16, 2023
1 parent e9e1157 commit 24df201
Show file tree
Hide file tree
Showing 4 changed files with 433 additions and 0 deletions.
153 changes: 153 additions & 0 deletions examples/dynamic.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
extern crate rand;
extern crate timely;
extern crate differential_dataflow;

use rand::{Rng, SeedableRng, StdRng};

use timely::dataflow::*;
use timely::dataflow::operators::probe::Handle;

use differential_dataflow::input::Input;
use differential_dataflow::Collection;
use differential_dataflow::operators::*;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::logging::DifferentialEvent;

type Node = u32;
type Edge = (Node, Node);

/// Runs breadth-first search over a random graph that is then updated in rounds.
///
/// Expects five positional arguments: `nodes`, `edges`, `batch`, `rounds`, and a
/// final word that enables result output only when it is exactly `inspect`.
/// Any missing or unparseable argument panics.
fn main() {

    // Read the positional arguments in order, skipping the program name.
    let mut positional = std::env::args().skip(1);
    let nodes: u32 = positional.next().unwrap().parse().unwrap();
    let edges: u32 = positional.next().unwrap().parse().unwrap();
    let batch: u32 = positional.next().unwrap().parse().unwrap();
    let rounds: u32 = positional.next().unwrap().parse().unwrap();
    let inspect: bool = positional.next().unwrap() == "inspect";

    // Start the timely workers; each one builds and drives the BFS dataflow.
    timely::execute_from_args(std::env::args(), move |worker| {

        // When DIFFERENTIAL_LOG_ADDR is set, ship differential log events to that address.
        if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {

            eprintln!("enabled DIFFERENTIAL logging to {}", addr);

            match ::std::net::TcpStream::connect(&addr) {
                Ok(stream) => {
                    let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
                    let mut logger = ::timely::logging::BatchLogger::new(writer);
                    worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
                        logger.publish_batch(time, data)
                    );
                }
                Err(_) => panic!("Could not connect to differential log address: {:?}", addr),
            }
        }

        let timer = ::std::time::Instant::now();

        // Only the worker with index zero introduces edge changes.
        let is_first_worker = worker.index() == 0;

        // Build the BFS dataflow, keeping the input handles for roots and edges.
        let mut probe = Handle::new();
        let (mut root_handle, mut edge_handle) = worker.dataflow(|scope| {

            let (root_input, root_collection) = scope.new_collection();
            let (edge_input, edge_collection) = scope.new_collection();

            let reached = bfs(&edge_collection, &root_collection);

            // Unless inspection was requested, drop every record before reporting.
            let reached = if inspect { reached } else { reached.filter(|_| false) };

            reached
                .map(|(_, distance)| distance)
                .consolidate()
                .inspect(|x| println!("\t{:?}", x))
                .probe_with(&mut probe);

            (root_input, edge_input)
        });

        let seed: &[_] = &[1, 2, 3, 4];
        let mut rng1: StdRng = SeedableRng::from_seed(seed);    // rng for edge additions
        let mut rng2: StdRng = SeedableRng::from_seed(seed);    // rng for edge deletions

        // Node 0 is the single root; the roots input receives nothing further.
        root_handle.insert(0);
        root_handle.close();

        println!("performing BFS on {} nodes, {} edges:", nodes, edges);

        // Load the initial graph.
        if is_first_worker {
            for _ in 0 .. edges {
                let src = rng1.gen_range(0, nodes);
                let dst = rng1.gen_range(0, nodes);
                edge_handle.insert((src, dst));
            }
        }

        println!("{:?}\tloaded", timer.elapsed());

        // Step the worker until the probe has caught up with the input time.
        edge_handle.advance_to(1);
        edge_handle.flush();
        worker.step_or_park_while(None, || probe.less_than(edge_handle.time()));

        println!("{:?}\tstable", timer.elapsed());

        for round in 0 .. rounds {
            // Each element of a batch gets its own timestamp: one insertion and one removal.
            for element in 0 .. batch {
                if is_first_worker {
                    let added = (rng1.gen_range(0, nodes), rng1.gen_range(0, nodes));
                    edge_handle.insert(added);
                    let removed = (rng2.gen_range(0, nodes), rng2.gen_range(0, nodes));
                    edge_handle.remove(removed);
                }
                edge_handle.advance_to(2 + round * batch + element);
            }
            edge_handle.flush();

            let round_timer = ::std::time::Instant::now();
            worker.step_or_park_while(None, || probe.less_than(edge_handle.time()));

            if is_first_worker {
                let elapsed = round_timer.elapsed();
                let nanos = elapsed.as_secs() * 1_000_000_000 + u64::from(elapsed.subsec_nanos());
                println!("{:?}\t{:?}:\t{}", timer.elapsed(), round, nanos);
            }
        }
        println!("finished; elapsed: {:?}", timer.elapsed());
    }).unwrap();
}

/// Returns pairs `(n, s)` indicating that node `n` can be reached from a root in `s` steps.
///
/// The iteration runs in a scope whose inner timestamp is a `PointStamp<usize>`,
/// and the result is brought back out with `leave_dynamic(1)` followed by `leave()`.
fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
where G::Timestamp: Lattice+Ord {

    use timely::order::Product;
    use iterate::Variable;
    use differential_dataflow::dynamic::{feedback_summary, pointstamp::PointStamp};

    // Every root reaches itself in zero steps.
    let nodes = roots.map(|root| (root, 0));

    // Iterate, repeatedly improving the distance recorded for each node.
    nodes.scope().iterative::<PointStamp<usize>, _, _>(|scope| {

        // Bring both inputs into the statically typed iterative scope. They do not
        // change within the dynamic scope, so no `enter_dynamic` call is needed.
        let edges = edges.enter(scope);
        let nodes = nodes.enter(scope);

        // The feedback edge advances the first pointstamp coordinate by one.
        let step = feedback_summary::<usize>(1, 1);
        let summary = Product { outer: Default::default(), inner: step };
        let label = Variable::new_from(nodes.clone(), summary);

        // Extend each known distance along every edge, then keep one distance per node.
        let extended = label.join_map(&edges, |_src, dist, dst| (*dst, dist + 1));
        let next = extended
            .concat(&nodes)
            // Keeps the first candidate; presumably the smallest, as `reduce` is
            // expected to present values in sorted order -- TODO confirm.
            .reduce(|_node, dists, output| output.push((*dists[0].0, 1)));

        label.set(&next);

        // Leave the dynamic iteration, stripping off the last timestamp coordinate,
        // and then leave the static scope.
        next.leave_dynamic(1)
            .inspect(|x| println!("{:?}", x))
            .leave()
    })

}
75 changes: 75 additions & 0 deletions src/dynamic/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
//! Types and operators for dynamically scoped iterative dataflows.
//!
//! Scopes in timely dataflow are expressed statically, as part of the type system.
//! This affords many efficiencies, as well as type-driven reassurance of correctness.
//! However, there are times you need scopes whose organization is discovered only at runtime.
//! Naiad and Materialize are examples: the latter taking arbitrary SQL into iterative dataflows.
//!
//! This module provides a timestamp type `PointStamp` that can represent an update with an
//! unboundedly long sequence of some `T: Timestamp`, ordered by the product order by which times
//! in iterative dataflows are ordered. The module also provides methods for manipulating these
//! timestamps to emulate the movement of update streams into, within, and out of iterative scopes.
//!

pub mod pointstamp;

use timely::dataflow::{Scope, scopes::Child};
use timely::order::Product;
use timely::progress::Timestamp;
use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
use timely::dataflow::channels::pact::Pipeline;
use timely::progress::Antichain;

use difference::Semigroup;
use {Collection, Data};
use collection::AsCollection;
use dynamic::pointstamp::PointStamp;
use dynamic::pointstamp::PointStampSummary;

impl<G, D, R, T> Collection<Child<'_, G, Product<G::Timestamp, PointStamp<T>>>, D, R>
where
    G: Scope,
    D: Data,
    R: Semigroup,
    T: Timestamp+Default,
{
    /// Enters a dynamically created scope which has `level` timestamp coordinates.
    ///
    /// This performs no work on the data: it returns a clone of the collection,
    /// and `_level` is unused.
    pub fn enter_dynamic(&self, _level: usize) -> Self {
        (*self).clone()
    }
    /// Leaves a dynamically created scope which has `level` timestamp coordinates.
    ///
    /// The timestamp of each update, and of each capability the updates are sent
    /// under, is truncated to its first `level - 1` coordinates.
    ///
    /// # Panics
    ///
    /// Panics if `level` is zero, as there is then no coordinate to strip.
    pub fn leave_dynamic(&self, level: usize) -> Self {
        // Number of coordinates that survive. Checked once, up front, so that a zero
        // `level` fails loudly rather than wrapping around in builds without overflow
        // checks (where the truncations below would silently do nothing).
        let retain = level.checked_sub(1).expect("leave_dynamic: `level` must be at least 1");

        // Create a unary operator that will strip all but `level-1` timestamp coordinates.
        let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
        let (mut output, stream) = builder.new_output();
        // Describe the input-to-output path with a summary retaining the same coordinates.
        let summary = Product { outer: Default::default(), inner: PointStampSummary { retain: Some(retain), actions: Vec::new() } };
        let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(summary)]);

        let mut vector = Default::default();
        builder.build(move |_capability| move |_frontier| {
            let mut output = output.activate();
            input.for_each(|cap, data| {
                data.swap(&mut vector);
                // Truncate the capability's time first, so the truncated updates can be sent under it.
                let mut new_time = cap.time().clone();
                new_time.inner.vector.truncate(retain);
                let new_cap = cap.delayed(&new_time);
                for (_data, time, _diff) in vector.iter_mut() {
                    time.inner.vector.truncate(retain);
                }
                output.session(&new_cap).give_vec(&mut vector);
            });
        });

        stream.as_collection()
    }
}

/// Produces the summary for a feedback operator at `level`, applying `summary` to that coordinate.
///
/// The result has `retain: None` and `level` actions: `level - 1` default
/// summaries followed by `summary`.
///
/// # Panics
///
/// Panics if `level` is zero; levels are counted from one.
pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary>
where
    T: Timestamp+Default,
{
    // Checked so that a zero `level` cannot wrap to `usize::MAX` in builds without
    // overflow checks, which would make the `collect` below try to build an
    // unboundedly long vector.
    let leading = level.checked_sub(1).expect("feedback_summary: `level` must be at least 1");
    PointStampSummary {
        retain: None,
        actions: std::iter::repeat(Default::default()).take(leading).chain(std::iter::once(summary)).collect(),
    }
}
Loading

0 comments on commit 24df201

Please sign in to comment.