diff --git a/src/adapter/src/coord/introspection.rs b/src/adapter/src/coord/introspection.rs index e425a74df4d76..a74df59749519 100644 --- a/src/adapter/src/coord/introspection.rs +++ b/src/adapter/src/coord/introspection.rs @@ -255,7 +255,7 @@ impl Coordinator { let read_holds = self.acquire_read_holds(&id_bundle); let as_of = read_holds.least_valid_read(); - let global_mir_plan = global_mir_plan.resolve(as_of); + let global_mir_plan = global_mir_plan.resolve(as_of, None); let span = Span::current(); Ok(StageResult::Handle(mz_ore::task::spawn_blocking( diff --git a/src/adapter/src/coord/sequencer/inner/subscribe.rs b/src/adapter/src/coord/sequencer/inner/subscribe.rs index cf8fd1289dba8..3b674428a6887 100644 --- a/src/adapter/src/coord/sequencer/inner/subscribe.rs +++ b/src/adapter/src/coord/sequencer/inner/subscribe.rs @@ -283,7 +283,10 @@ impl Coordinator { self.store_transaction_read_holds(ctx.session(), read_holds); - let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of)); + let global_mir_plan = global_mir_plan.resolve( + Antichain::from_elem(as_of), + determination.timestamp_context.timeline().cloned(), + ); // Optimize LIR let span = Span::current(); diff --git a/src/adapter/src/optimize/copy_to.rs b/src/adapter/src/optimize/copy_to.rs index dbc662e3fda99..f0a0a2dd498e3 100644 --- a/src/adapter/src/optimize/copy_to.rs +++ b/src/adapter/src/optimize/copy_to.rs @@ -274,6 +274,9 @@ impl<'s> Optimize>> for Optimizer { }; df_desc.export_sink(self.select_id, sink_description); + // Capture the timeline. + df_desc.timeline = timestamp_ctx.timeline().cloned(); + // Prepare expressions in the assembled dataflow. // // Resolve all unmaterializable function calls except mz_now(), because diff --git a/src/adapter/src/optimize/index.rs b/src/adapter/src/optimize/index.rs index 31c01b8e5dfb6..9e0ed7e6e9ed6 100644 --- a/src/adapter/src/optimize/index.rs +++ b/src/adapter/src/optimize/index.rs @@ -34,6 +34,7 @@ use mz_repr::explain::trace_plan; use mz_repr::GlobalId; use mz_sql::names::QualifiedItemName; use mz_sql::optimizer_metrics::OptimizerMetrics; +use mz_storage_types::sources::Timeline; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::normalize_lets::normalize_lets; use mz_transform::notice::{IndexAlreadyExists, IndexKeyEmpty}; @@ -163,6 +164,9 @@ impl Optimize for Optimizer { }; df_desc.export_index(self.exported_index_id, index_desc, on_desc.typ().clone()); + // TODO(sdht0): consider other timelines such as `TimelineContext::TimestampIndependent`. + df_desc.timeline = Some(Timeline::EpochMilliseconds); + // Prepare expressions in the assembled dataflow. let style = ExprPrepStyle::Index; df_desc.visit_children( diff --git a/src/adapter/src/optimize/materialized_view.rs b/src/adapter/src/optimize/materialized_view.rs index a3d12c32246e4..57b9c8478e6e2 100644 --- a/src/adapter/src/optimize/materialized_view.rs +++ b/src/adapter/src/optimize/materialized_view.rs @@ -36,6 +36,7 @@ use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::{ColumnName, GlobalId, RelationDesc}; use mz_sql::optimizer_metrics::OptimizerMetrics; use mz_sql::plan::HirRelationExpr; +use mz_storage_types::sources::Timeline; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::normalize_lets::normalize_lets; use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext}; @@ -247,6 +248,9 @@ impl Optimize for Optimizer { }; df_desc.export_sink(self.sink_id, sink_description); + // TODO(sdht0): consider other timelines such as `TimelineContext::TimestampIndependent`. + df_desc.timeline = Some(Timeline::EpochMilliseconds); + // Prepare expressions in the assembled dataflow. let style = ExprPrepStyle::Index; df_desc.visit_children( diff --git a/src/adapter/src/optimize/peek.rs b/src/adapter/src/optimize/peek.rs index 4f0fe2349f893..861b995e6fcc1 100644 --- a/src/adapter/src/optimize/peek.rs +++ b/src/adapter/src/optimize/peek.rs @@ -306,6 +306,9 @@ impl<'s> Optimize>> for Optimizer { df_desc.until = Antichain::from_elem(until); } + // Capture the timeline. + df_desc.timeline = timestamp_ctx.timeline().cloned(); + // Construct TransformCtx for global optimization. let mut transform_ctx = TransformCtx::global( &df_builder, diff --git a/src/adapter/src/optimize/subscribe.rs b/src/adapter/src/optimize/subscribe.rs index 047cbcb76d493..8d69505df8271 100644 --- a/src/adapter/src/optimize/subscribe.rs +++ b/src/adapter/src/optimize/subscribe.rs @@ -23,6 +23,7 @@ use mz_ore::soft_assert_or_log; use mz_repr::{GlobalId, RelationDesc, Timestamp}; use mz_sql::optimizer_metrics::OptimizerMetrics; use mz_sql::plan::SubscribeFrom; +use mz_storage_types::sources::Timeline; use mz_transform::dataflow::DataflowMetainfo; use mz_transform::normalize_lets::normalize_lets; use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext}; @@ -297,7 +298,11 @@ impl GlobalMirPlan { /// We need to resolve timestamps before the `GlobalMirPlan ⇒ GlobalLirPlan` /// optimization stage in order to profit from possible single-time /// optimizations in the `Plan::finalize_dataflow` call. - pub fn resolve(mut self, as_of: Antichain) -> GlobalMirPlan { + pub fn resolve( + mut self, + as_of: Antichain, + timeline: Option, + ) -> GlobalMirPlan { // A dataflow description for a `SUBSCRIBE` statement should not have // index exports. soft_assert_or_log!( @@ -316,6 +321,9 @@ impl GlobalMirPlan { self.df_desc.until.join_assign(&sink.up_to); } + // Capture the timeline. + self.df_desc.timeline = timeline; + GlobalMirPlan { df_desc: self.df_desc, df_meta: self.df_meta, diff --git a/src/compute-client/src/as_of_selection.rs b/src/compute-client/src/as_of_selection.rs index fe1b84dc55663..bac2052d263a5 100644 --- a/src/compute-client/src/as_of_selection.rs +++ b/src/compute-client/src/as_of_selection.rs @@ -997,6 +997,7 @@ mod tests { initial_storage_as_of: Default::default(), refresh_schedule: Default::default(), debug_name: Default::default(), + timeline: None, } } diff --git a/src/compute-client/src/controller/instance.rs b/src/compute-client/src/controller/instance.rs index fb84afa3be0f3..a9c03aca47ca6 100644 --- a/src/compute-client/src/controller/instance.rs +++ b/src/compute-client/src/controller/instance.rs @@ -1342,6 +1342,7 @@ where initial_storage_as_of: dataflow.initial_storage_as_of, refresh_schedule: dataflow.refresh_schedule, debug_name: dataflow.debug_name, + timeline: dataflow.timeline, }; if augmented_dataflow.is_transient() { diff --git a/src/compute-types/src/dataflows.proto b/src/compute-types/src/dataflows.proto index 95e0351fe34db..d81da248e38e8 100644 --- a/src/compute-types/src/dataflows.proto +++ b/src/compute-types/src/dataflows.proto @@ -20,6 +20,7 @@ import "repr/src/antichain.proto"; import "repr/src/global_id.proto"; import "repr/src/refresh_schedule.proto"; import "repr/src/relation_and_scalar.proto"; +import "storage-types/src/sources.proto"; package mz_compute_types.dataflows; @@ -57,6 +58,7 @@ message ProtoDataflowDescription { mz_repr.antichain.ProtoU64Antichain until = 7; optional mz_repr.antichain.ProtoU64Antichain initial_storage_as_of = 9; optional mz_repr.refresh_schedule.ProtoRefreshSchedule refresh_schedule = 10; + optional mz_storage_types.sources.ProtoTimeline timeline = 11; string debug_name = 8; } diff --git a/src/compute-types/src/dataflows.rs b/src/compute-types/src/dataflows.rs index df4793d86400b..7226a69c7150e 100644 --- a/src/compute-types/src/dataflows.rs +++ b/src/compute-types/src/dataflows.rs @@ -18,6 +18,7 @@ use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoE use mz_repr::refresh_schedule::RefreshSchedule; use mz_repr::{GlobalId, RelationType}; use mz_storage_types::controller::CollectionMetadata; +use mz_storage_types::sources::Timeline; use proptest::prelude::{any, Arbitrary}; use proptest::strategy::{BoxedStrategy, Strategy}; use proptest_derive::Arbitrary; @@ -72,6 +73,8 @@ pub struct DataflowDescription { pub refresh_schedule: Option, /// Human readable name pub debug_name: String, + /// The timeline of the dataflow. + pub timeline: Option, } impl DataflowDescription, (), mz_repr::Timestamp> { @@ -141,6 +144,7 @@ impl DataflowDescription { initial_storage_as_of: None, refresh_schedule: None, debug_name: name, + timeline: None, } } @@ -543,6 +547,7 @@ where initial_storage_as_of: self.initial_storage_as_of.clone(), refresh_schedule: self.refresh_schedule.clone(), debug_name: self.debug_name.clone(), + timeline: self.timeline.clone(), } } } @@ -560,6 +565,7 @@ impl RustType for DataflowDescription for DataflowDescription(), 1..5), refresh_schedule_some in any::(), refresh_schedule in any::(), + timeline_some in any::(), + timeline in any::(), ) -> DataflowDescription { DataflowDescription { source_imports: BTreeMap::from_iter(source_imports.into_iter()), @@ -743,6 +752,7 @@ proptest::prop_compose! { None }, debug_name, + timeline: timeline_some.then_some(timeline), } } } diff --git a/src/compute-types/src/plan/lowering.rs b/src/compute-types/src/plan/lowering.rs index e9dc039f7f2bf..6262ac286fcdc 100644 --- a/src/compute-types/src/plan/lowering.rs +++ b/src/compute-types/src/plan/lowering.rs @@ -117,6 +117,7 @@ impl Context { initial_storage_as_of: desc.initial_storage_as_of, refresh_schedule: desc.refresh_schedule, debug_name: desc.debug_name, + timeline: desc.timeline, }) } diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index e41c94ca33fbc..a0eb2db181817 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -199,10 +199,14 @@ pub fn build_compute_dataflow( .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone())) .collect::>(); - let expire_at = compute_state - .replica_expiration - .map(Antichain::from_elem) - .unwrap_or_default(); + let expire_at = if dataflow.timeline == Some(Timeline::EpochMilliseconds) { + compute_state + .replica_expiration + .map(Antichain::from_elem) + .unwrap_or_default() + } else { + Antichain::new() + }; let until = dataflow.until.meet(&expire_at); diff --git a/src/compute/src/server.rs b/src/compute/src/server.rs index 81808034faac8..49003d86a804a 100644 --- a/src/compute/src/server.rs +++ b/src/compute/src/server.rs @@ -324,6 +324,7 @@ impl<'w, A: Allocate + 'static> Worker<'w, A> { debug_name: dataflow.debug_name.clone(), initial_storage_as_of: dataflow.initial_storage_as_of.clone(), refresh_schedule: dataflow.refresh_schedule.clone(), + timeline: dataflow.timeline.clone(), }) .map(ComputeCommand::CreateDataflow) .collect()