Skip to content

Commit

Permalink
dogs^3 compaction improvement (#457)
Browse files Browse the repository at this point in the history
* Allow frontier function to populate antichain

* Clean up dependencies
  • Loading branch information
frankmcsherry authored Jan 25, 2024
1 parent dc5ebef commit e153706
Show file tree
Hide file tree
Showing 6 changed files with 10 additions and 12 deletions.
8 changes: 5 additions & 3 deletions dogsdogsdogs/examples/delta_query2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,15 @@ fn main() {
use dogsdogsdogs::operators::half_join;

// pick a frontier that will not mislead TOTAL ORDER comparisons.
let closure = |time: &Product<usize, usize>| Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1));
let closure = |time: &Product<usize, usize>, antichain: &mut timely::progress::Antichain<Product<usize, usize>>| {
antichain.insert(Product::new(time.outer.saturating_sub(1), time.inner.saturating_sub(1)));
};

let path1 =
half_join(
&changes1,
forward2,
closure.clone(),
closure,
|t1,t2| t1.lt(t2), // This one ignores concurrent updates.
|key, val1, val2| (key.clone(), (val1.clone(), val2.clone())),
);
Expand All @@ -51,7 +53,7 @@ fn main() {
half_join(
&changes2,
forward1,
closure.clone(),
closure,
|t1,t2| t1.le(t2), // This one can "see" concurrent updates.
|key, val1, val2| (key.clone(), (val2.clone(), val1.clone())),
);
Expand Down
1 change: 0 additions & 1 deletion dogsdogsdogs/src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;

Expand Down
10 changes: 5 additions & 5 deletions dogsdogsdogs/src/operators/half_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ where
Tr: TraceReader+Clone+'static,
R: Mul<Tr::Diff>,
<R as Mul<Tr::Diff>>::Output: Semigroup,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
S: FnMut(&Tr::KeyOwned, &V, Tr::Val<'_>)->DOut+'static,
Expand Down Expand Up @@ -134,7 +134,7 @@ where
V: ExchangeData,
R: ExchangeData + Monoid,
Tr: TraceReader+Clone+'static,
FF: Fn(&G::Timestamp) -> G::Timestamp + 'static,
FF: Fn(&G::Timestamp, &mut Antichain<G::Timestamp>) + 'static,
CF: Fn(&G::Timestamp, &G::Timestamp) -> bool + 'static,
DOut: Clone+'static,
ROut: Semigroup,
Expand Down Expand Up @@ -281,10 +281,10 @@ where
// The logical merging frontier depends on both input1 and stash.
let mut frontier = timely::progress::frontier::Antichain::new();
for time in input1.frontier().frontier().iter() {
frontier.insert(frontier_func(time));
frontier_func(time, &mut frontier);
}
for key in stash.keys() {
frontier.insert(frontier_func(key.time()));
for time in stash.keys() {
frontier_func(time, &mut frontier);
}
arrangement_trace.as_mut().map(|trace| trace.set_logical_compaction(frontier.borrow()));

Expand Down
1 change: 0 additions & 1 deletion dogsdogsdogs/src/operators/lookup_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ use timely::progress::Antichain;

use differential_dataflow::{ExchangeData, Collection, AsCollection, Hashable};
use differential_dataflow::difference::{Semigroup, Monoid};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::{Cursor, TraceReader};

Expand Down
1 change: 0 additions & 1 deletion dogsdogsdogs/src/operators/propose.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection, Hashable};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;
use differential_dataflow::trace::cursor::MyTrait;
Expand Down
1 change: 0 additions & 1 deletion dogsdogsdogs/src/operators/validate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use timely::dataflow::Scope;

use differential_dataflow::{ExchangeData, Collection};
use differential_dataflow::difference::{Monoid, Multiply};
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::Arranged;
use differential_dataflow::trace::TraceReader;

Expand Down

0 comments on commit e153706

Please sign in to comment.