Generalize arrange to change type of records #300

Draft: wants to merge 1 commit into base: master
100 changes: 72 additions & 28 deletions src/operators/arrange/arrangement.rs
@@ -517,29 +517,62 @@ where
Tr::Batch: Batch<K, V, G::Timestamp, R>,
Tr::Cursor: Cursor<K, V, G::Timestamp, R>,
{
- // The `Arrange` operator is tasked with reacting to an advancing input
- // frontier by producing the sequence of batches whose lower and upper
- // bounds are those frontiers, containing updates at times greater or
- // equal to lower and not greater or equal to upper.
- //
- // The operator uses its batch type's `Batcher`, which accepts update
- // triples and responds to requests to "seal" batches (presented as new
- // upper frontiers).
- //
- // Each sealed batch is presented to the trace, and if at all possible
- // transmitted along the outgoing channel. Empty batches may not have
- // a corresponding capability, as capabilities are only retained for actual
- // data held by the batcher, which may prevent the operator from sending an
- // empty batch.
-
- let mut reader: Option<TraceAgent<Tr>> = None;
-
- // fabricate a data-parallel operator using the `unary_notify` pattern.
+ self.arrange_general::<P, Tr, Tr, _, _>(pact, name, |_capability, _trace_agent| {
+ |batch, _capability| (batch, Vec::new())
+ })
+ }
+ }


+ impl<G, K, V, R> Collection<G, (K, V), R>
+ where
+ G: Scope,
+ G::Timestamp: Lattice+Ord,
+ K: ExchangeData+Hashable,
+ V: ExchangeData,
+ R: Semigroup+ExchangeData,
+ {
+ /// Arranges a differential dataflow collection with custom user logic.
+ ///
+ /// This method generalizes `arrange` in that the output type may differ
+ /// from the input type, and the user is allowed to perform logic as the
Contributor (review comment): nit: is allowed

+ /// input batch is formed, before it is accepted into the output trace.
+ /// This allows users to perform non-standard update logic, perhaps more
+ /// like a state machine than the simple insertion that `arrange` applies.
+ ///
+ /// The standard `arrange` operator provides a `logic_builder` that ignores
+ /// the supplied capability and trace handle, and produces a `logic` closure
+ /// that on each call just returns the supplied batch.
+ ///
+ /// More generally, the harness operator will repeatedly peel off capabilities
+ /// that are not beyond the frontier, mint the batch of input updates that
+ /// corresponds to that interval, and present it to the created closure.
+ /// The closure is expected to return an output batch (for immediate release)
+ /// and a list of capabilities the harness should re-introduce, to ensure
+ /// that the logic is given the opportunity to produce output at those times
+ /// in the future. These capabilities are necessary to send outputs, but they
+ /// also act as a scheduling mechanism, ensuring that the logic is invoked
+ /// when those times arrive, instead of the harness quietly minting an empty
+ /// output batch.
+ pub fn arrange_general<P, TrIn, TrOut, F, L>(&self, pact: P, name: &str, logic_builder: F) -> Arranged<G, TraceAgent<TrOut>>
+ where
+ P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
+ TrIn: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
+ TrIn::Batch: Batch<K, V, G::Timestamp, R>,
+ TrIn::Cursor: Cursor<K, V, G::Timestamp, R>,
+ TrOut: Trace+TraceReader<Key=K,Time=G::Timestamp>+'static,
+ TrOut::Batch: Batch<K, TrOut::Val, G::Timestamp, TrOut::R>,
+ TrOut::Cursor: Cursor<K, TrOut::Val, G::Timestamp, TrOut::R>,
+ F: FnOnce(Capability<G::Timestamp>, TraceAgent<TrOut>) -> L,
+ L: FnMut(TrIn::Batch, &Capability<G::Timestamp>)->(TrOut::Batch, Vec<Capability<G::Timestamp>>)+'static,
+ {
+ // To be populated by the closure once it gets operator information.
+ let mut reader: Option<TraceAgent<TrOut>> = None;

let stream = {

let reader = &mut reader;

- self.inner.unary_frontier(pact, name, move |_capability, info| {
+ self.inner.unary_frontier(pact, name, move |capability, info| {

// Acquire a logger for arrange events.
let logger = {
@@ -549,14 +582,13 @@
};

// Where we will deposit received updates, and from which we extract batches.
- let mut batcher = <Tr::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();
+ let mut batcher = <TrIn::Batch as Batch<K,V,G::Timestamp,R>>::Batcher::new();

// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();

// Buffer for receiving input records.
let mut buffer = Vec::new();


let (activator, effort) =
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
(Some(self.scope().activator_for(&info.address[..])), Some(effort))
@@ -565,8 +597,10 @@
(None, None)
};

- let empty_trace = Tr::new(info.clone(), logger.clone(), activator);
+ let empty_trace = TrOut::new(info.clone(), logger.clone(), activator);
let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
+ // Capture a reference to the output trace, to use as appropriate.
+ let mut logic = logic_builder(capability, reader_local.clone());

*reader = Some(reader_local);

@@ -597,6 +631,8 @@
// to the new frontier).
let progress = input_frontier.iter().any(|t2| !input.frontier().less_equal(t2));

+ let mut new_capabilities = Vec::new();

if progress {

// There are two cases to handle with some care:
Expand Down Expand Up @@ -633,15 +669,23 @@ where
}

// Extract updates not in advance of `upper`.
- let batch = batcher.seal(upper.clone());
-
- writer.insert(batch.clone(), Some(capability.time().clone()));
+ let input_batch = batcher.seal(upper.clone());
+ let (output_batch, new_caps) = logic(input_batch, &capability);

+ writer.insert(output_batch.clone(), Some(capability.time().clone()));
// send the batch to downstream consumers, empty or not.
- output.session(&capabilities.elements()[index]).give(batch);
+ output.session(&capability).give(output_batch);

+ // Ideally all new capabilities are in the future of the supplied capability,
+ // and not in the interval we just closed out.
+ assert!(new_caps.iter().all(|c| upper.less_equal(c.time()) && capabilities.elements()[index].less_equal(c)));
+
+ new_capabilities.extend(new_caps);
}
}

+ capabilities.extend(new_capabilities.drain(..));

// Having extracted and sent batches between each capability and the input frontier,
// we should downgrade all capabilities to match the batcher's lower update frontier.
// This may involve discarding capabilities, which is fine as any new updates arrive
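To make the proposed API concrete, here is a usage sketch of `arrange_general` with the trivial `logic_builder` that the new `arrange` body supplies above: ignore the capability and trace handle, forward each batch unchanged, and retain no extra capabilities. The concrete key/value/timestamp types, the `OrdValSpine` trace, and the exchange contract are illustrative assumptions chosen to mirror what `arrange` itself does; this is a sketch of the draft API and has not been compiled against this branch.

```rust
use differential_dataflow::collection::Collection;
use differential_dataflow::hashable::Hashable;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use timely::dataflow::channels::pact::Exchange;
use timely::dataflow::Scope;

// Key = String, Val = u64, Time = usize, Diff = isize.
type Spine = OrdValSpine<String, u64, usize, isize>;

fn arrange_pass_through<G: Scope<Timestamp = usize>>(
    collection: &Collection<G, (String, u64), isize>,
) {
    // Route updates by key hash, as `arrange` itself does.
    let exchange =
        Exchange::new(|update: &((String, u64), usize, isize)| (update.0).0.hashed().into());

    // Trivial logic_builder: ignore the initial capability and the output
    // trace handle; pass every input batch through as the output batch,
    // retaining no extra capabilities.
    let _arranged = collection.arrange_general::<_, Spine, Spine, _, _>(
        exchange,
        "ArrangePassThrough",
        |_capability, _trace| |batch, _cap| (batch, Vec::new()),
    );
}
```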
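The capability contract in the doc comment can likewise be sketched with a hypothetical non-trivial `logic`: forward each batch unchanged, but hand back a capability one tick in the future so the harness re-invokes the logic at that time rather than quietly minting an empty output batch. The side state, the integer timestamp, and the single-tick assumption (so that `cap.time() + 1` lies beyond the sealed interval's upper bound, as the assertion in the diff requires) are all inventions for illustration.

```rust
use differential_dataflow::operators::arrange::TraceAgent;
use differential_dataflow::trace::implementations::ord::OrdValSpine;
use differential_dataflow::trace::TraceReader;
use timely::dataflow::operators::Capability;

type Spine = OrdValSpine<String, u64, usize, isize>;

// Inside the same dataflow-construction context as the previous sketch,
// this builder could replace the pass-through one.
let logic_builder = |_initial_cap: Capability<usize>, _trace: TraceAgent<Spine>| {
    let mut batches_seen = 0usize; // illustrative side state
    move |batch: <Spine as TraceReader>::Batch, cap: &Capability<usize>| {
        batches_seen += 1;
        // Ask to be rescheduled one tick later; under the single-tick
        // assumption this time is beyond the interval just sealed.
        let later = cap.delayed(&(cap.time() + 1));
        (batch, vec![later])
    }
};
```

The returned capabilities thus serve double duty: they authorize output at those future times, and they schedule the call that would produce it.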