compute: configure dataflows to respect the expiration time
sdht0 committed Sep 19, 2024 · parent 15ef2ca · commit fd3ce12
Showing 5 changed files with 122 additions and 8 deletions.
50 changes: 50 additions & 0 deletions src/compute/src/expiration.rs
@@ -0,0 +1,50 @@
// Copyright Materialize, Inc. and contributors. All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

//! Helper functions to detect expired frontiers.

use differential_dataflow::{AsCollection, Collection};
use timely::dataflow::operators::InspectCore;
use timely::dataflow::{Scope, StreamCore};
use timely::progress::frontier::AntichainRef;
use timely::Container;

/// Panics if the frontier of a [`StreamCore`] exceeds a given `expiration` time.
pub fn expire_stream_at<G, D>(
stream: &StreamCore<G, D>,
expiration: G::Timestamp,
) -> StreamCore<G, D>
where
G: Scope,
G::Timestamp: timely::progress::Timestamp,
D: Container,
{
stream.inspect_container(move |data_or_frontier| {
if let Err(frontier) = data_or_frontier {
assert!(
frontier.is_empty() || AntichainRef::new(frontier).less_than(&expiration),
"frontier {frontier:?} has exceeded expiration {expiration:?}!",
);
}
})
}

/// Wrapper around [`expire_stream_at`] for a [`Collection`].
pub fn expire_collection_at<G, D, R>(
collection: &Collection<G, D, R>,
expiration: G::Timestamp,
) -> Collection<G, D, R>
where
G: Scope,
G::Timestamp: timely::progress::Timestamp,
D: Clone + 'static,
R: Clone + 'static,
{
expire_stream_at(&collection.inner, expiration).as_collection()
}
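
A note on behavior, with a minimal sketch (not part of this commit): `expire_stream_at` only installs an `inspect_container` probe, so the panic fires the moment a non-empty frontier stops being strictly less than `expiration`. The toy program below reproduces the same assertion on a plain `u64`-timestamped input; the input loop and all names are illustrative.

// Illustrative sketch only (not from this commit): a toy timely dataflow that
// installs the same frontier assertion as `expire_stream_at`.
use timely::dataflow::operators::{Input, InspectCore};
use timely::progress::frontier::AntichainRef;

fn main() {
    timely::execute_directly(|worker| {
        let mut input = worker.dataflow::<u64, _, _>(|scope| {
            let (input, stream) = scope.new_input::<u64>();
            let expiration = 10u64;
            // Mirrors the body of `expire_stream_at`: panic once a non-empty
            // frontier is no longer strictly less than the expiration.
            stream.inspect_container(move |data_or_frontier| {
                if let Err(frontier) = data_or_frontier {
                    assert!(
                        frontier.is_empty() || AntichainRef::new(frontier).less_than(&expiration),
                        "frontier {frontier:?} has exceeded expiration {expiration:?}!",
                    );
                }
            });
            input
        });

        for t in 0..5 {
            input.send(t);
            input.advance_to(t + 1); // frontier stays below 10, so no panic
        }
        // input.advance_to(10); // advancing to the expiration would trip the assertion
        // Dropping the input closes it; the resulting empty frontier is allowed.
    });
}
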
1 change: 1 addition & 0 deletions src/compute/src/lib.rs
@@ -13,6 +13,7 @@
pub(crate) mod arrangement;
pub mod compute_state;
pub(crate) mod expiration;
pub(crate) mod extensions;
pub(crate) mod logging;
pub(crate) mod metrics;
46 changes: 40 additions & 6 deletions src/compute/src/render.rs
@@ -127,6 +127,7 @@ use mz_repr::{Datum, GlobalId, Row, SharedRow};
use mz_storage_operators::persist_source;
use mz_storage_types::controller::CollectionMetadata;
use mz_storage_types::errors::DataflowError;
use mz_storage_types::sources::Timeline;
use mz_timely_util::operator::CollectionExt;
use timely::communication::Allocate;
use timely::container::columnation::Columnation;
@@ -144,6 +145,7 @@ use timely::PartialOrder;

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::expiration::expire_stream_at;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::extensions::reduce::MzReduce;
use crate::logging::compute::LogDataflowErrors;
@@ -195,6 +197,13 @@ pub fn build_compute_dataflow<A: Allocate>(
.map(|(sink_id, sink)| (*sink_id, dataflow.depends_on(sink.from), sink.clone()))
.collect::<Vec<_>>();

let expire_at = compute_state
.replica_expiration
.map(Antichain::from_elem)
.unwrap_or_default();

let until = dataflow.until.meet(&expire_at);

let worker_logging = timely_worker.log_register().get("timely");

let name = format!("Dataflow: {}", &dataflow.debug_name);
@@ -228,7 +237,7 @@
source.storage_metadata.clone(),
dataflow.as_of.clone(),
SnapshotMode::Include,
dataflow.until.clone(),
until.clone(),
mfp.as_mut(),
compute_state.dataflow_max_inflight_bytes(),
start_signal.clone(),
@@ -269,8 +278,13 @@
// in order to support additional timestamp coordinates for iteration.
if recursive {
scope.clone().iterative::<PointStamp<u64>, _, _>(|region| {
let mut context =
Context::for_dataflow_in(&dataflow, region.clone(), compute_state);
let mut context = Context::for_dataflow_in(
&dataflow,
region.clone(),
compute_state,
until,
expire_at,
);

for (id, (oks, errs)) in imported_sources.into_iter() {
let bundle = crate::render::CollectionBundle::from_collections(
@@ -338,8 +352,13 @@
});
} else {
scope.clone().region_named(&build_name, |region| {
let mut context =
Context::for_dataflow_in(&dataflow, region.clone(), compute_state);
let mut context = Context::for_dataflow_in(
&dataflow,
region.clone(),
compute_state,
until,
expire_at,
);

for (id, (oks, errs)) in imported_sources.into_iter() {
let bundle = crate::render::CollectionBundle::from_collections(
@@ -499,6 +518,13 @@

match bundle.arrangement(&idx.key) {
Some(ArrangementFlavor::Local(oks, errs)) => {
// Ensure that the frontier does not advance past the expiration time, if set.
// Otherwise, we might write down incorrect data.
if let Some(&expiration) = self.expire_at.as_option() {
oks.expire_at(expiration);
expire_stream_at(&errs.stream, expiration);
}

// Obtain a specialized handle matching the specialized arrangement.
let oks_trace = oks.trace_handle();

@@ -566,13 +592,21 @@
match bundle.arrangement(&idx.key) {
Some(ArrangementFlavor::Local(oks, errs)) => {
let oks = self.dispatch_rearrange_iterative(oks, "Arrange export iterative");
let oks_trace = oks.trace_handle();

let errs = errs
.as_collection(|k, v| (k.clone(), v.clone()))
.leave()
.mz_arrange("Arrange export iterative err");

// Ensure that the frontier does not advance past the expiration time, if set.
// Otherwise, we might write down incorrect data.
if let Some(&expiration) = self.expire_at.as_option() {
oks.expire_at(expiration);
expire_stream_at(&errs.stream, expiration);
}

let oks_trace = oks.trace_handle();

// Attach logging of dataflow errors.
if let Some(logger) = compute_state.compute_logger.clone() {
errs.stream.log_dataflow_errors(logger, idx_id);
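
For the `let until = dataflow.until.meet(&expire_at);` line in this file: `meet` takes the lower bound of the two frontiers, and the empty antichain (the default when `replica_expiration` is unset) acts as the identity, so the dataflow's `until` can only be tightened by the expiration, never relaxed. A minimal sketch with plain `u64` timestamps, assuming differential-dataflow's `Lattice` impl for `Antichain` (the same `meet` the commit relies on):

// Illustrative sketch only: how the effective `until` is derived from the
// dataflow's `until` and the replica expiration.
use differential_dataflow::lattice::Lattice;
use timely::progress::Antichain;

fn main() {
    // No explicit `until`: the empty antichain places no upper bound.
    let until: Antichain<u64> = Antichain::new();
    // A configured replica expiration of 100.
    let expire_at = Antichain::from_elem(100u64);

    // The meet (lower bound) of "unbounded" and {100} is {100}: the expiration
    // now bounds the dataflow.
    assert_eq!(until.meet(&expire_at), Antichain::from_elem(100u64));

    // If `until` is already tighter than the expiration, it stays in effect.
    let until = Antichain::from_elem(50u64);
    assert_eq!(until.meet(&expire_at), Antichain::from_elem(50u64));

    // With no expiration configured (`expire_at` empty), `until` is unchanged.
    let no_expiration: Antichain<u64> = Antichain::new();
    assert_eq!(until.meet(&no_expiration), until);
}
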
25 changes: 23 additions & 2 deletions src/compute/src/render/context.rs
@@ -37,10 +37,11 @@ use timely::dataflow::scopes::Child;
use timely::dataflow::{Scope, ScopeParent};
use timely::progress::timestamp::Refines;
use timely::progress::{Antichain, Timestamp};
use tracing::error;
use tracing::{debug, error};

use crate::arrangement::manager::SpecializedTraceHandle;
use crate::compute_state::{ComputeState, HydrationEvent};
use crate::expiration::expire_stream_at;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::render::errors::ErrorLogger;
use crate::render::{LinearJoinSpec, RenderTimestamp};
@@ -87,6 +88,9 @@
pub(super) hydration_logger: Option<HydrationLogger>,
/// Specification for rendering linear joins.
pub(super) linear_join_spec: LinearJoinSpec,
/// The expiration time for data in this context. The output's frontier should never advance
/// past this frontier, except to the empty frontier.
pub expire_at: Antichain<T>,
}

impl<S: Scope> Context<S>
@@ -98,6 +102,8 @@
dataflow: &DataflowDescription<Plan, CollectionMetadata>,
scope: S,
compute_state: &ComputeState,
until: Antichain<mz_repr::Timestamp>,
expire_at: Antichain<mz_repr::Timestamp>,
) -> Self {
use mz_ore::collections::CollectionExt as IteratorExt;
let dataflow_id = *scope.addr().into_first();
@@ -118,16 +124,22 @@
})
};

debug!(
"Dataflow {dataflow_id}: timeline: {:?}, expire_at: {expire_at:?}, until: {until:?}",
dataflow.timeline
);

Self {
scope,
debug_name: dataflow.debug_name.clone(),
dataflow_id,
as_of_frontier,
until: dataflow.until.clone(),
until,
bindings: BTreeMap::new(),
shutdown_token: Default::default(),
hydration_logger,
linear_join_spec: compute_state.linear_join_spec,
expire_at,
}
}
}
@@ -209,6 +221,7 @@
hydration_logger: self.hydration_logger.clone(),
linear_join_spec: self.linear_join_spec.clone(),
bindings,
expire_at: self.expire_at.clone(),
}
}
}
@@ -299,6 +312,14 @@
}
}

pub fn expire_at(&self, expiration: S::Timestamp) {
match self {
MzArrangement::RowRow(inner) => {
expire_stream_at(&inner.stream, expiration);
}
}
}

/// Brings the underlying arrangement into a region.
pub fn enter_region<'a>(
&self,
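
The new `expire_at: Antichain<T>` field uses a small convention: an empty antichain means no expiration is configured, a singleton carries the expiration time, and the render and sink paths read it back with `as_option()`. A minimal sketch of that round trip with plain `u64` timestamps; the `println!` stands in for the real `expire_stream_at` wiring:

// Illustrative sketch only: the Option -> Antichain -> as_option convention
// used for `replica_expiration` / `expire_at` in this commit.
use timely::progress::Antichain;

fn main() {
    // Mirrors `compute_state.replica_expiration` (None when the feature is off).
    let replica_expiration: Option<u64> = Some(100);

    // Mirrors `replica_expiration.map(Antichain::from_elem).unwrap_or_default()`:
    // Some(t) becomes the singleton {t}, None becomes the empty antichain.
    let expire_at: Antichain<u64> = replica_expiration
        .map(Antichain::from_elem)
        .unwrap_or_default();

    // Mirrors the guards in render.rs and sinks.rs: the empty antichain yields
    // None here, so nothing is wired up when expiration is disabled.
    if let Some(&expiration) = expire_at.as_option() {
        // Placeholder for `expire_stream_at(&stream, expiration)`.
        println!("would expire streams at {expiration}");
    } else {
        println!("no expiration configured");
    }
}
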
8 changes: 8 additions & 0 deletions src/compute/src/render/sinks.rs
@@ -29,6 +29,7 @@ use timely::dataflow::Scope;
use timely::progress::Antichain;

use crate::compute_state::SinkToken;
use crate::expiration::expire_collection_at;
use crate::logging::compute::LogDataflowErrors;
use crate::render::context::Context;
use crate::render::{RenderTimestamp, StartSignal};
@@ -91,6 +92,13 @@
let mut ok_collection = ok_collection.leave();
let mut err_collection = err_collection.leave();

// Ensure that the frontier does not advance past the expiration time, if set. Otherwise,
// we might write down incorrect data.
if let Some(&expiration) = self.expire_at.as_option() {
ok_collection = expire_collection_at(&ok_collection, expiration);
err_collection = expire_collection_at(&err_collection, expiration);
}

let non_null_assertions = sink.non_null_assertions.clone();
let from_desc = sink.from_desc.clone();
if !non_null_assertions.is_empty() {
