Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

compute: simplify mz_join_core result computation #19667

Merged
merged 1 commit into from
Jun 14, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 45 additions & 55 deletions src/compute/src/render/join/mz_join_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ where
trace2.cursor_through(acknowledged2.borrow()).unwrap();
let batch1_cursor = batch1.cursor();
todo1.push_back(Deferred::new(
trace2_cursor,
trace2_storage,
batch1_cursor,
batch1.clone(),
trace2_cursor,
trace2_storage,
capability.clone(),
));
}
Expand Down Expand Up @@ -273,27 +273,13 @@ where
// which results in unintentionally quadratic processing time (each batch of either
// input must scan all batches from the other input).

let mut work_result = |k: &Tr1::Key,
v1: &Tr1::Val,
v2: &Tr2::Val,
t: &G::Timestamp,
r1: &Tr1::R,
r2: &Tr2::R| {
let t = t.clone();
let r = r1.clone().multiply(r2);
result(k, v1, v2)
.into_iter()
.map(move |d| (d, t.clone(), r.clone()))
};

// Perform some amount of outstanding work.
let mut fuel = 1_000_000;
while !todo1.is_empty() && fuel > 0 {
todo1.front_mut().unwrap().work(
output,
|k, v2, v1, t, r2, r1| work_result(k, v1, v2, t, r1, r2),
&mut fuel,
);
todo1
.front_mut()
.unwrap()
.work(output, &mut result, &mut fuel);
if !todo1.front().unwrap().work_remains() {
todo1.pop_front();
}
Expand All @@ -305,7 +291,7 @@ where
todo2
.front_mut()
.unwrap()
.work(output, &mut work_result, &mut fuel);
.work(output, &mut result, &mut fuel);
if !todo2.front().unwrap().work_remains() {
todo2.pop_front();
}
Expand Down Expand Up @@ -373,10 +359,10 @@ where
C1: Cursor<Key = Row, Val = Row, Time = T, R = Diff>,
C2: Cursor<Key = Row, Val = Row, Time = T, R = Diff>,
{
trace: C1,
trace_storage: C1::Storage,
batch: C2,
batch_storage: C2::Storage,
cursor1: C1,
storage1: C1::Storage,
cursor2: C2,
storage2: C2::Storage,
capability: Capability<T>,
done: bool,
temp: Vec<(D, T, Diff)>,
Expand All @@ -390,17 +376,17 @@ where
D: Data,
{
fn new(
trace: C1,
trace_storage: C1::Storage,
batch: C2,
batch_storage: C2::Storage,
cursor1: C1,
storage1: C1::Storage,
cursor2: C2,
storage2: C2::Storage,
capability: Capability<T>,
) -> Self {
Deferred {
trace,
trace_storage,
batch,
batch_storage,
cursor1,
storage1,
cursor2,
storage2,
capability,
done: false,
temp: Vec::new(),
Expand All @@ -415,46 +401,50 @@ where
fn work<L, I>(
&mut self,
output: &mut OutputHandle<T, (D, T, Diff), Tee<T, (D, T, Diff)>>,
mut logic: L,
mut result: L,
fuel: &mut usize,
) where
I: IntoIterator<Item = (D, T, Diff)>,
L: FnMut(&C1::Key, &C1::Val, &C2::Val, &T, &C1::R, &C2::R) -> I,
I: IntoIterator<Item = D>,
L: FnMut(&C1::Key, &C1::Val, &C2::Val) -> I,
{
let meet = self.capability.time();

let mut session = output.session(&self.capability);

let trace_storage = &self.trace_storage;
let batch_storage = &self.batch_storage;
let storage1 = &self.storage1;
let storage2 = &self.storage2;

let trace = &mut self.trace;
let batch = &mut self.batch;
let cursor1 = &mut self.cursor1;
let cursor2 = &mut self.cursor2;

let temp = &mut self.temp;

while batch.key_valid(batch_storage) && trace.key_valid(trace_storage) {
match trace.key(trace_storage).cmp(batch.key(batch_storage)) {
Ordering::Less => trace.seek_key(trace_storage, batch.key(batch_storage)),
Ordering::Greater => batch.seek_key(batch_storage, trace.key(trace_storage)),
while cursor1.key_valid(storage1) && cursor2.key_valid(storage2) {
match cursor1.key(storage1).cmp(cursor2.key(storage2)) {
Ordering::Less => cursor1.seek_key(storage1, cursor2.key(storage2)),
Ordering::Greater => cursor2.seek_key(storage2, cursor1.key(storage1)),
Ordering::Equal => {
assert_eq!(temp.len(), 0);

// Populate `temp` with the results, as long as fuel remains.
let key = batch.key(batch_storage);
while let Some(val1) = trace.get_val(trace_storage) {
while let Some(val2) = batch.get_val(batch_storage) {
trace.map_times(trace_storage, |time1, diff1| {
let key = cursor2.key(storage2);
while let Some(val1) = cursor1.get_val(storage1) {
while let Some(val2) = cursor2.get_val(storage2) {
cursor1.map_times(storage1, |time1, diff1| {
let time1 = time1.join(meet);
batch.map_times(batch_storage, |time2, diff2| {
cursor2.map_times(storage2, |time2, diff2| {
let time = time1.join(time2);
temp.extend(logic(key, val1, val2, &time, diff1, diff2))
let diff = diff1.multiply(diff2);
let results = result(key, val1, val2)
.into_iter()
.map(|d| (d, time.clone(), diff.clone()));
temp.extend(results);
});
});
batch.step_val(batch_storage);
cursor2.step_val(storage2);
}
batch.rewind_vals(batch_storage);
trace.step_val(trace_storage);
cursor1.step_val(storage1);
cursor2.rewind_vals(storage2);

// TODO: This consolidation is optional, and it may not be very
// helpful. We might try harder to understand whether we
Expand All @@ -472,8 +462,8 @@ where
}
}

batch.step_key(batch_storage);
trace.step_key(trace_storage);
cursor1.step_key(storage1);
cursor2.step_key(storage2);
}
}
}
Expand Down