From 51966ad62680bd7c674e786dd56d1c8daec8d9d7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Fri, 12 Apr 2024 14:11:07 -0400 Subject: [PATCH] WIP: Extract batcher input to assoc type, arrange_core freestanding (#471) * Extract batcher input to assoc type, arrange_core freestanding Signed-off-by: Moritz Hoffmann * Fix warnings Signed-off-by: Moritz Hoffmann * documentation Signed-off-by: Moritz Hoffmann * formatting Signed-off-by: Moritz Hoffmann --------- Signed-off-by: Moritz Hoffmann --- .github/workflows/deploy.yml | 2 +- .github/workflows/test.yml | 4 +- doop/src/main.rs | 8 +- experiments/src/bin/graphs-interactive-alt.rs | 4 +- experiments/src/bin/graphs-interactive-neu.rs | 4 +- interactive/src/command.rs | 6 +- interactive/src/logging.rs | 6 +- src/operators/arrange/arrangement.rs | 284 +++++++++--------- src/operators/consolidate.rs | 2 +- src/trace/implementations/merge_batcher.rs | 1 + .../implementations/merge_batcher_col.rs | 1 + src/trace/mod.rs | 4 +- 12 files changed, 175 insertions(+), 151 deletions(-) diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml index babc088a8e..41c27604b7 100644 --- a/.github/workflows/deploy.yml +++ b/.github/workflows/deploy.yml @@ -9,7 +9,7 @@ jobs: deploy: runs-on: ubuntu-22.04 steps: - - uses: actions/checkout@v3 + - uses: actions/checkout@v4 - run: cargo install mdbook --version 0.4.31 - run: cd mdbook && mdbook build - uses: JamesIves/github-pages-deploy-action@v4 diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index e4643725de..e3c9b60874 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -16,7 +16,7 @@ jobs: toolchain: - stable - 1.72 - name: cargo test on ${{ matrix.os }} + name: cargo test on ${{ matrix.os }}, rust ${{ matrix.toolchain }} runs-on: ${{ matrix.os }}-latest steps: - uses: actions/checkout@v4 @@ -24,7 +24,7 @@ jobs: with: toolchain: ${{ matrix.toolchain }} - name: Cargo test - run: cargo test + run: cargo test --workspace --all-targets # Check formatting with rustfmt mdbook: diff --git a/doop/src/main.rs b/doop/src/main.rs index 6628a7e306..d6ad19c192 100644 --- a/doop/src/main.rs +++ b/doop/src/main.rs @@ -1,4 +1,4 @@ -#![allow(non_snake_case)] +#![allow(non_snake_case, dead_code)] use std::collections::HashMap; use std::rc::Rc; @@ -145,7 +145,7 @@ fn load<'a>(filename: &str, interner: Rc>) -> impl Itera }) } -fn load1<'a>(index: usize, prefix: &str, filename: &str, interner: Rc>) -> impl Iterator+'a { +fn load1<'a>(index: usize, prefix: &str, filename: &str, interner: Rc>) -> impl Iterator+'a { read_file(&format!("{}{}", prefix, filename)) .filter(move |_| index == 0) .map(move |line| { @@ -791,7 +791,7 @@ fn main() { let SupertypeOf = SupertypeOf.enter(scope); // Required by all - let mut Reachable = Relation::<_,(Method)>::new(scope); + let mut Reachable = Relation::<_,Method>::new(scope); // NOTE: Common subexpression. let Reachable_Invocation = @@ -805,7 +805,7 @@ fn main() { // let Reachable = ReachableFinal.clone(); // Class initialization - let mut InitializedClass = Relation::<_,(Type)>::new(scope); + let mut InitializedClass = Relation::<_,Type>::new(scope); // ClassInitializer(?type, ?method) :- basic.MethodImplemented("", "void()", ?type, ?method). let temp1 = interner.borrow_mut().intern(""); diff --git a/experiments/src/bin/graphs-interactive-alt.rs b/experiments/src/bin/graphs-interactive-alt.rs index 55c8a8cf67..18dc0923fd 100644 --- a/experiments/src/bin/graphs-interactive-alt.rs +++ b/experiments/src/bin/graphs-interactive-alt.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use rand::{Rng, SeedableRng, StdRng}; use timely::dataflow::*; @@ -389,4 +391,4 @@ where G::Timestamp: Lattice { .concat(&prop) .reduce(|_, s, t| { t.push((*s[0].0, 1)); }) }) -} \ No newline at end of file +} diff --git a/experiments/src/bin/graphs-interactive-neu.rs b/experiments/src/bin/graphs-interactive-neu.rs index a01f23d976..6210113ed5 100644 --- a/experiments/src/bin/graphs-interactive-neu.rs +++ b/experiments/src/bin/graphs-interactive-neu.rs @@ -1,3 +1,5 @@ +#![allow(dead_code)] + use rand::{Rng, SeedableRng, StdRng}; use timely::dataflow::*; @@ -392,4 +394,4 @@ where G::Timestamp: Lattice+Ord { reached.leave() }) -} \ No newline at end of file +} diff --git a/interactive/src/command.rs b/interactive/src/command.rs index c31963afe2..d795ec13a6 100644 --- a/interactive/src/command.rs +++ b/interactive/src/command.rs @@ -151,7 +151,7 @@ where println!("\tTimely logging connection {} of {}", index, number); let socket = listener.incoming().next().unwrap().unwrap(); socket.set_nonblocking(true).expect("failed to set nonblocking"); - streams.push(EventReader::::new(socket)); + streams.push(EventReader::,_>::new(socket)); } println!("\tAll logging connections established"); @@ -174,7 +174,7 @@ where for _ in 0 .. number { let socket = listener.incoming().next().unwrap().unwrap(); socket.set_nonblocking(true).expect("failed to set nonblocking"); - streams.push(EventReader::::new(socket)); + streams.push(EventReader::,_>::new(socket)); } } crate::logging::publish_differential_logging(manager, worker, granularity, &name_as, streams); @@ -195,4 +195,4 @@ where pub fn serialize_into(&self, writer: W) { bincode::serialize_into(writer, self).expect("bincode: serialization failed"); } -} \ No newline at end of file +} diff --git a/interactive/src/logging.rs b/interactive/src/logging.rs index cfc524660d..be07fa4b60 100644 --- a/interactive/src/logging.rs +++ b/interactive/src/logging.rs @@ -30,7 +30,7 @@ where V: ExchangeData+Hash+LoggingValue+Datum, A: Allocate, I : IntoIterator, - ::Item: EventIterator+'static + ::Item: EventIterator>+'static { let (operates, channels, schedule, messages, shutdown, park, text) = worker.dataflow(move |scope| { @@ -217,7 +217,7 @@ where V: ExchangeData+Hash+LoggingValue+Datum, A: Allocate, I : IntoIterator, - ::Item: EventIterator+'static + ::Item: EventIterator>+'static { let (merge,batch) = worker.dataflow(move |scope| { @@ -280,4 +280,4 @@ where manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/batch", name)), &batch); manager.traces.set_unkeyed(&Plan::Source(format!("logs/{}/differential/arrange/merge", name)), &merge); -} \ No newline at end of file +} diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index fe65a702de..653ed1e876 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -19,7 +19,7 @@ use timely::dataflow::operators::{Enter, Map}; use timely::order::PartialOrder; -use timely::dataflow::{Scope, Stream}; +use timely::dataflow::{Scope, Stream, StreamCore}; use timely::dataflow::operators::generic::Operator; use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline, Exchange}; use timely::progress::Timestamp; @@ -511,7 +511,7 @@ where V: ExchangeData, R: ExchangeData, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, { self.arrange_named("Arrange") @@ -527,7 +527,7 @@ where V: ExchangeData, R: ExchangeData, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); @@ -547,7 +547,7 @@ where R: Clone, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, ; } @@ -565,160 +565,176 @@ where P: ParallelizationContract>, Tr: Trace+'static, Tr::Batch: Batch, - Tr::Batcher: Batcher, + Tr::Batcher: Batcher, Item = ((K,V),G::Timestamp,R), Time = G::Timestamp>, Tr::Builder: Builder, { - // The `Arrange` operator is tasked with reacting to an advancing input - // frontier by producing the sequence of batches whose lower and upper - // bounds are those frontiers, containing updates at times greater or - // equal to lower and not greater or equal to upper. - // - // The operator uses its batch type's `Batcher`, which accepts update - // triples and responds to requests to "seal" batches (presented as new - // upper frontiers). - // - // Each sealed batch is presented to the trace, and if at all possible - // transmitted along the outgoing channel. Empty batches may not have - // a corresponding capability, as they are only retained for actual data - // held by the batcher, which may prevents the operator from sending an - // empty batch. - - let mut reader: Option> = None; - - // fabricate a data-parallel operator using the `unary_notify` pattern. - let stream = { - - let reader = &mut reader; - - self.inner.unary_frontier(pact, name, move |_capability, info| { - - // Acquire a logger for arrange events. - let logger = { - let scope = self.scope(); - let register = scope.log_register(); - register.get::("differential/arrange") - }; - - // Where we will deposit received updates, and from which we extract batches. - let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id); - - // Capabilities for the lower envelope of updates in `batcher`. - let mut capabilities = Antichain::>::new(); - - let activator = Some(self.scope().activator_for(&info.address[..])); - let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If there is default exertion logic set, install it. - if let Some(exert_logic) = self.inner.scope().config().get::("differential/default_exert_logic").cloned() { - empty_trace.set_exert_logic(exert_logic); - } - - let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); - - *reader = Some(reader_local); - - // Initialize to the minimal input frontier. - let mut prev_frontier = Antichain::from_elem(::minimum()); - - move |input, output| { + arrange_core(&self.inner, pact, name) + } +} - // As we receive data, we need to (i) stash the data and (ii) keep *enough* capabilities. - // We don't have to keep all capabilities, but we need to be able to form output messages - // when we realize that time intervals are complete. +/// Arranges a stream of updates by a key, configured with a name and a parallelization contract. +/// +/// This operator arranges a stream of values into a shared trace, whose contents it maintains. +/// It uses the supplied parallelization contract to distribute the data, which does not need to +/// be consistently by key (though this is the most common). +fn arrange_core(stream: &StreamCore::Input>, pact: P, name: &str) -> Arranged> +where + G: Scope, + G::Timestamp: Lattice, + P: ParallelizationContract::Input>, + Tr: Trace+'static, + Tr::Batch: Batch, + Tr::Batcher: Batcher