Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support GATs and containers in more places #550

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions container/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ license = "MIT"

[dependencies]
columnation = { git = "https://github.com/frankmcsherry/columnation" }
flatcontainer = "0.1"
serde = { version = "1.0"}
50 changes: 50 additions & 0 deletions container/src/flatcontainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
//! Present a [`FlatStack`] as a timely container.

pub use flatcontainer::*;
use crate::{buffer, Container, PushContainer, PushInto};

/// A [`FlatStack`] acts as a timely container whenever its region can be
/// cloned and owns its data (`'static`).
impl<R: Region + Clone + 'static> Container for FlatStack<R> {
    type ItemRef<'a> = R::ReadItem<'a> where Self: 'a;
    type Item<'a> = R::ReadItem<'a> where Self: 'a;

    type Iter<'a> = <&'a Self as IntoIterator>::IntoIter;
    type DrainIter<'a> = Self::Iter<'a>;

    fn len(&self) -> usize {
        // Resolves to the inherent `FlatStack::len`, which shadows this trait method.
        self.len()
    }

    fn clear(&mut self) {
        self.clear()
    }

    fn iter<'a>(&'a self) -> Self::Iter<'a> {
        <&'a Self as IntoIterator>::into_iter(self)
    }

    fn drain<'a>(&'a mut self) -> Self::DrainIter<'a> {
        // NOTE(review): this iterates without removing elements — presumably the
        // `Container::drain` contract allows read-out semantics for containers
        // that cannot remove items individually; confirm against the trait docs.
        <&'a Self as IntoIterator>::into_iter(&*self)
    }
}

/// Capacity management so that timely can batch pushes into a [`FlatStack`].
impl<R: Region + Clone + 'static> PushContainer for FlatStack<R> {
    fn preferred_capacity() -> usize {
        // Default allocation is sized by the region's index type.
        buffer::default_capacity::<R::Index>()
    }

    fn capacity(&self) -> usize {
        self.capacity()
    }

    fn reserve(&mut self, additional: usize) {
        self.reserve(additional)
    }
}

/// Anything that can be copied onto region `R` can be pushed into a [`FlatStack<R>`].
impl<R: Region + Clone + 'static, T: CopyOnto<R>> PushInto<FlatStack<R>> for T {
    #[inline]
    fn push_into(self, target: &mut FlatStack<R>) {
        // `copy` appends `self` to the stack by copying it onto the backing region.
        target.copy(self);
    }
}
1 change: 1 addition & 0 deletions container/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#![forbid(missing_docs)]

pub mod columnation;
pub mod flatcontainer;

