From cbcec7136806a7ff75118b6c754be7b794574905 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Thu, 22 Feb 2024 15:20:19 -0500 Subject: [PATCH] Change parameters to exert_logic from iterator to slice This avoid the intermediate allocation for every invocation of exert_logic to create a boxed iterator. Instead, pass a slice that contains the same information previously revealed by the iterator. At the same time, wrap the logic in an option to only invoke it if it is set. Signed-off-by: Moritz Hoffmann --- src/lib.rs | 4 ++-- src/trace/implementations/spine_fueled.rs | 22 ++++++++++++++-------- src/trace/mod.rs | 2 +- 3 files changed, 17 insertions(+), 11 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 7d91f2c28..dfd08f284 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -135,8 +135,8 @@ pub fn configure(config: &mut timely::WorkerConfig, options: &Config) { std::sync::Arc::new(move |batches| { let mut non_empty = 0; for (_index, count, length) in batches { - if count > 1 { return Some(effort as usize); } - if length > 0 { non_empty += 1; } + if *count > 1 { return Some(effort as usize); } + if *length > 0 { non_empty += 1; } if non_empty > 1 { return Some(effort as usize); } } None diff --git a/src/trace/implementations/spine_fueled.rs b/src/trace/implementations/spine_fueled.rs index 9ddc2952b..fd3af3b76 100644 --- a/src/trace/implementations/spine_fueled.rs +++ b/src/trace/implementations/spine_fueled.rs @@ -98,8 +98,10 @@ where upper: Antichain, effort: usize, activator: Option, + /// Parameters to `exert_logic`, containing tuples of `(index, count, length)`. + exert_logic_param: Vec<(usize, usize, usize)>, /// Logic to indicate whether and how many records we should introduce in the absence of actual updates. - exert_logic: ExertionLogic, + exert_logic: Option, phantom: std::marker::PhantomData<(BA, BU)>, } @@ -295,7 +297,7 @@ where } fn set_exert_logic(&mut self, logic: ExertionLogic) { - self.exert_logic = logic; + self.exert_logic = Some(logic); } // Ideally, this method acts as insertion of `batch`, even if we are not yet able to begin @@ -383,16 +385,19 @@ impl Spine { /// /// This method prepares an iterator over batches, including the level, count, and length of each layer. /// It supplies this to `self.exert_logic`, who produces the response of the amount of exertion to apply. - fn exert_effort(&self) -> Option { - (self.exert_logic)( - Box::new(self.merging.iter().enumerate().rev().map(|(index, batch)| { + fn exert_effort(&mut self) -> Option { + self.exert_logic.as_ref().and_then(|exert_logic| { + self.exert_logic_param.clear(); + self.exert_logic_param.extend(self.merging.iter().enumerate().rev().map(|(index, batch)| { match batch { MergeState::Vacant => (index, 0, 0), MergeState::Single(_) => (index, 1, batch.len()), MergeState::Double(_) => (index, 2, batch.len()), } - })) - ) + })); + + (exert_logic)(&self.exert_logic_param[..]) + }) } /// Describes the merge progress of layers in the trace. @@ -435,7 +440,8 @@ impl Spine { upper: Antichain::from_elem(::minimum()), effort, activator, - exert_logic: std::sync::Arc::new(|_batches| None), + exert_logic_param: Vec::default(), + exert_logic: None, phantom: std::marker::PhantomData, } } diff --git a/src/trace/mod.rs b/src/trace/mod.rs index 33fcc5da0..136654689 100644 --- a/src/trace/mod.rs +++ b/src/trace/mod.rs @@ -27,7 +27,7 @@ pub use self::cursor::Cursor; pub use self::description::Description; /// A type used to express how much effort a trace should exert even in the absence of updates. -pub type ExertionLogic = std::sync::Arc Fn(Box+'a>)->Option+Send+Sync>; +pub type ExertionLogic = std::sync::Arc Fn(&'a [(usize, usize, usize)])->Option+Send+Sync>; // The traces and batch and cursors want the flexibility to appear as if they manage certain types of keys and // values and such, while perhaps using other representations, I'm thinking mostly of wrappers around the keys