Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added named regions for standalone relations #931

Merged
merged 17 commits into from
Apr 20, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
# Changelog

All notable changes to this project will be documented in this file.

The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

## [Unreleased]

### Changes

- Upgraded to timely dataflow and differential dataflow dependencies to v0.12.
- Worked on improving the debuggability of ddlog dataflow graphs

### Bug Fixes

- Fixed crashes due to misused unsafe code
Kixiron marked this conversation as resolved.
Show resolved Hide resolved

## [0.39.0] - April 11, 2021

### D3log
Expand Down
32 changes: 16 additions & 16 deletions rust/template/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,29 +14,15 @@ command-line = ["cmd_parser", "rustop"]
nested_ts_32 = ["differential_datalog/nested_ts_32"]
c_api = ["differential_datalog/c_api"]

[target.'cfg(not(windows))'.build-dependencies]
libtool = "0.1"

[dependencies.differential_datalog]
path = "./differential_datalog"

[dependencies.cmd_parser]
path = "./cmd_parser"
optional = true

[dependencies.ddlog_ovsdb_adapter]
path = "./ovsdb"
optional = true

[dependencies]
abomonation = "0.7"
time = { version = "0.2", features = ["serde"] }
ordered-float = { version = "2.0.0", features = ["serde"] }
cpuprofiler = { version = "0.0", optional = true }
#differential-dataflow = "0.11.0"
differential-dataflow = { git = "https://github.com/ddlog-dev/differential-dataflow", branch = "ddlog-2" }
differential-dataflow = { git = "https://github.com/ddlog-dev/differential-dataflow", branch = "ddlog-4" }
Kixiron marked this conversation as resolved.
Show resolved Hide resolved
#timely = "0.11"
timely = { git = "https://github.com/ddlog-dev/timely-dataflow", branch = "ddlog-2" }
timely = { git = "https://github.com/ddlog-dev/timely-dataflow", branch = "ddlog-4", default-features = false }
fnv = "1.0.2"
once_cell = "1.4.1"
libc = "0.2"
Expand All @@ -53,6 +39,20 @@ enum-primitive-derive = "0.2.1"
# libraries: flatbuffers "0.6" <-> FlatBuffers "1.11.0".
flatbuffers = { version = "0.6", optional = true }

[dependencies.differential_datalog]
path = "./differential_datalog"

[dependencies.cmd_parser]
path = "./cmd_parser"
optional = true

[dependencies.ddlog_ovsdb_adapter]
path = "./ovsdb"
optional = true

[target.'cfg(not(windows))'.build-dependencies]
libtool = "0.1"

[[bin]]
name = "datalog_example_cli"
path = "src/main.rs"
Expand Down
6 changes: 3 additions & 3 deletions rust/template/differential_datalog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,10 @@ c_api = []

[dependencies]
#differential-dataflow = "0.11.0"
differential-dataflow = { git = "https://github.com/ddlog-dev/differential-dataflow", branch = "ddlog-2" }
dogsdogsdogs = { git = "https://github.com/ddlog-dev/differential-dataflow", branch = "ddlog-2" }
differential-dataflow = { git = "https://github.com/ddlog-dev/differential-dataflow", branch = "ddlog-4" }
dogsdogsdogs = { git = "https://github.com/ddlog-dev/differential-dataflow", branch = "ddlog-4" }
#timely = "0.11"
timely = { git = "https://github.com/ddlog-dev/timely-dataflow", branch = "ddlog-2" }
timely = { git = "https://github.com/ddlog-dev/timely-dataflow", branch = "ddlog-4", default-features = false }

abomonation = "0.7"
ordered-float = { version = "2.0.0", features = ["serde"] }
Expand Down
99 changes: 99 additions & 0 deletions rust/template/differential_datalog/src/dataflow/arrange.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use differential_dataflow::{
difference::Semigroup,
lattice::Lattice,
operators::arrange::{Arrange, Arranged, TraceAgent},
trace::implementations::ord::{OrdKeySpine, OrdValSpine},
Collection, ExchangeData, Hashable,
};
use timely::dataflow::{
channels::pact::{Exchange, Pipeline},
Scope,
};

pub trait ArrangeByKeyExt<K, V> {
type Output;

fn arrange_by_key_exchange<F>(&self, route: F) -> Self::Output
where
F: Fn(&K, &V) -> u64 + 'static,
{
self.arrange_by_key_exchange_named("ArrangeByKeyExchange", route)
}

fn arrange_by_key_exchange_named<F>(&self, name: &str, route: F) -> Self::Output
where
F: Fn(&K, &V) -> u64 + 'static;

fn arrange_by_key_pipelined(&self) -> Self::Output {
self.arrange_by_key_pipelined_named("ArrangeByKeyPipelined")
}

fn arrange_by_key_pipelined_named(&self, name: &str) -> Self::Output;
}

