diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 0bd7ab6ce..e3371c1c6 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -212,18 +212,9 @@ where } #[inline] fn set_logical_compaction(&mut self, frontier: AntichainRef) { - // If the logical frontier is empty, we are not obliged to be able to represent any time, - // since no time is in advance of `[]`. In the case, we can drop all contents. - // - // We need to call `close` before advancing `logical_frontier` because otherwise we cannot - // insert anymore. - if frontier.is_empty() { - self.drop_batches(); - self.close(); - } - self.logical_frontier.clear(); self.logical_frontier.extend(frontier.iter().cloned()); + self.consider_closing(); } #[inline] fn get_logical_compaction(&mut self) -> AntichainRef { self.logical_frontier.borrow() } @@ -233,6 +224,7 @@ where debug_assert!(PartialOrder::less_equal(&self.physical_frontier.borrow(), &frontier), "FAIL\tthrough frontier !<= new frontier {:?} {:?}\n", self.physical_frontier, frontier); self.physical_frontier.clear(); self.physical_frontier.extend(frontier.iter().cloned()); + self.consider_closing(); self.consider_merges(); } #[inline] @@ -303,9 +295,8 @@ where // merging the batch. This means it is a good time to perform amortized work proportional // to the size of batch. fn insert(&mut self, batch: Self::Batch) { - // If the logical frontier is empty, we are not obliged to be able to represent any time, - // since no time is in advance of `[]`. In the case, we can drop the batch. - if self.logical_frontier.is_empty() { + // Once the upper frontier is empty, we don't expect any more data. + if self.upper.is_empty() { return } @@ -315,7 +306,7 @@ where length: batch.len() })); - assert!(batch.lower() != batch.upper()); + assert_ne!(batch.lower(), batch.upper()); assert_eq!(batch.lower(), &self.upper); self.upper.clone_from(batch.upper()); @@ -355,7 +346,7 @@ where B::Time: Lattice+Ord, B::R: Semigroup, { - /// Drops and logs batches. Used in `set_logical_compaction` and drop. + /// Drops and logs batches. Used in `consider_closing` and drop. fn drop_batches(&mut self) { let merging = std::mem::take(&mut self.merging); let pending = std::mem::take(&mut self.pending); @@ -399,7 +390,7 @@ where impl Spine where - B: Batch, + B: Batch+Clone+'static, B::Key: Ord+Clone, B::Val: Ord+Clone, B::Time: Lattice+timely::progress::Timestamp+Ord+Clone+Debug, @@ -779,6 +770,20 @@ where } } } + + /// Consider closing the trace + /// + /// Used by `set_logical_compaction` and `set_physical_compaction`. + /// + /// If both logical and physical compaction are empty, drop all batches and close the trace. + fn consider_closing(&mut self ) { + // If both logical and physical compaction are `[]`, we cannot represent any times + // anymore and are free to discard our contents. + if self.physical_frontier.is_empty() && self.logical_frontier.is_empty() { + self.drop_batches(); + self.close(); + } + } } @@ -948,15 +953,36 @@ mod test { use trace::{BatchReader, TraceReader}; #[test] - fn test_empty_logical_compaction() { + fn test_empty_batch() { timely::execute_directly(|worker| { + let (input, trace, probe) = worker.dataflow::(|scope| { + let (input, collection) = scope.new_collection::<(usize, usize), isize>(); + let arranged = collection.arrange_by_key(); + (input, arranged.trace, arranged.stream.probe()) + }); + + drop(input); + + worker.step_while(|| !probe.done()); + // Check that there is a single empty batch with empty upper + let (mut len, mut count) = (0, 0); + trace.map_batches(|batch| { + len += batch.len(); + count += 1; + assert!(batch.description().upper().is_empty()); + }); + assert_eq!(len, 0); + assert_eq!(count, 1); + }); + } + #[test] + fn test_empty_compaction() { + timely::execute_directly(|worker| { let (mut input, mut trace, probe) = worker.dataflow(|scope| { let (input, collection) = scope.new_collection(); - let arranged = collection.arrange_by_key(); - (input, arranged.trace, arranged.stream.probe()) }); @@ -968,7 +994,7 @@ mod test { input.advance_to(1); input.flush(); - worker.step_while(|| probe.less_than(&1)); + worker.step_while(|| probe.less_than(input.time())); // Check that the data is in the trace and that the upper has advanced to 1. let mut len = 0; @@ -978,36 +1004,46 @@ mod test { trace.read_upper(&mut upper); assert_eq!(upper, Antichain::from_elem(1)); - // Permit logical compaction to [] to trigger cleanup. + // Permit logical compaction to [] trace.set_logical_compaction(Antichain::new().borrow()); input.advance_to(2); input.flush(); - worker.step_while(|| probe.less_than(&2)); + worker.step_while(|| probe.less_than(input.time())); + // We still expect data because physical compaction hasn't been advanced + let mut len = 0; + trace.map_batches(|batch| len += batch.len()); + assert_eq!(len, 2); + + // Permit physical compaction to [] to trigger cleanup. + trace.set_physical_compaction(Antichain::new().borrow()); + input.advance_to(3); + input.flush(); + worker.step_while(|| probe.less_than(input.time())); + + // Assert no data, single empty batch with empty upper frontier. let mut len = 0; trace.map_batches(|batch| len += batch.len()); assert_eq!(len, 0); trace.map_batches(|batch| assert!(batch.description().upper().is_empty())); trace.read_upper(&mut upper); - assert!(upper.is_empty()); + assert_eq!(upper, Antichain::new()); // Insert more data and check that the trace stays empty. input.insert((3, 'c')); - input.advance_to(3); + input.advance_to(4); input.flush(); - worker.step_while(|| probe.less_than(&3)); + worker.step_while(|| probe.less_than(input.time())); let (mut len, mut count) = (0, 0); trace.map_batches(|batch| { len += batch.len(); count += 1; + assert!(batch.description().upper().is_empty()); }); assert_eq!(len, 0); assert_eq!(count, 1); - trace.map_batches(|batch| assert!(batch.description().upper().is_empty())); - trace.read_upper(&mut upper); - assert!(upper.is_empty()); }); } }