Skip to content

Commit

Permalink
Apply changes from Timely (TimelyDataflow/timely-dataflow#515)
Browse files Browse the repository at this point in the history
Signed-off-by: Moritz Hoffmann <[email protected]>
  • Loading branch information
antiguru committed Mar 17, 2023
1 parent 6ae61ad commit d6521f6
Show file tree
Hide file tree
Showing 10 changed files with 22 additions and 22 deletions.
4 changes: 2 additions & 2 deletions dogsdogsdogs/examples/delta_query2.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ fn main() {

use timely::dataflow::operators::unordered_input::UnorderedHandle;

let ((input1, capability1), data1): ((UnorderedHandle<Product<usize, usize>, ((usize, usize), Product<usize, usize>, isize)>, _), _) = inner.new_unordered_input();
let ((input2, capability2), data2): ((UnorderedHandle<Product<usize, usize>, ((usize, usize), Product<usize, usize>, isize)>, _), _) = inner.new_unordered_input();
let ((input1, capability1), data1): ((UnorderedHandle<Product<usize, usize>, Vec<((usize, usize), Product<usize, usize>, isize)>>, _), _) = inner.new_unordered_input();
let ((input2, capability2), data2): ((UnorderedHandle<Product<usize, usize>, Vec<((usize, usize), Product<usize, usize>, isize)>>, _), _) = inner.new_unordered_input();

let edges1 = data1.as_collection();
let edges2 = data2.as_collection();
Expand Down
4 changes: 2 additions & 2 deletions examples/capture-test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ pub mod kafka {
use differential_dataflow::lattice::Lattice;

/// Creates a Kafka source from supplied configuration information.
pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
pub fn create_source<G, D, T, R>(scope: G, addr: &str, topic: &str, group: &str) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, Vec<(D, T, R)>>)
where
G: Scope<Timestamp = T>,
D: ExchangeData + Hash + for<'a> serde::Deserialize<'a>,
Expand All @@ -166,7 +166,7 @@ pub mod kafka {
})
}

