Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Spine drop batches once logical and physical compaction frontiers are empty #399

Closed
wants to merge 3 commits into from
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 133 additions & 5 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ where
fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
self.logical_frontier.clear();
self.logical_frontier.extend(frontier.iter().cloned());
self.consider_closing();
}
#[inline]
fn get_logical_compaction(&mut self) -> AntichainRef<B::Time> { self.logical_frontier.borrow() }
Expand All @@ -223,6 +224,7 @@ where
debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier);
self.physical_frontier.clear();
self.physical_frontier.extend(frontier.iter().cloned());
self.consider_closing();
self.consider_merges();
}
#[inline]
Expand Down Expand Up @@ -293,14 +295,18 @@ where
// merging the batch. This means it is a good time to perform amortized work proportional
// to the size of batch.
fn insert(&mut self, batch: Self::Batch) {
// Once the upper frontier is empty, we don't expect any more data.
if self.upper.is_empty() {
return
}

// Log the introduction of a batch.
self.logger.as_ref().map(|l| l.log(::logging::BatchEvent {
operator: self.operator.global_id,
length: batch.len()
}));

assert!(batch.lower() != batch.upper());
assert_ne!(batch.lower(), batch.upper());
assert_eq!(batch.lower(), &self.upper);

self.upper.clone_from(batch.upper());
Expand Down Expand Up @@ -340,10 +346,12 @@ where
B::Time: Lattice+Ord,
B::R: Semigroup,
{
/// Drops and logs batches. Used in `set_logical_compaction` and drop.
/// Drops and logs batches. Used in `consider_closing` and drop.
fn drop_batches(&mut self) {
let merging = std::mem::take(&mut self.merging);
let pending = std::mem::take(&mut self.pending);
if let Some(logger) = &self.logger {
for batch in self.merging.drain(..) {
for batch in merging {
match batch {
MergeState::Single(Some(batch)) => {
logger.log(::logging::DropEvent {
Expand All @@ -370,7 +378,7 @@ where
_ => { },
}
}
for batch in self.pending.drain(..) {
for batch in pending {
logger.log(::logging::DropEvent {
operator: self.operator.global_id,
length: batch.len(),
Expand All @@ -382,7 +390,7 @@ where

impl<B> Spine<B>
where
B: Batch,
B: Batch+Clone+'static,
B::Key: Ord+Clone,
B::Val: Ord+Clone,
B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
Expand Down Expand Up @@ -762,6 +770,20 @@ where
}
}
}

/// Consider closing the trace
///
/// Used by `set_logical_compaction` and `set_physical_compaction`.
///
/// If both logical and physical compaction are empty, drop all batches and close the trace.
fn consider_closing(&mut self ) {
// If both logical and physical compaction are `[]`, we cannot represent any times
// anymore and are free to discard our contents.
if self.physical_frontier.is_empty() && self.logical_frontier.is_empty() {
self.drop_batches();
self.close();
}
}
}


Expand Down Expand Up @@ -919,3 +941,109 @@ impl<B: Batch> MergeVariant<B> {
}
}
}

#[cfg(test)]
mod test {
use timely::dataflow::operators::Probe;
use timely::PartialOrder;
use timely::progress::Antichain;

use input::Input;
use operators::arrange::ArrangeByKey;
use trace::{BatchReader, TraceReader};

#[test]
fn test_empty_batch() {
timely::execute_directly(|worker| {
let (input, trace, probe) = worker.dataflow::<isize, _, _>(|scope| {
let (input, collection) = scope.new_collection::<(usize, usize), isize>();
let arranged = collection.arrange_by_key();
(input, arranged.trace, arranged.stream.probe())
});

drop(input);

worker.step_while(|| !probe.done());

// Check that there is a single empty batch with empty upper
let (mut len, mut count) = (0, 0);
trace.map_batches(|batch| {
len += batch.len();
count += 1;
assert!(batch.description().upper().is_empty());
});
assert_eq!(len, 0);
assert_eq!(count, 1);
});
}

#[test]
fn test_empty_compaction() {
timely::execute_directly(|worker| {
let (mut input, mut trace, probe) = worker.dataflow(|scope| {
let (input, collection) = scope.new_collection();
let arranged = collection.arrange_by_key();
(input, arranged.trace, arranged.stream.probe())
});

let mut upper = Antichain::new();

// Insert some data at the minimum time.
input.insert((1, 'a'));
input.insert((2, 'b'));
input.advance_to(1);
input.flush();

worker.step_while(|| probe.less_than(input.time()));

// Check that the data is in the trace and that the upper has advanced to 1.
let mut len = 0;
trace.map_batches(|batch| len += batch.len());
assert_eq!(len, 2);
trace.map_batches(|batch| assert!(PartialOrder::less_equal(&Antichain::from_elem(1), batch.description().upper())));
trace.read_upper(&mut upper);
assert_eq!(upper, Antichain::from_elem(1));

// Permit logical compaction to []
trace.set_logical_compaction(Antichain::new().borrow());
input.advance_to(2);
input.flush();
worker.step_while(|| probe.less_than(input.time()));

// We still expect data because physical compaction hasn't been advanced
let mut len = 0;
trace.map_batches(|batch| len += batch.len());
assert_eq!(len, 2);

// Permit physical compaction to [] to trigger cleanup.
trace.set_physical_compaction(Antichain::new().borrow());
input.advance_to(3);
input.flush();
worker.step_while(|| probe.less_than(input.time()));

// Assert no data, single empty batch with empty upper frontier.
let mut len = 0;
trace.map_batches(|batch| len += batch.len());
assert_eq!(len, 0);
trace.map_batches(|batch| assert!(batch.description().upper().is_empty()));
trace.read_upper(&mut upper);
assert_eq!(upper, Antichain::new());

// Insert more data and check that the trace stays empty.
input.insert((3, 'c'));
input.advance_to(4);
input.flush();

worker.step_while(|| probe.less_than(input.time()));

let (mut len, mut count) = (0, 0);
trace.map_batches(|batch| {
len += batch.len();
count += 1;
assert!(batch.description().upper().is_empty());
});
assert_eq!(len, 0);
assert_eq!(count, 1);
});
}
}