From b83c65aa2298af2092a5ffc913553461910186b8 Mon Sep 17 00:00:00 2001 From: Siddhartha Sahu Date: Thu, 19 Sep 2024 14:06:09 -0400 Subject: [PATCH] compute: configure dataflows to respect the expiration time --- src/compute/src/expiration.rs | 50 +++++++++++++++++++++++++++++++ src/compute/src/lib.rs | 1 + src/compute/src/render.rs | 46 ++++++++++++++++++++++++---- src/compute/src/render/context.rs | 25 ++++++++++++++-- src/compute/src/render/sinks.rs | 8 +++++ 5 files changed, 122 insertions(+), 8 deletions(-) create mode 100644 src/compute/src/expiration.rs diff --git a/src/compute/src/expiration.rs b/src/compute/src/expiration.rs new file mode 100644 index 0000000000000..2be2283549f8c --- /dev/null +++ b/src/compute/src/expiration.rs @@ -0,0 +1,50 @@ +// Copyright Materialize, Inc. and contributors. All rights reserved. +// +// Use of this software is governed by the Business Source License +// included in the LICENSE file. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0. + +//! Helper functions to detect expired frontiers. + +use differential_dataflow::{AsCollection, Collection}; +use timely::dataflow::operators::InspectCore; +use timely::dataflow::{Scope, StreamCore}; +use timely::progress::frontier::AntichainRef; +use timely::Container; + +/// Panics if the frontier of a [`StreamCore`] exceeds a given `expiration` time. +pub fn expire_stream_at( + stream: &StreamCore, + expiration: G::Timestamp, +) -> StreamCore +where + G: Scope, + G::Timestamp: timely::progress::Timestamp, + D: Container, +{ + stream.inspect_container(move |data_or_frontier| { + if let Err(frontier) = data_or_frontier { + assert!( + frontier.is_empty() || AntichainRef::new(frontier).less_than(&expiration), + "frontier {frontier:?} has exceeded expiration {expiration:?}!", + ); + } + }) +} + +/// Wrapper around [`expire_stream_at`] for a [`Collection`]. +pub fn expire_collection_at( + collection: &Collection, + expiration: G::Timestamp, +) -> Collection +where + G: Scope, + G::Timestamp: timely::progress::Timestamp, + D: Clone + 'static, + R: Clone + 'static, +{ + expire_stream_at(&collection.inner, expiration).as_collection() +} diff --git a/src/compute/src/lib.rs b/src/compute/src/lib.rs index f62b5b84d587d..cac0590eb9e5f 100644 --- a/src/compute/src/lib.rs +++ b/src/compute/src/lib.rs @@ -13,6 +13,7 @@ pub(crate) mod arrangement; pub mod compute_state; +pub(crate) mod expiration; pub(crate) mod extensions; pub(crate) mod logging; pub(crate) mod metrics; diff --git a/src/compute/src/render.rs b/src/compute/src/render.rs index 25f760e8755c8..e41c94ca33fbc 100644 --- a/src/compute/src/render.rs +++ b/src/compute/src/render.rs @@ -127,6 +127,7 @@ use mz_repr::{Datum, GlobalId, Row, SharedRow}; use mz_storage_operators::persist_source; use mz_storage_types::controller::CollectionMetadata; use mz_storage_types::errors::DataflowError; +use mz_storage_types::sources::Timeline; use mz_timely_util::operator::CollectionExt; use timely::communication::Allocate; use timely::container::columnation::Columnation; @@ -144,6 +145,7 @@ use timely::PartialOrder; use crate::arrangement::manager::TraceBundle; use crate::compute_state::ComputeState; +use crate::expiration::expire_stream_at; use crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::extensions::reduce::MzReduce; use crate::logging::compute::LogDataflowErrors; @@ -197,6 +199,13 @@ pub fn build_compute_dataflow( .map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone())) .collect::>(); + let expire_at = compute_state + .replica_expiration + .map(Antichain::from_elem) + .unwrap_or_default(); + + let until = dataflow.until.meet(&expire_at); + let worker_logging = timely_worker.log_register().get("timely"); let name = format!("Dataflow: {}", &dataflow.debug_name); @@ -241,7 +250,7 @@ pub fn build_compute_dataflow( source.storage_metadata.clone(), dataflow.as_of.clone(), snapshot_mode, - dataflow.until.clone(), + until.clone(), mfp.as_mut(), compute_state.dataflow_max_inflight_bytes(), start_signal.clone(), @@ -296,8 +305,13 @@ pub fn build_compute_dataflow( // in order to support additional timestamp coordinates for iteration. if recursive { scope.clone().iterative::, _, _>(|region| { - let mut context = - Context::for_dataflow_in(&dataflow, region.clone(), compute_state); + let mut context = Context::for_dataflow_in( + &dataflow, + region.clone(), + compute_state, + until, + expire_at, + ); for (id, (oks, errs)) in imported_sources.into_iter() { let bundle = crate::render::CollectionBundle::from_collections( @@ -366,8 +380,13 @@ pub fn build_compute_dataflow( }); } else { scope.clone().region_named(&build_name, |region| { - let mut context = - Context::for_dataflow_in(&dataflow, region.clone(), compute_state); + let mut context = Context::for_dataflow_in( + &dataflow, + region.clone(), + compute_state, + until, + expire_at, + ); for (id, (oks, errs)) in imported_sources.into_iter() { let bundle = crate::render::CollectionBundle::from_collections( @@ -528,6 +547,13 @@ where match bundle.arrangement(&idx.key) { Some(ArrangementFlavor::Local(oks, errs)) => { + // Ensure that the frontier does not advance past the expiration time, if set. + // Otherwise, we might write down incorrect data. + if let Some(&expiration) = self.expire_at.as_option() { + oks.expire_at(expiration); + expire_stream_at(&errs.stream, expiration); + } + // Obtain a specialized handle matching the specialized arrangement. let oks_trace = oks.trace_handle(); @@ -595,13 +621,21 @@ where match bundle.arrangement(&idx.key) { Some(ArrangementFlavor::Local(oks, errs)) => { let oks = self.dispatch_rearrange_iterative(oks, "Arrange export iterative"); - let oks_trace = oks.trace_handle(); let errs = errs .as_collection(|k, v| (k.clone(), v.clone())) .leave() .mz_arrange("Arrange export iterative err"); + // Ensure that the frontier does not advance past the expiration time, if set. + // Otherwise, we might write down incorrect data. + if let Some(&expiration) = self.expire_at.as_option() { + oks.expire_at(expiration); + expire_stream_at(&errs.stream, expiration); + } + + let oks_trace = oks.trace_handle(); + // Attach logging of dataflow errors. if let Some(logger) = compute_state.compute_logger.clone() { errs.stream.log_dataflow_errors(logger, idx_id); diff --git a/src/compute/src/render/context.rs b/src/compute/src/render/context.rs index 3e8be584c94b1..1a4ae1d2c20dd 100644 --- a/src/compute/src/render/context.rs +++ b/src/compute/src/render/context.rs @@ -37,10 +37,11 @@ use timely::dataflow::scopes::Child; use timely::dataflow::{Scope, ScopeParent}; use timely::progress::timestamp::Refines; use timely::progress::{Antichain, Timestamp}; -use tracing::error; +use tracing::{debug, error}; use crate::arrangement::manager::SpecializedTraceHandle; use crate::compute_state::{ComputeState, HydrationEvent}; +use crate::expiration::expire_stream_at; use crate::extensions::arrange::{KeyCollection, MzArrange}; use crate::render::errors::ErrorLogger; use crate::render::{LinearJoinSpec, RenderTimestamp}; @@ -87,6 +88,9 @@ where pub(super) hydration_logger: Option, /// Specification for rendering linear joins. pub(super) linear_join_spec: LinearJoinSpec, + /// The expiration time for data in this context. The output's frontier should never advance + /// past this frontier, except the empty frontier. + pub expire_at: Antichain, } impl Context @@ -98,6 +102,8 @@ where dataflow: &DataflowDescription, scope: S, compute_state: &ComputeState, + until: Antichain, + expire_at: Antichain, ) -> Self { use mz_ore::collections::CollectionExt as IteratorExt; let dataflow_id = *scope.addr().into_first(); @@ -118,16 +124,22 @@ where }) }; + debug!( + "Dataflow {dataflow_id}: timeline: {:?}, expire_at: {expire_at:?}, until: {until:?}", + dataflow.timeline + ); + Self { scope, debug_name: dataflow.debug_name.clone(), dataflow_id, as_of_frontier, - until: dataflow.until.clone(), + until, bindings: BTreeMap::new(), shutdown_token: Default::default(), hydration_logger, linear_join_spec: compute_state.linear_join_spec, + expire_at, } } } @@ -209,6 +221,7 @@ where hydration_logger: self.hydration_logger.clone(), linear_join_spec: self.linear_join_spec.clone(), bindings, + expire_at: self.expire_at.clone(), } } } @@ -299,6 +312,14 @@ where } } + pub fn expire_at(&self, expiration: S::Timestamp) { + match self { + MzArrangement::RowRow(inner) => { + expire_stream_at(&inner.stream, expiration); + } + } + } + /// Brings the underlying arrangement into a region. pub fn enter_region<'a>( &self, diff --git a/src/compute/src/render/sinks.rs b/src/compute/src/render/sinks.rs index aecf6d43eadee..820be38e0b2d5 100644 --- a/src/compute/src/render/sinks.rs +++ b/src/compute/src/render/sinks.rs @@ -29,6 +29,7 @@ use timely::dataflow::Scope; use timely::progress::Antichain; use crate::compute_state::SinkToken; +use crate::expiration::expire_collection_at; use crate::logging::compute::LogDataflowErrors; use crate::render::context::Context; use crate::render::{RenderTimestamp, StartSignal}; @@ -92,6 +93,13 @@ where let mut ok_collection = ok_collection.leave(); let mut err_collection = err_collection.leave(); + // Ensure that the frontier does not advance past the expiration time, if set. Otherwise, + // we might write down incorrect data. + if let Some(&expiration) = self.expire_at.as_option() { + ok_collection = expire_collection_at(&ok_collection, expiration); + err_collection = expire_collection_at(&err_collection, expiration); + } + let non_null_assertions = sink.non_null_assertions.clone(); let from_desc = sink.from_desc.clone(); if !non_null_assertions.is_empty() {