Skip to content

Commit

Permalink
Restructure TrackerEvent
Browse files Browse the repository at this point in the history
This commit changes the shape of `TrackerEvent` to include the
source/target distinction in a `Location` rather than separate enum
variants. Apart from less boilerplate, this speeds up the frontier
logging in `propagate_all` as we can directly use the contents of
`pushed_changes`, instead of having to split them into target and source
changes.

A side effect of this is that the definition of
`ProgressEventTimestampVec` changes, which also affects the
`TimelyProgressEvent` type.
  • Loading branch information
teskje committed Nov 29, 2023
1 parent a92df78 commit 7b5407e
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 163 deletions.
8 changes: 4 additions & 4 deletions timely/examples/logging-send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ fn main() {
println!("PROGRESS: {:?}", x);
let (_, _, ev) = x;
print!("PROGRESS: TYPED MESSAGES: ");
for (n, p, t, d) in ev.messages.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
for (l, t, d) in ev.messages.iter() {
print!("{:?}, ", (l, t.as_any().downcast_ref::<usize>(), d));
}
println!();
print!("PROGRESS: TYPED INTERNAL: ");
for (n, p, t, d) in ev.internal.iter() {
print!("{:?}, ", (n, p, t.as_any().downcast_ref::<usize>(), d));
for (l, t, d) in ev.internal.iter() {
print!("{:?}, ", (l, t.as_any().downcast_ref::<usize>(), d));
}
println!();
})
Expand Down
22 changes: 14 additions & 8 deletions timely/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pub type TimelyProgressLogger = Logger<TimelyProgressEvent>;

use std::time::Duration;
use crate::dataflow::operators::capture::{Event, EventPusher};
use crate::progress::Location;

