Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Backport 0.57.9 reverts" #20202

Closed
wants to merge 10 commits into from
16 changes: 8 additions & 8 deletions src/adapter/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3090,16 +3090,16 @@ heap_allocations_cte AS (
SELECT
batches_cte.operator_id,
batches_cte.worker_id,
records_cte.records,
COALESCE(records_cte.records, 0) AS records,
batches_cte.batches,
heap_size_cte.size,
heap_capacity_cte.capacity,
heap_allocations_cte.allocations
COALESCE(heap_size_cte.size, 0) AS size,
COALESCE(heap_capacity_cte.capacity, 0) AS capacity,
COALESCE(heap_allocations_cte.allocations, 0) AS allocations
FROM batches_cte
JOIN records_cte USING (operator_id, worker_id)
JOIN heap_size_cte USING (operator_id, worker_id)
JOIN heap_capacity_cte USING (operator_id, worker_id)
JOIN heap_allocations_cte USING (operator_id, worker_id)",
LEFT OUTER JOIN records_cte USING (operator_id, worker_id)
LEFT OUTER JOIN heap_size_cte USING (operator_id, worker_id)
LEFT OUTER JOIN heap_capacity_cte USING (operator_id, worker_id)
LEFT OUTER JOIN heap_allocations_cte USING (operator_id, worker_id)",
};

