Skip to content

Commit

Permalink
fixes and flat_map_ref and the likes
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Sep 15, 2023
1 parent 5e81e2b commit a5ed004
Showing 1 changed file with 45 additions and 6 deletions.
51 changes: 45 additions & 6 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
use trace::layers::MergeContainer;
use trace::layers::{BatchContainer, MergeContainer};

use trace::wrappers::enter::{TraceEnter, BatchEnter};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
Expand Down Expand Up @@ -78,6 +78,7 @@ where

use ::timely::dataflow::scopes::Child;
use ::timely::progress::timestamp::Refines;
use timely::Container;

impl<G: Scope, Tr> Arranged<G, Tr>
where
Expand Down Expand Up @@ -198,12 +199,26 @@ where
/// The underlying `Stream<G, BatchWrapper<T::Batch>>` is a much more efficient way to access the data,
/// and this method should only be used when the data need to be transformed or exchanged, rather than
/// supplied as arguments to an operator using the same key-value structure.
pub fn as_collection<D: Data, L>(&self, mut logic: L) -> Collection<G, D, Tr::R>
pub fn as_collection<D: Data, L>(&self, logic: L) -> Collection<G, D, Tr::R>
where
Tr::R: Semigroup,
L: FnMut(&Tr::Key, &Tr::Val) -> D+'static,
{
self.flat_map_ref(move |key, val| Some(logic(key,val)))
self.as_collection_core(logic)
}

/// Flattens the arrangement into a `Collection` whose records are stored in containers of type `C`.
///
/// Each `(key, val)` pair is passed to `logic`, and the single value it returns becomes one
/// output record; the result is wrapped in `Some` and handed to `flat_map_ref_core`.
///
/// Working with the underlying stream of batches directly is a much more efficient way to
/// access the data. Use this method only when the data need to be transformed or exchanged,
/// rather than supplied as arguments to an operator using the same key-value structure.
pub fn as_collection_core<D: Data, C, L>(&self, mut logic: L) -> Collection<G, D, Tr::R, C>
where
    Tr::R: Semigroup,
    L: FnMut(&Tr::Key, &Tr::Val) -> D + 'static,
    C: Container<Item = (D, G::Timestamp, Tr::R)>
        + BatchContainer<Item = (D, G::Timestamp, Tr::R)>,
{
    // An `Option` is a one-element iterator, so a plain map is expressed as a flat map.
    self.flat_map_ref_core(move |k, v| Some(logic(k, v)))
}

/// Extracts elements from an arrangement as a collection.
Expand All @@ -216,6 +231,21 @@ where
I: IntoIterator,
I::Item: Data,
L: FnMut(&Tr::Key, &Tr::Val) -> I+'static,
{
self.flat_map_ref_core(logic)
}

/// Extracts elements from an arrangement as a `Collection` backed by containers of type `C`.
///
/// `logic` is called with a reference to each key and value and may return any iterator of
/// output values, so it can filter (yield nothing) or flat map (yield several items) as part
/// of the extraction. The work is delegated to `flat_map_batches` on this arrangement's stream.
pub fn flat_map_ref_core<I, L, C>(&self, logic: L) -> Collection<G, I::Item, Tr::R, C>
where
    Tr::R: Semigroup,
    I: IntoIterator,
    I::Item: Data,
    L: FnMut(&Tr::Key, &Tr::Val) -> I + 'static,
    C: Container<Item = (I::Item, G::Timestamp, Tr::R)>
        + BatchContainer<Item = (I::Item, G::Timestamp, Tr::R)>,
{
    let batches = &self.stream;
    Self::flat_map_batches(batches, logic)
}
Expand All @@ -227,14 +257,16 @@ where
///
/// This method exists for streams of batches without the corresponding arrangement.
/// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::R>
pub fn flat_map_batches<I, L, C>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::R, C>
where
Tr::R: Semigroup,
I: IntoIterator,
I::Item: Data,
L: FnMut(&Tr::Key, &Tr::Val) -> I+'static,
C: Container<Item=(I::Item, G::Timestamp, Tr::R)> + BatchContainer<Item=(I::Item, G::Timestamp, Tr::R)>,
{
stream.unary(Pipeline, "AsCollection", move |_,_| move |input, output| {
let mut buffer = C::with_capacity(::timely::container::buffer::default_capacity::<(I::Item, G::Timestamp, Tr::R)>());
input.for_each(|time, data| {
let mut session = output.session(&time);
for wrapper in data.iter() {
Expand All @@ -244,14 +276,21 @@ where
while let Some(val) = cursor.get_val(batch) {
for datum in logic(key, val) {
cursor.map_times(batch, |time, diff| {
session.give((datum.clone(), time.clone(), diff.clone()));
buffer.copy(&(datum.clone(), time.clone(), diff.clone()));
if buffer.len() == buffer.capacity() {
session.give_container(&mut buffer);
buffer = C::with_capacity(::timely::container::buffer::default_capacity::<(I::Item, G::Timestamp, Tr::R)>());
}
});
}
cursor.step_val(batch);
}
cursor.step_key(batch);
}
}
if !buffer.is_empty() {
session.give_container(&mut buffer);
}
});
})
.as_collection()
Expand Down Expand Up @@ -506,7 +545,7 @@ where
K: Data,
V: Data,
R: Semigroup,
C: Data + TimelyContainer<Item=((K, V), G::Timestamp, R)> + MergeContainer,
C: TimelyContainer<Item=((K, V), G::Timestamp, R)> + MergeContainer,
{
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
Expand Down

0 comments on commit a5ed004

Please sign in to comment.