diff --git a/src/operators/arrange/arrangement.rs b/src/operators/arrange/arrangement.rs index 370d085a6..ef14f2b92 100644 --- a/src/operators/arrange/arrangement.rs +++ b/src/operators/arrange/arrangement.rs @@ -369,15 +369,6 @@ where Tr::Batch: Batch, Tr::Batcher: Batcher, ; - - /// Arranges updates into a shared trace, using a supplied parallelization contract, with a supplied name. - fn arrange_core(&self, pact: P, name: &str) -> Arranged> - where - P: ParallelizationContract, - Tr: Trace+'static, - Tr::Batch: Batch, - Tr::Batcher: Batcher, - ; } impl Arrange> for Collection @@ -395,17 +386,7 @@ where Tr::Batcher: Batcher>, { let exchange = Exchange::new(move |update: &((K,V),G::Timestamp,R)| (update.0).0.hashed().into()); - self.arrange_core(exchange, name) - } - - fn arrange_core(&self, pact: P, name: &str) -> Arranged> - where - P: ParallelizationContract>, - Tr: Trace+'static, - Tr::Batch: Batch, - Tr::Batcher: Batcher>, - { - arrange_core(&self.inner, pact, name) + arrange_core(&self.inner, exchange, name) } } @@ -583,18 +564,7 @@ where Tr::Batcher: Batcher>, { let exchange = Exchange::new(move |update: &((K,()),G::Timestamp,R)| (update.0).0.hashed().into()); - self.arrange_core(exchange, name) - } - - fn arrange_core(&self, pact: P, name: &str) -> Arranged> - where - P: ParallelizationContract>, - Tr: Trace+'static, - Tr::Batch: Batch, - Tr::Batcher: Batcher>, - { - self.map(|k| (k, ())) - .arrange_core(pact, name) + arrange_core(&self.map(|k| (k, ())).inner, exchange, name) } }