impl<S, K, V, R> ArrangeByKeyExt<K, V> for Collection<S, (K, V), R>
where
S: Scope,
S::Timestamp: Lattice,
K: ExchangeData + Hashable,
V: ExchangeData,
R: Semigroup + ExchangeData,
{
#[allow(clippy::type_complexity)]
type Output = Arranged<S, TraceAgent<OrdValSpine<K, V, S::Timestamp, R>>>;

fn arrange_by_key_exchange_named<F>(&self, name: &str, route: F) -> Self::Output
where
F: Fn(&K, &V) -> u64 + 'static,
{
let exchange = Exchange::new(move |((key, value), _time, _diff)| route(key, value));
self.arrange_core(exchange, name)
}

fn arrange_by_key_pipelined_named(&self, name: &str) -> Self::Output {
self.arrange_core(Pipeline, name)
}
}

pub trait ArrangeBySelfExt<K> {
type Output;

fn arrange_by_self_exchange<F>(&self, route: F) -> Self::Output
where
F: Fn(&K) -> u64 + 'static,
{
self.arrange_by_self_exchange_named("ArrangeBySelfExchange", route)
}

fn arrange_by_self_exchange_named<F>(&self, name: &str, route: F) -> Self::Output
where
F: Fn(&K) -> u64 + 'static;

fn arrange_by_self_pipelined(&self) -> Self::Output {
self.arrange_by_self_pipelined_named("ArrangeBySelfPipelined")
}

fn arrange_by_self_pipelined_named(&self, name: &str) -> Self::Output;
}

impl<S, K, R> ArrangeBySelfExt<K> for Collection<S, K, R>
where
S: Scope,
S::Timestamp: Lattice,
K: ExchangeData + Hashable,
R: Semigroup + ExchangeData,
{
type Output = Arranged<S, TraceAgent<OrdKeySpine<K, S::Timestamp, R>>>;

fn arrange_by_self_exchange_named<F>(&self, name: &str, route: F) -> Self::Output
where
F: Fn(&K) -> u64 + 'static,
{
let exchange = Exchange::new(move |((key, ()), _time, _diff)| route(key));
self.map(|key| (key, ())).arrange_core(exchange, name)
}

fn arrange_by_self_pipelined_named(&self, name: &str) -> Self::Output {
self.map(|key| (key, ())).arrange_core(Pipeline, name)
}
}
53 changes: 53 additions & 0 deletions rust/template/differential_datalog/src/dataflow/consolidate.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
use differential_dataflow::{
difference::Semigroup,
lattice::Lattice,
operators::arrange::{Arrange, Arranged, TraceAgent},
trace::{implementations::ord::OrdKeySpine, layers::ordered::OrdOffset},
Collection, ExchangeData, Hashable,
};
use std::{
convert::{TryFrom, TryInto},
fmt::Debug,
};
use timely::dataflow::Scope;

pub trait ConsolidateExt<S, D, R, O>
where
S: Scope,
S::Timestamp: Lattice,
D: ExchangeData,
R: Semigroup + ExchangeData,
O: OrdOffset,
<O as TryFrom<usize>>::Error: Debug,
<O as TryInto<usize>>::Error: Debug,
{
/// A `.consolidate()` that returns its internal arrangement
fn consolidate_arranged(&self) -> Arranged<S, TraceAgent<OrdKeySpine<D, S::Timestamp, R, O>>> {
self.consolidate_arranged_named("ConsolidateArranged")
}

/// The same as `.consolidate_arranged()` but with the ability to name the operator.
fn consolidate_arranged_named(
&self,
name: &str,
) -> Arranged<S, TraceAgent<OrdKeySpine<D, S::Timestamp, R, O>>>;
}

