Skip to content

Commit

Permalink
Changes to track timely-dataflow PR #597 (removal of `RefOrMut` in favor of `&mut` container references)
Browse files Browse the repository at this point in the history
  • Loading branch information
frankmcsherry committed Nov 11, 2024
1 parent 0ccd434 commit c1b2106
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 100 deletions.
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ fnv="1.0.2"
timely = {workspace = true}

[workspace.dependencies]
timely = { version = "0.13", default-features = false }
#timely = { version = "0.13", default-features = false }
timely = { git = "https://github.com/TimelyDataflow/timely-dataflow", default-features = false }
#timely = { path = "../timely-dataflow/timely/", default-features = false }

[features]
Expand Down
2 changes: 1 addition & 1 deletion src/capture.rs
Original file line number Diff line number Diff line change
Expand Up @@ -600,7 +600,7 @@ pub mod sink {
}

// Now record the update to the writer.
send_queue.push_back(Message::Updates(updates.replace(Vec::new())));
send_queue.push_back(Message::Updates(std::mem::take(updates)));

// Transmit timestamp counts downstream.
output
Expand Down
6 changes: 2 additions & 4 deletions src/dynamic/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,20 @@ where
let (mut output, stream) = builder.new_output();
let mut input = builder.new_input_connection(&self.inner, Pipeline, vec![Antichain::from_elem(Product { outer: Default::default(), inner: PointStampSummary { retain: Some(level - 1), actions: Vec::new() } })]);

