Skip to content

Commit

Permalink
Merge pull request #19560 from teskje/dataflow-drop-duration
Browse files Browse the repository at this point in the history
Introspection of dataflow shutdown durations
  • Loading branch information
teskje authored Jun 2, 2023
2 parents b19cd44 + 9f0a03e commit ee878f6
Show file tree
Hide file tree
Showing 13 changed files with 203 additions and 44 deletions.
33 changes: 21 additions & 12 deletions doc/user/content/sql/system-catalog/mz_internal.md
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,21 @@ The `mz_dataflow_addresses` view describes how the [dataflow] channels and opera
| `id` | [`bigint`] | The ID of the channel or operator. Corresponds to [`mz_dataflow_channels.id`](#mz_dataflow_channels) or [`mz_dataflow_operators.id`](#mz_dataflow_operators). |
| `address` | [`bigint list`] | A list of scope-local indexes indicating the path from the root to this channel or operator. |

### `mz_dataflow_arrangement_sizes`

The `mz_dataflow_arrangement_sizes` view describes how many records and batches
are contained in operators under each dataflow.

| Field | Type | Meaning |
|---------------|------------|------------------------------------------------------------------------------|
| `id` | [`bigint`] | The ID of the [dataflow]. Corresponds to [`mz_dataflows.id`](#mz_dataflows). |
| `name` | [`bigint`] | The name of the object (e.g., index) maintained by the dataflow. |
| `records` | [`bigint`] | The number of records in all arrangements in the dataflow. |
| `batches` | [`bigint`] | The number of batches in all arrangements in the dataflow. |
| `size` | [`bigint`] | The utilized size in bytes of the arrangements. |
| `capacity` | [`bigint`] | The capacity in bytes of the arrangements. Can be larger than the size. |
| `allocations` | [`bigint`] | The number of separate memory allocations backing the arrangements. |

### `mz_dataflow_channels`

The `mz_dataflow_channels` view describes the communication channels between [dataflow] operators.
Expand Down Expand Up @@ -469,20 +484,14 @@ The `mz_dataflow_operator_parents` view describes how [dataflow] operators are n
| `id` | [`bigint`] | The ID of the operator. Corresponds to [`mz_dataflow_operators.id`](#mz_dataflow_operators). |
| `parent_id` | [`bigint`] | The ID of the operator's parent operator. Corresponds to [`mz_dataflow_operators.id`](#mz_dataflow_operators). |

### `mz_dataflow_arrangement_sizes`
### `mz_dataflow_shutdown_durations_histogram`

The `mz_dataflow_arrangement_sizes` view describes how many records and batches
are contained in operators under each dataflow.
The `mz_dataflow_shutdown_durations_histogram` view describes a histogram of the time in nanoseconds required to fully shut down dropped [dataflows][dataflow].

| Field | Type | Meaning |
|---------------|------------|------------------------------------------------------------------------------|
| `id` | [`bigint`] | The ID of the [dataflow]. Corresponds to [`mz_dataflows.id`](#mz_dataflows). |
| `name` | [`bigint`] | The name of the object (e.g., index) maintained by the dataflow. |
| `records` | [`bigint`] | The number of records in all arrangements in the dataflow. |
| `batches` | [`bigint`] | The number of batches in all arrangements in the dataflow. |
| `size` | [`bigint`] | The utilized size in bytes of the arrangements. |
| `capacity` | [`bigint`] | The capacity in bytes of the arrangements. Can be larger than the size. |
| `allocations` | [`bigint`] | The number of separate memory allocations backing the arrangements. |
| Field | Type | Meaning |
| -------------- | ------------ | -------- |
| `duration_ns` | [`bigint`] | The upper bound of the bucket in nanoseconds. |
| `count` | [`bigint`] | The (noncumulative) count of dataflows in this bucket. |

### `mz_message_counts`

Expand Down
31 changes: 31 additions & 0 deletions src/adapter/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1297,6 +1297,12 @@ pub const MZ_PEEK_DURATIONS_HISTOGRAM_RAW: BuiltinLog = BuiltinLog {
variant: LogVariant::Compute(ComputeLog::PeekDuration),
};

pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_RAW: BuiltinLog = BuiltinLog {
name: "mz_dataflow_shutdown_durations_histogram_raw",
schema: MZ_INTERNAL_SCHEMA,
variant: LogVariant::Compute(ComputeLog::ShutdownDuration),
};

pub const MZ_ARRANGEMENT_HEAP_SIZE_RAW: BuiltinLog = BuiltinLog {
name: "mz_arrangement_heap_size_raw",
schema: MZ_INTERNAL_SCHEMA,
Expand Down Expand Up @@ -2639,6 +2645,28 @@ FROM mz_internal.mz_peek_durations_histogram_per_worker
GROUP BY duration_ns",
};

pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_PER_WORKER: BuiltinView = BuiltinView {
name: "mz_dataflow_shutdown_durations_histogram_per_worker",
schema: MZ_INTERNAL_SCHEMA,
sql: "CREATE VIEW mz_internal.mz_dataflow_shutdown_durations_histogram_per_worker AS SELECT
worker_id, duration_ns, pg_catalog.count(*) AS count
FROM
mz_internal.mz_dataflow_shutdown_durations_histogram_raw
GROUP BY
worker_id, duration_ns",
};

pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM: BuiltinView = BuiltinView {
name: "mz_dataflow_shutdown_durations_histogram",
schema: MZ_INTERNAL_SCHEMA,
sql: "CREATE VIEW mz_internal.mz_dataflow_shutdown_durations_histogram AS
SELECT
duration_ns,
pg_catalog.sum(count) AS count
FROM mz_internal.mz_dataflow_shutdown_durations_histogram_per_worker
GROUP BY duration_ns",
};

pub const MZ_SCHEDULING_ELAPSED_PER_WORKER: BuiltinView = BuiltinView {
name: "mz_scheduling_elapsed_per_worker",
schema: MZ_INTERNAL_SCHEMA,
Expand Down Expand Up @@ -3760,6 +3788,7 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::Log(&MZ_MESSAGE_COUNTS_SENT_RAW),
Builtin::Log(&MZ_ACTIVE_PEEKS_PER_WORKER),
Builtin::Log(&MZ_PEEK_DURATIONS_HISTOGRAM_RAW),
Builtin::Log(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_RAW),
Builtin::Log(&MZ_ARRANGEMENT_HEAP_CAPACITY_RAW),
Builtin::Log(&MZ_ARRANGEMENT_HEAP_ALLOCATIONS_RAW),
Builtin::Log(&MZ_ARRANGEMENT_HEAP_SIZE_RAW),
Expand Down Expand Up @@ -3848,6 +3877,8 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
Builtin::View(&MZ_RECORDS_PER_DATAFLOW),
Builtin::View(&MZ_PEEK_DURATIONS_HISTOGRAM_PER_WORKER),
Builtin::View(&MZ_PEEK_DURATIONS_HISTOGRAM),
Builtin::View(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_PER_WORKER),
Builtin::View(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM),
Builtin::View(&MZ_SCHEDULING_ELAPSED_PER_WORKER),
Builtin::View(&MZ_SCHEDULING_ELAPSED),
Builtin::View(&MZ_SCHEDULING_PARKS_HISTOGRAM_PER_WORKER),
Expand Down
1 change: 1 addition & 0 deletions src/compute-client/src/logging.proto
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ message ProtoComputeLog {
google.protobuf.Empty arrangement_heap_size = 8;
google.protobuf.Empty arrangement_heap_capacity = 9;
google.protobuf.Empty arrangement_heap_allocations = 10;
google.protobuf.Empty shutdown_duration = 11;
}
}
message ProtoLogVariant {
Expand Down
8 changes: 8 additions & 0 deletions src/compute-client/src/logging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,7 @@ pub enum ComputeLog {
ArrangementHeapSize,
ArrangementHeapCapacity,
ArrangementHeapAllocations,
ShutdownDuration,
}

impl RustType<ProtoComputeLog> for ComputeLog {
Expand All @@ -240,6 +241,7 @@ impl RustType<ProtoComputeLog> for ComputeLog {
ComputeLog::ArrangementHeapSize => ArrangementHeapSize(()),
ComputeLog::ArrangementHeapCapacity => ArrangementHeapCapacity(()),
ComputeLog::ArrangementHeapAllocations => ArrangementHeapAllocations(()),
ComputeLog::ShutdownDuration => ShutdownDuration(()),
}),
}
}
Expand All @@ -257,6 +259,7 @@ impl RustType<ProtoComputeLog> for ComputeLog {
Some(ArrangementHeapSize(())) => Ok(ComputeLog::ArrangementHeapSize),
Some(ArrangementHeapCapacity(())) => Ok(ComputeLog::ArrangementHeapCapacity),
Some(ArrangementHeapAllocations(())) => Ok(ComputeLog::ArrangementHeapAllocations),
Some(ShutdownDuration(())) => Ok(ComputeLog::ShutdownDuration),
None => Err(TryFromProtoError::missing_field("ProtoComputeLog::kind")),
}
}
Expand Down Expand Up @@ -419,6 +422,10 @@ impl LogVariant {
LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::empty()
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("duration_ns", ScalarType::UInt64.nullable(false)),

LogVariant::Compute(ComputeLog::ShutdownDuration) => RelationDesc::empty()
.with_column("worker_id", ScalarType::UInt64.nullable(false))
.with_column("duration_ns", ScalarType::UInt64.nullable(false)),
}
}

Expand Down Expand Up @@ -471,6 +478,7 @@ impl LogVariant {
LogVariant::Compute(ComputeLog::FrontierDelay) => vec![],
LogVariant::Compute(ComputeLog::PeekCurrent) => vec![],
LogVariant::Compute(ComputeLog::PeekDuration) => vec![],
LogVariant::Compute(ComputeLog::ShutdownDuration) => vec![],
}
}
}
Expand Down
82 changes: 80 additions & 2 deletions src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@

