Skip to content

Commit

Permalink
Duplication really sucks
Browse files Browse the repository at this point in the history
  • Loading branch information
Kixiron committed Apr 10, 2021
1 parent 99509e7 commit 9f563be
Showing 1 changed file with 175 additions and 41 deletions.
216 changes: 175 additions & 41 deletions rust/template/differential_datalog/src/program/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1283,52 +1283,186 @@ impl Program {
T: Lattice + Timestamp,
Lookup: Fn(RelId) -> Option<Collection<S, DDValue, Weight>>,
{
if let XFormCollection::StreamXForm {
description,
xform,
next,
} = xform
{
let xformed = col
.scope()
.scoped::<AltNeu<S::Timestamp>, _, _>(description, |inner| {
let d_col = with_prof_context(
format!("differentiate stream before: {}", description).as_ref(),
|| col.differentiate(inner),
);

fn dummy_lookup_collection<S: Scope>(
_: RelId,
) -> Option<Collection<S, DDValue, Weight>> {
None
}
match *xform {
XFormCollection::Arrange {
ref description,
afun,
ref next,
} => {
let arr = with_prof_context(&description, || col.flat_map(afun).arrange_by_key());
Self::xform_arrangement(&arr, &*next, arrangements, lookup_collection)
}
XFormCollection::Differentiate {
ref description,
ref next,
} => {
#[allow(clippy::unnecessary_cast)]
let one = Any::downcast_ref::<<S::Timestamp as Timestamp>::Summary>(&(1 as TS))
.expect("Differentiate operator used in recursive context");

// We must call the streamless variant within the nested scope
// otherwise we force rustc to instantiate an infinitely long type
// since the function calls itself (a potentially infinite number of times),
// each requiring further nesting of the scopes (and their types)
let xformed = Self::streamless_xform_collection::<
Child<S, AltNeu<S::Timestamp>>,
S::Timestamp,
_,
>(
d_col,
&*xform,
&Arrangements {
arrangements: &FnvHashMap::default(),
},
dummy_lookup_collection,
);
let diff = with_prof_context(&description, || {
col.concat(
&col.delay(move |t| one.results_in(t).expect("Integer overflow in Differentiate: maximal number of transactions exceeded")).negate())
});

with_prof_context(
format!("integrate stream after: {}", description).as_ref(),
|| xformed.integrate(),
Self::xform_collection(diff, &*next, arrangements, lookup_collection)
}
XFormCollection::Map {
ref description,
mfun,
ref next,
} => {
let mapped = with_prof_context(&description, || col.map(mfun));
Self::xform_collection(mapped, &*next, arrangements, lookup_collection)
}
XFormCollection::FlatMap {
ref description,
fmfun,
ref next,
} => {
let flattened = with_prof_context(&description, || {
col.flat_map(move |x| fmfun(x).into_iter().flatten())
});
Self::xform_collection(flattened, &*next, arrangements, lookup_collection)
}
XFormCollection::Filter {
ref description,
ffun,
ref next,
} => {
let filtered = with_prof_context(&description, || col.filter(ffun));
Self::xform_collection(filtered, &*next, arrangements, lookup_collection)
}
XFormCollection::FilterMap {
ref description,
fmfun,
ref next,
} => {
let flattened = with_prof_context(&description, || col.flat_map(fmfun));
Self::xform_collection(flattened, &*next, arrangements, lookup_collection)
}
XFormCollection::Inspect {
ref description,
ifun,
ref next,
} => {
let inspect = with_prof_context(&description, || {
col.inspect(move |(v, ts, w)| ifun(v, ts.to_tuple_ts(), *w))
});
Self::xform_collection(inspect, &*next, arrangements, lookup_collection)
}
XFormCollection::StreamJoin {
ref description,
afun,
arrangement,
jfun,
ref next,
} => {
let join = with_prof_context(&description, || {
// arrange input collection
let collection_with_keys = col.flat_map(afun);
let arr = match arrangements.lookup_arr(arrangement) {
ArrangementFlavor::Local(DataflowArrangement::Map(arranged)) => arranged,
ArrangementFlavor::Local(DataflowArrangement::Set(_)) => {
panic!("StreamJoin: not a map arrangement {:?}", arrangement)
}
_ => panic!("StreamJoin in nested scope: {}", description),
};
lookup_map(
&collection_with_keys,
arr,
|(k, _), key| *key = k.clone(),
move |v1, w1, v2, w2| (jfun(&v1.1, v2), w1 * w2),
().into_ddvalue(),
().into_ddvalue(),
().into_ddvalue(),
)
// Filter out `None`'s.
// FIXME: We wouldn't need this if `lookup_map` allowed `output_func`
// to return `Option`.
.flat_map(|v| v)
});
Self::xform_collection(join, &*next, arrangements, lookup_collection)
}
XFormCollection::StreamSemijoin {
ref description,
afun,
arrangement,
jfun,
ref next,
} => {
let join = with_prof_context(&description, || {
// arrange input collection
let collection_with_keys = col.flat_map(afun);
let arr = match arrangements.lookup_arr(arrangement) {
ArrangementFlavor::Local(DataflowArrangement::Set(arranged)) => arranged,
ArrangementFlavor::Local(DataflowArrangement::Map(_)) => {
panic!("StreamSemijoin: not a set arrangement {:?}", arrangement)
}
_ => panic!("StreamSemijoin in nested scope: {}", description),
};
lookup_map(
&collection_with_keys,
arr,
|(k, _), key| *key = k.clone(),
move |v1, w1, _, w2| (jfun(&v1.1), w1 * w2),
().into_ddvalue(),
().into_ddvalue(),
().into_ddvalue(),
)
// Filter out `None`'s.
// FIXME: We wouldn't need this if `lookup_map` allowed `output_func`
// to return `Option`.
.flat_map(|v| v)
});
Self::xform_collection(join, &*next, arrangements, lookup_collection)
}

Self::xform_collection(xformed, &*next, arrangements, lookup_collection)
} else {
Self::streamless_xform_collection_ref(col, xform, arrangements, lookup_collection)
XFormCollection::StreamXForm {
ref description,
ref xform,
ref next,
} => {
let xformed =
col.scope()
.scoped::<AltNeu<S::Timestamp>, _, _>(description, |inner| {
let d_col = with_prof_context(
format!("differentiate stream before: {}", description).as_ref(),
|| col.differentiate(inner),
);

fn dummy_lookup_collection<S: Scope>(
_: RelId,
) -> Option<Collection<S, DDValue, Weight>>
{
None
}

// We must call the streamless variant within the nested scope
// otherwise we force rustc to instantiate an infinitely long type
// since the function calls itself (a potentially infinite number of times),
// each requiring further nesting of the scopes (and their types)
let xformed = Self::streamless_xform_collection::<
Child<S, AltNeu<S::Timestamp>>,
S::Timestamp,
_,
>(
d_col,
&*xform,
&Arrangements {
arrangements: &FnvHashMap::default(),
},
dummy_lookup_collection,
);

with_prof_context(
format!("integrate stream after: {}", description).as_ref(),
|| xformed.integrate(),
)
});

Self::xform_collection(xformed, &*next, arrangements, lookup_collection)
}
}
}

Expand Down

0 comments on commit 9f563be

Please sign in to comment.