Skip to content

Commit

Permalink
Update more core operators (TimelyDataflow#555)
Browse files Browse the repository at this point in the history
* Update Map

* Update Filter

* Update OkErr

* Update Input

* Update UnorderedInput

* Update ToStream
  • Loading branch information
frankmcsherry authored Mar 22, 2024
1 parent 258e3af commit b407978
Show file tree
Hide file tree
Showing 13 changed files with 828 additions and 742 deletions.
2 changes: 1 addition & 1 deletion timely/src/dataflow/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
pub use self::stream::{StreamCore, Stream};
pub use self::scopes::{Scope, ScopeParent};

pub use self::operators::input::HandleCore as InputHandleCore;
pub use self::operators::core::input::Handle as InputHandleCore;
pub use self::operators::input::Handle as InputHandle;
pub use self::operators::probe::Handle as ProbeHandle;

Expand Down
41 changes: 41 additions & 0 deletions timely/src/dataflow/operators/core/filter.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
//! Filters a stream by a predicate.
use timely_container::{Container, PushContainer, PushInto};

use crate::dataflow::channels::pact::Pipeline;
use crate::dataflow::{Scope, StreamCore};
use crate::dataflow::operators::generic::operator::Operator;

/// Extension trait for filtering.
pub trait Filter<C: Container> {
/// Returns a new instance of `self` containing only records satisfying `predicate`.
///
/// # Examples
/// ```
/// use timely::dataflow::operators::ToStream;
/// use timely::dataflow::operators::core::{Filter, Inspect};
///
/// timely::example(|scope| {
/// (0..10).to_stream(scope)
/// .filter(|x| *x % 2 == 0)
/// .inspect(|x| println!("seen: {:?}", x));
/// });
/// ```
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(&self, predicate: P) -> Self;
}

impl<G: Scope, C: PushContainer> Filter<C> for StreamCore<G, C>
where
for<'a> C::Item<'a>: PushInto<C>
{
fn filter<P: FnMut(&C::Item<'_>)->bool+'static>(&self, mut predicate: P) -> StreamCore<G, C> {
let mut container = Default::default();
self.unary(Pipeline, "Filter", move |_,_| move |input, output| {
input.for_each(|time, data| {
data.swap(&mut container);
if !container.is_empty() {
output.session(&time).give_iterator(container.drain().filter(&mut predicate));
}
});
})
}
}
Loading

0 comments on commit b407978

Please sign in to comment.