Introduce an empty batch on advancing logical compaction to []
Signed-off-by: Moritz Hoffmann <[email protected]>
antiguru committed Jul 19, 2023
1 parent d85939e commit 74f3900
Showing 1 changed file with 87 additions and 6 deletions.
93 changes: 87 additions & 6 deletions src/trace/implementations/spine_fueled.rs
@@ -212,14 +212,18 @@ where
     }
     #[inline]
     fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
-        self.logical_frontier.clear();
-        self.logical_frontier.extend(frontier.iter().cloned());
-
         // If the logical frontier is empty, we are not obliged to be able to represent any time,
         // since no time is in advance of `[]`. In that case, we can drop all contents.
-        if self.logical_frontier.is_empty() {
+        //
+        // We need to call `close` before advancing `logical_frontier` because otherwise we cannot
+        // insert anymore.
+        if frontier.is_empty() {
             self.drop_batches();
+            self.close();
         }
+
+        self.logical_frontier.clear();
+        self.logical_frontier.extend(frontier.iter().cloned());
     }
     #[inline]
     fn get_logical_compaction(&mut self) -> AntichainRef<B::Time> { self.logical_frontier.borrow() }
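
The comments in the hunk above carry the key fact: no time is in advance of the empty frontier `[]`, so once logical compaction reaches `[]` the spine no longer needs to represent anything and may drop all of its batches. Per the comment and the commit title, `close` is called before the frontier is advanced because it introduces a final empty batch, and that insertion would no longer be possible once `logical_frontier` is `[]`. Below is a small, self-contained sketch of the frontier property, using timely's `Antichain` with illustrative `u64` timestamps (not code from this commit):

use timely::progress::Antichain;

fn main() {
    // The empty antichain `[]` contains no elements, so no time is greater
    // than or equal to it: a trace compacted to `[]` never has to answer a
    // query at any time.
    let empty: Antichain<u64> = Antichain::new();
    assert!(!empty.less_equal(&0));
    assert!(!empty.less_equal(&u64::MAX));

    // A non-empty frontier such as `[5]` still requires every time >= 5
    // to remain readable.
    let five = Antichain::from_elem(5u64);
    assert!(five.less_equal(&5));
    assert!(five.less_equal(&7));
    assert!(!five.less_equal(&3));
}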
@@ -353,8 +357,10 @@
 {
     /// Drops and logs batches. Used in `set_logical_compaction` and drop.
     fn drop_batches(&mut self) {
+        let merging = std::mem::take(&mut self.merging);
+        let pending = std::mem::take(&mut self.pending);
         if let Some(logger) = &self.logger {
-            for batch in self.merging.drain(..) {
+            for batch in merging {
                 match batch {
                     MergeState::Single(Some(batch)) => {
                         logger.log(::logging::DropEvent {
Expand All @@ -381,7 +387,7 @@ where
_ => { },
}
}
for batch in self.pending.drain(..) {
for batch in pending {
logger.log(::logging::DropEvent {
operator: self.operator.global_id,
length: batch.len(),
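
Worth noting in the two hunks above: `drop_batches` now moves `self.merging` and `self.pending` out with `std::mem::take` before the logging loop, so both collections are emptied even when no logger is attached (previously the `drain(..)` loops ran only inside `if let Some(logger)`). A minimal stand-in sketch of that pattern follows; the `Spine` struct and `String` "batches" are illustrative, not the real types:

// Minimal sketch of the `std::mem::take` pattern: move a field's contents out,
// leaving an empty default in place, so the field is cleared whether or not the
// optional logging below runs.
struct Spine {
    pending: Vec<String>,
    logger: Option<()>, // stand-in for an optional logger
}

impl Spine {
    fn drop_batches(&mut self) {
        // `take` swaps in `Vec::new()` and returns the previous contents by value.
        let pending = std::mem::take(&mut self.pending);
        if self.logger.is_some() {
            for batch in &pending {
                println!("dropping batch of len {}", batch.len());
            }
        }
        // With or without a logger, `pending` goes out of scope and is dropped here.
    }
}

fn main() {
    let mut spine = Spine { pending: vec!["a".into(), "bb".into()], logger: None };
    spine.drop_batches();
    assert!(spine.pending.is_empty());
}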
@@ -930,3 +936,78 @@ impl<B: Batch> MergeVariant<B> {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use timely::dataflow::operators::Probe;
+    use timely::PartialOrder;
+    use timely::progress::Antichain;
+
+    use input::Input;
+    use operators::arrange::ArrangeByKey;
+    use trace::{BatchReader, TraceReader};
+
+    #[test]
+    fn test_empty_logical_compaction() {
+        timely::execute_directly(|worker| {
+
+
+            let (mut input, mut trace, probe) = worker.dataflow(|scope| {
+                let (input, collection) = scope.new_collection();
+
+                let arranged = collection.arrange_by_key();
+
+                (input, arranged.trace, arranged.stream.probe())
+            });
+
+            let mut upper = Antichain::new();
+
+            // Insert some data at the minimum time.
+            input.insert((1, 'a'));
+            input.insert((2, 'b'));
+            input.advance_to(1);
+            input.flush();
+
+            worker.step_while(|| probe.less_than(&1));
+
+            // Check that the data is in the trace and that the upper has advanced to 1.
+            let mut len = 0;
+            trace.map_batches(|batch| len += batch.len());
+            assert_eq!(len, 2);
+            trace.map_batches(|batch| assert!(PartialOrder::less_equal(&Antichain::from_elem(1), batch.description().upper())));
+            trace.read_upper(&mut upper);
+            assert_eq!(upper, Antichain::from_elem(1));
+
+            // Permit logical compaction to [] to trigger cleanup.
+            trace.set_logical_compaction(Antichain::new().borrow());
+            input.advance_to(2);
+            input.flush();
+            worker.step_while(|| probe.less_than(&2));
+
+            let mut len = 0;
+            trace.map_batches(|batch| len += batch.len());
+            assert_eq!(len, 0);
+            trace.map_batches(|batch| assert!(batch.description().upper().is_empty()));
+            trace.read_upper(&mut upper);
+            assert!(upper.is_empty());
+
+            // Insert more data and check that the trace stays empty.
+            input.insert((3, 'c'));
+            input.advance_to(3);
+            input.flush();
+
+            worker.step_while(|| probe.less_than(&3));
+
+            let (mut len, mut count) = (0, 0);
+            trace.map_batches(|batch| {
+                len += batch.len();
+                count += 1;
+            });
+            assert_eq!(len, 0);
+            assert_eq!(count, 1);
+            trace.map_batches(|batch| assert!(batch.description().upper().is_empty()));
+            trace.read_upper(&mut upper);
+            assert!(upper.is_empty());
+        });
+    }
+}
