Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Cross join via batch broadcast #358

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
116 changes: 116 additions & 0 deletions examples/broadcast_cross_join.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
extern crate timely;
extern crate differential_dataflow;

use std::ops::Mul;

use timely::Data;
use timely::dataflow::channels::pact::Pipeline;
use timely::dataflow::operators::broadcast::Broadcast;
use timely::dataflow::operators::Operator;
use timely::dataflow::{Scope, Stream};
use timely::order::TotalOrder;
use differential_dataflow::{Collection, AsCollection};
use differential_dataflow::difference::Semigroup;
use differential_dataflow::input::Input;
use differential_dataflow::lattice::Lattice;
use differential_dataflow::operators::arrange::{Arrange, Arranged};
use differential_dataflow::trace::{Cursor, BatchReader, TraceReader};
use differential_dataflow::trace::implementations::ord::OrdKeySpineAbomArc;

// One half of a cross join: matches every update flowing through the broadcast
// batch stream `right` against the locally held arrangement `left`, emitting
// `(left_key, right_key)` pairs whose difference is the product of the two
// input differences, at the join (max) of the two update times.
//
// NOTE(review): still a work in progress. `time.delayed(&effect_time)` will
// panic if `effect_time` is not in advance of the batch capability `time`
// (batches can contain times earlier than the capability they arrive with),
// and the operator does not yet downgrade the compaction frontiers of `trace`.
// TODO: finish the implementation.
fn half_cross_join<G, Tr1, Key2, R2, Batch2>(
    left: Arranged<G, Tr1>,
    right: &Stream<G, Batch2>,
) -> Collection<G, (Tr1::Key, Key2), <R2 as Mul<Tr1::R>>::Output>
where
    G: Scope,
    G::Timestamp: Lattice + TotalOrder + Ord,
    Tr1: TraceReader<Time = G::Timestamp> + Clone + 'static,
    Tr1::Key: Clone,
    Tr1::R: Clone,
    Batch2: BatchReader<Key2, (), G::Timestamp, R2> + Data,
    Key2: Clone + 'static,
    R2: Semigroup + Clone + Mul<Tr1::R>,
    <R2 as Mul<Tr1::R>>::Output: Semigroup,
{
    let mut trace = left.trace;
    right.unary(Pipeline, "CrossJoin", move |_cap, _info| {
        let mut vector = Vec::new();
        move |input, output| {
            while let Some((time, data)) = input.next() {
                data.swap(&mut vector);
                for batch in vector.drain(..) {
                    let mut cursor = batch.cursor();
                    while let Some(key1) = cursor.get_key(&batch) {
                        let (mut trace_cursor, trace_storage) = trace.cursor();
                        cursor.map_times(&batch, |time1, diff1| {
                            // Restart the trace iteration for every (time, diff)
                            // pair of this key; without this rewind only the first
                            // update per key would observe the trace contents,
                            // because the cursor would already be exhausted.
                            trace_cursor.rewind_keys(&trace_storage);
                            while let Some(key2) = trace_cursor.get_key(&trace_storage) {
                                trace_cursor.map_times(&trace_storage, |time2, diff2| {
                                    // The joined update takes effect at the least
                                    // upper bound of the two input times.
                                    let effect_time = std::cmp::max(time1.clone(), time2.clone());
                                    let cap_time = time.delayed(&effect_time);
                                    let diff = diff1.clone().mul(diff2.clone());
                                    let mut session = output.session(&cap_time);
                                    session.give((((key2.clone(), key1.clone())), effect_time, diff));
                                });
                                trace_cursor.step_key(&trace_storage);
                            }
                        });
                        cursor.step_key(&batch);
                    }
                }
            }
        }
    })
    .as_collection()
}

