From 58184b8047362a603283c672c4fce82adac0ca8b Mon Sep 17 00:00:00 2001 From: Siddhartha Sahu Date: Thu, 19 Sep 2024 13:54:11 -0400 Subject: [PATCH] compute: add new dyncfg to set replica expiration time --- src/compute-types/src/dyncfgs.rs | 9 +++++++++ src/compute/src/compute_state.rs | 20 ++++++++++++++++++++ 2 files changed, 29 insertions(+) diff --git a/src/compute-types/src/dyncfgs.rs b/src/compute-types/src/dyncfgs.rs index f37aaec0991a3..878f85585a9e0 100644 --- a/src/compute-types/src/dyncfgs.rs +++ b/src/compute-types/src/dyncfgs.rs @@ -132,6 +132,14 @@ pub const PERSIST_SINK_OBEY_READ_ONLY: Config = Config::new( "Whether the compute persist_sink obeys read-only mode.", ); +/// Sets the max lifetime for replicas configured as an offset to the replica start time. +/// Diffs generated at timestamps beyond the expiration time are dropped. +pub const COMPUTE_REPLICA_EXPIRATION: Config = Config::new( + "compute_replica_expiration", + Duration::ZERO, + "The expiration time for replicas. Zero disables expiration.", +); + /// Adds the full set of all compute `Config`s. pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { configs @@ -150,4 +158,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet { .add(©_TO_S3_ARROW_BUILDER_BUFFER_RATIO) .add(©_TO_S3_MULTIPART_PART_SIZE_BYTES) .add(&PERSIST_SINK_OBEY_READ_ONLY) + .add(&COMPUTE_REPLICA_EXPIRATION) } diff --git a/src/compute/src/compute_state.rs b/src/compute/src/compute_state.rs index 782232e125fe5..306498ba21b57 100644 --- a/src/compute/src/compute_state.rs +++ b/src/compute/src/compute_state.rs @@ -34,6 +34,7 @@ use mz_dyncfg::ConfigSet; use mz_expr::SafeMfpPlan; use mz_ore::cast::CastFrom; use mz_ore::metrics::UIntGauge; +use mz_ore::now::EpochMillis; use mz_ore::task::AbortOnDropHandle; use mz_ore::tracing::{OpenTelemetryContext, TracingHandle}; use mz_persist_client::cache::PersistClientCache; @@ -162,6 +163,9 @@ pub struct ComputeState { /// Interval at which to perform server maintenance tasks. Set to a zero interval to /// perform maintenance with every `step_or_park` invocation. pub server_maintenance_interval: Duration, + + /// The time at which to expire replicas. + pub replica_expiration: Option, } impl ComputeState { @@ -208,6 +212,7 @@ impl ComputeState { read_only_tx, read_only_rx, server_maintenance_interval: Duration::ZERO, + replica_expiration: None, } } @@ -294,6 +299,21 @@ impl ComputeState { // Remember the maintenance interval locally to avoid reading it from the config set on // every server iteration. self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config); + + if self.replica_expiration.is_none() { + let offset = COMPUTE_REPLICA_EXPIRATION.get(&self.worker_config); + if !offset.is_zero() { + // TODO(sdht0): consistently send now() from environmentd. + let now = mz_ore::now::SYSTEM_TIME.clone()(); + let offset: EpochMillis = offset + .as_millis() + .try_into() + .expect("Duration did not fit within u64"); + let replica_expiration = Timestamp::new(now + offset); + info!("Replica frontier expires at {:?}", replica_expiration); + self.replica_expiration = Some(replica_expiration) + } + } } /// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as