Skip to content

Commit

Permalink
Extract logic to config parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 7, 2023
1 parent bd12951 commit 612fc91
Show file tree
Hide file tree
Showing 6 changed files with 36 additions and 45 deletions.
12 changes: 12 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,5 +140,17 @@ impl Config {
pub fn configure(config: &mut timely::WorkerConfig, options: &Config) {
if let Some(effort) = options.idle_merge_effort {
config.set("differential/idle_merge_effort".to_string(), effort);
config.set::<trace::ExertionLogic>(
"differential/default_exert_logic".to_string(),
std::sync::Arc::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
}),
);
}
}
19 changes: 5 additions & 14 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,11 @@ use timely::dataflow::operators::Capability;
use ::{Data, ExchangeData, Collection, AsCollection, Hashable};
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::{self, Trace, TraceReader, Batch, BatchReader, Batcher, Cursor};
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;

use trace::wrappers::enter::{TraceEnter, BatchEnter};
use trace::wrappers::enter::{TraceEnter, BatchEnter,};
use trace::wrappers::enter_at::TraceEnter as TraceEnterAt;
use trace::wrappers::enter_at::BatchEnter as BatchEnterAt;
use trace::wrappers::filter::{TraceFilter, BatchFilter};
Expand Down Expand Up @@ -565,18 +565,9 @@ where

let activator = Some(self.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);

// If idle merge effort exists, configure aggressive idle merging logic.
if let Some(effort) = self.inner.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
empty_trace.set_exert_logic(Some(Box::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
})));
// If there is default exertion logic set, install it.
if let Some(exert_logic) = self.inner.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}

let (reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
Expand Down
16 changes: 4 additions & 12 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ use timely::dataflow::operators::Capability;

use ::{ExchangeData, Hashable};
use lattice::Lattice;
use trace::{Trace, TraceReader, Batch, Cursor};
use trace::{self, Trace, TraceReader, Batch, Cursor};

use trace::Builder;

Expand Down Expand Up @@ -171,17 +171,9 @@ where
// Form the trace we will both use internally and publish.
let activator = Some(stream.scope().activator_for(&info.address[..]));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
// If idle merge effort exists, configure aggressive idle merging logic.
if let Some(effort) = stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
empty_trace.set_exert_logic(Some(Box::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
})));

if let Some(exert_logic) = stream.scope().config().get::<trace::ExertionLogic>("differential/default_exert_logic").cloned() {
empty_trace.set_exert_logic(exert_logic);
}

let (mut reader_local, mut writer) = TraceAgent::new(empty_trace, info, logger);
Expand Down
17 changes: 5 additions & 12 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ use timely::dataflow::operators::Capability;

use operators::arrange::{Arranged, ArrangeByKey, ArrangeBySelf, TraceAgent};
use lattice::Lattice;
use trace::{Batch, BatchReader, Cursor, Trace, Builder};
use trace::{Batch, BatchReader, Cursor, Trace, Builder, ExertionLogic};
use trace::cursor::CursorList;
use trace::implementations::ord::OrdValSpine as DefaultValTrace;
use trace::implementations::ord::OrdKeySpine as DefaultKeyTrace;
Expand Down Expand Up @@ -353,19 +353,12 @@ where

let activator = Some(self.stream.scope().activator_for(&operator_info.address[..]));
let mut empty = T2::new(operator_info.clone(), logger.clone(), activator);
// If idle merge effort exists, configure aggressive idle merging logic.
if let Some(effort) = self.stream.scope().config().get::<isize>("differential/idle_merge_effort").cloned() {
empty.set_exert_logic(Some(Box::new(move |batches| {
let mut non_empty = 0;
for (_index, count, length) in batches {
if count > 1 { return Some(effort as usize); }
if length > 0 { non_empty += 1; }
if non_empty > 1 { return Some(effort as usize); }
}
None
})));
// If there is default exert logic set, install it.
if let Some(exert_logic) = self.stream.scope().config().get::<ExertionLogic>("differential/default_exert_logic").cloned() {
empty.set_exert_logic(exert_logic);
}


let mut source_trace = self.trace.clone();

let (mut output_reader, mut output_writer) = TraceAgent::new(empty, operator_info, logger);
Expand Down
12 changes: 6 additions & 6 deletions src/trace/implementations/spine_fueled.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ use std::fmt::Debug;
use ::logging::Logger;
use ::difference::Semigroup;
use lattice::Lattice;
use trace::{Batch, BatchReader, Trace, TraceReader};
use trace::{Batch, BatchReader, Trace, TraceReader, ExertionLogic};
use trace::cursor::{Cursor, CursorList};
use trace::Merger;

Expand All @@ -98,7 +98,7 @@ pub struct Spine<B: Batch> where B::Time: Lattice+Ord, B::R: Semigroup {
effort: usize,
activator: Option<timely::scheduling::activate::Activator>,
/// Logic to indicate whether and how many records we should introduce in the absence of actual updates.
exert_logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>,
exert_logic: ExertionLogic,
}

impl<B> TraceReader for Spine<B>
Expand Down Expand Up @@ -290,7 +290,7 @@ where
}
}

fn set_exert_logic(&mut self, logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>) {
fn set_exert_logic(&mut self, logic: ExertionLogic) {
self.exert_logic = logic;
}

Expand Down Expand Up @@ -398,15 +398,15 @@ where
/// This method prepares an iterator over batches, including the level, count, and length of each layer.
/// It supplies this to `self.exert_logic`, who produces the response of the amount of exertion to apply.
fn exert_effort(&self) -> Option<usize> {
self.exert_logic.as_ref().and_then(|l| (**l)(
(self.exert_logic)(
Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| {
match batch {
MergeState::Vacant => (index, 0, 0),
MergeState::Single(_) => (index, 1, batch.len()),
MergeState::Double(_) => (index, 2, batch.len()),
}
}))
))
)
}

/// Describes the merge progress of layers in the trace.
Expand Down Expand Up @@ -449,7 +449,7 @@ where
upper: Antichain::from_elem(<B::Time as timely::progress::Timestamp>::minimum()),
effort,
activator,
exert_logic: None,
exert_logic: std::sync::Arc::new(|_batches| None),
}
}

Expand Down
5 changes: 4 additions & 1 deletion src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ use timely::progress::Timestamp;
pub use self::cursor::Cursor;
pub use self::description::Description;

/// A type used to express how much effort a trace should exert even in the absence of updates.
pub type ExertionLogic = std::sync::Arc<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>+Send+Sync>;

// The traces and batch and cursors want the flexibility to appear as if they manage certain types of keys and
// values and such, while perhaps using other representations, I'm thinking mostly of wrappers around the keys
// and vals that change the `Ord` implementation, or stash hash codes, or the like.
Expand Down Expand Up @@ -216,7 +219,7 @@ where <Self as TraceReader>::Batch: Batch {
/// The function receives an iterator over batch levels, from large to small, as triples `(level, count, length)`,
/// indicating the level, the number of batches, and their total length in updates. It should return a number of
/// updates to perform, or `None` if no work is required.
fn set_exert_logic(&mut self, logic: Option<Box<dyn for<'a> Fn(Box<dyn Iterator<Item=(usize, usize, usize)>+'a>)->Option<usize>>>);
fn set_exert_logic(&mut self, logic: ExertionLogic);

/// Introduces a batch of updates to the trace.
///
Expand Down

0 comments on commit 612fc91

Please sign in to comment.