Skip to content

Commit

Permalink
Remove BatchContainer::{copy, copy_range}. (#508)
Browse files Browse the repository at this point in the history
* Add PushInto<Self::ReadItem> constraint

* Remove BatchContainer::{copy, copy_range}
  • Loading branch information
frankmcsherry authored May 31, 2024
1 parent f6076ad commit be698bc
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 119 deletions.
26 changes: 12 additions & 14 deletions src/trace/implementations/huffman_container.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,8 @@ impl<B: Ord + Clone + 'static> PushInto<Vec<B>> for HuffmanContainer<B> {
}
}

impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
type Owned = Vec<B>;
type ReadItem<'a> = Wrapped<'a, B>;

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn copy(&mut self, item: Self::ReadItem<'_>) {
impl<'a, B: Ord + Clone + 'static> PushInto<Wrapped<'a, B>> for HuffmanContainer<B> {
fn push_into(&mut self, item: Wrapped<'a, B>) {
match item.decode() {
Ok(decoded) => {
for x in decoded { *self.stats.entry(x.clone()).or_insert(0) += 1; }
Expand All @@ -74,21 +69,24 @@ impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
raw.extend(decoded.cloned());
self.offsets.push(raw.len());
}
(Err(symbols), Ok((huffman, bytes))) => {
(Err(symbols), Ok((huffman, bytes))) => {
bytes.extend(huffman.encode(symbols.iter()));
self.offsets.push(bytes.len());
}
(Err(symbols), Err(raw)) => {
(Err(symbols), Err(raw)) => {
raw.extend(symbols.iter().cloned());
self.offsets.push(raw.len());
}
}
}
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for index in start .. end {
self.copy(other.index(index));
}
}
}

impl<B: Ord + Clone + 'static> BatchContainer for HuffmanContainer<B> {
type Owned = Vec<B>;
type ReadItem<'a> = Wrapped<'a, B>;

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn with_capacity(size: usize) -> Self {
let mut offsets = OffsetList::with_capacity(size + 1);
offsets.push(0);
Expand Down
58 changes: 6 additions & 52 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -300,16 +300,6 @@ impl BatchContainer for OffsetList {
type Owned = usize;
type ReadItem<'a> = usize;

fn copy(&mut self, item: Self::ReadItem<'_>) {
self.push(item);
}

fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for offset in start..end {
self.push(other.index(offset));
}
}

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn with_capacity(size: usize) -> Self {
Expand Down Expand Up @@ -510,7 +500,7 @@ pub mod containers {
use crate::trace::IntoOwned;

/// A general-purpose container resembling `Vec<T>`.
pub trait BatchContainer: 'static {
pub trait BatchContainer: for<'a> PushInto<Self::ReadItem<'a>> + 'static {
/// An owned instance of `Self::ReadItem<'_>`.
type Owned;

Expand All @@ -521,14 +511,6 @@ pub mod containers {
fn push<D>(&mut self, item: D) where Self: PushInto<D> {
self.push_into(item);
}
/// Inserts a borrowed item.
fn copy(&mut self, item: Self::ReadItem<'_>);
/// Extends from a range of items in another`Self`.
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for index in start .. end {
self.copy(other.index(index));
}
}
/// Creates a new container with sufficient capacity.
fn with_capacity(size: usize) -> Self;
/// Creates a new container with sufficient capacity.
Expand Down Expand Up @@ -606,12 +588,6 @@ pub mod containers {

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn copy(&mut self, item: &T) {
self.push(item.clone());
}
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
self.extend_from_slice(&other[start .. end]);
}
fn with_capacity(size: usize) -> Self {
Vec::with_capacity(size)
}
Expand All @@ -634,16 +610,6 @@ pub mod containers {

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn copy(&mut self, item: &T) {
self.copy(item);
}
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
let slice = &other[start .. end];
self.reserve_items(slice.iter());
for item in slice.iter() {
self.copy(item);
}
}
fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}
Expand Down Expand Up @@ -672,10 +638,6 @@ pub mod containers {
type Owned = R::Owned;
type ReadItem<'a> = R::ReadItem<'a>;

fn copy(&mut self, item: Self::ReadItem<'_>) {
self.copy(item);
}

fn with_capacity(size: usize) -> Self {
Self::with_capacity(size)
}
Expand Down Expand Up @@ -711,13 +673,16 @@ pub mod containers {

impl<B: Ord + Clone + 'static> PushInto<&[B]> for SliceContainer<B> {
fn push_into(&mut self, item: &[B]) {
self.copy(item);
for x in item.iter() {
self.inner.push_into(x);
}
self.offsets.push(self.inner.len());
}
}

impl<B: Ord + Clone + 'static> PushInto<&Vec<B>> for SliceContainer<B> {
fn push_into(&mut self, item: &Vec<B>) {
self.copy(item);
self.push_into(&item[..]);
}
}

Expand All @@ -739,17 +704,6 @@ pub mod containers {

fn reborrow<'b, 'a: 'b>(item: Self::ReadItem<'a>) -> Self::ReadItem<'b> { item }

fn copy(&mut self, item: Self::ReadItem<'_>) {
for x in item.iter() {
self.inner.copy(x);
}
self.offsets.push(self.inner.len());
}
fn copy_range(&mut self, other: &Self, start: usize, end: usize) {
for index in start .. end {
self.copy(other.index(index));
}
}
fn with_capacity(size: usize) -> Self {
let mut offsets = Vec::with_capacity(size + 1);
offsets.push(0);
Expand Down
60 changes: 30 additions & 30 deletions src/trace/implementations/ord_neu.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,9 +236,9 @@ mod val_batch {

// Mark explicit types because type inference fails to resolve it.
let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.copy(0);
keys_offs.push(0);
let vals_offs: &mut L::OffsetContainer = &mut storage.vals_offs;
vals_offs.copy(0);
vals_offs.push(0);

OrdValMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -302,16 +302,16 @@ mod val_batch {
while lower < upper {
self.stash_updates_for_val(source, lower);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source.vals.index(lower));
self.result.vals_offs.push(off);
self.result.vals.push(source.vals.index(lower));
}
lower += 1;
}

// If we have pushed any values, copy the key as well.
if self.result.vals.len() > init_vals {
self.result.keys.copy(source.keys.index(cursor));
self.result.keys_offs.copy(self.result.vals.len());
self.result.keys.push(source.keys.index(cursor));
self.result.keys_offs.push(self.result.vals.len());
}
}
/// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
Expand All @@ -330,8 +330,8 @@ mod val_batch {
let (lower1, upper1) = source1.values_for_key(self.key_cursor1);
let (lower2, upper2) = source2.values_for_key(self.key_cursor2);
if let Some(off) = self.merge_vals((source1, lower1, upper1), (source2, lower2, upper2)) {
self.result.keys.copy(source1.keys.index(self.key_cursor1));
self.result.keys_offs.copy(off);
self.result.keys.push(source1.keys.index(self.key_cursor1));
self.result.keys_offs.push(off);
}
// Increment cursors in either case; the keys are merged.
self.key_cursor1 += 1;
Expand Down Expand Up @@ -364,17 +364,17 @@ mod val_batch {
// Extend stash by updates, with logical compaction applied.
self.stash_updates_for_val(source1, lower1);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source1.vals.index(lower1));
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
},
Ordering::Equal => {
self.stash_updates_for_val(source1, lower1);
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source1.vals.index(lower1));
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
lower2 += 1;
Expand All @@ -383,8 +383,8 @@ mod val_batch {
// Extend stash by updates, with logical compaction applied.
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source2.vals.index(lower2));
self.result.vals_offs.push(off);
self.result.vals.push(source2.vals.index(lower2));
}
lower2 += 1;
},
Expand All @@ -394,16 +394,16 @@ mod val_batch {
while lower1 < upper1 {
self.stash_updates_for_val(source1, lower1);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source1.vals.index(lower1));
self.result.vals_offs.push(off);
self.result.vals.push(source1.vals.index(lower1));
}
lower1 += 1;
}
while lower2 < upper2 {
self.stash_updates_for_val(source2, lower2);
if let Some(off) = self.consolidate_updates() {
self.result.vals_offs.copy(off);
self.result.vals.copy(source2.vals.index(lower2));
self.result.vals_offs.push(off);
self.result.vals.push(source2.vals.index(lower2));
}
lower2 += 1;
}
Expand Down Expand Up @@ -616,16 +616,16 @@ mod val_batch {
self.push_update(time, diff);
} else {
// New value; complete representation of prior value.
self.result.vals_offs.copy(self.result.times.len());
self.result.vals_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
self.result.vals.push(val);
}
} else {
// New key; complete representation of prior key.
self.result.vals_offs.copy(self.result.times.len());
self.result.vals_offs.push(self.result.times.len());
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.copy(self.result.vals.len());
self.result.keys_offs.push(self.result.vals.len());
self.push_update(time, diff);
self.result.vals.push(val);
self.result.keys.push(key);
Expand All @@ -636,10 +636,10 @@ mod val_batch {
#[inline(never)]
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdValBatch<L> {
// Record the final offsets
self.result.vals_offs.copy(self.result.times.len());
self.result.vals_offs.push(self.result.times.len());
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.result.keys_offs.copy(self.result.vals.len());
self.result.keys_offs.push(self.result.vals.len());
OrdValBatch {
updates: self.result.times.len() + self.singletons,
storage: self.result,
Expand Down Expand Up @@ -795,7 +795,7 @@ mod key_batch {
};

let keys_offs: &mut L::OffsetContainer = &mut storage.keys_offs;
keys_offs.copy(0);
keys_offs.push(0);

OrdKeyMerger {
key_cursor1: 0,
Expand Down Expand Up @@ -855,8 +855,8 @@ mod key_batch {
fn copy_key(&mut self, source: &OrdKeyStorage<L>, cursor: usize) {
self.stash_updates_for_key(source, cursor);
if let Some(off) = self.consolidate_updates() {
self.result.keys_offs.copy(off);
self.result.keys.copy(source.keys.index(cursor));
self.result.keys_offs.push(off);
self.result.keys.push(source.keys.index(cursor));
}
}
/// Merge the next key in each of `source1` and `source2` into `self`, updating the appropriate cursors.
Expand All @@ -875,8 +875,8 @@ mod key_batch {
self.stash_updates_for_key(source1, self.key_cursor1);
self.stash_updates_for_key(source2, self.key_cursor2);
if let Some(off) = self.consolidate_updates() {
self.result.keys_offs.copy(off);
self.result.keys.copy(source1.keys.index(self.key_cursor1));
self.result.keys_offs.push(off);
self.result.keys.push(source1.keys.index(self.key_cursor1));
}
// Increment cursors in either case; the keys are merged.
self.key_cursor1 += 1;
Expand Down Expand Up @@ -1078,7 +1078,7 @@ mod key_batch {
self.push_update(time, diff);
} else {
// New key; complete representation of prior key.
self.result.keys_offs.copy(self.result.times.len());
self.result.keys_offs.push(self.result.times.len());
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
self.push_update(time, diff);
Expand All @@ -1090,7 +1090,7 @@ mod key_batch {
#[inline(never)]
fn done(mut self, lower: Antichain<Self::Time>, upper: Antichain<Self::Time>, since: Antichain<Self::Time>) -> OrdKeyBatch<L> {
// Record the final offsets
self.result.keys_offs.copy(self.result.times.len());
self.result.keys_offs.push(self.result.times.len());
// Remove any pending singleton, and if it was set increment our count.
if self.singleton.take().is_some() { self.singletons += 1; }
OrdKeyBatch {
Expand Down
Loading

0 comments on commit be698bc

Please sign in to comment.