/// A container transferring data through dataflow edges
///
Expand Down
6 changes: 3 additions & 3 deletions mdbook/src/chapter_2/chapter_2_4.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ fn main() {
timely::example(|scope| {
(0u64..10)
.to_stream(scope)
.unary(Pipeline, "increment", |capability, info| {
.unary::<Vec<_>, _, _, _>(Pipeline, "increment", |capability, info| {

let mut vector = Vec::new();
move |input, output| {
Expand Down Expand Up @@ -75,7 +75,7 @@ use timely::dataflow::operators::generic::operator::source;
fn main() {
timely::example(|scope| {

source(scope, "Source", |capability, info| {
source::<_, Vec<_>, _, _>(scope, "Source", |capability, info| {

// Acquire a re-activator for this operator.
use timely::scheduling::Scheduler;
Expand Down Expand Up @@ -131,7 +131,7 @@ fn main() {
timely::example(|scope| {
(0u64..10)
.to_stream(scope)
.unary(Pipeline, "increment", |capability, info| {
.unary::<Vec<_>, _, _, _>(Pipeline, "increment", |capability, info| {

let mut maximum = 0; // define this here; use in the closure
let mut vector = Vec::new();
Expand Down
2 changes: 1 addition & 1 deletion mdbook/src/chapter_2/chapter_2_5.md
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ As before, I'm just going to show you the new code, which now lives just after `
# .map(move |word| (word.to_owned(), diff))
# .collect::<Vec<_>>()
# )
.unary_frontier(
.unary_frontier::<Vec<_>, _, _, _>(
Exchange::new(|x: &(String, i64)| (x.0).len() as u64),
"WordCount",
|_capability, operator_info| {
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/distinct.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ fn main() {
worker.dataflow::<usize,_,_>(|scope| {
let mut counts_by_time = HashMap::new();
scope.input_from(&mut input)
.unary(Exchange::new(|x| *x), "Distinct", move |_, _|
.unary::<Vec<_>, _, _, _>(Exchange::new(|x| *x), "Distinct", move |_, _|
move |input, output| {
input.for_each(|time, data| {
let counts =
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/hashjoin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ fn main() {
let exchange2 = Exchange::new(|x: &(u64, u64)| x.0);

stream1
.binary(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {
.binary::<_, Vec<_>, _, _, _, _>(&stream2, exchange1, exchange2, "HashJoin", |_capability, _info| {

let mut map1 = HashMap::<u64, Vec<u64>>::new();
let mut map2 = HashMap::<u64, Vec<u64>>::new();
Expand Down
2 changes: 1 addition & 1 deletion timely/examples/wordcount.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fn main() {
.map(move |word| (word.to_owned(), diff))
.collect::<Vec<_>>()
)
.unary_frontier(exchange, "WordCount", |_capability, _info| {
.unary_frontier::<Vec<_>, _, _, _>(exchange, "WordCount", |_capability, _info| {

let mut queues = HashMap::new();
let mut counts = HashMap::new();
Expand Down
93 changes: 93 additions & 0 deletions timely/examples/wordcount_flatcontainer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
//! Wordcount based on flatcontainer.

#[cfg(feature = "bincode")]
use {
std::collections::HashMap,
timely::container::flatcontainer::{Containerized, FlatStack},
timely::dataflow::channels::pact::{ExchangeCore, Pipeline},
timely::dataflow::operators::core::InputHandle,
timely::dataflow::operators::{Inspect, Operator, Probe},
timely::dataflow::ProbeHandle,
};

#[cfg(feature = "bincode")]
fn main() {
    // Initializes and runs a timely dataflow computing per-word running counts.
    timely::execute_from_args(std::env::args(), |worker| {
        // The input handle's container is a flat stack over the region type
        // derived from `(String, i64)`, rather than a `Vec<(String, i64)>`.
        let mut input =
            <InputHandle<_, FlatStack<<(String, i64) as Containerized>::Region>>>::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow::<usize, _, _>(|scope| {
            input
                .to_stream(scope)
                // Split each text into whitespace-separated words, pairing each
                // word with the record's diff; no exchange (Pipeline) needed here.
                .unary::<FlatStack<<(String, i64) as Containerized>::Region>, _, _, _>(
                    Pipeline,
                    "Split",
                    |_cap, _info| {
                        move |input, output| {
                            while let Some((time, data)) = input.next() {
                                let mut session = output.session(&time);
                                for (text, diff) in data.iter().flat_map(|(text, diff)| {
                                    text.split_whitespace().map(move |s| (s, diff))
                                }) {
                                    session.give((text, diff));
                                }
                            }
                        }
                    },
                )
                .unary_frontier::<FlatStack<<(String, i64) as Containerized>::Region>, _, _, _>(
                    // Route each word by its length so equal words meet at one worker.
                    ExchangeCore::new(|(s, _): &(&str, _)| s.len() as u64),
                    "WordCount",
                    |_capability, _info| {
                        // Batches stashed per retained capability, and running
                        // totals per word.
                        let mut queues = HashMap::new();
                        let mut counts = HashMap::new();

                        move |input, output| {
                            // Stash incoming batches until their time is complete.
                            while let Some((time, data)) = input.next() {
                                queues
                                    .entry(time.retain())
                                    .or_insert(Vec::new())
                                    .push(data.take());
                            }

                            // For each time no longer in the input frontier, fold
                            // its batches into `counts` and emit updated totals.
                            for (key, val) in queues.iter_mut() {
                                if !input.frontier().less_equal(key.time()) {
                                    let mut session = output.session(key);
                                    for batch in val.drain(..) {
                                        for (word, diff) in batch.iter() {
                                            let entry =
                                                counts.entry(word.to_string()).or_insert(0i64);
                                            *entry += diff;
                                            session.give((word, *entry));
                                        }
                                    }
                                }
                            }

                            // Drop fully-emitted entries, releasing their capabilities.
                            queues.retain(|_key, val| !val.is_empty());
                        }
                    },
                )
                .inspect(|x| println!("seen: {:?}", x))
                .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            input.send(("flat container", 1));
            input.advance_to(round + 1);
            // Step the worker until the probe confirms the round is complete.
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    })
    .unwrap();
}

// Fallback entry point when compiled without the `bincode` feature; the real
// example above is gated on that feature.
#[cfg(not(feature = "bincode"))]
fn main() {
    eprintln!("Example requires feature bincode.");
}
9 changes: 3 additions & 6 deletions timely/src/dataflow/channels/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@ pub mod pullers;
pub mod pact;

/// The input to and output from timely dataflow communication channels.
pub type BundleCore<T, D> = crate::communication::Message<Message<T, D>>;

/// The input to and output from timely dataflow communication channels specialized to vectors.
pub type Bundle<T, D> = BundleCore<T, Vec<D>>;
pub type Bundle<T, D> = crate::communication::Message<Message<T, D>>;

/// A serializable representation of timestamped data.
#[derive(Clone, Abomonation, Serialize, Deserialize)]
Expand Down Expand Up @@ -46,11 +43,11 @@ impl<T, D: Container> Message<T, D> {
/// Forms a message, and pushes contents at `pusher`. Replaces `buffer` with what the pusher
/// leaves in place, or the container's default element.
#[inline]
pub fn push_at<P: Push<BundleCore<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {
pub fn push_at<P: Push<Bundle<T, D>>>(buffer: &mut D, time: T, pusher: &mut P) {

let data = ::std::mem::take(buffer);
let message = Message::new(time, data, 0, 0);
let mut bundle = Some(BundleCore::from_typed(message));
let mut bundle = Some(Bundle::from_typed(message));

pusher.push(&mut bundle);

Expand Down
30 changes: 15 additions & 15 deletions timely/src/dataflow/channels/pact.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,17 @@ use crate::communication::allocator::thread::{ThreadPusher, ThreadPuller};
use crate::communication::{Push, Pull, Data};
use crate::container::PushPartitioned;
use crate::dataflow::channels::pushers::Exchange as ExchangePusher;
use crate::dataflow::channels::{BundleCore, Message};
use crate::dataflow::channels::{Bundle, Message};
use crate::logging::{TimelyLogger as Logger, MessagesEvent};
use crate::progress::Timestamp;
use crate::worker::AsWorker;

/// A `ParallelizationContractCore` allocates paired `Push` and `Pull` implementors.
pub trait ParallelizationContractCore<T, D> {
/// Type implementing `Push` produced by this pact.
type Pusher: Push<BundleCore<T, D>>+'static;
type Pusher: Push<Bundle<T, D>>+'static;
/// Type implementing `Pull` produced by this pact.
type Puller: Pull<BundleCore<T, D>>+'static;
type Puller: Pull<Bundle<T, D>>+'static;
/// Allocates a matched pair of push and pull endpoints implementing the pact.
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller);
}
Expand All @@ -39,8 +39,8 @@ impl<T, D: Clone, P: ParallelizationContractCore<T, Vec<D>>> ParallelizationCont
pub struct Pipeline;

impl<T: 'static, D: Container> ParallelizationContractCore<T, D> for Pipeline {
type Pusher = LogPusher<T, D, ThreadPusher<BundleCore<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<BundleCore<T, D>>>;
type Pusher = LogPusher<T, D, ThreadPusher<Bundle<T, D>>>;
type Puller = LogPuller<T, D, ThreadPuller<Bundle<T, D>>>;
fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (pusher, puller) = allocator.pipeline::<Message<T, D>>(identifier, address);
// // ignore `&mut A` and use thread allocator
Expand Down Expand Up @@ -76,8 +76,8 @@ where
C: Data + PushPartitioned,
for<'a> H: FnMut(&C::Item<'a>) -> u64
{
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<BundleCore<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<BundleCore<T, C>>>>;
type Pusher = ExchangePusher<T, C, LogPusher<T, C, Box<dyn Push<Bundle<T, C>>>>, H>;
type Puller = LogPuller<T, C, Box<dyn Pull<Bundle<T, C>>>>;

fn connect<A: AsWorker>(self, allocator: &mut A, identifier: usize, address: &[usize], logging: Option<Logger>) -> (Self::Pusher, Self::Puller) {
let (senders, receiver) = allocator.allocate::<Message<T, C>>(identifier, address);
Expand All @@ -94,7 +94,7 @@ impl<C, F> Debug for ExchangeCore<C, F> {

/// Wraps a `Message<T,D>` pusher to provide a `Push<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPusher<T, D, P: Push<BundleCore<T, D>>> {
pub struct LogPusher<T, D, P: Push<Bundle<T, D>>> {
pusher: P,
channel: usize,
counter: usize,
Expand All @@ -104,7 +104,7 @@ pub struct LogPusher<T, D, P: Push<BundleCore<T, D>>> {
logging: Option<Logger>,
}

impl<T, D, P: Push<BundleCore<T, D>>> LogPusher<T, D, P> {
impl<T, D, P: Push<Bundle<T, D>>> LogPusher<T, D, P> {
/// Allocates a new pusher.
pub fn new(pusher: P, source: usize, target: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPusher {
Expand All @@ -119,9 +119,9 @@ impl<T, D, P: Push<BundleCore<T, D>>> LogPusher<T, D, P> {
}
}

impl<T, D: Container, P: Push<BundleCore<T, D>>> Push<BundleCore<T, D>> for LogPusher<T, D, P> {
impl<T, D: Container, P: Push<Bundle<T, D>>> Push<Bundle<T, D>> for LogPusher<T, D, P> {
#[inline]
fn push(&mut self, pair: &mut Option<BundleCore<T, D>>) {
fn push(&mut self, pair: &mut Option<Bundle<T, D>>) {
if let Some(bundle) = pair {
self.counter += 1;

Expand Down Expand Up @@ -150,15 +150,15 @@ impl<T, D: Container, P: Push<BundleCore<T, D>>> Push<BundleCore<T, D>> for LogP

/// Wraps a `Message<T,D>` puller to provide a `Pull<(T, Content<D>)>`.
#[derive(Debug)]
pub struct LogPuller<T, D, P: Pull<BundleCore<T, D>>> {
pub struct LogPuller<T, D, P: Pull<Bundle<T, D>>> {
puller: P,
channel: usize,
index: usize,
phantom: PhantomData<(T, D)>,
logging: Option<Logger>,
}

impl<T, D, P: Pull<BundleCore<T, D>>> LogPuller<T, D, P> {
impl<T, D, P: Pull<Bundle<T, D>>> LogPuller<T, D, P> {
/// Allocates a new `Puller`.
pub fn new(puller: P, index: usize, channel: usize, logging: Option<Logger>) -> Self {
LogPuller {
Expand All @@ -171,9 +171,9 @@ impl<T, D, P: Pull<BundleCore<T, D>>> LogPuller<T, D, P> {
}
}

impl<T, D: Container, P: Pull<BundleCore<T, D>>> Pull<BundleCore<T, D>> for LogPuller<T, D, P> {
impl<T, D: Container, P: Pull<Bundle<T, D>>> Pull<Bundle<T, D>> for LogPuller<T, D, P> {
#[inline]
fn pull(&mut self) -> &mut Option<BundleCore<T,D>> {
fn pull(&mut self) -> &mut Option<Bundle<T,D>> {
let result = self.puller.pull();
if let Some(bundle) = result {
let channel = self.channel;
Expand Down
12 changes: 6 additions & 6 deletions timely/src/dataflow/channels/pullers/counter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
use std::rc::Rc;
use std::cell::RefCell;

use crate::dataflow::channels::BundleCore;
use crate::dataflow::channels::Bundle;
use crate::progress::ChangeBatch;
use crate::communication::Pull;
use crate::Container;

/// A wrapper which accounts records pulled past in a shared count map.
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> {
pub struct Counter<T: Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> {
pullable: P,
consumed: Rc<RefCell<ChangeBatch<T>>>,
phantom: ::std::marker::PhantomData<D>,
Expand All @@ -36,15 +36,15 @@ impl<T:Ord+Clone+'static> Drop for ConsumedGuard<T> {
}
}

impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, D: Container, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
/// Retrieves the next timestamp and batch of data.
#[inline]
pub fn next(&mut self) -> Option<&mut BundleCore<T, D>> {
pub fn next(&mut self) -> Option<&mut Bundle<T, D>> {
self.next_guarded().map(|(_guard, bundle)| bundle)
}

#[inline]
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut BundleCore<T, D>)> {
pub(crate) fn next_guarded(&mut self) -> Option<(ConsumedGuard<T>, &mut Bundle<T, D>)> {
if let Some(message) = self.pullable.pull() {
let guard = ConsumedGuard {
consumed: Rc::clone(&self.consumed),
Expand All @@ -57,7 +57,7 @@ impl<T:Ord+Clone+'static, D: Container, P: Pull<BundleCore<T, D>>> Counter<T, D,
}
}

impl<T:Ord+Clone+'static, D, P: Pull<BundleCore<T, D>>> Counter<T, D, P> {
impl<T:Ord+Clone+'static, D, P: Pull<Bundle<T, D>>> Counter<T, D, P> {
/// Allocates a new `Counter` from a boxed puller.
pub fn new(pullable: P) -> Self {
Counter {
Expand Down
Loading
Loading