diff --git a/examples/compaction.rs b/examples/compaction.rs index 50630b515..13d1f03c8 100644 --- a/examples/compaction.rs +++ b/examples/compaction.rs @@ -17,7 +17,7 @@ fn main() { args.next(); let count: u64 = args.next().unwrap().parse().unwrap(); - let batch: u32 = args.next().unwrap().parse().unwrap(); + let batch: u64 = args.next().unwrap().parse().unwrap(); let mut timely_config = timely::Config::from_args(args.skip(2)).unwrap(); let config = Config::default().idle_merge_effort(Some(1000)); @@ -30,7 +30,7 @@ fn main() { let index = worker.index(); // create a degree counting differential dataflow - let (mut input, probe, mut trace) = worker.dataflow::(|scope| { + let (mut input, probe, mut trace) = worker.dataflow::(|scope| { // create edge input, count a few ways. let (input, edges) = scope.new_collection::<_,i32>(); @@ -95,8 +95,21 @@ fn main() { enter("Loading"); println!("{i}"); - for x in 0 .. i / 2 { - input.update((x, x), -1); + { + let mut next = batch; + for x in 0 .. i / 2 { + input.advance_to(i + x); + input.update((x, x), -1); + if x > next { + let timer = ::std::time::Instant::now(); + input.flush(); + while probe.less_than(input.time()) { + worker.step(); + } + println!("round {} finished after {:?}, trace len {}", next, timer.elapsed(), trace_len(&mut trace)); + next += batch; + } + } } input.advance_to(input.time() + 1); let timer = ::std::time::Instant::now();