diff --git a/Cargo.toml b/Cargo.toml
index 03e62168d..e575aa313 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -46,7 +46,8 @@ fnv="1.0.2"
 timely = {workspace = true}
 
 [workspace.dependencies]
-timely = { version = "0.13", default-features = false }
+#timely = { version = "0.13", default-features = false }
+timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
 #timely = { path = "../timely-dataflow/timely/", default-features = false }
 
 [features]
diff --git a/src/capture.rs b/src/capture.rs
index d546ad4a2..049441950 100644
--- a/src/capture.rs
+++ b/src/capture.rs
@@ -600,7 +600,7 @@ pub mod sink {
                     }
 
                     // Now record the update to the writer.
-                    send_queue.push_back(Message::Updates(updates.replace(Vec::new())));
+                    send_queue.push_back(Message::Updates(std::mem::take(updates)));
 
                     // Transmit timestamp counts downstream.
                     output
diff --git a/src/dynamic/mod.rs b/src/dynamic/mod.rs
index d13eadcd8..e1d4c4474 100644
--- a/src/dynamic/mod.rs
+++ b/src/dynamic/mod.rs
@@ -45,22 +45,20 @@ where
         let (mut output, stream) = builder.new_output();
         let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]);
 
-        let mut vector = Default::default();
         builder.build(move |_capability| move |_frontier| {
             let mut output = output.activate();
             input.for_each(|cap, data| {
-                data.swap(&mut vector);
                 let mut new_time = cap.time().clone();
                 let mut vec = std::mem::take(&mut new_time.inner).into_vec();
                 vec.truncate(level - 1);
                 new_time.inner = PointStamp::new(vec);
                 let new_cap = cap.delayed(&new_time);
-                for (_data, time, _diff) in vector.iter_mut() {
+                for (_data, time, _diff) in data.iter_mut() {
                     let mut vec = std::mem::take(&mut time.inner).into_vec();
                     vec.truncate(level - 1);
                     time.inner = PointStamp::new(vec);
                 }
-                output.session(&new_cap).give_container(&mut vector);
+                output.session(&new_cap).give_container(data);
             });
         });
 
diff --git a/src/operators/arrange/upsert.rs b/src/operators/arrange/upsert.rs
index 5d687856f..867bc055c 100644
--- a/src/operators/arrange/upsert.rs
+++ b/src/operators/arrange/upsert.rs
@@ -161,7 +161,6 @@
 
             // Tracks the lower envelope of times in `priority_queue`.
             let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
-            let mut buffer = Vec::new();
             // Form the trace we will both use internally and publish.
             let activator = Some(stream.scope().activator_for(info.address.clone()));
             let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
@@ -186,8 +185,7 @@
                 // Stash capabilities and associated data (ordered by time).
                 input.for_each(|cap, data| {
                     capabilities.insert(cap.retain());
-                    data.swap(&mut buffer);
-                    for (key, val, time) in buffer.drain(..) {
+                    for (key, val, time) in data.drain(..) {
                        priority_queue.push(std::cmp::Reverse((time, key, val)))
                    }
                });
diff --git a/src/operators/consolidate.rs b/src/operators/consolidate.rs
index 4d89d7a86..41824bd6f 100644
--- a/src/operators/consolidate.rs
+++ b/src/operators/consolidate.rs
@@ -97,11 +97,9 @@ where
         self.inner
             .unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {
 
-                let mut vector = Vec::new();
                 move |input, output| {
                     input.for_each(|time, data| {
-                        data.swap(&mut vector);
-                        output.session_with_builder(&time).give_iterator(vector.drain(..));
+                        output.session_with_builder(&time).give_iterator(data.drain(..));
                     })
                 }
             })
diff --git a/src/operators/count.rs b/src/operators/count.rs
index b39bb8769..af310b664 100644
--- a/src/operators/count.rs
+++ b/src/operators/count.rs
@@ -65,7 +65,6 @@
     fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (T1::KeyOwned, T1::Diff), R2> {
         let mut trace = self.trace.clone();
-        let mut buffer = Vec::new();
 
         self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {
@@ -87,8 +86,7 @@
                 if cap.is_none() {                          // NB: Assumes batches are in-order
                     cap = Some(capability.retain());
                 }
-                batches.swap(&mut buffer);
-                for batch in buffer.drain(..) {
+                for batch in batches.drain(..) {
                     upper_limit.clone_from(batch.upper());  // NB: Assumes batches are in-order
                     batch_cursors.push(batch.cursor());
                     batch_storage.push(batch);
diff --git a/src/operators/join.rs b/src/operators/join.rs
index e5772d1c6..f029a4c3d 100644
--- a/src/operators/join.rs
+++ b/src/operators/join.rs
@@ -444,10 +444,6 @@ where
        let mut trace1_option = Some(trace1);
        let mut trace2_option = Some(trace2);
 
-       // Swappable buffers for input extraction.
-       let mut input1_buffer = Vec::new();
-       let mut input2_buffer = Vec::new();
-
        move |input1, input2, output| {
 
            // 1. Consuming input.
@@ -468,8 +464,7 @@
                    // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                    if let Some(ref mut trace2) = trace2_option {
                        let capability = capability.retain();
-                       data.swap(&mut input1_buffer);
-                       for batch1 in input1_buffer.drain(..) {
+                       for batch1 in data.drain(..) {
                            // Ignore any pre-loaded data.
                            if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
                                if !batch1.is_empty() {
@@ -496,8 +491,7 @@
                    // This test *should* always pass, as we only drop a trace in response to the other input emptying.
                    if let Some(ref mut trace1) = trace1_option {
                        let capability = capability.retain();
-                       data.swap(&mut input2_buffer);
-                       for batch2 in input2_buffer.drain(..) {
+                       for batch2 in data.drain(..) {
                            // Ignore any pre-loaded data.
                            if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
                                if !batch2.is_empty() {
diff --git a/src/operators/reduce.rs b/src/operators/reduce.rs
index a26f0072e..6b1017130 100644
--- a/src/operators/reduce.rs
+++ b/src/operators/reduce.rs
@@ -373,8 +373,6 @@
        let mut output_upper = Antichain::from_elem(::minimum());
        let mut output_lower = Antichain::from_elem(::minimum());
 
-       let mut input_buffer = Vec::new();
-
        let id = trace.stream.scope().index();
 
        move |input, output| {
@@ -409,8 +407,7 @@
                // times in the batch.
                input.for_each(|capability, batches| {
-                   batches.swap(&mut input_buffer);
-                   for batch in input_buffer.drain(..) {
+                   for batch in batches.drain(..) {
                        upper_limit.clone_from(batch.upper());
                        batch_cursors.push(batch.cursor());
                        batch_storage.push(batch);
diff --git a/src/operators/threshold.rs b/src/operators/threshold.rs
index 3825f0e1c..d2add618f 100644
--- a/src/operators/threshold.rs
+++ b/src/operators/threshold.rs
@@ -112,7 +112,6 @@
    {
        let mut trace = self.trace.clone();
-       let mut buffer = Vec::new();
 
        self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {
@@ -134,8 +133,7 @@
                if cap.is_none() {                          // NB: Assumes batches are in-order
                    cap = Some(capability.retain());
                }
-               batches.swap(&mut buffer);
-               for batch in buffer.drain(..) {
+               for batch in batches.drain(..) {
                    upper_limit.clone_from(batch.upper());  // NB: Assumes batches are in-order
                    batch_cursors.push(batch.cursor());
                    batch_storage.push(batch);
diff --git a/src/trace/implementations/chunker.rs b/src/trace/implementations/chunker.rs
index 527a614d0..478480803 100644
--- a/src/trace/implementations/chunker.rs
+++ b/src/trace/implementations/chunker.rs
@@ -1,7 +1,6 @@
 //! Organize streams of data into sorted chunks.
 
 use std::collections::VecDeque;
-use timely::communication::message::RefOrMut;
 use timely::Container;
 use timely::container::columnation::{Columnation, TimelyStack};
 use timely::container::{ContainerBuilder, PushInto, SizableContainer};
@@ -64,14 +63,14 @@
     }
 }
 
-impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for VecChunker<((K, V), T, R)>
+impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
 where
     K: Ord + Clone,
     V: Ord + Clone,
     T: Ord + Clone,
     R: Semigroup + Clone,
 {
-    fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
+    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
         // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
         // because we don't write more than capacity elements into the buffer.
         // Important: Consolidation requires `pending` to have twice the chunk capacity to
@@ -80,27 +79,11 @@
             self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
         }
 
-        // `container` is either a shared reference or an owned allocations.
-        match container {
-            RefOrMut::Ref(vec) => {
-                let mut slice = &vec[..];
-                while !slice.is_empty() {
-                    let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
-                    slice = tail;
-                    self.pending.extend_from_slice(head);
-                    if self.pending.len() == self.pending.capacity() {
-                        self.form_chunk();
-                    }
-                }
-            }
-            RefOrMut::Mut(vec) => {
-                let mut drain = vec.drain(..).peekable();
-                while drain.peek().is_some() {
-                    self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
-                    if self.pending.len() == self.pending.capacity() {
-                        self.form_chunk();
-                    }
-                }
+        let mut drain = container.drain(..).peekable();
+        while drain.peek().is_some() {
+            self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
+            if self.pending.len() == self.pending.capacity() {
+                self.form_chunk();
             }
         }
     }
@@ -196,41 +179,25 @@
     }
 }
 
-impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for ColumnationChunker<((K, V), T, R)>
+impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for ColumnationChunker<((K, V), T, R)>
 where
     K: Columnation + Ord + Clone,
     V: Columnation + Ord + Clone,
     T: Columnation + Ord + Clone,
     R: Columnation + Semigroup + Clone,
 {
-    fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
+    fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
         // Ensure `self.pending` has the desired capacity. We should never have a larger capacity
         // because we don't write more than capacity elements into the buffer.
         if self.pending.capacity() < Self::chunk_capacity() * 2 {
             self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
         }
 
-        // `container` is either a shared reference or an owned allocations.
-        match container {
-            RefOrMut::Ref(vec) => {
-                let mut slice = &vec[..];
-                while !slice.is_empty() {
-                    let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
-                    slice = tail;
-                    self.pending.extend_from_slice(head);
-                    if self.pending.len() == self.pending.capacity() {
-                        self.form_chunk();
-                    }
-                }
-            }
-            RefOrMut::Mut(vec) => {
-                let mut drain = vec.drain(..).peekable();
-                while drain.peek().is_some() {
-                    self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
-                    if self.pending.len() == self.pending.capacity() {
-                        self.form_chunk();
-                    }
-                }
+        let mut drain = container.drain(..).peekable();
+        while drain.peek().is_some() {
+            self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
+            if self.pending.len() == self.pending.capacity() {
+                self.form_chunk();
             }
         }
     }
@@ -288,7 +255,7 @@
     }
 }
 
-impl<'a, Input, Output> PushInto<RefOrMut<'a, Input>> for ContainerChunker<Output>
+impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
 where
     Input: Container,
     Output: SizableContainer
@@ -296,7 +263,7 @@
         + PushInto<Input::Item<'a>>
         + PushInto<Input::ItemRef<'a>>,
 {
-    fn push_into(&mut self, container: RefOrMut<'a, Input>) {
+    fn push_into(&mut self, container: &'a mut Input) {
         if self.pending.capacity() < Output::preferred_capacity() {
             self.pending.reserve(Output::preferred_capacity() - self.pending.len());
         }
@@ -313,19 +280,9 @@
                 }
             }
         };
-        match container {
-            RefOrMut::Ref(container) => {
-                for item in container.iter() {
-                    self.pending.push(item);
-                    form_batch(self);
-                }
-            }
-            RefOrMut::Mut(container) => {
-                for item in container.drain() {
-                    self.pending.push(item);
-                    form_batch(self);
-                }
-            }
+        for item in container.drain() {
+            self.pending.push(item);
+            form_batch(self);
         }
     }
 }
diff --git a/src/trace/implementations/merge_batcher.rs b/src/trace/implementations/merge_batcher.rs
index cd4e7e72a..4e5c6cb16 100644
--- a/src/trace/implementations/merge_batcher.rs
+++ b/src/trace/implementations/merge_batcher.rs
@@ -3,7 +3,6 @@
 use std::collections::VecDeque;
 use std::marker::PhantomData;
 
-use timely::communication::message::RefOrMut;
 use timely::logging::WorkerIdentifier;
 use timely::logging_core::Logger;
 use timely::progress::frontier::AntichainRef;
@@ -45,7 +44,7 @@
 impl<Input, C, M> Batcher for MergeBatcher<Input, C, M>
 where
-    C: ContainerBuilder + Default + for<'a> PushInto<RefOrMut<'a, Input>>,
+    C: ContainerBuilder + Default + for<'a> PushInto<&'a mut Input>,
     M: Merger
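
The change repeated throughout this diff is mechanical: operator inputs and `push_into` now receive a plain `&mut` container instead of a `RefOrMut`, so the old swap-into-a-local-buffer dance (`data.swap(&mut buffer)` followed by `buffer.drain(..)`) collapses into draining the input in place. Below is a minimal, self-contained sketch of that pattern; the `PushInto` trait here is a simplified stand-in for timely's, and `Chunker` is a hypothetical type invented for illustration, not the library's API.

/// Simplified stand-in for timely's `PushInto` trait (illustrative only).
trait PushInto<T> {
    fn push_into(&mut self, item: T);
}

/// Hypothetical chunker that accumulates drained items into fixed-size chunks.
struct Chunker<T> {
    pending: Vec<T>,
    chunk_size: usize,
    ready: Vec<Vec<T>>,
}

// With `&'a mut Vec<T>` instead of `RefOrMut<'a, Vec<T>>` there is only one case:
// drain the input in place, leaving the (now empty) allocation with the caller.
impl<'a, T> PushInto<&'a mut Vec<T>> for Chunker<T> {
    fn push_into(&mut self, container: &'a mut Vec<T>) {
        let mut drain = container.drain(..).peekable();
        while drain.peek().is_some() {
            // Top up `pending` and emit a chunk whenever it fills.
            let room = self.chunk_size - self.pending.len();
            self.pending.extend((&mut drain).take(room));
            if self.pending.len() == self.chunk_size {
                self.ready.push(std::mem::take(&mut self.pending));
            }
        }
    }
}

fn main() {
    let mut chunker = Chunker { pending: Vec::new(), chunk_size: 4, ready: Vec::new() };
    let mut input: Vec<i32> = (0..10).collect();
    chunker.push_into(&mut input);
    assert!(input.is_empty());            // input was drained in place
    assert_eq!(chunker.ready.len(), 2);   // two full chunks of four
    assert_eq!(chunker.pending.len(), 2); // remainder waits for more input
}

The design point mirrors the diff: the callee consumes the data but hands the emptied allocation back to the caller for reuse, which is why the per-operator scratch buffers (`buffer`, `input1_buffer`, `vector`, and so on) can simply be deleted.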