From 74f39000beaa08eb4166f94ac1968ad18366e769 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 6 Jul 2023 10:53:13 +0200 Subject: [PATCH] Introduce an empty batch on advancing logical compaction to [] Signed-off-by: Moritz Hoffmann --- src/trace/implementations/spine_fueled.rs | 93 +++++++++++++++++++++-- 1 file changed, 87 insertions(+), 6 deletions(-) diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 411856640..0bd7ab6ce 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -212,14 +212,18 @@ where } #[inline] fn set_logical_compaction(&mut self, frontier: AntichainRef) { - self.logical_frontier.clear(); - self.logical_frontier.extend(frontier.iter().cloned()); - // If the logical frontier is empty, we are not obliged to be able to represent any time, // since no time is in advance of `[]`. In the case, we can drop all contents. - if self.logical_frontier.is_empty() { + // + // We need to call `close` before advancing `logical_frontier` because otherwise we cannot + // insert anymore. + if frontier.is_empty() { self.drop_batches(); + self.close(); } + + self.logical_frontier.clear(); + self.logical_frontier.extend(frontier.iter().cloned()); } #[inline] fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } @@ -353,8 +357,10 @@ where { /// Drops and logs batches. Used in `set_logical_compaction` and drop. fn drop_batches(&mut self) { + let merging = std::mem::take(&mut self.merging); + let pending = std::mem::take(&mut self.pending); if let Some(logger) = &self.logger { - for batch in self.merging.drain(..) { + for batch in merging { match batch { MergeState::Single(Some(batch)) => { logger.log(::logging::DropEvent { @@ -381,7 +387,7 @@ where _ => { }, } } - for batch in self.pending.drain(..) { + for batch in pending { logger.log(::logging::DropEvent { operator: self.operator.global_id, length: batch.len(), @@ -930,3 +936,78 @@ impl MergeVariant { } } } + +#[cfg(test)] +mod test { + use timely::dataflow::operators::Probe; + use timely::PartialOrder; + use timely::progress::Antichain; + + use input::Input; + use operators::arrange::ArrangeByKey; + use trace::{BatchReader, TraceReader}; + + #[test] + fn test_empty_logical_compaction() { + timely::execute_directly(|worker| { + + + let (mut input, mut trace, probe) = worker.dataflow(|scope| { + let (input, collection) = scope.new_collection(); + + let arranged = collection.arrange_by_key(); + + (input, arranged.trace, arranged.stream.probe()) + }); + + let mut upper = Antichain::new(); + + // Insert some data at the minimum time. + input.insert((1, 'a')); + input.insert((2, 'b')); + input.advance_to(1); + input.flush(); + + worker.step_while(|| probe.less_than(&1)); + + // Check that the data is in the trace and that the upper has advanced to 1. + let mut len = 0; + trace.map_batches(|batch| len += batch.len()); + assert_eq!(len, 2); + trace.map_batches(|batch| assert!(PartialOrder::less_equal(&Antichain::from_elem(1), batch.description().upper()))); + trace.read_upper(&mut upper); + assert_eq!(upper, Antichain::from_elem(1)); + + // Permit logical compaction to [] to trigger cleanup. + trace.set_logical_compaction(Antichain::new().borrow()); + input.advance_to(2); + input.flush(); + worker.step_while(|| probe.less_than(&2)); + + let mut len = 0; + trace.map_batches(|batch| len += batch.len()); + assert_eq!(len, 0); + trace.map_batches(|batch| assert!(batch.description().upper().is_empty())); + trace.read_upper(&mut upper); + assert!(upper.is_empty()); + + // Insert more data and check that the trace stays empty. + input.insert((3, 'c')); + input.advance_to(3); + input.flush(); + + worker.step_while(|| probe.less_than(&3)); + + let (mut len, mut count) = (0, 0); + trace.map_batches(|batch| { + len += batch.len(); + count += 1; + }); + assert_eq!(len, 0); + assert_eq!(count, 1); + trace.map_batches(|batch| assert!(batch.description().upper().is_empty())); + trace.read_upper(&mut upper); + assert!(upper.is_empty()); + }); + } +}