diff --git a/container/Cargo.toml b/container/Cargo.toml index 4d643425d..ae2060dbc 100644 --- a/container/Cargo.toml +++ b/container/Cargo.toml @@ -9,3 +9,5 @@ edition.workspace = true columnation = { git = "https://github.com/frankmcsherry/columnation" } flatcontainer = "0.5" serde = { version = "1.0", features = ["derive"] } +# columnar = { path = "../../columnar" } +columnar = { git = "https://github.com/frankmcsherry/columnar" } diff --git a/container/src/columnar.rs b/container/src/columnar.rs new file mode 100644 index 000000000..a63306a7f --- /dev/null +++ b/container/src/columnar.rs @@ -0,0 +1,88 @@ +//! Present a columnar container as a timely container. + +use serde::{Serialize, Deserialize}; + +pub use columnar::*; +use columnar::common::IterOwn; + +use crate::{Container, SizableContainer, PushInto}; + +/// A container based on a `columnar` store. +#[derive(Clone, Default, Serialize, Deserialize)] +pub struct Columnar { + store: C, +} + +impl Container for Columnar +where + for<'a> &'a C: columnar::IndexOwn, +{ + fn len(&self) -> usize { self.store.len() } + fn clear(&mut self) { self.store.clear() } + + type ItemRef<'a> = <&'a C as IndexOwn>::Ref where Self: 'a; + type Iter<'a> = IterOwn<&'a C>; + fn iter<'a>(&'a self) -> Self::Iter<'a> { self.store.iter() } + + type Item<'a> = <&'a C as IndexOwn>::Ref where Self: 'a; + type DrainIter<'a> = IterOwn<&'a C>; + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { self.store.iter() } +} + +impl SizableContainer for Columnar +where + for<'a> &'a C: columnar::IndexOwn, +{ + fn capacity(&self) -> usize { 1024 } + fn preferred_capacity() -> usize { 1024 } + fn reserve(&mut self, _additional: usize) { } +} + +impl, T> PushInto for Columnar { + #[inline] + fn push_into(&mut self, item: T) { + self.store.push(item); + } +} + + +use columnar::bytes::{AsBytes, FromBytes, serialization::decode}; + +/// A container based on a columnar store, encoded in aligned bytes. +#[derive(Clone, Default)] +pub struct ColumnarBytes { + bytes: B, + phantom: std::marker::PhantomData, +} + +impl + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> Container for ColumnarBytes +where + for<'a> C::Borrowed<'a> : Len + Clear + IndexOwn, +{ + fn len(&self) -> usize { + as FromBytes>::from_bytes(&mut decode(&self.bytes)).len() + } + // Perhpas this should be an enum that allows the bytes to be un-set, but .. not sure what this should do. + fn clear(&mut self) { unimplemented!() } + + type ItemRef<'a> = as IndexOwn>::Ref where Self: 'a; + type Iter<'a> = IterOwn>; + fn iter<'a>(&'a self) -> Self::Iter<'a> { + as FromBytes>::from_bytes(&mut decode(&self.bytes)).iter() + } + + type Item<'a> = as IndexOwn>::Ref where Self: 'a; + type DrainIter<'a> = IterOwn>; + fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> { + as FromBytes>::from_bytes(&mut decode(&self.bytes)).iter() + } +} + +impl + Clone + Default + 'static, C: AsBytes + Clone + Default + 'static> SizableContainer for ColumnarBytes +where + for<'a> C::Borrowed<'a> : Len + Clear + IndexOwn, +{ + fn capacity(&self) -> usize { 1024 } + fn preferred_capacity() -> usize { 1024 } + fn reserve(&mut self, _additional: usize) { } +} diff --git a/container/src/lib.rs b/container/src/lib.rs index a86c09b5a..ee2412fcf 100644 --- a/container/src/lib.rs +++ b/container/src/lib.rs @@ -6,6 +6,7 @@ use std::collections::VecDeque; pub mod columnation; pub mod flatcontainer; +pub mod columnar; /// A container transferring data through dataflow edges /// diff --git a/timely/examples/columnar.rs b/timely/examples/columnar.rs new file mode 100644 index 000000000..b3d01f851 --- /dev/null +++ b/timely/examples/columnar.rs @@ -0,0 +1,98 @@ +//! Wordcount based on flatcontainer. + +use { + std::collections::HashMap, + timely::{Container, container::CapacityContainerBuilder}, + timely::container::columnar::Columnar, + timely::dataflow::channels::pact::{ExchangeCore, Pipeline}, + timely::dataflow::InputHandleCore, + timely::dataflow::operators::{Inspect, Operator, Probe}, + timely::dataflow::ProbeHandle, +}; + +fn main() { + + use timely_container::columnar::Strings; + type Container = Columnar<(Strings, Vec)>; + + // initializes and runs a timely dataflow. + timely::execute_from_args(std::env::args(), |worker| { + let mut input = >>::new(); + let mut probe = ProbeHandle::new(); + + // create a new input, exchange data, and inspect its output + worker.dataflow::(|scope| { + input + .to_stream(scope) + .unary( + Pipeline, + "Split", + |_cap, _info| { + move |input, output| { + while let Some((time, data)) = input.next() { + let mut session = output.session(&time); + for (text, diff) in data.iter().flat_map(|(text, diff)| { + text.split_whitespace().map(move |s| (s, diff)) + }) { + session.give((text, diff)); + } + } + } + }, + ) + .container::() + .unary_frontier( + ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64), + "WordCount", + |_capability, _info| { + let mut queues = HashMap::new(); + let mut counts = HashMap::new(); + + move |input, output| { + while let Some((time, data)) = input.next() { + queues + .entry(time.retain()) + .or_insert(Vec::new()) + .push(data.take()); + } + + for (key, val) in queues.iter_mut() { + if !input.frontier().less_equal(key.time()) { + let mut session = output.session(key); + for batch in val.drain(..) { + for (word, diff) in batch.iter() { + let total = + if let Some(count) = counts.get_mut(word) { + *count += diff; + *count + } + else { + counts.insert(word.to_string(), *diff); + *diff + }; + session.give((word, total)); + } + } + } + } + + queues.retain(|_key, val| !val.is_empty()); + } + }, + ) + .container::() + .inspect(|x| println!("seen: {:?}", x)) + .probe_with(&mut probe); + }); + + // introduce data and watch! + for round in 0..10 { + input.send(("flat container", 1)); + input.advance_to(round + 1); + while probe.less_than(input.time()) { + worker.step(); + } + } + }) + .unwrap(); +}