diff --git a/src/compute/src/expiration.rs b/src/compute/src/expiration.rs new file mode 100644 index 0000000000000..2be2283549f8c --- /dev/null +++ b/src/compute/src/expiration.rs @@ -0,0 +1,50 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Helper functions to detect expired frontiers. + +use differential_dataflow::{AsCollection, Collection}; +use timely::dataflow::operators::InspectCore; +use timely::dataflow::{Scope, StreamCore}; +use timely::progress::frontier::AntichainRef; +use timely::Container; + +/// Panics if the frontier of a [`StreamCore`] exceeds a given `expiration` time. +pub fn expire_stream_at( + stream: &StreamCore, + expiration: G::Timestamp, +) -> StreamCore +where + G: Scope, + G::Timestamp: timely::progress::Timestamp, + D: Container, +{ + stream.inspect_container(move |data_or_frontier| { + if let Err(frontier) = data_or_frontier { + assert!( + frontier.is_empty() || AntichainRef::new(frontier).less_than(&expiration), + "frontier {frontier:?} has exceeded expiration {expiration:?}!", + ); + } + }) +} + +/// Wrapper around [`expire_stream_at`] for a [`Collection`]. +pub fn expire_collection_at( + collection: &Collection, + expiration: G::Timestamp, +) -> Collection +where + G: Scope, + G::Timestamp: timely::progress::Timestamp, + D: Clone + 'static, + R: Clone + 'static, +{ + expire_stream_at(&collection.inner, expiration).as_collection() +} diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index f62b5b84d587d..cac0590eb9e5f 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -13,6 +13,7 @@ pub(crate) mod arrangement; pub mod compute_state; +pub(crate) mod expiration; pub(crate) mod extensions; pub(crate) mod logging; pub(crate) mod metrics; diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 7a0e0db48cb64..16c239f6f5557 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -127,6 +127,7 @@ use mz_repr::{Datum, GlobalId, Row, SharedRow}; use mz_storage_operators::persist_source; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::errors::DataflowError; +use mz_storage_types::sources::Timeline; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::columnation::Columnation; @@ -144,6 +145,7 @@ use timely::PartialOrder; use crate::arrangement::manager::TraceBundle; use crate::compute_state::ComputeState; +use crate::expiration::expire_stream_at; use crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::extensions::reduce::MzReduce; use crate::logging::compute::LogDataflowErrors; @@ -195,6 +197,13 @@ pub fn build_compute_dataflow( .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone())) .collect::>(); + let expire_at = compute_state + .replica_expiration + .map(Antichain::from_elem) + .unwrap_or_default(); + + let until = dataflow.until.meet(&expire_at); + let worker_logging = timely_worker.log_register().get("timely"); let name = format!("Dataflow: {}", &dataflow.debug_name); @@ -228,7 +237,7 @@ pub fn build_compute_dataflow( source.storage_metadata.clone(), dataflow.as_of.clone(), SnapshotMode::Include, - dataflow.until.clone(), + until.clone(), mfp.as_mut(), compute_state.dataflow_max_inflight_bytes(), start_signal.clone(), @@ -269,8 +278,13 @@ pub fn build_compute_dataflow( // in order to support additional timestamp coordinates for iteration. if recursive { scope.clone().iterative::, _, _>(|region| { - let mut context = - Context::for_dataflow_in(&dataflow, region.clone(), compute_state); + let mut context = Context::for_dataflow_in( + &dataflow, + region.clone(), + compute_state, + until, + expire_at, + ); for (id, (oks, errs)) in imported_sources.into_iter() { let bundle = crate::render::CollectionBundle::from_collections( @@ -338,8 +352,13 @@ pub fn build_compute_dataflow( }); } else { scope.clone().region_named(&build_name, |region| { - let mut context = - Context::for_dataflow_in(&dataflow, region.clone(), compute_state); + let mut context = Context::for_dataflow_in( + &dataflow, + region.clone(), + compute_state, + until, + expire_at, + ); for (id, (oks, errs)) in imported_sources.into_iter() { let bundle = crate::render::CollectionBundle::from_collections( @@ -499,6 +518,13 @@ where match bundle.arrangement(&idx.key) { Some(ArrangementFlavor::Local(oks, errs)) => { + // Ensure that the frontier does not advance past the expiration time, if set. + // Otherwise, we might write down incorrect data. + if let Some(&expiration) = self.expire_at.as_option() { + oks.expire_at(expiration); + expire_stream_at(&errs.stream, expiration); + } + // Obtain a specialized handle matching the specialized arrangement. let oks_trace = oks.trace_handle(); @@ -566,13 +592,21 @@ where match bundle.arrangement(&idx.key) { Some(ArrangementFlavor::Local(oks, errs)) => { let oks = self.dispatch_rearrange_iterative(oks, "Arrange export iterative"); - let oks_trace = oks.trace_handle(); let errs = errs .as_collection(|k, v| (k.clone(), v.clone())) .leave() .mz_arrange("Arrange export iterative err"); + // Ensure that the frontier does not advance past the expiration time, if set. + // Otherwise, we might write down incorrect data. + if let Some(&expiration) = self.expire_at.as_option() { + oks.expire_at(expiration); + expire_stream_at(&errs.stream, expiration); + } + + let oks_trace = oks.trace_handle(); + // Attach logging of dataflow errors. if let Some(logger) = compute_state.compute_logger.clone() { errs.stream.log_dataflow_errors(logger, idx_id); diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 3e8be584c94b1..1a4ae1d2c20dd 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -37,10 +37,11 @@ use timely::dataflow::scopes::Child; use timely::dataflow::{Scope, ScopeParent}; use timely::progress::timestamp::Refines; use timely::progress::{Antichain, Timestamp}; -use tracing::error; +use tracing::{debug, error}; use crate::arrangement::manager::SpecializedTraceHandle; use crate::compute_state::{ComputeState, HydrationEvent}; +use crate::expiration::expire_stream_at; use crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::render::errors::ErrorLogger; use crate::render::{LinearJoinSpec, RenderTimestamp}; @@ -87,6 +88,9 @@ where pub(super) hydration_logger: Option, /// Specification for rendering linear joins. pub(super) linear_join_spec: LinearJoinSpec, + /// The expiration time for data in this context. The output's frontier should never advance + /// past this frontier, except the empty frontier. + pub expire_at: Antichain, } impl Context @@ -98,6 +102,8 @@ where dataflow: &DataflowDescription, scope: S, compute_state: &ComputeState, + until: Antichain, + expire_at: Antichain, ) -> Self { use mz_ore::collections::CollectionExt as IteratorExt; let dataflow_id = *scope.addr().into_first(); @@ -118,16 +124,22 @@ where }) }; + debug!( + "Dataflow {dataflow_id}: timeline: {:?}, expire_at: {expire_at:?}, until: {until:?}", + dataflow.timeline + ); + Self { scope, debug_name: dataflow.debug_name.clone(), dataflow_id, as_of_frontier, - until: dataflow.until.clone(), + until, bindings: BTreeMap::new(), shutdown_token: Default::default(), hydration_logger, linear_join_spec: compute_state.linear_join_spec, + expire_at, } } } @@ -209,6 +221,7 @@ where hydration_logger: self.hydration_logger.clone(), linear_join_spec: self.linear_join_spec.clone(), bindings, + expire_at: self.expire_at.clone(), } } } @@ -299,6 +312,14 @@ where } } + pub fn expire_at(&self, expiration: S::Timestamp) { + match self { + MzArrangement::RowRow(inner) => { + expire_stream_at(&inner.stream, expiration); + } + } + } + /// Brings the underlying arrangement into a region. pub fn enter_region<'a>( &self, diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index 08b77708bdf27..b6ff96c89b15a 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -29,6 +29,7 @@ use timely::dataflow::Scope; use timely::progress::Antichain; use crate::compute_state::SinkToken; +use crate::expiration::expire_collection_at; use crate::logging::compute::LogDataflowErrors; use crate::render::context::Context; use crate::render::{RenderTimestamp, StartSignal}; @@ -91,6 +92,13 @@ where let mut ok_collection = ok_collection.leave(); let mut err_collection = err_collection.leave(); + // Ensure that the frontier does not advance past the expiration time, if set. Otherwise, + // we might write down incorrect data. + if let Some(&expiration) = self.expire_at.as_option() { + ok_collection = expire_collection_at(&ok_collection, expiration); + err_collection = expire_collection_at(&err_collection, expiration); + } + let non_null_assertions = sink.non_null_assertions.clone(); let from_desc = sink.from_desc.clone(); if !non_null_assertions.is_empty() {