use std::any::Any;
use std::cell::RefCell;
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::rc::Rc;
use std::time::Duration;

Expand Down Expand Up @@ -113,6 +113,11 @@ pub enum ComputeEvent {
/// Operator index
operator: usize,
},
/// All operators of a dataflow have shut down.
DataflowShutdown {
/// Timely worker index of the dataflow.
dataflow_index: usize,
},
}

/// A logged peek event.
Expand Down Expand Up @@ -176,6 +181,7 @@ pub(super) fn construct<A: Allocate + 'static>(
let (mut frontier_delay_out, frontier_delay) = demux.new_output();
let (mut peek_out, peek) = demux.new_output();
let (mut peek_duration_out, peek_duration) = demux.new_output();
let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
Expand All @@ -192,6 +198,7 @@ pub(super) fn construct<A: Allocate + 'static>(
let mut frontier_delay = frontier_delay_out.activate();
let mut peek = peek_out.activate();
let mut peek_duration = peek_duration_out.activate();
let mut shutdown_duration = shutdown_duration_out.activate();
let mut arrangement_heap_size = arrangement_heap_size_out.activate();
let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
Expand All @@ -207,6 +214,7 @@ pub(super) fn construct<A: Allocate + 'static>(
frontier_delay: frontier_delay.session(&cap),
peek: peek.session(&cap),
peek_duration: peek_duration.session(&cap),
shutdown_duration: shutdown_duration.session(&cap),
arrangement_heap_size: arrangement_heap_size.session(&cap),
arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
Expand Down Expand Up @@ -280,7 +288,13 @@ pub(super) fn construct<A: Allocate + 'static>(
let peek_duration = peek_duration.as_collection().map(move |bucket| {
Row::pack_slice(&[
Datum::UInt64(u64::cast_from(worker_id)),
Datum::UInt64(bucket.try_into().expect("pow too big")),
Datum::UInt64(bucket.try_into().expect("bucket too big")),
])
});
let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
Row::pack_slice(&[
Datum::UInt64(u64::cast_from(worker_id)),
Datum::UInt64(bucket.try_into().expect("bucket too big")),
])
});