/// Logs events as a timely stream, with progress statements.
pub struct BatchLogger<T, E, P> where P: EventPusher<Duration, (Duration, E, T)> {
Expand Down Expand Up @@ -80,10 +81,15 @@ pub trait ProgressEventTimestamp: std::fmt::Debug + std::any::Any {
///
/// # Example
/// ```rust
/// let ts = vec![(0usize, 0usize, (23u64, 10u64), -4i64), (0usize, 0usize, (23u64, 11u64), 1i64)];
/// use timely::progress::Location;
///
/// let ts = vec![
/// (Location::new_target(0, 0), (23u64, 10u64), -4i64),
/// (Location::new_target(0, 0), (23u64, 11u64), 1i64),
/// ];
/// let ts: &timely::logging::ProgressEventTimestampVec = &ts;
/// for (n, p, t, d) in ts.iter() {
/// print!("{:?}, ", (n, p, t.as_any().downcast_ref::<(u64, u64)>(), d));
/// for (l, t, d) in ts.iter() {
/// print!("{:?}, ", (l, t.as_any().downcast_ref::<(u64, u64)>(), d));
/// }
/// println!();
/// ```
Expand Down Expand Up @@ -114,14 +120,14 @@ impl<T: crate::Data + std::fmt::Debug + std::any::Any> ProgressEventTimestamp fo
/// for each dynamically typed element).
pub trait ProgressEventTimestampVec: std::fmt::Debug + std::any::Any {
/// Iterate over the contents of the vector
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a Location, &'a dyn ProgressEventTimestamp, &'a i64)>+'a>;
}

impl<T: ProgressEventTimestamp> ProgressEventTimestampVec for Vec<(usize, usize, T, i64)> {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a usize, &'a usize, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
Box::new(<[(usize, usize, T, i64)]>::iter(&self[..]).map(|(n, p, t, d)| {
impl<T: ProgressEventTimestamp> ProgressEventTimestampVec for Vec<(Location, T, i64)> {
fn iter<'a>(&'a self) -> Box<dyn Iterator<Item=(&'a Location, &'a dyn ProgressEventTimestamp, &'a i64)>+'a> {
Box::new(<[_]>::iter(&self[..]).map(|(l, t, d)| {
let t: &dyn ProgressEventTimestamp = t;
(n, p, t, d)
(l, t, d)
}))
}
}
Expand Down
25 changes: 9 additions & 16 deletions timely/src/progress/broadcast.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! Broadcasts progress information among workers.

use crate::progress::{ChangeBatch, Timestamp};
use crate::progress::{Location, Port};
use crate::progress::Location;
use crate::communication::{Message, Push, Pull};
use crate::logging::TimelyLogger as Logger;
use crate::logging::TimelyProgressLogger as ProgressLogger;
Expand Down Expand Up @@ -68,13 +68,10 @@ impl<T:Timestamp+Send> Progcaster<T> {
let mut internal = Box::new(Vec::with_capacity(changes.len()));

for ((location, time), diff) in changes.iter() {
match location.port {
Port::Target(port) => {
messages.push((location.node, port, time.clone(), *diff))
},
Port::Source(port) => {
internal.push((location.node, port, time.clone(), *diff))
}
if location.is_target() {
messages.push((*location, time.clone(), *diff))
} else {
internal.push((*location, time.clone(), *diff))
}
}

Expand Down Expand Up @@ -137,14 +134,10 @@ impl<T:Timestamp+Send> Progcaster<T> {
let mut internal = Box::new(Vec::with_capacity(changes.len()));

for ((location, time), diff) in recv_changes.iter() {

match location.port {
Port::Target(port) => {
messages.push((location.node, port, time.clone(), *diff))
},
Port::Source(port) => {
internal.push((location.node, port, time.clone(), *diff))
}
if location.is_target() {
messages.push((*location, time.clone(), *diff))
} else {
internal.push((*location, time.clone(), *diff))
}
}

Expand Down
194 changes: 59 additions & 135 deletions timely/src/progress/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -565,25 +565,18 @@ impl<T:Timestamp> Tracker<T> {

// Step 0: If logging is enabled, construct and log inbound changes.
if let Some(logger) = &mut self.logger {
let changes_count = self.target_changes.len() + self.source_changes.len();
let mut changes = Vec::with_capacity(changes_count);

let target_changes =
self.target_changes
.iter()
.map(|((target, time), diff)| (target.node, target.port, time.clone(), *diff))
.collect::<Vec<_>>();

if !target_changes.is_empty() {
logger.log_target_pointstamp_updates(Box::new(target_changes));
for ((target, time), diff) in self.target_changes.iter() {
changes.push((Location::from(*target), time.clone(), *diff));
}
for ((source, time), diff) in self.source_changes.iter() {
changes.push((Location::from(*source), time.clone(), *diff));
}

let source_changes =
self.source_changes
.iter()
.map(|((source, time), diff)| (source.node, source.port, time.clone(), *diff))
.collect::<Vec<_>>();

if !source_changes.is_empty() {
logger.log_source_pointstamp_updates(Box::new(source_changes));
if !changes.is_empty() {
logger.log_pointstamp_updates(Box::new(changes));
}
}

Expand Down Expand Up @@ -699,25 +692,14 @@ impl<T:Timestamp> Tracker<T> {

// Step 3: If logging is enabled, construct and log outbound changes.
if let Some(logger) = &mut self.logger {
let mut target_changes = Vec::new();
let mut source_changes = Vec::new();

for ((location, time), diff) in self.pushed_changes.iter() {
match location.port {
Port::Target(port) => {
target_changes.push((location.node, port, time.clone(), *diff))
}
Port::Source(port) => {
source_changes.push((location.node, port, time.clone(), *diff))
}
}
}
let changes: Vec<_> = self
.pushed_changes
.iter()
.map(|((location, time), diff)| (*location, time.clone(), *diff))
.collect();

if !target_changes.is_empty() || !source_changes.is_empty() {
logger.log_frontier_updates(
Box::new(target_changes),
Box::new(source_changes),
);
if !changes.is_empty() {
logger.log_frontier_updates(Box::new(changes));
}
}

Expand Down Expand Up @@ -864,112 +846,60 @@ pub mod logging {
Self { path, logger }
}

/// Log source pointstamp update events with additional identifying information.
pub fn log_source_pointstamp_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
SourcePointstampUpdate {
/// Log pointstamp update events with additional identifying information.
pub fn log_pointstamp_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log(
PointstampUpdates {
tracker_id: self.path.clone(),
updates,
}
})
)
}
/// Log target pointstamp update events with additional identifying information.
pub fn log_target_pointstamp_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log({
TargetPointstampUpdate {

/// Log frontier update events with additional identifying information.
pub fn log_frontier_updates(&mut self, updates: Box<dyn ProgressEventTimestampVec>) {
self.logger.log(
FrontierUpdates {
tracker_id: self.path.clone(),
updates,
}
})
}

/// Log frontier update events with additional identifying information.
///
/// We want to log source and target updates at the same time to ensure callers observe
/// consistent frontiers at any point in time.
pub fn log_frontier_updates(
&mut self,
source_updates: Box<dyn ProgressEventTimestampVec>,
target_updates: Box<dyn ProgressEventTimestampVec>,
) {
let source_event: TrackerEvent = SourceFrontierUpdate {
tracker_id: self.path.clone(),
updates: source_updates,
}.into();
let target_event: TrackerEvent = TargetFrontierUpdate {
tracker_id: self.path.clone(),
updates: target_updates,
}.into();

self.logger.log_many([source_event, target_event]);
)
}
}

/// Events that the tracker may record.
pub enum TrackerEvent {
/// Pointstamp updates made at a source of data.
SourcePointstampUpdate(SourcePointstampUpdate),
/// Pointstamp updates made at a target of data.
TargetPointstampUpdate(TargetPointstampUpdate),
/// Frontier updates made at a source of data.
SourceFrontierUpdate(SourceFrontierUpdate),
/// Frontier updates made at a target of data.
TargetFrontierUpdate(TargetFrontierUpdate),
}

/// A pointstamp update made at a source of data.
pub struct SourcePointstampUpdate {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

/// A pointstamp update made at a target of data.
pub struct TargetPointstampUpdate {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
/// Pointstamp updates made.
PointstampUpdates(PointstampUpdates),
/// Frontier updates made.
FrontierUpdates(FrontierUpdates),
}

/// A frontier update at a source of data.
pub struct SourceFrontierUpdate {
/// Pointstamp updates reported by a tracker.
pub struct PointstampUpdates {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
/// Updates themselves, as `(location, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

/// A frontier update at a target of data.
pub struct TargetFrontierUpdate {
/// Frontier updates reported by a tracker.
pub struct FrontierUpdates {
/// An identifier for the tracker.
pub tracker_id: Vec<usize>,
/// Updates themselves, as `(node, port, time, diff)`.
/// Updates themselves, as `(location, time, diff)`.
pub updates: Box<dyn ProgressEventTimestampVec>,
}

impl From<SourcePointstampUpdate> for TrackerEvent {
fn from(v: SourcePointstampUpdate) -> Self {
Self::SourcePointstampUpdate(v)
}
}

impl From<TargetPointstampUpdate> for TrackerEvent {
fn from(v: TargetPointstampUpdate) -> Self {
Self::TargetPointstampUpdate(v)
impl From<PointstampUpdates> for TrackerEvent {
fn from(v: PointstampUpdates) -> Self {
Self::PointstampUpdates(v)
}
}

impl From<SourceFrontierUpdate> for TrackerEvent {
fn from(v: SourceFrontierUpdate) -> Self {
Self::SourceFrontierUpdate(v)
}
}

impl From<TargetFrontierUpdate> for TrackerEvent {
fn from(v: TargetFrontierUpdate) -> Self {
Self::TargetFrontierUpdate(v)
impl From<FrontierUpdates> for TrackerEvent {
fn from(v: FrontierUpdates) -> Self {
Self::FrontierUpdates(v)
}
}
}
Expand All @@ -988,52 +918,46 @@ impl<T: Timestamp> Drop for Tracker<T> {
};

// Retract pending data that `propagate_all` would normally log.
let mut target_pointstamp_changes = Vec::new();
let mut source_pointstamp_changes = Vec::new();
let mut target_frontier_changes = Vec::new();
let mut source_frontier_changes = Vec::new();
let mut pointstamp_changes = Vec::new();
let mut frontier_changes = Vec::new();

for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
for (port, target) in per_operator.targets.iter_mut().enumerate() {
let location = Location::new_target(index, port);
let pointstamp_retractions = target.pointstamps
.updates()
.map(|(time, diff)| (index, port, time.clone(), -diff));
target_pointstamp_changes.extend(pointstamp_retractions);
.map(|(time, diff)| (location, time.clone(), -diff));
pointstamp_changes.extend(pointstamp_retractions);

let frontier = target.implications.frontier().to_owned();
let frontier_retractions = frontier
.into_iter()
.map(|time| (index, port, time, -1));
target_frontier_changes.extend(frontier_retractions);
.map(|time| (location, time, -1));
frontier_changes.extend(frontier_retractions);
}
}

for (index, per_operator) in self.per_operator.iter_mut().enumerate() {
for (port, source) in per_operator.sources.iter_mut().enumerate() {
let location = Location::new_source(index, port);
let pointstamp_retractions = source.pointstamps
.updates()
.map(|(time, diff)| (index, port, time.clone(), -diff));
source_pointstamp_changes.extend(pointstamp_retractions);
.map(|(time, diff)| (location, time.clone(), -diff));
pointstamp_changes.extend(pointstamp_retractions);

let frontier = source.implications.frontier().to_owned();
let frontier_retractions = frontier
.into_iter()
.map(|time| (index, port, time, -1));
source_frontier_changes.extend(frontier_retractions);
.map(|time| (location, time, -1));
frontier_changes.extend(frontier_retractions);
}
}

if !target_pointstamp_changes.is_empty() {
logger.log_target_pointstamp_updates(Box::new(target_pointstamp_changes));
}
if !source_pointstamp_changes.is_empty() {
logger.log_source_pointstamp_updates(Box::new(source_pointstamp_changes));
if !pointstamp_changes.is_empty() {
logger.log_pointstamp_updates(Box::new(pointstamp_changes));
}
if !source_frontier_changes.is_empty() || !target_frontier_changes.is_empty() {
logger.log_frontier_updates(
Box::new(source_frontier_changes),
Box::new(target_frontier_changes),
);
if !frontier_changes.is_empty() {
logger.log_frontier_updates(Box::new(frontier_changes));
}
}
}

0 comments on commit 7b5407e

Please sign in to comment.