Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[optimizer] report per-transform metrics #30806

Merged
merged 9 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4019,7 +4019,10 @@ pub fn serve(

let metrics = Metrics::register_into(&metrics_registry);
let metrics_clone = metrics.clone();
let optimizer_metrics = OptimizerMetrics::register_into(&metrics_registry);
let optimizer_metrics = OptimizerMetrics::register_into(
&metrics_registry,
catalog.system_config().optimizer_e2e_latency_warning_threshold(),
);
let segment_client_clone = segment_client.clone();
let coord_now = now.clone();
let advance_timelines_interval = tokio::time::interval(catalog.config().timestamp_interval);
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/src/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ fn optimize_mir_local(
ctx: &mut TransformCtx,
) -> Result<OptimizedMirRelationExpr, OptimizerError> {
#[allow(deprecated)]
let optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
let mut optimizer = mz_transform::Optimizer::logical_optimizer(ctx);
let expr = optimizer.optimize(expr, ctx)?;

// Trace the result of this phase.
Expand All @@ -366,7 +366,7 @@ fn optimize_mir_constant(
expr: MirRelationExpr,
ctx: &mut TransformCtx,
) -> Result<MirRelationExpr, OptimizerError> {
let optimizer = mz_transform::Optimizer::constant_optimizer(ctx);
let mut optimizer = mz_transform::Optimizer::constant_optimizer(ctx);
let expr = optimizer.optimize(expr, ctx)?;

// Trace the result of this phase.
Expand Down
9 changes: 7 additions & 2 deletions src/adapter/src/optimize/copy_to.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,8 +160,12 @@ impl Optimize<HirRelationExpr> for Optimizer {

// MIR ⇒ MIR optimization (local)
let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let mut transform_ctx = TransformCtx::local(
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);
let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

self.duration += time.elapsed();
Expand Down Expand Up @@ -317,6 +321,7 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);
// Run global optimization.
mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/optimize/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ impl Optimize<Index> for Optimizer {
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);
// Run global optimization.
mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
Expand Down
9 changes: 7 additions & 2 deletions src/adapter/src/optimize/materialized_view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,12 @@ impl Optimize<HirRelationExpr> for Optimizer {

// MIR ⇒ MIR optimization (local)
let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let mut transform_ctx = TransformCtx::local(
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);
let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

self.duration += time.elapsed();
Expand Down Expand Up @@ -282,6 +286,7 @@ impl Optimize<LocalMirPlan> for Optimizer {
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);
// Apply source monotonicity overrides.
for id in self.force_source_non_monotonic.iter() {
Expand Down
9 changes: 7 additions & 2 deletions src/adapter/src/optimize/peek.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,12 @@ impl Optimize<HirRelationExpr> for Optimizer {

// MIR ⇒ MIR optimization (local)
let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let mut transform_ctx = TransformCtx::local(
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);
let expr = optimize_mir_local(expr, &mut transform_ctx)?.into_inner();

self.duration += time.elapsed();
Expand Down Expand Up @@ -313,6 +317,7 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);

// Let's already try creating a fast path plan. If successful, we don't need to run the
Expand Down
9 changes: 7 additions & 2 deletions src/adapter/src/optimize/subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,12 @@ impl Optimize<SubscribeFrom> for Optimizer {
// let expr = expr.lower(&self.config)?;

// MIR ⇒ MIR optimization (local)
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let mut transform_ctx = TransformCtx::local(
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);
let expr = optimize_mir_local(expr, &mut transform_ctx)?;

df_builder.import_view_into_dataflow(
Expand Down Expand Up @@ -268,6 +272,7 @@ impl Optimize<SubscribeFrom> for Optimizer {
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
Some(&self.metrics),
);
// Run global optimization.
mz_transform::optimize_dataflow(&mut df_desc, &mut transform_ctx, false)?;
Expand Down
8 changes: 6 additions & 2 deletions src/adapter/src/optimize/view.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,12 @@ impl Optimize<HirRelationExpr> for Optimizer {
let mut expr = expr.lower(&self.config, self.metrics.as_ref())?;

let mut df_meta = DataflowMetainfo::default();
let mut transform_ctx =
TransformCtx::local(&self.config.features, &self.typecheck_ctx, &mut df_meta);
let mut transform_ctx = TransformCtx::local(
&self.config.features,
&self.typecheck_ctx,
&mut df_meta,
self.metrics.as_ref(),
);

// First, we run a very simple optimizer pipeline, which only folds constants. This takes
// care of constant INSERTs. (This optimizer is also used for INSERTs, not just VIEWs.)
Expand Down
43 changes: 41 additions & 2 deletions src/sql/src/optimizer_metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

//! Metrics collected by the optimizer.

use std::cell::RefCell;
use std::time::Duration;

use mz_ore::metric;
Expand All @@ -20,33 +21,55 @@ use prometheus::{HistogramVec, IntCounterVec};
#[derive(Debug, Clone)]
pub struct OptimizerMetrics {
e2e_optimization_time_seconds: HistogramVec,
e2e_optimization_time_seconds_log_threshold: Duration,
outer_join_lowering_cases: IntCounterVec,
transform_hits: IntCounterVec,
transform_total: IntCounterVec,
/// Local storage of transform times; these are emitted as part of the
/// log-line when end-to-end optimization times exceed the configured threshold.
transform_time_seconds: RefCell<std::collections::BTreeMap<String, Vec<Duration>>>,
}

impl OptimizerMetrics {
pub fn register_into(registry: &MetricsRegistry) -> Self {
pub fn register_into(
registry: &MetricsRegistry,
e2e_optimization_time_seconds_log_threshold: Duration,
) -> Self {
Self {
e2e_optimization_time_seconds: registry.register(metric!(
name: "mz_optimizer_e2e_optimization_time_seconds",
help: "A histogram of end-to-end optimization times since restart.",
var_labels: ["object_type"],
buckets: histogram_seconds_buckets(0.000_128, 8.0),
)),
e2e_optimization_time_seconds_log_threshold,
outer_join_lowering_cases: registry.register(metric!(
name: "outer_join_lowering_cases",
help: "How many times the different outer join lowering cases happened.",
var_labels: ["case"],
)),
transform_hits: registry.register(metric!(
name: "transform_hits",
help: "How many times a given transform changed the plan.",
var_labels: ["transform"],
)),
transform_total: registry.register(metric!(
name: "transform_total",
help: "How many times a given transform was applied.",
var_labels: ["transform"],
)),
transform_time_seconds: RefCell::new(std::collections::BTreeMap::new()),
}
}

pub fn observe_e2e_optimization_time(&self, object_type: &str, duration: Duration) {
self.e2e_optimization_time_seconds
.with_label_values(&[object_type])
.observe(duration.as_secs_f64());
if duration > Duration::from_millis(500) {
if duration > self.e2e_optimization_time_seconds_log_threshold {
tracing::warn!(
object_type = object_type,
transform_times = ?self.transform_time_seconds.borrow(),
duration = format!("{}ms", duration.as_millis()),
"optimizer took more than 500ms"
);
Expand All @@ -58,4 +81,20 @@ impl OptimizerMetrics {
.with_label_values(&[case])
.inc()
}

pub fn inc_transform(&self, hit: bool, transform: &str) {
self.transform_hits
.with_label_values(&[transform])
.inc_by(if hit { 1 } else { 0 });
mgree marked this conversation as resolved.
Show resolved Hide resolved
self.transform_total.with_label_values(&[transform]).inc();
}

pub fn observe_transform_time(&self, transform: &str, duration: Duration) {
let mut transform_time_seconds = self.transform_time_seconds.borrow_mut();
if let Some(times) = transform_time_seconds.get_mut(transform) {
times.push(duration);
} else {
transform_time_seconds.insert(transform.to_string(), vec![duration]);
}
}
}
5 changes: 5 additions & 0 deletions src/sql/src/session/vars.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,6 +1215,7 @@ impl SystemVars {
&USER_STORAGE_MANAGED_COLLECTIONS_BATCH_DURATION,
&ENABLE_CREATE_TABLE_FROM_SOURCE,
&FORCE_SOURCE_TABLE_SYNTAX,
&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD,
];

let dyncfgs = mz_dyncfgs::all_dyncfgs();
Expand Down Expand Up @@ -2196,6 +2197,10 @@ impl SystemVars {
*self.expect_value(&FORCE_SOURCE_TABLE_SYNTAX)
}

pub fn optimizer_e2e_latency_warning_threshold(&self) -> Duration {
*self.expect_value(&OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD)
}

/// Returns whether the named variable is a compute configuration parameter
/// (things that go in `ComputeParameters` and are sent to replicas via `UpdateConfiguration`
/// commands).
Expand Down
9 changes: 9 additions & 0 deletions src/sql/src/session/vars/definitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1539,6 +1539,15 @@ pub static FORCE_SOURCE_TABLE_SYNTAX: VarDefinition = VarDefinition::new(
true,
);

pub static OPTIMIZER_E2E_LATENCY_WARNING_THRESHOLD: VarDefinition = VarDefinition::new(
"optimizer_e2e_latency_warning_threshold",
value!(Duration; Duration::from_millis(500)),
"Sets the duration that a query can take to compile; queries that take longer \
will trigger a warning. If this value is specified without units, it is taken as \
milliseconds. A value of zero disables the timeout (Materialize).",
true,
);

/// Configuration for gRPC client connections.
pub mod grpc_client {
use super::*;
Expand Down
5 changes: 5 additions & 0 deletions src/transform/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ rust_library(
"//src/expr:mz_expr",
"//src/ore:mz_ore",
"//src/repr:mz_repr",
"//src/sql:mz_sql",
] + all_crate_deps(normal = True),
)

Expand Down Expand Up @@ -70,6 +71,7 @@ rust_test(
"//src/lowertest:mz_lowertest",
"//src/ore:mz_ore",
"//src/repr:mz_repr",
"//src/sql:mz_sql",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand All @@ -87,6 +89,7 @@ rust_doc_test(
"//src/lowertest:mz_lowertest",
"//src/ore:mz_ore",
"//src/repr:mz_repr",
"//src/sql:mz_sql",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand Down Expand Up @@ -124,6 +127,7 @@ rust_test(
"//src/lowertest:mz_lowertest",
"//src/ore:mz_ore",
"//src/repr:mz_repr",
"//src/sql:mz_sql",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand Down Expand Up @@ -161,6 +165,7 @@ rust_test(
"//src/lowertest:mz_lowertest",
"//src/ore:mz_ore",
"//src/repr:mz_repr",
"//src/sql:mz_sql",
] + all_crate_deps(
normal = True,
normal_dev = True,
Expand Down
1 change: 1 addition & 0 deletions src/transform/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ mz-compute-types = { path = "../compute-types" }
mz-expr = { path = "../expr" }
mz-ore = { path = "../ore" }
mz-repr = { path = "../repr", features = ["tracing_"] }
mz-sql = { path = "../sql" }
num-traits = "0.2"
ordered-float = { version = "4.2.0", features = ["serde"] }
paste = "1.0.11"
Expand Down
10 changes: 5 additions & 5 deletions src/transform/src/dataflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ pub fn optimize_dataflow(
if fast_path_optimizer {
optimize_dataflow_relations(
dataflow,
&Optimizer::fast_path_optimizer(transform_ctx),
&mut Optimizer::fast_path_optimizer(transform_ctx),
transform_ctx,
)?;
} else {
// Logical optimization pass after view inlining
optimize_dataflow_relations(
dataflow,
#[allow(deprecated)]
&Optimizer::logical_optimizer(transform_ctx),
&mut Optimizer::logical_optimizer(transform_ctx),
transform_ctx,
)?;

Expand All @@ -79,14 +79,14 @@ pub fn optimize_dataflow(
// pushed down across views.
optimize_dataflow_relations(
dataflow,
&Optimizer::logical_cleanup_pass(transform_ctx, false),
&mut Optimizer::logical_cleanup_pass(transform_ctx, false),
transform_ctx,
)?;

// Physical optimization pass
optimize_dataflow_relations(
dataflow,
&Optimizer::physical_optimizer(transform_ctx),
&mut Optimizer::physical_optimizer(transform_ctx),
transform_ctx,
)?;

Expand Down Expand Up @@ -217,7 +217,7 @@ fn inline_views(dataflow: &mut DataflowDesc) -> Result<(), TransformError> {
)]
fn optimize_dataflow_relations(
dataflow: &mut DataflowDesc,
optimizer: &Optimizer,
optimizer: &mut Optimizer,
mgree marked this conversation as resolved.
Show resolved Hide resolved
ctx: &mut TransformCtx,
) -> Result<(), TransformError> {
// Re-optimize each dataflow
Expand Down
Loading
Loading