let mut vector = Default::default();
builder.build(move |_capability| move |_frontier| {
let mut output = output.activate();
input.for_each(|cap, data| {
data.swap(&mut vector);
let mut new_time = cap.time().clone();
let mut vec = std::mem::take(&mut new_time.inner).into_vec();
vec.truncate(level - 1);
new_time.inner = PointStamp::new(vec);
let new_cap = cap.delayed(&new_time);
for (_data, time, _diff) in vector.iter_mut() {
for (_data, time, _diff) in data.iter_mut() {
let mut vec = std::mem::take(&mut time.inner).into_vec();
vec.truncate(level - 1);
time.inner = PointStamp::new(vec);
}
output.session(&new_cap).give_container(&mut vector);
output.session(&new_cap).give_container(data);
});
});

Expand Down
4 changes: 1 addition & 3 deletions src/operators/arrange/upsert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,6 @@ where

// Tracks the lower envelope of times in `priority_queue`.
let mut capabilities = Antichain::<Capability<G::Timestamp>>::new();
let mut buffer = Vec::new();
// Form the trace we will both use internally and publish.
let activator = Some(stream.scope().activator_for(info.address.clone()));
let mut empty_trace = Tr::new(info.clone(), logger.clone(), activator);
Expand All @@ -186,8 +185,7 @@ where
// Stash capabilities and associated data (ordered by time).
input.for_each(|cap, data| {
capabilities.insert(cap.retain());
data.swap(&mut buffer);
for (key, val, time) in buffer.drain(..) {
for (key, val, time) in data.drain(..) {
priority_queue.push(std::cmp::Reverse((time, key, val)))
}
});
Expand Down
4 changes: 1 addition & 3 deletions src/operators/consolidate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,9 @@ where
self.inner
.unary::<ConsolidatingContainerBuilder<_>, _, _, _>(Pipeline, "ConsolidateStream", |_cap, _info| {

let mut vector = Vec::new();
move |input, output| {
input.for_each(|time, data| {
data.swap(&mut vector);
output.session_with_builder(&time).give_iterator(vector.drain(..));
output.session_with_builder(&time).give_iterator(data.drain(..));
})
}
})
Expand Down
4 changes: 1 addition & 3 deletions src/operators/count.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,6 @@ where
fn count_total_core<R2: Semigroup + From<i8> + 'static>(&self) -> Collection<G, (K, T1::Diff), R2> {

let mut trace = self.trace.clone();
let mut buffer = Vec::new();

self.stream.unary_frontier(Pipeline, "CountTotal", move |_,_| {

Expand All @@ -87,8 +86,7 @@ where
if cap.is_none() { // NB: Assumes batches are in-order
cap = Some(capability.retain());
}
batches.swap(&mut buffer);
for batch in buffer.drain(..) {
for batch in batches.drain(..) {
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
Expand Down
10 changes: 2 additions & 8 deletions src/operators/join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -444,10 +444,6 @@ where
let mut trace1_option = Some(trace1);
let mut trace2_option = Some(trace2);

// Swappable buffers for input extraction.
let mut input1_buffer = Vec::new();
let mut input2_buffer = Vec::new();

move |input1, input2, output| {

// 1. Consuming input.
Expand All @@ -468,8 +464,7 @@ where
// This test *should* always pass, as we only drop a trace in response to the other input emptying.
if let Some(ref mut trace2) = trace2_option {
let capability = capability.retain();
data.swap(&mut input1_buffer);
for batch1 in input1_buffer.drain(..) {
for batch1 in data.drain(..) {
// Ignore any pre-loaded data.
if PartialOrder::less_equal(&acknowledged1, batch1.lower()) {
if !batch1.is_empty() {
Expand All @@ -496,8 +491,7 @@ where
// This test *should* always pass, as we only drop a trace in response to the other input emptying.
if let Some(ref mut trace1) = trace1_option {
let capability = capability.retain();
data.swap(&mut input2_buffer);
for batch2 in input2_buffer.drain(..) {
for batch2 in data.drain(..) {
// Ignore any pre-loaded data.
if PartialOrder::less_equal(&acknowledged2, batch2.lower()) {
if !batch2.is_empty() {
Expand Down
5 changes: 1 addition & 4 deletions src/operators/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -373,8 +373,6 @@ where
let mut output_upper = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());
let mut output_lower = Antichain::from_elem(<G::Timestamp as timely::progress::Timestamp>::minimum());

let mut input_buffer = Vec::new();

let id = trace.stream.scope().index();

move |input, output| {
Expand Down Expand Up @@ -409,8 +407,7 @@ where
// times in the batch.
input.for_each(|capability, batches| {

batches.swap(&mut input_buffer);
for batch in input_buffer.drain(..) {
for batch in batches.drain(..) {
upper_limit.clone_from(batch.upper());
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
Expand Down
4 changes: 1 addition & 3 deletions src/operators/threshold.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,6 @@ where
{

let mut trace = self.trace.clone();
let mut buffer = Vec::new();

self.stream.unary_frontier(Pipeline, "ThresholdTotal", move |_,_| {

Expand All @@ -134,8 +133,7 @@ where
if cap.is_none() { // NB: Assumes batches are in-order
cap = Some(capability.retain());
}
batches.swap(&mut buffer);
for batch in buffer.drain(..) {
for batch in batches.drain(..) {
upper_limit.clone_from(batch.upper()); // NB: Assumes batches are in-order
batch_cursors.push(batch.cursor());
batch_storage.push(batch);
Expand Down
81 changes: 19 additions & 62 deletions src/trace/implementations/chunker.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
//! Organize streams of data into sorted chunks.

use std::collections::VecDeque;
use timely::communication::message::RefOrMut;
use timely::Container;
use timely::container::columnation::{Columnation, TimelyStack};
use timely::container::{ContainerBuilder, PushInto, SizableContainer};
Expand Down Expand Up @@ -64,14 +63,14 @@ where
}
}

impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for VecChunker<((K, V), T, R)>
impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for VecChunker<((K, V), T, R)>
where
K: Ord + Clone,
V: Ord + Clone,
T: Ord + Clone,
R: Semigroup + Clone,
{
fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
// Important: Consolidation requires `pending` to have twice the chunk capacity to
Expand All @@ -80,27 +79,11 @@ where
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
}

// `container` is either a shared reference or an owned allocations.
match container {
RefOrMut::Ref(vec) => {
let mut slice = &vec[..];
while !slice.is_empty() {
let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
slice = tail;
self.pending.extend_from_slice(head);
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
RefOrMut::Mut(vec) => {
let mut drain = vec.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
let mut drain = container.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
Expand Down Expand Up @@ -196,41 +179,25 @@ where
}
}

impl<'a, K, V, T, R> PushInto<RefOrMut<'a, Vec<((K, V), T, R)>>> for ColumnationChunker<((K, V), T, R)>
impl<'a, K, V, T, R> PushInto<&'a mut Vec<((K, V), T, R)>> for ColumnationChunker<((K, V), T, R)>
where
K: Columnation + Ord + Clone,
V: Columnation + Ord + Clone,
T: Columnation + Ord + Clone,
R: Columnation + Semigroup + Clone,
{
fn push_into(&mut self, container: RefOrMut<'a, Vec<((K, V), T, R)>>) {
fn push_into(&mut self, container: &'a mut Vec<((K, V), T, R)>) {
// Ensure `self.pending` has the desired capacity. We should never have a larger capacity
// because we don't write more than capacity elements into the buffer.
if self.pending.capacity() < Self::chunk_capacity() * 2 {
self.pending.reserve(Self::chunk_capacity() * 2 - self.pending.len());
}

// `container` is either a shared reference or an owned allocations.
match container {
RefOrMut::Ref(vec) => {
let mut slice = &vec[..];
while !slice.is_empty() {
let (head, tail) = slice.split_at(std::cmp::min(self.pending.capacity() - self.pending.len(), slice.len()));
slice = tail;
self.pending.extend_from_slice(head);
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
RefOrMut::Mut(vec) => {
let mut drain = vec.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
let mut drain = container.drain(..).peekable();
while drain.peek().is_some() {
self.pending.extend((&mut drain).take(self.pending.capacity() - self.pending.len()));
if self.pending.len() == self.pending.capacity() {
self.form_chunk();
}
}
}
Expand Down Expand Up @@ -288,15 +255,15 @@ where
}
}

impl<'a, Input, Output> PushInto<RefOrMut<'a, Input>> for ContainerChunker<Output>
impl<'a, Input, Output> PushInto<&'a mut Input> for ContainerChunker<Output>
where
Input: Container,
Output: SizableContainer
+ ConsolidateLayout
+ PushInto<Input::Item<'a>>
+ PushInto<Input::ItemRef<'a>>,
{
fn push_into(&mut self, container: RefOrMut<'a, Input>) {
fn push_into(&mut self, container: &'a mut Input) {
if self.pending.capacity() < Output::preferred_capacity() {
self.pending.reserve(Output::preferred_capacity() - self.pending.len());
}
Expand All @@ -313,19 +280,9 @@ where
}
}
};
match container {
RefOrMut::Ref(container) => {
for item in container.iter() {
self.pending.push(item);
form_batch(self);
}
}
RefOrMut::Mut(container) => {
for item in container.drain() {
self.pending.push(item);
form_batch(self);
}
}
for item in container.drain() {
self.pending.push(item);
form_batch(self);
}
}
}
Expand Down
5 changes: 2 additions & 3 deletions src/trace/implementations/merge_batcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
use std::collections::VecDeque;
use std::marker::PhantomData;

use timely::communication::message::RefOrMut;
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::frontier::AntichainRef;
Expand Down Expand Up @@ -45,7 +44,7 @@ where

impl<Input, C, M, T> Batcher for MergeBatcher<Input, C, M, T>
where
C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<RefOrMut<'a, Input>>,
C: ContainerBuilder<Container=M::Chunk> + Default + for<'a> PushInto<&'a mut Input>,
M: Merger<Time = T>,
T: Timestamp,
{
Expand All @@ -69,7 +68,7 @@ where

/// Push a container of data into this merge batcher. Updates the internal chain structure if
/// needed.
fn push_container(&mut self, container: RefOrMut<Input>) {
fn push_container(&mut self, container: &mut Input) {
self.chunker.push_into(container);
while let Some(chunk) = self.chunker.extract() {
let chunk = std::mem::take(chunk);
Expand Down
3 changes: 1 addition & 2 deletions src/trace/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ pub mod description;
pub mod implementations;
pub mod wrappers;

use timely::communication::message::RefOrMut;
use timely::logging::WorkerIdentifier;
use timely::logging_core::Logger;
use timely::progress::{Antichain, frontier::AntichainRef};
Expand Down Expand Up @@ -315,7 +314,7 @@ pub trait Batcher {
/// Allocates a new empty batcher.
fn new(logger: Option<Logger<DifferentialEvent, WorkerIdentifier>>, operator_id: usize) -> Self;
/// Adds an unordered container of elements to the batcher.
fn push_container(&mut self, batch: RefOrMut<Self::Input>);
fn push_container(&mut self, batch: &mut Self::Input);
/// Returns all updates not greater or equal to an element of `upper`.
fn seal<B: Builder<Input=Self::Output, Time=Self::Time>>(&mut self, upper: Antichain<Self::Time>) -> B::Output;
/// Returns the lower envelope of contained update times.
Expand Down
5 changes: 2 additions & 3 deletions tests/trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,11 @@ fn get_trace() -> ValSpine<u64, u64, usize, i64> {
{
let mut batcher = <IntegerTrace as Trace>::Batcher::new(None, 0);

use timely::communication::message::RefOrMut;
batcher.push_container(RefOrMut::Mut(&mut vec![
batcher.push_container(&mut vec![
((1, 2), 0, 1),
((2, 3), 1, 1),
((2, 3), 2, -1),
]));
]);

let batch_ts = &[1, 2, 3];
let batches = batch_ts.iter().map(move |i| batcher.seal::<IntegerBuilder>(Antichain::from_elem(*i)));
Expand Down

0 comments on commit c1b2106

Please sign in to comment.