Skip to content

Commit

Permalink
Prevent enter_at from using enter_at (#469)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry authored Mar 21, 2024
1 parent 84d0d4d commit dfc0062
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
9 changes: 8 additions & 1 deletion examples/accumulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,14 @@ fn main() {

let mut input = worker.dataflow::<(), _, _>(|scope| {
let (input, data) = scope.new_collection::<_, isize>();
data.consolidate();

use timely::dataflow::Scope;
scope.iterative::<u32,_,_>(|inner| {
data.enter_at(inner, |_| 0)
.consolidate()
.leave()
});

input
});

Expand Down
10 changes: 3 additions & 7 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,20 +322,16 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
/// data.assert_eq(&result);
/// });
/// ```
pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, initial: F) -> Collection<Iterative<'a, G, T>, D, R>
pub fn enter_at<'a, T, F>(&self, child: &Iterative<'a, G, T>, mut initial: F) -> Collection<Iterative<'a, G, T>, D, R>
where
T: Timestamp+Hash,
F: FnMut(&D) -> T + Clone + 'static,
G::Timestamp: Hash,
{

let mut initial1 = initial.clone();
let mut initial2 = initial.clone();

self.inner
.enter_at(child, move |x| initial1(&x.0))
.enter(child)
.map(move |(data, time, diff)| {
let new_time = Product::new(time, initial2(&data));
let new_time = Product::new(time, initial(&data));
(data, new_time, diff)
})
.as_collection()
Expand Down

0 comments on commit dfc0062

Please sign in to comment.