diff --git a/examples/spines.rs b/examples/spines.rs index 6720575fe..934ec6696 100644 --- a/examples/spines.rs +++ b/examples/spines.rs @@ -65,9 +65,9 @@ fn main() { .probe_with(&mut probe); }, "flat" => { - use differential_dataflow::trace::implementations::ord_neu::FlatKeySpine; - let data = data.arrange::>(); - let keys = keys.arrange::>(); + use differential_dataflow::trace::implementations::ord_neu::FlatKeySpineDefault; + let data = data.arrange::>(); + let keys = keys.arrange::>(); keys.join_core(&data, |_k, (), ()| Option::<()>::None) .probe_with(&mut probe); } diff --git a/src/trace/implementations/merge_batcher_flat.rs b/src/trace/implementations/merge_batcher_flat.rs index d6296a4d8..ae8d2894f 100644 --- a/src/trace/implementations/merge_batcher_flat.rs +++ b/src/trace/implementations/merge_batcher_flat.rs @@ -7,23 +7,25 @@ use timely::{Container, Data, PartialOrder}; use timely::container::flatcontainer::{Push, FlatStack, Region, ReserveItems}; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; -use crate::difference::Semigroup; +use crate::difference::{IsZero, Semigroup}; use crate::trace::implementations::merge_batcher::Merger; use crate::trace::Builder; use crate::trace::cursor::IntoOwned; -/// A merger for flat stacks. `T` describes the -pub struct FlatcontainerMerger { - _marker: PhantomData<(T, R, MC)>, +/// A merger for flat stacks. +/// +/// `MC` is a [`Region`] that implements [`MergerChunk`]. +pub struct FlatcontainerMerger { + _marker: PhantomData, } -impl Default for FlatcontainerMerger { +impl Default for FlatcontainerMerger { fn default() -> Self { Self { _marker: PhantomData, } } } -impl FlatcontainerMerger { +impl FlatcontainerMerger { const BUFFER_SIZE_BYTES: usize = 8 << 10; fn chunk_capacity(&self) -> usize { let size = ::std::mem::size_of::(); @@ -61,8 +63,12 @@ pub trait MergerChunk: Region { type Val<'a>: Ord where Self: 'a; /// The time of the update type Time<'a>: Ord where Self: 'a; + /// The owned time type. + type TimeOwned; /// The diff of the update type Diff<'a> where Self: 'a; + /// The owned diff type. + type DiffOwned; /// Split a read item into its constituents. Must be cheap. fn into_parts<'a>(item: Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>); @@ -81,47 +87,48 @@ where type Key<'a> = K::ReadItem<'a> where Self: 'a; type Val<'a> = V::ReadItem<'a> where Self: 'a; type Time<'a> = T::ReadItem<'a> where Self: 'a; + type TimeOwned = T::Owned; type Diff<'a> = R::ReadItem<'a> where Self: 'a; + type DiffOwned = R::Owned; fn into_parts<'a>(((key, val), time, diff): Self::ReadItem<'a>) -> (Self::Key<'a>, Self::Val<'a>, Self::Time<'a>, Self::Diff<'a>) { (key, val, time, diff) } } -impl Merger for FlatcontainerMerger +impl Merger for FlatcontainerMerger where - for<'a> T: Ord + PartialOrder + PartialOrder> + Data, - for<'a> R: Default + Semigroup + Semigroup> + Data, - for<'a> FR: MergerChunk + Clone + 'static - + ReserveItems<::ReadItem<'a>> - + Push<::ReadItem<'a>> - + Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, &'a R)> - + Push<((FR::Key<'a>, FR::Val<'a>), FR::Time<'a>, FR::Diff<'a>)>, - for<'a> FR::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=T>, - for<'a> FR::Diff<'a>: IntoOwned<'a, Owned=R>, - for<'a> FR::ReadItem<'a>: std::fmt::Debug, + for<'a> MC: MergerChunk + Clone + 'static + + ReserveItems<::ReadItem<'a>> + + Push<::ReadItem<'a>> + + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, &'a MC::DiffOwned)> + + Push<((MC::Key<'a>, MC::Val<'a>), MC::Time<'a>, MC::Diff<'a>)>, + for<'a> MC::Time<'a>: PartialOrder + Copy + IntoOwned<'a, Owned=MC::TimeOwned>, + for<'a> MC::Diff<'a>: IntoOwned<'a, Owned = MC::DiffOwned>, + for<'a> MC::TimeOwned: Ord + PartialOrder + PartialOrder> + Data, + for<'a> MC::DiffOwned: Default + Semigroup + Semigroup> + Data, { - type Time = T; - type Chunk = FlatStack; - type Output = FlatStack; + type Time = MC::TimeOwned; + type Chunk = FlatStack; + type Output = FlatStack; fn merge(&mut self, list1: Vec, list2: Vec, output: &mut Vec, stash: &mut Vec) { let mut list1 = list1.into_iter(); let mut list2 = list2.into_iter(); - let mut head1 = >::from(list1.next().unwrap_or_default()); - let mut head2 = >::from(list2.next().unwrap_or_default()); + let mut head1 = >::from(list1.next().unwrap_or_default()); + let mut head2 = >::from(list2.next().unwrap_or_default()); let mut result = self.empty(stash); - let mut diff = R::default(); + let mut diff = MC::DiffOwned::default(); // while we have valid data in each input, merge. while !head1.is_empty() && !head2.is_empty() { while (result.capacity() - result.len()) > 0 && !head1.is_empty() && !head2.is_empty() { let cmp = { - let (key1, val1, time1, _diff) = FR::into_parts(head1.peek()); - let (key2, val2, time2, _diff) = FR::into_parts(head2.peek()); + let (key1, val1, time1, _diff) = MC::into_parts(head1.peek()); + let (key2, val2, time2, _diff) = MC::into_parts(head2.peek()); ((key1, val1), time1).cmp(&((key2, val2), time2)) }; // TODO: The following less/greater branches could plausibly be a good moment for @@ -135,8 +142,8 @@ where result.copy(head2.pop()); } Ordering::Equal => { - let (key, val, time1, diff1) = FR::into_parts(head1.pop()); - let (_key, _val, _time2, diff2) = FR::into_parts(head2.pop()); + let (key, val, time1, diff1) = MC::into_parts(head1.pop()); + let (_key, _val, _time2, diff2) = MC::into_parts(head2.pop()); diff1.clone_onto(&mut diff); diff.plus_equals(&diff2); if !diff.is_zero() { @@ -207,7 +214,7 @@ where let mut ready = self.empty(stash); for buffer in merged { - for (key, val, time, diff) in buffer.iter().map(FR::into_parts) { + for (key, val, time, diff) in buffer.iter().map(MC::into_parts) { if upper.less_equal(&time) { frontier.insert_with(&time, |time| (*time).into_owned()); if keep.len() == keep.capacity() && !keep.is_empty() { @@ -247,7 +254,7 @@ where { let mut prev_keyval = None; for buffer in chain.iter() { - for (key, val, time, _diff) in buffer.iter().map(FR::into_parts) { + for (key, val, time, _diff) in buffer.iter().map(MC::into_parts) { if !upper.less_equal(&time) { if let Some((p_key, p_val)) = prev_keyval { debug_assert!(p_key <= key); diff --git a/src/trace/implementations/mod.rs b/src/trace/implementations/mod.rs index e19712f06..e39522a7e 100644 --- a/src/trace/implementations/mod.rs +++ b/src/trace/implementations/mod.rs @@ -140,9 +140,9 @@ where type OffsetContainer = OffsetList; } -/// A layout based on timely stacks -pub struct FlatLayout { - phantom: std::marker::PhantomData, +/// A layout based on flat containers. +pub struct FlatLayout { + phantom: std::marker::PhantomData<(K, V, T, R)>, } /// A type with a preferred container. @@ -400,49 +400,68 @@ where } mod flatcontainer { - use timely::container::flatcontainer::{Containerized, FlatStack, IntoOwned, Push, Region}; + use timely::container::flatcontainer::{FlatStack, IntoOwned, Push, Region}; use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; + use timely::progress::Timestamp; + + use crate::difference::Semigroup; + use crate::lattice::Lattice; use crate::trace::implementations::{BatchContainer, BuilderInput, FlatLayout, Layout, OffsetList, Update}; - impl Layout for FlatLayout + impl Update for FlatLayout where - U::Key: Containerized, - for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, - for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, - U::Val: Containerized, - for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, - for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, - U::Time: Containerized, - ::Region: Region, - for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, - for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, - U::Diff: Containerized, - ::Region: Region, - for<'a> ::Region: Push + Push<<::Region as Region>::ReadItem<'a>>, - for<'a> <::Region as Region>::ReadItem<'a>: Copy + Ord, + K: Region, + V: Region, + T: Region, + R: Region, + K::Owned: Ord + Clone + 'static, + V::Owned: Ord + Clone + 'static, + T::Owned: Ord + Clone + Lattice + Timestamp + 'static, + R::Owned: Ord + Semigroup + 'static, { - type Target = U; - type KeyContainer = FlatStack<::Region>; - type ValContainer = FlatStack<::Region>; - type TimeContainer = FlatStack<::Region>; - type DiffContainer = FlatStack<::Region>; + type Key = K::Owned; + type Val = V::Owned; + type Time = T::Owned; + type Diff = R::Owned; + } + + impl Layout for FlatLayout + where + K: Region + Push<::Owned> + for<'a> Push<::ReadItem<'a>> + 'static, + V: Region + Push<::Owned> + for<'a> Push<::ReadItem<'a>> + 'static, + T: Region + Push<::Owned> + for<'a> Push<::ReadItem<'a>> + 'static, + R: Region + Push<::Owned> + for<'a> Push<::ReadItem<'a>> + 'static, + K::Owned: Ord + Clone + 'static, + V::Owned: Ord + Clone + 'static, + T::Owned: Ord + Clone + Lattice + Timestamp + 'static, + R::Owned: Ord + Semigroup + 'static, + for<'a> K::ReadItem<'a>: Copy + Ord, + for<'a> V::ReadItem<'a>: Copy + Ord, + for<'a> T::ReadItem<'a>: Copy + Ord, + for<'a> R::ReadItem<'a>: Copy + Ord, + { + type Target = Self; + type KeyContainer = FlatStack; + type ValContainer = FlatStack; + type TimeContainer = FlatStack; + type DiffContainer = FlatStack; type OffsetContainer = OffsetList; } impl BuilderInput for FlatStack,T,R>> where K: Region + Clone + 'static, + V: Region + Clone + 'static, + T: Region + Clone + 'static, + R: Region + Clone + 'static, for<'a> K::ReadItem<'a>: Copy + Ord, - KBC: BatchContainer, - for<'a> KBC::ReadItem<'a>: PartialEq>, - for<'a> V: Region + Clone + 'static, for<'a> V::ReadItem<'a>: Copy + Ord, - VBC: BatchContainer, - for<'a> VBC::ReadItem<'a>: PartialEq>, - for<'a> T: Region + Clone + 'static, for<'a> T::ReadItem<'a>: Copy + Ord, - for<'a> R: Region + Clone + 'static, for<'a> R::ReadItem<'a>: Copy + Ord, + KBC: BatchContainer, + VBC: BatchContainer, + for<'a> KBC::ReadItem<'a>: PartialEq>, + for<'a> VBC::ReadItem<'a>: PartialEq>, { type Key<'a> = K::ReadItem<'a>; type Val<'a> = V::ReadItem<'a>; diff --git a/src/trace/implementations/ord_neu.rs b/src/trace/implementations/ord_neu.rs index 20b7cd002..f161bdc5b 100644 --- a/src/trace/implementations/ord_neu.rs +++ b/src/trace/implementations/ord_neu.rs @@ -11,12 +11,13 @@ use std::rc::Rc; use timely::container::columnation::{TimelyStack}; use timely::container::flatcontainer::{Containerized, FlatStack}; +use timely::container::flatcontainer::impls::tuple::{TupleABCRegion, TupleABRegion}; use crate::trace::implementations::chunker::{ColumnationChunker, ContainerChunker, VecChunker}; use crate::trace::implementations::spine_fueled::Spine; use crate::trace::implementations::merge_batcher::{MergeBatcher, VecMerger}; use crate::trace::implementations::merge_batcher_col::ColumnationMerger; -use crate::trace::implementations::merge_batcher_flat::FlatcontainerMerger; +use crate::trace::implementations::merge_batcher_flat::{FlatcontainerMerger, MergerChunk}; use crate::trace::rc_blanket_impls::RcBuilder; use super::{Update, Layout, Vector, TStack, Preferred, FlatLayout}; @@ -41,15 +42,17 @@ pub type ColValSpine = Spine< >; /// A trace implementation backed by flatcontainer storage. -pub type FlatValSpine = Spine< - Rc>>, - MergeBatcher< - C, - ContainerChunker::Region>>, - FlatcontainerMerger::Region>, - T, - >, - RcBuilder, FlatStack<<((K,V),T,R) as Containerized>::Region>>>, +pub type FlatValSpine = Spine< + Rc>, + MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, + RcBuilder>>, +>; + +/// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout. +pub type FlatValSpineDefault = FlatValSpine< + FlatLayout<::Region, ::Region, ::Region, ::Region>, + TupleABCRegion::Region, ::Region>, ::Region, ::Region>, + C, >; /// A trace implementation using a spine of ordered lists. @@ -69,15 +72,17 @@ pub type ColKeySpine = Spine< >; /// A trace implementation backed by flatcontainer storage. -pub type FlatKeySpine = Spine< - Rc>>, - MergeBatcher< - C, - ContainerChunker::Region>>, - FlatcontainerMerger::Region>, - T, - >, - RcBuilder, FlatStack<<((K,()),T,R) as Containerized>::Region>>>, +pub type FlatKeySpine = Spine< + Rc>, + MergeBatcher>, FlatcontainerMerger, ::TimeOwned>, + RcBuilder>>, +>; + +/// A trace implementation backed by flatcontainer storage, using [`FlatLayout`] as the layout. +pub type FlatKeySpineDefault = FlatKeySpine< + FlatLayout<::Region, <() as Containerized>::Region, ::Region, ::Region>, + TupleABCRegion::Region, <() as Containerized>::Region>, ::Region, ::Region>, + C, >; /// A trace implementation backed by columnar storage. diff --git a/tests/bfs.rs b/tests/bfs.rs index 930f4fead..dee2a9e2c 100644 --- a/tests/bfs.rs +++ b/tests/bfs.rs @@ -17,7 +17,7 @@ use differential_dataflow::Collection; use differential_dataflow::operators::*; use differential_dataflow::lattice::Lattice; use differential_dataflow::operators::arrange::Arrange; -use differential_dataflow::trace::implementations::ord_neu::{FlatKeySpine, FlatValSpine}; +use differential_dataflow::trace::implementations::ord_neu::{FlatKeySpineDefault, FlatValSpineDefault}; type Node = usize; type Edge = (Node, Node); @@ -246,8 +246,8 @@ fn bfs_differential_flat( let (edge_input, edges) = scope.new_collection(); let c = bfs_flat(&edges, &roots).map(|(_, dist)| (dist, ())); - let arranged = c.arrange::>>(); - type T2 = FlatValSpine>; + let arranged = c.arrange::>>(); + type T2 = FlatValSpineDefault>; let reduced = arranged.reduce_abelian::<_, _, _, T2>("Count", |_k, s, t| { t.push((s[0].1.clone(), isize::from(1i8))) }); @@ -315,7 +315,7 @@ where let edges = edges.enter(&inner.scope()); let nodes = nodes.enter(&inner.scope()); - type Spine = FlatValSpine>; + type Spine = FlatValSpineDefault>; let arranged1 = inner.arrange::>>(); let arranged2 = edges.arrange::>>(); arranged1