Skip to content

Commit

Permalink
chunk retractions
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Aug 31, 2023
1 parent 07a7f06 commit da1197c
Showing 1 changed file with 17 additions and 4 deletions.
21 changes: 17 additions & 4 deletions examples/compaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() {
args.next();

let count: u64 = args.next().unwrap().parse().unwrap();
let batch: u32 = args.next().unwrap().parse().unwrap();
let batch: u64 = args.next().unwrap().parse().unwrap();

let mut timely_config = timely::Config::from_args(args.skip(2)).unwrap();
let config = Config::default().idle_merge_effort(Some(1000));
Expand All @@ -30,7 +30,7 @@ fn main() {
let index = worker.index();

// create a degree counting differential dataflow
let (mut input, probe, mut trace) = worker.dataflow::<u32,_,_>(|scope| {
let (mut input, probe, mut trace) = worker.dataflow::<u64,_,_>(|scope| {

// create edge input, count a few ways.
let (input, edges) = scope.new_collection::<_,i32>();
Expand Down Expand Up @@ -95,8 +95,21 @@ fn main() {
enter("Loading");

println!("{i}");
for x in 0 .. i / 2 {
input.update((x, x), -1);
{
let mut next = batch;
for x in 0 .. i / 2 {
input.advance_to(i + x);
input.update((x, x), -1);
if x > next {
let timer = ::std::time::Instant::now();
input.flush();
while probe.less_than(input.time()) {
worker.step();
}
println!("round {} finished after {:?}, trace len {}", next, timer.elapsed(), trace_len(&mut trace));
next += batch;
}
}
}
input.advance_to(input.time() + 1);
let timer = ::std::time::Instant::now();
Expand Down

0 comments on commit da1197c

Please sign in to comment.