Skip to content

Commit

Permalink
Correct flatcontainer.rs glitch I introduced
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Mar 23, 2024
1 parent fc50754 commit 52cf603
Showing 1 changed file with 17 additions and 9 deletions.
26 changes: 17 additions & 9 deletions timely/examples/flatcontainer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,24 +5,26 @@ use {
std::collections::HashMap,
timely::container::flatcontainer::{Containerized, FlatStack},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::operators::core::InputHandle,
timely::dataflow::InputHandleCore,
timely::dataflow::operators::{Inspect, Operator, Probe},
timely::dataflow::ProbeHandle,
};

#[cfg(feature = "bincode")]
fn main() {

type Container = FlatStack<<(String, i64) as Containerized>::Region>;

// initializes and runs a timely dataflow.
timely::execute_from_args(std::env::args(), |worker| {
let mut input =
<InputHandle<_, FlatStack<<(String, i64) as Containerized>::Region>>>::new();
let mut input = <InputHandleCore<_, Container>>::new();
let mut probe = ProbeHandle::new();

// create a new input, exchange data, and inspect its output
worker.dataflow::<usize, _, _>(|scope| {
input
.to_stream(scope)
.unary::<FlatStack<<(String, i64) as Containerized>::Region>, _, _, _>(
.unary::<Container, _, _, _>(
Pipeline,
"Split",
|_cap, _info| {
Expand All @@ -38,7 +40,7 @@ fn main() {
}
},
)
.unary_frontier::<FlatStack<<(String, i64) as Containerized>::Region>, _, _, _>(
.unary_frontier::<Container, _, _, _>(
ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
"WordCount",
|_capability, _info| {
Expand All @@ -58,10 +60,16 @@ fn main() {
let mut session = output.session(key);
for batch in val.drain(..) {
for (word, diff) in batch.iter() {
let entry =
counts.entry(word.to_string()).or_insert(0i64);
*entry += diff;
session.give((word, *entry));
let total =
if let Some(count) = counts.get_mut(word) {
*count += diff;
*count
}
else {
counts.insert(word.to_string(), diff);
diff
};
session.give((word, total));
}
}
}
Expand Down

0 comments on commit 52cf603

Please sign in to comment.