Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Mar 22, 2024
1 parent b407978 commit 349def7
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 1 deletion.
1 change: 1 addition & 0 deletions container/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ license = "MIT"

[dependencies]
columnation = { git = "https://github.com/frankmcsherry/columnation" }
flatcontainer = "0.1"
serde = { version = "1.0"}
50 changes: 50 additions & 0 deletions container/src/flatcontainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! Present a [`FlatStack`] as a timely container.

pub use flatcontainer::*;
use crate::{buffer, Container, PushContainer, PushInto};

impl<R: Region + Clone + 'static> Container for FlatStack<R> {
type ItemRef<'a> = R::ReadItem<'a> where Self: 'a;
type Item<'a> = R::ReadItem<'a> where Self: 'a;

fn len(&self) -> usize {
self.len()
}

fn clear(&mut self) {
self.clear()
}

type Iter<'a> = <&'a Self as IntoIterator>::IntoIter;

fn iter<'a>(&'a self) -> Self::Iter<'a> {
IntoIterator::into_iter(self)
}

type DrainIter<'a> = Self::Iter<'a>;

fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
IntoIterator::into_iter(&*self)
}
}

impl<R: Region + Clone + 'static> PushContainer for FlatStack<R> {
fn capacity(&self) -> usize {
self.capacity()
}

fn preferred_capacity() -> usize {
buffer::default_capacity::<R::Index>()
}

fn reserve(&mut self, additional: usize) {
self.reserve(additional);
}
}

impl<R: Region + Clone + 'static, T: CopyOnto<R>> PushInto<FlatStack<R>> for T {
#[inline]
fn push_into(self, target: &mut FlatStack<R>) {
target.copy(self);
}
}
1 change: 1 addition & 0 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![forbid(missing_docs)]

pub mod columnation;
pub mod flatcontainer;

/// A container transferring data through dataflow edges
///
Expand Down
1 change: 1 addition & 0 deletions timely/examples/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ fn main() {
}
})
})
.container_type::<Vec<_>>()
.inspect(move |x| println!("worker {}:\tvalue {}", index, x))
.probe_with(&mut probe);
});
Expand Down
93 changes: 93 additions & 0 deletions timely/examples/flatcontainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! Wordcount based on flatcontainer.

#[cfg(feature = "bincode")]
use {
std::collections::HashMap,
timely::container::flatcontainer::{Containerized, FlatStack},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::operators::core::InputHandle,
timely::dataflow::operators::{Inspect, Operator, Probe},
timely::dataflow::ProbeHandle,
};

#[cfg(feature = "bincode")]
/// Runs a word-count dataflow whose streams carry `FlatStack` containers.
fn main() {
    // The container type used by the input and by both operators below.
    type WordContainer = FlatStack<<(String, i64) as Containerized>::Region>;

    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {
        let mut input = InputHandle::<_, WordContainer>::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow::<usize, _, _>(|scope| {
            input
                .to_stream(scope)
                .unary::<WordContainer, _, _, _>(Pipeline, "Split", |_cap, _info| {
                    move |input, output| {
                        while let Some((time, data)) = input.next() {
                            let mut session = output.session(&time);
                            // Emit one `(word, diff)` record per whitespace-separated word.
                            for (text, diff) in data.iter() {
                                for word in text.split_whitespace() {
                                    session.give((word, diff));
                                }
                            }
                        }
                    }
                })
                .unary_frontier::<WordContainer, _, _, _>(
                    ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
                    "WordCount",
                    |_capability, _info| {
                        // Batches received so far, grouped by their retained capability.
                        let mut pending = HashMap::new();
                        // Running total per word.
                        let mut totals = HashMap::new();

                        move |input, output| {
                            while let Some((time, data)) = input.next() {
                                pending
                                    .entry(time.retain())
                                    .or_insert_with(Vec::new)
                                    .push(data.take());
                            }

                            for (cap, batches) in pending.iter_mut() {
                                // Hold back batches whose time the input frontier can still reach.
                                if input.frontier().less_equal(cap.time()) {
                                    continue;
                                }

                                let mut session = output.session(cap);
                                for batch in batches.drain(..) {
                                    for (word, diff) in batch.iter() {
                                        let total = totals.entry(word.to_string()).or_insert(0i64);
                                        *total += diff;
                                        session.give((word, *total));
                                    }
                                }
                            }

                            // Forget capabilities whose batches have all been processed.
                            pending.retain(|_cap, batches| !batches.is_empty());
                        }
                    },
                )
                .inspect(|x| println!("seen: {:?}", x))
                .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            input.send(("flat container", 1));
            input.advance_to(round + 1);
            // Step the worker until the probe has caught up with the input's time.
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    })
    .unwrap();
}

#[cfg(not(feature = "bincode"))]
/// Fallback entry point used when the `bincode` feature is disabled: it only
/// reports on stderr that the example needs that feature.
fn main() {
    const NOTICE: &str = "Example requires feature bincode.";
    eprintln!("{}", NOTICE);
}
1 change: 1 addition & 0 deletions timely/examples/hashjoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ fn main() {
});
}
})
.container_type::<Vec<_>>()
.probe_with(&mut probe);
});

