Skip to content

Commit

Permalink
compute: add new dyncfg to set replica expiration time
Browse files Browse the repository at this point in the history
  • Loading branch information
sdht0 committed Sep 19, 2024
1 parent 7b492cc commit 58184b8
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
9 changes: 9 additions & 0 deletions src/compute-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,14 @@ pub const PERSIST_SINK_OBEY_READ_ONLY: Config<bool> = Config::new(
"Whether the compute persist_sink obeys read-only mode.",
);

/// Sets the max lifetime for replicas configured as an offset to the replica start time.
/// Diffs generated at timestamps beyond the expiration time are dropped.
pub const COMPUTE_REPLICA_EXPIRATION: Config<Duration> = Config::new(
"compute_replica_expiration",
Duration::ZERO,
"The expiration time for replicas. Zero disables expiration.",
);

/// Adds the full set of all compute `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
Expand All @@ -150,4 +158,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&COPY_TO_S3_ARROW_BUILDER_BUFFER_RATIO)
.add(&COPY_TO_S3_MULTIPART_PART_SIZE_BYTES)
.add(&PERSIST_SINK_OBEY_READ_ONLY)
.add(&COMPUTE_REPLICA_EXPIRATION)
}
20 changes: 20 additions & 0 deletions src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ use mz_dyncfg::ConfigSet;
use mz_expr::SafeMfpPlan;
use mz_ore::cast::CastFrom;
use mz_ore::metrics::UIntGauge;
use mz_ore::now::EpochMillis;
use mz_ore::task::AbortOnDropHandle;
use mz_ore::tracing::{OpenTelemetryContext, TracingHandle};
use mz_persist_client::cache::PersistClientCache;
Expand Down Expand Up @@ -162,6 +163,9 @@ pub struct ComputeState {
/// Interval at which to perform server maintenance tasks. Set to a zero interval to
/// perform maintenance with every `step_or_park` invocation.
pub server_maintenance_interval: Duration,

/// The time at which to expire replicas.
pub replica_expiration: Option<Timestamp>,
}

impl ComputeState {
Expand Down Expand Up @@ -208,6 +212,7 @@ impl ComputeState {
read_only_tx,
read_only_rx,
server_maintenance_interval: Duration::ZERO,
replica_expiration: None,
}
}

Expand Down Expand Up @@ -294,6 +299,21 @@ impl ComputeState {
// Remember the maintenance interval locally to avoid reading it from the config set on
// every server iteration.
self.server_maintenance_interval = COMPUTE_SERVER_MAINTENANCE_INTERVAL.get(config);

if self.replica_expiration.is_none() {
let offset = COMPUTE_REPLICA_EXPIRATION.get(&self.worker_config);
if !offset.is_zero() {
// TODO(sdht0): consistently send now() from environmentd.
let now = mz_ore::now::SYSTEM_TIME.clone()();
let offset: EpochMillis = offset
.as_millis()
.try_into()
.expect("Duration did not fit within u64");
let replica_expiration = Timestamp::new(now + offset);
info!("Replica frontier expires at {:?}", replica_expiration);
self.replica_expiration = Some(replica_expiration)
}
}
}

/// Returns the cc or non-cc version of "dataflow_max_inflight_bytes", as
Expand Down

0 comments on commit 58184b8

Please sign in to comment.