/// Demonstrates the (work-in-progress) broadcast-based cross join: every worker
/// arranges both inputs, broadcasts the resulting batch streams to all workers,
/// and combines two half cross joins into the full cross product.
fn main() {
    timely::execute_from_args(::std::env::args(), move |worker| {
        let index = worker.index();
        let (mut numbers, mut words, probe) = worker.dataflow::<u64, _, _>(|scope| {
            let (numbers, input1) = scope.new_collection();
            let (words, input2) = scope.new_collection();

            let arranged1 = input1.arrange::<OrdKeySpineAbomArc<_, _, _>>();
            let arranged2 = input2.arrange::<OrdKeySpineAbomArc<_, _, _>>();

            // Ship every batch of each arrangement to all workers.
            let batches1 = arranged1.stream.broadcast();
            let batches2 = arranged2.stream.broadcast();

            // Each half join matches one broadcast batch stream against the
            // locally held arrangement of the other input.
            let half1 = half_cross_join(arranged2, &batches1);
            let half2 = half_cross_join(arranged1, &batches2);

            // Orient the first half's pairs to match the second, then merge.
            let cross_join = half1.map(|(key1, key2)| (key2, key1)).concat(&half2);

            let probe = cross_join
                .inspect(move |d| {
                    println!("worker {} produced: {:?}", index, d);
                })
                .probe();

            (numbers, words, probe)
        });

        numbers.insert(1i64);
        numbers.advance_to(1);
        numbers.insert(2);
        numbers.advance_to(2);
        numbers.flush();

        words.insert("apple".to_string());
        words.advance_to(1);
        words.insert("orange".to_string());
        words.advance_to(2);
        words.flush();

        // Run the dataflow until it has caught up with the inputs.
        while probe.less_than(numbers.time()) {
            worker.step();
        }
    }).unwrap();
}
5 changes: 4 additions & 1 deletion src/trace/implementations/ord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use trace::layers::Builder as TrieBuilder;
use trace::layers::Cursor as TrieCursor;
use trace::layers::ordered::{OrdOffset, OrderedLayer, OrderedBuilder, OrderedCursor};
use trace::layers::ordered_leaf::{OrderedLeaf, OrderedLeafBuilder};
use trace::abomonated_arc_blanket_impls::AbomArc;
use trace::{Batch, BatchReader, Builder, Merger, Cursor};
use trace::description::Description;

Expand All @@ -46,6 +47,8 @@ pub type OrdKeySpine<K, T, R, O=usize> = Spine<K, (), T, R, Rc<OrdKeyBatch<K, T,
/// A trace implementation for empty values using a spine of abomonated ordered lists.
pub type OrdKeySpineAbom<K, T, R, O=usize> = Spine<K, (), T, R, Rc<Abomonated<OrdKeyBatch<K, T, R, O>, Vec<u8>>>>;

/// A trace implementation for empty values using a spine of atomic reference counted abomonated ordered lists.
///
/// The `AbomArc` batch wrapper implements `Abomonation`, so batches of this
/// spine can be serialized and exchanged between workers (e.g. broadcast).
pub type OrdKeySpineAbomArc<K, T, R, O = usize> = Spine<K, (), T, R, AbomArc<OrdKeyBatch<K, T, R, O>>>;

/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Debug, Abomonation)]
Expand Down Expand Up @@ -396,7 +399,7 @@ where


/// An immutable collection of update tuples, from a contiguous interval of logical times.
#[derive(Debug, Abomonation)]
#[derive(Debug, Clone, Abomonation)]
pub struct OrdKeyBatch<K, T, R, O=usize>
where
K: Ord,
Expand Down
162 changes: 162 additions & 0 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -426,6 +426,168 @@ pub mod rc_blanket_impls {
}
}

/// Blanket implementations for atomic reference counted batches.
pub mod abomonated_arc_blanket_impls {
use std::io::Write;
use std::mem;
use std::ops::Deref;
use std::sync::Arc;

use abomonation::Abomonation;

/// Wrapper over Arc that can be safely Abomonated.
///
/// Values constructed locally are `Owned`; decoding an abomonated encoding
/// produces the `Abomonated` variant instead (see the `Abomonation` impl).
pub enum AbomArc<T> {
    /// An Arc that has been constructed normally
    Owned(Arc<T>),
    /// The result of decoding an abomonated AbomArc
    Abomonated(Box<T>),
}

impl<T> AbomArc<T> {
fn new(inner: T) -> Self {
Self::Owned(Arc::new(inner))
}
}

impl<T: Clone> Clone for AbomArc<T> {
fn clone(&self) -> Self {
match self {
Self::Owned(arc) => Self::Owned(Arc::clone(arc)),
Self::Abomonated(val) => Self::Owned(Arc::new(T::clone(&*val))),
}
}
}

impl<T> Deref for AbomArc<T> {
type Target = T;
fn deref(&self) -> &Self::Target {
match self {
Self::Owned(arc) => &**arc,
Self::Abomonated(val) => &*val,
}
}
}

impl<T: Abomonation> Abomonation for AbomArc<T> {
    unsafe fn entomb<W: Write>(&self, bytes: &mut W) -> std::io::Result<()> {
        // Write the raw in-memory bytes of the inner `T`, then let `T` entomb
        // whatever it owns behind pointers.
        bytes.write_all(std::slice::from_raw_parts(mem::transmute(&**self), mem::size_of::<T>()))?;
        (**self).entomb(bytes)
    }
    unsafe fn exhume<'a,'b>(&'a mut self, bytes: &'b mut [u8]) -> Option<&'b mut [u8]> {
        let binary_len = mem::size_of::<T>();
        if binary_len > bytes.len() {
            // Not enough bytes to contain a `T`; decoding fails.
            None
        } else {
            let (mine, rest) = bytes.split_at_mut(binary_len);
            // NOTE(review): `Box::from_raw` is applied to a pointer into
            // `bytes`, which was NOT allocated by `Box`. Dropping the
            // resulting `Abomonated` value would deallocate memory it does
            // not own — this relies on exhumed values never being dropped.
            // Confirm this invariant holds for all consumers.
            let mut value = Box::from_raw(mine.as_mut_ptr() as *mut T);
            let rest = (*value).exhume(rest)?;
            // Overwrite `self` without running its destructor, as is
            // conventional in `exhume` implementations.
            std::ptr::write(self, Self::Abomonated(value));
            Some(rest)
        }
    }
    fn extent(&self) -> usize {
        // Bytes `entomb` will write: the inline `T` plus its owned allocations.
        mem::size_of::<T>() + (&**self).extent()
    }
}