Expand Down
1 change: 1 addition & 0 deletions timely/examples/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use timely::dataflow::operators::*;
fn main() {
    timely::example(|scope| {
        // Fix the container type to `Vec<_>` so that type inference can resolve the stream.
        let numbers = (0..10).to_stream(scope).container_type::<Vec<_>>();

        // Print every record that flows past.
        numbers.inspect(|x| println!("seen: {:?}", x));
    });
}
3 changes: 3 additions & 0 deletions timely/examples/wordcount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ fn main() {
// create a new input, exchange data, and inspect its output
worker.dataflow::<usize,_,_>(|scope| {
input.to_stream(scope)
.container_type::<Vec<_>>()
.flat_map(|(text, diff): (String, i64)|
text.split_whitespace()
.map(move |word| (word.to_owned(), diff))
.collect::<Vec<_>>()
)
.container_type::<Vec<_>>()
.unary_frontier(exchange, "WordCount", |_capability, _info| {

let mut queues = HashMap::new();
Expand Down Expand Up @@ -51,6 +53,7 @@ fn main() {

queues.retain(|_key, val| !val.is_empty());
}})
.container_type::<Vec<_>>()
.inspect(|x| println!("seen: {:?}", x))
.probe_with(&mut probe);
});
Expand Down
2 changes: 1 addition & 1 deletion timely/src/dataflow/operators/core/rc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ mod test {
#[test]
fn test_shared() {
let output = crate::example(|scope| {
let shared = vec![Ok(0), Err(())].to_stream(scope).shared();
let shared = vec![Ok(0), Err(())].to_stream(scope).container_type::<Vec<_>>().shared();
scope
.concatenate([
shared.unary(Pipeline, "read shared 1", |_, _| {
Expand Down
13 changes: 13 additions & 0 deletions timely/src/dataflow/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,19 @@ impl<S: Scope, C: Container> StreamCore<S, C> {
pub fn name(&self) -> &Source { &self.name }
/// Returns a clone of the scope that immediately contains this stream.
pub fn scope(&self) -> S {
    Clone::clone(&self.scope)
}

/// Asserts that the stream's container type is `D`, for the benefit of type inference.
///
/// The stream is consumed and handed to [`AsStream::as_stream`], so the call is
/// only accepted when `Self: AsStream<S, D>` holds.
pub fn container_type<D>(self) -> StreamCore<S, D>
where
    D: Container,
    Self: AsStream<S, D>,
{
    <Self as AsStream<S, D>>::as_stream(self)
}
}

/// A type that can be translated to a [StreamCore].
///
/// `S` is the scope of the resulting stream and `C` is its container type.
/// The conversion takes `self` by value.
pub trait AsStream<S: Scope, C> {
/// Translate `self` to a [StreamCore].
///
/// Consumes `self` and returns a stream in scope `S` with container type `C`.
fn as_stream(self) -> StreamCore<S, C>;
}

/// A stream is trivially convertible to itself.
impl<S, C> AsStream<S, C> for StreamCore<S, C>
where
    S: Scope,
{
    /// Identity conversion: returns the stream unchanged.
    fn as_stream(self) -> StreamCore<S, C> {
        self
    }
}

impl<S, C> Debug for StreamCore<S, C>
Expand Down

0 comments on commit 349def7

Please sign in to comment.