Skip to content

Commit

Permalink
Log size/capacity/allocations from columnar merge batcher
Browse files Browse the repository at this point in the history
This logs size/capacity/allocation changes from the columnar merge batcher.

Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Nov 29, 2023
1 parent 3182200 commit b6e23da
Show file tree
Hide file tree
Showing 6 changed files with 65 additions and 8 deletions.
17 changes: 17 additions & 0 deletions src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub enum DifferentialEvent {
MergeShortfall(MergeShortfall),
/// Trace sharing event.
TraceShare(TraceShare),
/// Batcher size event
Batcher(BatcherEvent),
}

/// Either the start or end of a merge event.
Expand All @@ -43,6 +45,21 @@ pub struct BatchEvent {
impl From<BatchEvent> for DifferentialEvent { fn from(e: BatchEvent) -> Self { DifferentialEvent::Batch(e) } }


/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct BatcherEvent {
/// Operator identifier.
pub operator: usize,
/// Change in used size.
pub size_diff: isize,
/// Change in capacity.
pub capacity_diff: isize,
/// Change in number of allocations.
pub allocations_diff: isize,
}

impl From<BatcherEvent> for DifferentialEvent { fn from(e: BatcherEvent) -> Self { DifferentialEvent::Batcher(e) } }

/// Either the start or end of a merge event.
#[derive(Debug, Clone, Abomonation, Ord, PartialOrd, Eq, PartialEq)]
pub struct DropEvent {
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -545,7 +545,7 @@ where
};

// Where we will deposit received updates, and from which we extract batches.
let mut batcher = Tr::Batcher::new();
let mut batcher = Tr::Batcher::new(logger.clone(), info.global_id);

// Capabilities for the lower envelope of updates in `batcher`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
Expand Down
5 changes: 4 additions & 1 deletion src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
use std::collections::VecDeque;

use timely::communication::message::RefOrMut;
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};

use ::difference::Semigroup;
use logging::DifferentialEvent;

use trace::{Batcher, Builder};

Expand All @@ -26,7 +29,7 @@ where
type Item = ((K,V),T,D);
type Time = T;

fn new() -> Self {
fn new(_logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, _operator_id: usize) -> Self {
MergeBatcher {
sorter: MergeSorter::new(),
frontier: Antichain::new(),
Expand Down
42 changes: 38 additions & 4 deletions src/trace/implementations/merge_batcher_col.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@
use timely::Container;
use timely::communication::message::RefOrMut;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{frontier::Antichain, Timestamp};

use ::difference::Semigroup;
use logging::{BatcherEvent, DifferentialEvent};

use trace::{Batcher, Builder};

Expand All @@ -32,9 +35,9 @@ where
type Item = ((K,V),T,D);
type Time = T;

fn new() -> Self {
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
ColumnatedMergeBatcher {
sorter: MergeSorterColumnation::new(),
sorter: MergeSorterColumnation::new(logger, operator_id),
frontier: Antichain::new(),
lower: Antichain::from_elem(<T as Timestamp>::minimum()),
}
Expand Down Expand Up @@ -186,6 +189,8 @@ struct MergeSorterColumnation<D: Columnation, T: Columnation, R: Columnation> {
queue: Vec<Vec<TimelyStack<(D, T, R)>>>, // each power-of-two length list of allocations.
stash: Vec<TimelyStack<(D, T, R)>>,
pending: Vec<(D, T, R)>,
logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>,
operator_id: usize,
}

impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Columnation+'static> MergeSorterColumnation<D, T, R> {
Expand All @@ -209,11 +214,36 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
Self::buffer_size() * 2
}

fn new() -> Self {
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self {
Self {
logger,
operator_id,
queue: Vec::new(),
stash: Vec::new(),
pending: Vec::new()
pending: Vec::new(),
}
}

/// Account size changes. Only performs work if a logger exists.
///
/// Calculate the size based on the [`TimelyStack`]s passed along, with each attribute
/// multiplied by `diff`. Usually, one wants to pass 1 or -1 as the diff.
fn account<'a, I: IntoIterator<Item=&'a TimelyStack<(D, T, R)>>>(&self, items: I, diff: isize) {
if let Some(logger) = &self.logger {
let (mut siz, mut capacity, mut allocations) = (0isize, 0isize, 0isize);
for stack in items {
stack.heap_size(|s, c| {
siz = siz.saturating_add_unsigned(s);
capacity = capacity.saturating_add_unsigned(c);
allocations += isize::from(c > 0);
});
}
logger.log(BatcherEvent {
operator: self.operator_id,
size_diff: siz * diff,
capacity_diff: capacity * diff,
allocations_diff: allocations * diff,
})
}
}

Expand Down Expand Up @@ -261,11 +291,13 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
for tuple in self.pending.drain(..) {
stack.copy(&tuple);
}
self.account([&stack], 1);
self.queue.push(vec![stack]);
while self.queue.len() > 1 && (self.queue[self.queue.len()-1].len() >= self.queue[self.queue.len()-2].len() / 2) {
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let merged = self.merge_by(list1, list2);
self.account(&merged, 1);
self.queue.push(merged);
}
}
Expand All @@ -278,6 +310,7 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column
let list1 = self.queue.pop().unwrap();
let list2 = self.queue.pop().unwrap();
let merged = self.merge_by(list1, list2);
self.account(&merged, 1);
self.queue.push(merged);
}
self.queue.push(list);
Expand All @@ -300,6 +333,7 @@ impl<D: Ord+Columnation+'static, T: Ord+Columnation+'static, R: Semigroup+Column

// merges two sorted input lists into one sorted output list.
fn merge_by(&mut self, list1: Vec<TimelyStack<(D, T, R)>>, list2: Vec<TimelyStack<(D, T, R)>>) -> Vec<TimelyStack<(D, T, R)>> {
self.account(list1.iter().chain(list2.iter()), -1);

use std::cmp::Ordering;

Expand Down
5 changes: 4 additions & 1 deletion src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,11 @@ pub mod layers;
pub mod wrappers;

use timely::communication::message::RefOrMut;
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{Antichain, frontier::AntichainRef};
use timely::progress::Timestamp;
use logging::DifferentialEvent;

// use ::difference::Semigroup;
pub use self::cursor::Cursor;
Expand Down Expand Up @@ -304,7 +307,7 @@ pub trait Batcher {
/// Times at which batches are formed.
type Time: Timestamp;
/// Allocates a new empty batcher.
fn new() -> Self;
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self;
/// Adds an unordered batch of elements to the batcher.
fn push_batch(&mut self, batch: RefOrMut<Vec<Self::Item>>);
/// Returns all updates not greater or equal to an element of `upper`.
Expand Down
2 changes: 1 addition & 1 deletion tests/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fn get_trace() -> ValSpine<u64, u64, usize, i64> {
let op_info = OperatorInfo::new(0, 0, &[]);
let mut trace = IntegerTrace::new(op_info, None, None);
{
let mut batcher = <IntegerTrace as Trace>::Batcher::new();
let mut batcher = <IntegerTrace as Trace>::Batcher::new(None, 0);

use timely::communication::message::RefOrMut;
batcher.push_batch(RefOrMut::Mut(&mut vec![
Expand Down

0 comments on commit b6e23da

Please sign in to comment.