Support dataflow expiration for materialized views with REFRESH #29971

Closed
34 changes: 19 additions & 15 deletions doc/developer/design/20240919_dataflow_expiration.md
@@ -69,6 +69,15 @@ else:
# The `upper` for a dataflow considering all its transitive inputs
inputs_upper := meet(for all inputs i: i_upper)

+# Compute the upper of replica_expiration and any set refresh schedule
+# TODO: expiration_upper is only available in the optimizer, but replica_expiration
+# is only available in cluster.
+fn expiration_upper(obj, replica_expiration) :=
+  match obj:
+    MV(inputs, refresh_schedule) => refresh_schedule.round_up_timestamp(meet(for all o in inputs: expiration_upper(o, replica_expiration)))
+    Index(inputs) | Subscribe(inputs) | ? => meet(for all o in inputs: expiration_upper(o, replica_expiration))
+    Table | Source => 1s.round_up_timestamp(expiration_time)
+
# Dataflow expiration logic
if compute_replica_expiration_offset is not set:
  dataflow_expiration := []
@@ -78,31 +87,26 @@ else for dataflows of type in [materialized view, index, subscribe]:
    # Dataflows that do not depend on any source or table are not in the
    # EpochMilliseconds timeline
    dataflow_expiration := []
-  else if refresh_interval set in any transitive dependency of dataflow:
-    dataflow_expiration := []
-  else if inputs_upper == []:
-    dataflow_expiration := []
-  else if inputs_upper > expiration:
-    dataflow_expiration := inputs_upper
-  else:
-    dataflow_expiration := replica_expiration
+  replica_expiration_upper := expiration_upper(dataflow, replica_expiration)
+  dataflow_expiration := join(replica_expiration_upper, inputs_upper)

dataflow_until := dataflow_until.meet(dataflow_expiration)
```

-Note that we only consider dataflows representing materialized views, indexes,
-and subscribes. These are long-running dataflows that maintain state during
+Note that only dataflows representing materialized views, indexes,
+and subscribes are considered. These are long-running dataflows that maintain state during
their lifetime. Other dataflows such as peeks are transient and do not need to
explicitly drop retraction diffs.

-More concretely, we make the following changes:
+More concretely, this feature involves the following changes:

* Introduce a new dyncfg `compute_replica_expiration_offset`.
* If the offset is configured with a non-zero value, compute
-  `replica_expiration = now() + offset`. This value specifies the maximum
-  time for which the replica is expected to be running. Consequently, diffs
-  associated with timestamps beyond this limit do not have to be stored and can
-  be dropped.
+  `replica_expiration = now() + offset`. This value indicates the maximum
+  time for which the replica is expected to be running.
+* Existing replicas should be restarted to enable or disable replica
+  expiration for them.
* When building a dataflow, compute `dataflow_expiration` as per the logic
described above. If non-empty, the `dataflow_expiration` is added to the
dataflow `until` that ensures that any diff beyond this limit is dropped in
@@ -118,7 +122,7 @@ More concretely, we make the following changes:
## Open Questions

- What is the appropriate default expiration time?
-- Given that we currently restart replicas every week as part of the DB release
+- Given that replicas are restarted every week as part of the DB release
and leaving some buffer for skipped week, 3 weeks (+1 day margin) seems like
a good limit to start with.

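As an aside, the `expiration_upper` pseudocode above maps onto a small recursion over the dependency graph, and the final expiration is the join of its result with `inputs_upper`. Below is a minimal, self-contained Rust sketch of that recursion. All types and names here are hypothetical stand-ins for illustration, not the real Materialize types, and the leaf case simply returns `replica_expiration` instead of modeling the doc's 1s rounding.

```rust
/// Hypothetical object graph mirroring the design-doc pseudocode.
#[allow(dead_code)]
enum Obj {
    /// Materialized view over `inputs`, optionally with a REFRESH schedule.
    Mv { inputs: Vec<Obj>, refresh: Option<RefreshSchedule> },
    Index { inputs: Vec<Obj> },
    Subscribe { inputs: Vec<Obj> },
    Table,
    Source,
}

/// Stand-in for a refresh schedule; a real one rounds a timestamp up to the
/// next scheduled refresh.
struct RefreshSchedule {
    interval_ms: u64,
}

impl RefreshSchedule {
    fn round_up_timestamp(&self, ts: u64) -> u64 {
        ts.div_ceil(self.interval_ms) * self.interval_ms
    }
}

/// `meet` over uppers: the earliest bound among the inputs wins.
fn meet_inputs(inputs: &[Obj], replica_expiration: u64) -> u64 {
    inputs
        .iter()
        .map(|o| expiration_upper(o, replica_expiration))
        .min()
        .unwrap_or(replica_expiration)
}

/// The timestamp past which the dataflow for `obj` may drop retraction diffs.
fn expiration_upper(obj: &Obj, replica_expiration: u64) -> u64 {
    match obj {
        // A REFRESH MV rounds its inputs' bound up to the next refresh tick.
        Obj::Mv { inputs, refresh: Some(schedule) } => {
            schedule.round_up_timestamp(meet_inputs(inputs, replica_expiration))
        }
        // Other dataflows propagate the meet of their inputs unchanged.
        Obj::Mv { inputs, refresh: None }
        | Obj::Index { inputs }
        | Obj::Subscribe { inputs } => meet_inputs(inputs, replica_expiration),
        // Leaves expire with the replica itself.
        Obj::Table | Obj::Source => replica_expiration,
    }
}

fn main() {
    // An index on a REFRESH EVERY '1 hour' MV over a source: the index may
    // only drop diffs past the refresh tick that follows replica expiration.
    let mv = Obj::Mv {
        inputs: vec![Obj::Source],
        refresh: Some(RefreshSchedule { interval_ms: 3_600_000 }),
    };
    let index = Obj::Index { inputs: vec![mv] };
    assert_eq!(expiration_upper(&index, 5_000_000), 7_200_000);
}
```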
41 changes: 26 additions & 15 deletions src/adapter/src/catalog/dataflow_expiration.rs
@@ -5,24 +5,35 @@

//! Helper function for dataflow expiration checks.

-use mz_repr::GlobalId;
-
use crate::catalog::Catalog;
+use mz_compute_types::dataflows::{RefreshDep, RefreshDepIndex};
+use mz_repr::GlobalId;

impl Catalog {
-    /// Whether the catalog entry `id` or any of its transitive dependencies is a materialized view
-    /// with a refresh schedule. Used to disable dataflow expiration if found.
-    pub(crate) fn item_has_transitive_refresh_schedule(&self, id: GlobalId) -> bool {
-        let test_has_transitive_refresh_schedule = |dep: GlobalId| -> bool {
-            if let Some(mv) = self.get_entry(&dep).materialized_view() {
-                return mv.refresh_schedule.is_some();
+    /// Recursive function.
+    pub(crate) fn get_refresh_dependencies(
+        &self,
+        deps: impl Iterator<Item = GlobalId>,
+        deps_tree: &mut Vec<RefreshDep>,
+    ) -> Option<RefreshDepIndex> {
+        let mut local_deps = Vec::new();
+        for dep in deps {
+            let entry = self.get_entry(&dep);
+            let refresh_dep_index =
+                self.get_refresh_dependencies(entry.uses().into_iter(), deps_tree);
+            let refresh_schedule = entry
+                .materialized_view()
+                .and_then(|mv| mv.refresh_schedule.clone());
+            if refresh_dep_index.is_some() || refresh_schedule.is_some() {
+                local_deps.push(RefreshDep {
+                    refresh_dep_index,
+                    refresh_schedule,
+                });
            }
-            false
-        };
-        test_has_transitive_refresh_schedule(id)
-            || self
-                .state()
-                .transitive_uses(id)
-                .any(test_has_transitive_refresh_schedule)
+        }
+        let start = deps_tree.len();
+        deps_tree.extend(local_deps);
+        let end = deps_tree.len();
+        (end > start).then_some(RefreshDepIndex { start, end })
    }
}
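For context on the new shape: `get_refresh_dependencies` flattens the refresh-relevant part of the dependency tree into a single `Vec<RefreshDep>`. Children are pushed during the recursive call, before the parent's own entries, so each node's `RefreshDepIndex { start, end }` is a half-open range of lower indices in the same vector, and entries are recorded only when they or their subtree carry a refresh schedule. A toy traversal of such a structure, using simplified stand-in types rather than the real ones from `mz_compute_types::dataflows`:

```rust
/// Simplified stand-ins for the `RefreshDep`/`RefreshDepIndex` types in the diff.
struct RefreshDepIndex {
    start: usize,
    end: usize,
}

struct RefreshDep {
    /// Range of this dependency's children within the shared vector.
    refresh_dep_index: Option<RefreshDepIndex>,
    /// Present when the dependency is a materialized view with REFRESH.
    refresh_schedule: Option<String>,
}

/// Depth-first walk over the flattened tree, printing every schedule found.
fn visit(deps_tree: &[RefreshDep], range: &RefreshDepIndex, depth: usize) {
    for dep in &deps_tree[range.start..range.end] {
        if let Some(schedule) = &dep.refresh_schedule {
            println!("{:indent$}{schedule}", "", indent = 2 * depth);
        }
        if let Some(children) = &dep.refresh_dep_index {
            visit(deps_tree, children, depth + 1);
        }
    }
}

fn main() {
    // A schedule-less MV whose input MV has a schedule: the inner entry is
    // pushed first (during recursion), the outer entry last, so the root
    // range returned by `get_refresh_dependencies` points at the tail.
    let deps_tree = vec![
        RefreshDep {
            refresh_dep_index: None,
            refresh_schedule: Some("REFRESH EVERY '1 hour'".into()),
        },
        RefreshDep {
            refresh_dep_index: Some(RefreshDepIndex { start: 0, end: 1 }),
            refresh_schedule: None,
        },
    ];
    visit(&deps_tree, &RefreshDepIndex { start: 1, end: 2 }, 0);
}
```

A dataflow with no REFRESH materialized views anywhere in its dependencies serializes to an empty vector and a `None` root index, which is what the `(end > start).then_some(...)` return guards.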
4 changes: 0 additions & 4 deletions src/adapter/src/coord/sequencer/inner/create_index.rs
@@ -442,7 +442,6 @@ impl Coordinator {

        // Collect properties for `DataflowExpirationDesc`.
        let transitive_upper = self.least_valid_write(&id_bundle);
-        let has_transitive_refresh_schedule = self.catalog.item_has_transitive_refresh_schedule(on);

        // Pre-allocate a vector of transient GlobalIds for each notice.
        let notice_ids = std::iter::repeat_with(|| self.allocate_transient_id())
@@ -477,9 +476,6 @@ impl Coordinator {
        df_desc.set_as_of(since);

        df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
-        df_desc
-            .dataflow_expiration_desc
-            .has_transitive_refresh_schedule = has_transitive_refresh_schedule;

        coord
            .ship_dataflow_and_notice_builtin_table_updates(
@@ -575,11 +575,10 @@ impl Coordinator {

        // Collect properties for `DataflowExpirationDesc`.
        let transitive_upper = self.least_valid_write(&id_bundle);
-        let has_transitive_refresh_schedule = refresh_schedule.is_some()
-            || raw_expr
-                .depends_on()
-                .into_iter()
-                .any(|id| self.catalog.item_has_transitive_refresh_schedule(id));
+        let mut refresh_deps = Vec::new();
+        let refresh_deps_index = self
+            .catalog
+            .get_refresh_dependencies(raw_expr.depends_on().into_iter(), &mut refresh_deps);

        let read_holds_owned;
        let read_holds = if let Some(txn_reads) = self.txn_read_holds.get(session.conn_id()) {
@@ -675,9 +674,8 @@ impl Coordinator {
        df_desc.until = until;

        df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
-        df_desc
-            .dataflow_expiration_desc
-            .has_transitive_refresh_schedule = has_transitive_refresh_schedule;
+        df_desc.dataflow_expiration_desc.refresh_deps = refresh_deps;
+        df_desc.dataflow_expiration_desc.refresh_deps_index = refresh_deps_index;

        let storage_metadata = coord.catalog.state().storage_metadata();

8 changes: 0 additions & 8 deletions src/adapter/src/coord/sequencer/inner/subscribe.rs
@@ -324,7 +324,6 @@ impl Coordinator {
            cluster_id,
            plan:
                plan::SubscribePlan {
-                    from,
                    copy_to,
                    emit_progress,
                    output,
@@ -339,10 +338,6 @@

        // Collect properties for `DataflowExpirationDesc`.
        let transitive_upper = self.least_valid_write(&id_bundle);
-        let has_transitive_refresh_schedule = from
-            .depends_on()
-            .into_iter()
-            .any(|id| self.catalog.item_has_transitive_refresh_schedule(id));

        let sink_id = global_lir_plan.sink_id();

@@ -366,9 +361,6 @@
        let (mut df_desc, df_meta) = global_lir_plan.unapply();

        df_desc.dataflow_expiration_desc.transitive_upper = Some(transitive_upper);
-        df_desc
-            .dataflow_expiration_desc
-            .has_transitive_refresh_schedule = has_transitive_refresh_schedule;

        // Emit notices.
        self.emit_optimizer_notices(ctx.session(), &df_meta.optimizer_notices);
13 changes: 12 additions & 1 deletion src/compute-types/src/dataflows.proto
@@ -48,10 +48,21 @@ message ProtoDataflowDescription {
    sinks.ProtoComputeSinkDesc sink_desc = 2;
  }

+  message ProtoRefreshDepIndex {
+    uint64 start = 1;
+    uint64 end = 2;
+  }
+
+  message ProtoRefreshDep {
+    optional ProtoRefreshDepIndex refresh_dep_index = 1;
+    optional mz_repr.refresh_schedule.ProtoRefreshSchedule refresh_schedule = 2;
+  }
+
  message ProtoDataflowExpirationDesc {
    optional mz_repr.antichain.ProtoU64Antichain transitive_upper = 1;
-    bool has_transitive_refresh_schedule = 2;
    bool is_timeline_epoch_ms = 3;
+    repeated ProtoRefreshDep refresh_deps = 4;
+    optional ProtoRefreshDepIndex refresh_deps_index = 5;
  }

repeated ProtoSourceImport source_imports = 1;
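On the wire, the same flattening appears in the proto definitions: instead of a self-referential `ProtoRefreshDep`, the tree ships as one `repeated` list plus `start`/`end` ranges, so decoding stays iterative no matter how deep the catalog dependency graph is. Note also that the retired tag `2` is not reused for the new fields, which avoids ambiguity against old encodings. For illustration only, standard prost codegen would turn the index message into roughly the following (an approximation, not copied from the generated crate):

```rust
// Approximate prost output for ProtoRefreshDepIndex (illustrative only).
// The in-memory type uses `usize` for `start`/`end`, so conversions to and
// from this struct cast between usize and u64.
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct ProtoRefreshDepIndex {
    #[prost(uint64, tag = "1")]
    pub start: u64,
    #[prost(uint64, tag = "2")]
    pub end: u64,
}
```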