diff --git a/src/trace/implementations/huffman_container.rs b/src/trace/implementations/huffman_container.rs index a58d6ba2c..288ba8d4c 100644 --- a/src/trace/implementations/huffman_container.rs +++ b/src/trace/implementations/huffman_container.rs @@ -49,13 +49,8 @@ impl PushInto> for HuffmanContainer { } } -impl BatchContainer for HuffmanContainer { - type Owned = Vec; - type ReadItem<'a> = Wrapped<'a, B>; - - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - - fn copy(&mut self, item: Self::ReadItem<'_>) { +impl<'a, B: Ord + Clone + 'static> PushInto> for HuffmanContainer { + fn push_into(&mut self, item: Wrapped<'a, B>) { match item.decode() { Ok(decoded) => { for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; } @@ -74,21 +69,24 @@ impl BatchContainer for HuffmanContainer { raw.extend(decoded.cloned()); self.offsets.push(raw.len()); } - (Err(symbols), Ok((huffman, bytes))) => { + (Err(symbols), Ok((huffman, bytes))) => { bytes.extend(huffman.encode(symbols.iter())); self.offsets.push(bytes.len()); } - (Err(symbols), Err(raw)) => { + (Err(symbols), Err(raw)) => { raw.extend(symbols.iter().cloned()); self.offsets.push(raw.len()); } } } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for index in start .. end { - self.copy(other.index(index)); - } - } +} + +impl BatchContainer for HuffmanContainer { + type Owned = Vec; + type ReadItem<'a> = Wrapped<'a, B>; + + fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } + fn with_capacity(size: usize) -> Self { let mut offsets = OffsetList::with_capacity(size + 1); offsets.push(0); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index feed2ff81..e8d068387 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -300,16 +300,6 @@ impl BatchContainer for OffsetList { type Owned = usize; type ReadItem<'a> = usize; - fn copy(&mut self, item: Self::ReadItem<'_>) { - self.push(item); - } - - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for offset in start..end { - self.push(other.index(offset)); - } - } - fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } fn with_capacity(size: usize) -> Self { @@ -510,7 +500,7 @@ pub mod containers { use crate::trace::IntoOwned; /// A general-purpose container resembling `Vec`. - pub trait BatchContainer: 'static { + pub trait BatchContainer: for<'a> PushInto> + 'static { /// An owned instance of `Self::ReadItem<'_>`. type Owned; @@ -521,14 +511,6 @@ pub mod containers { fn push(&mut self, item: D) where Self: PushInto { self.push_into(item); } - /// Inserts a borrowed item. - fn copy(&mut self, item: Self::ReadItem<'_>); - /// Extends from a range of items in another`Self`. - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for index in start .. end { - self.copy(other.index(index)); - } - } /// Creates a new container with sufficient capacity. fn with_capacity(size: usize) -> Self; /// Creates a new container with sufficient capacity. @@ -606,12 +588,6 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - fn copy(&mut self, item: &T) { - self.push(item.clone()); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - self.extend_from_slice(&other[start .. end]); - } fn with_capacity(size: usize) -> Self { Vec::with_capacity(size) } @@ -634,16 +610,6 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - fn copy(&mut self, item: &T) { - self.copy(item); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - let slice = &other[start .. end]; - self.reserve_items(slice.iter()); - for item in slice.iter() { - self.copy(item); - } - } fn with_capacity(size: usize) -> Self { Self::with_capacity(size) } @@ -672,10 +638,6 @@ pub mod containers { type Owned = R::Owned; type ReadItem<'a> = R::ReadItem<'a>; - fn copy(&mut self, item: Self::ReadItem<'_>) { - self.copy(item); - } - fn with_capacity(size: usize) -> Self { Self::with_capacity(size) } @@ -711,13 +673,16 @@ pub mod containers { impl PushInto<&[B]> for SliceContainer { fn push_into(&mut self, item: &[B]) { - self.copy(item); + for x in item.iter() { + self.inner.push_into(x); + } + self.offsets.push(self.inner.len()); } } impl PushInto<&Vec> for SliceContainer { fn push_into(&mut self, item: &Vec) { - self.copy(item); + self.push_into(&item[..]); } } @@ -739,17 +704,6 @@ pub mod containers { fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item } - fn copy(&mut self, item: Self::ReadItem<'_>) { - for x in item.iter() { - self.inner.copy(x); - } - self.offsets.push(self.inner.len()); - } - fn copy_range(&mut self, other: &Self, start: usize, end: usize) { - for index in start .. end { - self.copy(other.index(index)); - } - } fn with_capacity(size: usize) -> Self { let mut offsets = Vec::with_capacity(size + 1); offsets.push(0); diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 7009c57f3..e475a0b3d 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -236,9 +236,9 @@ mod val_batch { // Mark explicit types because type inference fails to resolve it. let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.copy(0); + keys_offs.push(0); let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; - vals_offs.copy(0); + vals_offs.push(0); OrdValMerger { key_cursor1: 0, @@ -302,16 +302,16 @@ mod val_batch { while lower < upper { self.stash_updates_for_val(source, lower); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source.vals.index(lower)); + self.result.vals_offs.push(off); + self.result.vals.push(source.vals.index(lower)); } lower += 1; } // If we have pushed any values, copy the key as well. if self.result.vals.len() > init_vals { - self.result.keys.copy(source.keys.index(cursor)); - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys.push(source.keys.index(cursor)); + self.result.keys_offs.push(self.result.vals.len()); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -330,8 +330,8 @@ mod val_batch { let (lower1, upper1) = source1.values_for_key(self.key_cursor1); let (lower2, upper2) = source2.values_for_key(self.key_cursor2); if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) { - self.result.keys.copy(source1.keys.index(self.key_cursor1)); - self.result.keys_offs.copy(off); + self.result.keys.push(source1.keys.index(self.key_cursor1)); + self.result.keys_offs.push(off); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -364,8 +364,8 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; }, @@ -373,8 +373,8 @@ mod val_batch { self.stash_updates_for_val(source1, lower1); self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; lower2 += 1; @@ -383,8 +383,8 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source2.vals.index(lower2)); + self.result.vals_offs.push(off); + self.result.vals.push(source2.vals.index(lower2)); } lower2 += 1; }, @@ -394,16 +394,16 @@ mod val_batch { while lower1 < upper1 { self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; } while lower2 < upper2 { self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source2.vals.index(lower2)); + self.result.vals_offs.push(off); + self.result.vals.push(source2.vals.index(lower2)); } lower2 += 1; } @@ -616,16 +616,16 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); self.result.keys.push(key); @@ -636,10 +636,10 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdValBatch { // Record the final offsets - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys_offs.push(self.result.vals.len()); OrdValBatch { updates: self.result.times.len() + self.singletons, storage: self.result, @@ -795,7 +795,7 @@ mod key_batch { }; let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.copy(0); + keys_offs.push(0); OrdKeyMerger { key_cursor1: 0, @@ -855,8 +855,8 @@ mod key_batch { fn copy_key(&mut self, source: &OrdKeyStorage, cursor: usize) { self.stash_updates_for_key(source, cursor); if let Some(off) = self.consolidate_updates() { - self.result.keys_offs.copy(off); - self.result.keys.copy(source.keys.index(cursor)); + self.result.keys_offs.push(off); + self.result.keys.push(source.keys.index(cursor)); } } /// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors. @@ -875,8 +875,8 @@ mod key_batch { self.stash_updates_for_key(source1, self.key_cursor1); self.stash_updates_for_key(source2, self.key_cursor2); if let Some(off) = self.consolidate_updates() { - self.result.keys_offs.copy(off); - self.result.keys.copy(source1.keys.index(self.key_cursor1)); + self.result.keys_offs.push(off); + self.result.keys.push(source1.keys.index(self.key_cursor1)); } // Increment cursors in either case; the keys are merged. self.key_cursor1 += 1; @@ -1078,7 +1078,7 @@ mod key_batch { self.push_update(time, diff); } else { // New key; complete representation of prior key. - self.result.keys_offs.copy(self.result.times.len()); + self.result.keys_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); @@ -1090,7 +1090,7 @@ mod key_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> OrdKeyBatch { // Record the final offsets - self.result.keys_offs.copy(self.result.times.len()); + self.result.keys_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } OrdKeyBatch { diff --git a/src/trace/implementations/rhh.rs b/src/trace/implementations/rhh.rs index a2fb6241a..1d87d55d2 100644 --- a/src/trace/implementations/rhh.rs +++ b/src/trace/implementations/rhh.rs @@ -191,15 +191,15 @@ mod val_batch { while self.keys.len() < desired { // We insert a default (dummy) key and repeat the offset to indicate this. let current_offset = self.keys_offs.index(self.keys.len()); - self.keys.push(Default::default()); - self.keys_offs.copy(current_offset); + self.keys.push(<::Key as Default>::default()); + self.keys_offs.push(current_offset); } // Now we insert the key. Even if it is no longer the desired location because of contention. // If an offset has been supplied we insert it, and otherwise leave it for future determination. - self.keys.copy(key); + self.keys.push(key); if let Some(offset) = offset { - self.keys_offs.copy(offset); + self.keys_offs.push(offset); } self.key_count += 1; } @@ -368,9 +368,9 @@ mod val_batch { // Mark explicit types because type inference fails to resolve it. let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs; - keys_offs.copy(0); + keys_offs.push(0); let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs; - vals_offs.copy(0); + vals_offs.push(0); RhhValMerger { key_cursor1: 0, @@ -445,8 +445,8 @@ mod val_batch { while lower < upper { self.stash_updates_for_val(source, lower); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source.vals.index(lower)); + self.result.vals_offs.push(off); + self.result.vals.push(source.vals.index(lower)); } lower += 1; } @@ -506,8 +506,8 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; }, @@ -515,8 +515,8 @@ mod val_batch { self.stash_updates_for_val(source1, lower1); self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; lower2 += 1; @@ -525,8 +525,8 @@ mod val_batch { // Extend stash by updates, with logical compaction applied. self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source2.vals.index(lower2)); + self.result.vals_offs.push(off); + self.result.vals.push(source2.vals.index(lower2)); } lower2 += 1; }, @@ -536,16 +536,16 @@ mod val_batch { while lower1 < upper1 { self.stash_updates_for_val(source1, lower1); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source1.vals.index(lower1)); + self.result.vals_offs.push(off); + self.result.vals.push(source1.vals.index(lower1)); } lower1 += 1; } while lower2 < upper2 { self.stash_updates_for_val(source2, lower2); if let Some(off) = self.consolidate_updates() { - self.result.vals_offs.copy(off); - self.result.vals.copy(source2.vals.index(lower2)); + self.result.vals_offs.push(off); + self.result.vals.push(source2.vals.index(lower2)); } lower2 += 1; } @@ -807,16 +807,16 @@ mod val_batch { self.push_update(time, diff); } else { // New value; complete representation of prior value. - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } self.push_update(time, diff); self.result.vals.push(val); } } else { // New key; complete representation of prior key. - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys_offs.push(self.result.vals.len()); self.push_update(time, diff); self.result.vals.push(val); // Insert the key, but with no specified offset. @@ -828,10 +828,10 @@ mod val_batch { #[inline(never)] fn done(mut self, lower: Antichain, upper: Antichain, since: Antichain) -> RhhValBatch { // Record the final offsets - self.result.vals_offs.copy(self.result.times.len()); + self.result.vals_offs.push(self.result.times.len()); // Remove any pending singleton, and if it was set increment our count. if self.singleton.take().is_some() { self.singletons += 1; } - self.result.keys_offs.copy(self.result.vals.len()); + self.result.keys_offs.push(self.result.vals.len()); RhhValBatch { updates: self.result.times.len() + self.singletons, storage: self.result,