diff --git a/src/operators/count.rs b/src/operators/count.rs index e44a59a85..b39bb8769 100644 --- a/src/operators/count.rs +++ b/src/operators/count.rs @@ -69,51 +69,70 @@ where self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| { - // tracks the upper limit of known-complete timestamps. + // tracks the lower and upper limit of received batches. + let mut lower_limit = timely::progress::frontier::Antichain::from_elem(::minimum()); let mut upper_limit = timely::progress::frontier::Antichain::from_elem(::minimum()); move |input, output| { - use crate::trace::cursor::IntoOwned; + let mut batch_cursors = Vec::new(); + let mut batch_storage = Vec::new(); + + // Downgrade previous upper limit to be current lower limit. + lower_limit.clear(); + lower_limit.extend(upper_limit.borrow().iter().cloned()); + + let mut cap = None; input.for_each(|capability, batches| { + if cap.is_none() { // NB: Assumes batches are in-order + cap = Some(capability.retain()); + } batches.swap(&mut buffer); - let mut session = output.session(&capability); for batch in buffer.drain(..) { - let mut batch_cursor = batch.cursor(); - let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap(); - upper_limit.clone_from(batch.upper()); - - while let Some(key) = batch_cursor.get_key(&batch) { - let mut count: Option = None; - - trace_cursor.seek_key(&trace_storage, key); - if trace_cursor.get_key(&trace_storage) == Some(key) { - trace_cursor.map_times(&trace_storage, |_, diff| { - count.as_mut().map(|c| c.plus_equals(&diff)); - if count.is_none() { count = Some(diff.into_owned()); } - }); - } + upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order + batch_cursors.push(batch.cursor()); + batch_storage.push(batch); + } + }); - batch_cursor.map_times(&batch, |time, diff| { + if let Some(capability) = cap { - if let Some(count) = count.as_ref() { - if !count.is_zero() { - session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8))); - } - } + let mut session = output.session(&capability); + + use crate::trace::cursor::CursorList; + let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage); + let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap(); + + while let Some(key) = batch_cursor.get_key(&batch_storage) { + let mut count: Option = None; + + trace_cursor.seek_key(&trace_storage, key); + if trace_cursor.get_key(&trace_storage) == Some(key) { + trace_cursor.map_times(&trace_storage, |_, diff| { count.as_mut().map(|c| c.plus_equals(&diff)); if count.is_none() { count = Some(diff.into_owned()); } - if let Some(count) = count.as_ref() { - if !count.is_zero() { - session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8))); - } - } }); - - batch_cursor.step_key(&batch); } + + batch_cursor.map_times(&batch_storage, |time, diff| { + + if let Some(count) = count.as_ref() { + if !count.is_zero() { + session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(-1i8))); + } + } + count.as_mut().map(|c| c.plus_equals(&diff)); + if count.is_none() { count = Some(diff.into_owned()); } + if let Some(count) = count.as_ref() { + if !count.is_zero() { + session.give(((key.into_owned(), count.clone()), time.into_owned(), R2::from(1i8))); + } + } + }); + + batch_cursor.step_key(&batch_storage); } - }); + } // tidy up the shared input trace. trace.advance_upper(&mut upper_limit); diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs index 3d8a405a2..3825f0e1c 100644 --- a/src/operators/threshold.rs +++ b/src/operators/threshold.rs @@ -15,7 +15,6 @@ use crate::hashable::Hashable; use crate::collection::AsCollection; use crate::operators::arrange::{Arranged, ArrangeBySelf}; use crate::trace::{BatchReader, Cursor, TraceReader}; -use crate::trace::cursor::IntoOwned; /// Extension trait for the `distinct` differential dataflow method. pub trait ThresholdTotal where G::Timestamp: TotalOrder+Lattice+Ord { @@ -117,66 +116,85 @@ where self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| { - // tracks the upper limit of known-complete timestamps. + // tracks the lower and upper limit of received batches. + let mut lower_limit = timely::progress::frontier::Antichain::from_elem(::minimum()); let mut upper_limit = timely::progress::frontier::Antichain::from_elem(::minimum()); move |input, output| { + let mut batch_cursors = Vec::new(); + let mut batch_storage = Vec::new(); + + // Downgrde previous upper limit to be current lower limit. + lower_limit.clear(); + lower_limit.extend(upper_limit.borrow().iter().cloned()); + + let mut cap = None; input.for_each(|capability, batches| { + if cap.is_none() { // NB: Assumes batches are in-order + cap = Some(capability.retain()); + } batches.swap(&mut buffer); - let mut session = output.session(&capability); for batch in buffer.drain(..) { + upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order + batch_cursors.push(batch.cursor()); + batch_storage.push(batch); + } + }); - let mut batch_cursor = batch.cursor(); - let (mut trace_cursor, trace_storage) = trace.cursor_through(batch.lower().borrow()).unwrap(); + use crate::trace::cursor::IntoOwned; + if let Some(capability) = cap { - upper_limit.clone_from(batch.upper()); + let mut session = output.session(&capability); - while let Some(key) = batch_cursor.get_key(&batch) { - let mut count: Option = None; + use crate::trace::cursor::CursorList; + let mut batch_cursor = CursorList::new(batch_cursors, &batch_storage); + let (mut trace_cursor, trace_storage) = trace.cursor_through(lower_limit.borrow()).unwrap(); - // Compute the multiplicity of this key before the current batch. - trace_cursor.seek_key(&trace_storage, key); - if trace_cursor.get_key(&trace_storage) == Some(key) { - trace_cursor.map_times(&trace_storage, |_, diff| { - count.as_mut().map(|c| c.plus_equals(&diff)); - if count.is_none() { count = Some(diff.into_owned()); } - }); - } + while let Some(key) = batch_cursor.get_key(&batch_storage) { + let mut count: Option = None; - // Apply `thresh` both before and after `diff` is applied to `count`. - // If the result is non-zero, send it along. - batch_cursor.map_times(&batch, |time, diff| { - - let difference = - match &count { - Some(old) => { - let mut temp = old.clone(); - temp.plus_equals(&diff); - thresh(key, &temp, Some(old)) - }, - None => { thresh(key, &diff.into_owned(), None) }, - }; - - // Either add or assign `diff` to `count`. - if let Some(count) = &mut count { - count.plus_equals(&diff); - } - else { - count = Some(diff.into_owned()); - } + // Compute the multiplicity of this key before the current batch. + trace_cursor.seek_key(&trace_storage, key); + if trace_cursor.get_key(&trace_storage) == Some(key) { + trace_cursor.map_times(&trace_storage, |_, diff| { + count.as_mut().map(|c| c.plus_equals(&diff)); + if count.is_none() { count = Some(diff.into_owned()); } + }); + } + + // Apply `thresh` both before and after `diff` is applied to `count`. + // If the result is non-zero, send it along. + batch_cursor.map_times(&batch_storage, |time, diff| { + + let difference = + match &count { + Some(old) => { + let mut temp = old.clone(); + temp.plus_equals(&diff); + thresh(key, &temp, Some(old)) + }, + None => { thresh(key, &diff.into_owned(), None) }, + }; + + // Either add or assign `diff` to `count`. + if let Some(count) = &mut count { + count.plus_equals(&diff); + } + else { + count = Some(diff.into_owned()); + } - if let Some(difference) = difference { - if !difference.is_zero() { - session.give((key.clone(), time.into_owned(), difference)); - } + if let Some(difference) = difference { + if !difference.is_zero() { + session.give((key.clone(), time.into_owned(), difference)); } - }); + } + }); - batch_cursor.step_key(&batch); - } + batch_cursor.step_key(&batch_storage); } - }); + } // tidy up the shared input trace. trace.advance_upper(&mut upper_limit);