Skip to content

Commit

Permalink
compute: simplify mz_join_core result computation
Browse files Browse the repository at this point in the history
Previously, the mz_join_core code wrapped the `result` closure into
another `work_result` closure, to deal with the fact that matches from
the `todo1` list had their value and diff fields swapped.

Instead of having a closure to unswap the fields, we can simply pass
them to the `Deferred` constructor in the correct order instead. That
is, the `Deferred` constructor should always receive the cursor/storage
from input 1 first and from input 2 second. If we make this change, no
`work_result` closure is necessary and the code become easier to reason
about.
  • Loading branch information
teskje committed Jun 2, 2023
1 parent ee878f6 commit 870de8c
Showing 1 changed file with 45 additions and 55 deletions.
100 changes: 45 additions & 55 deletions src/compute/src/render/join/mz_join_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ where
trace2.cursor_through(acknowledged2.borrow()).unwrap();
let batch1_cursor = batch1.cursor();
todo1.push_back(Deferred::new(
trace2_cursor,
trace2_storage,
batch1_cursor,
batch1.clone(),
trace2_cursor,
trace2_storage,
capability.clone(),
));
}
Expand Down Expand Up @@ -273,27 +273,13 @@ where
// which results in unintentionally quadratic processing time (each batch of either
// input must scan all batches from the other input).

let mut work_result = |k: &Tr1::Key,
v1: &Tr1::Val,
v2: &Tr2::Val,
t: &G::Timestamp,
r1: &Tr1::R,
r2: &Tr2::R| {
let t = t.clone();
let r = r1.clone().multiply(r2);
result(k, v1, v2)
.into_iter()
.map(move |d| (d, t.clone(), r.clone()))
};

// Perform some amount of outstanding work.
let mut fuel = 1_000_000;
while !todo1.is_empty() && fuel > 0 {
todo1.front_mut().unwrap().work(
output,
|k, v2, v1, t, r2, r1| work_result(k, v1, v2, t, r1, r2),
&mut fuel,
);
todo1
.front_mut()
.unwrap()
.work(output, &mut result, &mut fuel);
if !todo1.front().unwrap().work_remains() {
todo1.pop_front();
}
Expand All @@ -305,7 +291,7 @@ where
todo2
.front_mut()
.unwrap()
.work(output, &mut work_result, &mut fuel);
.work(output, &mut result, &mut fuel);
if !todo2.front().unwrap().work_remains() {
todo2.pop_front();
}
Expand Down Expand Up @@ -373,10 +359,10 @@ where
C1: Cursor<Key = Row, Val = Row, Time = T, R = Diff>,
C2: Cursor<Key = Row, Val = Row, Time = T, R = Diff>,
{
trace: C1,
trace_storage: C1::Storage,
batch: C2,
batch_storage: C2::Storage,
cursor1: C1,
storage1: C1::Storage,
cursor2: C2,
storage2: C2::Storage,
capability: Capability<T>,
done: bool,
temp: Vec<(D, T, Diff)>,
Expand All @@ -390,17 +376,17 @@ where
D: Data,
{
fn new(
trace: C1,
trace_storage: C1::Storage,
batch: C2,
batch_storage: C2::Storage,
cursor1: C1,
storage1: C1::Storage,
cursor2: C2,
storage2: C2::Storage,
capability: Capability<T>,
) -> Self {
Deferred {
trace,
trace_storage,
batch,
batch_storage,
cursor1,
storage1,
cursor2,
storage2,
capability,
done: false,
temp: Vec::new(),
Expand All @@ -415,46 +401,50 @@ where
fn work<L, I>(
&mut self,
output: &mut OutputHandle<T, (D, T, Diff), Tee<T, (D, T, Diff)>>,
mut logic: L,
mut result: L,
fuel: &mut usize,
) where
I: IntoIterator<Item = (D, T, Diff)>,
L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::R, &C2::R) -> I,
I: IntoIterator<Item = D>,
L: FnMut(&C1::Key, &C1::Val, &C2::Val) -> I,
{
let meet = self.capability.time();

let mut session = output.session(&self.capability);

let trace_storage = &self.trace_storage;
let batch_storage = &self.batch_storage;
let storage1 = &self.storage1;
let storage2 = &self.storage2;

let trace = &mut self.trace;
let batch = &mut self.batch;
let cursor1 = &mut self.cursor1;
let cursor2 = &mut self.cursor2;

let temp = &mut self.temp;

while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) {
match trace.key(trace_storage).cmp(batch.key(batch_storage)) {
Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)),
Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
while cursor2.key_valid(storage2) && cursor1.key_valid(storage1) {
match cursor1.key(storage1).cmp(cursor2.key(storage2)) {
Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
Ordering::Equal => {
assert_eq!(temp.len(), 0);

// Populate `temp` with the results, as long as fuel remains.
let key = batch.key(batch_storage);
while let Some(val1) = trace.get_val(trace_storage) {
while let Some(val2) = batch.get_val(batch_storage) {
trace.map_times(trace_storage, |time1, diff1| {
let key = cursor2.key(storage2);
while let Some(val1) = cursor1.get_val(storage1) {
while let Some(val2) = cursor2.get_val(storage2) {
cursor1.map_times(storage1, |time1, diff1| {
let time1 = time1.join(meet);
batch.map_times(batch_storage, |time2, diff2| {
cursor2.map_times(storage2, |time2, diff2| {
let time = time1.join(time2);
temp.extend(logic(key, val1, val2, &time, diff1, diff2))
let diff = diff1.multiply(diff2);
let results = result(key, val1, val2)
.into_iter()
.map(|d| (d, time.clone(), diff.clone()));
temp.extend(results);
});
});
batch.step_val(batch_storage);
cursor2.step_val(storage2);
}
batch.rewind_vals(batch_storage);
trace.step_val(trace_storage);
cursor2.rewind_vals(storage2);
cursor1.step_val(storage1);

// TODO: This consolidation is optional, and it may not be very
// helpful. We might try harder to understand whether we
Expand All @@ -472,8 +462,8 @@ where
}
}

batch.step_key(batch_storage);
trace.step_key(trace_storage);
cursor2.step_key(storage2);
cursor1.step_key(storage1);
}
}
}
Expand Down

0 comments on commit 870de8c

Please sign in to comment.