pub const MZ_ARRANGEMENT_SIZES: BuiltinView = BuiltinView {
Expand Down
1 change: 1 addition & 0 deletions src/adapter/src/flags.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ pub fn compute_config(config: &SystemVars) -> ComputeParameters {
ComputeParameters {
max_result_size: Some(config.max_result_size()),
dataflow_max_inflight_bytes: Some(config.dataflow_max_inflight_bytes()),
enable_arrangement_size_logging: Some(config.enable_arrangement_size_logging()),
enable_mz_join_core: Some(config.enable_mz_join_core()),
persist: persist_config(config),
tracing: tracing_config(config),
Expand Down
1 change: 1 addition & 0 deletions src/compute-client/src/protocol/command.proto
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,5 @@ message ProtoComputeParameters {
optional uint64 dataflow_max_inflight_bytes = 3;
optional bool enable_mz_join_core = 4;
mz_tracing.params.ProtoTracingParameters tracing = 5;
optional bool enable_arrangement_size_logging = 6;
}
9 changes: 9 additions & 0 deletions src/compute-client/src/protocol/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,8 @@ pub struct ComputeParameters {
pub persist: PersistParameters,
/// Tracing configuration.
pub tracing: TracingParameters,
/// Enable arrangement size logging
pub enable_arrangement_size_logging: Option<bool>,
antiguru marked this conversation as resolved.
Show resolved Hide resolved
}

impl ComputeParameters {
Expand All @@ -377,6 +379,7 @@ impl ComputeParameters {
enable_mz_join_core,
persist,
tracing,
enable_arrangement_size_logging,
} = other;

if max_result_size.is_some() {
Expand All @@ -389,6 +392,10 @@ impl ComputeParameters {
self.enable_mz_join_core = enable_mz_join_core;
}

if enable_arrangement_size_logging.is_some() {
self.enable_arrangement_size_logging = enable_arrangement_size_logging;
}

self.persist.update(persist);
self.tracing.update(tracing);
}
Expand All @@ -404,6 +411,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
ProtoComputeParameters {
max_result_size: self.max_result_size.into_proto(),
dataflow_max_inflight_bytes: self.dataflow_max_inflight_bytes.into_proto(),
enable_arrangement_size_logging: self.enable_arrangement_size_logging.into_proto(),
enable_mz_join_core: self.enable_mz_join_core.into_proto(),
persist: Some(self.persist.into_proto()),
tracing: Some(self.tracing.into_proto()),
Expand All @@ -414,6 +422,7 @@ impl RustType<ProtoComputeParameters> for ComputeParameters {
Ok(Self {
max_result_size: proto.max_result_size.into_rust()?,
dataflow_max_inflight_bytes: proto.dataflow_max_inflight_bytes.into_rust()?,
enable_arrangement_size_logging: proto.enable_arrangement_size_logging.into_rust()?,
enable_mz_join_core: proto.enable_mz_join_core.into_rust()?,
persist: proto
.persist
Expand Down
8 changes: 7 additions & 1 deletion src/compute/src/compute_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ pub struct ComputeState {
pub metrics: ComputeMetrics,
/// A process-global handle to tracing configuration.
pub tracing_handle: Arc<TracingHandle>,
/// Enable arrangement size logging
pub enable_arrangement_size_logging: bool,
}

impl ComputeState {
Expand Down Expand Up @@ -153,6 +155,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
let ComputeParameters {
max_result_size,
dataflow_max_inflight_bytes,
enable_arrangement_size_logging,
enable_mz_join_core,
persist,
tracing,
Expand All @@ -164,6 +167,9 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
if let Some(v) = dataflow_max_inflight_bytes {
self.compute_state.dataflow_max_inflight_bytes = v;
}
if let Some(v) = enable_arrangement_size_logging {
self.compute_state.enable_arrangement_size_logging = v;
}
if let Some(v) = enable_mz_join_core {
self.compute_state.linear_join_impl = match v {
false => LinearJoinImpl::DifferentialDataflow,
Expand Down Expand Up @@ -337,7 +343,7 @@ impl<'a, A: Allocate + 'static> ActiveComputeState<'a, A> {
panic!("dataflow server has already initialized logging");
}

let (logger, traces) = logging::initialize(self.timely_worker, config);
let (logger, traces) = logging::initialize(self.timely_worker, self.compute_state, config);

// Install traces as maintained indexes
for (log, trace) in traces {
Expand Down
12 changes: 6 additions & 6 deletions src/compute/src/extensions/arrange.rs
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,8 @@ where
///
/// This is unsafe because clients are required to use the trace, otherwise the arrangement size
/// logging infrastructure will cause a memory leak.
teskje marked this conversation as resolved.
Show resolved Hide resolved
pub unsafe fn inner(&self) -> Arranged<G, Tr> {
self.consume_trace();
pub unsafe fn inner(&self, enable_arrangement_size_logging: bool) -> Arranged<G, Tr> {
self.consume_trace(enable_arrangement_size_logging);
self.arranged.clone()
}

Expand All @@ -385,18 +385,18 @@ where
/// Similarly to `inner`, clients must use the trace, otherwise a memory leak can occur because
/// the arrangement size logging operator holds on the the trace.
#[must_use]
pub fn trace(&self) -> Tr {
self.consume_trace();
pub fn trace(&self, enable_arrangement_size_logging: bool) -> Tr {
antiguru marked this conversation as resolved.
Show resolved Hide resolved
self.consume_trace(enable_arrangement_size_logging);

self.arranged.trace.clone()
}

/// Indicate that something uses the trace, so we can install the arrangement size logging
/// infrastructure. This uses inner mutability to only ever install the operator once for each
antiguru marked this conversation as resolved.
Show resolved Hide resolved
/// trace.
fn consume_trace(&self) {
fn consume_trace(&self, enable_arrangement_size_logging: bool) {
let mut trace_consumed = self.trace_consumed.borrow_mut();
if !*trace_consumed {
if enable_arrangement_size_logging && !*trace_consumed {
self.log_arrangement_size();
*trace_consumed = true;
}
Expand Down
24 changes: 19 additions & 5 deletions src/compute/src/extensions/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,12 @@ where
G::Timestamp: Lattice + Ord,
{
/// Applies `reduce` to arranged data, and returns an arrangement of output data.
fn mz_reduce_abelian<L, T2>(&self, name: &str, logic: L) -> MzArranged<G, TraceAgent<T2>>
fn mz_reduce_abelian<L, T2>(
&self,
name: &str,
enable_arrangement_size_logging: bool,
logic: L,
) -> MzArranged<G, TraceAgent<T2>>
where
T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
T2::Val: Data,
Expand All @@ -49,7 +54,12 @@ where
T1: TraceReader<Key = K, Val = V, Time = G::Timestamp, R = R> + Clone + 'static,
Arranged<G, T1>: ReduceCore<G, K, V, R>,
{
fn mz_reduce_abelian<L, T2>(&self, name: &str, logic: L) -> MzArranged<G, TraceAgent<T2>>
fn mz_reduce_abelian<L, T2>(
&self,
name: &str,
enable_arrangement_size_logging: bool,
logic: L,
) -> MzArranged<G, TraceAgent<T2>>
where
T2: Trace + TraceReader<Key = K, Time = G::Timestamp> + 'static,
T2::Val: Data,
Expand All @@ -58,7 +68,7 @@ where
L: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static,
Self: ArrangementSize,
{
let inner = unsafe { self.inner() };
let inner = unsafe { self.inner(enable_arrangement_size_logging) };
// Allow access to `reduce_abelian` since we're within Mz's wrapper.
#[allow(clippy::disallowed_methods)]
inner.reduce_abelian::<_, T2>(name, logic).into()
Expand All @@ -79,6 +89,7 @@ where
&self,
name1: &str,
name2: &str,
enable_arrangement_size_logging: bool,
logic1: L1,
logic2: L2,
) -> (MzArranged<G, TraceAgent<T1>>, MzArranged<G, TraceAgent<T2>>)
Expand All @@ -105,6 +116,7 @@ where
&self,
name1: &str,
name2: &str,
enable_arrangement_size_logging: bool,
logic1: L1,
logic2: L2,
) -> (MzArranged<G, TraceAgent<T1>>, MzArranged<G, TraceAgent<T2>>)
Expand All @@ -121,8 +133,10 @@ where
L2: FnMut(&K, &[(&V, R)], &mut Vec<(T2::Val, T2::R)>) + 'static,
Self: ArrangementSize,
{
let arranged1 = self.mz_reduce_abelian::<L1, T1>(name1, logic1);
let arranged2 = self.mz_reduce_abelian::<L2, T2>(name2, logic2);
let arranged1 =
self.mz_reduce_abelian::<L1, T1>(name1, enable_arrangement_size_logging, logic1);
let arranged2 =
self.mz_reduce_abelian::<L2, T2>(name2, enable_arrangement_size_logging, logic2);
(arranged1, arranged2)
}
}
4 changes: 3 additions & 1 deletion src/compute/src/logging/compute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::rc::Rc;
use std::time::Duration;

use crate::compute_state::ComputeState;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
Expand Down Expand Up @@ -151,6 +152,7 @@ pub(super) fn construct<A: Allocate + 'static>(
config: &mz_compute_client::logging::LoggingConfig,
event_queue: EventQueue<ComputeEvent>,
shared_state: Rc<RefCell<SharedLoggingState>>,
compute_state: &ComputeState,
) -> BTreeMap<LogVariant, (KeysValsHandle, Rc<dyn Any>)> {
let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
let worker_id = worker.index();
Expand Down Expand Up @@ -359,7 +361,7 @@ pub(super) fn construct<A: Allocate + 'static>(
}
})
.mz_arrange::<RowSpine<_, _, _, _>>(&format!("ArrangeByKey {:?}", variant))
.trace();
.trace(compute_state.enable_arrangement_size_logging);
traces.insert(variant.clone(), (trace, Rc::clone(&token)));
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/compute/src/logging/differential.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;

use crate::compute_state::ComputeState;
use differential_dataflow::collection::AsCollection;
use differential_dataflow::logging::{
BatchEvent, DifferentialEvent, DropEvent, MergeEvent, TraceShare,
Expand Down Expand Up @@ -48,6 +49,7 @@ pub(super) fn construct<A: Allocate>(
config: &mz_compute_client::logging::LoggingConfig,
event_queue: EventQueue<DifferentialEvent>,
shared_state: Rc<RefCell<SharedLoggingState>>,
compute_state: &ComputeState,
) -> BTreeMap<LogVariant, (KeysValsHandle, Rc<dyn Any>)> {
let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
let worker_id = worker.index();
Expand Down Expand Up @@ -174,7 +176,7 @@ pub(super) fn construct<A: Allocate>(
}
})
.mz_arrange::<RowSpine<_, _, _, _>>(&format!("ArrangeByKey {:?}", variant))
.trace();
.trace(compute_state.enable_arrangement_size_logging);
traces.insert(variant.clone(), (trace, Rc::clone(&token)));
}
}
Expand Down
12 changes: 11 additions & 1 deletion src/compute/src/logging/initialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use timely::logging::{Logger, TimelyEvent};
use timely::progress::reachability::logging::TrackerEvent;

use crate::arrangement::manager::TraceBundle;
use crate::compute_state::ComputeState;
use crate::extensions::arrange::{KeyCollection, MzArrange};
use crate::logging::compute::ComputeEvent;
use crate::logging::reachability::ReachabilityEvent;
Expand All @@ -32,6 +33,7 @@ use crate::logging::{BatchLogger, EventQueue, SharedLoggingState};
/// retrieving logged records.
pub fn initialize<A: Allocate + 'static>(
worker: &mut timely::worker::Worker<A>,
compute_state: &ComputeState,
config: &LoggingConfig,
) -> (super::compute::Logger, BTreeMap<LogVariant, TraceBundle>) {
let interval_ms = std::cmp::max(1, config.interval.as_millis())
Expand All @@ -49,6 +51,7 @@ pub fn initialize<A: Allocate + 'static>(
let mut context = LoggingContext {
worker,
config,
compute_state,
interval_ms,
now,
start_offset,
Expand Down Expand Up @@ -77,6 +80,7 @@ pub fn initialize<A: Allocate + 'static>(
struct LoggingContext<'a, A: Allocate> {
worker: &'a mut timely::worker::Worker<A>,
config: &'a LoggingConfig,
compute_state: &'a ComputeState,
interval_ms: u64,
now: Instant,
start_offset: Duration,
Expand All @@ -95,31 +99,37 @@ impl<A: Allocate + 'static> LoggingContext<'_, A> {
self.config,
self.t_event_queue.clone(),
Rc::clone(&self.shared_state),
self.compute_state,
));
traces.extend(super::reachability::construct(
self.worker,
self.config,
self.r_event_queue.clone(),
self.compute_state,
));
traces.extend(super::differential::construct(
self.worker,
self.config,
self.d_event_queue.clone(),
Rc::clone(&self.shared_state),
self.compute_state,
));
traces.extend(super::compute::construct(
self.worker,
self.config,
self.c_event_queue.clone(),
Rc::clone(&self.shared_state),
self.compute_state,
));

let errs = self
.worker
.dataflow_named("Dataflow: logging errors", |scope| {
let collection: KeyCollection<_, DataflowError, Diff> =
Collection::empty(scope).into();
collection.mz_arrange("Arrange logging err").trace()
collection
.mz_arrange("Arrange logging err")
.trace(self.compute_state.enable_arrangement_size_logging)
});

traces
Expand Down
4 changes: 3 additions & 1 deletion src/compute/src/logging/reachability.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use std::collections::BTreeMap;
use std::convert::TryInto;
use std::rc::Rc;

use crate::compute_state::ComputeState;
antiguru marked this conversation as resolved.
Show resolved Hide resolved
use differential_dataflow::AsCollection;
use mz_compute_client::logging::LoggingConfig;
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
Expand Down Expand Up @@ -48,6 +49,7 @@ pub(super) fn construct<A: Allocate>(
worker: &mut timely::worker::Worker<A>,
config: &LoggingConfig,
event_queue: EventQueue<ReachabilityEvent>,
compute_state: &ComputeState,
) -> BTreeMap<LogVariant, (KeysValsHandle, Rc<dyn Any>)> {
let interval_ms = std::cmp::max(1, config.interval.as_millis());

Expand Down Expand Up @@ -149,7 +151,7 @@ pub(super) fn construct<A: Allocate>(

let trace = updates
.mz_arrange::<RowSpine<_, _, _, _>>(&format!("Arrange {:?}", variant))
.trace();
.trace(compute_state.enable_arrangement_size_logging);
result.insert(variant.clone(), (trace, Rc::clone(&token)));
}
}
Expand Down
4 changes: 3 additions & 1 deletion src/compute/src/logging/timely.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use std::collections::BTreeMap;
use std::rc::Rc;
use std::time::Duration;

use crate::compute_state::ComputeState;
antiguru marked this conversation as resolved.
Show resolved Hide resolved
use differential_dataflow::collection::AsCollection;
use mz_compute_client::logging::LoggingConfig;
use mz_expr::{permutation_for_arrangement, MirScalarExpr};
Expand Down Expand Up @@ -53,6 +54,7 @@ pub(super) fn construct<A: Allocate>(
config: &LoggingConfig,
event_queue: EventQueue<TimelyEvent>,
shared_state: Rc<RefCell<SharedLoggingState>>,
compute_state: &ComputeState,
) -> BTreeMap<LogVariant, (KeysValsHandle, Rc<dyn Any>)> {
let logging_interval_ms = std::cmp::max(1, config.interval.as_millis());
let worker_id = worker.index();
Expand Down Expand Up @@ -288,7 +290,7 @@ pub(super) fn construct<A: Allocate>(
}
})
.mz_arrange::<RowSpine<_, _, _, _>>(&format!("ArrangeByKey {:?}", variant))
.trace();
.trace(compute_state.enable_arrangement_size_logging);
traces.insert(variant.clone(), (trace, Rc::clone(&token)));
}
}
Expand Down
Loading