diff --git a/examples/dynamic.rs b/examples/dynamic.rs
new file mode 100644
index 000000000..8a9dcdc81
--- /dev/null
+++ b/examples/dynamic.rs
@@ -0,0 +1,153 @@
+extern crate rand;
+extern crate timely;
+extern crate differential_dataflow;
+
+use rand::{Rng, SeedableRng, StdRng};
+
+use timely::dataflow::*;
+use timely::dataflow::operators::probe::Handle;
+
+use differential_dataflow::input::Input;
+use differential_dataflow::Collection;
+use differential_dataflow::operators::*;
+use differential_dataflow::lattice::Lattice;
+use differential_dataflow::logging::DifferentialEvent;
+
+type Node = u32;
+type Edge = (Node, Node);
+
+fn main() {
+
+    let nodes: u32 = std::env::args().nth(1).unwrap().parse().unwrap();
+    let edges: u32 = std::env::args().nth(2).unwrap().parse().unwrap();
+    let batch: u32 = std::env::args().nth(3).unwrap().parse().unwrap();
+    let rounds: u32 = std::env::args().nth(4).unwrap().parse().unwrap();
+    let inspect: bool = std::env::args().nth(5).unwrap() == "inspect";
+
+    // define a new computational scope, in which to run BFS
+    timely::execute_from_args(std::env::args(), move |worker| {
+
+        if let Ok(addr) = ::std::env::var("DIFFERENTIAL_LOG_ADDR") {
+
+            eprintln!("enabled DIFFERENTIAL logging to {}", addr);
+
+            if let Ok(stream) = ::std::net::TcpStream::connect(&addr) {
+                let writer = ::timely::dataflow::operators::capture::EventWriter::new(stream);
+                let mut logger = ::timely::logging::BatchLogger::new(writer);
+                worker.log_register().insert::<DifferentialEvent,_>("differential/arrange", move |time, data|
+                    logger.publish_batch(time, data)
+                );
+            }
+            else {
+                panic!("Could not connect to differential log address: {:?}", addr);
+            }
+        }
+
+        let timer = ::std::time::Instant::now();
+
+        // define BFS dataflow; return handles to roots and edges inputs
+        let mut probe = Handle::new();
+        let (mut roots, mut graph) = worker.dataflow(|scope| {
+
+            let (root_input, roots) = scope.new_collection();
+            let (edge_input, graph) = scope.new_collection();
+
+            let mut result = bfs(&graph, &roots);
+
+            if !inspect {
+                result = result.filter(|_| false);
+            }
+
+            result.map(|(_,l)| l)
+                  .consolidate()
+                  .inspect(|x| println!("\t{:?}", x))
+                  .probe_with(&mut probe);
+
+            (root_input, edge_input)
+        });
+
+        let seed: &[_] = &[1, 2, 3, 4];
+        let mut rng1: StdRng = SeedableRng::from_seed(seed);    // rng for edge additions
+        let mut rng2: StdRng = SeedableRng::from_seed(seed);    // rng for edge deletions
+
+        roots.insert(0);
+        roots.close();
+
+        println!("performing BFS on {} nodes, {} edges:", nodes, edges);
+
+        if worker.index() == 0 {
+            for _ in 0 .. edges {
+                graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
+            }
+        }
+
+        println!("{:?}\tloaded", timer.elapsed());
+
+        graph.advance_to(1);
+        graph.flush();
+        worker.step_or_park_while(None, || probe.less_than(graph.time()));
+
+        println!("{:?}\tstable", timer.elapsed());
+
+        for round in 0 .. rounds {
+            for element in 0 .. batch {
+                if worker.index() == 0 {
+                    graph.insert((rng1.gen_range(0, nodes), rng1.gen_range(0, nodes)));
+                    graph.remove((rng2.gen_range(0, nodes), rng2.gen_range(0, nodes)));
+                }
+                graph.advance_to(2 + round * batch + element);
+            }
+            graph.flush();
+
+            let timer2 = ::std::time::Instant::now();
+            worker.step_or_park_while(None, || probe.less_than(&graph.time()));
+
+            if worker.index() == 0 {
+                let elapsed = timer2.elapsed();
+                println!("{:?}\t{:?}:\t{}", timer.elapsed(), round, elapsed.as_secs() * 1000000000 + (elapsed.subsec_nanos() as u64));
+            }
+        }
+        println!("finished; elapsed: {:?}", timer.elapsed());
+    }).unwrap();
+}
+
+// returns pairs (n, s) indicating node n can be reached from a root in s steps.
+fn bfs<G: Scope>(edges: &Collection<G, Edge>, roots: &Collection<G, Node>) -> Collection<G, (Node, u32)>
+where G::Timestamp: Lattice+Ord {
+
+    use timely::order::Product;
+    use differential_dataflow::operators::iterate::Variable;
+    use differential_dataflow::dynamic::{feedback_summary, pointstamp::PointStamp};
+
+    // initialize roots as reaching themselves at distance 0
+    let nodes = roots.map(|x| (x, 0));
+
+    // repeatedly update minimal distances each node can be reached from each root
+    nodes.scope().iterative::<PointStamp<u64>, _, _>(|inner| {
+
+        // These enter the statically bound scope, rather than any iterative scopes.
+        // We do not *need* to enter them into the dynamic scope, as they are static
+        // within that scope.
+        let edges = edges.enter(inner);
+        let nodes = nodes.enter(inner);
+
+        // Create a variable for label iteration.
+        let inner = feedback_summary::<u64>(1, 1);
+        let label = Variable::new_from(nodes.clone(), Product { outer: Default::default(), inner });
+
+        let next =
+        label
+            .join_map(&edges, |_k,l,d| (*d, l+1))
+            .concat(&nodes)
+            .reduce(|_, s, t| t.push((*s[0].0, 1)))
+            ;
+
+        label.set(&next);
+        // Leave the dynamic iteration, stripping off the last timestamp coordinate.
+        next
+            .leave_dynamic(1)
+            .inspect(|x| println!("{:?}", x))
+            .leave()
+    })
+
+}
\ No newline at end of file
diff --git a/src/dynamic/mod.rs b/src/dynamic/mod.rs
new file mode 100644
index 000000000..aadefb05a
--- /dev/null
+++ b/src/dynamic/mod.rs
@@ -0,0 +1,75 @@
+//! Types and operators for dynamically scoped iterative dataflows.
+//!
+//! Scopes in timely dataflow are expressed statically, as part of the type system.
+//! This affords many efficiencies, as well as type-driven reassurance of correctness.
+//! However, there are times you need scopes whose organization is discovered only at runtime.
+//! Naiad and Materialize are examples: the latter taking arbitrary SQL into iterative dataflows.
+//!
+//! This module provides a timestamp type `PointStamp` that can represent an update with an
+//! unboundedly long sequence of some `T: Timestamp`, ordered by the product order by which times
+//! in iterative dataflows are ordered. The module also provides methods for manipulating these
+//! timestamps to emulate the movement of update streams into, within, and out of iterative scopes.
+//!
+
+pub mod pointstamp;
+
+use timely::dataflow::{Scope, scopes::Child};
+use timely::order::Product;
+use timely::progress::Timestamp;
+use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
+use timely::dataflow::channels::pact::Pipeline;
+use timely::progress::Antichain;
+
+use difference::Semigroup;
+use {Collection, Data};
+use collection::AsCollection;
+use dynamic::pointstamp::PointStamp;
+use dynamic::pointstamp::PointStampSummary;
+
+impl<G, D, R, T> Collection<Child<'_, G, Product<G::Timestamp, PointStamp<T>>>, D, R>
+where
+    G: Scope,
+    D: Data,
+    R: Semigroup,
+    T: Timestamp+Default,
+{
+    /// Enters a dynamically created scope which has `level` timestamp coordinates.
+    pub fn enter_dynamic(&self, _level: usize) -> Self {
+        (*self).clone()
+    }
+    /// Leaves a dynamically created scope which has `level` timestamp coordinates.
+    pub fn leave_dynamic(&self, level: usize) -> Self {
+        // Create a unary operator that will strip all but `level-1` timestamp coordinates.
+        let mut builder = OperatorBuilder::new("LeaveDynamic".to_string(), self.scope());
+        let (mut output, stream) = builder.new_output();
+        let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]);
+
+        let mut vector = Default::default();
+        builder.build(move |_capability| move |_frontier| {
+            let mut output = output.activate();
+            input.for_each(|cap, data| {
+                data.swap(&mut vector);
+                let mut new_time = cap.time().clone();
+                new_time.inner.vector.truncate(level - 1);
+                let new_cap = cap.delayed(&new_time);
+                for (_data, time, _diff) in vector.iter_mut() {
+                    time.inner.vector.truncate(level - 1);
+                }
+                output.session(&new_cap).give_vec(&mut vector);
+            });
+        });
+
+        stream.as_collection()
+    }
+}
+
+/// Produces the summary for a feedback operator at `level`, applying `summary` to that coordinate.
+pub fn feedback_summary<T>(level: usize, summary: T::Summary) -> PointStampSummary<T::Summary>
+where
+    T: Timestamp+Default,
+{
+    PointStampSummary {
+        retain: None,
+        actions: std::iter::repeat(Default::default()).take(level-1).chain(std::iter::once(summary)).collect(),
+    }
+}
diff --git a/src/dynamic/pointstamp.rs b/src/dynamic/pointstamp.rs
new file mode 100644
index 000000000..1c2eb247d
--- /dev/null
+++ b/src/dynamic/pointstamp.rs
@@ -0,0 +1,204 @@
+//! A timestamp type as in Naiad, where vectors of timestamps of different lengths are comparable.
+//!
+//! This type compares using "standard" tuple logic as if each timestamp were extended indefinitely with minimal elements.
+//!
+//! The path summary for this type allows *run-time* rather than *type-driven* iterative scopes.
+//! Each summary represents some journey within and out of some number of scopes, followed by entry
+//! into and iteration within some other number of scopes.
+//!
+//! As a result, summaries describe some number of trailing coordinates to truncate, and some increments
+//! to the resulting vector. Structurally, the increments can only be to one non-truncated coordinate
+//! (as iteration within a scope requires leaving contained scopes), and then to any number of appended
+//! default coordinates (which is effectively just *setting* the coordinate).
+
+use serde::{Deserialize, Serialize};
+
+/// A sequence of timestamps, partially ordered by the product order.
+///
+/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
+/// Sequences are not guaranteed to be "minimal", and may end with `T::minimum()` entries.
+#[derive(
+    Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation,
+)]
+pub struct PointStamp<T> {
+    /// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
+    pub vector: Vec<T>,
+}
+
+impl<T> PointStamp<T> {
+    /// Create a new sequence.
+    pub fn new(vector: Vec<T>) -> Self {
+        PointStamp { vector }
+    }
+}
+
+// Implement timely dataflow's `PartialOrder` trait.
+use timely::order::PartialOrder;
+impl<T: Timestamp> PartialOrder for PointStamp<T> {
+    fn less_equal(&self, other: &Self) -> bool {
+        // Every present coordinate must be less-equal the corresponding coordinate,
+        // where absent corresponding coordinates are `T::minimum()`. Coordinates
+        // absent from `self.vector` are themselves `T::minimum()` and are less-equal
+        // any corresponding coordinate in `other.vector`.
+        self.vector
+            .iter()
+            .zip(other.vector.iter().chain(std::iter::repeat(&T::minimum())))
+            .all(|(t1, t2)| t1.less_equal(t2))
+    }
+}
+
+use timely::progress::timestamp::Refines;
+impl<T: Timestamp> Refines<()> for PointStamp<T> {
+    fn to_inner(_outer: ()) -> Self {
+        Self { vector: Vec::new() }
+    }
+    fn to_outer(self) -> () {
+        ()
+    }
+    fn summarize(_summary: <Self as Timestamp>::Summary) -> () {
+        ()
+    }
+}
+
+// Implement timely dataflow's `PathSummary` trait.
+// This is preparation for the `Timestamp` implementation below.
+use timely::progress::PathSummary;
+
+/// Describes an action on a `PointStamp`: truncation to `retain` coordinates followed by `actions`.
+#[derive(
+    Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation
+)]
+pub struct PointStampSummary<TS> {
+    /// Number of leading coordinates to retain.
+    ///
+    /// A `None` value indicates that all coordinates should be retained.
+    pub retain: Option<usize>,
+    /// Summary actions to apply to all coordinates.
+    ///
+    /// If `actions.len()` is greater than `retain`, a timestamp should be extended by
+    /// `T::minimum()` in order to be subjected to `actions`.
+    pub actions: Vec<TS>,
+}
+
+impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary> {
+    fn results_in(&self, timestamp: &PointStamp<T>) -> Option<PointStamp<T>> {
+        // Get a slice of timestamp coordinates appropriate for consideration.
+        let timestamps = if let Some(retain) = self.retain {
+            if retain < timestamp.vector.len() {
+                &timestamp.vector[..retain]
+            } else {
+                &timestamp.vector[..]
+            }
+        } else {
+            &timestamp.vector[..]
+        };
+
+        let mut vector = Vec::with_capacity(std::cmp::max(timestamps.len(), self.actions.len()));
+        // Introduce elements where both timestamp and action exist.
+        let min_len = std::cmp::min(timestamps.len(), self.actions.len());
+        for (action, timestamp) in self.actions.iter().zip(timestamps.iter()) {
+            vector.push(action.results_in(timestamp)?);
+        }
+        // Any remaining timestamps should be copied in.
+        for timestamp in timestamps.iter().skip(min_len) {
+            vector.push(timestamp.clone());
+        }
+        // Any remaining actions should be applied to the empty timestamp.
+        for action in self.actions.iter().skip(min_len) {
+            vector.push(action.results_in(&T::minimum())?);
+        }
+
+        Some(PointStamp { vector })
+    }
+    fn followed_by(&self, other: &Self) -> Option<Self> {
+        // The output `retain` will be the minimum of the two inputs.
+        let retain = match (self.retain, other.retain) {
+            (Some(x), Some(y)) => Some(std::cmp::min(x, y)),
+            (Some(x), None) => Some(x),
+            (None, Some(y)) => Some(y),
+            (None, None) => None,
+        };
+
+        // The output `actions` will depend on the relative sizes of the input `retain`s.
+        let self_actions = if let Some(retain) = other.retain {
+            if retain < self.actions.len() {
+                &self.actions[..retain]
+            } else {
+                &self.actions[..]
+            }
+        } else {
+            &self.actions[..]
+        };
+
+        let mut actions = Vec::with_capacity(std::cmp::max(self_actions.len(), other.actions.len()));
+        // Introduce actions where both input actions apply.
+        let min_len = std::cmp::min(self_actions.len(), other.actions.len());
+        for (action1, action2) in self_actions.iter().zip(other.actions.iter()) {
+            actions.push(action1.followed_by(action2)?);
+        }
+        // Append any remaining self actions.
+        actions.extend(self_actions.iter().skip(min_len).cloned());
+        // Append any remaining other actions.
+        actions.extend(other.actions.iter().skip(min_len).cloned());
+
+        Some(Self { retain, actions })
+    }
+}
+
+impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
+    fn less_equal(&self, other: &Self) -> bool {
+        // If the `retain`s are not the same, there is some coordinate which
+        // could either be bigger or smaller as the timestamp or the replacement.
+        // In principle, a `T::minimum()` extension could break this rule, and
+        // we could tighten this logic if needed; I think it is fine not to though.
+        self.retain == other.retain
+            && self.actions.len() <= other.actions.len()
+            && self
+                .actions
+                .iter()
+                .zip(other.actions.iter())
+                .all(|(t1, t2)| t1.less_equal(t2))
+    }
+}
+
+// Implement timely dataflow's `Timestamp` trait.
+use timely::progress::Timestamp;
+impl<T: Timestamp> Timestamp for PointStamp<T> {
+    fn minimum() -> Self {
+        Self { vector: Vec::new() }
+    }
+    type Summary = PointStampSummary<T::Summary>;
+}
+
+// Implement differential dataflow's `Lattice` trait.
+// This extends the `PartialOrder` implementation with additional structure.
+use lattice::Lattice;
+impl<T: Lattice+Timestamp> Lattice for PointStamp<T> {
+    fn join(&self, other: &Self) -> Self {
+        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
+        let max_len = ::std::cmp::max(self.vector.len(), other.vector.len());
+        let mut vector = Vec::with_capacity(max_len);
+        // For coordinates in both inputs, apply `join` to the pair.
+        for index in 0..min_len {
+            vector.push(self.vector[index].join(&other.vector[index]));
+        }
+        // Only one of the two vectors will have remaining elements; copy them.
+        for time in &self.vector[min_len..] {
+            vector.push(time.clone());
+        }
+        for time in &other.vector[min_len..] {
+            vector.push(time.clone());
+        }
+        Self { vector }
+    }
+    fn meet(&self, other: &Self) -> Self {
+        let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
+        let mut vector = Vec::with_capacity(min_len);
+        // For coordinates in both inputs, apply `meet` to the pair.
+        for index in 0..min_len {
+            vector.push(self.vector[index].meet(&other.vector[index]));
+        }
+        // Remaining coordinates are `T::minimum()` in one input, and so in the output.
+        Self { vector }
+    }
+}
diff --git a/src/lib.rs b/src/lib.rs
index 2ff81e8b8..c753b931c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -109,6 +109,7 @@ pub mod lattice;
 pub mod trace;
 pub mod input;
 pub mod difference;
+pub mod dynamic;
 pub mod collection;
 pub mod logging;
 pub mod consolidation;
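
As a quick orientation aid (not part of the patch), the sketch below exercises the `PointStamp` ordering and the `feedback_summary` helper introduced above; the standalone `main`, the variable names, and the concrete values are illustrative assumptions rather than code from the repository.

extern crate timely;
extern crate differential_dataflow;

use timely::order::PartialOrder;
use timely::progress::PathSummary;
use differential_dataflow::dynamic::{feedback_summary, pointstamp::PointStamp};

fn main() {
    // Shorter vectors compare as if padded with `T::minimum()` (here, 0u64).
    let shorter = PointStamp::new(vec![1u64]);
    let longer = PointStamp::new(vec![1u64, 2]);
    assert!(shorter.less_equal(&longer));
    assert!(!longer.less_equal(&shorter));

    // `feedback_summary::<u64>(2, 1)` builds actions `[0, 1]`: it leaves the first
    // coordinate alone and advances the second coordinate by one.
    let summary = feedback_summary::<u64>(2, 1);
    assert_eq!(summary.results_in(&longer), Some(PointStamp::new(vec![1u64, 3])));
}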