Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only decode rows up to demand #30808

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 19 additions & 10 deletions src/compute/src/render/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,6 +356,7 @@ where
key: Option<Row>,
mut logic: L,
refuel: usize,
max_demand: usize,
) -> timely::dataflow::Stream<S, I::Item>
where
T: Timestamp + Lattice + Columnation,
Expand All @@ -373,8 +374,9 @@ where
key,
move |k, v, t, d| {
let mut datums_borrow = datums.borrow();
datums_borrow.extend(k.to_datum_iter());
datums_borrow.extend(v.to_datum_iter());
datums_borrow.extend(k.to_datum_iter().take(max_demand));
let max_demand = max_demand.saturating_sub(datums_borrow.len());
datums_borrow.extend(v.to_datum_iter().take(max_demand));
logic(&mut datums_borrow, t, d)
},
refuel,
Expand Down Expand Up @@ -467,6 +469,7 @@ where
key: Option<Row>,
mut logic: L,
refuel: usize,
max_demand: usize,
) -> timely::dataflow::Stream<S, I::Item>
where
I: IntoIterator<Item = (D, S::Timestamp, Diff)>,
Expand All @@ -486,8 +489,9 @@ where
key,
move |k, v, t, d| {
let mut datums_borrow = datums.borrow();
datums_borrow.extend(k.to_datum_iter());
datums_borrow.extend(v.to_datum_iter());
datums_borrow.extend(k.to_datum_iter().take(max_demand));
let max_demand = max_demand.saturating_sub(datums_borrow.len());
datums_borrow.extend(v.to_datum_iter().take(max_demand));
logic(&mut datums_borrow, t, d)
},
refuel,
Expand Down Expand Up @@ -567,6 +571,7 @@ where
pub fn flat_map<D, I, C, L>(
&self,
key: Option<Row>,
max_demand: usize,
constructor: C,
) -> (
timely::dataflow::Stream<S, I::Item>,
Expand All @@ -586,13 +591,13 @@ where
match &self {
ArrangementFlavor::Local(oks, errs) => {
let logic = constructor();
let oks = oks.flat_map(key, logic, refuel);
let oks = oks.flat_map(key, logic, refuel, max_demand);
let errs = errs.as_collection(|k, &()| k.clone());
(oks, errs)
}
ArrangementFlavor::Trace(_, oks, errs) => {
let logic = constructor();
let oks = oks.flat_map(key, logic, refuel);
let oks = oks.flat_map(key, logic, refuel, max_demand);
let errs = errs.as_collection(|k, &()| k.clone());
(oks, errs)
}
Expand Down Expand Up @@ -803,6 +808,7 @@ where
pub fn flat_map<D, I, C, L>(
&self,
key_val: Option<(Vec<MirScalarExpr>, Option<Row>)>,
max_demand: usize,
constructor: C,
) -> (
timely::dataflow::Stream<S, I::Item>,
Expand All @@ -821,7 +827,7 @@ where
let flavor = self
.arrangement(&key)
.expect("Should have ensured during planning that this arrangement exists.");
flavor.flat_map(val, constructor)
flavor.flat_map(val, max_demand, constructor)
} else {
use timely::dataflow::operators::Map;
let (oks, errs) = self
Expand All @@ -831,8 +837,9 @@ where
let mut logic = constructor();
let mut datums = DatumVec::new();
(
oks.inner
.flat_map(move |(v, t, d)| logic(&mut datums.borrow_with(&v), &t, &d)),
oks.inner.flat_map(move |(v, t, d)| {
logic(&mut datums.borrow_with_limit(&v, max_demand), &t, &d)
}),
errs,
)
}
Expand Down Expand Up @@ -938,6 +945,8 @@ where
Collection<S, mz_repr::Row, Diff>,
Collection<S, DataflowError, Diff>,
) {
let max_demand = mfp.demand().iter().max().map(|x| *x + 1).unwrap_or(0);
mfp.permute_fn(|c| c, max_demand);
mfp.optimize();
let mfp_plan = mfp.into_plan().unwrap();

Expand All @@ -956,7 +965,7 @@ where
let key = key_val.map(|(k, _v)| k);
return self.as_specific_collection(key.as_deref());
}
let (stream, errors) = self.flat_map(key_val, || {
let (stream, errors) = self.flat_map(key_val, max_demand, || {
let mut datum_vec = DatumVec::new();
// Wrap in an `Rc` so that lifetimes work out.
let until = std::rc::Rc::new(until);
Expand Down
47 changes: 24 additions & 23 deletions src/compute/src/render/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,27 +87,26 @@ where
} = key_val_plan;
let key_arity = key_plan.projection.len();
let mut datums = DatumVec::new();
let (key_val_input, err_input): (
timely::dataflow::Stream<_, (Result<(Row, Row), DataflowError>, _, _)>,
_,
) = input
.enter_region(inner)
.flat_map(input_key.map(|k| (k, None)), || {
// Determine the columns we'll need from the row.
let mut demand = Vec::new();
demand.extend(key_plan.demand());
demand.extend(val_plan.demand());
demand.sort();
demand.dedup();
// remap column references to the subset we use.
let mut demand_map = BTreeMap::new();
for column in demand.iter() {
demand_map.insert(*column, demand_map.len());
}
let demand_map_len = demand_map.len();
key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
// Determine the columns we'll need from the row.
let mut demand = Vec::new();
demand.extend(key_plan.demand());
demand.extend(val_plan.demand());
demand.sort();
demand.dedup();
// remap column references to the subset we use.
let mut demand_map = BTreeMap::new();
for column in demand.iter() {
demand_map.insert(*column, demand_map.len());
}
let demand_map_len = demand_map.len();
key_plan.permute_fn(|c| demand_map[&c], demand_map_len);
val_plan.permute_fn(|c| demand_map[&c], demand_map_len);
let max_demand = demand.iter().max().map(|x| *x + 1).unwrap_or(0);
let skips = mz_compute_types::plan::reduce::convert_indexes_to_skips(demand);
let (key_val_input, err_input) = input.enter_region(inner).flat_map(
input_key.map(|k| (k, None)),
max_demand,
|| {
move |row_datums, time, diff| {
let binding = SharedRow::get();
let mut row_builder = binding.borrow_mut();
Expand Down Expand Up @@ -152,12 +151,14 @@ where
let row = row_builder.clone();
Some((Ok((key, row)), time.clone(), diff.clone()))
}
});
},
);

// Demux out the potential errors from key and value selector evaluation.
type CB<T> = ConsolidatingContainerBuilder<T>;
let (ok, mut err) = key_val_input
.as_collection()
.flat_map_fallible::<ConsolidatingContainerBuilder<_>, ConsolidatingContainerBuilder<_>, _, _, _, _>("OkErrDemux", Some);
.flat_map_fallible::<CB<_>, CB<_>, _, _, _, _>("OkErrDemux", Some);

err = err.concat(&err_input);

Expand Down
12 changes: 12 additions & 0 deletions src/repr/src/datum_vec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,18 @@ impl DatumVec {
borrow.extend(row.iter());
borrow
}

/// Borrow the allocation for lifetime `'a`, pre-populated with a prefix of `row`.
///
/// At most `limit` elements are taken from the front of `row.iter()`; anything past
/// that point is never pulled from the iterator. When `row` yields fewer than `limit`
/// elements, all of them are included.
pub fn borrow_with_limit<'a>(
    &'a mut self,
    row: &'a RowRef,
    limit: usize,
) -> DatumVecBorrow<'a> {
    // Cap the row's iterator first so the extend below stops after `limit` items.
    let prefix = row.iter().take(limit);
    let mut datums = self.borrow();
    datums.extend(prefix);
    datums
}
}

/// A borrowed allocation of `Datum` with a specific lifetime.
Expand Down
Loading