From 90c7e9f52493daa8fb448638983b07b3d33f90e7 Mon Sep 17 00:00:00 2001 From: Moritz Hoffmann Date: Mon, 5 Feb 2024 14:35:43 -0500 Subject: [PATCH] Probe only retains weak handle to Rc The probe operator writes into a shared frontier, but it does so even if there is no other reference to the shared frontier. This change adjusts the behavior to only write to the shared frontier while there exists another reference. Signed-off-by: Moritz Hoffmann --- timely/src/dataflow/operators/probe.rs | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/timely/src/dataflow/operators/probe.rs b/timely/src/dataflow/operators/probe.rs index 7c5a8567e..ad990cb79 100644 --- a/timely/src/dataflow/operators/probe.rs +++ b/timely/src/dataflow/operators/probe.rs @@ -76,7 +76,7 @@ pub trait Probe { /// } /// }).unwrap(); /// ``` - fn probe_with(&self, handle: &mut Handle) -> StreamCore; + fn probe_with(&self, handle: &Handle) -> StreamCore; } impl Probe for StreamCore { @@ -87,14 +87,14 @@ impl Probe for StreamCore { self.probe_with(&mut handle); handle } - fn probe_with(&self, handle: &mut Handle) -> StreamCore { + fn probe_with(&self, handle: &Handle) -> StreamCore { let mut builder = OperatorBuilder::new("Probe".to_owned(), self.scope()); let mut input = PullCounter::new(builder.new_input(self, Pipeline)); let (tee, stream) = builder.new_output(); let mut output = PushBuffer::new(PushCounter::new(tee)); - let shared_frontier = handle.frontier.clone(); + let shared_frontier = Rc::downgrade(&handle.frontier); let mut started = false; let mut vector = Default::default(); @@ -103,8 +103,10 @@ impl Probe for StreamCore { move |progress| { // surface all frontier changes to the shared frontier. - let mut borrow = shared_frontier.borrow_mut(); - borrow.update_iter(progress.frontiers[0].drain()); + if let Some(shared_frontier) = shared_frontier.upgrade() { + let mut borrow = shared_frontier.borrow_mut(); + borrow.update_iter(progress.frontiers[0].drain()); + } if !started { // discard initial capability.