Skip to content

Commit

Permalink
Replace OrdOffset with OffsetList (TimelyDataflow#437)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Nov 28, 2023
1 parent c3ca293 commit 3182200
Show file tree
Hide file tree
Showing 5 changed files with 234 additions and 183 deletions.
128 changes: 94 additions & 34 deletions src/trace/implementations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ pub mod rhh;
pub use self::ord::OrdValSpine as ValSpine;
pub use self::ord::OrdKeySpine as KeySpine;

use std::ops::{Add, Sub};
use std::convert::{TryInto, TryFrom};
use std::borrow::{Borrow, ToOwned};

use timely::container::columnation::{Columnation, TimelyStack};
Expand Down Expand Up @@ -96,10 +94,6 @@ where
pub trait Layout {
/// The represented update.
type Target: Update + ?Sized;
/// Offsets to use from keys into vals.
type KeyOffset: OrdOffset;
/// Offsets to use from vals into updates.
type ValOffset: OrdOffset;
/// Container for update keys.
type KeyContainer:
RetainFrom<<Self::Target as Update>::Key>+
Expand All @@ -114,38 +108,34 @@ pub trait Layout {
}

/// A layout that uses vectors
pub struct Vector<U: Update, O: OrdOffset = usize> {
phantom: std::marker::PhantomData<(U, O)>,
pub struct Vector<U: Update> {
phantom: std::marker::PhantomData<U>,
}

impl<U: Update, O: OrdOffset> Layout for Vector<U, O>
impl<U: Update> Layout for Vector<U>
where
U::Key: ToOwned<Owned = U::Key> + Sized + Clone,
U::Val: ToOwned<Owned = U::Val> + Sized + Clone,
{
type Target = U;
type KeyOffset = O;
type ValOffset = O;
type KeyContainer = Vec<U::Key>;
type ValContainer = Vec<U::Val>;
type UpdContainer = Vec<(U::Time, U::Diff)>;
}

/// A layout based on timely stacks
pub struct TStack<U: Update, O: OrdOffset = usize> {
phantom: std::marker::PhantomData<(U, O)>,
pub struct TStack<U: Update> {
phantom: std::marker::PhantomData<U>,
}

impl<U: Update+Clone, O: OrdOffset> Layout for TStack<U, O>
impl<U: Update+Clone> Layout for TStack<U>
where
U::Key: Columnation + ToOwned<Owned = U::Key>,
U::Val: Columnation + ToOwned<Owned = U::Val>,
U::Time: Columnation,
U::Diff: Columnation,
{
type Target = U;
type KeyOffset = O;
type ValOffset = O;
type KeyContainer = TimelyStack<U::Key>;
type ValContainer = TimelyStack<U::Val>;
type UpdContainer = TimelyStack<(U::Time, U::Diff)>;
Expand All @@ -168,19 +158,18 @@ impl<T: Clone> PreferredContainer for [T] {
}

/// An update and layout description based on preferred containers.
pub struct Preferred<K: ?Sized, V: ?Sized, T, D, O: OrdOffset = usize> {
phantom: std::marker::PhantomData<(Box<K>, Box<V>, T, D, O)>,
pub struct Preferred<K: ?Sized, V: ?Sized, T, D> {
phantom: std::marker::PhantomData<(Box<K>, Box<V>, T, D)>,
}

impl<K,V,T,R,O> Update for Preferred<K, V, T, R, O>
impl<K,V,T,R> Update for Preferred<K, V, T, R>
where
K: Ord+ToOwned + ?Sized,
K::Owned: Ord+Clone,
V: Ord+ToOwned + ?Sized,
V::Owned: Ord+Clone,
T: Ord+Lattice+timely::progress::Timestamp+Clone,
R: Semigroup+Clone,
O: OrdOffset,
{
type Key = K;
type KeyOwned = K::Owned;
Expand All @@ -190,19 +179,16 @@ where
type Diff = R;
}

impl<K, V, T, D, O> Layout for Preferred<K, V, T, D, O>
impl<K, V, T, D> Layout for Preferred<K, V, T, D>
where
K: Ord+ToOwned+PreferredContainer + ?Sized,
K::Owned: Ord+Clone,
V: Ord+ToOwned+PreferredContainer + ?Sized,
V::Owned: Ord+Clone,
T: Ord+Lattice+timely::progress::Timestamp+Clone,
D: Semigroup+Clone,
O: OrdOffset,
{
type Target = Preferred<K, V, T, D, O>;
type KeyOffset = O;
type ValOffset = O;
type Target = Preferred<K, V, T, D>;
type KeyContainer = K::Container;
type ValContainer = V::Container;
type UpdContainer = Vec<(T, D)>;
Expand Down Expand Up @@ -239,16 +225,90 @@ impl<T: Columnation> RetainFrom<T> for TimelyStack<T> {
}
}

/// Trait for types used as offsets into an ordered layer.
/// This is usually `usize`, but `u32` can also be used in applications
/// where huge batches do not occur to reduce metadata size.
pub trait OrdOffset: Copy + PartialEq + Add<Output=Self> + Sub<Output=Self> + TryFrom<usize> + TryInto<usize>
{}
use std::convert::TryInto;

impl<O> OrdOffset for O
where
O: Copy + PartialEq + Add<Output=Self> + Sub<Output=Self> + TryFrom<usize> + TryInto<usize>,
{}
/// A list of unsigned integers that uses `u32` elements as long as they are small enough, and switches to `u64` once they are not.
#[derive(Eq, PartialEq, Ord, PartialOrd, Clone, Debug, Abomonation)]
pub struct OffsetList {
/// Offsets that fit within a `u32`.
smol: Vec<u32>,
/// Offsets that either do not fit in a `u32`, or are inserted after some offset that did not fit.
chonk: Vec<u64>,
}

impl OffsetList {
/// Inserts the offset, as a `u32` if that is still on the table.
pub fn push(&mut self, offset: usize) {
if self.chonk.is_empty() {
if let Ok(smol) = offset.try_into() {
self.smol.push(smol);
}
else {
self.chonk.push(offset.try_into().unwrap())
}
}
else {
self.chonk.push(offset.try_into().unwrap())
}
}
/// Like `std::ops::Index`, which we cannot implement as it must return a `&usize`.
pub fn index(&self, index: usize) -> usize {
if index < self.smol.len() {
self.smol[index].try_into().unwrap()
}
else {
self.chonk[index - self.smol.len()].try_into().unwrap()
}
}
/// Set the offset at location index.
///
/// Complicated if `offset` does not fit into `self.smol`.
pub fn set(&mut self, index: usize, offset: usize) {
if index < self.smol.len() {
if let Ok(off) = offset.try_into() {
self.smol[index] = off;
}
else {
// Move all `smol` elements from `index` onward to the front of `chonk`.
self.chonk.splice(0..0, self.smol.drain(index ..).map(|x| x.try_into().unwrap()));
self.chonk[index - self.smol.len()] = offset.try_into().unwrap();
}
}
else {
self.chonk[index - self.smol.len()] = offset.try_into().unwrap();
}
}
/// The last element in the list of offsets, if non-empty.
pub fn last(&self) -> Option<usize> {
if self.chonk.is_empty() {
self.smol.last().map(|x| (*x).try_into().unwrap())
}
else {
self.chonk.last().map(|x| (*x).try_into().unwrap())
}
}
/// THe number of offsets in the list.
pub fn len(&self) -> usize {
self.smol.len() + self.chonk.len()
}
/// Allocate a new list with a specified capacity.
pub fn with_capacity(cap: usize) -> Self {
Self {
smol: Vec::with_capacity(cap),
chonk: Vec::new(),
}
}
/// Trim all elements at index `length` and greater.
pub fn truncate(&mut self, length: usize) {
if length > self.smol.len() {
self.chonk.truncate(length - self.smol.len());
}
else {
assert!(self.chonk.is_empty());
self.smol.truncate(length);
}
}
}

pub use self::containers::{BatchContainer, SliceContainer};

Expand Down
Loading

0 comments on commit 3182200

Please sign in to comment.