Skip to content

Commit

Permalink
Canonicalize PointStamp representation (#458)
Browse files Browse the repository at this point in the history
* Canonicalize PointStamp representation

* Make PointStamp::vector private
  • Loading branch information
frankmcsherry authored Jan 30, 2024
1 parent e153706 commit 2895164
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 10 deletions.
8 changes: 6 additions & 2 deletions src/dynamic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,10 +51,14 @@ where
input.for_each(|cap, data| {
data.swap(&mut vector);
let mut new_time = cap.time().clone();
new_time.inner.vector.truncate(level - 1);
let mut vec = std::mem::take(&mut new_time.inner).into_vec();
vec.truncate(level - 1);
new_time.inner = PointStamp::new(vec);
let new_cap = cap.delayed(&new_time);
for (_data, time, _diff) in vector.iter_mut() {
time.inner.vector.truncate(level - 1);
let mut vec = std::mem::take(&mut time.inner).into_vec();
vec.truncate(level - 1);
time.inner = PointStamp::new(vec);
}
output.session(&new_cap).give_vec(&mut vector);
});
Expand Down
29 changes: 21 additions & 8 deletions src/dynamic/pointstamp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,20 +17,33 @@ use serde::{Deserialize, Serialize};
/// A sequence of timestamps, partially ordered by the product order.
///
/// Sequences of different lengths are compared as if extended indefinitely by `T::minimum()`.
/// Sequences are not guaranteed to be "minimal", and may end with `T::minimum()` entries.
/// Sequences are guaranteed to be "minimal", and may not end with `T::minimum()` entries.
#[derive(
Hash, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Debug, Serialize, Deserialize, Abomonation,
)]
pub struct PointStamp<T> {
/// A sequence of timestamps corresponding to timestamps in a sequence of nested scopes.
pub vector: Vec<T>,
vector: Vec<T>,
}

impl<T> PointStamp<T> {
impl<T: Timestamp> PointStamp<T> {
/// Create a new sequence.
pub fn new(vector: Vec<T>) -> Self {
///
/// This method will modify `vector` to ensure it does not end with `T::minimum()`.
pub fn new(mut vector: Vec<T>) -> Self {
while vector.last() == Some(&T::minimum()) {
vector.pop();
}
PointStamp { vector }
}
/// Returns the wrapped vector.
///
/// This method is the support way to mutate the contents of `self`, by extracting
/// the vector and then re-introducting it with `PointStamp::new` to re-establish
/// the invariant that the vector not end with `T::minimum`.
pub fn into_vec(self) -> Vec<T> {
self.vector
}
}

// Implement timely dataflow's `PartialOrder` trait.
Expand Down Expand Up @@ -109,7 +122,7 @@ impl<T: Timestamp> PathSummary<PointStamp<T>> for PointStampSummary<T::Summary>
vector.push(action.results_in(&T::minimum())?);
}

Some(PointStamp { vector })
Some(PointStamp::new(vector))
}
fn followed_by(&self, other: &Self) -> Option<Self> {
// The output `retain` will be the minimum of the two inputs.
Expand Down Expand Up @@ -166,7 +179,7 @@ impl<TS: PartialOrder> PartialOrder for PointStampSummary<TS> {
use timely::progress::Timestamp;
impl<T: Timestamp> Timestamp for PointStamp<T> {
fn minimum() -> Self {
Self { vector: Vec::new() }
Self::new(Vec::new())
}
type Summary = PointStampSummary<T::Summary>;
}
Expand All @@ -190,7 +203,7 @@ impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
for time in &other.vector[min_len..] {
vector.push(time.clone());
}
Self { vector }
Self::new(vector)
}
fn meet(&self, other: &Self) -> Self {
let min_len = ::std::cmp::min(self.vector.len(), other.vector.len());
Expand All @@ -200,7 +213,7 @@ impl<T: Lattice + Timestamp + Clone> Lattice for PointStamp<T> {
vector.push(self.vector[index].meet(&other.vector[index]));
}
// Remaining coordinates are `T::minimum()` in one input, and so in the output.
Self { vector }
Self::new(vector)
}
}

Expand Down

0 comments on commit 2895164

Please sign in to comment.