use timely::progress::{Antichain, frontier::AntichainRef};
use super::{Batch, BatchReader, Batcher, Builder, Merger, Cursor, Description};

impl<K, V, T, R, B: BatchReader<K,V,T,R>> BatchReader<K,V,T,R> for AbomArc<B> {

/// The type used to enumerate the batch's contents.
type Cursor = ArcBatchCursor<K, V, T, R, B>;
/// Acquires a cursor to the batch's contents.
fn cursor(&self) -> Self::Cursor {
ArcBatchCursor::new((&**self).cursor())
}

/// The number of updates in the batch.
fn len(&self) -> usize { (&**self).len() }
/// Describes the times of the updates in the batch.
fn description(&self) -> &Description<T> { (&**self).description() }
}

/// Wrapper to provide cursor to nested scope.
pub struct ArcBatchCursor<K, V, T, R, B: BatchReader<K, V, T, R>> {
    // Carries the type parameters needed by the trait impls without storing data.
    phantom: ::std::marker::PhantomData<(K, V, T, R)>,
    // The wrapped batch's own cursor; all operations delegate to it.
    cursor: B::Cursor,
}

impl<K, V, T, R, B: BatchReader<K, V, T, R>> ArcBatchCursor<K, V, T, R, B> {
    // Wraps an inner batch cursor.
    fn new(cursor: B::Cursor) -> Self {
        ArcBatchCursor {
            cursor,
            phantom: ::std::marker::PhantomData,
        }
    }
}

/// Every cursor operation delegates to the wrapped `B::Cursor`; the storage
/// type is `AbomArc<B>`, which derefs to the `B` the inner cursor expects.
impl<K, V, T, R, B: BatchReader<K, V, T, R>> Cursor<K, V, T, R> for ArcBatchCursor<K, V, T, R, B> {

    type Storage = AbomArc<B>;

    #[inline] fn key_valid(&self, storage: &Self::Storage) -> bool { self.cursor.key_valid(storage) }
    #[inline] fn val_valid(&self, storage: &Self::Storage) -> bool { self.cursor.val_valid(storage) }

    #[inline] fn key<'a>(&self, storage: &'a Self::Storage) -> &'a K { self.cursor.key(storage) }
    #[inline] fn val<'a>(&self, storage: &'a Self::Storage) -> &'a V { self.cursor.val(storage) }

    #[inline]
    fn map_times<L: FnMut(&T, &R)>(&mut self, storage: &Self::Storage, logic: L) {
        self.cursor.map_times(storage, logic)
    }

    #[inline] fn step_key(&mut self, storage: &Self::Storage) { self.cursor.step_key(storage) }
    #[inline] fn seek_key(&mut self, storage: &Self::Storage, key: &K) { self.cursor.seek_key(storage, key) }

    #[inline] fn step_val(&mut self, storage: &Self::Storage) { self.cursor.step_val(storage) }
    #[inline] fn seek_val(&mut self, storage: &Self::Storage, val: &V) { self.cursor.seek_val(storage, val) }

