*: configure expiration based on dataflow timeline
sdht0 committed Sep 19, 2024
1 parent fd3ce12 commit 0eb513d
Showing 14 changed files with 52 additions and 7 deletions.
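
In short: the commit adds a `timeline: Option<Timeline>` field to `DataflowDescription`, has each optimizer path stamp it (peeks, subscribes, COPY TO, indexes, materialized views), and gates replica expiration on it in the compute renderer: only dataflows on the wall-clock `Timeline::EpochMilliseconds` timeline adopt the replica's expiration frontier, while all others get an empty `expire_at`, i.e. they never expire. A minimal sketch of that rule using simplified stand-in types, not the real Materialize definitions (`Option<u64>` models an empty or single-element `Antichain<Timestamp>`):

// Stand-in for `mz_storage_types::sources::Timeline`; the real enum has
// further variants.
#[derive(Clone, PartialEq)]
enum Timeline {
    EpochMilliseconds,
    User(String),
}

// The gating rule from `src/compute/src/render.rs`, in isolation:
// `None` models the empty antichain, i.e. "no expiration".
fn expire_at(timeline: Option<&Timeline>, replica_expiration: Option<u64>) -> Option<u64> {
    if matches!(timeline, Some(Timeline::EpochMilliseconds)) {
        // Wall-clock dataflows may adopt the replica's expiration time.
        replica_expiration
    } else {
        // Unknown or non-wall-clock timelines are never expired.
        None
    }
}

fn main() {
    assert_eq!(expire_at(Some(&Timeline::EpochMilliseconds), Some(1_000)), Some(1_000));
    assert_eq!(expire_at(Some(&Timeline::User("t".into())), Some(1_000)), None);
    assert_eq!(expire_at(None, Some(1_000)), None);
}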
2 changes: 1 addition & 1 deletion src/adapter/src/coord/introspection.rs
@@ -255,7 +255,7 @@ impl Coordinator {
         let read_holds = self.acquire_read_holds(&id_bundle);
         let as_of = read_holds.least_valid_read();
 
-        let global_mir_plan = global_mir_plan.resolve(as_of);
+        let global_mir_plan = global_mir_plan.resolve(as_of, None);
 
         let span = Span::current();
         Ok(StageResult::Handle(mz_ore::task::spawn_blocking(
5 changes: 4 additions & 1 deletion src/adapter/src/coord/sequencer/inner/subscribe.rs
@@ -283,7 +283,10 @@ impl Coordinator {
 
         self.store_transaction_read_holds(ctx.session(), read_holds);
 
-        let global_mir_plan = global_mir_plan.resolve(Antichain::from_elem(as_of));
+        let global_mir_plan = global_mir_plan.resolve(
+            Antichain::from_elem(as_of),
+            determination.timestamp_context.timeline().cloned(),
+        );
 
         // Optimize LIR
         let span = Span::current();
3 changes: 3 additions & 0 deletions src/adapter/src/optimize/copy_to.rs
@@ -274,6 +274,9 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
         };
         df_desc.export_sink(self.select_id, sink_description);
 
+        // Capture the timeline.
+        df_desc.timeline = timestamp_ctx.timeline().cloned();
+
         // Prepare expressions in the assembled dataflow.
         //
         // Resolve all unmaterializable function calls except mz_now(), because
4 changes: 4 additions & 0 deletions src/adapter/src/optimize/index.rs
@@ -34,6 +34,7 @@ use mz_repr::explain::trace_plan;
 use mz_repr::GlobalId;
 use mz_sql::names::QualifiedItemName;
 use mz_sql::optimizer_metrics::OptimizerMetrics;
+use mz_storage_types::sources::Timeline;
 use mz_transform::dataflow::DataflowMetainfo;
 use mz_transform::normalize_lets::normalize_lets;
 use mz_transform::notice::{IndexAlreadyExists, IndexKeyEmpty};
@@ -163,6 +164,9 @@ impl Optimize<Index> for Optimizer {
         };
         df_desc.export_index(self.exported_index_id, index_desc, on_desc.typ().clone());
 
+        // TODO(sdht0): consider other timelines such as `TimelineContext::TimestampIndependent`.
+        df_desc.timeline = Some(Timeline::EpochMilliseconds);
+
         // Prepare expressions in the assembled dataflow.
         let style = ExprPrepStyle::Index;
         df_desc.visit_children(
4 changes: 4 additions & 0 deletions src/adapter/src/optimize/materialized_view.rs
@@ -36,6 +36,7 @@ use mz_repr::refresh_schedule::RefreshSchedule;
 use mz_repr::{ColumnName, GlobalId, RelationDesc};
 use mz_sql::optimizer_metrics::OptimizerMetrics;
 use mz_sql::plan::HirRelationExpr;
+use mz_storage_types::sources::Timeline;
 use mz_transform::dataflow::DataflowMetainfo;
 use mz_transform::normalize_lets::normalize_lets;
 use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
@@ -247,6 +248,9 @@ impl Optimize<LocalMirPlan> for Optimizer {
         };
         df_desc.export_sink(self.sink_id, sink_description);
 
+        // TODO(sdht0): consider other timelines such as `TimelineContext::TimestampIndependent`.
+        df_desc.timeline = Some(Timeline::EpochMilliseconds);
+
         // Prepare expressions in the assembled dataflow.
         let style = ExprPrepStyle::Index;
         df_desc.visit_children(
3 changes: 3 additions & 0 deletions src/adapter/src/optimize/peek.rs
@@ -306,6 +306,9 @@ impl<'s> Optimize<LocalMirPlan<Resolved<'s>>> for Optimizer {
             df_desc.until = Antichain::from_elem(until);
         }
 
+        // Capture the timeline.
+        df_desc.timeline = timestamp_ctx.timeline().cloned();
+
         // Construct TransformCtx for global optimization.
         let mut transform_ctx = TransformCtx::global(
             &df_builder,
10 changes: 9 additions & 1 deletion src/adapter/src/optimize/subscribe.rs
@@ -23,6 +23,7 @@ use mz_ore::soft_assert_or_log;
 use mz_repr::{GlobalId, RelationDesc, Timestamp};
 use mz_sql::optimizer_metrics::OptimizerMetrics;
 use mz_sql::plan::SubscribeFrom;
+use mz_storage_types::sources::Timeline;
 use mz_transform::dataflow::DataflowMetainfo;
 use mz_transform::normalize_lets::normalize_lets;
 use mz_transform::typecheck::{empty_context, SharedContext as TypecheckContext};
@@ -297,7 +298,11 @@ impl GlobalMirPlan<Unresolved> {
     /// We need to resolve timestamps before the `GlobalMirPlan ⇒ GlobalLirPlan`
     /// optimization stage in order to profit from possible single-time
     /// optimizations in the `Plan::finalize_dataflow` call.
-    pub fn resolve(mut self, as_of: Antichain<Timestamp>) -> GlobalMirPlan<Resolved> {
+    pub fn resolve(
+        mut self,
+        as_of: Antichain<Timestamp>,
+        timeline: Option<Timeline>,
+    ) -> GlobalMirPlan<Resolved> {
         // A dataflow description for a `SUBSCRIBE` statement should not have
         // index exports.
         soft_assert_or_log!(
@@ -316,6 +321,9 @@ impl GlobalMirPlan<Unresolved> {
             self.df_desc.until.join_assign(&sink.up_to);
         }
 
+        // Capture the timeline.
+        self.df_desc.timeline = timeline;
+
         GlobalMirPlan {
             df_desc: self.df_desc,
             df_meta: self.df_meta,
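
As a toy illustration of the typestate step above: the plan is optimized to MIR once, and `resolve` stamps both the `as_of` and, now, the timeline into the dataflow description while transitioning from `Unresolved` to `Resolved`, ahead of LIR optimization. A self-contained sketch with stand-ins for the real `GlobalMirPlan`, `Antichain`, and `Timeline` types:

use std::marker::PhantomData;

struct Unresolved;
struct Resolved;

// Stand-in for the compute `DataflowDescription`.
#[derive(Default)]
struct DataflowDesc {
    as_of: Option<u64>,       // stand-in for `Antichain<Timestamp>`
    timeline: Option<String>, // stand-in for `Option<Timeline>`
}

struct GlobalMirPlan<S> {
    df_desc: DataflowDesc,
    _state: PhantomData<S>,
}

impl GlobalMirPlan<Unresolved> {
    // Mirrors the two-argument `resolve` above: stamp the `as_of` *and*
    // the timeline into the dataflow description before lowering to LIR.
    fn resolve(mut self, as_of: u64, timeline: Option<String>) -> GlobalMirPlan<Resolved> {
        self.df_desc.as_of = Some(as_of);
        self.df_desc.timeline = timeline;
        GlobalMirPlan { df_desc: self.df_desc, _state: PhantomData }
    }
}

fn main() {
    let unresolved = GlobalMirPlan::<Unresolved> {
        df_desc: DataflowDesc::default(),
        _state: PhantomData,
    };
    // As in the subscribe sequencer: pass the session's timeline along.
    let resolved = unresolved.resolve(42, Some("EpochMilliseconds".into()));
    assert_eq!(resolved.df_desc.timeline.as_deref(), Some("EpochMilliseconds"));
}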
1 change: 1 addition & 0 deletions src/compute-client/src/as_of_selection.rs
@@ -997,6 +997,7 @@ mod tests {
             initial_storage_as_of: Default::default(),
             refresh_schedule: Default::default(),
             debug_name: Default::default(),
+            timeline: None,
         }
     }
 
1 change: 1 addition & 0 deletions src/compute-client/src/controller/instance.rs
@@ -1328,6 +1328,7 @@ where
             initial_storage_as_of: dataflow.initial_storage_as_of,
             refresh_schedule: dataflow.refresh_schedule,
             debug_name: dataflow.debug_name,
+            timeline: dataflow.timeline,
         };
 
         if augmented_dataflow.is_transient() {
2 changes: 2 additions & 0 deletions src/compute-types/src/dataflows.proto
@@ -20,6 +20,7 @@ import "repr/src/antichain.proto";
 import "repr/src/global_id.proto";
 import "repr/src/refresh_schedule.proto";
 import "repr/src/relation_and_scalar.proto";
+import "storage-types/src/sources.proto";
 
 package mz_compute_types.dataflows;
 
@@ -57,6 +58,7 @@ message ProtoDataflowDescription {
   mz_repr.antichain.ProtoU64Antichain until = 7;
   optional mz_repr.antichain.ProtoU64Antichain initial_storage_as_of = 9;
   optional mz_repr.refresh_schedule.ProtoRefreshSchedule refresh_schedule = 10;
+  optional mz_storage_types.sources.ProtoTimeline timeline = 11;
 
   string debug_name = 8;
 }
10 changes: 10 additions & 0 deletions src/compute-types/src/dataflows.rs
@@ -18,6 +18,7 @@ use mz_proto::{IntoRustIfSome, ProtoMapEntry, ProtoType, RustType, TryFromProtoE
 use mz_repr::refresh_schedule::RefreshSchedule;
 use mz_repr::{GlobalId, RelationType};
 use mz_storage_types::controller::CollectionMetadata;
+use mz_storage_types::sources::Timeline;
 use proptest::prelude::{any, Arbitrary};
 use proptest::strategy::{BoxedStrategy, Strategy};
 use proptest_derive::Arbitrary;
@@ -72,6 +73,8 @@ pub struct DataflowDescription<P, S: 'static = (), T = mz_repr::Timestamp> {
     pub refresh_schedule: Option<RefreshSchedule>,
     /// Human readable name
     pub debug_name: String,
+    /// The timeline of the dataflow.
+    pub timeline: Option<Timeline>,
 }
 
 impl<T> DataflowDescription<Plan<T>, (), mz_repr::Timestamp> {
@@ -141,6 +144,7 @@ impl<T> DataflowDescription<OptimizedMirRelationExpr, (), T> {
             initial_storage_as_of: None,
             refresh_schedule: None,
             debug_name: name,
+            timeline: None,
         }
     }
 
@@ -543,6 +547,7 @@ where
             initial_storage_as_of: self.initial_storage_as_of.clone(),
             refresh_schedule: self.refresh_schedule.clone(),
             debug_name: self.debug_name.clone(),
+            timeline: self.timeline.clone(),
         }
     }
 }
@@ -560,6 +565,7 @@ impl RustType<ProtoDataflowDescription> for DataflowDescription<FlatPlan, Collec
             initial_storage_as_of: self.initial_storage_as_of.into_proto(),
             refresh_schedule: self.refresh_schedule.into_proto(),
             debug_name: self.debug_name.clone(),
+            timeline: self.timeline.into_proto(),
         }
     }
 
@@ -582,6 +588,7 @@ impl RustType<ProtoDataflowDescription> for DataflowDescription<FlatPlan, Collec
                 .transpose()?,
             refresh_schedule: proto.refresh_schedule.into_rust()?,
             debug_name: proto.debug_name,
+            timeline: proto.timeline.into_rust()?,
         })
     }
 }
@@ -717,6 +724,8 @@ proptest::prop_compose! {
         initial_as_of in proptest::collection::vec(any::<mz_repr::Timestamp>(), 1..5),
        refresh_schedule_some in any::<bool>(),
        refresh_schedule in any::<RefreshSchedule>(),
+       timeline_some in any::<bool>(),
+       timeline in any::<Timeline>(),
    ) -> DataflowDescription<FlatPlan, CollectionMetadata, mz_repr::Timestamp> {
        DataflowDescription {
            source_imports: BTreeMap::from_iter(source_imports.into_iter()),
@@ -743,6 +752,7 @@ proptest::prop_compose! {
                 None
             },
             debug_name,
+            timeline: timeline_some.then_some(timeline),
         }
     }
 }
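
The proptest hunk above uses a standard idiom for generating optional fields: draw a `bool` and a value independently, then combine them with `bool::then_some`. A standalone sketch of the same idiom (assuming the `proptest` crate; the names here are illustrative, not from the diff):

use proptest::prelude::*;

proptest::prop_compose! {
    // Generates `Option<u64>` the way the diff generates `Option<Timeline>`:
    // an independent coin flip decides presence, a second strategy the value.
    fn any_optional_value()(some in any::<bool>(), value in any::<u64>()) -> Option<u64> {
        some.then_some(value)
    }
}

proptest! {
    #[test]
    fn generates_both_cases(opt in any_optional_value()) {
        // Placeholder property; a real test would round-trip through proto.
        prop_assert!(opt.is_none() || opt.is_some());
    }
}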
1 change: 1 addition & 0 deletions src/compute-types/src/plan/lowering.rs
@@ -117,6 +117,7 @@ impl Context {
             initial_storage_as_of: desc.initial_storage_as_of,
             refresh_schedule: desc.refresh_schedule,
             debug_name: desc.debug_name,
+            timeline: desc.timeline,
         })
     }
 
12 changes: 8 additions & 4 deletions src/compute/src/render.rs
@@ -197,10 +197,14 @@ pub fn build_compute_dataflow<A: Allocate>(
         .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
         .collect::<Vec<_>>();
 
-    let expire_at = compute_state
-        .replica_expiration
-        .map(Antichain::from_elem)
-        .unwrap_or_default();
+    let expire_at = if dataflow.timeline == Some(Timeline::EpochMilliseconds) {
+        compute_state
+            .replica_expiration
+            .map(Antichain::from_elem)
+            .unwrap_or_default()
+    } else {
+        Antichain::new()
+    };
 
     let until = dataflow.until.meet(&expire_at);
 
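
The reason an empty `expire_at` disables expiration is the `meet` on the last line above: in frontier terms the empty antichain is the top of the lattice, so meeting with it leaves `until` unchanged, while meeting with a one-element frontier caps `until` at the expiration time. A sketch of that behavior, again with `Option<u64>` standing in for an empty or single-element `Antichain<Timestamp>` (`None` = empty):

// `meet` of two frontiers, modeled on `Option<u64>`: the earlier bound
// wins, and the empty frontier (`None`) is the identity element.
fn meet(until: Option<u64>, expire_at: Option<u64>) -> Option<u64> {
    match (until, expire_at) {
        (Some(u), Some(e)) => Some(u.min(e)),
        (bound, None) | (None, bound) => bound,
    }
}

fn main() {
    assert_eq!(meet(Some(100), Some(50)), Some(50)); // expiration tightens `until`
    assert_eq!(meet(Some(100), None), Some(100));    // non-wall-clock timeline: unchanged
    assert_eq!(meet(None, Some(50)), Some(50));      // previously unbounded dataflow now expires
}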
1 change: 1 addition & 0 deletions src/compute/src/server.rs
@@ -324,6 +324,7 @@ impl<'w, A: Allocate + 'static> Worker<'w, A> {
                 debug_name: dataflow.debug_name.clone(),
                 initial_storage_as_of: dataflow.initial_storage_as_of.clone(),
                 refresh_schedule: dataflow.refresh_schedule.clone(),
+                timeline: dataflow.timeline.clone(),
             })
             .map(ComputeCommand::CreateDataflow)
             .collect()
