From 7b5407e1ebaadc0829a69868197e237ca994c896 Mon Sep 17 00:00:00 2001 From: Jan Teske Date: Wed, 29 Nov 2023 11:33:40 +0100 Subject: [PATCH] Restructure `TrackerEvent` This commit changes the shape of `TrackerEvent` to include the source/target distinction in a `Location` rather than separate enum variants. Apart from less boilerplate, this speeds up the frontier logging in `propagate_all` as we can directly use the contents of `pushed_changes`, instead of having to split them into target and source changes. A side effect of this is that the definition of `ProgressEventTimestampVec` changes, which also affects the `TimelyProgressEvent` type. --- timely/examples/logging-send.rs | 8 +- timely/src/logging.rs | 22 ++-- timely/src/progress/broadcast.rs | 25 ++-- timely/src/progress/reachability.rs | 194 +++++++++------------------- 4 files changed, 86 insertions(+), 163 deletions(-) diff --git a/timely/examples/logging-send.rs b/timely/examples/logging-send.rs index 23228f4e83..518fce32f7 100644 --- a/timely/examples/logging-send.rs +++ b/timely/examples/logging-send.rs @@ -29,13 +29,13 @@ fn main() { println!("PROGRESS: {:?}", x); let (_, _, ev) = x; print!("PROGRESS: TYPED MESSAGES: "); - for (n, p, t, d) in ev.messages.iter() { - print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + for (l, t, d) in ev.messages.iter() { + print!("{:?}, ", (l, t.as_any().downcast_ref::(), d)); } println!(); print!("PROGRESS: TYPED INTERNAL: "); - for (n, p, t, d) in ev.internal.iter() { - print!("{:?}, ", (n, p, t.as_any().downcast_ref::(), d)); + for (l, t, d) in ev.internal.iter() { + print!("{:?}, ", (l, t.as_any().downcast_ref::(), d)); } println!(); }) diff --git a/timely/src/logging.rs b/timely/src/logging.rs index ca9868fbe5..54cdaa6de5 100644 --- a/timely/src/logging.rs +++ b/timely/src/logging.rs @@ -11,6 +11,7 @@ pub type TimelyProgressLogger = Logger; use std::time::Duration; use crate::dataflow::operators::capture::{Event, EventPusher}; +use crate::progress::Location; /// Logs events as a timely stream, with progress statements. pub struct BatchLogger where P: EventPusher { @@ -80,10 +81,15 @@ pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any { /// /// # Example /// ```rust - /// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)]; + /// use timely::progress::Location; + /// + /// let ts = vec![ + /// (Location::new_target(0, 0), (23u64, 10u64), -4i64), + /// (Location::new_target(0, 0), (23u64, 11u64), 1i64), + /// ]; /// let ts: &timely::logging::ProgressEventTimestampVec = &ts; - /// for (n, p, t, d) in ts.iter() { - /// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d)); + /// for (l, t, d) in ts.iter() { + /// print!("{:?}, ", (l, t.as_any().downcast_ref::<(u64, u64)>(), d)); /// } /// println!(); /// ``` @@ -114,14 +120,14 @@ impl ProgressEventTimestamp fo /// for each dynamically typed element). pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any { /// Iterate over the contents of the vector - fn iter<'a>(&'a self) -> Box+'a>; + fn iter<'a>(&'a self) -> Box+'a>; } -impl ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> { - fn iter<'a>(&'a self) -> Box+'a> { - Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| { +impl ProgressEventTimestampVec for Vec<(Location, T, i64)> { + fn iter<'a>(&'a self) -> Box+'a> { + Box::new(<[_]>::iter(&self[..]).map(|(l, t, d)| { let t: &dyn ProgressEventTimestamp = t; - (n, p, t, d) + (l, t, d) })) } } diff --git a/timely/src/progress/broadcast.rs b/timely/src/progress/broadcast.rs index 7c0b95dfe9..6b9c8fda00 100644 --- a/timely/src/progress/broadcast.rs +++ b/timely/src/progress/broadcast.rs @@ -1,7 +1,7 @@ //! Broadcasts progress information among workers. use crate::progress::{ChangeBatch, Timestamp}; -use crate::progress::{Location, Port}; +use crate::progress::Location; use crate::communication::{Message, Push, Pull}; use crate::logging::TimelyLogger as Logger; use crate::logging::TimelyProgressLogger as ProgressLogger; @@ -68,13 +68,10 @@ impl Progcaster { let mut internal = Box::new(Vec::with_capacity(changes.len())); for ((location, time), diff) in changes.iter() { - match location.port { - Port::Target(port) => { - messages.push((location.node, port, time.clone(), *diff)) - }, - Port::Source(port) => { - internal.push((location.node, port, time.clone(), *diff)) - } + if location.is_target() { + messages.push((*location, time.clone(), *diff)) + } else { + internal.push((*location, time.clone(), *diff)) } } @@ -137,14 +134,10 @@ impl Progcaster { let mut internal = Box::new(Vec::with_capacity(changes.len())); for ((location, time), diff) in recv_changes.iter() { - - match location.port { - Port::Target(port) => { - messages.push((location.node, port, time.clone(), *diff)) - }, - Port::Source(port) => { - internal.push((location.node, port, time.clone(), *diff)) - } + if location.is_target() { + messages.push((*location, time.clone(), *diff)) + } else { + internal.push((*location, time.clone(), *diff)) } } diff --git a/timely/src/progress/reachability.rs b/timely/src/progress/reachability.rs index 67f3a20c87..fb4038dd01 100644 --- a/timely/src/progress/reachability.rs +++ b/timely/src/progress/reachability.rs @@ -565,25 +565,18 @@ impl Tracker { // Step 0: If logging is enabled, construct and log inbound changes. if let Some(logger) = &mut self.logger { + let changes_count = self.target_changes.len() + self.source_changes.len(); + let mut changes = Vec::with_capacity(changes_count); - let target_changes = - self.target_changes - .iter() - .map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff)) - .collect::>(); - - if !target_changes.is_empty() { - logger.log_target_pointstamp_updates(Box::new(target_changes)); + for ((target, time), diff) in self.target_changes.iter() { + changes.push((Location::from(*target), time.clone(), *diff)); + } + for ((source, time), diff) in self.source_changes.iter() { + changes.push((Location::from(*source), time.clone(), *diff)); } - let source_changes = - self.source_changes - .iter() - .map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff)) - .collect::>(); - - if !source_changes.is_empty() { - logger.log_source_pointstamp_updates(Box::new(source_changes)); + if !changes.is_empty() { + logger.log_pointstamp_updates(Box::new(changes)); } } @@ -699,25 +692,14 @@ impl Tracker { // Step 3: If logging is enabled, construct and log outbound changes. if let Some(logger) = &mut self.logger { - let mut target_changes = Vec::new(); - let mut source_changes = Vec::new(); - - for ((location, time), diff) in self.pushed_changes.iter() { - match location.port { - Port::Target(port) => { - target_changes.push((location.node, port, time.clone(), *diff)) - } - Port::Source(port) => { - source_changes.push((location.node, port, time.clone(), *diff)) - } - } - } + let changes: Vec<_> = self + .pushed_changes + .iter() + .map(|((location, time), diff)| (*location, time.clone(), *diff)) + .collect(); - if !target_changes.is_empty() || !source_changes.is_empty() { - logger.log_frontier_updates( - Box::new(target_changes), - Box::new(source_changes), - ); + if !changes.is_empty() { + logger.log_frontier_updates(Box::new(changes)); } } @@ -864,112 +846,60 @@ pub mod logging { Self { path, logger } } - /// Log source pointstamp update events with additional identifying information. - pub fn log_source_pointstamp_updates(&mut self, updates: Box) { - self.logger.log({ - SourcePointstampUpdate { + /// Log pointstamp update events with additional identifying information. + pub fn log_pointstamp_updates(&mut self, updates: Box) { + self.logger.log( + PointstampUpdates { tracker_id: self.path.clone(), updates, } - }) + ) } - /// Log target pointstamp update events with additional identifying information. - pub fn log_target_pointstamp_updates(&mut self, updates: Box) { - self.logger.log({ - TargetPointstampUpdate { + + /// Log frontier update events with additional identifying information. + pub fn log_frontier_updates(&mut self, updates: Box) { + self.logger.log( + FrontierUpdates { tracker_id: self.path.clone(), updates, } - }) - } - - /// Log frontier update events with additional identifying information. - /// - /// We want to log source and target updates at the same time to ensure callers observe - /// consistent frontiers at any point in time. - pub fn log_frontier_updates( - &mut self, - source_updates: Box, - target_updates: Box, - ) { - let source_event: TrackerEvent = SourceFrontierUpdate { - tracker_id: self.path.clone(), - updates: source_updates, - }.into(); - let target_event: TrackerEvent = TargetFrontierUpdate { - tracker_id: self.path.clone(), - updates: target_updates, - }.into(); - - self.logger.log_many([source_event, target_event]); + ) } } /// Events that the tracker may record. pub enum TrackerEvent { - /// Pointstamp updates made at a source of data. - SourcePointstampUpdate(SourcePointstampUpdate), - /// Pointstamp updates made at a target of data. - TargetPointstampUpdate(TargetPointstampUpdate), - /// Frontier updates made at a source of data. - SourceFrontierUpdate(SourceFrontierUpdate), - /// Frontier updates made at a target of data. - TargetFrontierUpdate(TargetFrontierUpdate), - } - - /// A pointstamp update made at a source of data. - pub struct SourcePointstampUpdate { - /// An identifier for the tracker. - pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. - pub updates: Box, - } - - /// A pointstamp update made at a target of data. - pub struct TargetPointstampUpdate { - /// An identifier for the tracker. - pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. - pub updates: Box, + /// Pointstamp updates made. + PointstampUpdates(PointstampUpdates), + /// Frontier updates made. + FrontierUpdates(FrontierUpdates), } - /// A frontier update at a source of data. - pub struct SourceFrontierUpdate { + /// Pointstamp updates reported by a tracker. + pub struct PointstampUpdates { /// An identifier for the tracker. pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. + /// Updates themselves, as `(location, time, diff)`. pub updates: Box, } - /// A frontier update at a target of data. - pub struct TargetFrontierUpdate { + /// Frontier updates reported by a tracker. + pub struct FrontierUpdates { /// An identifier for the tracker. pub tracker_id: Vec, - /// Updates themselves, as `(node, port, time, diff)`. + /// Updates themselves, as `(location, time, diff)`. pub updates: Box, } - impl From for TrackerEvent { - fn from(v: SourcePointstampUpdate) -> Self { - Self::SourcePointstampUpdate(v) - } - } - - impl From for TrackerEvent { - fn from(v: TargetPointstampUpdate) -> Self { - Self::TargetPointstampUpdate(v) + impl From for TrackerEvent { + fn from(v: PointstampUpdates) -> Self { + Self::PointstampUpdates(v) } } - impl From for TrackerEvent { - fn from(v: SourceFrontierUpdate) -> Self { - Self::SourceFrontierUpdate(v) - } - } - - impl From for TrackerEvent { - fn from(v: TargetFrontierUpdate) -> Self { - Self::TargetFrontierUpdate(v) + impl From for TrackerEvent { + fn from(v: FrontierUpdates) -> Self { + Self::FrontierUpdates(v) } } } @@ -988,52 +918,46 @@ impl Drop for Tracker { }; // Retract pending data that `propagate_all` would normally log. - let mut target_pointstamp_changes = Vec::new(); - let mut source_pointstamp_changes = Vec::new(); - let mut target_frontier_changes = Vec::new(); - let mut source_frontier_changes = Vec::new(); + let mut pointstamp_changes = Vec::new(); + let mut frontier_changes = Vec::new(); for (index, per_operator) in self.per_operator.iter_mut().enumerate() { for (port, target) in per_operator.targets.iter_mut().enumerate() { + let location = Location::new_target(index, port); let pointstamp_retractions = target.pointstamps .updates() - .map(|(time, diff)| (index, port, time.clone(), -diff)); - target_pointstamp_changes.extend(pointstamp_retractions); + .map(|(time, diff)| (location, time.clone(), -diff)); + pointstamp_changes.extend(pointstamp_retractions); let frontier = target.implications.frontier().to_owned(); let frontier_retractions = frontier .into_iter() - .map(|time| (index, port, time, -1)); - target_frontier_changes.extend(frontier_retractions); + .map(|time| (location, time, -1)); + frontier_changes.extend(frontier_retractions); } } for (index, per_operator) in self.per_operator.iter_mut().enumerate() { for (port, source) in per_operator.sources.iter_mut().enumerate() { + let location = Location::new_source(index, port); let pointstamp_retractions = source.pointstamps .updates() - .map(|(time, diff)| (index, port, time.clone(), -diff)); - source_pointstamp_changes.extend(pointstamp_retractions); + .map(|(time, diff)| (location, time.clone(), -diff)); + pointstamp_changes.extend(pointstamp_retractions); let frontier = source.implications.frontier().to_owned(); let frontier_retractions = frontier .into_iter() - .map(|time| (index, port, time, -1)); - source_frontier_changes.extend(frontier_retractions); + .map(|time| (location, time, -1)); + frontier_changes.extend(frontier_retractions); } } - if !target_pointstamp_changes.is_empty() { - logger.log_target_pointstamp_updates(Box::new(target_pointstamp_changes)); - } - if !source_pointstamp_changes.is_empty() { - logger.log_source_pointstamp_updates(Box::new(source_pointstamp_changes)); + if !pointstamp_changes.is_empty() { + logger.log_pointstamp_updates(Box::new(pointstamp_changes)); } - if !source_frontier_changes.is_empty() || !target_frontier_changes.is_empty() { - logger.log_frontier_updates( - Box::new(source_frontier_changes), - Box::new(target_frontier_changes), - ); + if !frontier_changes.is_empty() { + logger.log_frontier_updates(Box::new(frontier_changes)); } } }