pub fn create_sink<G, D, T, R>(stream: &Stream<G, (D, T, R)>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
pub fn create_sink<G, D, T, R>(stream: &Stream<G, Vec<(D, T, R)>>, addr: &str, topic: &str) -> Box<dyn std::any::Any>
where
G: Scope<Timestamp = T>,
D: ExchangeData + Hash + Serialize + for<'a> Deserialize<'a>,
Expand Down
4 changes: 2 additions & 2 deletions src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -292,7 +292,7 @@ pub mod source {
pub fn build<G, B, I, D, T, R>(
scope: G,
source_builder: B,
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, (D, T, R)>)
) -> (Box<dyn std::any::Any + Send + Sync>, Stream<G, Vec<(D, T, R)>>)
where
G: Scope<Timestamp = T>,
B: FnOnce(SyncActivator) -> I,
Expand Down Expand Up @@ -606,7 +606,7 @@ pub mod sink {
/// performed before calling the method, the recorded output may not be correctly
/// reconstructed by readers.
pub fn build<G, BS, D, T, R>(
stream: &Stream<G, (D, T, R)>,
stream: &Stream<G, Vec<(D, T, R)>>,
sink_hash: u64,
updates_sink: Weak<RefCell<BS>>,
progress_sink: Weak<RefCell<BS>>,
Expand Down
6 changes: 3 additions & 3 deletions src/collection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub struct Collection<G: Scope, D, R: Semigroup = isize> {
///
/// This field is exposed to support direct timely dataflow manipulation when required, but it is
/// not intended to be the idiomatic way to work with the collection.
pub inner: Stream<G, (D, G::Timestamp, R)>
pub inner: Stream<G, Vec<(D, G::Timestamp, R)>>
}

impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Data {
Expand All @@ -52,7 +52,7 @@ impl<G: Scope, D: Data, R: Semigroup> Collection<G, D, R> where G::Timestamp: Da
/// idiomatic approach to convert timely streams to collections. Also, the `input::Input` trait
/// provides a `new_collection` method which will create a new collection for you without exposing
/// the underlying timely stream at all.
pub fn new(stream: Stream<G, (D, G::Timestamp, R)>) -> Collection<G, D, R> {
pub fn new(stream: Stream<G, Vec<(D, G::Timestamp, R)>>) -> Collection<G, D, R> {
Collection { inner: stream }
}
/// Creates a new collection by applying the supplied function to each input element.
Expand Down Expand Up @@ -678,7 +678,7 @@ pub trait AsCollection<G: Scope, D: Data, R: Semigroup> {
fn as_collection(&self) -> Collection<G, D, R>;
}

impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, (D, G::Timestamp, R)> {
impl<G: Scope, D: Data, R: Semigroup> AsCollection<G, D, R> for Stream<G, Vec<(D, G::Timestamp, R)>> {
fn as_collection(&self) -> Collection<G, D, R> {
Collection::new(self.clone())
}
Expand Down
4 changes: 2 additions & 2 deletions src/input.rs
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ impl<G: TimelyInput> Input for G where <G as ScopeParent>::Timestamp: Lattice {
pub struct InputSession<T: Timestamp+Clone, D: Data, R: Semigroup> {
time: T,
buffer: Vec<(D, T, R)>,
handle: Handle<T,(D,T,R)>,
handle: Handle<T,Vec<(D,T,R)>>,
}

impl<T: Timestamp+Clone, D: Data> InputSession<T, D, isize> {
Expand Down Expand Up @@ -236,7 +236,7 @@ impl<T: Timestamp+Clone, D: Data, R: Semigroup> InputSession<T, D, R> {
}

/// Creates a new session from a reference to an input handle.
pub fn from(handle: Handle<T,(D,T,R)>) -> Self {
pub fn from(handle: Handle<T,Vec<(D,T,R)>>) -> Self {
InputSession {
time: handle.time().clone(),
buffer: Vec::new(),
Expand Down
12 changes: 6 additions & 6 deletions src/operators/arrange/arrangement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ where
/// This stream contains the same batches of updates the trace itself accepts, so there should
/// be no additional overhead to receiving these records. The batches can be navigated just as
/// the batches in the trace, by key and by value.
pub stream: Stream<G, Tr::Batch>,
pub stream: Stream<G, Vec<Tr::Batch>>,
/// A shared trace, updated by the `Arrange` operator and readable by others.
pub trace: Tr,
// TODO : We might have an `Option<Collection<G, (K, V)>>` here, which `as_collection` sets and
Expand Down Expand Up @@ -225,7 +225,7 @@ where
///
/// This method exists for streams of batches without the corresponding arrangement.
/// If you have the arrangement, its `flat_map_ref` method is equivalent to this.
pub fn flat_map_batches<I, L>(stream: &Stream<G, Tr::Batch>, mut logic: L) -> Collection<G, I::Item, Tr::R>
pub fn flat_map_batches<I, L>(stream: &Stream<G, Vec<Tr::Batch>>, mut logic: L) -> Collection<G, I::Item, Tr::R>
where
Tr::R: Semigroup,
I: IntoIterator,
Expand Down Expand Up @@ -259,7 +259,7 @@ where
///
/// This method consumes a stream of (key, time) queries and reports the corresponding stream of
/// (key, value, time, diff) accumulations in the `self` trace.
pub fn lookup(&self, queries: &Stream<G, (Tr::Key, G::Timestamp)>) -> Stream<G, (Tr::Key, Tr::Val, G::Timestamp, Tr::R)>
pub fn lookup(&self, queries: &Stream<G, Vec<(Tr::Key, G::Timestamp)>>) -> Stream<G, Vec<(Tr::Key, Tr::Val, G::Timestamp, Tr::R)>>
where
G::Timestamp: Data+Lattice+Ord+TotalOrder,
Tr::Key: ExchangeData+Hashable,
Expand Down Expand Up @@ -485,7 +485,7 @@ where
/// is the correct way to determine that times in the shared trace are committed.
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch,
;
Expand Down Expand Up @@ -522,7 +522,7 @@ where

fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,V),G::Timestamp,R)>,
P: ParallelizationContract<G::Timestamp, Vec<((K,V),G::Timestamp,R)>>,
Tr: Trace+TraceReader<Key=K,Val=V,Time=G::Timestamp,R=R>+'static,
Tr::Batch: Batch,
{
Expand Down Expand Up @@ -689,7 +689,7 @@ where
{
fn arrange_core<P, Tr>(&self, pact: P, name: &str) -> Arranged<G, TraceAgent<Tr>>
where
P: ParallelizationContract<G::Timestamp, ((K,()),G::Timestamp,R)>,
P: ParallelizationContract<G::Timestamp, Vec<((K,()),G::Timestamp,R)>>,
Tr: Trace+TraceReader<Key=K, Val=(), Time=G::Timestamp, R=R>+'static,
Tr::Batch: Batch,
{
Expand Down
2 changes: 1 addition & 1 deletion src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ use super::TraceAgent;
/// understand what a "sequence" of upserts would mean for partially ordered
/// timestamps.
pub fn arrange_from_upsert<G, Tr>(
stream: &Stream<G, (Tr::Key, Option<Tr::Val>, G::Timestamp)>,
stream: &Stream<G, Vec<(Tr::Key, Option<Tr::Val>, G::Timestamp)>>,
name: &str,
) -> Arranged<G, TraceAgent<Tr>>
where
Expand Down
4 changes: 2 additions & 2 deletions src/operators/iterate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ impl<G: Scope, D: Ord+Data+Debug, R: Semigroup> Iterate<G, D, R> for G {
pub struct Variable<G: Scope, D: Data, R: Abelian>
where G::Timestamp: Lattice {
collection: Collection<G, D, R>,
feedback: Handle<G, (D, G::Timestamp, R)>,
feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
source: Option<Collection<G, D, R>>,
step: <G::Timestamp as Timestamp>::Summary,
}
Expand Down Expand Up @@ -235,7 +235,7 @@ impl<G: Scope, D: Data, R: Abelian> Deref for Variable<G, D, R> where G::Timesta
pub struct SemigroupVariable<G: Scope, D: Data, R: Semigroup>
where G::Timestamp: Lattice {
collection: Collection<G, D, R>,
feedback: Handle<G, (D, G::Timestamp, R)>,
feedback: Handle<G, Vec<(D, G::Timestamp, R)>>,
step: <G::Timestamp as Timestamp>::Summary,
}

Expand Down
2 changes: 1 addition & 1 deletion src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -689,7 +689,7 @@ where

/// Process keys until at least `fuel` output tuples produced, or the work is exhausted.
#[inline(never)]
fn work<L, I>(&mut self, output: &mut OutputHandle<T, (D, T, R), Tee<T, (D, T, R)>>, mut logic: L, fuel: &mut usize)
fn work<L, I>(&mut self, output: &mut OutputHandle<T, Vec<(D, T, R)>, Tee<T, Vec<(D, T, R)>>>, mut logic: L, fuel: &mut usize)
where I: IntoIterator<Item=(D, T, R)>, L: FnMut(&K, &C1::Val, &C2::Val, &T, &C1::R, &C2::R)->I {

let meet = self.capability.time();
Expand Down
2 changes: 1 addition & 1 deletion tests/import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use differential_dataflow::operators::reduce::Reduce;
use differential_dataflow::trace::TraceReader;
use itertools::Itertools;

type Result = std::sync::mpsc::Receiver<timely::dataflow::operators::capture::Event<usize, ((u64, i64), usize, i64)>>;
type Result = std::sync::mpsc::Receiver<timely::dataflow::operators::capture::Event<usize, Vec<((u64, i64), usize, i64)>>>;

fn run_test<T>(test: T, expected: Vec<(usize, Vec<((u64, i64), i64)>)>) -> ()
where T: FnOnce(Vec<Vec<((u64, u64), i64)>>)-> Result + ::std::panic::UnwindSafe
Expand Down

0 comments on commit d6521f6

Please sign in to comment.