Expand Down Expand Up @@ -312,6 +326,7 @@ pub(super) fn construct<A: Allocate + 'static>(
(FrontierDelay, frontier_delay),
(PeekCurrent, peek_current),
(PeekDuration, peek_duration),
(ShutdownDuration, shutdown_duration),
(ArrangementHeapSize, arrangement_heap_size),
(ArrangementHeapCapacity, arrangement_heap_capacity),
(ArrangementHeapAllocations, arrangement_heap_allocations),
Expand Down Expand Up @@ -361,6 +376,12 @@ struct DemuxState<A: Allocate> {
export_dataflows: BTreeMap<GlobalId, usize>,
/// Maps dataflow exports to their imports and frontier delay tracking state.
export_imports: BTreeMap<GlobalId, BTreeMap<GlobalId, FrontierDelayState>>,
/// Maps live dataflows to counts of their exports.
dataflow_export_counts: BTreeMap<usize, u32>,
/// Maps dropped dataflows to their drop time (in ns).
dataflow_drop_times: BTreeMap<usize, u128>,
/// Contains dataflows that have shut down but not yet been dropped.
shutdown_dataflows: BTreeSet<usize>,
/// Maps pending peeks to their installation time (in ns).
peek_stash: BTreeMap<Uuid, u128>,
/// Arrangement size stash
Expand All @@ -373,6 +394,9 @@ impl<A: Allocate> DemuxState<A> {
worker,
export_dataflows: Default::default(),
export_imports: Default::default(),
dataflow_export_counts: Default::default(),
dataflow_drop_times: Default::default(),
shutdown_dataflows: Default::default(),
peek_stash: Default::default(),
arrangement_size: Default::default(),
}
Expand Down Expand Up @@ -403,6 +427,7 @@ struct DemuxOutput<'a> {
frontier_delay: OutputSession<'a, FrontierDelayDatum>,
peek: OutputSession<'a, Peek>,
peek_duration: OutputSession<'a, u128>,
shutdown_duration: OutputSession<'a, u128>,
arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>,
arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>,
arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>,
Expand Down Expand Up @@ -514,6 +539,7 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
ArrangementHeapSizeOperatorDrop { operator } => {
self.handle_arrangement_heap_size_operator_dropped(operator)
}
DataflowShutdown { dataflow_index } => self.handle_dataflow_shutdown(dataflow_index),
}
}

