diff --git a/timely/examples/flatcontainer.rs b/timely/examples/flatcontainer.rs index 8d528c954..21b62d44b 100644 --- a/timely/examples/flatcontainer.rs +++ b/timely/examples/flatcontainer.rs @@ -5,24 +5,26 @@ use { std::collections::HashMap, timely::container::flatcontainer::{Containerized, FlatStack}, timely::dataflow::channels::pact::{ExchangeCore, Pipeline}, - timely::dataflow::operators::core::InputHandle, + timely::dataflow::InputHandleCore, timely::dataflow::operators::{Inspect, Operator, Probe}, timely::dataflow::ProbeHandle, }; #[cfg(feature = "bincode")] fn main() { + + type Container = FlatStack<<(String, i64) as Containerized>::Region>; + // initializes and runs a timely dataflow. timely::execute_from_args(std::env::args(), |worker| { - let mut input = - ::Region>>>::new(); + let mut input = >::new(); let mut probe = ProbeHandle::new(); // create a new input, exchange data, and inspect its output worker.dataflow::(|scope| { input .to_stream(scope) - .unary::::Region>, _, _, _>( + .unary::( Pipeline, "Split", |_cap, _info| { @@ -38,7 +40,7 @@ fn main() { } }, ) - .unary_frontier::::Region>, _, _, _>( + .unary_frontier::( ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64), "WordCount", |_capability, _info| { @@ -58,10 +60,16 @@ fn main() { let mut session = output.session(key); for batch in val.drain(..) { for (word, diff) in batch.iter() { - let entry = - counts.entry(word.to_string()).or_insert(0i64); - *entry += diff; - session.give((word, *entry)); + let total = + if let Some(count) = counts.get_mut(word) { + *count += diff; + *count + } + else { + counts.insert(word.to_string(), diff); + diff + }; + session.give((word, total)); } } }