diff --git a/doc/developer/design/20240919_dataflow_expiration.md b/doc/developer/design/20240919_dataflow_expiration.md
index 585e0c6740112..4479a27afd396 100644
--- a/doc/developer/design/20240919_dataflow_expiration.md
+++ b/doc/developer/design/20240919_dataflow_expiration.md
@@ -69,6 +69,15 @@ else:
 # The `upper` for a dataflow considering all its transitive inputs
 inputs_upper := meet(for all inputs i: i_upper)
 
+# Compute the upper of replica_expiration and any configured refresh schedules
+# TODO: expiration_upper is only available in the optimizer, but replica_expiration
+# is only available in the cluster.
+fn expiration_upper(obj, replica_expiration) :=
+  match obj:
+    MV(inputs, refresh_schedule) => refresh_schedule.round_up_timestamp(meet(for all o in inputs: expiration_upper(o, replica_expiration)))
+    Index(inputs) | Subscribe(inputs) | ? => meet(for all o in inputs: expiration_upper(o, replica_expiration))
+    Table | Source => 1s.round_up_timestamp(replica_expiration)
+
 # Dataflow expiration logic
 if compute_replica_expiration_offset is not set:
   dataflow_replication := []
@@ -78,31 +87,26 @@ else
 for dataflows of type in [materialized view, index, subscribe]:
     # Dataflows that do not depend on any source or table are not in the
     # EpochMilliseconds timeline
     dataflow_expiration := []
-  else if refresh_interval set in any transitive dependency of dataflow:
-    dataflow_expiration := []
-  else if inputs_upper == []:
-    dataflow_expiration := []
-  else if inputs_upper > expiration:
-    dataflow_expiration := inputs_upper
   else:
-    dataflow_expiration := replica_expiration
+    replica_expiration_upper := expiration_upper(dataflow, replica_expiration)
+    dataflow_expiration := join(replica_expiration_upper, inputs_upper)
 
   dataflow_until := dataflow_until.meet(dataflow_expiration)
 ```
 
-Note that we only consider dataflows representing materialized views, indexes,
-and subscribes. These are long-running dataflows that maintain state during
+Note that only dataflows representing materialized views, indexes,
+and subscribes are considered. These are long-running dataflows that maintain state during
 their lifetime. Other dataflows such as peeks are transient and do not need to
 explicitly drop retraction diffs.
 
-More concretely, we make the following changes:
+More concretely, this feature involves the following changes:
 
 * Introduce a new dyncfg `compute_replica_expiration_offset`.
 * If the offset is configured with a non-zero value, compute
-  `replica_expiration = now() + offset`. This value specifies the maximum
-  time for which the replica is expected to be running. Consequently, diffs
-  associated with timestamps beyond this limit do not have to be stored and can
-  be dropped.
+  `replica_expiration = now() + offset`. This value indicates the maximum
+  time for which the replica is expected to be running.
+  * Existing replicas must be restarted for replica expiration to be enabled
+    or disabled for them.
 * When building a dataflow, compute `dataflow_expiration` as per the logic
   described above. If non-empty, the `dataflow_expiration` is added to the
   dataflow `until`, which ensures that any diff beyond this limit is dropped in
@@ -118,7 +122,7 @@ More concretely, we make the following changes:
 ## Open Questions
 
 - What is the appropriate default expiration time?
-  - Given that we currently restart replicas every week as part of the DB release
+  - Given that replicas are restarted every week as part of the DB release
    and leaving some buffer for skipped weeks, 3 weeks (+1 day margin) seems
    like a good limit to start with.
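For intuition, here is a minimal, self-contained sketch of the `join(replica_expiration_upper, inputs_upper)` step above (an editorial illustration, not code from this patch), modeling a one-coordinate antichain as `Option<u64>` where `None` stands for the empty frontier:

```rust
/// Upper-bound join on single-coordinate frontiers: the empty frontier
/// (`None`) dominates, otherwise take the maximum timestamp.
fn join(a: Option<u64>, b: Option<u64>) -> Option<u64> {
    match (a, b) {
        (Some(a), Some(b)) => Some(a.max(b)),
        _ => None,
    }
}

fn main() {
    let replica_expiration_upper = Some(1_000); // result of expiration_upper()
    // If the inputs' upper is already past the expiration time, the dataflow's
    // until follows the inputs rather than cutting them off.
    assert_eq!(join(replica_expiration_upper, Some(1_500)), Some(1_500));
    // An empty inputs_upper (e.g. only constant inputs) disables expiration.
    assert_eq!(join(replica_expiration_upper, None), None);
}
```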
diff --git a/src/adapter/src/catalog/dataflow_expiration.rs b/src/adapter/src/catalog/dataflow_expiration.rs
index 7fa28e27a4cf8..fa90af4c78aef 100644
--- a/src/adapter/src/catalog/dataflow_expiration.rs
+++ b/src/adapter/src/catalog/dataflow_expiration.rs
@@ -5,24 +5,35 @@
 //! Helper function for dataflow expiration checks.
 
-use mz_repr::GlobalId;
-
 use crate::catalog::Catalog;
+use mz_compute_types::dataflows::{RefreshDep, RefreshDepIndex};
+use mz_repr::GlobalId;
 
 impl Catalog {
-    /// Whether the catalog entry `id` or any of its transitive dependencies is a materialized view
-    /// with a refresh schedule. Used to disable dataflow expiration if found.
-    pub(crate) fn item_has_transitive_refresh_schedule(&self, id: GlobalId) -> bool {
-        let test_has_transitive_refresh_schedule = |dep: GlobalId| -> bool {
-            if let Some(mv) = self.get_entry(&dep).materialized_view() {
-                return mv.refresh_schedule.is_some();
+    /// Recursively collects the refresh schedules of all transitive dependencies of `deps` into `deps_tree`, returning the range of entries contributed by the direct dependencies (`None` if no dependency carries a refresh schedule).
+    pub(crate) fn get_refresh_dependencies(
+        &self,
+        deps: impl Iterator<Item = GlobalId>,
+        deps_tree: &mut Vec<RefreshDep>,
+    ) -> Option<RefreshDepIndex> {
+        let mut local_deps = Vec::new();
+        for dep in deps {
+            let entry = self.get_entry(&dep);
+            let refresh_dep_index =
+                self.get_refresh_dependencies(entry.uses().into_iter(), deps_tree);
+            let refresh_schedule = entry
+                .materialized_view()
+                .and_then(|mv| mv.refresh_schedule.clone());
+            if refresh_dep_index.is_some() || refresh_schedule.is_some() {
+                local_deps.push(RefreshDep {
+                    refresh_dep_index,
+                    refresh_schedule,
+                });
             }
-            false
-        };
-        test_has_transitive_refresh_schedule(id)
-            || self
-                .state()
-                .transitive_uses(id)
-                .any(test_has_transitive_refresh_schedule)
+        }
+        let start = deps_tree.len();
+        deps_tree.extend(local_deps);
+        let end = deps_tree.len();
+        (end > start).then_some(RefreshDepIndex { start, end })
     }
 }
diff --git a/src/adapter/src/coord/sequencer/inner/create_index.rs b/src/adapter/src/coord/sequencer/inner/create_index.rs
index ec506d9e022a6..8ceeda06ddb0d 100644
--- a/src/adapter/src/coord/sequencer/inner/create_index.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_index.rs
@@ -442,7 +442,6 @@ impl Coordinator {
 
         // Collect properties for `DataflowExpirationDesc`.
         let transitive_upper = self.least_valid_write(&id_bundle);
-        let has_transitive_refresh_schedule = self.catalog.item_has_transitive_refresh_schedule(on);
 
         // Pre-allocate a vector of transient GlobalIds for each notice.
         let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
@@ -477,9 +476,6 @@ impl Coordinator {
         df_desc.set_as_of(since);
 
         df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
-        df_desc
-            .dataflow_expiration_desc
-            .has_transitive_refresh_schedule = has_transitive_refresh_schedule;
 
         coord
             .ship_dataflow_and_notice_builtin_table_updates(
diff --git a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
index f06eaae858025..35d0a864bcb47 100644
--- a/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
+++ b/src/adapter/src/coord/sequencer/inner/create_materialized_view.rs
@@ -575,11 +575,10 @@ impl Coordinator {
 
         // Collect properties for `DataflowExpirationDesc`.
         let transitive_upper = self.least_valid_write(&id_bundle);
-        let has_transitive_refresh_schedule = refresh_schedule.is_some()
-            || raw_expr
-                .depends_on()
-                .into_iter()
-                .any(|id| self.catalog.item_has_transitive_refresh_schedule(id));
+        let mut refresh_deps = Vec::new();
+        let refresh_deps_index = self
+            .catalog
+            .get_refresh_dependencies(raw_expr.depends_on().into_iter(), &mut refresh_deps);
 
         let read_holds_owned;
         let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
@@ -675,9 +674,8 @@ impl Coordinator {
             df_desc.until = until;
 
             df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
-            df_desc
-                .dataflow_expiration_desc
-                .has_transitive_refresh_schedule = has_transitive_refresh_schedule;
+            df_desc.dataflow_expiration_desc.refresh_deps = refresh_deps;
+            df_desc.dataflow_expiration_desc.refresh_deps_index = refresh_deps_index;
 
             let storage_metadata = coord.catalog.state().storage_metadata();
 
diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs
index 8d9c7c9f51b29..63bd01d19645e 100644
--- a/src/adapter/src/coord/sequencer/inner/subscribe.rs
+++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs
@@ -324,7 +324,6 @@ impl Coordinator {
             cluster_id,
             plan:
                 plan::SubscribePlan {
-                    from,
                     copy_to,
                     emit_progress,
                     output,
@@ -339,10 +338,6 @@ impl Coordinator {
 
         // Collect properties for `DataflowExpirationDesc`.
         let transitive_upper = self.least_valid_write(&id_bundle);
-        let has_transitive_refresh_schedule = from
-            .depends_on()
-            .into_iter()
-            .any(|id| self.catalog.item_has_transitive_refresh_schedule(id));
 
         let sink_id = global_lir_plan.sink_id();
 
@@ -366,9 +361,6 @@ impl Coordinator {
         let (mut df_desc, df_meta) = global_lir_plan.unapply();
 
         df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
-        df_desc
-            .dataflow_expiration_desc
-            .has_transitive_refresh_schedule = has_transitive_refresh_schedule;
 
         // Emit notices.
         self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
diff --git a/src/compute-types/src/dataflows.proto b/src/compute-types/src/dataflows.proto
index 3d990ef3d156d..1f39b53112908 100644
--- a/src/compute-types/src/dataflows.proto
+++ b/src/compute-types/src/dataflows.proto
@@ -48,10 +48,21 @@ message ProtoDataflowDescription {
         sinks.ProtoComputeSinkDesc sink_desc = 2;
     }
 
+    message ProtoRefreshDepIndex {
+        uint64 start = 1;
+        uint64 end = 2;
+    }
+
+    message ProtoRefreshDep {
+        optional ProtoRefreshDepIndex refresh_dep_index = 1;
+        optional mz_repr.refresh_schedule.ProtoRefreshSchedule refresh_schedule = 2;
+    }
+
     message ProtoDataflowExpirationDesc {
         optional mz_repr.antichain.ProtoU64Antichain transitive_upper = 1;
-        bool has_transitive_refresh_schedule = 2;
         bool is_timeline_epoch_ms = 3;
+        repeated ProtoRefreshDep refresh_deps = 4;
+        optional ProtoRefreshDepIndex refresh_deps_index = 5;
     }
 
     repeated ProtoSourceImport source_imports = 1;
diff --git a/src/compute-types/src/dataflows.rs b/src/compute-types/src/dataflows.rs
index a4b5779ac8c4b..31ee4cb918a87 100644
--- a/src/compute-types/src/dataflows.rs
+++ b/src/compute-types/src/dataflows.rs
@@ -17,17 +17,18 @@ use mz_expr::{CollectionPlan, MirRelationExpr, MirScalarExpr, OptimizedMirRelati
 use mz_ore::soft_assert_or_log;
 use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoError};
 use mz_repr::refresh_schedule::RefreshSchedule;
-use mz_repr::{GlobalId, RelationType};
+use mz_repr::{GlobalId, RelationType, Timestamp};
 use mz_storage_types::controller::CollectionMetadata;
 use proptest::prelude::{any, Arbitrary};
 use proptest::strategy::{BoxedStrategy, Strategy};
 use proptest_derive::Arbitrary;
 use serde::{Deserialize, Serialize};
 use timely::progress::Antichain;
+use timely::Container;
 
 use crate::dataflows::proto_dataflow_description::{
-    ProtoDataflowExpirationDesc, ProtoIndexExport, ProtoIndexImport, ProtoSinkExport,
-    ProtoSourceImport,
+    ProtoDataflowExpirationDesc, ProtoIndexExport, ProtoIndexImport, ProtoRefreshDep,
+    ProtoRefreshDepIndex, ProtoSinkExport, ProtoSourceImport,
 };
 use crate::plan::flat_plan::FlatPlan;
 use crate::plan::Plan;
@@ -78,7 +79,7 @@ pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
     pub dataflow_expiration_desc: DataflowExpirationDesc<T>,
 }
 
-impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
+impl<P, S> DataflowDescription<P, S, Timestamp> {
     /// Tests if the dataflow refers to a single timestamp, namely
     /// that `as_of` has a single coordinate and that the `until`
     /// value corresponds to the `as_of` value plus one, or `as_of`
@@ -119,26 +120,78 @@ impl<P, S> DataflowDescription<P, S, mz_repr::Timestamp> {
     /// which dataflow expiration should be disabled.
     pub fn expire_dataflow_at(
         &self,
-        replica_expiration: &Antichain<mz_repr::Timestamp>,
-    ) -> Antichain<mz_repr::Timestamp> {
+        replica_expiration: &Antichain<Timestamp>,
+    ) -> Antichain<Timestamp> {
         let dataflow_expiration_desc = &self.dataflow_expiration_desc;
 
-        // Disable dataflow expiration if `replica_expiration` is unset, the current dataflow has a
-        // refresh schedule, has a transitive dependency with a refresh schedule, or the dataflow's
-        // timeline is not `Timeline::EpochMilliSeconds`.
-        if replica_expiration.is_empty()
-            || self.refresh_schedule.is_some()
-            || dataflow_expiration_desc.has_transitive_refresh_schedule
-            || !dataflow_expiration_desc.is_timeline_epoch_ms
-        {
+        // Disable dataflow expiration if the dataflow's timeline is not `Timeline::EpochMilliSeconds`.
+        if !dataflow_expiration_desc.is_timeline_epoch_ms {
             return Antichain::default();
         }
+
+        let Some(&replica_expiration_ts) = replica_expiration.as_option() else {
+            return Antichain::default();
+        };
+        let mut expiration_with_refresh_ts =
+            if let Some(refresh_deps_index) = dataflow_expiration_desc.refresh_deps_index {
+                get_expiration_with_refresh(
+                    refresh_deps_index,
+                    &dataflow_expiration_desc.refresh_deps,
+                    replica_expiration_ts,
+                )
+            } else {
+                replica_expiration_ts
+            };
+        apply_optional_refresh_schedule(&mut expiration_with_refresh_ts, &self.refresh_schedule);
+        let expiration_with_refresh = Antichain::from_elem(expiration_with_refresh_ts);
+
         if let Some(upper) = &dataflow_expiration_desc.transitive_upper {
             // Returns empty if `upper` is empty, else the max of `upper` and `expiration_with_refresh`.
-            upper.join(replica_expiration)
+            upper.join(&expiration_with_refresh)
         } else {
-            replica_expiration.clone()
+            expiration_with_refresh
         }
     }
 }
+
+fn get_expiration_with_refresh(
+    refresh_deps_index: RefreshDepIndex,
+    refresh_deps: &Vec<RefreshDep>,
+    replica_expiration_ts: Timestamp,
+) -> Timestamp {
+    refresh_deps[refresh_deps_index.start..refresh_deps_index.end]
+        .iter()
+        .map(
+            |RefreshDep {
+                 refresh_dep_index,
+                 refresh_schedule,
+             }| {
+                let mut expiration_with_refresh = refresh_dep_index
+                    .map(|refresh_dep_index| {
+                        get_expiration_with_refresh(
+                            refresh_dep_index,
+                            refresh_deps,
+                            replica_expiration_ts,
+                        )
+                    })
+                    .unwrap_or(replica_expiration_ts);
+                apply_optional_refresh_schedule(&mut expiration_with_refresh, refresh_schedule);
+                expiration_with_refresh
+            },
+        )
+        .reduce(|acc, expiration_with_refresh| acc.meet(&expiration_with_refresh))
+        .unwrap_or(replica_expiration_ts)
+}
+
+fn apply_optional_refresh_schedule(
+    expiration_with_refresh: &mut Timestamp,
+    refresh_schedule: &Option<RefreshSchedule>,
+) {
+    if let Some(refresh_schedule) = refresh_schedule {
+        if let Some(new_expiration_with_refresh) =
+            refresh_schedule.round_up_timestamp(*expiration_with_refresh)
+        {
+            *expiration_with_refresh = new_expiration_with_refresh;
        }
    }
 }
@@ -893,68 +946,67 @@ impl RustType<ProtoBuildDesc> for BuildDesc {
 pub struct DataflowExpirationDesc<T = mz_repr::Timestamp> {
     /// The upper of the dataflow considering all transitive dependencies.
     pub transitive_upper: Option<Antichain<T>>,
-    /// Whether the dataflow has a transitive dependency with a refresh schedule.
-    pub has_transitive_refresh_schedule: bool,
     /// Whether the timeline of the dataflow is [`mz_storage_types::sources::Timeline::EpochMilliseconds`].
     pub is_timeline_epoch_ms: bool,
+    /// Flattened tree of transitive dependencies that have (or lead to) refresh schedules.
+    pub refresh_deps: Vec<RefreshDep>,
+    /// Range in `refresh_deps` covering the dataflow's direct dependencies, if any carry refresh schedules.
+    pub refresh_deps_index: Option<RefreshDepIndex>,
 }
 
 impl<T> Default for DataflowExpirationDesc<T> {
     fn default() -> Self {
         Self {
             transitive_upper: None,
-            // Assume present unless explicitly checked.
-            has_transitive_refresh_schedule: true,
             // Assume any timeline type possible unless explicitly checked.
             is_timeline_epoch_ms: false,
+            refresh_deps: vec![],
+            refresh_deps_index: None,
         }
     }
 }
 
-impl RustType<ProtoDataflowExpirationDesc> for DataflowExpirationDesc<mz_repr::Timestamp> {
+impl RustType<ProtoDataflowExpirationDesc> for DataflowExpirationDesc<Timestamp> {
     fn into_proto(&self) -> ProtoDataflowExpirationDesc {
         ProtoDataflowExpirationDesc {
             transitive_upper: self.transitive_upper.into_proto(),
-            has_transitive_refresh_schedule: self.has_transitive_refresh_schedule.into_proto(),
             is_timeline_epoch_ms: self.is_timeline_epoch_ms.into_proto(),
+            refresh_deps: self.refresh_deps.into_proto(),
+            refresh_deps_index: self.refresh_deps_index.into_proto(),
         }
     }
 
     fn from_proto(x: ProtoDataflowExpirationDesc) -> Result<Self, TryFromProtoError> {
         Ok(Self {
             transitive_upper: x.transitive_upper.into_rust()?,
-            has_transitive_refresh_schedule: x.has_transitive_refresh_schedule.into_rust()?,
             is_timeline_epoch_ms: x.is_timeline_epoch_ms.into_rust()?,
+            refresh_deps: x.refresh_deps.into_rust()?,
+            refresh_deps_index: x.refresh_deps_index.into_rust()?,
         })
     }
 }
 
-impl Arbitrary for DataflowExpirationDesc<mz_repr::Timestamp> {
+impl Arbitrary for DataflowExpirationDesc<Timestamp> {
     type Strategy = BoxedStrategy<Self>;
     type Parameters = ();
 
     fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
         (
             any::<bool>(),
-            any::<bool>(),
-            proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..5),
+            proptest::collection::vec(any::<Timestamp>(), 1..5),
             any::<bool>(),
         )
             .prop_map(
-                |(
-                    has_transitive_refresh_schedule,
-                    transitive_upper_some,
-                    transitive_upper,
-                    is_timeline_epoch_ms,
-                )| {
+                |(transitive_upper_some, transitive_upper, is_timeline_epoch_ms)| {
                     DataflowExpirationDesc {
                         transitive_upper: if transitive_upper_some {
                             Some(Antichain::from(transitive_upper))
                         } else {
                             None
                         },
-                        has_transitive_refresh_schedule,
                         is_timeline_epoch_ms,
+                        refresh_deps: vec![],
+                        refresh_deps_index: None,
                     }
                 },
             )
@@ -962,6 +1014,56 @@ impl Arbitrary for DataflowExpirationDesc<mz_repr::Timestamp> {
     }
 }
 
+/// A node in a flattened dependency tree, recording one dependency's refresh schedule and its own refresh dependencies.
+#[derive(Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct RefreshDep {
+    /// Range in the flattened tree covering this dependency's own refresh dependencies, if any.
+    pub refresh_dep_index: Option<RefreshDepIndex>,
+    /// The refresh schedule of this dependency, if it is a materialized view with one.
+    pub refresh_schedule: Option<RefreshSchedule>,
+}
+
+impl RustType<ProtoRefreshDep> for RefreshDep {
+    fn into_proto(&self) -> ProtoRefreshDep {
+        ProtoRefreshDep {
+            refresh_dep_index: self.refresh_dep_index.into_proto(),
+            refresh_schedule: self.refresh_schedule.into_proto(),
+        }
+    }
+
+    fn from_proto(proto: ProtoRefreshDep) -> Result<Self, TryFromProtoError> {
+        Ok(RefreshDep {
+            refresh_dep_index: proto.refresh_dep_index.into_rust()?,
+            refresh_schedule: proto.refresh_schedule.into_rust()?,
+        })
+    }
+}
+
+/// Half-open range `[start, end)` into a flattened `Vec<RefreshDep>`, identifying one node's direct refresh dependencies.
+#[derive(Copy, Clone, Debug, Serialize, Deserialize, Eq, PartialEq)]
+pub struct RefreshDepIndex {
+    /// First entry of the range (inclusive).
+    pub start: usize,
+    /// End of the range (exclusive).
+    pub end: usize,
+}
+
+impl RustType<ProtoRefreshDepIndex> for RefreshDepIndex {
+    fn into_proto(&self) -> ProtoRefreshDepIndex {
+        ProtoRefreshDepIndex {
+            start: self.start.into_proto(),
+            end: self.end.into_proto(),
+        }
+    }
+
+    fn from_proto(proto: ProtoRefreshDepIndex) -> Result<Self, TryFromProtoError> {
+        Ok(RefreshDepIndex {
+            start: proto.start.into_rust()?,
+            end: proto.end.into_rust()?,
+        })
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use mz_ore::assert_ok;
diff --git a/test/testdrive/replica-expiration.td b/test/testdrive/replica-expiration.td
index abe629c80af15..12682003f1428 100644
--- a/test/testdrive/replica-expiration.td
+++ b/test/testdrive/replica-expiration.td
@@ -239,3 +239,20 @@ ALTER SYSTEM SET compute_replica_expiration_offset = '30d';
 1000
 > DROP MATERIALIZED VIEW events_mv CASCADE;
 > DROP CLUSTER test CASCADE;
+
+# Test refresh schedules
+
+## CREATE TABLE t (id int, ts timestamp);
+## CREATE MATERIALIZED VIEW mv1 with (refresh at creation, refresh EVERY '7 days' ALIGNED TO '2024-06-04 12:00:00') as select * from t where id < 10;
+## - `2024-10-15 12:00:00.000`
+## CREATE MATERIALIZED VIEW mv2 with (refresh at creation, refresh EVERY '30 days' ALIGNED TO '2024-06-04 12:00:00') as select * from t where id < 15;
+## - `2024-11-01 12:00:00.000`
+## CREATE MATERIALIZED VIEW mv3 with (refresh at creation, refresh EVERY '60 days' ALIGNED TO '2024-06-04 12:00:00') as select * from mv2 where id < 15;
+## - `2024-12-01 12:00:00.000`
+
+## CREATE VIEW v1 as select * from mv1 union all select * from mv2 where mz_now() <= ts + interval '4 days';
+## CREATE VIEW v2 as select * from mv2 union all select * from mv3 where mz_now() <= ts + interval '4 days';
+## CREATE VIEW v3 as select * from mv2 union all select * from mv3 where mz_now() <= ts + interval '4 days';
+
+# Test dataflows with multiple inputs (unions/joins), each of which can have a refresh schedule: meet(inputs)
+# Test dataflows that recursively depend on MVs with refresh schedules
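To make the flattened encoding concrete, the sketch below (editorial, not part of the patch) uses simplified stand-ins for `RefreshDep`/`RefreshDepIndex`: `u64` timestamps and a plain refresh period instead of a full `RefreshSchedule`, whose `round_up_timestamp` can also return `None` past the last scheduled refresh. It mirrors how `get_expiration_with_refresh` recurses child-first, rounds up through each schedule, and takes the meet (minimum) across siblings:

```rust
#[derive(Copy, Clone)]
struct DepIndex {
    start: usize, // inclusive
    end: usize,   // exclusive
}

struct Dep {
    children: Option<DepIndex>,  // this node's own refresh dependencies
    refresh_period: Option<u64>, // stand-in for an optional refresh schedule
}

/// Round `ts` up to the next multiple of `period`, mimicking
/// `RefreshSchedule::round_up_timestamp` for a simple periodic schedule.
fn round_up(ts: u64, period: u64) -> u64 {
    ts.div_ceil(period) * period
}

/// Mirrors `get_expiration_with_refresh`: recurse into each dependency's
/// children, apply the dependency's own schedule, then take the minimum
/// (meet) across siblings.
fn expiration_with_refresh(idx: DepIndex, deps: &[Dep], replica_expiration: u64) -> u64 {
    deps[idx.start..idx.end]
        .iter()
        .map(|dep| {
            let mut exp = dep
                .children
                .map(|c| expiration_with_refresh(c, deps, replica_expiration))
                .unwrap_or(replica_expiration);
            if let Some(period) = dep.refresh_period {
                exp = round_up(exp, period);
            }
            exp
        })
        .min()
        .unwrap_or(replica_expiration)
}

fn main() {
    // mv3 reads from mv2: deps[0] is mv2 (the child entry), deps[1] is mv3.
    let deps = vec![
        Dep { children: None, refresh_period: Some(30) },
        Dep { children: Some(DepIndex { start: 0, end: 1 }), refresh_period: Some(60) },
    ];
    let root = DepIndex { start: 1, end: 2 };
    // With replica_expiration = 70: mv2 rounds 70 up to 90, then mv3 rounds 90
    // up to 120, so retractions must be kept until the refresh after next.
    assert_eq!(expiration_with_refresh(root, &deps, 70), 120);
}
```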