Expand All @@ -524,13 +550,30 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {

self.state.export_dataflows.insert(id, dataflow_id);
self.state.export_imports.insert(id, BTreeMap::new());
*self
.state
.dataflow_export_counts
.entry(dataflow_id)
.or_default() += 1;
}

fn handle_export_dropped(&mut self, id: GlobalId) {
let ts = self.ts();
if let Some(dataflow_id) = self.state.export_dataflows.remove(&id) {
let datum = ExportDatum { id, dataflow_id };
self.output.export.give((datum, ts, -1));

match self.state.dataflow_export_counts.get_mut(&dataflow_id) {
entry @ Some(0) | entry @ None => {
error!(
export = ?id,
dataflow = ?dataflow_id,
"invalid dataflow_export_counts entry at time of export drop: {entry:?}",
);
}
Some(1) => self.handle_dataflow_dropped(dataflow_id),
Some(count) => *count -= 1,
}
} else {
error!(
export = ?id,
Expand Down Expand Up @@ -564,6 +607,41 @@ impl<A: Allocate> DemuxHandler<'_, '_, A> {
}
}

fn handle_dataflow_dropped(&mut self, id: usize) {
self.state.dataflow_export_counts.remove(&id);

if self.state.shutdown_dataflows.remove(&id) {
// Dataflow has already shut down before it was dropped.
self.output.shutdown_duration.give((0, self.ts(), 1));
} else {
// Dataflow has not yet shut down.
let existing = self
.state
.dataflow_drop_times
.insert(id, self.time.as_nanos());
if existing.is_some() {
error!(dataflow = ?id, "dataflow already dropped");
}
}
}

fn handle_dataflow_shutdown(&mut self, id: usize) {
if let Some(start) = self.state.dataflow_drop_times.remove(&id) {
// Dataflow has alredy been dropped.
let elapsed_ns = self.time.as_nanos() - start;
let elapsed_pow = elapsed_ns.next_power_of_two();
self.output
.shutdown_duration
.give((elapsed_pow, self.ts(), 1));
} else {
// Dataflow has not yet been dropped.
let was_new = self.state.shutdown_dataflows.insert(id);
if !was_new {
error!(dataflow = ?id, "dataflow already shutdown");
}
}
}

fn handle_export_dependency(&mut self, export_id: GlobalId, import_id: GlobalId) {
let ts = self.ts();
let datum = DependencyDatum {
Expand Down
28 changes: 13 additions & 15 deletions src/compute/src/logging/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
self.worker,
self.config,
self.t_event_queue.clone(),
Rc::clone(&self.shared_state),
));
traces.extend(super::reachability::construct(
self.worker,
Expand Down Expand Up @@ -131,21 +132,18 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
}

fn register_loggers(&self) {
self.worker
.log_register()
.insert_logger("timely", self.simple_logger(self.t_event_queue.clone()));
self.worker
.log_register()
.insert_logger("timely/reachability", self.reachability_logger());
self.worker.log_register().insert_logger(
"differential/arrange",
self.simple_logger(self.d_event_queue.clone()),
);
let compute_logger = self.simple_logger(self.c_event_queue.clone());
self.worker
.log_register()
.insert_logger("materialize/compute", compute_logger.clone());
self.shared_state.borrow_mut().compute_logger = Some(compute_logger);
let t_logger = self.simple_logger(self.t_event_queue.clone());
let r_logger = self.reachability_logger();
let d_logger = self.simple_logger(self.d_event_queue.clone());
let c_logger = self.simple_logger(self.c_event_queue.clone());

let mut register = self.worker.log_register();
register.insert_logger("timely", t_logger);
register.insert_logger("timely/reachability", r_logger);
register.insert_logger("differential/arrange", d_logger);
register.insert_logger("materialize/compute", c_logger.clone());

self.shared_state.borrow_mut().compute_logger = Some(c_logger);
}

fn simple_logger<E: 'static>(&self, event_queue: EventQueue<E>) -> Logger<E> {
Expand Down
Loading

0 comments on commit ee878f6

Please sign in to comment.