From 612fc91b443eca6f17876b304c0c22a346b036b8 Mon Sep 17 00:00:00 2001 From: Frank McSherry Date: Tue, 7 Nov 2023 07:19:31 -0500 Subject: [PATCH] Extract logic to config parameter --- src/lib.rs | 12 ++++++++++++ src/operators/arrange/arrangement.rs | 19 +++++-------------- src/operators/arrange/upsert.rs | 16 ++++------------ src/operators/reduce.rs | 17 +++++------------ src/trace/implementations/spine_fueled.rs | 12 ++++++------ src/trace/mod.rs | 5 ++++- 6 files changed, 36 insertions(+), 45 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index c6b10904a..2b18bd796 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -140,5 +140,17 @@ impl Config { pub fn configure(config: &mut timely::WorkerConfig, options: &Config) { if let Some(effort) = options.idle_merge_effort { config.set("differential/idle_merge_effort".to_string(), effort); + config.set::( + "differential/default_exert_logic".to_string(), + std::sync::Arc::new(move |batches| { + let mut non_empty = 0; + for (_index, count, length) in batches { + if count > 1 { return Some(effort as usize); } + if length > 0 { non_empty += 1; } + if non_empty > 1 { return Some(effort as usize); } + } + None + }), + ); } } diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 0633c8c4d..1e94ed130 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -29,11 +29,11 @@ use timely::dataflow::operators::Capability; use ::{Data, ExchangeData, Collection, AsCollection, Hashable}; use ::difference::Semigroup; use lattice::Lattice; -use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; +use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor}; use trace::implementations::ord::OrdValSpine as DefaultValTrace; use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; -use trace::wrappers::enter::{TraceEnter, BatchEnter}; +use trace::wrappers::enter::{TraceEnter, BatchEnter,}; use trace::wrappers::enter_at::TraceEnter as TraceEnterAt; use trace::wrappers::enter_at::BatchEnter as BatchEnterAt; use trace::wrappers::filter::{TraceFilter, BatchFilter}; @@ -565,18 +565,9 @@ where let activator = Some(self.scope().activator_for(&info.address[..])); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - - // If idle merge effort exists, configure aggressive idle merging logic. - if let Some(effort) = self.inner.scope().config().get::("differential/idle_merge_effort").cloned() { - empty_trace.set_exert_logic(Some(Box::new(move |batches| { - let mut non_empty = 0; - for (_index, count, length) in batches { - if count > 1 { return Some(effort as usize); } - if length > 0 { non_empty += 1; } - if non_empty > 1 { return Some(effort as usize); } - } - None - }))); + // If there is default exertion logic set, install it. + if let Some(exert_logic) = self.inner.scope().config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); } let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs index dbcd38c8e..869947ec3 100644 --- a/src/operators/arrange/upsert.rs +++ b/src/operators/arrange/upsert.rs @@ -117,7 +117,7 @@ use timely::dataflow::operators::Capability; use ::{ExchangeData, Hashable}; use lattice::Lattice; -use trace::{Trace, TraceReader, Batch, Cursor}; +use trace::{self, Trace, TraceReader, Batch, Cursor}; use trace::Builder; @@ -171,17 +171,9 @@ where // Form the trace we will both use internally and publish. let activator = Some(stream.scope().activator_for(&info.address[..])); let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator); - // If idle merge effort exists, configure aggressive idle merging logic. - if let Some(effort) = stream.scope().config().get::("differential/idle_merge_effort").cloned() { - empty_trace.set_exert_logic(Some(Box::new(move |batches| { - let mut non_empty = 0; - for (_index, count, length) in batches { - if count > 1 { return Some(effort as usize); } - if length > 0 { non_empty += 1; } - if non_empty > 1 { return Some(effort as usize); } - } - None - }))); + + if let Some(exert_logic) = stream.scope().config().get::("differential/default_exert_logic").cloned() { + empty_trace.set_exert_logic(exert_logic); } let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger); diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs index 712843e1e..1a0356a80 100644 --- a/src/operators/reduce.rs +++ b/src/operators/reduce.rs @@ -19,7 +19,7 @@ use timely::dataflow::operators::Capability; use operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent}; use lattice::Lattice; -use trace::{Batch, BatchReader, Cursor, Trace, Builder}; +use trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic}; use trace::cursor::CursorList; use trace::implementations::ord::OrdValSpine as DefaultValTrace; use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace; @@ -353,19 +353,12 @@ where let activator = Some(self.stream.scope().activator_for(&operator_info.address[..])); let mut empty = T2::new(operator_info.clone(), logger.clone(), activator); - // If idle merge effort exists, configure aggressive idle merging logic. - if let Some(effort) = self.stream.scope().config().get::("differential/idle_merge_effort").cloned() { - empty.set_exert_logic(Some(Box::new(move |batches| { - let mut non_empty = 0; - for (_index, count, length) in batches { - if count > 1 { return Some(effort as usize); } - if length > 0 { non_empty += 1; } - if non_empty > 1 { return Some(effort as usize); } - } - None - }))); + // If there is default exert logic set, install it. + if let Some(exert_logic) = self.stream.scope().config().get::("differential/default_exert_logic").cloned() { + empty.set_exert_logic(exert_logic); } + let mut source_trace = self.trace.clone(); let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger); diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 4425b46c5..8acfda51f 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -74,7 +74,7 @@ use std::fmt::Debug; use ::logging::Logger; use ::difference::Semigroup; use lattice::Lattice; -use trace::{Batch, BatchReader, Trace, TraceReader}; +use trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic}; use trace::cursor::{Cursor, CursorList}; use trace::Merger; @@ -98,7 +98,7 @@ pub struct Spine where B::Time: Lattice+Ord, B::R: Semigroup { effort: usize, activator: Option, /// Logic to indicate whether and how many records we should introduce in the absence of actual updates. - exert_logic: Option Fn(Box+'a>)->Option>>, + exert_logic: ExertionLogic, } impl TraceReader for Spine @@ -290,7 +290,7 @@ where } } - fn set_exert_logic(&mut self, logic: Option Fn(Box+'a>)->Option>>) { + fn set_exert_logic(&mut self, logic: ExertionLogic) { self.exert_logic = logic; } @@ -398,7 +398,7 @@ where /// This method prepares an iterator over batches, including the level, count, and length of each layer. /// It supplies this to `self.exert_logic`, who produces the response of the amount of exertion to apply. fn exert_effort(&self) -> Option { - self.exert_logic.as_ref().and_then(|l| (**l)( + (self.exert_logic)( Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| { match batch { MergeState::Vacant => (index, 0, 0), @@ -406,7 +406,7 @@ where MergeState::Double(_) => (index, 2, batch.len()), } })) - )) + ) } /// Describes the merge progress of layers in the trace. @@ -449,7 +449,7 @@ where upper: Antichain::from_elem(::minimum()), effort, activator, - exert_logic: None, + exert_logic: std::sync::Arc::new(|_batches| None), } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 4593629e6..d9c5e3ca3 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -21,6 +21,9 @@ use timely::progress::Timestamp; pub use self::cursor::Cursor; pub use self::description::Description; +/// A type used to express how much effort a trace should exert even in the absence of updates. +pub type ExertionLogic = std::sync::Arc Fn(Box+'a>)->Option+Send+Sync>; + // The traces and batch and cursors want the flexibility to appear as if they manage certain types of keys and // values and such, while perhaps using other representations, I'm thinking mostly of wrappers around the keys // and vals that change the `Ord` implementation, or stash hash codes, or the like. @@ -216,7 +219,7 @@ where ::Batch: Batch { /// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`, /// indicating the level, the number of batches, and their total length in updates. It should return a number of /// updates to perform, or `None` if no work is required. - fn set_exert_logic(&mut self, logic: Option Fn(Box+'a>)->Option>>); + fn set_exert_logic(&mut self, logic: ExertionLogic); /// Introduces a batch of updates to the trace. ///