diff --git a/clippy.toml b/clippy.toml
index dc75f17e45f0..d88225b6f1e9 100644
--- a/clippy.toml
+++ b/clippy.toml
@@ -22,17 +22,22 @@ disallowed-methods = [
     { path = "aws_sdk_s3::Client::new", reason = "use the `mz_aws_s3_util::new_client` function instead" },
 
-    # Prevent access to Differential APIs that want to use the default trace or use a default name.
+    # Prevent access to Differential APIs that want to use the default trace or use a default name, or where we offer
+    # our own wrapper.
     { path = "differential_dataflow::Collection::consolidate", reason = "use the `differential_dataflow::Collection::consolidate_named` function instead" },
-    { path = "differential_dataflow::operators::arrange::arrangement::Arrange::arrange", reason = "use the `arrange_named` function instead" },
-    { path = "differential_dataflow::operators::arrange::arrangement::ArrangeByKey::arrange_by_key", reason = "use the `Arrange::arrange_named` function instead" },
-    { path = "differential_dataflow::operators::arrange::arrangement::ArrangeByKey::arrange_by_key_named", reason = "use the `Arrange::arrange_named` function instead" },
-    { path = "differential_dataflow::operators::arrange::arrangement::ArrangeBySelf::arrange_by_self", reason = "use the `Arrange::arrange_named` function instead" },
-    { path = "differential_dataflow::operators::arrange::arrangement::ArrangeBySelf::arrange_by_self_named", reason = "use the `Arrange::arrange_named` function instead" },
+    { path = "differential_dataflow::operators::arrange::arrangement::Arrange::arrange", reason = "use the `MzArrange::mz_arrange` function instead" },
+    { path = "differential_dataflow::operators::arrange::arrangement::Arrange::arrange_named", reason = "use the `MzArrange::mz_arrange` function instead" },
+    { path = "differential_dataflow::operators::arrange::arrangement::Arrange::arrange_core", reason = "use the `MzArrange::mz_arrange_core` function instead" },
+    { path = "differential_dataflow::operators::arrange::arrangement::ArrangeByKey::arrange_by_key", reason = "use the `MzArrange::mz_arrange` function instead" },
+    { path = "differential_dataflow::operators::arrange::arrangement::ArrangeByKey::arrange_by_key_named", reason = "use the `MzArrange::mz_arrange` function instead" },
+    { path = "differential_dataflow::operators::arrange::arrangement::ArrangeBySelf::arrange_by_self", reason = "use the `MzArrange::mz_arrange` function instead" },
+    { path = "differential_dataflow::operators::arrange::arrangement::ArrangeBySelf::arrange_by_self_named", reason = "use the `MzArrange::mz_arrange` function instead" },
     { path = "differential_dataflow::operators::reduce::Count::count", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" },
     { path = "differential_dataflow::operators::reduce::Count::count_core", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" },
     { path = "differential_dataflow::operators::reduce::Reduce::reduce", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" },
     { path = "differential_dataflow::operators::reduce::Reduce::reduce_named", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" },
+    { path = "differential_dataflow::operators::reduce::ReduceCore::reduce_abelian", reason = "use the `mz_reduce_abelian` wrapper function instead" },
+    { path = "differential_dataflow::operators::reduce::ReduceCore::reduce_core", reason = "use the `mz_reduce_abelian` wrapper function instead" },
     { path = "differential_dataflow::operators::reduce::Threshold::distinct", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" },
     { path = "differential_dataflow::operators::reduce::Threshold::distinct_core", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" },
     { path = "differential_dataflow::operators::reduce::Threshold::threshold", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" },
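A note for reviewers on the lint change above: `disallowed-methods` only rejects direct calls, so call sites migrate to the wrappers this PR introduces in `src/compute/src/extensions/arrange.rs`. A rough sketch of the intended migration; the spine type and variable names are illustrative, not from this diff:

```rust
// Hypothetical call site, before and after (types elided):
//
//     use crate::extensions::arrange::MzArrange;
//
//     // Before -- now rejected by the lint:
//     let arranged = coll.arrange_named::<RowSpine<_, _, _, _>>("Arrange foo");
//
//     // After -- same arrangement, built through the wrapper so that
//     // heap-size logging can be installed when the trace is exported:
//     let arranged = coll.mz_arrange::<RowSpine<_, _, _, _>>("Arrange foo");
```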
"differential_dataflow::operators::reduce::ReduceCore::reduce_core", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" }, { path = "differential_dataflow::operators::reduce::Threshold::distinct", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" }, { path = "differential_dataflow::operators::reduce::Threshold::distinct_core", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" }, { path = "differential_dataflow::operators::reduce::Threshold::threshold", reason = "use the `differential_dataflow::operators::reduce::ReduceCore::reduce_abelian` function instead" }, diff --git a/doc/user/content/sql/system-catalog/mz_internal.md b/doc/user/content/sql/system-catalog/mz_internal.md index 5b9a6cd184a7..0b9df12dd620 100644 --- a/doc/user/content/sql/system-catalog/mz_internal.md +++ b/doc/user/content/sql/system-catalog/mz_internal.md @@ -371,14 +371,23 @@ The `mz_arrangement_sharing` view describes how many times each [arrangement] in The `mz_arrangement_sizes` view describes the size of each [arrangement] in the system. +The size, capacity, and allocations are an approximation, which may underestimate the actual size in memory. +Specifically, reductions can use more memory than we show here. + -| Field | Type | Meaning | -| -------------- |-------------| -------- | -| `operator_id` | [`uint8`] | The ID of the operator that created the arrangement. Corresponds to [`mz_dataflow_operators.id`](#mz_dataflow_operators). | -| `records` | [`numeric`] | The number of records in the arrangement. | -| `batches` | [`numeric`] | The number of batches in the arrangement. | +| Field | Type | Meaning | +|---------------|-------------| -------- | +| `operator_id` | [`uint8`] | The ID of the operator that created the arrangement. Corresponds to [`mz_dataflow_operators.id`](#mz_dataflow_operators). | +| `records` | [`numeric`] | The number of records in the arrangement. | +| `batches` | [`numeric`] | The number of batches in the arrangement. | +| `size` | [`numeric`] | The utilized size in bytes of the arrangement. | +| `capacity` | [`numeric`] | The capacity in bytes of the arrangement. Can be larger than the size. | +| `allocations` | [`numeric`] | The number of separate memory allocations backing the arrangement. | + + + ### `mz_compute_delays_histogram` @@ -484,6 +493,22 @@ The `mz_dataflow_addresses` view describes how the [dataflow] channels and opera +### `mz_dataflow_arrangement_sizes` + +The `mz_dataflow_arrangement_sizes` view describes how many records and batches +are contained in operators under each dataflow. + + +| Field | Type | Meaning | +|---------------|-------------|------------------------------------------------------------------------------| +| `id` | [`uint8`] | The ID of the [dataflow]. Corresponds to [`mz_dataflows.id`](#mz_dataflows). | +| `name` | [`text`] | The name of the object (e.g., index) maintained by the dataflow. | +| `records` | [`numeric`] | The number of records in all arrangements in the dataflow. | +| `batches` | [`numeric`] | The number of batches in all arrangements in the dataflow. | +| `size` | [`numeric`] | The utilized size in bytes of the arrangements. | +| `capacity` | [`numeric`] | The capacity in bytes of the arrangements. Can be larger than the size. | +| `allocations` | [`numeric`] | The number of separate memory allocations backing the arrangements. 
| + ### `mz_dataflow_channels` The `mz_dataflow_channels` view describes the communication channels between [dataflow] operators. @@ -553,18 +578,18 @@ The `mz_dataflow_operator_parents` view describes how [dataflow] operators are n -### `mz_dataflow_arrangement_sizes` +### `mz_dataflow_shutdown_durations_histogram` -The `mz_dataflow_arrangement_sizes` view describes how many records and batches -are contained in operators under each dataflow. +The `mz_dataflow_shutdown_durations_histogram` view describes a histogram of the time in nanoseconds required to fully shut down dropped [dataflows][dataflow]. - -| Field | Type | Meaning | -|-----------|-------------|------------------------------------------------------------------------------| -| `id` | [`uint8`] | The ID of the [dataflow]. Corresponds to [`mz_dataflows.id`](#mz_dataflows). | -| `name` | [`text`] | The name of the object (e.g., index) maintained by the dataflow. | -| `records` | [`numeric`] | The number of records in all arrangements in the dataflow. | -| `batches` | [`numeric`] | The number of batches in all arrangements in the dataflow. | + +| Field | Type | Meaning | +| -------------- |-------------| -------- | +| `duration_ns` | [`uint8`] | The upper bound of the bucket in nanoseconds. | +| `count` | [`numeric`] | The (noncumulative) count of dataflows in this bucket. | + + + ### `mz_message_counts` @@ -604,6 +629,9 @@ The `mz_records_per_dataflow` view describes the number of records in each [data | `id` | [`uint8`] | The ID of the dataflow. Corresponds to [`mz_dataflows.id`](#mz_dataflows). | | `name` | [`text`] | The internal name of the dataflow. | | `records` | [`numeric`] | The number of records in the dataflow. | +| `size` | [`numeric`] | The utilized size in bytes of the arrangements. | +| `capacity` | [`numeric`] | The capacity in bytes of the arrangements. Can be larger than the size. | +| `allocations` | [`numeric`] | The number of separate memory allocations backing the arrangements. | @@ -618,6 +646,9 @@ The `mz_records_per_dataflow_operator` view describes the number of records in e | `name` | [`text`] | The internal name of the operator. | | `dataflow_id` | [`uint8`] | The ID of the dataflow. Corresponds to [`mz_dataflows.id`](#mz_dataflows). | | `records` | [`numeric`] | The number of records in the operator. | +| `size` | [`numeric`] | The utilized size in bytes of the arrangement. | +| `capacity` | [`numeric`] | The capacity in bytes of the arrangement. Can be larger than the size. | +| `allocations` | [`numeric`] | The number of separate memory allocations backing the arrangement. 
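To make the new `size`/`capacity`/`allocations` columns concrete: the accounting mirrors how Rust's `Vec` distinguishes used length from reserved capacity, with each backing buffer counting as one allocation. A runnable illustration (plain Rust, not Materialize code; the numbers only demonstrate the semantics):

```rust
fn main() {
    let mut v: Vec<u64> = Vec::with_capacity(16);
    v.extend(0..10);
    let elem = std::mem::size_of::<u64>();
    let size = v.len() * elem; // bytes actually utilized: 80
    let capacity = v.capacity() * elem; // bytes reserved: at least 128
    let allocations = 1; // one backing buffer for this Vec
    assert!(capacity >= size);
    println!("size={size} capacity={capacity} allocations={allocations}");
}
```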
diff --git a/misc/python/materialize/mzcompose/services.py b/misc/python/materialize/mzcompose/services.py
index 154bad7d5688..f1b13c2bdc9e 100644
--- a/misc/python/materialize/mzcompose/services.py
+++ b/misc/python/materialize/mzcompose/services.py
@@ -49,6 +49,7 @@
     "persist_stats_filter_enabled": "true",
     "persist_stats_collection_enabled": "true",
    "persist_stats_audit_percent": "100",
+    "enable_arrangement_size_logging": "true",
     "enable_ld_rbac_checks": "true",
     "enable_rbac_checks": "true",
     "enable_monotonic_oneshot_selects": "true",
diff --git a/src/adapter/src/catalog/builtin.rs b/src/adapter/src/catalog/builtin.rs
index 35bf32cca5fb..4191c2faf41c 100644
--- a/src/adapter/src/catalog/builtin.rs
+++ b/src/adapter/src/catalog/builtin.rs
@@ -1319,6 +1319,30 @@ pub const MZ_PEEK_DURATIONS_HISTOGRAM_RAW: BuiltinLog = BuiltinLog {
     variant: LogVariant::Compute(ComputeLog::PeekDuration),
 };
 
+pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_RAW: BuiltinLog = BuiltinLog {
+    name: "mz_dataflow_shutdown_durations_histogram_raw",
+    schema: MZ_INTERNAL_SCHEMA,
+    variant: LogVariant::Compute(ComputeLog::ShutdownDuration),
+};
+
+pub const MZ_ARRANGEMENT_HEAP_SIZE_RAW: BuiltinLog = BuiltinLog {
+    name: "mz_arrangement_heap_size_raw",
+    schema: MZ_INTERNAL_SCHEMA,
+    variant: LogVariant::Compute(ComputeLog::ArrangementHeapSize),
+};
+
+pub const MZ_ARRANGEMENT_HEAP_CAPACITY_RAW: BuiltinLog = BuiltinLog {
+    name: "mz_arrangement_heap_capacity_raw",
+    schema: MZ_INTERNAL_SCHEMA,
+    variant: LogVariant::Compute(ComputeLog::ArrangementHeapCapacity),
+};
+
+pub const MZ_ARRANGEMENT_HEAP_ALLOCATIONS_RAW: BuiltinLog = BuiltinLog {
+    name: "mz_arrangement_heap_allocations_raw",
+    schema: MZ_INTERNAL_SCHEMA,
+    variant: LogVariant::Compute(ComputeLog::ArrangementHeapAllocations),
+};
+
 pub const MZ_MESSAGE_COUNTS_RECEIVED_RAW: BuiltinLog = BuiltinLog {
     name: "mz_message_counts_received_raw",
     schema: MZ_INTERNAL_SCHEMA,
@@ -2247,28 +2271,21 @@ pub const MZ_RECORDS_PER_DATAFLOW_OPERATOR_PER_WORKER: BuiltinView = BuiltinView {
     name: "mz_records_per_dataflow_operator_per_worker",
     schema: MZ_INTERNAL_SCHEMA,
     sql: "CREATE VIEW mz_internal.mz_records_per_dataflow_operator_per_worker AS
-WITH records_cte AS (
-    SELECT
-        operator_id,
-        worker_id,
-        pg_catalog.count(*) AS records
-    FROM
-        mz_internal.mz_arrangement_records_raw
-    GROUP BY
-        operator_id, worker_id
-)
 SELECT
     dod.id,
     dod.name,
     dod.worker_id,
     dod.dataflow_id,
-    records_cte.records
+    ar_size.records,
+    ar_size.size,
+    ar_size.capacity,
+    ar_size.allocations
 FROM
-    records_cte,
+    mz_internal.mz_arrangement_sizes_per_worker ar_size,
     mz_internal.mz_dataflow_operator_dataflows_per_worker dod
 WHERE
-    dod.id = records_cte.operator_id AND
-    dod.worker_id = records_cte.worker_id",
+    dod.id = ar_size.operator_id AND
+    dod.worker_id = ar_size.worker_id",
 };
 
 pub const MZ_RECORDS_PER_DATAFLOW_OPERATOR: BuiltinView = BuiltinView {
@@ -2279,7 +2296,10 @@ pub const MZ_RECORDS_PER_DATAFLOW_OPERATOR: BuiltinView = BuiltinView {
 SELECT
     id,
     name,
     dataflow_id,
-    pg_catalog.sum(records) AS records
+    pg_catalog.sum(records) AS records,
+    pg_catalog.sum(size) AS size,
+    pg_catalog.sum(capacity) AS capacity,
+    pg_catalog.sum(allocations) AS allocations
 FROM mz_internal.mz_records_per_dataflow_operator_per_worker
 GROUP BY id, name, dataflow_id",
 };
@@ -2287,11 +2307,15 @@ GROUP BY id, name, dataflow_id",
 pub const MZ_RECORDS_PER_DATAFLOW_PER_WORKER: BuiltinView = BuiltinView {
     name: "mz_records_per_dataflow_per_worker",
     schema: MZ_INTERNAL_SCHEMA,
-    sql: "CREATE VIEW mz_internal.mz_records_per_dataflow_per_worker AS SELECT
+    sql: "CREATE VIEW mz_internal.mz_records_per_dataflow_per_worker AS
+SELECT
     rdo.dataflow_id as id,
     dfs.name,
     rdo.worker_id,
-    pg_catalog.SUM(rdo.records) as records
+    pg_catalog.SUM(rdo.records) as records,
+    pg_catalog.SUM(rdo.size) as size,
+    pg_catalog.SUM(rdo.capacity) as capacity,
+    pg_catalog.SUM(rdo.allocations) as allocations
 FROM
     mz_internal.mz_records_per_dataflow_operator_per_worker rdo,
     mz_internal.mz_dataflows_per_worker dfs
@@ -2307,10 +2331,14 @@ GROUP BY
 pub const MZ_RECORDS_PER_DATAFLOW: BuiltinView = BuiltinView {
     name: "mz_records_per_dataflow",
     schema: MZ_INTERNAL_SCHEMA,
-    sql: "CREATE VIEW mz_internal.mz_records_per_dataflow AS SELECT
+    sql: "CREATE VIEW mz_internal.mz_records_per_dataflow AS
+SELECT
     id,
     name,
-    pg_catalog.SUM(records) as records
+    pg_catalog.SUM(records) as records,
+    pg_catalog.SUM(size) as size,
+    pg_catalog.SUM(capacity) as capacity,
+    pg_catalog.SUM(allocations) as allocations
 FROM
     mz_internal.mz_records_per_dataflow_per_worker
 GROUP BY
@@ -2806,6 +2834,28 @@ FROM mz_internal.mz_peek_durations_histogram_per_worker
 GROUP BY duration_ns",
 };
 
+pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_PER_WORKER: BuiltinView = BuiltinView {
+    name: "mz_dataflow_shutdown_durations_histogram_per_worker",
+    schema: MZ_INTERNAL_SCHEMA,
+    sql: "CREATE VIEW mz_internal.mz_dataflow_shutdown_durations_histogram_per_worker AS SELECT
+    worker_id, duration_ns, pg_catalog.count(*) AS count
+FROM
+    mz_internal.mz_dataflow_shutdown_durations_histogram_raw
+GROUP BY
+    worker_id, duration_ns",
+};
+
+pub const MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM: BuiltinView = BuiltinView {
+    name: "mz_dataflow_shutdown_durations_histogram",
+    schema: MZ_INTERNAL_SCHEMA,
+    sql: "CREATE VIEW mz_internal.mz_dataflow_shutdown_durations_histogram AS
+SELECT
+    duration_ns,
+    pg_catalog.sum(count) AS count
+FROM mz_internal.mz_dataflow_shutdown_durations_histogram_per_worker
+GROUP BY duration_ns",
+};
+
 pub const MZ_SCHEDULING_ELAPSED_PER_WORKER: BuiltinView = BuiltinView {
     name: "mz_scheduling_elapsed_per_worker",
     schema: MZ_INTERNAL_SCHEMA,
@@ -3006,13 +3056,50 @@ records_cte AS (
         mz_internal.mz_arrangement_records_raw
     GROUP BY
         operator_id, worker_id
+),
+heap_size_cte AS (
+    SELECT
+        operator_id,
+        worker_id,
+        pg_catalog.count(*) AS size
+    FROM
+        mz_internal.mz_arrangement_heap_size_raw
+    GROUP BY
+        operator_id, worker_id
+),
+heap_capacity_cte AS (
+    SELECT
+        operator_id,
+        worker_id,
+        pg_catalog.count(*) AS capacity
+    FROM
+        mz_internal.mz_arrangement_heap_capacity_raw
+    GROUP BY
+        operator_id, worker_id
+),
+heap_allocations_cte AS (
+    SELECT
+        operator_id,
+        worker_id,
+        pg_catalog.count(*) AS allocations
+    FROM
+        mz_internal.mz_arrangement_heap_allocations_raw
+    GROUP BY
+        operator_id, worker_id
 )
 SELECT
     batches_cte.operator_id,
     batches_cte.worker_id,
-    records_cte.records,
-    batches_cte.batches
-FROM batches_cte JOIN records_cte USING (operator_id, worker_id)",
+    COALESCE(records_cte.records, 0) AS records,
+    batches_cte.batches,
+    COALESCE(heap_size_cte.size, 0) AS size,
+    COALESCE(heap_capacity_cte.capacity, 0) AS capacity,
+    COALESCE(heap_allocations_cte.allocations, 0) AS allocations
+FROM batches_cte
+LEFT OUTER JOIN records_cte USING (operator_id, worker_id)
+LEFT OUTER JOIN heap_size_cte USING (operator_id, worker_id)
+LEFT OUTER JOIN heap_capacity_cte USING (operator_id, worker_id)
+LEFT OUTER JOIN heap_allocations_cte USING (operator_id, worker_id)",
 };
 
pub const MZ_ARRANGEMENT_SIZES: BuiltinView = BuiltinView {
@@ -3022,7 +3109,10 @@ pub const MZ_ARRANGEMENT_SIZES: BuiltinView = BuiltinView {
 SELECT
     operator_id,
     pg_catalog.sum(records) AS records,
-    pg_catalog.sum(batches) AS batches
+    pg_catalog.sum(batches) AS batches,
+    pg_catalog.sum(size) AS size,
+    pg_catalog.sum(capacity) AS capacity,
+    pg_catalog.sum(allocations) AS allocations
 FROM mz_internal.mz_arrangement_sizes_per_worker
 GROUP BY operator_id",
 };
@@ -3108,7 +3198,10 @@ pub const MZ_DATAFLOW_ARRANGEMENT_SIZES: BuiltinView = BuiltinView {
     mdod.dataflow_id AS id,
     mo.name,
     COALESCE(sum(mas.records), 0) AS records,
-    COALESCE(sum(mas.batches), 0) AS batches
+    COALESCE(sum(mas.batches), 0) AS batches,
+    COALESCE(sum(mas.size), 0) AS size,
+    COALESCE(sum(mas.capacity), 0) AS capacity,
+    COALESCE(sum(mas.allocations), 0) AS allocations
 FROM mz_internal.mz_dataflow_operators AS mdo
 LEFT JOIN mz_internal.mz_arrangement_sizes AS mas ON mdo.id = mas.operator_id
@@ -4061,6 +4154,10 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
         Builtin::Log(&MZ_MESSAGE_COUNTS_SENT_RAW),
         Builtin::Log(&MZ_ACTIVE_PEEKS_PER_WORKER),
         Builtin::Log(&MZ_PEEK_DURATIONS_HISTOGRAM_RAW),
+        Builtin::Log(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_RAW),
+        Builtin::Log(&MZ_ARRANGEMENT_HEAP_CAPACITY_RAW),
+        Builtin::Log(&MZ_ARRANGEMENT_HEAP_ALLOCATIONS_RAW),
+        Builtin::Log(&MZ_ARRANGEMENT_HEAP_SIZE_RAW),
         Builtin::Log(&MZ_SCHEDULING_ELAPSED_RAW),
         Builtin::Log(&MZ_COMPUTE_OPERATOR_DURATIONS_HISTOGRAM_RAW),
         Builtin::Log(&MZ_SCHEDULING_PARKS_HISTOGRAM_RAW),
@@ -4151,6 +4248,8 @@ pub static BUILTINS_STATIC: Lazy<Vec<Builtin<NameReference>>> = Lazy::new(|| {
         Builtin::View(&MZ_RECORDS_PER_DATAFLOW),
         Builtin::View(&MZ_PEEK_DURATIONS_HISTOGRAM_PER_WORKER),
         Builtin::View(&MZ_PEEK_DURATIONS_HISTOGRAM),
+        Builtin::View(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM_PER_WORKER),
+        Builtin::View(&MZ_DATAFLOW_SHUTDOWN_DURATIONS_HISTOGRAM),
         Builtin::View(&MZ_SCHEDULING_ELAPSED_PER_WORKER),
         Builtin::View(&MZ_SCHEDULING_ELAPSED),
         Builtin::View(&MZ_SCHEDULING_PARKS_HISTOGRAM_PER_WORKER),
diff --git a/src/adapter/src/flags.rs b/src/adapter/src/flags.rs
index 86fe3306c9b4..ff9506d56f8f 100644
--- a/src/adapter/src/flags.rs
+++ b/src/adapter/src/flags.rs
@@ -19,6 +19,7 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
     ComputeParameters {
         max_result_size: Some(config.max_result_size()),
         dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
+        enable_arrangement_size_logging: Some(config.enable_arrangement_size_logging()),
         enable_mz_join_core: Some(config.enable_mz_join_core()),
         persist: persist_config(config),
         tracing: tracing_config(config),
diff --git a/src/cluster/src/types.rs b/src/cluster/src/types.rs
index 267071cfb53f..082ad8becdf3 100644
--- a/src/cluster/src/types.rs
+++ b/src/cluster/src/types.rs
@@ -30,7 +30,7 @@ pub trait AsRunnableWorker<C, R> {
     /// Build and continuously run a worker. Called on each timely
     /// thread.
-    fn build_and_run<A: Allocate>(
+    fn build_and_run<A: Allocate + 'static>(
         config: Self,
         timely_worker: &mut TimelyWorker<A>,
         client_rx: crossbeam_channel::Receiver<(
diff --git a/src/compute-client/src/logging.proto b/src/compute-client/src/logging.proto
index 090f53d54109..db787ad17db2 100644
--- a/src/compute-client/src/logging.proto
+++ b/src/compute-client/src/logging.proto
@@ -52,6 +52,10 @@ message ProtoComputeLog {
         google.protobuf.Empty peek_duration = 5;
         google.protobuf.Empty frontier_delay = 6;
         google.protobuf.Empty import_frontier_current = 7;
+        google.protobuf.Empty arrangement_heap_size = 8;
+        google.protobuf.Empty arrangement_heap_capacity = 9;
+        google.protobuf.Empty arrangement_heap_allocations = 10;
+        google.protobuf.Empty shutdown_duration = 11;
     }
 }
 
 message ProtoLogVariant {
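The four new `google.protobuf.Empty` cases map one-to-one onto new variants of the Rust `ComputeLog` enum in the next file. A hypothetical round-trip check in the style of that `RustType` impl; it assumes `ComputeLog` derives `PartialEq` and `Debug`, which this diff does not show:

```rust
#[test]
fn compute_log_proto_roundtrip() {
    // New variants introduced by this change.
    let variants = [
        ComputeLog::ArrangementHeapSize,
        ComputeLog::ArrangementHeapCapacity,
        ComputeLog::ArrangementHeapAllocations,
        ComputeLog::ShutdownDuration,
    ];
    for variant in variants {
        // `into_proto`/`from_proto` come from the `RustType<ProtoComputeLog>` impl below.
        let proto = variant.into_proto();
        assert_eq!(ComputeLog::from_proto(proto).unwrap(), variant);
    }
}
```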
diff --git a/src/compute-client/src/logging.rs b/src/compute-client/src/logging.rs
index db6f27219bc7..915aa7d0aebb 100644
--- a/src/compute-client/src/logging.rs
+++ b/src/compute-client/src/logging.rs
@@ -220,6 +220,10 @@ pub enum ComputeLog {
     PeekDuration,
     FrontierDelay,
     ImportFrontierCurrent,
+    ArrangementHeapSize,
+    ArrangementHeapCapacity,
+    ArrangementHeapAllocations,
+    ShutdownDuration,
 }
 
 impl RustType<ProtoComputeLog> for ComputeLog {
@@ -234,6 +238,10 @@ impl RustType<ProtoComputeLog> for ComputeLog {
             ComputeLog::PeekDuration => PeekDuration(()),
             ComputeLog::FrontierDelay => FrontierDelay(()),
             ComputeLog::ImportFrontierCurrent => ImportFrontierCurrent(()),
+            ComputeLog::ArrangementHeapSize => ArrangementHeapSize(()),
+            ComputeLog::ArrangementHeapCapacity => ArrangementHeapCapacity(()),
+            ComputeLog::ArrangementHeapAllocations => ArrangementHeapAllocations(()),
+            ComputeLog::ShutdownDuration => ShutdownDuration(()),
         }),
     }
 }
@@ -248,6 +256,10 @@ impl RustType<ProtoComputeLog> for ComputeLog {
             Some(PeekDuration(())) => Ok(ComputeLog::PeekDuration),
             Some(FrontierDelay(())) => Ok(ComputeLog::FrontierDelay),
             Some(ImportFrontierCurrent(())) => Ok(ComputeLog::ImportFrontierCurrent),
+            Some(ArrangementHeapSize(())) => Ok(ComputeLog::ArrangementHeapSize),
+            Some(ArrangementHeapCapacity(())) => Ok(ComputeLog::ArrangementHeapCapacity),
+            Some(ArrangementHeapAllocations(())) => Ok(ComputeLog::ArrangementHeapAllocations),
+            Some(ShutdownDuration(())) => Ok(ComputeLog::ShutdownDuration),
             None => Err(TryFromProtoError::missing_field("ProtoComputeLog::kind")),
         }
     }
@@ -365,7 +377,10 @@ impl LogVariant {
 
         LogVariant::Differential(DifferentialLog::ArrangementBatches)
         | LogVariant::Differential(DifferentialLog::ArrangementRecords)
-        | LogVariant::Differential(DifferentialLog::Sharing) => RelationDesc::empty()
+        | LogVariant::Differential(DifferentialLog::Sharing)
+        | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
+        | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
+        | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => RelationDesc::empty()
             .with_column("operator_id", ScalarType::UInt64.nullable(false))
             .with_column("worker_id", ScalarType::UInt64.nullable(false)),
 
@@ -407,6 +422,10 @@ impl LogVariant {
         LogVariant::Compute(ComputeLog::PeekDuration) => RelationDesc::empty()
             .with_column("worker_id", ScalarType::UInt64.nullable(false))
             .with_column("duration_ns", ScalarType::UInt64.nullable(false)),
+
+        LogVariant::Compute(ComputeLog::ShutdownDuration) => RelationDesc::empty()
+            .with_column("worker_id", ScalarType::UInt64.nullable(false))
+            .with_column("duration_ns", ScalarType::UInt64.nullable(false)),
         }
     }
 
@@ -445,7 +464,10 @@ impl LogVariant {
         LogVariant::Timely(TimelyLog::Reachability) => vec![],
         LogVariant::Differential(DifferentialLog::ArrangementBatches)
         | LogVariant::Differential(DifferentialLog::ArrangementRecords)
-        | LogVariant::Differential(DifferentialLog::Sharing) => vec![(
+        | LogVariant::Differential(DifferentialLog::Sharing)
+        | LogVariant::Compute(ComputeLog::ArrangementHeapSize)
+        | LogVariant::Compute(ComputeLog::ArrangementHeapCapacity)
+        | LogVariant::Compute(ComputeLog::ArrangementHeapAllocations) => vec![(
             LogVariant::Timely(TimelyLog::Operates),
             vec![(0, 0), (1, 1)],
         )],
@@ -456,6 +478,7 @@ impl LogVariant {
         LogVariant::Compute(ComputeLog::FrontierDelay) => vec![],
         LogVariant::Compute(ComputeLog::PeekCurrent) => vec![],
         LogVariant::Compute(ComputeLog::PeekDuration) => vec![],
+        LogVariant::Compute(ComputeLog::ShutdownDuration) => vec![],
         }
     }
 }
diff --git a/src/compute-client/src/protocol/command.proto b/src/compute-client/src/protocol/command.proto
index 51f6da472b0e..dbf8b2aa587c 100644
--- a/src/compute-client/src/protocol/command.proto
+++ b/src/compute-client/src/protocol/command.proto
@@ -68,4 +68,5 @@ message ProtoComputeParameters {
     optional uint64 dataflow_max_inflight_bytes = 3;
     optional bool enable_mz_join_core = 4;
     mz_tracing.params.ProtoTracingParameters tracing = 5;
+    optional bool enable_arrangement_size_logging = 6;
 }
diff --git a/src/compute-client/src/protocol/command.rs b/src/compute-client/src/protocol/command.rs
index 076972325a10..f84b60e80a6d 100644
--- a/src/compute-client/src/protocol/command.rs
+++ b/src/compute-client/src/protocol/command.rs
@@ -360,6 +360,8 @@ pub struct ComputeParameters {
     pub max_result_size: Option<u32>,
     /// The maximum number of in-flight bytes emitted by persist_sources feeding dataflows.
     pub dataflow_max_inflight_bytes: Option<usize>,
+    /// Enable arrangement size logging.
+    pub enable_arrangement_size_logging: Option<bool>,
     /// Whether rendering should use `mz_join_core` rather than DD's `JoinCore::join_core`.
     pub enable_mz_join_core: Option<bool>,
     /// Persist client configuration.
@@ -377,6 +379,7 @@ impl ComputeParameters {
             enable_mz_join_core,
             persist,
             tracing,
+            enable_arrangement_size_logging,
         } = other;
 
         if max_result_size.is_some() {
@@ -389,6 +392,10 @@ impl ComputeParameters {
             self.enable_mz_join_core = enable_mz_join_core;
         }
 
+        if enable_arrangement_size_logging.is_some() {
+            self.enable_arrangement_size_logging = enable_arrangement_size_logging;
+        }
+
         self.persist.update(persist);
         self.tracing.update(tracing);
     }
@@ -404,6 +411,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
         ProtoComputeParameters {
             max_result_size: self.max_result_size.into_proto(),
             dataflow_max_inflight_bytes: self.dataflow_max_inflight_bytes.into_proto(),
+            enable_arrangement_size_logging: self.enable_arrangement_size_logging.into_proto(),
             enable_mz_join_core: self.enable_mz_join_core.into_proto(),
             persist: Some(self.persist.into_proto()),
             tracing: Some(self.tracing.into_proto()),
@@ -414,6 +422,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
         Ok(Self {
             max_result_size: proto.max_result_size.into_rust()?,
             dataflow_max_inflight_bytes: proto.dataflow_max_inflight_bytes.into_rust()?,
+            enable_arrangement_size_logging: proto.enable_arrangement_size_logging.into_rust()?,
             enable_mz_join_core: proto.enable_mz_join_core.into_rust()?,
             persist: proto
                 .persist
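The `update` logic above follows the existing `ComputeParameters` convention: an incoming parameter set overrides only the fields it explicitly sets. A self-contained toy model of that merge rule (field names mirror the real struct, everything else is illustrative):

```rust
// Toy model of `ComputeParameters::update`: `Some` fields win, `None` fields
// leave the previous value untouched.
#[derive(Debug, Default, PartialEq)]
struct Params {
    enable_arrangement_size_logging: Option<bool>,
    max_result_size: Option<u32>,
}

impl Params {
    fn update(&mut self, other: Params) {
        if other.enable_arrangement_size_logging.is_some() {
            self.enable_arrangement_size_logging = other.enable_arrangement_size_logging;
        }
        if other.max_result_size.is_some() {
            self.max_result_size = other.max_result_size;
        }
    }
}

fn main() {
    let mut params = Params { max_result_size: Some(1024), ..Default::default() };
    params.update(Params { enable_arrangement_size_logging: Some(true), ..Default::default() });
    // The override set the new flag but left `max_result_size` untouched.
    assert_eq!(params.max_result_size, Some(1024));
    assert_eq!(params.enable_arrangement_size_logging, Some(true));
}
```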
diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs
index c2b1770bcf1c..cb10da616fc6 100644
--- a/src/compute/src/compute_state.rs
+++ b/src/compute/src/compute_state.rs
@@ -89,6 +89,8 @@ pub struct ComputeState {
     pub metrics: ComputeMetrics,
     /// A process-global handle to tracing configuration.
     pub tracing_handle: Arc<TracingHandle>,
+    /// Enable arrangement size logging.
+    pub enable_arrangement_size_logging: bool,
 }
 
 impl ComputeState {
@@ -118,7 +120,7 @@ impl SinkToken {
     }
 }
 
-impl<'a, A: Allocate> ActiveComputeState<'a, A> {
+impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
     /// Entrypoint for applying a compute command.
     #[tracing::instrument(level = "debug", skip(self))]
     pub fn handle_compute_command(&mut self, cmd: ComputeCommand) {
@@ -153,6 +155,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         let ComputeParameters {
             max_result_size,
             dataflow_max_inflight_bytes,
+            enable_arrangement_size_logging,
             enable_mz_join_core,
             persist,
             tracing,
@@ -164,6 +167,9 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
         if let Some(v) = dataflow_max_inflight_bytes {
             self.compute_state.dataflow_max_inflight_bytes = v;
         }
+        if let Some(v) = enable_arrangement_size_logging {
+            self.compute_state.enable_arrangement_size_logging = v;
+        }
         if let Some(v) = enable_mz_join_core {
             self.compute_state.linear_join_impl = match v {
                 false => LinearJoinImpl::DifferentialDataflow,
@@ -337,7 +343,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
             panic!("dataflow server has already initialized logging");
         }
 
-        let (logger, traces) = logging::initialize(self.timely_worker, config);
+        let (logger, traces) = logging::initialize(self.timely_worker, self.compute_state, config);
 
         // Install traces as maintained indexes
         for (log, trace) in traces {
diff --git a/src/compute/src/extensions/arrange.rs b/src/compute/src/extensions/arrange.rs
new file mode 100644
index 000000000000..44fcad23e7e1
--- /dev/null
+++ b/src/compute/src/extensions/arrange.rs
@@ -0,0 +1,503 @@
+// Copyright Materialize, Inc. and contributors. All rights reserved.
+//
+// Use of this software is governed by the Business Source License
+// included in the LICENSE file.
+//
+// As of the Change Date specified in that file, in accordance with
+// the Business Source License, use of this software will be governed
+// by the Apache License, Version 2.0.
+
+use std::cell::Cell;
+use std::rc::Rc;
+
+use differential_dataflow::difference::Semigroup;
+use differential_dataflow::lattice::Lattice;
+use differential_dataflow::operators::arrange::{Arrange, Arranged, TraceAgent};
+use differential_dataflow::trace::{Batch, Trace, TraceReader};
+use differential_dataflow::{Collection, Data, ExchangeData, Hashable};
+use timely::container::columnation::Columnation;
+use timely::dataflow::channels::pact::{ParallelizationContract, Pipeline};
+use timely::dataflow::operators::Operator;
+use timely::dataflow::scopes::Child;
+use timely::dataflow::{Scope, ScopeParent, StreamCore};
+use timely::progress::timestamp::Refines;
+use timely::progress::{Antichain, Timestamp};
+
+use crate::logging::compute::ComputeEvent;
+use crate::render::context::{ArrangementImport, ErrArrangementImport};
+use crate::typedefs::{RowKeySpine, RowSpine};
+
+/// Extension trait to arrange data.
+pub trait MzArrange
+where
+    <Self::Scope as ScopeParent>::Timestamp: Lattice,
+{
+    /// The current scope.
+    type Scope: Scope;
+    /// The key type.
+    type Key: Data;
+    /// The value type.
+    type Val: Data;
+    /// The difference type.
+    type R: Data + Semigroup;
+
+    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`.
+    ///
+    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
+    /// This trace is current for all times marked completed in the output stream, and probing this
+    /// stream is the correct way to determine that times in the shared trace are committed.
+    fn mz_arrange<Tr>(&self, name: &str) -> MzArranged<Self::Scope, TraceAgent<Tr>>
+    where
+        Self::Key: ExchangeData + Hashable,
+        Self::Val: ExchangeData,
+        Self::R: ExchangeData,
+        Tr: Trace
+            + TraceReader<
+                Key = Self::Key,
+                Val = Self::Val,
+                Time = <Self::Scope as ScopeParent>::Timestamp,
+                R = Self::R,
+            > + 'static,
+        Tr::Batch: Batch;
+
+    /// Arranges a stream of `(Key, Val)` updates by `Key` into a trace of type `Tr`. Partitions
+    /// the data according to `pact`.
+    ///
+    /// This operator arranges a stream of values into a shared trace, whose contents it maintains.
+    /// This trace is current for all times marked completed in the output stream, and probing this
+    /// stream is the correct way to determine that times in the shared trace are committed.
+    fn mz_arrange_core<P, Tr>(
+        &self,
+        pact: P,
+        name: &str,
+    ) -> MzArranged<Self::Scope, TraceAgent<Tr>>
+    where
+        P: ParallelizationContract<
+            <Self::Scope as ScopeParent>::Timestamp,
+            (
+                (Self::Key, Self::Val),
+                <Self::Scope as ScopeParent>::Timestamp,
+                Self::R,
+            ),
+        >,
+        Tr: Trace
+            + TraceReader<
+                Key = Self::Key,
+                Val = Self::Val,
+                Time = <Self::Scope as ScopeParent>::Timestamp,
+                R = Self::R,
+            > + 'static,
+        Tr::Batch: Batch;
+}
+
+impl<G, K, V, R> MzArrange for Collection<G, (K, V), R>
+where
+    G: Scope,
+    G::Timestamp: Lattice,
+    K: Data + Columnation,
+    V: Data + Columnation,
+    R: Semigroup,
+{
+    type Scope = G;
+    type Key = K;
+    type Val = V;
+    type R = R;
+
+    fn mz_arrange<Tr>(&self, name: &str) -> MzArranged<G, TraceAgent<Tr>>
+    where
+        K: ExchangeData + Hashable,
+        V: ExchangeData,
+        R: ExchangeData,
+        Tr: Trace + TraceReader<Key = K, Val = V, Time = G::Timestamp, R = R> + 'static,
+        Tr::Batch: Batch,
+    {
+        // Allow access to `arrange_named` because we're within Mz's wrapper.
+        #[allow(clippy::disallowed_methods)]
+        self.arrange_named(name).into()
+    }
+
+    fn mz_arrange_core<P, Tr>(&self, pact: P, name: &str) -> MzArranged<G, TraceAgent<Tr>>
+    where
+        P: ParallelizationContract<G::Timestamp, ((K, V), G::Timestamp, R)>,
+        Tr: Trace + TraceReader<Key = K, Val = V, Time = G::Timestamp, R = R> + 'static,
+        Tr::Batch: Batch,
+    {
+        // Allow access to `arrange_core` because we're within Mz's wrapper.
+        #[allow(clippy::disallowed_methods)]
+        self.arrange_core(pact, name).into()
+    }
+}
+
+/// A specialized collection where data only has a key, but no associated value.
+///
+/// Created by calling `collection.into()`.
+pub struct KeyCollection<G: Scope, K, R>(Collection<G, K, R>);
+
+impl<G: Scope, K, R> From<Collection<G, K, R>> for KeyCollection<G, K, R> {
+    fn from(value: Collection<G, K, R>) -> Self {
+        KeyCollection(value)
+    }
+}
+
+impl<G, K, R> MzArrange for KeyCollection<G, K, R>
+where
+    G: Scope,
+    K: Data + Columnation,
+    G::Timestamp: Lattice,
+    R: Semigroup,
+{
+    type Scope = G;
+    type Key = K;
+    type Val = ();
+    type R = R;
+
+    fn mz_arrange<Tr>(&self, name: &str) -> MzArranged<G, TraceAgent<Tr>>
+    where
+        K: ExchangeData + Hashable,
+        R: ExchangeData,
+        Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R> + 'static,
+        Tr::Batch: Batch,
+    {
+        self.0.map(|d| (d, ())).mz_arrange(name)
+    }
+
+    fn mz_arrange_core<P, Tr>(&self, pact: P, name: &str) -> MzArranged<G, TraceAgent<Tr>>
+    where
+        P: ParallelizationContract<G::Timestamp, ((K, ()), G::Timestamp, R)>,
+        Tr: Trace + TraceReader<Key = K, Val = (), Time = G::Timestamp, R = R> + 'static,
+        Tr::Batch: Batch,
+    {
+        self.0.map(|d| (d, ())).mz_arrange_core(pact, name)
+    }
+}
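`KeyCollection` exists so keys-only collections can use the same entry point without repeating the `map(|k| (k, ()))` dance at every call site; the change to `logging/initialize.rs` later in this diff is the first user. A sketch of that shape, abbreviated from the later call site (types elided):

```rust
// Sketch, mirroring the usage in logging/initialize.rs below:
//
//     use crate::extensions::arrange::{KeyCollection, MzArrange};
//
//     // A keys-only error collection; the unit value is supplied internally.
//     let errs: KeyCollection<_, DataflowError, Diff> =
//         Collection::empty(scope).into();
//     let arranged = errs.mz_arrange("Arrange logging err");
```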
+/// A type that can log its heap size.
+pub trait ArrangementSize {
+    /// Install a logger to track the heap size of the target.
+    fn log_arrangement_size(&self);
+}
+
+/// Helper to compute the size of a vector in memory.
+///
+/// The function only considers the immediate allocation of the vector, but is oblivious of any
+/// pointers to owned allocations.
+#[inline]
+fn vec_size<T>(data: &Vec<T>, mut callback: impl FnMut(usize, usize)) {
+    let size_of_t = std::mem::size_of::<T>();
+    callback(data.len() * size_of_t, data.capacity() * size_of_t);
+}
+
+/// Helper for [`ArrangementSize`] to install a common operator holding on to a trace.
+///
+/// * `arranged`: The arrangement to inspect.
+/// * `logic`: Closure that calculates the heap size/capacity/allocations for a trace. The return
+///   values are size and capacity in bytes, and number of allocations, all in absolute values.
+fn log_arrangement_size_inner<G, Tr, L>(arranged: &Arranged<G, TraceAgent<Tr>>, mut logic: L)
+where
+    G: Scope,
+    G::Timestamp: Timestamp + Lattice + Ord,
+    Tr: TraceReader + 'static,
+    Tr::Time: Timestamp + Lattice + Ord + Clone + 'static,
+    L: FnMut(&TraceAgent<Tr>) -> (usize, usize, usize) + 'static,
+{
+    let scope = arranged.stream.scope();
+    let Some(logger) = scope
+        .log_register()
+        .get::<ComputeEvent>("materialize/compute")
+    else {
+        return;
+    };
+    let mut trace = arranged.trace.clone();
+    let operator = trace.operator().global_id;
+
+    // We don't want to block compaction.
+    let empty = Antichain::new();
+    trace.set_logical_compaction(empty.borrow());
+    trace.set_physical_compaction(empty.borrow());
+
+    let (mut old_size, mut old_capacity, mut old_allocations) = (0isize, 0isize, 0isize);
+
+    arranged
+        .stream
+        .unary(Pipeline, "ArrangementSize", |_cap, info| {
+            let mut buffer = Default::default();
+            let address = info.address;
+            logger.log(ComputeEvent::ArrangementHeapSizeOperator { operator, address });
+            move |input, output| {
+                while let Some((time, data)) = input.next() {
+                    data.swap(&mut buffer);
+                    output.session(&time).give_container(&mut buffer);
+                }
+
+                let (size, capacity, allocations) = logic(&trace);
+
+                let size = size.try_into().expect("must fit");
+                if size != old_size {
+                    logger.log(ComputeEvent::ArrangementHeapSize {
+                        operator,
+                        delta_size: size - old_size,
+                    });
+                }
+
+                let capacity = capacity.try_into().expect("must fit");
+                if capacity != old_capacity {
+                    logger.log(ComputeEvent::ArrangementHeapCapacity {
+                        operator,
+                        delta_capacity: capacity - old_capacity,
+                    });
+                }
+
+                let allocations = allocations.try_into().expect("must fit");
+                if allocations != old_allocations {
+                    logger.log(ComputeEvent::ArrangementHeapAllocations {
+                        operator,
+                        delta_allocations: allocations - old_allocations,
+                    });
+                }
+
+                old_size = size;
+                old_capacity = capacity;
+                old_allocations = allocations;
+            }
+        });
+}
+
+impl<G, K, V, T, R> ArrangementSize for MzArranged<G, TraceAgent<RowSpine<K, V, T, R>>>
+where
+    G: Scope<Timestamp = T>,
+    G::Timestamp: Lattice + Ord,
+    K: Data + Columnation,
+    V: Data + Columnation,
+    T: Lattice + Timestamp,
+    R: Semigroup,
+{
+    fn log_arrangement_size(&self) {
+        log_arrangement_size_inner(&self.arranged, |trace| {
+            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
+            let mut callback = |siz, cap| {
+                allocations += 1;
+                size += siz;
+                capacity += cap
+            };
+            trace.map_batches(|batch| {
+                batch.layer.keys.heap_size(&mut callback);
+                batch.layer.vals.keys.heap_size(&mut callback);
+                vec_size(&batch.layer.offs, &mut callback);
+                vec_size(&batch.layer.vals.offs, &mut callback);
+                vec_size(&batch.layer.vals.vals.vals, &mut callback);
+            });
+            (size, capacity, allocations)
+        });
+    }
+}
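The operator above reports deltas rather than absolute values, so the downstream log views can sum the collection to recover current totals. A runnable toy model of that bookkeeping (the observation values are made up):

```rust
// Toy version of the delta reporting in `log_arrangement_size_inner`: remember
// the last reported value and only log changes; logged deltas sum back to the
// current absolute size.
fn main() {
    let observations = [100usize, 100, 160, 40];
    let mut old: isize = 0;
    let mut logged = Vec::new();
    for obs in observations {
        let new: isize = obs.try_into().expect("must fit");
        if new != old {
            logged.push(new - old); // the delta that would be logged
        }
        old = new;
    }
    assert_eq!(logged, vec![100, 60, -120]);
    assert_eq!(logged.iter().sum::<isize>(), 40); // deltas sum to current size
}
```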
+impl<G, K, T, R> ArrangementSize for MzArranged<G, TraceAgent<RowKeySpine<K, T, R>>>
+where
+    G: Scope<Timestamp = T>,
+    G::Timestamp: Lattice + Ord,
+    K: Data + Columnation,
+    T: Lattice + Timestamp,
+    R: Semigroup,
+{
+    fn log_arrangement_size(&self) {
+        log_arrangement_size_inner(&self.arranged, |trace| {
+            let (mut size, mut capacity, mut allocations) = (0, 0, 0);
+            let mut callback = |siz, cap| {
+                allocations += 1;
+                size += siz;
+                capacity += cap
+            };
+            trace.map_batches(|batch| {
+                batch.layer.keys.heap_size(&mut callback);
+                vec_size(&batch.layer.offs, &mut callback);
+                vec_size(&batch.layer.vals.vals, &mut callback);
+            });
+            (size, capacity, allocations)
+        });
+    }
+}
+
+impl<G, V, T> ArrangementSize for ArrangementImport<G, V, T>
+where
+    G: Scope,
+    G::Timestamp: Lattice,
+    G::Timestamp: Refines<T>,
+    T: Timestamp + Lattice,
+    V: Data + Columnation,
+{
+    fn log_arrangement_size(&self) {
+        // Imported traces are logged during export, so we don't have to log anything here.
+    }
+}
+
+impl<G, T> ArrangementSize for ErrArrangementImport<G, T>
+where
+    G: Scope,
+    G::Timestamp: Lattice,
+    G::Timestamp: Refines<T>,
+    T: Timestamp + Lattice,
+{
+    fn log_arrangement_size(&self) {
+        // Imported traces are logged during export, so we don't have to log anything here.
+    }
+}
+
+/// A wrapper behaving similar to Differential's [`Arranged`] type.
+///
+/// This type enables logging arrangement sizes, but only if the trace is consumed downstream.
+#[derive(Clone)]
+pub struct MzArranged<G, Tr>
+where
+    G: Scope,
+    G::Timestamp: Lattice,
+    Tr: TraceReader,
     config: &mz_compute_client::logging::LoggingConfig,
     event_queue: EventQueue<ComputeEvent>,
+    shared_state: Rc<RefCell<SharedLoggingState>>,
+    compute_state: &ComputeState,
 ) -> BTreeMap<LogVariant, (KeysValsHandle, Rc<dyn Any>)> {
     let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
     let worker_id = worker.index();
+    let worker2 = worker.clone();
 
     worker.dataflow_named("Dataflow: compute logging", move |scope| {
         let (mut logs, token) = Some(event_queue.link).mz_replay(
@@ -138,8 +183,13 @@
         let (mut frontier_delay_out, frontier_delay) = demux.new_output();
         let (mut peek_out, peek) = demux.new_output();
         let (mut peek_duration_out, peek_duration) = demux.new_output();
+        let (mut shutdown_duration_out, shutdown_duration) = demux.new_output();
+        let (mut arrangement_heap_size_out, arrangement_heap_size) = demux.new_output();
+        let (mut arrangement_heap_capacity_out, arrangement_heap_capacity) = demux.new_output();
+        let (mut arrangement_heap_allocations_out, arrangement_heap_allocations) =
+            demux.new_output();
 
-        let mut demux_state = DemuxState::default();
+        let mut demux_state = DemuxState::new(worker2);
         let mut demux_buffer = Vec::new();
         demux.build(move |_capability| {
             move |_frontiers| {
@@ -150,6 +200,10 @@
                 let mut frontier_delay = frontier_delay_out.activate();
                 let mut peek = peek_out.activate();
                 let mut peek_duration = peek_duration_out.activate();
+                let mut shutdown_duration = shutdown_duration_out.activate();
+                let mut arrangement_heap_size = arrangement_heap_size_out.activate();
+                let mut arrangement_heap_capacity = arrangement_heap_capacity_out.activate();
+                let mut arrangement_heap_allocations = arrangement_heap_allocations_out.activate();
 
                 input.for_each(|cap, data| {
                     data.swap(&mut demux_buffer);
@@ -162,6 +216,10 @@
                         frontier_delay: frontier_delay.session(&cap),
                         peek: peek.session(&cap),
                         peek_duration: peek_duration.session(&cap),
+                        shutdown_duration: shutdown_duration.session(&cap),
+                        arrangement_heap_size: arrangement_heap_size.session(&cap),
+                        arrangement_heap_capacity: arrangement_heap_capacity.session(&cap),
+                        arrangement_heap_allocations: arrangement_heap_allocations.session(&cap),
                     };
 
                     for (time, logger_id, event) in demux_buffer.drain(..) {
@@ -172,6 +230,7 @@
 
                         DemuxHandler {
                             state: &mut demux_state,
+                            shared_state: &mut shared_state.borrow_mut(),
                             output: &mut output_sessions,
                             logging_interval_ms,
                             time,
@@ -231,38 +290,54 @@
         let peek_duration = peek_duration.as_collection().map(move |bucket| {
             Row::pack_slice(&[
                 Datum::UInt64(u64::cast_from(worker_id)),
-                Datum::UInt64(bucket.try_into().expect("pow too big")),
+                Datum::UInt64(bucket.try_into().expect("bucket too big")),
             ])
         });
 
+        let shutdown_duration = shutdown_duration.as_collection().map(move |bucket| {
+            Row::pack_slice(&[
+                Datum::UInt64(u64::cast_from(worker_id)),
+                Datum::UInt64(bucket.try_into().expect("bucket too big")),
+            ])
+        });
+
+        let arrangement_heap_datum_to_row = move |ArrangementHeapDatum { operator_id }| {
+            Row::pack_slice(&[
+                Datum::UInt64(operator_id.try_into().expect("operator_id too big")),
+                Datum::UInt64(u64::cast_from(worker_id)),
+            ])
+        };
+
+        let arrangement_heap_size = arrangement_heap_size
+            .as_collection()
+            .map(arrangement_heap_datum_to_row.clone());
+        let arrangement_heap_capacity = arrangement_heap_capacity
+            .as_collection()
+            .map(arrangement_heap_datum_to_row.clone());
+        let arrangement_heap_allocations = arrangement_heap_allocations
+            .as_collection()
+            .map(arrangement_heap_datum_to_row);
+
+        use ComputeLog::*;
         let logs = [
-            (
-                LogVariant::Compute(ComputeLog::DataflowCurrent),
-                dataflow_current,
-            ),
-            (
-                LogVariant::Compute(ComputeLog::DataflowDependency),
-                dataflow_dependency,
-            ),
-            (
-                LogVariant::Compute(ComputeLog::FrontierCurrent),
-                frontier_current,
-            ),
-            (
-                LogVariant::Compute(ComputeLog::ImportFrontierCurrent),
-                import_frontier_current,
-            ),
-            (
-                LogVariant::Compute(ComputeLog::FrontierDelay),
-                frontier_delay,
-            ),
-            (LogVariant::Compute(ComputeLog::PeekCurrent), peek_current),
-            (LogVariant::Compute(ComputeLog::PeekDuration), peek_duration),
+            (DataflowCurrent, dataflow_current),
+            (DataflowDependency, dataflow_dependency),
+            (FrontierCurrent, frontier_current),
+            (ImportFrontierCurrent, import_frontier_current),
+            (FrontierDelay, frontier_delay),
+            (PeekCurrent, peek_current),
+            (PeekDuration, peek_duration),
+            (ShutdownDuration, shutdown_duration),
+            (ArrangementHeapSize, arrangement_heap_size),
+            (ArrangementHeapCapacity, arrangement_heap_capacity),
+            (ArrangementHeapAllocations, arrangement_heap_allocations),
         ];
 
         // Build the output arrangements.
         let mut traces = BTreeMap::new();
         for (variant, collection) in logs {
+            let variant = LogVariant::Compute(variant);
             if config.index_logs.contains_key(&variant) {
                 let key = variant.index_by();
                 let (_, value) = permutation_for_arrangement(
@@ -272,7 +347,7 @@
                         .collect::<Vec<_>>(),
                     variant.desc().arity(),
                 );
-                let trace = collection
+                let arranged = collection
                     .map({
                         let mut row_buf = Row::default();
                         let mut datums = DatumVec::new();
@@ -285,8 +360,10 @@
                             (row_key, row_val)
                         }
                     })
-                    .arrange_named::<RowSpine<_, _, _, _>>(&format!("ArrangeByKey {:?}", variant))
-                    .trace;
+                    .mz_arrange::<RowSpine<_, _, _, _>>(&format!("ArrangeByKey {:?}", variant));
+                // Safety: We're exporting the trace.
+                let trace =
+                    unsafe { arranged.trace(compute_state.enable_arrangement_size_logging) };
                 traces.insert(variant.clone(), (trace, Rc::clone(&token)));
             }
         }
@@ -296,14 +373,38 @@
 }
 
 /// State maintained by the demux operator.
-#[derive(Default)]
-struct DemuxState {
+struct DemuxState<A: Allocate> {
+    /// The worker hosting this operator.
+    worker: Worker<A>,
     /// Maps dataflow exports to dataflow IDs.
     export_dataflows: BTreeMap<GlobalId, usize>,
     /// Maps dataflow exports to their imports and frontier delay tracking state.
     export_imports: BTreeMap<GlobalId, BTreeMap<GlobalId, FrontierDelayState>>,
-    /// Maps pending peeks to their installation time (in ns).
+    /// Maps live dataflows to counts of their exports.
+    dataflow_export_counts: BTreeMap<usize, u32>,
+    /// Maps dropped dataflows to their drop time.
+    dataflow_drop_times: BTreeMap<usize, Duration>,
+    /// Contains dataflows that have shut down but not yet been dropped.
+    shutdown_dataflows: BTreeSet<usize>,
+    /// Maps pending peeks to their installation time.
     peek_stash: BTreeMap<Uuid, Duration>,
+    /// Arrangement size stash.
+    arrangement_size: BTreeMap<usize, ArrangementSizeState>,
+}
+
+impl<A: Allocate> DemuxState<A> {
+    fn new(worker: Worker<A>) -> Self {
+        Self {
+            worker,
+            export_dataflows: Default::default(),
+            export_imports: Default::default(),
+            dataflow_export_counts: Default::default(),
+            dataflow_drop_times: Default::default(),
+            shutdown_dataflows: Default::default(),
+            peek_stash: Default::default(),
+            arrangement_size: Default::default(),
+        }
+    }
 }
 
 /// State for tracking import-export frontier lag.
@@ -312,7 +413,7 @@ struct FrontierDelayState {
     /// A list of input timestamps that have appeared on the input
     /// frontier, but that the output frontier has not yet advanced beyond,
     /// and the time at which we were informed of their availability.
-    time_deque: VecDeque<(mz_repr::Timestamp, Duration)>,
+    time_deque: VecDeque<(Timestamp, Duration)>,
     /// A histogram of emitted delays (bucket size to bucket_count).
     delay_map: BTreeMap<u128, u64>,
 }
@@ -330,6 +431,10 @@ struct DemuxOutput<'a> {
     frontier_delay: OutputSession<'a, FrontierDelayDatum>,
     peek: OutputSession<'a, Peek>,
     peek_duration: OutputSession<'a, u128>,
+    shutdown_duration: OutputSession<'a, u128>,
+    arrangement_heap_size: OutputSession<'a, ArrangementHeapDatum>,
+    arrangement_heap_capacity: OutputSession<'a, ArrangementHeapDatum>,
+    arrangement_heap_allocations: OutputSession<'a, ArrangementHeapDatum>,
 }
 
 #[derive(Clone)]
@@ -364,10 +469,24 @@ struct FrontierDelayDatum {
     delay_pow: u128,
 }
 
+#[derive(Clone)]
+struct ArrangementHeapDatum {
+    operator_id: usize,
+}
+
+#[derive(Default)]
+struct ArrangementSizeState {
+    size: isize,
+    capacity: isize,
+    count: isize,
+}
+
 /// Event handler of the demux operator.
-struct DemuxHandler<'a, 'b> {
+struct DemuxHandler<'a, 'b, A: Allocate + 'static> {
     /// State kept by the demux operator.
-    state: &'a mut DemuxState,
+    state: &'a mut DemuxState<A>,
+    /// State shared across log receivers.
+    shared_state: &'a mut SharedLoggingState,
     /// Demux output sessions.
     output: &'a mut DemuxOutput<'b>,
     /// The logging interval specifying the time granularity for the updates.
@@ -376,7 +495,7 @@
     logging_interval_ms: u128,
     /// The time of the current event.
     time: Duration,
 }
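Before the handler methods: every output above is stamped with `self.ts()`, which rounds the event's wall-clock time up to the logging interval so that updates within one interval consolidate to a single timestamp. An illustrative (not verbatim) model of that bucketing:

```rust
// Toy model of `DemuxHandler::ts`: bucket event times to the logging interval.
fn main() {
    let logging_interval_ms: u128 = 1_000;
    let event_time_ms: u128 = 2_345;
    // Round up to the end of the enclosing interval.
    let ts = ((event_time_ms / logging_interval_ms) + 1) * logging_interval_ms;
    assert_eq!(ts, 3_000);
}
```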
-impl DemuxHandler<'_, '_> {
+impl<A: Allocate + 'static> DemuxHandler<'_, '_, A> {
     /// Return the timestamp associated with the current event, based on the event time and the
     /// logging interval.
     fn ts(&self) -> Timestamp {
@@ -406,6 +525,25 @@
                 time,
                 diff,
             } => self.handle_import_frontier(import_id, export_id, time, diff),
+            ArrangementHeapSize {
+                operator,
+                delta_size: size,
+            } => self.handle_arrangement_heap_size(operator, size),
+            ArrangementHeapCapacity {
+                operator,
+                delta_capacity: capacity,
+            } => self.handle_arrangement_heap_capacity(operator, capacity),
+            ArrangementHeapAllocations {
+                operator,
+                delta_allocations: allocations,
+            } => self.handle_arrangement_heap_allocations(operator, allocations),
+            ArrangementHeapSizeOperator { operator, address } => {
+                self.handle_arrangement_heap_size_operator(operator, address)
+            }
+            ArrangementHeapSizeOperatorDrop { operator } => {
+                self.handle_arrangement_heap_size_operator_dropped(operator)
+            }
+            DataflowShutdown { dataflow_index } => self.handle_dataflow_shutdown(dataflow_index),
         }
     }
 
@@ -416,6 +554,11 @@
 
         self.state.export_dataflows.insert(id, dataflow_id);
         self.state.export_imports.insert(id, BTreeMap::new());
+        *self
+            .state
+            .dataflow_export_counts
+            .entry(dataflow_id)
+            .or_default() += 1;
     }
 
     fn handle_export_dropped(&mut self, id: GlobalId) {
@@ -423,6 +566,18 @@
         if let Some(dataflow_id) = self.state.export_dataflows.remove(&id) {
             let datum = ExportDatum { id, dataflow_id };
             self.output.export.give((datum, ts, -1));
+
+            match self.state.dataflow_export_counts.get_mut(&dataflow_id) {
+                entry @ Some(0) | entry @ None => {
+                    error!(
+                        export = ?id,
+                        dataflow = ?dataflow_id,
+                        "invalid dataflow_export_counts entry at time of export drop: {entry:?}",
+                    );
+                }
+                Some(1) => self.handle_dataflow_dropped(dataflow_id),
+                Some(count) => *count -= 1,
+            }
         } else {
             error!(
                 export = ?id,
@@ -456,6 +611,38 @@
         }
     }
 
+    fn handle_dataflow_dropped(&mut self, id: usize) {
+        self.state.dataflow_export_counts.remove(&id);
+
+        if self.state.shutdown_dataflows.remove(&id) {
+            // Dataflow has already shut down before it was dropped.
+            self.output.shutdown_duration.give((0, self.ts(), 1));
+        } else {
+            // Dataflow has not yet shut down.
+            let existing = self.state.dataflow_drop_times.insert(id, self.time);
+            if existing.is_some() {
+                error!(dataflow = ?id, "dataflow already dropped");
+            }
+        }
+    }
+
+    fn handle_dataflow_shutdown(&mut self, id: usize) {
+        if let Some(start) = self.state.dataflow_drop_times.remove(&id) {
+            // Dataflow has already been dropped.
+            let elapsed_ns = self.time.saturating_sub(start).as_nanos();
+            let elapsed_pow = elapsed_ns.next_power_of_two();
+            self.output
+                .shutdown_duration
+                .give((elapsed_pow, self.ts(), 1));
+        } else {
+            // Dataflow has not yet been dropped.
+            let was_new = self.state.shutdown_dataflows.insert(id);
+            if !was_new {
+                error!(dataflow = ?id, "dataflow already shut down");
+            }
+        }
+    }
+
     fn handle_export_dependency(&mut self, export_id: GlobalId, import_id: GlobalId) {
         let ts = self.ts();
         let datum = DependencyDatum {
@@ -584,6 +771,82 @@
             }
         }
     }
+
+    /// Update the allocation size for an arrangement.
+    fn handle_arrangement_heap_size(&mut self, operator_id: usize, size: isize) {
+        let ts = self.ts();
+        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else { return };
+
+        let datum = ArrangementHeapDatum { operator_id };
+        self.output
+            .arrangement_heap_size
+            .give((datum, ts, Diff::cast_from(size)));
+
+        state.size += size;
+    }
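Stepping back to `handle_dataflow_shutdown` above: durations land in power-of-two buckets via `next_power_of_two`, matching the existing peek-duration histogram, and `duration_ns` in the resulting view is a bucket's upper bound. Runnable illustration:

```rust
fn main() {
    let elapsed_ns: u128 = 1_234_567;
    let bucket = elapsed_ns.next_power_of_two();
    assert_eq!(bucket, 2_097_152); // 2^20 < elapsed <= 2^21
    println!("{elapsed_ns} ns is counted in bucket {bucket}");
}
```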
+
+    /// Update the allocation capacity for an arrangement.
+    fn handle_arrangement_heap_capacity(&mut self, operator_id: usize, capacity: isize) {
+        let ts = self.ts();
+        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else { return };
+
+        let datum = ArrangementHeapDatum { operator_id };
+        self.output
+            .arrangement_heap_capacity
+            .give((datum, ts, Diff::cast_from(capacity)));
+
+        state.capacity += capacity;
+    }
+
+    /// Update the allocation count for an arrangement.
+    fn handle_arrangement_heap_allocations(&mut self, operator_id: usize, count: isize) {
+        let ts = self.ts();
+        let Some(state) = self.state.arrangement_size.get_mut(&operator_id) else { return };
+
+        let datum = ArrangementHeapDatum { operator_id };
+        self.output
+            .arrangement_heap_allocations
+            .give((datum, ts, Diff::cast_from(count)));
+
+        state.count += count;
+    }
+
+    /// Indicate that a new arrangement exists; start maintaining the heap size state.
+    fn handle_arrangement_heap_size_operator(&mut self, operator_id: usize, address: Vec<usize>) {
+        let activator = self.state.worker.activator_for(&address);
+        self.state
+            .arrangement_size
+            .insert(operator_id, Default::default());
+        self.shared_state
+            .arrangement_size_activators
+            .insert(operator_id, activator);
+    }
+
+    /// Indicate that an arrangement has been dropped and we can clean up the heap size state.
+    fn handle_arrangement_heap_size_operator_dropped(&mut self, operator_id: usize) {
+        if let Some(state) = self.state.arrangement_size.remove(&operator_id) {
+            let ts = self.ts();
+            let datum = ArrangementHeapDatum { operator_id };
+            self.output.arrangement_heap_size.give((
+                datum.clone(),
+                ts,
+                -Diff::cast_from(state.size),
+            ));
+            self.output.arrangement_heap_capacity.give((
+                datum.clone(),
+                ts,
+                -Diff::cast_from(state.capacity),
+            ));
+            self.output.arrangement_heap_allocations.give((
+                datum,
+                ts,
+                -Diff::cast_from(state.count),
+            ));
+        }
+        self.shared_state
+            .arrangement_size_activators
+            .remove(&operator_id);
+    }
 }
 
 pub(crate) trait LogImportFrontiers {
diff --git a/src/compute/src/logging/differential.rs b/src/compute/src/logging/differential.rs
index 12a61f77d960..1d91b82d9335 100644
--- a/src/compute/src/logging/differential.rs
+++ b/src/compute/src/logging/differential.rs
@@ -10,27 +10,30 @@
 //! Logging dataflows for events generated by differential dataflow.
 
 use std::any::Any;
+use std::cell::RefCell;
 use std::collections::BTreeMap;
 use std::rc::Rc;
 use std::time::Duration;
 
+use crate::compute_state::ComputeState;
 use differential_dataflow::collection::AsCollection;
 use differential_dataflow::logging::{
     BatchEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
 };
-use differential_dataflow::operators::arrange::arrangement::Arrange;
 use mz_expr::{permutation_for_arrangement, MirScalarExpr};
 use mz_ore::cast::CastFrom;
 use mz_repr::{Datum, DatumVec, Diff, Row, Timestamp};
 use mz_timely_util::buffer::ConsolidateBuffer;
 use mz_timely_util::replay::MzReplay;
 use timely::communication::Allocate;
-use timely::dataflow::channels::pact::Pipeline;
+use timely::dataflow::channels::pact::{Exchange, Pipeline};
 use timely::dataflow::channels::pushers::Tee;
 use timely::dataflow::operators::generic::builder_rc::OperatorBuilder;
 use timely::dataflow::operators::{Filter, InputCapability};
 
-use crate::logging::{DifferentialLog, EventQueue, LogVariant};
+use crate::extensions::arrange::MzArrange;
+use crate::logging::compute::ComputeEvent;
+use crate::logging::{DifferentialLog, EventQueue, LogVariant, SharedLoggingState};
 use crate::typedefs::{KeysValsHandle, RowSpine};
 
 /// Constructs the logging dataflow for differential logs.
@@ -45,6 +48,8 @@ pub(super) fn construct<A: Allocate>(
     worker: &mut timely::worker::Worker<A>,
     config: &mz_compute_client::logging::LoggingConfig,
     event_queue: EventQueue<DifferentialEvent>,
+    shared_state: Rc<RefCell<SharedLoggingState>>,
+    compute_state: &ComputeState,
 ) -> BTreeMap<LogVariant, (KeysValsHandle, Rc<dyn Any>)> {
     let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
     let worker_id = worker.index();
@@ -73,6 +78,7 @@
         let (mut sharing_out, sharing) = demux.new_output();
 
         let mut demux_buffer = Vec::new();
+        let mut demux_state = Default::default();
         demux.build(move |_capability| {
             move |_frontiers| {
                 let mut batches = batches_out.activate();
@@ -95,10 +101,12 @@
                         assert_eq!(logger_id, worker_id);
 
                         DemuxHandler {
+                            state: &mut demux_state,
                             output: &mut output_buffers,
                             logging_interval_ms,
                             time,
                             cap: &cap,
+                            shared_state: &mut shared_state.borrow_mut(),
                         }
                         .handle(event);
                     }
@@ -119,28 +127,32 @@
                 Datum::UInt64(u64::cast_from(worker_id)),
             ])
         });
-        let sharing = sharing.as_collection().map(move |op| {
+
+        let sharing = sharing
+            .as_collection()
+            .mz_arrange_core::<_, RowSpine<_, _, _, _>>(
+                Exchange::new(move |_| u64::cast_from(worker_id)),
+                "PreArrange Differential sharing",
+            );
+
+        let sharing = sharing.as_collection(move |op, ()| {
             Row::pack_slice(&[
-                Datum::UInt64(u64::cast_from(op)),
+                Datum::UInt64(u64::cast_from(*op)),
                 Datum::UInt64(u64::cast_from(worker_id)),
             ])
         });
 
+        use DifferentialLog::*;
         let logs = [
-            (
-                LogVariant::Differential(DifferentialLog::ArrangementBatches),
-                arrangement_batches,
-            ),
-            (
-                LogVariant::Differential(DifferentialLog::ArrangementRecords),
-                arrangement_records,
-            ),
-            (LogVariant::Differential(DifferentialLog::Sharing), sharing),
+            (ArrangementBatches, arrangement_batches),
+            (ArrangementRecords, arrangement_records),
+            (Sharing, sharing),
         ];
 
         // Build the output arrangements.
         let mut traces = BTreeMap::new();
         for (variant, collection) in logs {
+            let variant = LogVariant::Differential(variant);
             if config.index_logs.contains_key(&variant) {
                 let key = variant.index_by();
                 let (_, value) = permutation_for_arrangement(
@@ -150,7 +162,7 @@
                         .collect::<Vec<_>>(),
                     variant.desc().arity(),
                 );
-                let trace = collection
+                let arranged = collection
                     .map({
                         let mut row_buf = Row::default();
                         let mut datums = DatumVec::new();
@@ -163,8 +175,10 @@
                             (row_key, row_val)
                         }
                     })
-                    .arrange_named::<RowSpine<_, _, _, _>>(&format!("ArrangeByKey {:?}", variant))
-                    .trace;
+                    .mz_arrange::<RowSpine<_, _, _, _>>(&format!("ArrangeByKey {:?}", variant));
+                // Safety: We're exporting the trace.
+                let trace =
+                    unsafe { arranged.trace(compute_state.enable_arrangement_size_logging) };
                 traces.insert(variant.clone(), (trace, Rc::clone(&token)));
             }
         }
@@ -180,11 +194,20 @@ type OutputBuffer<'a, 'b, D> = ConsolidateBuffer<'a, 'b, Timestamp, D, Diff, Pus
 struct DemuxOutput<'a, 'b> {
     batches: OutputBuffer<'a, 'b, usize>,
     records: OutputBuffer<'a, 'b, usize>,
-    sharing: OutputBuffer<'a, 'b, usize>,
+    sharing: OutputBuffer<'a, 'b, (usize, ())>,
+}
+
+/// State maintained by the demux operator.
+#[derive(Default)]
+struct DemuxState {
+    /// Arrangement trace sharing.
+    sharing: BTreeMap<usize, usize>,
 }
 
 /// Event handler of the demux operator.
 struct DemuxHandler<'a, 'b, 'c> {
+    /// State kept by the demux operator.
+    state: &'a mut DemuxState,
     /// Demux output buffers.
     output: &'a mut DemuxOutput<'b, 'c>,
     /// The logging interval specifying the time granularity for the updates.
@@ -193,6 +216,8 @@ struct DemuxHandler<'a, 'b, 'c> {
     time: Duration,
     /// A capability usable for emitting outputs.
     cap: &'a InputCapability<Timestamp>,
+    /// State shared across log receivers.
+    shared_state: &'a mut SharedLoggingState,
 }
 
 impl DemuxHandler<'_, '_, '_> {
@@ -225,6 +250,7 @@
 
         let diff = Diff::try_from(event.length).expect("must fit");
         self.output.records.give(self.cap, (op, ts, diff));
+        self.notify_arrangement_size(op);
     }
 
     fn handle_merge(&mut self, event: MergeEvent) {
@@ -239,6 +265,7 @@
         if diff != 0 {
             self.output.records.give(self.cap, (op, ts, diff));
         }
+        self.notify_arrangement_size(op);
     }
 
     fn handle_drop(&mut self, event: DropEvent) {
@@ -250,6 +277,7 @@
         if diff != 0 {
             self.output.records.give(self.cap, (op, ts, diff));
         }
+        self.notify_arrangement_size(op);
     }
 
     fn handle_trace_share(&mut self, event: TraceShare) {
@@ -257,6 +285,27 @@
         let op = event.operator;
         let diff = Diff::cast_from(event.diff);
         debug_assert_ne!(diff, 0);
-        self.output.sharing.give(self.cap, (op, ts, diff));
+        self.output.sharing.give(self.cap, ((op, ()), ts, diff));
+
+        if let Some(logger) = &mut self.shared_state.compute_logger {
+            let sharing = self.state.sharing.entry(op).or_default();
+            *sharing = (i64::try_from(*sharing).expect("must fit") + diff)
+                .try_into()
+                .expect("under/overflow");
+            if *sharing == 0 {
+                self.state.sharing.remove(&op);
+                logger.log(ComputeEvent::ArrangementHeapSizeOperatorDrop { operator: op });
+            }
+        }
+    }
+
+    fn notify_arrangement_size(&self, operator: usize) {
+        // While every arrangement should have a corresponding arrangement size operator,
+        // we have no guarantee that it already/still exists. Otherwise we could print a
+        // warning here, but that is difficult to implement without maintaining state for
+        // a longer period than the arrangement actually exists.
+        if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) {
+            activator.activate();
+        }
     }
 }
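The `handle_trace_share` bookkeeping above is a signed reference count: share diffs accumulate per operator, and the entry's removal at zero is the point where `ArrangementHeapSizeOperatorDrop` gets logged. A runnable toy model:

```rust
// Toy model of the TraceShare refcounting: accumulate signed share diffs per
// operator and record a "drop" when a count returns to zero.
use std::collections::BTreeMap;

fn main() {
    let mut sharing: BTreeMap<usize, isize> = BTreeMap::new();
    let mut dropped = Vec::new();
    for (op, diff) in [(7usize, 2isize), (7, -1), (7, -1)] {
        let entry = sharing.entry(op).or_default();
        *entry += diff;
        if *entry == 0 {
            sharing.remove(&op);
            dropped.push(op); // would log ArrangementHeapSizeOperatorDrop
        }
    }
    assert_eq!(dropped, vec![7]);
}
```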
+ if let Some(activator) = self.shared_state.arrangement_size_activators.get(&operator) { + activator.activate(); + } } } diff --git a/src/compute/src/logging/initialize.rs b/src/compute/src/logging/initialize.rs index 63b6e6faeeb8..dced08f03b96 100644 --- a/src/compute/src/logging/initialize.rs +++ b/src/compute/src/logging/initialize.rs @@ -5,11 +5,12 @@ //! Initialization of logging dataflows. +use std::cell::RefCell; use std::collections::BTreeMap; +use std::rc::Rc; use std::time::{Duration, Instant}; use differential_dataflow::logging::DifferentialEvent; -use differential_dataflow::operators::arrange::Arrange; use differential_dataflow::Collection; use mz_compute_client::logging::{LogVariant, LoggingConfig}; use mz_repr::{Diff, Timestamp}; @@ -20,16 +21,19 @@ use timely::logging::{Logger, TimelyEvent}; use timely::progress::reachability::logging::TrackerEvent; use crate::arrangement::manager::TraceBundle; +use crate::compute_state::ComputeState; +use crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::logging::compute::ComputeEvent; use crate::logging::reachability::ReachabilityEvent; -use crate::logging::{BatchLogger, EventQueue}; +use crate::logging::{BatchLogger, EventQueue, SharedLoggingState}; /// Initialize logging dataflows. /// /// Returns a logger for compute events, and for each `LogVariant` a trace bundle usable for /// retrieving logged records. -pub fn initialize( +pub fn initialize( worker: &mut timely::worker::Worker, + compute_state: &ComputeState, config: &LoggingConfig, ) -> (super::compute::Logger, BTreeMap) { let interval_ms = std::cmp::max(1, config.interval.as_millis()) @@ -47,6 +51,7 @@ pub fn initialize( let mut context = LoggingContext { worker, config, + compute_state, interval_ms, now, start_offset, @@ -54,6 +59,7 @@ pub fn initialize( r_event_queue: EventQueue::new("r"), d_event_queue: EventQueue::new("d"), c_event_queue: EventQueue::new("c"), + shared_state: Default::default(), }; // Depending on whether we should log the creation of the logging dataflows, we register the @@ -74,6 +80,7 @@ pub fn initialize( struct LoggingContext<'a, A: Allocate> { worker: &'a mut timely::worker::Worker, config: &'a LoggingConfig, + compute_state: &'a ComputeState, interval_ms: u64, now: Instant, start_offset: Duration, @@ -81,38 +88,48 @@ struct LoggingContext<'a, A: Allocate> { r_event_queue: EventQueue, d_event_queue: EventQueue, c_event_queue: EventQueue, + shared_state: Rc>, } -impl LoggingContext<'_, A> { +impl LoggingContext<'_, A> { fn construct_dataflows(&mut self) -> BTreeMap { let mut traces = BTreeMap::new(); traces.extend(super::timely::construct( self.worker, self.config, self.t_event_queue.clone(), + Rc::clone(&self.shared_state), + self.compute_state, )); traces.extend(super::reachability::construct( self.worker, self.config, self.r_event_queue.clone(), + self.compute_state, )); traces.extend(super::differential::construct( self.worker, self.config, self.d_event_queue.clone(), + Rc::clone(&self.shared_state), + self.compute_state, )); traces.extend(super::compute::construct( self.worker, self.config, self.c_event_queue.clone(), + Rc::clone(&self.shared_state), + self.compute_state, )); let errs = self .worker .dataflow_named("Dataflow: logging errors", |scope| { - Collection::<_, DataflowError, Diff>::empty(scope) - .arrange_named("Arrange logging err") - .trace + let collection: KeyCollection<_, DataflowError, Diff> = + Collection::empty(scope).into(); + let arranged = collection.mz_arrange("Arrange logging err"); + // Safety: We're 
exporting the trace. + unsafe { arranged.trace(self.compute_state.enable_arrangement_size_logging) } }); traces @@ -125,20 +142,18 @@ impl LoggingContext<'_, A> { } fn register_loggers(&self) { - self.worker - .log_register() - .insert_logger("timely", self.simple_logger(self.t_event_queue.clone())); - self.worker - .log_register() - .insert_logger("timely/reachability", self.reachability_logger()); - self.worker.log_register().insert_logger( - "differential/arrange", - self.simple_logger(self.d_event_queue.clone()), - ); - self.worker.log_register().insert_logger( - "materialize/compute", - self.simple_logger(self.c_event_queue.clone()), - ); + let t_logger = self.simple_logger(self.t_event_queue.clone()); + let r_logger = self.reachability_logger(); + let d_logger = self.simple_logger(self.d_event_queue.clone()); + let c_logger = self.simple_logger(self.c_event_queue.clone()); + + let mut register = self.worker.log_register(); + register.insert_logger("timely", t_logger); + register.insert_logger("timely/reachability", r_logger); + register.insert_logger("differential/arrange", d_logger); + register.insert_logger("materialize/compute", c_logger.clone()); + + self.shared_state.borrow_mut().compute_logger = Some(c_logger); } fn simple_logger(&self, event_queue: EventQueue) -> Logger { diff --git a/src/compute/src/logging/mod.rs b/src/compute/src/logging/mod.rs index dcf7f269a2cc..3f82b47f3e7c 100644 --- a/src/compute/src/logging/mod.rs +++ b/src/compute/src/logging/mod.rs @@ -15,16 +15,20 @@ mod initialize; mod reachability; mod timely; +use std::collections::BTreeMap; use std::rc::Rc; use std::time::Duration; use ::timely::dataflow::operators::capture::{Event, EventLink, EventPusher}; use ::timely::logging::WorkerIdentifier; use ::timely::progress::Timestamp as TimelyTimestamp; +use ::timely::scheduling::Activator; use mz_compute_client::logging::{ComputeLog, DifferentialLog, LogVariant, TimelyLog}; use mz_repr::Timestamp; use mz_timely_util::activator::RcActivator; +use crate::logging::compute::Logger as ComputeLogger; + pub use crate::logging::initialize::initialize; /// Logs events as a timely stream, with progress statements. @@ -142,3 +146,12 @@ impl EventQueue { } } } + +/// State shared between different logging dataflows. +#[derive(Default)] +struct SharedLoggingState { + /// Activators for arrangement heap size operators. + arrangement_size_activators: BTreeMap, + /// Shared compute logger. 
+ compute_logger: Option, +} diff --git a/src/compute/src/logging/reachability.rs b/src/compute/src/logging/reachability.rs index 5e029cb6e24c..91af1cf96dee 100644 --- a/src/compute/src/logging/reachability.rs +++ b/src/compute/src/logging/reachability.rs @@ -14,7 +14,6 @@ use std::collections::BTreeMap; use std::convert::TryInto; use std::rc::Rc; -use differential_dataflow::operators::arrange::arrangement::Arrange; use differential_dataflow::AsCollection; use mz_compute_client::logging::LoggingConfig; use mz_expr::{permutation_for_arrangement, MirScalarExpr}; @@ -27,6 +26,8 @@ use timely::communication::Allocate; use timely::dataflow::channels::pact::Exchange; use timely::dataflow::operators::Filter; +use crate::compute_state::ComputeState; +use crate::extensions::arrange::MzArrange; use crate::logging::{EventQueue, LogVariant, TimelyLog}; use crate::typedefs::{KeysValsHandle, RowSpine}; @@ -48,6 +49,7 @@ pub(super) fn construct( worker: &mut timely::worker::Worker, config: &LoggingConfig, event_queue: EventQueue, + compute_state: &ComputeState, ) -> BTreeMap)> { let interval_ms = std::cmp::max(1, config.interval.as_millis()); @@ -95,14 +97,8 @@ pub(super) fn construct( .try_into() .expect("must fit"); for (source, port, update_type, ts, diff) in massaged { - updates_session.give( - &cap, - ( - (update_type, addr.clone(), source, port, worker, ts), - time_ms, - diff, - ), - ); + let datum = (update_type, addr.clone(), source, port, worker, ts); + updates_session.give(&cap, ((datum, ()), time_ms, diff)); } } }); @@ -111,7 +107,7 @@ pub(super) fn construct( let updates = updates .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(|(((_, _, _, _, w, _), ()), _, _)| u64::cast_from(*w)), "PreArrange Timely reachability", ); @@ -153,9 +149,11 @@ pub(super) fn construct( }, ); - let trace = updates - .arrange_named::>(&format!("Arrange {:?}", variant)) - .trace; + let arranged = + updates.mz_arrange::>(&format!("Arrange {:?}", variant)); + // Safety: We're exporting the trace. + let trace = + unsafe { arranged.trace(compute_state.enable_arrangement_size_logging) }; result.insert(variant.clone(), (trace, Rc::clone(&token))); } } diff --git a/src/compute/src/logging/timely.rs b/src/compute/src/logging/timely.rs index b745ae952faf..3435ab25025c 100644 --- a/src/compute/src/logging/timely.rs +++ b/src/compute/src/logging/timely.rs @@ -10,12 +10,12 @@ //! Logging dataflows for events generated by timely dataflow. use std::any::Any; +use std::cell::RefCell; use std::collections::BTreeMap; use std::rc::Rc; use std::time::Duration; use differential_dataflow::collection::AsCollection; -use differential_dataflow::operators::arrange::arrangement::Arrange; use mz_compute_client::logging::LoggingConfig; use mz_expr::{permutation_for_arrangement, MirScalarExpr}; use mz_ore::cast::CastFrom; @@ -35,7 +35,10 @@ use timely::logging::{ }; use tracing::error; -use crate::logging::{EventQueue, LogVariant, TimelyLog}; +use crate::compute_state::ComputeState; +use crate::extensions::arrange::MzArrange; +use crate::logging::compute::ComputeEvent; +use crate::logging::{EventQueue, LogVariant, SharedLoggingState, TimelyLog}; use crate::typedefs::{KeysValsHandle, RowSpine}; /// Constructs the logging dataflow for timely logs. 
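`SharedLoggingState` is handed to each logging dataflow constructor as a shared clone, evidently an `Rc<RefCell<SharedLoggingState>>` given the `Rc::clone` and `.borrow_mut()` calls (the generic parameters were lost in extraction). Because all logging dataflows run on the same worker thread, a `RefCell` suffices where cross-thread sharing would need a `Mutex`. A toy sketch of the pattern, with a hypothetical `Shared` type standing in for `SharedLoggingState`:

```rust
use std::cell::RefCell;
use std::collections::BTreeMap;
use std::rc::Rc;

#[derive(Default)]
struct Shared {
    // Stand-in for `arrangement_size_activators` / `compute_logger`.
    registrations: BTreeMap<usize, String>,
}

fn construct_a(shared: Rc<RefCell<Shared>>) {
    // One constructor registers state...
    shared.borrow_mut().registrations.insert(1, "operator 1".into());
}

fn construct_b(shared: Rc<RefCell<Shared>>) {
    // ...and a later constructor observes it, single-threaded, no locking.
    assert!(shared.borrow().registrations.contains_key(&1));
}

fn main() {
    let shared = Rc::new(RefCell::new(Shared::default()));
    construct_a(Rc::clone(&shared));
    construct_b(Rc::clone(&shared));
}
```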
@@ -50,6 +53,8 @@ pub(super) fn construct( worker: &mut timely::worker::Worker, config: &LoggingConfig, event_queue: EventQueue, + shared_state: Rc>, + compute_state: &ComputeState, ) -> BTreeMap)> { let logging_interval_ms = std::cmp::max(1, config.interval.as_millis()); let worker_id = worker.index(); @@ -123,6 +128,7 @@ pub(super) fn construct( DemuxHandler { state: &mut demux_state, + shared_state: &mut shared_state.borrow_mut(), output: &mut output_buffers, logging_interval_ms, peers, @@ -140,11 +146,11 @@ pub(super) fn construct( // updates that reach `Row` encoding. let operates = operates .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Timely operates", ) - .as_collection(move |(id, name), _| { + .as_collection(move |id, name| { Row::pack_slice(&[ Datum::UInt64(u64::cast_from(*id)), Datum::UInt64(u64::cast_from(worker_id)), @@ -153,7 +159,7 @@ pub(super) fn construct( }); let channels = channels .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Timely operates", ) @@ -171,14 +177,14 @@ pub(super) fn construct( }); let addresses = addresses .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Timely addresses", ) - .as_collection(move |(id, address), _| create_address_row(*id, address, worker_id)); + .as_collection(move |id, address| create_address_row(*id, address, worker_id)); let parks = parks .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Timely parks", ) @@ -194,7 +200,7 @@ pub(super) fn construct( }); let messages_sent = messages_sent .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Timely messages sent", ) @@ -207,7 +213,7 @@ pub(super) fn construct( }); let messages_received = messages_received .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Timely messages received", ) @@ -220,7 +226,7 @@ pub(super) fn construct( }); let elapsed = schedules_duration .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Timely duration", ) @@ -232,7 +238,7 @@ pub(super) fn construct( }); let histogram = schedules_histogram .as_collection() - .arrange_core::<_, RowSpine<_, _, _, _>>( + .mz_arrange_core::<_, RowSpine<_, _, _, _>>( Exchange::new(move |_| u64::cast_from(worker_id)), "PreArrange Timely histogram", ) @@ -245,23 +251,22 @@ pub(super) fn construct( row }); + use TimelyLog::*; let logs = [ - (LogVariant::Timely(TimelyLog::Operates), operates), - (LogVariant::Timely(TimelyLog::Channels), channels), - (LogVariant::Timely(TimelyLog::Elapsed), elapsed), - (LogVariant::Timely(TimelyLog::Histogram), histogram), - (LogVariant::Timely(TimelyLog::Addresses), addresses), - (LogVariant::Timely(TimelyLog::Parks), parks), - (LogVariant::Timely(TimelyLog::MessagesSent), messages_sent), - ( - LogVariant::Timely(TimelyLog::MessagesReceived), - messages_received, - ), 
+ (Operates, operates), + (Channels, channels), + (Elapsed, elapsed), + (Histogram, histogram), + (Addresses, addresses), + (Parks, parks), + (MessagesSent, messages_sent), + (MessagesReceived, messages_received), ]; // Build the output arrangements. let mut traces = BTreeMap::new(); for (variant, collection) in logs { + let variant = LogVariant::Timely(variant); if config.index_logs.contains_key(&variant) { let key = variant.index_by(); let (_, value) = permutation_for_arrangement( @@ -271,7 +276,7 @@ pub(super) fn construct( .collect::>(), variant.desc().arity(), ); - let trace = collection + let arranged = collection .map({ let mut row_buf = Row::default(); let mut datums = DatumVec::new(); @@ -284,8 +289,10 @@ pub(super) fn construct( (row_key, row_val) } }) - .arrange_named::>(&format!("ArrangeByKey {:?}", variant)) - .trace; + .mz_arrange::>(&format!("ArrangeByKey {:?}", variant)); + // Safety: We're exporting the trace. + let trace = + unsafe { arranged.trace(compute_state.enable_arrangement_size_logging) }; traces.insert(variant.clone(), (trace, Rc::clone(&token))); } } @@ -354,13 +361,13 @@ type OutputBuffer<'a, 'b, D> = ConsolidateBuffer<'a, 'b, Timestamp, D, Diff, Pus // wouldn't be an issue. struct DemuxOutput<'a, 'b> { operates: OutputBuffer<'a, 'b, (usize, String)>, - channels: OutputBuffer<'a, 'b, ChannelDatum>, + channels: OutputBuffer<'a, 'b, (ChannelDatum, ())>, addresses: OutputBuffer<'a, 'b, (usize, Vec)>, - parks: OutputBuffer<'a, 'b, ParkDatum>, - messages_sent: OutputBuffer<'a, 'b, MessageDatum>, - messages_received: OutputBuffer<'a, 'b, MessageDatum>, - schedules_duration: OutputBuffer<'a, 'b, usize>, - schedules_histogram: OutputBuffer<'a, 'b, ScheduleHistogramDatum>, + parks: OutputBuffer<'a, 'b, (ParkDatum, ())>, + messages_sent: OutputBuffer<'a, 'b, (MessageDatum, ())>, + messages_received: OutputBuffer<'a, 'b, (MessageDatum, ())>, + schedules_duration: OutputBuffer<'a, 'b, (usize, ())>, + schedules_histogram: OutputBuffer<'a, 'b, (ScheduleHistogramDatum, ())>, } #[derive(Clone, Debug, PartialEq, Eq, PartialOrd, Ord, Hash, Serialize, Deserialize)] @@ -408,6 +415,8 @@ impl Columnation for ScheduleHistogramDatum { struct DemuxHandler<'a, 'b, 'c> { /// State kept by the demux operator. state: &'a mut DemuxState, + /// State shared across log receivers. + shared_state: &'a mut SharedLoggingState, /// Demux output buffers. output: &'a mut DemuxOutput<'b, 'c>, /// The logging interval specifying the time granularity for the updates. 
@@ -463,15 +472,15 @@ impl DemuxHandler<'_, '_, '_> { source: event.source, target: event.target, }; - self.output.channels.give(self.cap, (datum, ts, 1)); + self.output.channels.give(self.cap, ((datum, ()), ts, 1)); let datum = (event.id, event.scope_addr.clone()); self.output.addresses.give(self.cap, (datum, ts, 1)); - let dataflow_id = event.scope_addr[0]; + let dataflow_index = event.scope_addr[0]; self.state .dataflow_channels - .entry(dataflow_id) + .entry(dataflow_index) .or_default() .push(event); } @@ -501,7 +510,7 @@ impl DemuxHandler<'_, '_, '_> { { self.output .schedules_duration - .give(self.cap, (event.id, ts, -elapsed_ns)); + .give(self.cap, ((event.id, ()), ts, -elapsed_ns)); let datum = ScheduleHistogramDatum { operator: event.id, @@ -510,23 +519,27 @@ impl DemuxHandler<'_, '_, '_> { let diff = Diff::cast_from(-count); self.output .schedules_histogram - .give(self.cap, (datum, ts, diff)); + .give(self.cap, ((datum, ()), ts, diff)); } } if operator.addr.len() == 1 { - let dataflow_id = operator.addr[0]; - self.handle_dataflow_shutdown(dataflow_id); + let dataflow_index = operator.addr[0]; + self.handle_dataflow_shutdown(dataflow_index); } let datum = (operator.id, operator.addr); self.output.addresses.give(self.cap, (datum, ts, -1)); } - fn handle_dataflow_shutdown(&mut self, dataflow_id: usize) { - // When a dataflow shuts down, we need to retract all its channels. + fn handle_dataflow_shutdown(&mut self, dataflow_index: usize) { + // Notify compute logging about the shutdown. + if let Some(logger) = &self.shared_state.compute_logger { + logger.log(ComputeEvent::DataflowShutdown { dataflow_index }); + } - let Some(channels) = self.state.dataflow_channels.remove(&dataflow_id) else { + // When a dataflow shuts down, we need to retract all its channels. 
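The comment above describes the retraction protocol: channel creations are remembered per dataflow when they are logged, and replayed with negated diffs on shutdown so the logging collections sum back to zero, as the code continuing below does. A reduced sketch of the idea, with hypothetical types:

```rust
use std::collections::BTreeMap;

type Diff = i64;

#[derive(Default)]
struct State {
    /// Channel ids remembered per dataflow index.
    dataflow_channels: BTreeMap<usize, Vec<u64>>,
}

fn handle_channel_created(state: &mut State, dataflow: usize, channel: u64, out: &mut Vec<(u64, Diff)>) {
    out.push((channel, 1)); // log the insertion
    state.dataflow_channels.entry(dataflow).or_default().push(channel);
}

fn handle_dataflow_shutdown(state: &mut State, dataflow: usize, out: &mut Vec<(u64, Diff)>) {
    let Some(channels) = state.dataflow_channels.remove(&dataflow) else {
        return;
    };
    for channel in channels {
        out.push((channel, -1)); // retract what we logged earlier
    }
}

fn main() {
    let mut state = State::default();
    let mut out = Vec::new();
    handle_channel_created(&mut state, 1, 100, &mut out);
    handle_channel_created(&mut state, 1, 101, &mut out);
    handle_dataflow_shutdown(&mut state, 1, &mut out);
    // Every insertion has a matching retraction: the diffs cancel.
    assert_eq!(out.iter().map(|(_, d)| d).sum::<i64>(), 0);
}
```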
+ let Some(channels) = self.state.dataflow_channels.remove(&dataflow_index) else { return; }; @@ -538,7 +551,7 @@ impl DemuxHandler<'_, '_, '_> { source: channel.source, target: channel.target, }; - self.output.channels.give(self.cap, (datum, ts, -1)); + self.output.channels.give(self.cap, ((datum, ()), ts, -1)); let datum = (channel.id, channel.scope_addr); self.output.addresses.give(self.cap, (datum, ts, -1)); @@ -552,7 +565,7 @@ impl DemuxHandler<'_, '_, '_> { }; self.output .messages_sent - .give(self.cap, (datum, ts, -count)); + .give(self.cap, ((datum, ()), ts, -count)); } } if let Some(received) = self.state.messages_received.remove(&channel.id) { @@ -563,7 +576,7 @@ impl DemuxHandler<'_, '_, '_> { }; self.output .messages_received - .give(self.cap, (datum, ts, -count)); + .give(self.cap, ((datum, ()), ts, -count)); } } } @@ -596,7 +609,7 @@ impl DemuxHandler<'_, '_, '_> { duration_pow, requested_pow, }; - self.output.parks.give(self.cap, (datum, ts, 1)); + self.output.parks.give(self.cap, ((datum, ()), ts, 1)); } } } @@ -610,7 +623,9 @@ impl DemuxHandler<'_, '_, '_> { channel: event.channel, worker: event.target, }; - self.output.messages_sent.give(self.cap, (datum, ts, count)); + self.output + .messages_sent + .give(self.cap, ((datum, ()), ts, count)); let sent_counts = self .state @@ -625,7 +640,7 @@ impl DemuxHandler<'_, '_, '_> { }; self.output .messages_received - .give(self.cap, (datum, ts, count)); + .give(self.cap, ((datum, ()), ts, count)); let received_counts = self .state @@ -658,7 +673,7 @@ impl DemuxHandler<'_, '_, '_> { let datum = event.id; self.output .schedules_duration - .give(self.cap, (datum, ts, elapsed_diff)); + .give(self.cap, ((datum, ()), ts, elapsed_diff)); let datum = ScheduleHistogramDatum { operator: event.id, @@ -666,7 +681,7 @@ impl DemuxHandler<'_, '_, '_> { }; self.output .schedules_histogram - .give(self.cap, (datum, ts, 1)); + .give(self.cap, ((datum, ()), ts, 1)); // Record count and elapsed time for later retraction. let index = usize::cast_from(elapsed_pow.trailing_zeros()); diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 1f2bdffdc609..69df925b8e36 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -14,7 +14,6 @@ use std::collections::BTreeMap; use std::rc::Weak; use differential_dataflow::lattice::Lattice; -use differential_dataflow::operators::arrange::{Arrange, Arranged}; use differential_dataflow::trace::wrappers::enter::TraceEnter; use differential_dataflow::trace::wrappers::frontier::TraceFrontier; use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; @@ -36,21 +35,22 @@ use timely::dataflow::{Scope, ScopeParent}; use timely::progress::timestamp::Refines; use timely::progress::{Antichain, Timestamp}; +use crate::extensions::arrange::{KeyCollection, MzArrange, MzArranged}; use crate::render::errors::ErrorLogger; use crate::render::join::LinearJoinImpl; use crate::typedefs::{ErrSpine, RowSpine, TraceErrHandle, TraceRowHandle}; // Local type definition to avoid the horror in signatures. 
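The type aliases that follow swap differential's `Arranged` for the `MzArranged` wrapper this patch introduces. The wrapper pattern itself is simple; here is a stripped-down sketch, under hypothetical `MzArrangeExt`/`Arranged` names, of an extension trait that forces a name at every arrange call site, which is what lets clippy ban the unnamed entry points:

```rust
/// Toy stand-in for `MzArranged`: owns the inner value plus the mandatory name.
struct Arranged<T> {
    inner: T,
    name: String,
}

trait MzArrangeExt: Sized {
    /// Every call site must supply a diagnostic name; there is no unnamed variant.
    fn mz_arrange_named(self, name: &str) -> Arranged<Self> {
        Arranged {
            inner: self,
            name: name.to_string(),
        }
    }
}

impl<T> MzArrangeExt for T {}

fn main() {
    let a = vec![1, 2, 3].mz_arrange_named("Arrange example");
    println!("{} holds {} items", a.name, a.inner.len());
}
```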
pub(crate) type KeyArrangement = - Arranged::Timestamp, Diff>>; + MzArranged::Timestamp, Diff>>; pub(crate) type Arrangement = KeyArrangement; pub(crate) type ErrArrangement = - Arranged::Timestamp, Diff>>; -pub(crate) type ArrangementImport = Arranged< + MzArranged::Timestamp, Diff>>; +pub(crate) type ArrangementImport = MzArranged< S, TraceEnter>, ::Timestamp>, >; -pub(crate) type ErrArrangementImport = Arranged< +pub(crate) type ErrArrangementImport = MzArranged< S, TraceEnter< TraceFrontier>, @@ -95,6 +95,7 @@ where pub(super) shutdown_token: ShutdownToken, /// The implementation to use for rendering linear joins. pub(super) linear_join_impl: LinearJoinImpl, + pub(super) enable_arrangement_size_logging: bool, } impl Context @@ -122,6 +123,7 @@ where bindings: BTreeMap::new(), shutdown_token: Default::default(), linear_join_impl: Default::default(), + enable_arrangement_size_logging: Default::default(), } } } @@ -327,8 +329,8 @@ where /// The scope containing the collection bundle. pub fn scope(&self) -> S { match self { - ArrangementFlavor::Local(oks, _errs) => oks.stream.scope(), - ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream.scope(), + ArrangementFlavor::Local(oks, _errs) => oks.stream().scope(), + ArrangementFlavor::Trace(_gid, oks, _errs) => oks.stream().scope(), } } @@ -568,7 +570,7 @@ where /// The function presents the contents of the trace as `(key, value, time, delta)` tuples, /// where key and value are rows. fn flat_map_core( - trace: &Arranged, + trace: &MzArranged, key: Option, mut logic: L, refuel: usize, @@ -590,10 +592,10 @@ where let mode = if key.is_some() { "index" } else { "scan" }; let name = format!("ArrangementFlatMap({})", mode); use timely::dataflow::operators::Operator; - trace.stream.unary(Pipeline, &name, move |_, info| { + trace.stream().unary(Pipeline, &name, move |_, info| { // Acquire an activator to reschedule the operator when it has unfinished work. use timely::scheduling::Activator; - let activations = trace.stream.scope().activations(); + let activations = trace.stream().scope().activations(); let activator = Activator::new(&info.address[..], activations); // Maintain a list of work to do, cursor to navigate and process. 
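`flat_map_core` above schedules itself with an `Activator` and works against a `refuel` budget, yielding back to the worker when the budget is spent; the `todo` queue below holds the remaining work. A self-contained sketch of that budgeting pattern, with hypothetical names:

```rust
use std::collections::VecDeque;

struct BudgetedWorker {
    todo: VecDeque<u64>,
}

impl BudgetedWorker {
    /// Process at most `refuel` items; returns `true` if the operator should
    /// be re-activated because work remains.
    fn run(&mut self, refuel: usize, emit: &mut impl FnMut(u64)) -> bool {
        let mut fuel = refuel;
        while fuel > 0 {
            let Some(item) = self.todo.pop_front() else {
                return false; // all caught up; wait for new input
            };
            emit(item);
            fuel -= 1;
        }
        !self.todo.is_empty()
    }
}

fn main() {
    let mut w = BudgetedWorker { todo: (0u64..5).collect() };
    let mut out = Vec::new();
    let mut reactivate = true;
    while reactivate {
        reactivate = w.run(2, &mut |x| out.push(x));
    }
    assert_eq!(out, vec![0, 1, 2, 3, 4]);
}
```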
let mut todo = std::collections::VecDeque::new(); @@ -781,10 +783,9 @@ where Ok::<(Row, Row), DataflowError>((key_row, val_row)) }); - let oks = oks_keyed.arrange_named::>(&name); - let errs = errs - .concat(&errs_keyed) - .arrange_named::>(&format!("{}-errors", name)); + let oks = oks_keyed.mz_arrange::>(&name); + let errs: KeyCollection<_, _, _> = errs.concat(&errs_keyed).into(); + let errs = errs.mz_arrange::>(&format!("{}-errors", name)); self.arranged .insert(key, ArrangementFlavor::Local(oks, errs)); } diff --git a/src/compute/src/render/join/delta_join.rs b/src/compute/src/render/join/delta_join.rs index 140166757a28..cd0d5601c884 100644 --- a/src/compute/src/render/join/delta_join.rs +++ b/src/compute/src/render/join/delta_join.rs @@ -15,7 +15,6 @@ use std::collections::{BTreeMap, BTreeSet}; -use differential_dataflow::operators::arrange::Arranged; use differential_dataflow::trace::{BatchReader, Cursor, TraceReader}; use differential_dataflow::{AsCollection, Collection}; use mz_compute_client::plan::join::delta_join::{DeltaJoinPlan, DeltaPathPlan, DeltaStagePlan}; @@ -30,6 +29,7 @@ use timely::dataflow::Scope; use timely::progress::timestamp::Refines; use timely::progress::Antichain; +use crate::extensions::arrange::{ArrangementSize, MzArranged}; use crate::render::context::{ArrangementFlavor, CollectionBundle, Context, ShutdownToken}; use crate::render::RenderTimestamp; @@ -203,6 +203,7 @@ where |t1, t2| t1.le(t2), closure, self.shutdown_token.clone(), + self.enable_arrangement_size_logging, ) } else { build_halfjoin( @@ -213,6 +214,7 @@ where |t1, t2| t1.lt(t2), closure, self.shutdown_token.clone(), + self.enable_arrangement_size_logging, ) } } @@ -226,6 +228,7 @@ where |t1, t2| t1.le(t2), closure, self.shutdown_token.clone(), + self.enable_arrangement_size_logging, ) } else { build_halfjoin( @@ -236,6 +239,7 @@ where |t1, t2| t1.lt(t2), closure, self.shutdown_token.clone(), + self.enable_arrangement_size_logging, ) } } @@ -311,12 +315,13 @@ where /// to ensure that any two updates are matched at most once. fn build_halfjoin( updates: Collection, - trace: Arranged, + trace: MzArranged, prev_key: Vec, prev_thinning: Vec, comparison: CF, closure: JoinClosure, shutdown_token: ShutdownToken, + enable_arrangement_size_logging: bool, ) -> ( Collection, Collection, @@ -326,6 +331,7 @@ where G::Timestamp: crate::render::RenderTimestamp, Tr: TraceReader, client_rx: crossbeam_channel::Receiver<( @@ -175,7 +175,7 @@ impl mz_cluster::types::AsRunnableWorker for Co } } -impl<'w, A: Allocate> Worker<'w, A> { +impl<'w, A: Allocate + 'static> Worker<'w, A> { /// Waits for client connections and runs them to completion. 
pub fn run(&mut self) { let mut shutdown = false; @@ -414,6 +414,7 @@ impl<'w, A: Allocate> Worker<'w, A> { linear_join_impl: Default::default(), metrics: self.compute_metrics.clone(), tracing_handle: Arc::clone(&self.tracing_handle), + enable_arrangement_size_logging: Default::default(), }); } _ => (), diff --git a/src/compute/src/typedefs.rs b/src/compute/src/typedefs.rs index 5b6f90a062f3..d899bf3f330b 100644 --- a/src/compute/src/typedefs.rs +++ b/src/compute/src/typedefs.rs @@ -12,16 +12,14 @@ #![allow(missing_docs)] use differential_dataflow::operators::arrange::TraceAgent; -use differential_dataflow::trace::implementations::ord::{ - ColKeySpine, ColValSpine, OrdKeySpine, OrdValSpine, -}; +use differential_dataflow::trace::implementations::ord::{ColKeySpine, ColValSpine}; use mz_repr::{Diff, Row, Timestamp}; use mz_storage_client::types::errors::DataflowError; pub type RowSpine = ColValSpine; pub type RowKeySpine = ColKeySpine; -pub type ErrSpine = OrdKeySpine; -pub type ErrValSpine = OrdValSpine; +pub type ErrSpine = ColKeySpine; +pub type ErrValSpine = ColValSpine; pub type TraceRowHandle = TraceAgent>; pub type TraceErrHandle = TraceAgent>; pub type KeysValsHandle = TraceRowHandle; diff --git a/src/environmentd/src/http/static/js/hierarchical-memory.jsx b/src/environmentd/src/http/static/js/hierarchical-memory.jsx index d0bc2b1cb042..40967f612376 100644 --- a/src/environmentd/src/http/static/js/hierarchical-memory.jsx +++ b/src/environmentd/src/http/static/js/hierarchical-memory.jsx @@ -159,7 +159,7 @@ function Dataflows(props) { ON channels.id = counts.channel_id; SELECT - operator_id as id, records + operator_id as id, records, size FROM mz_internal.mz_arrangement_sizes; `); @@ -183,7 +183,10 @@ function Dataflows(props) { ); setChans(chans); - setRecords(Object.fromEntries(records_table.rows)); + const records = Object.fromEntries( + records_table.rows.map(([id, records, size]) => [id, [records, size]]) + ) + setRecords(records); try { const view = await getCreateView(stats.name); @@ -217,7 +220,10 @@ function Dataflows(props) { const id_to_addr = Object.fromEntries(Object.entries(addrs).map(([id, addr]) => [id, addr])); const id_to_name = Object.fromEntries(Object.entries(opers).map(([id, name]) => [id, name])); const addr_to_id = Object.fromEntries(Object.entries(opers).map(([id, name]) => [addrStr(id_to_addr[id]), id])); - const max_record_count = Math.max.apply(Math, Object.values(records)); + const max_record_count = Math.max.apply( + Math, + Object.values(records).map(([records, size]) => records) + ); // Map scopes to children. 
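The `hierarchical-memory.jsx` hunks above change the `records` map from `id → records` to `id → [records, size]`, so consumers such as the heat-map normalization must project out the record count. Transcribed to Rust for clarity, with illustrative values; the real code is the JSX above:

```rust
use std::collections::BTreeMap;

fn main() {
    // Rows as returned by `mz_internal.mz_arrangement_sizes`: (id, records, size).
    let rows: Vec<(u64, u64, u64)> = vec![(1, 100, 9_000), (2, 400, 64_000)];
    let records: BTreeMap<u64, (u64, u64)> = rows
        .into_iter()
        .map(|(id, records, size)| (id, (records, size)))
        .collect();
    // Normalization only looks at record counts, not sizes.
    let max_record_count = records.values().map(|(r, _)| *r).max().unwrap_or(0);
    for (id, (r, size)) in &records {
        let kib = (size + 1023) / 1024; // ceiling division, like `Math.ceil(size / 1024)`
        println!("{id}: {r} rows, {kib} KiB (max {max_record_count})");
    }
}
```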
const scope_children = new Map(); @@ -284,9 +290,10 @@ function Dataflows(props) { } else { let my_records = records["".concat(id)]; if (my_records != null) { - return `${id} [label= "${id} : ${name} \n\t records : ${my_records}",style=filled,color=red,fillcolor="#ffbbbb"]`; + let my_size = Math.ceil(my_records[1]/1024); + return `${id} [label= "${id} : ${name}\nrecords: ${my_records[0]}, ${my_size} KiB",style=filled,color=red,fillcolor="#ffbbbb",shape=box]`; } else { - return `${id} [label="${id} : ${name}"]`; + return `${id} [label="${id} : ${name}",shape=box]`; } } } else { diff --git a/src/environmentd/src/http/static/js/memory.jsx b/src/environmentd/src/http/static/js/memory.jsx index 62ceec6904aa..a201d5721e29 100644 --- a/src/environmentd/src/http/static/js/memory.jsx +++ b/src/environmentd/src/http/static/js/memory.jsx @@ -164,7 +164,7 @@ function Views(props) { SET cluster = ${formatNameForQuery(props.clusterName)}; SET cluster_replica = ${formatNameForQuery(props.replicaName)}; SELECT - id, name, records + id, name, records, size, capacity, allocations FROM mz_internal.mz_records_per_dataflow ${whereFragment} @@ -215,6 +215,9 @@ function Views(props) { dataflow id index name records + size [KiB] + capacity [KiB] + allocations @@ -230,6 +233,9 @@ function Views(props) { {v[1]} {v[2]} + {Math.round(v[3]/1024)} + {Math.round(v[4]/1024)} + {v[5]} ))} @@ -271,7 +277,7 @@ function View(props) { SET cluster = ${formatNameForQuery(props.clusterName)}; SET cluster_replica = ${formatNameForQuery(props.replicaName)}; SELECT - name, records + name, records, size FROM mz_internal.mz_records_per_dataflow WHERE @@ -335,7 +341,7 @@ function View(props) { ); SELECT - id, records + id, records, size FROM mz_internal.mz_records_per_dataflow_operator WHERE @@ -348,6 +354,7 @@ function View(props) { const stats = { name: stats_row[0], records: stats_row[1], + size: stats_row[2], }; setStats(stats); @@ -370,7 +377,10 @@ function View(props) { ); setChans(chans); - setRecords(Object.fromEntries(records_table.rows)); + const records = Object.fromEntries( + records_table.rows.map(([id, records, size]) => [id, [records, size]]) + ) + setRecords(records); setElapsed(Object.fromEntries(elapsed_table.rows)); @@ -399,7 +409,10 @@ function View(props) { const lookup = Object.fromEntries( Object.entries(addrs).map(([id, addr]) => [addrStr(addr), id]) ); - const max_record_count = Math.max.apply(Math, Object.values(records)); + const max_record_count = Math.max.apply( + Math, + Object.values(records).map(([records, size]) => records) + ); const scopes = {}; // Find all the scopes. Object.entries(opers).forEach(([id, name]) => { @@ -452,13 +465,14 @@ function View(props) { const notes = [`id: ${id}`]; let style = ''; if (id in records) { - const record_count = records[id]; + const record_count = records[id][0]; + const size = Math.ceil(records[id][1]/1024); // Any operator that can have records will have a red border (even if it // currently has 0 records). The fill color is a deeper red based on how many // records this operator has compared to the operator with the most records. - const pct = record_count ? Math.floor((record_count / max_record_count) * 0xff) : 0; + const pct = record_count ? Math.floor((record_count / max_record_count) * 0xa0) : 0; const alpha = pct.toString(16).padStart(2, '0'); - notes.push(`${record_count} records`); + notes.push(`${record_count} rows, ${size} KiB`); style = `,style=filled,color=red,fillcolor="#ff0000${alpha}"`; } // Only display elapsed time if it's more than 1s. 
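`memory.jsx` fills each node with red whose alpha byte is proportional to its record count, now capped at `0xa0` rather than `0xff` so the hottest nodes stay readable. The same computation in Rust, purely illustrative:

```rust
fn fill_alpha(record_count: u64, max_record_count: u64) -> String {
    let pct = if record_count > 0 && max_record_count > 0 {
        (record_count as f64 / max_record_count as f64 * 0xa0 as f64).floor() as u8
    } else {
        0
    };
    format!("#ff0000{pct:02x}")
}

fn main() {
    assert_eq!(fill_alpha(0, 400), "#ff000000");
    assert_eq!(fill_alpha(100, 400), "#ff000028");
    assert_eq!(fill_alpha(400, 400), "#ff0000a0"); // capped, never fully opaque
}
```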
@@ -469,7 +483,7 @@ function View(props) { if (name.length > maxLen + 3) { name = name.slice(0, maxLen) + '...'; } - return `_${id} [label="${name} (${notes.join(', ')})"${style}]`; + return `_${id} [label="${name}\n${notes.join(', ')}"${style},shape=box]`; }); oper_labels.unshift(''); clusters.unshift(''); diff --git a/src/sql/src/session/vars.rs b/src/sql/src/session/vars.rs index 32300d14569c..1fae462a3b90 100644 --- a/src/sql/src/session/vars.rs +++ b/src/sql/src/session/vars.rs @@ -1197,6 +1197,7 @@ feature_flags!( enable_try_parse_monotonic_iso8601_timestamp, "the try_parse_monotonic_iso8601_timestamp function" ), + (enable_arrangement_size_logging, "arrangement size logging") ); /// Represents the input to a variable. @@ -3706,6 +3707,7 @@ pub fn is_tracing_var(name: &str) -> bool { pub fn is_compute_config_var(name: &str) -> bool { name == MAX_RESULT_SIZE.name() || name == DATAFLOW_MAX_INFLIGHT_BYTES.name() + || name == ENABLE_ARRANGEMENT_SIZE_LOGGING.name() || name == ENABLE_MZ_JOIN_CORE.name() || is_persist_config_var(name) || is_tracing_var(name) diff --git a/src/storage-client/src/types/errors.rs b/src/storage-client/src/types/errors.rs index b4b5a8e73bad..6c7ca5a86ef6 100644 --- a/src/storage-client/src/types/errors.rs +++ b/src/storage-client/src/types/errors.rs @@ -411,6 +411,26 @@ pub enum DataflowError { impl Error for DataflowError {} +mod columnation { + use crate::types::errors::DataflowError; + use timely::container::columnation::{CloneRegion, Columnation}; + + impl Columnation for DataflowError { + // Discussion of `Region` for `DataflowError`: Although `DataflowError` contains pointers, + // we treat it as a type that doesn't and can simply be cloned. The reason for this is + // threefold: + // 1. Cloning the type does not violate correctness. It comes with the disadvantage that its + // `heap_size` will not be accurate. + // 2. It is hard to implement a region allocator for `DataflowError`, because it contains + // many pointers across various enum types. Some are boxed, and it contains a box to + // itself, meaning we have to pass the outer region inwards to avoid creating recursive + // regions. + // 3. We accept the performance implication of not storing the errors in a region allocator, + // which should be similar to storing errors in vectors on the heap. + type InnerRegion = CloneRegion; + } +} + impl RustType for DataflowError { fn into_proto(&self) -> ProtoDataflowError { use proto_dataflow_error::Kind::*; diff --git a/src/storage/src/render/sinks.rs b/src/storage/src/render/sinks.rs index 644de2a78288..6acdd58aa606 100644 --- a/src/storage/src/render/sinks.rs +++ b/src/storage/src/render/sinks.rs @@ -174,6 +174,9 @@ where // (As part of doing so, it asserts that there are not multiple conflicting values at the same timestamp) let collection = match sink.envelope { Some(SinkEnvelope::Debezium) => { + // Allow access to `arrange_named` because we cannot access Mz's wrapper from here. + // TODO(#17413): Revisit with cluster unification. + #[allow(clippy::disallowed_methods)] let combined = combine_at_timestamp( keyed.arrange_named::>("Arrange Debezium"), ); @@ -204,6 +207,9 @@ where collection } Some(SinkEnvelope::Upsert) => { + // Allow access to `arrange_named` because we cannot access Mz's wrapper from here. + // TODO(#17413): Revisit with cluster unification. 
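The `Columnation` impl in the `errors.rs` hunk above opts `DataflowError` into columnar containers via `CloneRegion`, trading accurate `heap_size` accounting for a trivial, correct implementation. A compilable sketch of the same choice on a toy type, assuming `timely`'s `columnation` module; `MyError` is hypothetical:

```rust
use timely::container::columnation::{CloneRegion, Columnation, TimelyStack};

#[derive(Clone)]
struct MyError {
    // Owned allocation; a faithful region would copy this into region memory,
    // whereas `CloneRegion` just clones it and under-reports `heap_size`.
    message: String,
}

impl Columnation for MyError {
    type InnerRegion = CloneRegion<MyError>;
}

fn main() {
    let mut stack = TimelyStack::<MyError>::with_capacity(1);
    stack.copy(&MyError { message: "oops".into() });
    assert_eq!(stack[0].message, "oops");
}
```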
+ #[allow(clippy::disallowed_methods)] let combined = combine_at_timestamp( keyed.arrange_named::>("Arrange Upsert"), ); diff --git a/src/timely-util/src/lib.rs b/src/timely-util/src/lib.rs index 2acd81ea57db..feac1e42ca12 100644 --- a/src/timely-util/src/lib.rs +++ b/src/timely-util/src/lib.rs @@ -93,5 +93,4 @@ pub mod pact; pub mod panic; pub mod probe; pub mod progress; -pub mod reduce; pub mod replay; diff --git a/src/timely-util/src/operator.rs b/src/timely-util/src/operator.rs index 5ce8e8ed8482..10907b370ecc 100644 --- a/src/timely-util/src/operator.rs +++ b/src/timely-util/src/operator.rs @@ -578,6 +578,8 @@ where data.hash(&mut h); h.finish() }); + // Access to `arrange_core` is OK because we specify the trace and don't hold on to it. + #[allow(clippy::disallowed_methods)] self.arrange_core::<_, Tr>(exchange, name) .as_collection(|k, _v| k.clone()) } else { diff --git a/test/sqllogictest/autogenerated/mz_internal.slt b/test/sqllogictest/autogenerated/mz_internal.slt index d00d728919b9..a73a7bdf4229 100644 --- a/test/sqllogictest/autogenerated/mz_internal.slt +++ b/test/sqllogictest/autogenerated/mz_internal.slt @@ -219,6 +219,9 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 1 operator_id uint8 2 records numeric 3 batches numeric +4 size numeric +5 capacity numeric +6 allocations numeric query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_compute_delays_histogram' ORDER BY position @@ -272,6 +275,17 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 1 id uint8 2 address list +query ITT +SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_dataflow_arrangement_sizes' ORDER BY position +---- +1 id uint8 +2 name text +3 records numeric +4 batches numeric +5 size numeric +6 capacity numeric +7 allocations numeric + query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_dataflow_channels' ORDER BY position ---- @@ -311,12 +325,10 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 2 parent_id uint8 query ITT -SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_dataflow_arrangement_sizes' ORDER BY position +SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_dataflow_shutdown_durations_histogram' ORDER BY position ---- -1 id uint8 -2 name text -3 records numeric -4 batches numeric +1 duration_ns uint8 +2 count numeric query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_message_counts' ORDER BY position @@ -337,6 +349,9 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 1 id uint8 2 name text 3 records numeric +4 size numeric +5 capacity numeric +6 allocations numeric query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_records_per_dataflow_operator' ORDER BY position @@ -345,6 +360,9 @@ SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object 2 name text 3 dataflow_id uint8 4 records numeric +5 size numeric +6 capacity numeric +7 allocations numeric query ITT SELECT position, name, type FROM objects WHERE schema = 'mz_internal' AND object = 'mz_scheduling_elapsed' ORDER BY position @@ -366,6 +384,9 @@ mz_active_peeks mz_active_peeks_per_worker mz_aggregates mz_arrangement_batches_raw +mz_arrangement_heap_allocations_raw +mz_arrangement_heap_capacity_raw 
+mz_arrangement_heap_size_raw mz_arrangement_records_raw mz_arrangement_sharing mz_arrangement_sharing_per_worker @@ -410,6 +431,9 @@ mz_dataflow_operator_reachability_per_worker mz_dataflow_operator_reachability_raw mz_dataflow_operators mz_dataflow_operators_per_worker +mz_dataflow_shutdown_durations_histogram +mz_dataflow_shutdown_durations_histogram_per_worker +mz_dataflow_shutdown_durations_histogram_raw mz_dataflows mz_dataflows_per_worker mz_kafka_sources diff --git a/test/sqllogictest/cluster.slt b/test/sqllogictest/cluster.slt index c892f8840f4f..26010bf7e899 100644 --- a/test/sqllogictest/cluster.slt +++ b/test/sqllogictest/cluster.slt @@ -198,6 +198,12 @@ bar mz_active_peeks_per_worker mz_active_peeks_per_worker_u7_primary_idx 1 i bar mz_active_peeks_per_worker mz_active_peeks_per_worker_u7_primary_idx 2 worker_id NULL false bar mz_arrangement_batches_raw mz_arrangement_batches_raw_u7_primary_idx 1 operator_id NULL false bar mz_arrangement_batches_raw mz_arrangement_batches_raw_u7_primary_idx 2 worker_id NULL false +bar mz_arrangement_heap_allocations_raw mz_arrangement_heap_allocations_raw_u7_primary_idx 1 operator_id NULL false +bar mz_arrangement_heap_allocations_raw mz_arrangement_heap_allocations_raw_u7_primary_idx 2 worker_id NULL false +bar mz_arrangement_heap_capacity_raw mz_arrangement_heap_capacity_raw_u7_primary_idx 1 operator_id NULL false +bar mz_arrangement_heap_capacity_raw mz_arrangement_heap_capacity_raw_u7_primary_idx 2 worker_id NULL false +bar mz_arrangement_heap_size_raw mz_arrangement_heap_size_raw_u7_primary_idx 1 operator_id NULL false +bar mz_arrangement_heap_size_raw mz_arrangement_heap_size_raw_u7_primary_idx 2 worker_id NULL false bar mz_arrangement_records_raw mz_arrangement_records_raw_u7_primary_idx 1 operator_id NULL false bar mz_arrangement_records_raw mz_arrangement_records_raw_u7_primary_idx 2 worker_id NULL false bar mz_arrangement_sharing_raw mz_arrangement_sharing_raw_u7_primary_idx 1 operator_id NULL false @@ -232,6 +238,8 @@ bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_ra bar mz_dataflow_operator_reachability_raw mz_dataflow_operator_reachability_raw_u7_primary_idx 5 time NULL true bar mz_dataflow_operators_per_worker mz_dataflow_operators_per_worker_u7_primary_idx 1 id NULL false bar mz_dataflow_operators_per_worker mz_dataflow_operators_per_worker_u7_primary_idx 2 worker_id NULL false +bar mz_dataflow_shutdown_durations_histogram_raw mz_dataflow_shutdown_durations_histogram_raw_u7_primary_idx 1 worker_id NULL false +bar mz_dataflow_shutdown_durations_histogram_raw mz_dataflow_shutdown_durations_histogram_raw_u7_primary_idx 2 duration_ns NULL false bar mz_message_counts_received_raw mz_message_counts_received_raw_u7_primary_idx 1 channel_id NULL false bar mz_message_counts_received_raw mz_message_counts_received_raw_u7_primary_idx 2 from_worker_id NULL false bar mz_message_counts_received_raw mz_message_counts_received_raw_u7_primary_idx 3 to_worker_id NULL false @@ -394,7 +402,7 @@ DROP CLUSTER foo, foo2, foo3, foo4 CASCADE query I SELECT COUNT(name) FROM mz_indexes WHERE cluster_id = 'u1'; ---- -19 +23 query I SELECT COUNT(name) FROM mz_indexes WHERE cluster_id <> 'u1' AND cluster_id NOT LIKE 's%'; @@ -407,7 +415,7 @@ CREATE CLUSTER test REPLICAS (foo (SIZE '1')); query I SELECT COUNT(name) FROM mz_indexes; ---- -100 +116 statement ok DROP CLUSTER test CASCADE @@ -415,7 +423,7 @@ DROP CLUSTER test CASCADE query T SELECT COUNT(name) FROM mz_indexes; ---- -81 +93 statement error nvalid SIZE: must provide a 
string value CREATE CLUSTER REPLICA default.size_1 SIZE; diff --git a/test/sqllogictest/information_schema_tables.slt b/test/sqllogictest/information_schema_tables.slt index e3a962ad4841..524c1161a4f2 100644 --- a/test/sqllogictest/information_schema_tables.slt +++ b/test/sqllogictest/information_schema_tables.slt @@ -217,6 +217,18 @@ mz_arrangement_batches_raw SOURCE materialize mz_internal +mz_arrangement_heap_allocations_raw +SOURCE +materialize +mz_internal +mz_arrangement_heap_capacity_raw +SOURCE +materialize +mz_internal +mz_arrangement_heap_size_raw +SOURCE +materialize +mz_internal mz_arrangement_records_raw SOURCE materialize @@ -393,6 +405,18 @@ mz_dataflow_operators_per_worker SOURCE materialize mz_internal +mz_dataflow_shutdown_durations_histogram +VIEW +materialize +mz_internal +mz_dataflow_shutdown_durations_histogram_per_worker +VIEW +materialize +mz_internal +mz_dataflow_shutdown_durations_histogram_raw +SOURCE +materialize +mz_internal mz_dataflows VIEW materialize diff --git a/test/testdrive/catalog.td b/test/testdrive/catalog.td index 9863364ec672..901d2879cfb5 100644 --- a/test/testdrive/catalog.td +++ b/test/testdrive/catalog.td @@ -519,6 +519,9 @@ mz_active_peeks_per_worker log mz_arrangement_batches_raw log mz_arrangement_records_raw log mz_arrangement_sharing_raw log +mz_arrangement_heap_size_raw log +mz_arrangement_heap_capacity_raw log +mz_arrangement_heap_allocations_raw log mz_compute_delays_histogram_raw log mz_compute_dependencies_per_worker log mz_compute_exports_per_worker log @@ -529,6 +532,7 @@ mz_dataflow_addresses_per_worker log mz_dataflow_channels_per_worker log mz_dataflow_operator_reachability_raw log mz_dataflow_operators_per_worker log +mz_dataflow_shutdown_durations_histogram_raw log mz_message_counts_received_raw log mz_message_counts_sent_raw log mz_peek_durations_histogram_raw log @@ -578,6 +582,7 @@ mz_compute_import_frontiers mz_compute_operator_durations_histogram mz_compute_operator_durations_histogram_per_worker mz_dataflow_addresses +mz_dataflow_arrangement_sizes mz_dataflow_channel_operators mz_dataflow_channel_operators_per_worker mz_dataflow_channels @@ -585,10 +590,11 @@ mz_dataflow_operator_dataflows mz_dataflow_operator_dataflows_per_worker mz_dataflow_operator_parents mz_dataflow_operator_parents_per_worker -mz_dataflow_arrangement_sizes mz_dataflow_operator_reachability mz_dataflow_operator_reachability_per_worker mz_dataflow_operators +mz_dataflow_shutdown_durations_histogram +mz_dataflow_shutdown_durations_histogram_per_worker mz_dataflows mz_dataflows_per_worker mz_message_counts @@ -627,7 +633,7 @@ test_table # There is one entry in mz_indexes for each field_number/expression of the index. > SELECT COUNT(id) FROM mz_indexes WHERE id LIKE 's%' -81 +93 # Create a second schema with the same table name as above > CREATE SCHEMA tester2 diff --git a/test/testdrive/divergent-dataflow-cancellation.td b/test/testdrive/divergent-dataflow-cancellation.td index 48571095ada8..04a817d1faef 100644 --- a/test/testdrive/divergent-dataflow-cancellation.td +++ b/test/testdrive/divergent-dataflow-cancellation.td @@ -93,7 +93,7 @@ contains: canceling statement due to statement timeout > SELECT count(*) FROM mz_internal.mz_compute_frontiers_per_worker WHERE worker_id = 0 -19 +23 > SELECT count(*) FROM mz_internal.mz_compute_import_frontiers_per_worker 0 @@ -113,6 +113,10 @@ contains: canceling statement due to statement timeout > SELECT count(*) FROM mz_internal.mz_dataflow_operators_per_worker 0 +# This source never sees retractions. 
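Like the peek and compute duration logs, the new `mz_dataflow_shutdown_durations_histogram_raw` source buckets durations by rounding up to the next power of two; that is what the `duration_pow` and `elapsed_pow` values in the timely demux earlier in this patch compute. A one-function sketch of the bucketing:

```rust
/// Map a duration to its histogram bucket: the next power of two.
fn bucket(duration_ns: u64) -> u64 {
    duration_ns.next_power_of_two()
}

fn main() {
    assert_eq!(bucket(1), 1);
    assert_eq!(bucket(1_500), 2_048);
    assert_eq!(bucket(1_000_000), 1_048_576);
}
```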
+> SELECT count(*) > 0 FROM mz_internal.mz_dataflow_shutdown_durations_histogram_raw +true + > SELECT count(*) FROM mz_internal.mz_message_counts_received_raw 0 diff --git a/test/testdrive/indexes.td b/test/testdrive/indexes.td index 2e61450304a6..18e0a292499b 100644 --- a/test/testdrive/indexes.td +++ b/test/testdrive/indexes.td @@ -290,6 +290,9 @@ mz_active_peeks_per_worker_s2_primary_idx mz_active_peeks_per_ mz_arrangement_batches_raw_s2_primary_idx mz_arrangement_batches_raw mz_introspection {operator_id,worker_id} mz_arrangement_records_raw_s2_primary_idx mz_arrangement_records_raw mz_introspection {operator_id,worker_id} mz_arrangement_sharing_raw_s2_primary_idx mz_arrangement_sharing_raw mz_introspection {operator_id,worker_id} +mz_arrangement_heap_capacity_raw_s2_primary_idx mz_arrangement_heap_capacity_raw mz_introspection {operator_id,worker_id} +mz_arrangement_heap_allocations_raw_s2_primary_idx mz_arrangement_heap_allocations_raw mz_introspection {operator_id,worker_id} +mz_arrangement_heap_size_raw_s2_primary_idx mz_arrangement_heap_size_raw mz_introspection {operator_id,worker_id} mz_cluster_replica_metrics_ind mz_cluster_replica_metrics mz_introspection {replica_id} mz_cluster_replica_sizes_ind mz_cluster_replica_sizes mz_introspection {size} mz_cluster_replica_statuses_ind mz_cluster_replica_statuses mz_introspection {replica_id} @@ -305,6 +308,7 @@ mz_dataflow_addresses_per_worker_s2_primary_idx mz_dataflow_addresse mz_dataflow_channels_per_worker_s2_primary_idx mz_dataflow_channels_per_worker mz_introspection {id,worker_id} mz_dataflow_operator_reachability_raw_s2_primary_idx mz_dataflow_operator_reachability_raw mz_introspection {address,port,worker_id,update_type,time} mz_dataflow_operators_per_worker_s2_primary_idx mz_dataflow_operators_per_worker mz_introspection {id,worker_id} +mz_dataflow_shutdown_durations_histogram_raw_s2_primary_idx mz_dataflow_shutdown_durations_histogram_raw mz_introspection {worker_id,duration_ns} mz_message_counts_received_raw_s2_primary_idx mz_message_counts_received_raw mz_introspection {channel_id,from_worker_id,to_worker_id} mz_message_counts_sent_raw_s2_primary_idx mz_message_counts_sent_raw mz_introspection {channel_id,from_worker_id,to_worker_id} mz_peek_durations_histogram_raw_s2_primary_idx mz_peek_durations_histogram_raw mz_introspection {worker_id,duration_ns} diff --git a/test/testdrive/introspection-sources.td b/test/testdrive/introspection-sources.td index eeb8c1c647c7..e056df243697 100644 --- a/test/testdrive/introspection-sources.td +++ b/test/testdrive/introspection-sources.td @@ -278,13 +278,16 @@ mv1_primary_idx mv1 > CREATE VIEW vv_arr AS SELECT sum(a) FROM t JOIN t2 ON t.a = t2.b; +$ postgres-execute connection=postgres://mz_system:materialize@${testdrive.materialize-internal-sql-addr} +ALTER SYSTEM SET enable_arrangement_size_logging = true + > CREATE MATERIALIZED VIEW mv_arr AS SELECT * FROM vv_arr > CREATE DEFAULT INDEX ii_arr ON vv_arr -> SELECT records >= 300 FROM mz_internal.mz_dataflow_arrangement_sizes WHERE name='ii_arr' OR name='mv_arr' -true -true +> SELECT records >= 300, size >= 300*96, capacity >= 300*96 FROM mz_internal.mz_dataflow_arrangement_sizes WHERE name='ii_arr' OR name='mv_arr' +true true true +true true true # Test that non-arranging dataflows show up in `mz_dataflow_arrangement_sizes` @@ -292,5 +295,28 @@ true > CREATE DEFAULT INDEX ii_empty ON t3 -> SELECT records, batches FROM mz_internal.mz_dataflow_arrangement_sizes WHERE name='ii_empty' -0 0 +> SELECT records, batches, size < 1000, capacity < 
1000, allocations < 1000 FROM mz_internal.mz_dataflow_arrangement_sizes WHERE name='ii_empty' +0 32 true true true + +# Tests that arrangement sizes are approximate + +> CREATE TABLE t4 (c int8) + +> CREATE INDEX ii_t4 ON t4(c) + +> SELECT records, batches, size < 1000, capacity < 1000, allocations < 1000 FROM mz_internal.mz_dataflow_arrangement_sizes WHERE name='ii_t4' +0 32 true true true + +> INSERT INTO t4 SELECT generate_series(1, 10) + +> SELECT records, batches, size > 1000 AND size < 2000, capacity > 1000, allocations > 0 FROM mz_internal.mz_dataflow_arrangement_sizes WHERE name='ii_t4' +10 32 true true true + +> INSERT INTO t4 SELECT generate_series(10, 1010) + +> SELECT records >= 1010 AND records <= 1011, batches > 0, size > 96*1010 AND size < 2*96*1010, capacity > 96*1010, allocations > 0 FROM mz_internal.mz_dataflow_arrangement_sizes WHERE name='ii_t4' +true true true true true + +> DROP INDEX ii_t4 + +> SELECT records, batches, size, capacity, allocations FROM mz_internal.mz_dataflow_arrangement_sizes WHERE name='ii_t4' diff --git a/test/testdrive/logging.td b/test/testdrive/logging.td index a36f17ffbf0c..a400c289772b 100644 --- a/test/testdrive/logging.td +++ b/test/testdrive/logging.td @@ -75,6 +75,9 @@ $ set-regex match=s\d+ replacement=SID > SELECT count(*) FROM (SELECT count (*) FROM mz_internal.mz_peek_durations_histogram); 1 +> SELECT count(*) FROM (SELECT count (*) FROM mz_internal.mz_dataflow_shutdown_durations_histogram); +1 + > SELECT count(*) FROM (SELECT count (*) FROM mz_internal.mz_records_per_dataflow); 1 @@ -170,6 +173,9 @@ SID operator_id 1 uint8 SID worker_id 2 uint8 SID records 3 bigint SID batches 4 bigint +SID size 5 bigint +SID capacity 6 bigint +SID allocations 7 bigint > SELECT mz_columns.id, mz_columns.name, position, type FROM mz_views JOIN mz_columns USING (id) diff --git a/test/testdrive/mz-arrangement-sharing.td b/test/testdrive/mz-arrangement-sharing.td index d7a262551f67..7665f13adb04 100644 --- a/test/testdrive/mz-arrangement-sharing.td +++ b/test/testdrive/mz-arrangement-sharing.td @@ -804,6 +804,9 @@ ALTER SYSTEM SET log_filter = 'debug' > SELECT mdo.name FROM mz_internal.mz_arrangement_sharing mash JOIN mz_internal.mz_dataflow_operators mdo ON mash.operator_id = mdo.id ORDER BY mdo.name; "Arrange Timely(Reachability)" +"ArrangeByKey Compute(ArrangementHeapAllocations)" +"ArrangeByKey Compute(ArrangementHeapCapacity)" +"ArrangeByKey Compute(ArrangementHeapSize)" "ArrangeByKey Compute(DataflowCurrent)" "ArrangeByKey Compute(DataflowDependency)" "ArrangeByKey Compute(FrontierCurrent)" @@ -811,6 +814,7 @@ ALTER SYSTEM SET log_filter = 'debug' "ArrangeByKey Compute(ImportFrontierCurrent)" "ArrangeByKey Compute(PeekCurrent)" "ArrangeByKey Compute(PeekDuration)" +"ArrangeByKey Compute(ShutdownDuration)" "ArrangeByKey Differential(ArrangementBatches)" "ArrangeByKey Differential(ArrangementRecords)" "ArrangeByKey Differential(Sharing)"
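To summarize the semantics these tests rely on: `size` is bytes in use, `capacity` is bytes reserved (hence `capacity` can exceed `size`), and `allocations` counts distinct backing heap allocations. A toy sketch of the accounting using `Vec` as a stand-in for an arrangement's allocations; `HeapStats` and `account` are hypothetical names, not this patch's API:

```rust
#[derive(Default, Debug)]
struct HeapStats {
    size: usize,
    capacity: usize,
    allocations: usize,
}

/// Fold one allocation's statistics into the running totals.
fn account<T>(vec: &Vec<T>, stats: &mut HeapStats) {
    stats.size += vec.len() * std::mem::size_of::<T>();
    stats.capacity += vec.capacity() * std::mem::size_of::<T>();
    stats.allocations += usize::from(vec.capacity() > 0);
}

fn main() {
    let mut stats = HeapStats::default();
    let mut keys = Vec::with_capacity(1024);
    keys.extend(0..10u64);
    account(&keys, &mut stats);
    // 10 * 8 bytes used, 1024 * 8 bytes reserved, one allocation.
    println!("{stats:?}");
    assert!(stats.capacity >= stats.size);
}
```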