Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

dataflow: Account for probe handles being dropped #490

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 33 additions & 6 deletions timely/src/dataflow/operators/probe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use std::rc::Rc;
use std::cell::RefCell;

use crate::progress::Timestamp;
use crate::order::PartialOrder;
use crate::progress::frontier::{AntichainRef, MutableAntichain};
use crate::dataflow::channels::pushers::CounterCore as PushCounter;
use crate::dataflow::channels::pushers::buffer::BufferCore as PushBuffer;
Expand Down Expand Up @@ -94,7 +95,7 @@ impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
let (tee, stream) = builder.new_output();
let mut output = PushBuffer::new(PushCounter::new(tee));

let shared_frontier = handle.frontier.clone();
let mut handle = handle.clone();
let mut started = false;

let mut vector = Default::default();
Expand All @@ -103,8 +104,7 @@ impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
move |progress| {

// surface all frontier changes to the shared frontier.
let mut borrow = shared_frontier.borrow_mut();
borrow.update_iter(progress.frontiers[0].drain());
handle.update_iter(progress.frontiers[0].drain());

if !started {
// discard initial capability.
Expand Down Expand Up @@ -139,7 +139,10 @@ impl<G: Scope, D: Container> Probe<G, D> for StreamCore<G, D> {
/// Reports information about progress at the probe.
#[derive(Debug)]
pub struct Handle<T:Timestamp> {
frontier: Rc<RefCell<MutableAntichain<T>>>
/// The overall shared frontier managed by all the handles
frontier: Rc<RefCell<MutableAntichain<T>>>,
/// The private frontier containing the changes produced by this handle only
handle_frontier: MutableAntichain<T>,
}

impl<T: Timestamp> Handle<T> {
Expand All @@ -150,7 +153,21 @@ impl<T: Timestamp> Handle<T> {
/// returns true iff the frontier is empty.
#[inline] pub fn done(&self) -> bool { self.frontier.borrow().is_empty() }
/// Allocates a new handle.
#[inline] pub fn new() -> Self { Handle { frontier: Rc::new(RefCell::new(MutableAntichain::new())) } }
#[inline] pub fn new() -> Self {
Handle {
frontier: Rc::new(RefCell::new(MutableAntichain::new())),
handle_frontier: MutableAntichain::new()
}
}

#[inline]
fn update_iter<I>(&mut self, updates: I)
where
T: Clone + PartialOrder + Ord,
I: IntoIterator<Item = (T, i64)>,
{
self.frontier.borrow_mut().update_iter(self.handle_frontier.update_iter(updates));
}

/// Invokes a method on the frontier, returning its result.
///
Expand All @@ -171,10 +188,20 @@ impl<T: Timestamp> Handle<T> {
}
}

impl<T: Timestamp> Drop for Handle<T> {
fn drop(&mut self) {
// This handle is being dropped so remove it from the overall calculation
self.frontier.borrow_mut().update_iter(
self.handle_frontier.frontier().iter().map(|t| (t.clone(), -1))
);
}
}

impl<T: Timestamp> Clone for Handle<T> {
fn clone(&self) -> Self {
Handle {
frontier: self.frontier.clone()
frontier: self.frontier.clone(),
handle_frontier: MutableAntichain::new(),
}
}
}
Expand Down