diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs
index f3b61ec94..e3371c1c6 100644
--- a/src/trace/implementations/spine_fueled.rs
+++ b/src/trace/implementations/spine_fueled.rs
@@ -214,6 +214,7 @@ where
     fn set_logical_compaction(&mut self, frontier: AntichainRef<B::Time>) {
         self.logical_frontier.clear();
         self.logical_frontier.extend(frontier.iter().cloned());
+        self.consider_closing();
     }
     #[inline]
     fn get_logical_compaction(&mut self) -> AntichainRef<B::Time> { self.logical_frontier.borrow() }
@@ -223,6 +224,7 @@ where
         debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier);
         self.physical_frontier.clear();
         self.physical_frontier.extend(frontier.iter().cloned());
+        self.consider_closing();
         self.consider_merges();
     }
     #[inline]
@@ -293,6 +295,10 @@ where
     // merging the batch. This means it is a good time to perform amortized work proportional
     // to the size of batch.
     fn insert(&mut self, batch: Self::Batch) {
+        // Once the upper frontier is empty, we don't expect any more data.
+        if self.upper.is_empty() {
+            return
+        }
 
         // Log the introduction of a batch.
         self.logger.as_ref().map(|l| l.log(::logging::BatchEvent {
@@ -300,7 +306,7 @@ where
             length: batch.len()
         }));
 
-        assert!(batch.lower() != batch.upper());
+        assert_ne!(batch.lower(), batch.upper());
         assert_eq!(batch.lower(), &self.upper);
 
         self.upper.clone_from(batch.upper());
@@ -340,10 +346,12 @@ where
     B::Time: Lattice+Ord,
     B::R: Semigroup,
 {
-    /// Drops and logs batches. Used in `set_logical_compaction` and drop.
+    /// Drops and logs batches. Used in `consider_closing` and drop.
     fn drop_batches(&mut self) {
+        let merging = std::mem::take(&mut self.merging);
+        let pending = std::mem::take(&mut self.pending);
         if let Some(logger) = &self.logger {
-            for batch in self.merging.drain(..) {
+            for batch in merging {
                 match batch {
                     MergeState::Single(Some(batch)) => {
                         logger.log(::logging::DropEvent {
@@ -370,7 +378,7 @@ where
                     _ => { },
                 }
             }
-            for batch in self.pending.drain(..) {
+            for batch in pending {
                 logger.log(::logging::DropEvent {
                     operator: self.operator.global_id,
                     length: batch.len(),
@@ -382,7 +390,7 @@ where
 
 impl<B> Spine<B>
 where
-    B: Batch,
+    B: Batch+Clone+'static,
     B::Key: Ord+Clone,
     B::Val: Ord+Clone,
     B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug,
@@ -762,6 +770,20 @@ where
             }
         }
     }
+
+    /// Consider closing the trace.
+    ///
+    /// Used by `set_logical_compaction` and `set_physical_compaction`.
+    ///
+    /// If both logical and physical compaction are empty, drop all batches and close the trace.
+    fn consider_closing(&mut self) {
+        // If both logical and physical compaction are `[]`, we cannot represent any times
+        // anymore and are free to discard our contents.
+        if self.physical_frontier.is_empty() && self.logical_frontier.is_empty() {
+            self.drop_batches();
+            self.close();
+        }
+    }
 }
 
 
@@ -919,3 +941,109 @@ impl<B: Batch> MergeVariant<B> {
         }
     }
 }
+
+#[cfg(test)]
+mod test {
+    use timely::dataflow::operators::Probe;
+    use timely::PartialOrder;
+    use timely::progress::Antichain;
+
+    use input::Input;
+    use operators::arrange::ArrangeByKey;
+    use trace::{BatchReader, TraceReader};
+
+    #[test]
+    fn test_empty_batch() {
+        timely::execute_directly(|worker| {
+            let (input, trace, probe) = worker.dataflow::<u32,_,_>(|scope| {
+                let (input, collection) = scope.new_collection::<(usize, usize), isize>();
+                let arranged = collection.arrange_by_key();
+                (input, arranged.trace, arranged.stream.probe())
+            });
+
+            drop(input);
+
+            worker.step_while(|| !probe.done());
+
+            // Check that there is a single empty batch with an empty upper frontier.
+            let (mut len, mut count) = (0, 0);
+            trace.map_batches(|batch| {
+                len += batch.len();
+                count += 1;
+                assert!(batch.description().upper().is_empty());
+            });
+            assert_eq!(len, 0);
+            assert_eq!(count, 1);
+        });
+    }
+
+    #[test]
+    fn test_empty_compaction() {
+        timely::execute_directly(|worker| {
+            let (mut input, mut trace, probe) = worker.dataflow(|scope| {
+                let (input, collection) = scope.new_collection();
+                let arranged = collection.arrange_by_key();
+                (input, arranged.trace, arranged.stream.probe())
+            });
+
+            let mut upper = Antichain::new();
+
+            // Insert some data at the minimum time.
+            input.insert((1, 'a'));
+            input.insert((2, 'b'));
+            input.advance_to(1);
+            input.flush();
+
+            worker.step_while(|| probe.less_than(input.time()));
+
+            // Check that the data is in the trace and that the upper has advanced to 1.
+            let mut len = 0;
+            trace.map_batches(|batch| len += batch.len());
+            assert_eq!(len, 2);
+            trace.map_batches(|batch| assert!(PartialOrder::less_equal(&Antichain::from_elem(1), batch.description().upper())));
+            trace.read_upper(&mut upper);
+            assert_eq!(upper, Antichain::from_elem(1));
+
+            // Permit logical compaction to [].
+            trace.set_logical_compaction(Antichain::new().borrow());
+            input.advance_to(2);
+            input.flush();
+            worker.step_while(|| probe.less_than(input.time()));
+
+            // We still expect data because physical compaction hasn't been advanced.
+            let mut len = 0;
+            trace.map_batches(|batch| len += batch.len());
+            assert_eq!(len, 2);
+
+            // Permit physical compaction to [] to trigger cleanup.
+            trace.set_physical_compaction(Antichain::new().borrow());
+            input.advance_to(3);
+            input.flush();
+            worker.step_while(|| probe.less_than(input.time()));
+
+            // Assert no data, and a single empty batch with an empty upper frontier.
+            let mut len = 0;
+            trace.map_batches(|batch| len += batch.len());
+            assert_eq!(len, 0);
+            trace.map_batches(|batch| assert!(batch.description().upper().is_empty()));
+            trace.read_upper(&mut upper);
+            assert_eq!(upper, Antichain::new());
+
+            // Insert more data and check that the trace stays empty.
+            input.insert((3, 'c'));
+            input.advance_to(4);
+            input.flush();
+
+            worker.step_while(|| probe.less_than(input.time()));
+
+            let (mut len, mut count) = (0, 0);
+            trace.map_batches(|batch| {
+                len += batch.len();
+                count += 1;
+                assert!(batch.description().upper().is_empty());
+            });
+            assert_eq!(len, 0);
+            assert_eq!(count, 1);
+        });
+    }
+}