    #[inline] fn rewind_keys(&mut self, storage: &Self::Storage) { self.cursor.rewind_keys(storage) }
    #[inline] fn rewind_vals(&mut self, storage: &Self::Storage) { self.cursor.rewind_vals(storage) }
}

/// An immutable collection of updates.
impl<K,V,T,R,B: Batch<K,V,T,R>> Batch<K, V, T, R> for AbomArc<B> {
    // Each associated type wraps the inner batch type's counterpart and
    // re-wraps its finished batches in `AbomArc` (see the impls below).
    type Batcher = ArcBatcher<K, V, T, R, B>;
    type Builder = ArcBuilder<K, V, T, R, B>;
    type Merger = ArcMerger<K, V, T, R, B>;
}

/// Wrapper type for batching reference counted batches.
pub struct ArcBatcher<K,V,T,R,B:Batch<K,V,T,R>> {
    batcher: B::Batcher,
}

/// Functionality for collecting and batching updates.
impl<K,V,T,R,B:Batch<K,V,T,R>> Batcher<K, V, T, R, AbomArc<B>> for ArcBatcher<K,V,T,R,B> {
    /// Starts an empty batcher backed by the inner batch type's batcher.
    fn new() -> Self {
        let batcher = <B::Batcher as Batcher<K,V,T,R,B>>::new();
        ArcBatcher { batcher }
    }
    /// Hands a batch of updates to the inner batcher.
    fn push_batch(&mut self, batch: &mut Vec<((K, V), T, R)>) {
        self.batcher.push_batch(batch);
    }
    /// Seals the inner batch and wraps it in an `AbomArc`.
    fn seal(&mut self, upper: Antichain<T>) -> AbomArc<B> {
        let sealed = self.batcher.seal(upper);
        AbomArc::new(sealed)
    }
    /// Delegates to the inner batcher's frontier.
    fn frontier(&mut self) -> timely::progress::frontier::AntichainRef<T> {
        self.batcher.frontier()
    }
}

/// Wrapper type for building reference counted batches.
pub struct ArcBuilder<K,V,T,R,B:Batch<K,V,T,R>> {
    builder: B::Builder,
}

/// Functionality for building batches from ordered update sequences.
impl<K,V,T,R,B:Batch<K,V,T,R>> Builder<K, V, T, R, AbomArc<B>> for ArcBuilder<K,V,T,R,B> {
    /// Creates an empty builder backed by the inner batch type's builder.
    fn new() -> Self {
        let builder = <B::Builder as Builder<K,V,T,R,B>>::new();
        ArcBuilder { builder }
    }
    /// Creates a builder with capacity for `cap` updates.
    fn with_capacity(cap: usize) -> Self {
        let builder = <B::Builder as Builder<K,V,T,R,B>>::with_capacity(cap);
        ArcBuilder { builder }
    }
    /// Appends one update to the inner builder.
    fn push(&mut self, element: (K, V, T, R)) {
        self.builder.push(element);
    }
    /// Completes the inner batch and wraps it in an `AbomArc`.
    fn done(self, lower: Antichain<T>, upper: Antichain<T>, since: Antichain<T>) -> AbomArc<B> {
        let batch = self.builder.done(lower, upper, since);
        AbomArc::new(batch)
    }
}

/// Wrapper type for merging reference counted batches.
pub struct ArcMerger<K,V,T,R,B:Batch<K,V,T,R>> {
    merger: B::Merger,
}

/// Represents a merge in progress.
impl<K,V,T,R,B:Batch<K,V,T,R>> Merger<K, V, T, R, AbomArc<B>> for ArcMerger<K,V,T,R,B> {
    /// Begins merging the two wrapped batches via the inner batch type's merger.
    fn new(source1: &AbomArc<B>, source2: &AbomArc<B>, compaction_frontier: Option<AntichainRef<T>>) -> Self {
        let merger = B::begin_merge(source1, source2, compaction_frontier);
        ArcMerger { merger }
    }
    /// Delegates merge work to the inner merger.
    fn work(&mut self, source1: &AbomArc<B>, source2: &AbomArc<B>, fuel: &mut isize) {
        self.merger.work(source1, source2, fuel);
    }
    /// Finishes the merge and wraps the merged batch in an `AbomArc`.
    fn done(self) -> AbomArc<B> {
        let merged = self.merger.done();
        AbomArc::new(merged)
    }
}
}


/// Blanket implementations for reference counted batches.
pub mod abomonated_blanket_impls {
Expand Down