impl<S, D, R, O> ConsolidateExt<S, D, R, O> for Collection<S, D, R>
where
S: Scope,
S::Timestamp: Lattice,
D: ExchangeData + Hashable,
R: Semigroup + ExchangeData,
O: OrdOffset,
<O as TryFrom<usize>>::Error: Debug,
<O as TryInto<usize>>::Error: Debug,
{
fn consolidate_arranged_named(
&self,
name: &str,
) -> Arranged<S, TraceAgent<OrdKeySpine<D, S::Timestamp, R, O>>> {
// TODO: Name this map?
self.map(|key| (key, ())).arrange_named(name)
}
}
69 changes: 69 additions & 0 deletions rust/template/differential_datalog/src/dataflow/distinct.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use crate::dataflow::{ConsolidateExt, MapExt};
use differential_dataflow::{
difference::Abelian,
lattice::Lattice,
operators::{
arrange::{ArrangeBySelf, Arranged, TraceAgent},
Reduce,
},
trace::{implementations::ord::OrdKeySpine, layers::ordered::OrdOffset},
Collection, ExchangeData, Hashable,
};
use std::{
convert::{TryFrom, TryInto},
fmt::Debug,
ops::Add,
};
use timely::dataflow::Scope;

/// An alternative implementation of `distinct`.
///
/// The implementation of `distinct` in differential dataflow maintains both its input and output
/// arrangements. This implementation, suggested by @frankmcsherry instead uses a single
/// arrangement that produces the number of "surplus" records, which are then subtracted from the
/// input to get an output with distinct records. This has the advantage that for keys that are
/// already distinct, there is no additional memory used in the output (nothing to subtract). It
/// has the downside that if the input changes a lot, the output may have more changes (to track
/// the input changes) than if it just recorded distinct records (which is pretty stable).
pub fn diff_distinct<S, D, R, O>(
relation_name: &str,
collection: &Collection<S, D, R>,
) -> Arranged<S, TraceAgent<OrdKeySpine<D, S::Timestamp, R, O>>>
where
S: Scope,
S::Timestamp: Lattice,
D: ExchangeData + Hashable,
R: Abelian + ExchangeData + Add<Output = R> + From<i8>,
O: OrdOffset,
<O as TryFrom<usize>>::Error: Debug,
<O as TryInto<usize>>::Error: Debug,
{
// For each value with weight w != 1, compute an adjustment record with the same value and
// weight (1-w)
// TODO: What happens when negative weights get into this?
let negated = collection
.arrange_by_self_named(&format!(
"ArrangeBySelf: DiffDistinct for {}",
relation_name
))
.reduce_named(
&format!("Reduce: DiffDistinct for {}", relation_name),
|_, src, dst| {
// If the input weight is 1, don't produce a surplus record.
if src[0].1 != R::from(1) {
dst.push(((), R::from(1) + src[0].1.clone().neg()))
}
},
)
.map_named(&format!("Map: DiffDistinct for {}", relation_name), |x| x.0);

collection
// TODO: `.concat_named()`?
.concat(&negated)
// We directly return the consolidation arrangement,
// allowing us to potentially skip re-arranging it later
.consolidate_arranged_named(&format!(
"ConsolidateArranged: DiffDistinct for {}",
relation_name,
))
}
69 changes: 69 additions & 0 deletions rust/template/differential_datalog/src/dataflow/filter_map.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
use differential_dataflow::{collection::AsCollection, difference::Semigroup, Collection};
use timely::{
dataflow::{channels::pact::Pipeline, operators::Operator, Scope, Stream},
Data,
};

pub trait FilterMap<D, D2> {
type Output;

fn filter_map<L>(&self, logic: L) -> Self::Output
where
L: FnMut(D) -> Option<D2> + 'static,
{
self.filter_map_named("FilterMap", logic)
}

fn filter_map_named<L>(&self, name: &str, logic: L) -> Self::Output
where
L: FnMut(D) -> Option<D2> + 'static;
}

impl<S, D, D2> FilterMap<D, D2> for Stream<S, D>
where
S: Scope,
D: Data,
D2: Data,
{
type Output = Stream<S, D2>;

fn filter_map_named<L>(&self, name: &str, mut logic: L) -> Self::Output
where
L: FnMut(D) -> Option<D2> + 'static,
{
let mut buffer = Vec::new();

self.unary(Pipeline, name, move |_capability, _info| {
move |input, output| {
input.for_each(|capability, data| {
data.swap(&mut buffer);

output
.session(&capability)
.give_iterator(buffer.drain(..).filter_map(|data| logic(data)));
});
}
})
}
}

impl<S, D, D2, R> FilterMap<D, D2> for Collection<S, D, R>
where
S: Scope,
D: Data,
D2: Data,
R: Semigroup,
{
type Output = Collection<S, D2, R>;

fn filter_map_named<L>(&self, name: &str, mut logic: L) -> Self::Output
where
L: FnMut(D) -> Option<D2> + 'static,
{
self.inner
.filter_map_named(name, move |(data, time, diff)| {
logic(data).map(|data| (data, time, diff))
})
.as_collection()
}
}
Loading